create abid_utils with new ABID type for ArchiveBox IDs

This commit is contained in:
Nick Sweeting 2024-05-13 02:37:48 -07:00
parent f896e5dbeb
commit 4f9f22e024
No known key found for this signature in database
11 changed files with 572 additions and 146 deletions

.gitignore vendored
View file

@ -29,6 +29,7 @@ dist/
# vim

View file

@ -0,0 +1 @@
__package__ = 'abid_utils'

View file

@ -0,0 +1,174 @@
from typing import NamedTuple, Any, Union, Optional
import ulid
import uuid6
import hashlib
from uuid import UUID
from typeid import TypeID # type: ignore[import-untyped]
from datetime import datetime
class ABID(NamedTuple):
prefix: str # e.g. obj_
ts: str # e.g. 01HX9FPYTR
uri: str # e.g. E4A5CCD9
subtype: str # e.g. 01
rand: str # e.g. ZYEBQE
def __getattr__(self, attr: str) -> Any:
return getattr(self.ulid, attr)
def __eq__(self, other: Any) -> bool:
return self.ulid == other.ulid
except AttributeError:
return NotImplemented
def __str__(self) -> str:
return self.prefix + self.suffix
def __len__(self) -> int:
return len(self.prefix + self.suffix)
def parse(cls, buffer: Union[str, UUID, ulid.ULID, TypeID, 'ABID'], prefix=DEFAULT_ABID_PREFIX) -> 'ABID':
buffer = str(buffer)
if '_' in buffer:
prefix, suffix = buffer.split('_')
prefix, suffix = prefix.strip('_'), buffer
assert len(prefix) == ABID_PREFIX_LEN - 1 # length without trailing _
assert len(suffix) == ABID_SUFFIX_LEN
return cls(
def suffix(self):
return ''.join((self.ts, self.uri, self.subtype, self.rand))
def ulid(self) -> ulid.ULID:
return ulid.parse(self.suffix)
def uuid(self) -> UUID:
return self.ulid.uuid
def uuid6(self) -> uuid6.UUID:
return uuid6.UUID(hex=self.uuid.hex)
def typeid(self) -> TypeID:
return TypeID.from_uuid(prefix=self.prefix.strip('_'), suffix=self.uuid6)
def datetime(self) -> datetime:
return self.ulid.timestamp().datetime
def uri_hash(uri: Union[str, bytes]) -> str:
if isinstance(uri, str):
uri = uri.encode('utf-8')
return hashlib.sha256(uri).hexdigest().upper()
def abid_part_from_prefix(prefix: Optional[str]) -> str:
if prefix is None:
return 'obj_'
prefix = prefix.strip('_').lower()
assert len(prefix) == 3
return prefix + '_'
def abid_part_from_uri(uri: str) -> str:
'E4A5CCD9' # takes first 8 characters of sha256(url)
return uri_hash(uri)[:ABID_URI_LEN]
def abid_part_from_ts(ts: Optional[datetime]) -> str:
'01HX9FPYTR' # produces 10 character Timestamp section of ulid based on added date
return str(ulid.from_timestamp(ts) if ts else[:ABID_TS_LEN]
def abid_part_from_subtype(subtype: str) -> str:
Snapshots have 01 type, other objects have other subtypes like wget/media/etc.
Also allows us to change the ulid spec later by putting special sigil values here.
if len(subtype) == ABID_SUBTYPE_LEN:
return subtype
return hashlib.sha256(subtype.encode('utf-8')).hexdigest()[:ABID_SUBTYPE_LEN]
def abid_part_from_rand(rand: Union[str, UUID, None, int]) -> str:
'ZYEBQE' # takes last 6 characters of randomness from existing legacy uuid db field
if rand is None:
# if it's None we generate a new random 6 character hex string
return str([-ABID_RAND_LEN:]
elif isinstance(rand, UUID):
# if it's a uuid we take the last 6 characters of the ULID represation of it
return str(ulid.from_uuid(rand))[-ABID_RAND_LEN:]
elif isinstance(rand, str):
# if it's a string we take the last 6 characters of it verbatim
return rand[-ABID_RAND_LEN:]
elif isinstance(rand, int):
# if it's a BigAutoInteger field we convert it from an int to a 0-padded string
rand_str = str(rand)[-ABID_RAND_LEN:]
padding_needed = ABID_RAND_LEN - len(rand_str)
rand_str = ('0'*padding_needed) + rand_str
return rand_str
raise NotImplementedError('Random component of an ABID can only be computed from a str or UUID')
def abid_from_values(prefix, ts, uri, subtype, rand) -> ABID:
Return a freshly derived ABID (assembled from attrs defined in ABIDModel.abid_*_src).
abid = ABID(
assert abid.ulid and abid.uuid and abid.typeid, f'Failed to calculate {prefix}_ABID for ts={ts} uri={uri} subtyp={subtype} rand={rand}'
return abid

View file

@ -0,0 +1,7 @@
from django.apps import AppConfig
class AbidUtilsConfig(AppConfig):
default_auto_field = 'django.db.models.BigAutoField'
name = 'abid_utils'

View file

@ -0,0 +1,279 @@
from typing import Any, Dict, Union, List, Set, cast
import ulid
from uuid import UUID
from typeid import TypeID # type: ignore[import-untyped]
from datetime import datetime
from functools import partial
from charidfield import CharIDField # type: ignore[import-untyped]
from django.db import models
from django.db.utils import OperationalError
from django_stubs_ext.db.models import TypedModelMeta
from .abid import (
# Database Field for typeid/ulid style IDs with a prefix, e.g. snp_01BJQMF54D093DXEAWZ6JYRPAQ
ABIDField = partial(
help_text="ABID-format identifier for this entity (e.g. snp_01BJQMF54D093DXEAWZ6JYRPAQ)"
class ABIDModel(models.Model):
abid_prefix: str = DEFAULT_ABID_PREFIX # e.g. 'tag_'
abid_ts_src = 'None' # e.g. 'self.created'
abid_uri_src = 'None' # e.g. 'self.uri'
abid_subtype_src = 'None' # e.g. 'self.extractor'
abid_rand_src = 'None' # e.g. 'self.uuid' or ''
# abid = ABIDField(prefix=abid_prefix, db_index=True, unique=True, null=True, blank=True, editable=True)
# created = models.DateTimeField(auto_now_add=True, blank=True, null=True, db_index=True)
# modified = models.DateTimeField(auto_now=True, blank=True, null=True, db_index=True)
# created_by = models.ForeignKeyField(get_user_model(), blank=True, null=True, db_index=True)
class Meta(TypedModelMeta):
abstract = True
def save(self, *args: Any, **kwargs: Any) -> None:
if hasattr(self, 'abid'):
self.abid: ABID = self.abid or self.calculate_abid()
print(f'[!] WARNING: {self.__class__.__name__}.abid is not a DB field so ABID will not be persisted!')
self.abid = self.calculate_abid()
super().save(*args, **kwargs)
def calculate_abid(self) -> ABID:
Return a freshly derived ABID (assembled from attrs defined in ABIDModel.abid_*_src).
prefix = self.abid_prefix
ts = eval(self.abid_ts_src)
uri = eval(self.abid_uri_src)
subtype = eval(self.abid_subtype_src)
rand = eval(self.abid_rand_src)
if (not prefix) or prefix == DEFAULT_ABID_PREFIX:
suggested_abid = self.__class__.__name__[:3].lower()
raise Exception(f'{self.__class__.__name__}.abid_prefix must be defined to calculate ABIDs (suggested: {suggested_abid})')
if not ts:
ts = datetime.utcfromtimestamp(0)
print(f'[!] WARNING: Generating ABID with ts=0000000000 placeholder because {self.__class__.__name__}.abid_ts_src={self.abid_ts_src} is unset!', ts.isoformat())
if not uri:
uri = str(self)
print(f'[!] WARNING: Generating ABID with uri=str(self) placeholder because {self.__class__.__name__}.abid_uri_src={self.abid_uri_src} is unset!', uri)
if not subtype:
subtype = self.__class__.__name__
print(f'[!] WARNING: Generating ABID with subtype={subtype} placeholder because {self.__class__.__name__}.abid_subtype_src={self.abid_subtype_src} is unset!', subtype)
if not rand:
rand = getattr(self, 'uuid', None) or getattr(self, 'id', None) or getattr(self, 'pk')
print(f'[!] WARNING: Generating ABID with placeholder because {self.__class__.__name__}.abid_rand_src={self.abid_rand_src} is unset!', rand)
abid = abid_from_values(
assert abid.ulid and abid.uuid and abid.typeid, f'Failed to calculate {prefix}_ABID for {self.__class__.__name__}'
return abid
def ABID(self) -> ABID:
ULIDParts(timestamp='01HX9FPYTR', url='E4A5CCD9', subtype='00', randomness='ZYEBQE')
return ABID.parse(self.abid) if self.abid else self.calculate_abid()
def ULID(self) -> ulid.ULID:
Get a ulid.ULID representation of the object's ABID.
return self.ABID.ulid
def UUID(self) -> UUID:
Get a uuid.UUID (v4) representation of the object's ABID.
return self.ABID.uuid
def TypeID(self) -> TypeID:
Get a typeid.TypeID (stripe-style) representation of the object's ABID.
return self.ABID.typeid
# Django helpers
def find_all_abid_prefixes() -> Dict[str, type[models.Model]]:
Return the mapping of all ABID prefixes to their models.
e.g. {'tag_': core.models.Tag, 'snp_': core.models.Snapshot, ...}
import django.apps
prefix_map = {}
for model in django.apps.apps.get_models():
abid_prefix = getattr(model, 'abid_prefix', None)
if abid_prefix:
prefix_map[abid_prefix] = model
return prefix_map
def find_prefix_for_abid(abid: ABID) -> str:
Find the correct prefix for a given ABID that may have be missing a prefix (slow).
e.g. ABID('obj_01BJQMF54D093DXEAWZ6JYRPAQ') -> 'snp_'
# if existing abid prefix is correct, lookup is easy
model = find_model_from_abid(abid)
if model:
assert issubclass(model, ABIDModel)
return model.abid_prefix
# prefix might be obj_ or missing, fuzzy-search to find any object that matches
return find_obj_from_abid_rand(abid)[0].abid_prefix
def find_model_from_abid_prefix(prefix: str) -> type[ABIDModel] | None:
Return the Django Model that corresponds to a given ABID prefix.
e.g. 'tag_' -> core.models.Tag
prefix = abid_part_from_prefix(prefix)
import django.apps
for model in django.apps.apps.get_models():
if not issubclass(model, ABIDModel): continue # skip non-ABID-enabled models
if not hasattr(model, 'objects'): continue # skip abstract models
if (model.abid_prefix == prefix):
return model
return None
def find_model_from_abid(abid: ABID) -> type[models.Model] | None:
Shortcut for find_model_from_abid_prefix(abid.prefix)
return find_model_from_abid_prefix(abid.prefix)
def find_obj_from_abid_rand(rand: Union[ABID, str], model=None) -> List[ABIDModel]:
Find an object corresponding to an ABID by exhaustively searching using its random suffix (slow).
e.g. 'obj_....................JYRPAQ' -> Snapshot('snp_01BJQMF54D093DXEAWZ6JYRPAQ')
# convert str to ABID if necessary
if isinstance(rand, ABID):
abid: ABID = rand
rand = str(rand)
if len(rand) < ABID_SUFFIX_LEN:
padding_needed = ABID_SUFFIX_LEN - len(rand)
rand = ('0'*padding_needed) + rand
abid = ABID.parse(rand)
import django.apps
partial_matches: List[ABIDModel] = []
models_to_try = cast(Set[type[models.Model]], set(filter(bool, (
# print(abid, abid.rand, abid.uuid, models_to_try)
for model in models_to_try:
if not issubclass(model, ABIDModel): continue # skip Models that arent ABID-enabled
if not hasattr(model, 'objects'): continue # skip abstract Models
assert hasattr(model, 'objects') # force-fix for type hint nit about missing manager
# continue on to try fuzzy searching by randomness portion derived from uuid field
qs = []
if hasattr(model, 'abid'):
qs = model.objects.filter(abid__endswith=abid.rand)
elif hasattr(model, 'uuid'):
qs = model.objects.filter(uuid__endswith=str(abid.uuid)[-ABID_RAND_LEN:])
elif hasattr(model, 'id'):
# NOTE: this only works on SQLite where every column is a string
# other DB backends like postgres dont let you do __endswith if this is a BigAutoInteger field
# try to search for uuid=...-2354352
# try to search for id=...2354352
# try to search for id=2354352
qs = model.objects.filter(
| models.Q(id__endswith=abid.rand)
| models.Q(id__startswith=str(int(abid.rand)) if abid.rand.isdigit() else abid.rand)
for obj in qs:
if obj.calculate_abid() == abid:
# found exact match, no need to keep iterating
return [obj]
except OperationalError as err:
print(f'[!] WARNING: Got error while trying to iterate through QuerySet for {model}:', err, '\n')
return partial_matches
def find_obj_from_abid(abid: ABID, model=None, fuzzy=False) -> Any:
Find an object with a given ABID by filtering possible models for a matching abid/uuid/id (fast).
e.g. 'snp_01BJQMF54D093DXEAWZ6JYRPAQ' -> Snapshot('snp_01BJQMF54D093DXEAWZ6JYRPAQ')
model = model or find_model_from_abid(abid)
assert model, f'Could not find model that could match this ABID type: {abid}'
if hasattr(model, 'abid'):
return model.objects.get(abid__endswith=abid.suffix)
if hasattr(model, 'uuid'):
return model.objects.get(uuid=abid.uuid)
return model.objects.get(id=abid.uuid)
except model.DoesNotExist:
# if the model has an abid field then it shouldve matched, pointless to fuzzy search in that case
if hasattr(model, 'abid') or (not fuzzy):
# continue on to try fuzzy searching by randomness portion derived from uuid field
match_by_rand = find_obj_from_abid_rand(abid, model=model)
if match_by_rand:
if match_by_rand[0].abid_prefix != abid.prefix:
print(f'[!] WARNING: fetched object {match_by_rand} even though prefix {abid.prefix} doesnt match!', abid, '\n')
return match_by_rand
raise model.DoesNotExist

View file

@ -0,0 +1,3 @@
from django.test import TestCase
# Create your tests here.

View file

@ -12,14 +12,16 @@ from signal_webhooks.models import WebhookBase
from django_stubs_ext.db.models import TypedModelMeta
from abid_utils.models import ABIDModel
def generate_secret_token() -> str:
# returns cryptographically secure string with len() == 32
return secrets.token_hex(16)
class APIToken(models.Model):
id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)
class APIToken(ABIDModel):
id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=True)
user = models.ForeignKey(settings.AUTH_USER_MODEL, on_delete=models.CASCADE)
token = models.CharField(max_length=32, default=generate_secret_token, unique=True)

View file

@ -1,15 +1,13 @@
__package__ = 'archivebox.core'
import uuid
import ulid
import json
import hashlib
from typeid import TypeID
from typing import Optional, List, Dict
from django_stubs_ext.db.models import TypedModelMeta
import json
from uuid import uuid4
from pathlib import Path
from typing import Optional, List, NamedTuple
from importlib import import_module
from django.db import models
from django.utils.functional import cached_property
@ -19,12 +17,15 @@ from django.urls import reverse
from django.db.models import Case, When, Value, IntegerField
from django.contrib.auth.models import User # noqa
from abid_utils.models import ABIDModel
from ..config import ARCHIVE_DIR, ARCHIVE_DIR_NAME
from ..system import get_dir_size
from ..util import parse_date, base_url, hashurl, domain
from ..util import parse_date, base_url
from ..index.schema import Link
from ..index.html import snapshot_icons
from ..extractors import get_default_archive_methods, ARCHIVE_METHODS_INDEXING_PRECEDENCE, EXTRACTORS
EXTRACTOR_CHOICES = [(extractor_name, extractor_name) for extractor_name in EXTRACTORS.keys()]
@ -33,24 +34,29 @@ STATUS_CHOICES = [
("skipped", "skipped")
JSONField = models.JSONField
except AttributeError:
import jsonfield
JSONField = jsonfield.JSONField
class ULIDParts(NamedTuple):
timestamp: str
url: str
subtype: str
randomness: str
# class BaseModel(models.Model):
# # TODO: migrate all models to a shared base class with all our standard fields and helpers:
# # ulid/created/modified/owner/is_deleted/as_json/from_json/etc.
# #
# # id = models.AutoField(primary_key=True, serialize=False, verbose_name='ID')
# # ulid = models.CharField(max_length=26, null=True, blank=True, db_index=True, unique=True)
# class Meta(TypedModelMeta):
# abstract = True
class Tag(models.Model):
class Tag(ABIDModel):
Based on django-taggit model
abid_prefix = 'tag_'
abid_ts_src = 'None' # TODO: add created/modified time
abid_uri_src = ''
abid_subtype_src = '"03"'
abid_rand_src = ''
id = models.AutoField(primary_key=True, serialize=False, verbose_name='ID')
name = models.CharField(unique=True, blank=False, max_length=100)
@ -59,7 +65,7 @@ class Tag(models.Model):
slug = models.SlugField(unique=True, blank=True, max_length=100)
class Meta:
class Meta(TypedModelMeta):
verbose_name = "Tag"
verbose_name_plural = "Tags"
@ -95,8 +101,16 @@ class Tag(models.Model):
return super().save(*args, **kwargs)
class Snapshot(models.Model):
id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)
class Snapshot(ABIDModel):
abid_prefix = 'snp_'
abid_ts_src = 'self.added'
abid_uri_src = 'self.url'
abid_subtype_src = '"01"'
abid_rand_src = ''
id = models.UUIDField(primary_key=True, default=uuid4, editable=True)
# ulid = models.CharField(max_length=26, null=True, blank=True, db_index=True, unique=True)
url = models.URLField(unique=True, db_index=True)
timestamp = models.CharField(max_length=32, unique=True, db_index=True)
@ -109,37 +123,6 @@ class Snapshot(models.Model):
keys = ('url', 'timestamp', 'title', 'tags', 'updated')
def ulid_from_timestamp(self):
return str(ulid.from_timestamp(self.added))[:10]
def ulid_from_urlhash(self):
return str(ulid.from_randomness(self.url_hash))[10:18]
def ulid_from_type(self):
return '00'
def ulid_from_randomness(self):
return str(ulid.from_uuid([20:]
def ulid_tuple(self) -> ULIDParts:
return ULIDParts(self.ulid_from_timestamp, self.ulid_from_urlhash, self.ulid_from_type, self.ulid_from_randomness)
def ulid(self):
return ulid.parse(''.join(self.ulid_tuple))
def uuid(self):
return self.ulid.uuid
def typeid(self):
return TypeID.from_uuid(prefix='snapshot', suffix=self.ulid.uuid)
def __repr__(self) -> str:
title = self.title or '-'
@ -169,7 +152,7 @@ class Snapshot(models.Model):
from ..index import load_link_details
return load_link_details(self.as_link())
def tags_str(self, nocache=True) -> str:
def tags_str(self, nocache=True) -> str | None:
cache_key = f'{}-{(self.updated or self.added).timestamp()}-tags'
calc_tags_str = lambda: ','.join(self.tags.order_by('name').values_list('name', flat=True))
if nocache:
@ -200,14 +183,9 @@ class Snapshot(models.Model):
return self.as_link().is_archived
def num_outputs(self):
def num_outputs(self) -> int:
return self.archiveresult_set.filter(status='succeeded').count()
def url_hash(self):
# return hashurl(self.url)
return hashlib.sha256(self.url.encode('utf-8')).hexdigest()[:16].upper()
def base_url(self):
return base_url(self.url)
@ -243,7 +221,7 @@ class Snapshot(models.Model):
return None
def headers(self) -> Optional[dict]:
def headers(self) -> Optional[Dict[str, str]]:
return json.loads((Path(self.link_dir) / 'headers.json').read_text(encoding='utf-8').strip())
except Exception:
@ -299,30 +277,31 @@ class Snapshot(models.Model):
def get_storage_dir(self, create=True, symlink=True) -> Path:
date_str = self.added.strftime('%Y%m%d')
domain_str = domain(self.url)
abs_storage_dir = Path(ARCHIVE_DIR) / 'snapshots' / date_str / domain_str / str(self.ulid)
# def get_storage_dir(self, create=True, symlink=True) -> Path:
# date_str = self.added.strftime('%Y%m%d')
# domain_str = domain(self.url)
# abs_storage_dir = Path(ARCHIVE_DIR) / 'snapshots' / date_str / domain_str / str(self.ulid)
if create and not abs_storage_dir.is_dir():
abs_storage_dir.mkdir(parents=True, exist_ok=True)
# if create and not abs_storage_dir.is_dir():
# abs_storage_dir.mkdir(parents=True, exist_ok=True)
if symlink:
Path(ARCHIVE_DIR).parent / 'index' / 'all_by_id' / str(self.ulid),
Path(ARCHIVE_DIR).parent / 'index' / 'snapshots_by_id' / str(self.ulid),
Path(ARCHIVE_DIR).parent / 'index' / 'snapshots_by_domain' / domain_str / date_str / str(self.ulid),
Path(ARCHIVE_DIR).parent / 'index' / 'snapshots_by_date' / date_str / domain_str / str(self.ulid),
for link_path in LINK_PATHS:
link_path.parent.mkdir(parents=True, exist_ok=True)
except FileExistsError:
# if symlink:
# Path(ARCHIVE_DIR).parent / 'index' / 'all_by_id' / str(self.ulid),
# # Path(ARCHIVE_DIR).parent / 'index' / 'snapshots_by_id' / str(self.ulid),
# Path(ARCHIVE_DIR).parent / 'index' / 'snapshots_by_date' / date_str / domain_str / str(self.ulid),
# Path(ARCHIVE_DIR).parent / 'index' / 'snapshots_by_domain' / domain_str / date_str / str(self.ulid),
# ]
# for link_path in LINK_PATHS:
# link_path.parent.mkdir(parents=True, exist_ok=True)
# try:
# link_path.symlink_to(abs_storage_dir)
# except FileExistsError:
# link_path.unlink()
# link_path.symlink_to(abs_storage_dir)
# return abs_storage_dir
return abs_storage_dir
class ArchiveResultManager(models.Manager):
def indexable(self, sorted: bool = True):
@ -335,15 +314,21 @@ class ArchiveResultManager(models.Manager):
return qs
class ArchiveResult(models.Model):
class ArchiveResult(ABIDModel):
abid_prefix = 'res_'
abid_ts_src = 'self.snapshot.added'
abid_uri_src = 'self.snapshot.url'
abid_subtype_src = 'self.extractor'
abid_rand_src = 'self.uuid'
id = models.AutoField(primary_key=True, serialize=False, verbose_name='ID')
uuid = models.UUIDField(default=uuid.uuid4, editable=True)
uuid = models.UUIDField(default=uuid4, editable=True)
# ulid = models.CharField(max_length=26, null=True, blank=True, db_index=True, unique=True)
snapshot = models.ForeignKey(Snapshot, on_delete=models.CASCADE)
extractor = models.CharField(choices=EXTRACTOR_CHOICES, max_length=32)
cmd = JSONField()
cmd = models.JSONField()
pwd = models.CharField(max_length=256)
cmd_version = models.CharField(max_length=128, default=None, null=True, blank=True)
output = models.CharField(max_length=1024)
@ -353,6 +338,9 @@ class ArchiveResult(models.Model):
objects = ArchiveResultManager()
class Meta(TypedModelMeta):
verbose_name = 'Result'
def __str__(self):
return self.extractor
@ -360,40 +348,6 @@ class ArchiveResult(models.Model):
def snapshot_dir(self):
return Path(self.snapshot.link_dir)
def ulid_from_timestamp(self):
return self.snapshot.ulid_from_timestamp
def ulid_from_urlhash(self):
return self.snapshot.ulid_from_urlhash
def ulid_from_snapshot(self):
return str(self.snapshot.ulid)[:18]
def ulid_from_type(self):
return hashlib.sha256(self.extractor.encode('utf-8')).hexdigest()[:2]
def ulid_from_randomness(self):
return str(ulid.from_uuid(self.uuid))[20:]
def ulid_tuple(self) -> ULIDParts:
return ULIDParts(self.ulid_from_timestamp, self.ulid_from_urlhash, self.ulid_from_type, self.ulid_from_randomness)
def ulid(self):
final_ulid = ulid.parse(''.join(self.ulid_tuple))
# TODO: migrate self.uuid to match this new uuid
# self.uuid = final_ulid.uuid
return final_ulid
def typeid(self):
return TypeID.from_uuid(prefix='result', suffix=self.ulid.uuid)
def extractor_module(self):
@ -422,31 +376,31 @@ class ArchiveResult(models.Model):
return Path(self.output_path()).exists()
def get_storage_dir(self, create=True, symlink=True):
date_str = self.snapshot.added.strftime('%Y%m%d')
domain_str = domain(self.snapshot.url)
abs_storage_dir = Path(ARCHIVE_DIR) / 'results' / date_str / domain_str / str(self.ulid)
# def get_storage_dir(self, create=True, symlink=True):
# date_str = self.snapshot.added.strftime('%Y%m%d')
# domain_str = domain(self.snapshot.url)
# abs_storage_dir = Path(ARCHIVE_DIR) / 'results' / date_str / domain_str / self.extractor / str(self.ulid)
if create and not abs_storage_dir.is_dir():
abs_storage_dir.mkdir(parents=True, exist_ok=True)
# if create and not abs_storage_dir.is_dir():
# abs_storage_dir.mkdir(parents=True, exist_ok=True)
if symlink:
Path(ARCHIVE_DIR).parent / 'index' / 'all_by_id' / str(self.ulid),
Path(ARCHIVE_DIR).parent / 'index' / 'results_by_id' / str(self.ulid),
Path(ARCHIVE_DIR).parent / 'index' / 'results_by_date' / date_str / domain_str / self.extractor / str(self.ulid),
Path(ARCHIVE_DIR).parent / 'index' / 'results_by_domain' / domain_str / date_str / self.extractor / str(self.ulid),
Path(ARCHIVE_DIR).parent / 'index' / 'results_by_type' / self.extractor / date_str / domain_str / str(self.ulid),
for link_path in LINK_PATHS:
link_path.parent.mkdir(parents=True, exist_ok=True)
except FileExistsError:
# if symlink:
# Path(ARCHIVE_DIR).parent / 'index' / 'all_by_id' / str(self.ulid),
# # Path(ARCHIVE_DIR).parent / 'index' / 'results_by_id' / str(self.ulid),
# # Path(ARCHIVE_DIR).parent / 'index' / 'results_by_date' / date_str / domain_str / self.extractor / str(self.ulid),
# Path(ARCHIVE_DIR).parent / 'index' / 'results_by_domain' / domain_str / date_str / self.extractor / str(self.ulid),
# Path(ARCHIVE_DIR).parent / 'index' / 'results_by_type' / self.extractor / date_str / domain_str / str(self.ulid),
# ]
# for link_path in LINK_PATHS:
# link_path.parent.mkdir(parents=True, exist_ok=True)
# try:
# link_path.symlink_to(abs_storage_dir)
# except FileExistsError:
# link_path.unlink()
# link_path.symlink_to(abs_storage_dir)
return abs_storage_dir
# return abs_storage_dir
def symlink_index(self, create=True):
abs_result_dir = self.get_storage_dir(create=create)
# def symlink_index(self, create=True):
# abs_result_dir = self.get_storage_dir(create=create)

View file

@ -62,6 +62,7 @@ INSTALLED_APPS = [
@ -258,6 +259,9 @@ DATABASES = {
# as much as I'd love this to be a UUID or ULID field, it's not supported yet as of Django 5.0
DEFAULT_AUTO_FIELD = 'django.db.models.BigAutoField'
'default': {'BACKEND': 'django.core.cache.backends.db.DatabaseCache', 'LOCATION': 'cache'},

View file

@ -39,6 +39,7 @@ dependencies = [
homepage = ""