ArchiveBox/archivebox/abid_utils/models.py
2024-08-20 19:26:40 -07:00

344 lines
13 KiB
Python

"""
This file provides the Django ABIDField and ABIDModel base model to inherit from.
It implements the ArchiveBox ID (ABID) interfaces including abid_values, generate_abid, .abid, .uuid, .id.
"""
from typing import Any, Dict, Union, List, Set, NamedTuple, cast
from ulid import ULID
from uuid import uuid4, UUID
from typeid import TypeID # type: ignore[import-untyped]
from datetime import datetime
from functools import partial
from charidfield import CharIDField # type: ignore[import-untyped]
from django.conf import settings
from django.db import models
from django.utils import timezone
from django.db.utils import OperationalError
from django.contrib.auth import get_user_model
from django_stubs_ext.db.models import TypedModelMeta
from .abid import (
ABID,
ABID_LEN,
ABID_RAND_LEN,
ABID_SUFFIX_LEN,
DEFAULT_ABID_PREFIX,
DEFAULT_ABID_URI_SALT,
abid_part_from_prefix,
abid_from_values
)
####################################################
# Database Field for typeid/ulid style IDs with a prefix, e.g. snp_01BJQMF54D093DXEAWZ6JYRPAQ
ABIDField = partial(
CharIDField,
max_length=ABID_LEN,
help_text="ABID-format identifier for this entity (e.g. snp_01BJQMF54D093DXEAWZ6JYRPAQ)",
default=None,
null=True,
blank=True,
db_index=True,
unique=True,
)
def get_or_create_system_user_pk(username='system'):
"""Get or create a system user with is_superuser=True to be the default owner for new DB rows"""
User = get_user_model()
# if only one user exists total, return that user
if User.objects.filter(is_superuser=True).count() == 1:
return User.objects.filter(is_superuser=True).values_list('pk', flat=True)[0]
# otherwise, create a dedicated "system" user
user, created = User.objects.get_or_create(username=username, is_staff=True, is_superuser=True, defaults={'email': '', 'password': ''})
return user.pk
class ABIDModel(models.Model):
"""
Abstract Base Model for other models to depend on. Provides ArchiveBox ID (ABID) interface.
"""
abid_prefix: str = DEFAULT_ABID_PREFIX # e.g. 'tag_'
abid_ts_src = 'None' # e.g. 'self.created'
abid_uri_src = 'None' # e.g. 'self.uri'
abid_subtype_src = 'None' # e.g. 'self.extractor'
abid_rand_src = 'None' # e.g. 'self.uuid' or 'self.id'
# id = models.UUIDField(primary_key=True, default=uuid4, editable=True)
# uuid = models.UUIDField(blank=True, null=True, editable=True, unique=True)
abid = ABIDField(prefix=abid_prefix)
created_by = models.ForeignKey(settings.AUTH_USER_MODEL, on_delete=models.CASCADE, default=get_or_create_system_user_pk)
created = models.DateTimeField(auto_now_add=True)
modified = models.DateTimeField(auto_now=True)
class Meta(TypedModelMeta):
abstract = True
def save(self, *args: Any, **kwargs: Any) -> None:
# when first creating a row, self.ABID is the source of truth
# overwrite default prefilled self.id & self.abid with generated self.ABID value
if self._state.adding or not self.id:
self.id = self.ABID.uuid
if self._state.adding or not self.abid:
self.abid = str(self.ABID)
super().save(*args, **kwargs)
assert str(self.id) == str(self.ABID.uuid), f'self.id {self.id} does not match self.ABID {self.ABID.uuid}'
assert str(self.abid) == str(self.ABID), f'self.abid {self.id} does not match self.ABID {self.ABID.uuid}'
@property
def abid_values(self) -> Dict[str, Any]:
return {
'prefix': self.abid_prefix,
'ts': eval(self.abid_ts_src),
'uri': eval(self.abid_uri_src),
'subtype': eval(self.abid_subtype_src),
'rand': eval(self.abid_rand_src),
}
def generate_abid(self) -> ABID:
"""
Return a freshly derived ABID (assembled from attrs defined in ABIDModel.abid_*_src).
"""
prefix, ts, uri, subtype, rand = self.abid_values.values()
if (not prefix) or prefix == DEFAULT_ABID_PREFIX:
suggested_abid = self.__class__.__name__[:3].lower()
raise Exception(f'{self.__class__.__name__}.abid_prefix must be defined to calculate ABIDs (suggested: {suggested_abid})')
if not ts:
# default to unix epoch with 00:00:00 UTC
ts = datetime.fromtimestamp(0, timezone.utc) # equivalent to: ts = datetime.utcfromtimestamp(0)
print(f'[!] WARNING: Generating ABID with ts=0000000000 placeholder because {self.__class__.__name__}.abid_ts_src={self.abid_ts_src} is unset!', ts.isoformat())
if not uri:
uri = str(self)
print(f'[!] WARNING: Generating ABID with uri=str(self) placeholder because {self.__class__.__name__}.abid_uri_src={self.abid_uri_src} is unset!', uri)
if not subtype:
subtype = self.__class__.__name__
print(f'[!] WARNING: Generating ABID with subtype={subtype} placeholder because {self.__class__.__name__}.abid_subtype_src={self.abid_subtype_src} is unset!', subtype)
if not rand:
rand = getattr(self, 'uuid', None) or getattr(self, 'id', None) or getattr(self, 'pk')
print(f'[!] WARNING: Generating ABID with rand=self.id placeholder because {self.__class__.__name__}.abid_rand_src={self.abid_rand_src} is unset!', rand)
abid = abid_from_values(
prefix=prefix,
ts=ts,
uri=uri,
subtype=subtype,
rand=rand,
salt=DEFAULT_ABID_URI_SALT,
)
assert abid.ulid and abid.uuid and abid.typeid, f'Failed to calculate {prefix}_ABID for {self.__class__.__name__}'
return abid
@property
def ABID(self) -> ABID:
"""
ULIDParts(timestamp='01HX9FPYTR', url='E4A5CCD9', subtype='00', randomness='ZYEBQE')
"""
# if object is not yet saved to DB, always generate fresh ABID from values
if self._state.adding:
return self.generate_abid()
# otherwise DB is single source of truth, load ABID from existing db pk
abid: ABID | None = None
try:
abid = abid or ABID.parse(self.pk)
except Exception:
pass
try:
abid = abid or ABID.parse(self.id)
except Exception:
pass
try:
abid = abid or ABID.parse(cast(str, self.abid))
except Exception:
pass
abid = abid or self.generate_abid()
return abid
@property
def ULID(self) -> ULID:
"""
Get a ulid.ULID representation of the object's ABID.
"""
return self.ABID.ulid
@property
def UUID(self) -> UUID:
"""
Get a uuid.UUID (v4) representation of the object's ABID.
"""
return self.ABID.uuid
@property
def TypeID(self) -> TypeID:
"""
Get a typeid.TypeID (stripe-style) representation of the object's ABID.
"""
return self.ABID.typeid
####################################################
# Django helpers
def find_all_abid_prefixes() -> Dict[str, type[models.Model]]:
"""
Return the mapping of all ABID prefixes to their models.
e.g. {'tag_': core.models.Tag, 'snp_': core.models.Snapshot, ...}
"""
import django.apps
prefix_map = {}
for model in django.apps.apps.get_models():
abid_prefix = getattr(model, 'abid_prefix', None)
if abid_prefix:
prefix_map[abid_prefix] = model
return prefix_map
def find_prefix_for_abid(abid: ABID) -> str:
"""
Find the correct prefix for a given ABID that may have be missing a prefix (slow).
e.g. ABID('obj_01BJQMF54D093DXEAWZ6JYRPAQ') -> 'snp_'
"""
# if existing abid prefix is correct, lookup is easy
model = find_model_from_abid(abid)
if model:
assert issubclass(model, ABIDModel)
return model.abid_prefix
# prefix might be obj_ or missing, fuzzy-search to find any object that matches
return find_obj_from_abid_rand(abid)[0].abid_prefix
def find_model_from_abid_prefix(prefix: str) -> type[ABIDModel] | None:
"""
Return the Django Model that corresponds to a given ABID prefix.
e.g. 'tag_' -> core.models.Tag
"""
prefix = abid_part_from_prefix(prefix)
import django.apps
for model in django.apps.apps.get_models():
if not issubclass(model, ABIDModel): continue # skip non-ABID-enabled models
if not hasattr(model, 'objects'): continue # skip abstract models
if (model.abid_prefix == prefix):
return model
return None
def find_model_from_abid(abid: ABID) -> type[models.Model] | None:
"""
Shortcut for find_model_from_abid_prefix(abid.prefix)
"""
return find_model_from_abid_prefix(abid.prefix)
def find_obj_from_abid_rand(rand: Union[ABID, str], model=None) -> List[ABIDModel]:
"""
Find an object corresponding to an ABID by exhaustively searching using its random suffix (slow).
e.g. 'obj_....................JYRPAQ' -> Snapshot('snp_01BJQMF54D093DXEAWZ6JYRPAQ')
"""
# convert str to ABID if necessary
if isinstance(rand, ABID):
abid: ABID = rand
else:
rand = str(rand)
if len(rand) < ABID_SUFFIX_LEN:
padding_needed = ABID_SUFFIX_LEN - len(rand)
rand = ('0'*padding_needed) + rand
abid = ABID.parse(rand)
import django.apps
partial_matches: List[ABIDModel] = []
models_to_try = cast(Set[type[models.Model]], set(filter(bool, (
model,
find_model_from_abid(abid),
*django.apps.apps.get_models(),
))))
# print(abid, abid.rand, abid.uuid, models_to_try)
for model in models_to_try:
if not issubclass(model, ABIDModel): continue # skip Models that arent ABID-enabled
if not hasattr(model, 'objects'): continue # skip abstract Models
assert hasattr(model, 'objects') # force-fix for type hint nit about missing manager https://github.com/typeddjango/django-stubs/issues/1684
# continue on to try fuzzy searching by randomness portion derived from uuid field
try:
qs = []
if hasattr(model, 'abid'):
qs = model.objects.filter(abid__endswith=abid.rand)
elif hasattr(model, 'uuid'):
qs = model.objects.filter(uuid__endswith=str(abid.uuid)[-ABID_RAND_LEN:])
elif hasattr(model, 'id'):
# NOTE: this only works on SQLite where every column is a string
# other DB backends like postgres dont let you do __endswith if this is a BigAutoInteger field
# try to search for uuid=...-2354352
# try to search for id=...2354352
# try to search for id=2354352
qs = model.objects.filter(
models.Q(id__endswith=str(abid.uuid)[-ABID_RAND_LEN:])
| models.Q(id__endswith=abid.rand)
| models.Q(id__startswith=str(int(abid.rand)) if abid.rand.isdigit() else abid.rand)
)
for obj in qs:
if obj.generate_abid() == abid:
# found exact match, no need to keep iterating
return [obj]
partial_matches.append(obj)
except OperationalError as err:
print(f'[!] WARNING: Got error while trying to iterate through QuerySet for {model}:', err, '\n')
return partial_matches
def find_obj_from_abid(abid: ABID, model=None, fuzzy=False) -> Any:
"""
Find an object with a given ABID by filtering possible models for a matching abid/uuid/id (fast).
e.g. 'snp_01BJQMF54D093DXEAWZ6JYRPAQ' -> Snapshot('snp_01BJQMF54D093DXEAWZ6JYRPAQ')
"""
model = model or find_model_from_abid(abid)
assert model, f'Could not find model that could match this ABID type: {abid}'
try:
if hasattr(model, 'abid'):
return model.objects.get(abid__endswith=abid.suffix)
if hasattr(model, 'uuid'):
return model.objects.get(uuid=abid.uuid)
return model.objects.get(id=abid.uuid)
except model.DoesNotExist:
# if the model has an abid field then it shouldve matched, pointless to fuzzy search in that case
if hasattr(model, 'abid') or (not fuzzy):
raise
# continue on to try fuzzy searching by randomness portion derived from uuid field
match_by_rand = find_obj_from_abid_rand(abid, model=model)
if match_by_rand:
if match_by_rand[0].abid_prefix != abid.prefix:
print(f'[!] WARNING: fetched object {match_by_rand} even though prefix {abid.prefix} doesnt match!', abid, '\n')
return match_by_rand
raise model.DoesNotExist