ArchiveBox/archivebox/index/sql.py
Nick Sweeting 9b1659c72f
Some checks failed
Build GitHub Pages website / build (push) Has been cancelled
Run linters / lint (push) Has been cancelled
Build Debian package / build (push) Has been cancelled
Build Docker image / buildx (push) Has been cancelled
Build Homebrew package / build (push) Has been cancelled
CodeQL / Analyze (python) (push) Has been cancelled
Build Pip package / build (push) Has been cancelled
Run tests / python_tests (ubuntu-22.04, 3.11) (push) Has been cancelled
Run tests / docker_tests (push) Has been cancelled
Build GitHub Pages website / deploy (push) Has been cancelled
make created_by_id autoapply to any ArchiveResults created under Snapshot
2024-08-20 19:43:07 -07:00

164 lines
5.4 KiB
Python

# Explicit package name: Python consults `__package__` when resolving the
# relative imports below (`.schema`, `..util`, `..config`).
__package__ = 'archivebox.index'
import re
from io import StringIO
from pathlib import Path
from typing import List, Tuple, Iterator
from django.db.models import QuerySet
from django.db import transaction
from .schema import Link
from ..util import enforce_types, parse_date
from ..config import (
OUTPUT_DIR,
TAG_SEPARATOR_PATTERN,
)
### Main Links Index
@enforce_types
def parse_sql_main_index(out_dir: Path=OUTPUT_DIR) -> Iterator[Link]:
    """Lazily produce a Link for every Snapshot row in the SQL index.

    Each Snapshot is serialized with `as_json(*Snapshot.keys)` and rebuilt
    through `Link.from_json`. `out_dir` is not used by this function.
    """
    from core.models import Snapshot

    snapshots = Snapshot.objects.all()
    return (
        Link.from_json(snapshot.as_json(*Snapshot.keys))
        for snapshot in snapshots
    )
@enforce_types
def remove_from_sql_main_index(snapshots: QuerySet, atomic: bool=False, out_dir: Path=OUTPUT_DIR) -> Tuple[int, dict]:
    """Delete every row matched by `snapshots` from the SQL index.

    Args:
        snapshots: queryset selecting the rows to delete.
        atomic: when True, run the delete inside `transaction.atomic()`.
        out_dir: not used by this function.

    Returns:
        Whatever `snapshots.delete()` returns. For a Django QuerySet that is
        a `(total_deleted, per_model_counts)` tuple, which is why the return
        annotation is a tuple rather than None.
    """
    if not atomic:
        return snapshots.delete()

    with transaction.atomic():
        return snapshots.delete()
@enforce_types
def write_link_to_sql_index(link: Link, created_by_id: int | None=None):
    """Create or update the Snapshot row for `link`, then sync tags and history.

    Args:
        link: the Link whose fields, tags and extractor history are persisted.
        created_by_id: primary key of the owning user. When None, this
            function writes no `created_by_id` at all, so it never replaces
            the owner stored on an existing Snapshot with None.

    Returns:
        The Snapshot instance that was created or updated.
    """
    from core.models import Snapshot, ArchiveResult

    info = {k: v for k, v in link._asdict().items() if k in Snapshot.keys}
    # Tags are applied through save_tags() below, not as a plain field value.
    info.pop('tags', None)
    if created_by_id is not None:
        info['created_by_id'] = created_by_id

    stripped_tags = (
        tag.strip() for tag in re.split(TAG_SEPARATOR_PATTERN, link.tags or '')
    )
    # dict.fromkeys() de-duplicates while keeping first-seen order; blank names
    # (what an empty or missing tags string splits into) are dropped.
    tag_list = list(dict.fromkeys(tag for tag in stripped_tags if tag))

    try:
        # A URL that is already indexed keeps the timestamp it is stored under.
        info['timestamp'] = Snapshot.objects.get(url=link.url).timestamp
    except Snapshot.DoesNotExist:
        # New URL: bump the timestamp by 1.0 until no other Snapshot uses it.
        while Snapshot.objects.filter(timestamp=info['timestamp']).exists():
            info['timestamp'] = str(float(info['timestamp']) + 1.0)

    snapshot, _ = Snapshot.objects.update_or_create(url=link.url, defaults=info)
    snapshot.save_tags(tag_list)

    for extractor, entries in link.history.items():
        for entry in entries:
            if isinstance(entry, dict):
                # Plain-dict history entries only create missing rows
                # (get_or_create); an existing row is left untouched.
                ArchiveResult.objects.get_or_create(
                    snapshot_id=snapshot.pk,
                    extractor=extractor,
                    start_ts=parse_date(entry['start_ts']),
                    defaults={
                        'end_ts': parse_date(entry['end_ts']),
                        'cmd': entry['cmd'],
                        'output': entry['output'],
                        'cmd_version': entry.get('cmd_version') or 'unknown',
                        'pwd': entry['pwd'],
                        'status': entry['status'],
                        'created_by_id': snapshot.created_by_id,
                    },
                )
            else:
                # Object history entries also refresh an existing row
                # (update_or_create).
                ArchiveResult.objects.update_or_create(
                    snapshot_id=snapshot.pk,
                    extractor=extractor,
                    start_ts=parse_date(entry.start_ts),
                    defaults={
                        'end_ts': parse_date(entry.end_ts),
                        'cmd': entry.cmd,
                        'output': entry.output,
                        'cmd_version': entry.cmd_version or 'unknown',
                        'pwd': entry.pwd,
                        'status': entry.status,
                        'created_by_id': snapshot.created_by_id,
                    },
                )

    return snapshot
@enforce_types
def write_sql_main_index(links: List[Link], out_dir: Path=OUTPUT_DIR, created_by_id: int | None=None) -> None:
    """Persist every Link in `links` to the SQL index, one after another.

    `created_by_id` is forwarded unchanged to `write_link_to_sql_index`;
    `out_dir` is not used by this function.
    """
    # Each link is written on its own; no enclosing transaction is opened here.
    for item in links:
        write_link_to_sql_index(item, created_by_id=created_by_id)
@enforce_types
def write_sql_link_details(link: Link, out_dir: Path=OUTPUT_DIR, created_by_id: int | None=None) -> None:
    """Copy the title and tags of `link` onto its Snapshot row.

    If no Snapshot exists for `link.url`, one is first created through
    `write_link_to_sql_index` (the only place `created_by_id` is used).
    The tags saved are the union of the tags parsed from `link.tags` and
    the tag names already attached to the Snapshot. `out_dir` is not used
    by this function.
    """
    from core.models import Snapshot

    try:
        snap = Snapshot.objects.get(url=link.url)
    except Snapshot.DoesNotExist:
        snap = write_link_to_sql_index(link, created_by_id=created_by_id)

    snap.title = link.title

    stripped_tags = (
        tag.strip() for tag in re.split(TAG_SEPARATOR_PATTERN, link.tags or '')
    )
    # Blank names (what an empty or missing tags string splits into) are
    # dropped so they are never handed to save_tags().
    link_tags = {tag for tag in stripped_tags if tag}
    saved_tags = set(snap.tags.values_list('name', flat=True))
    tag_list = list(link_tags | saved_tags)

    snap.save()
    snap.save_tags(tag_list)
@enforce_types
def list_migrations(out_dir: Path=OUTPUT_DIR) -> List[Tuple[bool, str]]:
    """Return `(is_applied, name)` pairs parsed from `showmigrations --list`.

    Only output lines containing `]` are treated as migration entries: the
    text before the first `]` is the status marker (applied when it contains
    `X`), and the text after it is the migration name. `out_dir` is not used
    by this function.
    """
    from django.core.management import call_command

    captured = StringIO()
    call_command("showmigrations", list=True, stdout=captured)

    entries = []
    for row in map(str.strip, captured.getvalue().split('\n')):
        # Blank rows and rows without a status marker are skipped.
        if ']' not in row:
            continue
        marker, _, label = row.partition(']')
        entries.append(('X' in marker, label.strip()))

    return entries
@enforce_types
def apply_migrations(out_dir: Path=OUTPUT_DIR) -> List[str]:
    """Run `makemigrations` then `migrate`, returning migrate's output lines.

    A failure in `makemigrations` is printed and otherwise ignored, so
    `migrate` still runs; a failure in `migrate` propagates to the caller.
    The returned lines are stripped and blank lines are omitted. `out_dir`
    is not used by this function.
    """
    from django.core.management import call_command

    discarded = StringIO()
    migrate_log = StringIO()

    try:
        # makemigrations output is captured only to keep it off the console.
        call_command("makemigrations", interactive=False, stdout=discarded)
    except Exception as err:
        print('[!] Failed to create some migrations. Please open an issue and copy paste this output for help: {}'.format(err))
        print()

    call_command("migrate", interactive=False, stdout=migrate_log)

    stripped_rows = (row.strip() for row in migrate_log.getvalue().split('\n'))
    return [row for row in stripped_rows if row]
@enforce_types
def get_admins(out_dir: Path=OUTPUT_DIR) -> QuerySet:
    """Return the users whose `is_superuser` flag is set.

    The result is the (lazy) QuerySet produced by `User.objects.filter`, not
    a list of strings, so the return annotation is `QuerySet`. `out_dir` is
    not used by this function.
    """
    from django.contrib.auth.models import User

    return User.objects.filter(is_superuser=True)