add django_huey, huey_monitor, and replace Threads with huey tasks

Nick Sweeting 2024-09-10 00:04:39 -07:00
parent 4df90fbb40
commit 60154fba5f
19 changed files with 850 additions and 92 deletions

View file

@ -12,6 +12,7 @@ from plugantic.base_plugin import BasePlugin
from plugantic.base_configset import BaseConfigSet, ConfigSectionName
from plugantic.base_binary import BaseBinary, env
from plugantic.base_extractor import BaseExtractor
from plugantic.base_queue import BaseQueue
from plugantic.base_hook import BaseHook
# Depends on Other Plugins:
@ -95,6 +96,13 @@ class SinglefileExtractor(BaseExtractor):
SINGLEFILE_BINARY = SinglefileBinary()
SINGLEFILE_EXTRACTOR = SinglefileExtractor()
class SinglefileQueue(BaseQueue):
name: str = 'singlefile'
binaries: List[InstanceOf[BaseBinary]] = [SINGLEFILE_BINARY]
SINGLEFILE_QUEUE = SinglefileQueue()
class SinglefilePlugin(BasePlugin):
app_label: str = 'singlefile'
verbose_name: str = 'SingleFile'

View file

@ -0,0 +1,40 @@
__package__ = 'archivebox.queues'
import time
from django.core.cache import cache
from huey import crontab
from django_huey import db_task, on_startup, db_periodic_task
from huey_monitor.models import TaskModel
from huey_monitor.tqdm import ProcessInfo
@db_task(queue="singlefile", context=True)
def extract(url, out_dir, config, task=None, parent_task_id=None):
if task and parent_task_id:
TaskModel.objects.set_parent_task(main_task_id=parent_task_id, sub_task_id=task.id)
process_info = ProcessInfo(task, desc="extract_singlefile", parent_task_id=parent_task_id, total=1)
time.sleep(5)  # placeholder for the real singlefile extraction work
process_info.update(n=1)
return {'output': 'singlefile.html', 'status': 'succeeded'}
# @on_startup(queue='singlefile')
# def start_singlefile_queue():
# print("[+] Starting singlefile worker...")
# update_version.call_local()
# @db_periodic_task(crontab(minute='*/5'), queue='singlefile')
# def update_version():
# print('[*] Updating singlefile version... 5 minute interval')
# from django.conf import settings
# bin = settings.BINARIES.SinglefileBinary.load()
# if bin.version:
# cache.set(f"bin:abspath:{bin.name}", bin.abspath)
# cache.set(f"bin:version:{bin.name}:{bin.abspath}", bin.version)
# print('[√] Updated singlefile version:', bin.version, bin.abspath)
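For reference, a django_huey db_task like extract() above is enqueued simply by calling it; with immediate=False the call returns a huey Result handle instead of running inline. A minimal sketch of a caller, assuming the task module is importable as shown (the import path and arguments are illustrative, not part of this commit):

from archivebox.queues.singlefile import extract  # hypothetical import path

result = extract('https://example.com', '/data/archive/123', config={})
print(result.id)                  # huey task id, also visible via huey_monitor's TaskModel
print(result.get(blocking=True))  # blocks until a 'singlefile' queue worker runs the task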

View file

@ -37,7 +37,7 @@ is_valid_cli_module = lambda module, subcommand: (
)
IGNORED_BG_THREADS = ('MainThread', 'ThreadPoolExecutor', 'IPythonHistorySavingThread') # threads we don't have to wait for before exiting
IGNORED_BG_THREADS = ('MainThread', 'ThreadPoolExecutor', 'IPythonHistorySavingThread', 'Scheduler') # threads we don't have to wait for before exiting
def wait_for_bg_threads_to_exit(thread_names: Iterable[str]=(), ignore_names: Iterable[str]=IGNORED_BG_THREADS, timeout: int=60) -> int:

View file

@ -30,6 +30,7 @@ from core.models import Snapshot, ArchiveResult, Tag
from core.mixins import SearchResultsAdminMixin
from api.models import APIToken
from abid_utils.admin import ABIDModelAdmin
from queues.tasks import bg_archive_links, bg_add
from index.html import snapshot_icons
from logging_util import printable_filesize
@ -137,6 +138,8 @@ class CustomUserAdmin(UserAdmin):
) + f'<br/><a href="/admin/api/outboundwebhook/?created_by__id__exact={obj.pk}">{total_count} total records...</a>')
archivebox_admin = ArchiveBoxAdmin()
archivebox_admin.register(get_user_model(), CustomUserAdmin)
archivebox_admin.disable_action('delete_selected')
@ -155,6 +158,28 @@ archivebox_admin.get_admin_data_urls = get_admin_data_urls.__get__(archivebox_ad
archivebox_admin.get_urls = get_urls(archivebox_admin.get_urls).__get__(archivebox_admin, ArchiveBoxAdmin)
from huey_monitor.apps import HueyMonitorConfig
HueyMonitorConfig.verbose_name = 'Background Workers'
from huey_monitor.admin import TaskModel, TaskModelAdmin, SignalInfoModel, SignalInfoModelAdmin
archivebox_admin.register(SignalInfoModel, SignalInfoModelAdmin)
class CustomTaskModelAdmin(TaskModelAdmin):
actions = ["delete_selected"]
def has_delete_permission(self, request, obj=None):
codename = get_permission_codename("delete", self.opts)
return request.user.has_perm("%s.%s" % (self.opts.app_label, codename))
archivebox_admin.register(TaskModel, CustomTaskModelAdmin)
def result_url(result: TaskModel) -> str:
url = reverse("admin:huey_monitor_taskmodel_change", args=[str(result.id)])
return format_html('<a href="{}" class="fade-in-progress-url">See progress...</a>', url)
class AccelleratedPaginator(Paginator):
"""
Accelerated Paginator ignores DISTINCT when counting the total number of rows.
@ -515,65 +540,53 @@ class SnapshotAdmin(SearchResultsAdminMixin, ABIDModelAdmin):
archive_links(links, overwrite=True, methods=('title','favicon'), out_dir=CONFIG.OUTPUT_DIR)
messages.success(request, f"Title and favicon have been fetched and saved for {len(links)} URLs.")
else:
# otherwise run in a bg thread
bg_thread = threading.Thread(
target=archive_links,
args=(links,),
kwargs={"overwrite": True, "methods": ['title', 'favicon'], "out_dir": CONFIG.OUTPUT_DIR},
# otherwise run in a background worker
result = bg_archive_links((links,), kwargs={"overwrite": True, "methods": ["title", "favicon"], "out_dir": CONFIG.OUTPUT_DIR})
messages.success(
request,
mark_safe(f"Title and favicon are updating in the background for {len(links)} URLs. {result_url(result)}"),
)
bg_thread.setDaemon(True)
bg_thread.start()
messages.success(request, f"Title and favicon are updating in the background for {len(links)} URLs. (refresh in a few minutes to see results)")
@admin.action(
description="⬇️ Get Missing"
)
def update_snapshots(self, request, queryset):
links = [snapshot.as_link() for snapshot in queryset]
bg_thread = threading.Thread(
target=archive_links,
args=(links,),
kwargs={"overwrite": False, "out_dir": CONFIG.OUTPUT_DIR},
)
bg_thread.setDaemon(True)
bg_thread.start()
result = bg_archive_links((links,), kwargs={"overwrite": False, "out_dir": CONFIG.OUTPUT_DIR})
messages.success(
request, f"Re-trying any previously failed methods for {len(links)} URLs in the background. (refresh in a few minutes to see results)"
request,
mark_safe(f"Re-trying any previously failed methods for {len(links)} URLs in the background. {result_url(result)}"),
)
@admin.action(
description="📑 Archive again"
description="🆕 Archive Again"
)
def resnapshot_snapshot(self, request, queryset):
for snapshot in queryset:
timestamp = timezone.now().isoformat('T', 'seconds')
new_url = snapshot.url.split('#')[0] + f'#{timestamp}'  # append a timestamp fragment so the URL is unique and a fresh Snapshot is created
bg_thread = threading.Thread(target=add, args=(new_url,), kwargs={'tag': snapshot.tags_str()})
bg_thread.setDaemon(True)
bg_thread.start()
result = bg_add({'urls': new_url, 'tag': snapshot.tags_str()})
messages.success(
request,
f"Creating new fresh snapshots for {len(queryset.count())} URLs in the background. (refresh in a few minutes to see results)",
mark_safe(f"Creating new fresh snapshots for {queryset.count()} URLs in the background. {result_url(result)}"),
)
@admin.action(
description=" Redo"
description="🔄 Redo"
)
def overwrite_snapshots(self, request, queryset):
links = [snapshot.as_link() for snapshot in queryset]
bg_thread = threading.Thread(
target=archive_links,
args=(links,),
kwargs={"overwrite": True, "out_dir": CONFIG.OUTPUT_DIR},
)
bg_thread.setDaemon(True)
bg_thread.start()
result = bg_archive_links((links,), kwargs={"overwrite": True, "out_dir": CONFIG.OUTPUT_DIR})
messages.success(
request,
f"Clearing all previous results and re-downloading {len(links)} URLs in the background. (refresh in a few minutes to see results)",
mark_safe(f"Clearing all previous results and re-downloading {len(links)} URLs in the background. {result_url(result)}"),
)
@admin.action(
@ -583,7 +596,7 @@ class SnapshotAdmin(SearchResultsAdminMixin, ABIDModelAdmin):
remove(snapshots=queryset, yes=True, delete=True, out_dir=CONFIG.OUTPUT_DIR)
messages.success(
request,
f"Succesfully deleted {len(queryset.count())} Snapshots. Don't forget to scrub URLs from import logs (data/sources) and error logs (data/logs) if needed.",
mark_safe(f"Succesfully deleted {queryset.count()} Snapshots. Don't forget to scrub URLs from import logs (data/sources) and error logs (data/logs) if needed."),
)
@ -597,7 +610,7 @@ class SnapshotAdmin(SearchResultsAdminMixin, ABIDModelAdmin):
obj.tags.add(*tags)
messages.success(
request,
f"Added {len(tags)} tags to {len(queryset.count())} Snapshots.",
f"Added {len(tags)} tags to {queryset.count()} Snapshots.",
)
@ -611,7 +624,7 @@ class SnapshotAdmin(SearchResultsAdminMixin, ABIDModelAdmin):
obj.tags.remove(*tags)
messages.success(
request,
f"Removed {len(tags)} tags from {len(queryset.count())} Snapshots.",
f"Removed {len(tags)} tags from {queryset.count()} Snapshots.",
)
@ -727,7 +740,6 @@ class ArchiveResultAdmin(ABIDModelAdmin):
else:
root_dir = str(snapshot_dir)
# print(root_dir, str(list(os.walk(root_dir))))
for root, dirs, files in os.walk(root_dir):

View file

@ -87,6 +87,7 @@ INSTALLED_APPS = [
'django_object_actions', # provides easy Django Admin action buttons on change views https://github.com/crccheck/django-object-actions
# Our ArchiveBox-provided apps
'queues', # handles starting and managing background workers and processes
'abid_utils', # handles ABID ID creation, handling, and models
'plugantic', # ArchiveBox plugin API definition + finding/registering/calling interface
'core', # core django model with Snapshot, ArchiveResult, etc.
@ -98,6 +99,9 @@ INSTALLED_APPS = [
# 3rd-party apps from PyPI that need to be loaded last
'admin_data_views', # handles rendering some convenient automatic read-only views of data in Django admin
'django_extensions', # provides shell_plus, runserver_plus, and other dev helper commands
'django_huey', # provides multi-queue support for django huey https://github.com/gaiacoop/django-huey
'bx_django_utils', # needed for huey_monitor https://github.com/boxine/bx_django_utils
'huey_monitor', # adds an admin UI for monitoring background huey tasks https://github.com/boxine/django-huey-monitor
]
@ -212,17 +216,28 @@ CACHE_DB_TABLE = 'django_cache'
DATABASE_FILE = Path(CONFIG.OUTPUT_DIR) / CONFIG.SQL_INDEX_FILENAME
DATABASE_NAME = os.environ.get("ARCHIVEBOX_DATABASE_NAME", str(DATABASE_FILE))
QUEUE_DATABASE_NAME = DATABASE_NAME.replace('index.sqlite3', 'queue.sqlite3')
DATABASES = {
'default': {
'ENGINE': 'django.db.backends.sqlite3',
'NAME': DATABASE_NAME,
'OPTIONS': {
'timeout': 60,
'check_same_thread': False,
"default": {
"ENGINE": "django.db.backends.sqlite3",
"NAME": DATABASE_NAME,
"OPTIONS": {
"timeout": 60,
"check_same_thread": False,
},
'TIME_ZONE': CONFIG.TIMEZONE,
"TIME_ZONE": CONFIG.TIMEZONE,
# DB setup is sometimes modified at runtime by setup_django() in config.py
},
"queue": {
"ENGINE": "django.db.backends.sqlite3",
"NAME": QUEUE_DATABASE_NAME,
"OPTIONS": {
"timeout": 60,
"check_same_thread": False,
},
"TIME_ZONE": CONFIG.TIMEZONE,
},
# 'cache': {
# 'ENGINE': 'django.db.backends.sqlite3',
# 'NAME': CACHE_DB_PATH,
@ -239,6 +254,64 @@ MIGRATION_MODULES = {'signal_webhooks': None}
DEFAULT_AUTO_FIELD = 'django.db.models.BigAutoField'
HUEY = {
"huey_class": "huey.SqliteHuey",
"filename": QUEUE_DATABASE_NAME,
"name": "system_tasks",
"results": True,
"store_none": True,
"immediate": False,
"utc": True,
"consumer": {
"workers": 1,
"worker_type": "thread",
"initial_delay": 0.1, # Smallest polling interval, same as -d.
"backoff": 1.15, # Exponential backoff using this rate, -b.
"max_delay": 10.0, # Max possible polling interval, -m.
"scheduler_interval": 1, # Check schedule every second, -s.
"periodic": True, # Enable crontab feature.
"check_worker_health": True, # Enable worker health checks.
"health_check_interval": 1, # Check worker health every second.
},
}
# https://huey.readthedocs.io/en/latest/contrib.html#setting-things-up
# https://github.com/gaiacoop/django-huey
DJANGO_HUEY = {
"default": "system_tasks",
"queues": {
HUEY["name"]: HUEY.copy(),
# more registered here at plugin import-time by BaseQueue.register()
},
}
class HueyDBRouter:
"""A router to store all the Huey Monitor models in the queue.sqlite3 database."""
route_app_labels = {"huey_monitor", "django_huey", "djhuey"}
def db_for_read(self, model, **hints):
if model._meta.app_label in self.route_app_labels:
return "queue"
return "default"
def db_for_write(self, model, **hints):
if model._meta.app_label in self.route_app_labels:
return "queue"
return "default"
def allow_relation(self, obj1, obj2, **hints):
if obj1._meta.app_label in self.route_app_labels or obj2._meta.app_label in self.route_app_labels:
return obj1._meta.app_label == obj2._meta.app_label
return None
def allow_migrate(self, db, app_label, model_name=None, **hints):
if app_label in self.route_app_labels:
return db == "queue"
return db == "default"
DATABASE_ROUTERS = ['core.settings.HueyDBRouter']
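For reference, the router above means ORM calls on huey_monitor models transparently hit queue.sqlite3 while everything else stays in index.sqlite3. A quick sketch of the effect (illustrative, e.g. inside archivebox shell):

from huey_monitor.models import TaskModel
from core.models import Snapshot

TaskModel.objects.count()                 # routed to the "queue" database (queue.sqlite3)
Snapshot.objects.count()                  # routed to "default" (index.sqlite3)
TaskModel.objects.using('queue').count()  # an explicit .using() bypasses the router entirely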
CACHES = {
'default': {'BACKEND': 'django.core.cache.backends.locmem.LocMemCache'},
# 'sqlite': {'BACKEND': 'django.core.cache.backends.db.DatabaseCache', 'LOCATION': 'cache'},

View file

@ -23,6 +23,9 @@ from admin_data_views.utils import render_with_table_view, render_with_item_view
from core.models import Snapshot
from core.forms import AddLinkForm
from core.admin import result_url
from queues.tasks import bg_add
from ..config import (
OUTPUT_DIR,
@ -478,15 +481,14 @@ class AddView(UserPassesTestMixin, FormView):
if extractors:
input_kwargs.update({"extractors": extractors})
bg_thread = threading.Thread(target=add, kwargs=input_kwargs)
bg_thread.setDaemon(True)
bg_thread.start()
result = bg_add(input_kwargs, parent_task_id=None)
print('Started background add job:', result)
rough_url_count = url.count('://')
messages.success(
self.request,
f"Adding {rough_url_count} URLs in the background. (refresh in a few minutes to see results)",
mark_safe(f"Adding {rough_url_count} URLs in the background. (refresh in a few minutes to see results) {result_url(result)}"),
)
return redirect("/admin/core/snapshot/")

View file

@ -148,17 +148,16 @@ def list_migrations(out_dir: Path=OUTPUT_DIR) -> List[Tuple[bool, str]]:
@enforce_types
def apply_migrations(out_dir: Path=OUTPUT_DIR) -> List[str]:
from django.core.management import call_command
null, out = StringIO(), StringIO()
try:
call_command("makemigrations", interactive=False, stdout=null)
except Exception as e:
print('[!] Failed to create some migrations. Please open an issue and copy paste this output for help: {}'.format(e))
print()
out1, out2 = StringIO(), StringIO()
call_command("migrate", interactive=False, stdout=out)
out.seek(0)
call_command("migrate", interactive=False, database='default', stdout=out1)
out1.seek(0)
call_command("migrate", "huey_monitor", interactive=False, database='queue', stdout=out2)
out2.seek(0)
return [line.strip() for line in out.readlines() if line.strip()]
return [
line.strip() for line in out1.readlines() + out2.readlines() if line.strip()
]
@enforce_types
def get_admins(out_dir: Path=OUTPUT_DIR) -> List[str]:

View file

@ -1,8 +1,10 @@
__package__ = 'archivebox'
import os
import time
import sys
import shutil
import signal
import platform
import subprocess
@ -1352,6 +1354,7 @@ def server(runserver_args: Optional[List[str]]=None,
if reload or debug:
call_command("runserver", *runserver_args)
else:
host = '127.0.0.1'
port = '8000'
@ -1367,12 +1370,52 @@ def server(runserver_args: Optional[List[str]]=None,
except IndexError:
pass
from queues.supervisor_util import get_or_create_supervisord_process, start_worker, stop_worker, watch_worker
print()
supervisor = get_or_create_supervisord_process(daemonize=False)
bg_workers = [
{
"name": "worker_system_tasks",
"command": "archivebox manage djangohuey --queue system_tasks",
"autostart": "true",
"autorestart": "true",
"stdout_logfile": "logs/worker_system_tasks.log",
"redirect_stderr": "true",
},
]
fg_worker = {
"name": "worker_daphne",
"command": f"daphne --bind={host} --port={port} --application-close-timeout=600 archivebox.core.asgi:application",
"autostart": "false",
"autorestart": "true",
"stdout_logfile": "logs/worker_daphne.log",
"redirect_stderr": "true",
}
print()
for worker in bg_workers:
start_worker(supervisor, worker)
print()
start_worker(supervisor, fg_worker)
print()
try:
subprocess.run(['daphne', '--bind', host, '--port', port, 'archivebox.core.asgi:application'])
except (SystemExit, KeyboardInterrupt):
watch_worker(supervisor, "worker_daphne")
except KeyboardInterrupt:
print("\n[🛑] Got Ctrl+C, stopping gracefully...")
except SystemExit:
pass
except Exception as e:
print(e)
except BaseException as e:
print(f"\n[🛑] Got {e.__class__.__name__} exception, stopping web server gracefully...")
raise
finally:
stop_worker(supervisor, "worker_daphne")
time.sleep(0.5)
print("\n[🟩] ArchiveBox server shut down gracefully.")
@enforce_types

View file

@ -12,13 +12,13 @@ from ..config_stubs import AttrDict
class BaseBinProvider(BaseHook, BinProvider):
hook_type: HookType = 'BINPROVIDER'
# def on_get_abspath(self, bin_name: BinName, **context) -> Optional[HostBinPath]:
# Class = super()
# get_abspath_func = lambda: Class.on_get_abspath(bin_name, **context)
# # return cache.get_or_set(f'bin:abspath:{bin_name}', get_abspath_func)
# return get_abspath_func()
# def on_get_version(self, bin_name: BinName, abspath: Optional[HostBinPath]=None, **context) -> SemVer | None:
# Class = super()
# get_version_func = lambda: Class.on_get_version(bin_name, abspath, **context)

View file

@ -0,0 +1,143 @@
__package__ = 'archivebox.plugantic'
import importlib
from typing import Dict, List, TYPE_CHECKING
from pydantic import Field, InstanceOf
if TYPE_CHECKING:
from huey.api import TaskWrapper
from .base_hook import BaseHook, HookType
from .base_binary import BaseBinary
from ..config_stubs import AttrDict
class BaseQueue(BaseHook):
hook_type: HookType = 'QUEUE'
name: str = Field() # e.g. 'singlefile'
binaries: List[InstanceOf[BaseBinary]] = Field()
@property
def tasks(self) -> Dict[str, 'TaskWrapper']:
"""Return an AttrDict of all the background worker tasks defined in the plugin's tasks.py file."""
tasks = importlib.import_module(f"{self.plugin_module}.tasks")
all_tasks = {}
for task_name, task in tasks.__dict__.items():
# if attr is a Huey task and its queue_name matches our hook's queue name
if hasattr(task, "task_class") and task.huey.name == self.name:
all_tasks[task_name] = task
return AttrDict(all_tasks)
def get_huey_config(self, settings) -> dict:
return {
"huey_class": "huey.SqliteHuey",
"filename": settings.QUEUE_DATABASE_NAME,
"name": self.name,
"results": True,
"store_none": True,
"immediate": False,
"utc": True,
"consumer": {
"workers": 1,
"worker_type": "thread",
"initial_delay": 0.1, # Smallest polling interval, same as -d.
"backoff": 1.15, # Exponential backoff using this rate, -b.
"max_delay": 10.0, # Max possible polling interval, -m.
"scheduler_interval": 1, # Check schedule every second, -s.
"periodic": True, # Enable crontab feature.
"check_worker_health": True, # Enable worker health checks.
"health_check_interval": 1, # Check worker health every second.
},
}
def get_supervisor_config(self, settings) -> dict:
return {
"name": f"worker_{self.name}",
"command": f"archivebox manage djangohuey --queue {self.name}",
"stdout_logfile": f"logs/worker_{self.name}.log",
"redirect_stderr": "true",
"autorestart": "true",
"autostart": "false",
}
def start_supervisord_worker(self, settings, lazy=True):
from queues.supervisor_util import get_or_create_supervisord_process, start_worker
print()
try:
supervisor = get_or_create_supervisord_process(daemonize=False)
except Exception as e:
print(f"Error starting worker for queue {self.name}: {e}")
return None
print()
worker = start_worker(supervisor, self.get_supervisor_config(settings), lazy=lazy)
return worker
def register(self, settings, parent_plugin=None):
# self._plugin = parent_plugin # for debugging only, never rely on this!
# Side effect: register queue with django-huey multiqueue dict
settings.DJANGO_HUEY = getattr(settings, "DJANGO_HUEY", None) or AttrDict({"queues": {}})
settings.DJANGO_HUEY["queues"][self.name] = self.get_huey_config(settings)
# Side effect: register some extra tasks with huey
# on_startup(queue=self.name)(self.on_startup_task)
# db_periodic_task(crontab(minute='*/5'))(self.on_periodic_task)
# Side effect: start consumer worker process under supervisord
settings.WORKERS = getattr(settings, "WORKERS", None) or AttrDict({})
settings.WORKERS[self.id] = self.start_supervisord_worker(settings, lazy=True)
# Install queue into settings.QUEUES
settings.QUEUES = getattr(settings, "QUEUES", None) or AttrDict({})
settings.QUEUES[self.id] = self
# Record installed hook into settings.HOOKS
super().register(settings, parent_plugin=parent_plugin)
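For reference, after a plugin's BaseQueue.register() runs at import time, the queue and its tasks are reachable through settings. A rough sketch using the singlefile queue from earlier in this commit (the settings.QUEUES key is the hook's id, shown here by name purely for illustration):

from django.conf import settings

assert settings.DJANGO_HUEY['queues']['singlefile']['name'] == 'singlefile'
# and the plugin's tasks.py functions become addressable via the hook itself:
# SINGLEFILE_QUEUE.tasks.extract(url, out_dir, config)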
# class WgetToggleConfig(ConfigSet):
# section: ConfigSectionName = 'ARCHIVE_METHOD_TOGGLES'
# SAVE_WGET: bool = True
# SAVE_WARC: bool = True
# class WgetDependencyConfig(ConfigSet):
# section: ConfigSectionName = 'DEPENDENCY_CONFIG'
# WGET_BINARY: str = Field(default='wget')
# WGET_ARGS: Optional[List[str]] = Field(default=None)
# WGET_EXTRA_ARGS: List[str] = []
# WGET_DEFAULT_ARGS: List[str] = ['--timeout={TIMEOUT-10}']
# class WgetOptionsConfig(ConfigSet):
# section: ConfigSectionName = 'ARCHIVE_METHOD_OPTIONS'
# # loaded from shared config
# WGET_AUTO_COMPRESSION: bool = Field(default=True)
# SAVE_WGET_REQUISITES: bool = Field(default=True)
# WGET_USER_AGENT: str = Field(default='', alias='USER_AGENT')
# WGET_TIMEOUT: int = Field(default=60, alias='TIMEOUT')
# WGET_CHECK_SSL_VALIDITY: bool = Field(default=True, alias='CHECK_SSL_VALIDITY')
# WGET_RESTRICT_FILE_NAMES: str = Field(default='windows', alias='RESTRICT_FILE_NAMES')
# WGET_COOKIES_FILE: Optional[Path] = Field(default=None, alias='COOKIES_FILE')
# CONFIG = {
# 'CHECK_SSL_VALIDITY': False,
# 'SAVE_WARC': False,
# 'TIMEOUT': 999,
# }
# WGET_CONFIG = [
# WgetToggleConfig(**CONFIG),
# WgetDependencyConfig(**CONFIG),
# WgetOptionsConfig(**CONFIG),
# ]

View file

View file

@ -0,0 +1,6 @@
from django.apps import AppConfig
class QueuesConfig(AppConfig):
default_auto_field = 'django.db.models.BigAutoField'
name = 'queues'

View file

View file

@ -0,0 +1,18 @@
from pathlib import Path
from django.conf import settings
OUTPUT_DIR = settings.CONFIG.OUTPUT_DIR
LOGS_DIR = settings.CONFIG.LOGS_DIR
TMP_DIR = OUTPUT_DIR / "tmp"
TMP_DIR.mkdir(exist_ok=True)
CONFIG_FILE = TMP_DIR / "supervisord.conf"
PID_FILE = TMP_DIR / "supervisord.pid"
SOCK_FILE = TMP_DIR / "supervisord.sock"
LOG_FILE = TMP_DIR / "supervisord.log"
WORKER_DIR = TMP_DIR / "workers"

View file

@ -0,0 +1,261 @@
__package__ = 'archivebox.queues'
import sys
import time
import signal
import psutil
import subprocess
from pathlib import Path
from rich.pretty import pprint
from typing import Dict, cast
from supervisor.xmlrpc import SupervisorTransport
from xmlrpc.client import ServerProxy
from .settings import CONFIG_FILE, PID_FILE, SOCK_FILE, LOG_FILE, WORKER_DIR, TMP_DIR, LOGS_DIR
def create_supervisord_config():
config_content = f"""
[supervisord]
nodaemon = true
environment = IS_SUPERVISORD_PARENT="true"
pidfile = %(here)s/{PID_FILE.name}
logfile = %(here)s/../{LOGS_DIR.name}/{LOG_FILE.name}
childlogdir = %(here)s/../{LOGS_DIR.name}
directory = %(here)s/..
strip_ansi = true
nocleanup = true
[unix_http_server]
file = %(here)s/{SOCK_FILE.name}
chmod = 0700
[supervisorctl]
serverurl = unix://%(here)s/{SOCK_FILE.name}
[rpcinterface:supervisor]
supervisor.rpcinterface_factory = supervisor.rpcinterface:make_main_rpcinterface
[include]
files = %(here)s/{WORKER_DIR.name}/*.conf
"""
with open(CONFIG_FILE, "w") as f:
f.write(config_content)
def create_worker_config(daemon):
WORKER_DIR.mkdir(exist_ok=True)
name = daemon['name']
configfile = WORKER_DIR / f"{name}.conf"
config_content = f"[program:{name}]\n"
for key, value in daemon.items():
if key == 'name': continue
config_content += f"{key}={value}\n"
config_content += "\n"
with open(configfile, "w") as f:
f.write(config_content)
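For reference, given the worker_system_tasks daemon dict used by server() earlier in this commit, create_worker_config() would write roughly the following to tmp/workers/worker_system_tasks.conf (one key=value line per dict entry, minus name):

[program:worker_system_tasks]
command=archivebox manage djangohuey --queue system_tasks
autostart=true
autorestart=true
stdout_logfile=logs/worker_system_tasks.log
redirect_stderr=true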
def get_existing_supervisord_process():
try:
transport = SupervisorTransport(None, None, f"unix://{SOCK_FILE}")
server = ServerProxy("http://localhost", transport=transport)
current_state = cast(Dict[str, int | str], server.supervisor.getState())
if current_state["statename"] == "RUNNING":
pid = server.supervisor.getPID()
print(f"[🦸‍♂️] Supervisord connected (pid={pid}) via unix://{str(SOCK_FILE).replace(str(TMP_DIR), 'tmp')}.")
return server.supervisor
except FileNotFoundError:
return None
except Exception as e:
print(f"Error connecting to existing supervisord: {str(e)}")
return None
def stop_existing_supervisord_process():
try:
pid = int(PID_FILE.read_text())
except FileNotFoundError:
return
except ValueError:
PID_FILE.unlink()
return
try:
print(f"[🦸‍♂️] Stopping supervisord process (pid={pid})...")
proc = psutil.Process(pid)
proc.terminate()
proc.wait()
except Exception:
raise
try:
PID_FILE.unlink()
except FileNotFoundError:
pass
def start_new_supervisord_process(daemonize=True):
print(f"[🦸‍♂️] Supervisord starting{' in background' if daemonize else ''}...")
# Create a config file in the current working directory
create_supervisord_config()
# Start supervisord
subprocess.Popen(
f"supervisord --configuration={CONFIG_FILE}",
stdin=None,
shell=True,
start_new_session=daemonize,
)
def exit_signal_handler(signum, frame):
if signum != 13:  # skip printing the message for SIGPIPE, but still shut down cleanly
print(f"\n[🦸‍♂️] Supervisord got stop signal ({signal.strsignal(signum)}). Terminating child processes...")
stop_existing_supervisord_process()
raise SystemExit(0)
# Monitor for termination signals and cleanup child processes
if not daemonize:
signal.signal(signal.SIGINT, exit_signal_handler)
signal.signal(signal.SIGHUP, exit_signal_handler)
signal.signal(signal.SIGPIPE, exit_signal_handler)
signal.signal(signal.SIGTERM, exit_signal_handler)
# otherwise supervisord will continue in the background even after the parent proc ends (aka daemon mode)
time.sleep(2)
return get_existing_supervisord_process()
def get_or_create_supervisord_process(daemonize=True):
supervisor = get_existing_supervisord_process()
if supervisor is None:
stop_existing_supervisord_process()
supervisor = start_new_supervisord_process(daemonize=daemonize)
assert supervisor and supervisor.getPID(), "Failed to start supervisord or connect to it!"
return supervisor
def start_worker(supervisor, daemon, lazy=False):
assert supervisor.getPID()
print(f"[🦸‍♂️] Supervisord starting new subprocess worker: {daemon['name']}...")
create_worker_config(daemon)
result = supervisor.reloadConfig()
added, changed, removed = result[0]
# print(f"Added: {added}, Changed: {changed}, Removed: {removed}")
for group in removed:
supervisor.stopProcessGroup(group)
supervisor.removeProcessGroup(group)
for group in changed:
supervisor.stopProcessGroup(group)
supervisor.removeProcessGroup(group)
supervisor.addProcessGroup(group)
for group in added:
supervisor.addProcessGroup(group)
time.sleep(1)
for _ in range(10):
procs = supervisor.getAllProcessInfo()
for proc in procs:
if proc['name'] == daemon["name"]:
# See process state diagram here: http://supervisord.org/subprocess.html
if proc['statename'] == 'RUNNING':
print(f" - Worker {daemon['name']}: already {proc['statename']} ({proc['description']})")
return proc
else:
if not lazy:
supervisor.startProcessGroup(daemon["name"], True)
proc = supervisor.getProcessInfo(daemon["name"])
print(f" - Worker {daemon['name']}: started {proc['statename']} ({proc['description']})")
return proc
# wait 500ms and retry in case it's slow to launch
time.sleep(0.5)
raise Exception(f"Failed to start worker {daemon['name']}! Only found: {procs}")
def watch_worker(supervisor, daemon_name, interval=5):
"""loop continuously and monitor worker's health"""
while True:
proc = get_worker(supervisor, daemon_name)
if not proc:
raise Exception("Worker dissapeared while running! " + daemon_name)
if proc['statename'] == 'STOPPED':
return proc
if proc['statename'] == 'RUNNING':
time.sleep(1)
continue
if proc['statename'] in ('STARTING', 'BACKOFF', 'FATAL', 'EXITED', 'STOPPING'):
print(f'[🦸‍♂️] WARNING: Worker {daemon_name} {proc["statename"]} {proc["description"]}')
time.sleep(interval)
continue
def get_worker(supervisor, daemon_name):
try:
return supervisor.getProcessInfo(daemon_name)
except Exception:
pass
return None
def stop_worker(supervisor, daemon_name):
proc = get_worker(supervisor, daemon_name)
for _ in range(10):
if not proc:
# worker does not exist (was never running or configured in the first place)
return True
# See process state diagram here: http://supervisord.org/subprocess.html
if proc['statename'] == 'STOPPED':
# worker was configured but has already stopped for some reason
supervisor.removeProcessGroup(daemon_name)
return True
else:
# worker was configured and is running, stop it now
supervisor.stopProcessGroup(daemon_name)
# wait 500ms and then re-check to make sure it's really stopped
time.sleep(0.5)
proc = get_worker(supervisor, daemon_name)
raise Exception(f"Failed to stop worker {daemon_name}!")
def main(daemons):
supervisor = get_or_create_supervisord_process(daemonize=True)
worker = start_worker(supervisor, daemons["webworker"])
pprint(worker)
print("All processes started in background.")
# Optionally you can block the main thread until an exit signal is received:
# try:
# signal.pause()
# except KeyboardInterrupt:
# pass
# finally:
# stop_existing_supervisord_process()
# if __name__ == "__main__":
# DAEMONS = {
# "webworker": {
# "name": "webworker",
# "command": "python3 -m http.server 9000",
# "directory": str(cwd),
# "autostart": "true",
# "autorestart": "true",
# "stdout_logfile": cwd / "webworker.log",
# "stderr_logfile": cwd / "webworker_error.log",
# },
# }
# main(DAEMONS, cwd)

View file

@ -0,0 +1,41 @@
__package__ = 'archivebox.queues'
from django_huey import db_task, task
from huey_monitor.models import TaskModel
from huey_monitor.tqdm import ProcessInfo
@db_task(queue="system_tasks", context=True)
def bg_add(add_kwargs, task=None, parent_task_id=None):
from ..main import add
if task and parent_task_id:
TaskModel.objects.set_parent_task(main_task_id=parent_task_id, sub_task_id=task.id)
assert add_kwargs and add_kwargs.get("urls")
rough_url_count = add_kwargs["urls"].count("://")
process_info = ProcessInfo(task, desc="add", parent_task_id=parent_task_id, total=rough_url_count)
result = add(**add_kwargs)
process_info.update(n=rough_url_count)
return result
@task(queue="system_tasks", context=True)
def bg_archive_links(args, kwargs=None, task=None, parent_task_id=None):
from ..extractors import archive_links
if task and parent_task_id:
TaskModel.objects.set_parent_task(main_task_id=parent_task_id, sub_task_id=task.id)
assert args and args[0]
kwargs = kwargs or {}
rough_count = len(args[0])
process_info = ProcessInfo(task, desc="archive_links", parent_task_id=parent_task_id, total=rough_count)
result = archive_links(*args, **kwargs)
process_info.update(n=rough_count)
return result
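For reference, the task/parent_task_id plumbing above is what lets huey_monitor nest sub-task progress under a parent task in the admin. A hypothetical sketch of chaining (names and import path illustrative, not part of this commit):

from django_huey import db_task

@db_task(queue="system_tasks", context=True)
def bg_example_parent(urls, task=None):
    from archivebox.queues.singlefile import extract  # hypothetical import path
    for url in urls:
        # each child task registers itself under this parent via set_parent_task()
        extract(url, '/data/archive/example', {}, parent_task_id=task.id)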

View file

@ -329,3 +329,15 @@ tbody .output-link {
box-shadow: 4px 4px 4px rgba(0,0,0,0.1);
}
tbody .output-link:hover {opacity: 1;}
@keyframes fadeIn {
0% { opacity: 0; }
20% { opacity: 0;}
100% { opacity: 1; }
}
.fade-in-progress-url {
animation: fadeIn 8s;
}

pdm.lock
View file

@ -5,7 +5,7 @@
groups = ["default", "ldap", "sonic"]
strategy = ["inherit_metadata"]
lock_version = "4.5.0"
content_hash = "sha256:c890335ff9967151514ff57e709d8b39c19f51edce5d15fb1b15c0a276a573f9"
content_hash = "sha256:ec23de8c5caf198c09f35e79411990eba9ed095da475f694d2a837c9a93d9bb1"
[[metadata.targets]]
requires_python = "==3.11.*"
@ -176,6 +176,34 @@ files = [
{file = "brotlicffi-1.1.0.0.tar.gz", hash = "sha256:b77827a689905143f87915310b93b273ab17888fd43ef350d4832c4a71083c13"},
]
[[package]]
name = "bx-django-utils"
version = "79"
summary = "Various Django utility functions"
groups = ["default"]
marker = "python_version == \"3.11\""
dependencies = [
"bx-py-utils>=92",
"django>=4.2",
"python-stdnum",
]
files = [
{file = "bx_django_utils-79-py3-none-any.whl", hash = "sha256:d50b10ace24b0b363574542faecf04a81029e2fec6d6e6525fe063ed06238e04"},
{file = "bx_django_utils-79.tar.gz", hash = "sha256:cb66087d4e9396281acf5a4394b749cff3062b66082d5726f6a8a342fdd35d0e"},
]
[[package]]
name = "bx-py-utils"
version = "101"
requires_python = "<4,>=3.10"
summary = "Various Python utility functions"
groups = ["default"]
marker = "python_version == \"3.11\""
files = [
{file = "bx_py_utils-101-py3-none-any.whl", hash = "sha256:eece1f0b1e3c091d38f3013984056b05f43c6a0fd716489cf337d89df802ab59"},
{file = "bx_py_utils-101.tar.gz", hash = "sha256:2aa295cde55da99b77f5f2f8b5bf8c0bec7e0046511832989ecbb1a43183cf75"},
]
[[package]]
name = "certifi"
version = "2024.8.30"
@ -424,6 +452,40 @@ files = [
{file = "django_extensions-3.2.3-py3-none-any.whl", hash = "sha256:9600b7562f79a92cbf1fde6403c04fee314608fefbb595502e34383ae8203401"},
]
[[package]]
name = "django-huey"
version = "1.2.1"
requires_python = ">=3.8"
summary = "An extension for django and huey that supports multi queue management"
groups = ["default"]
marker = "python_version == \"3.11\""
dependencies = [
"django>=3.2",
"huey>=2.0",
]
files = [
{file = "django_huey-1.2.1-py3-none-any.whl", hash = "sha256:59c82b72fd4b6e60c219bd1fbab78acfe68a1c8d3efb1d3e42798a67d01a4aa2"},
{file = "django_huey-1.2.1.tar.gz", hash = "sha256:634abf1e707acef90dd00df4267458486f89a3117419000ec5584b1c4129701a"},
]
[[package]]
name = "django-huey-monitor"
version = "0.9.0"
requires_python = ">=3.10"
summary = "Django based tool for monitoring huey task queue: https://github.com/coleifer/huey"
groups = ["default"]
marker = "python_version == \"3.11\""
dependencies = [
"bx-django-utils",
"bx-py-utils",
"django",
"huey",
]
files = [
{file = "django-huey-monitor-0.9.0.tar.gz", hash = "sha256:03366d98579c07e132672aa760373949fecec108a0e91229e870bb21453c800b"},
{file = "django_huey_monitor-0.9.0-py3-none-any.whl", hash = "sha256:1d5922d182e138e288f99d6cdb326cbed20c831d4c906c96cba148b0979e648a"},
]
[[package]]
name = "django-jsonform"
version = "2.22.0"
@ -643,6 +705,16 @@ files = [
{file = "httpx-0.27.2.tar.gz", hash = "sha256:f7c2be1d2f3c3c3160d441802406b206c2b76f5947b11115e6df10c6c65e66c2"},
]
[[package]]
name = "huey"
version = "2.5.1"
summary = "huey, a little task queue"
groups = ["default"]
marker = "python_version == \"3.11\""
files = [
{file = "huey-2.5.1.tar.gz", hash = "sha256:8a323783ab434a095a4e72b8c48c5b8f957f9031fa860474a390a0927e957112"},
]
[[package]]
name = "hyperlink"
version = "21.0.0"
@ -832,19 +904,6 @@ dependencies = [
"requests",
]
[[package]]
name = "pocket"
version = "0.3.7"
git = "https://github.com/tapanpandita/pocket.git"
ref = "v0.3.7"
revision = "5a144438cc89bfc0ec94db960718ccf1f76468c1"
summary = "api wrapper for getpocket.com"
groups = ["default"]
marker = "python_version == \"3.11\""
dependencies = [
"requests",
]
[[package]]
name = "prompt-toolkit"
version = "3.0.47"
@ -860,6 +919,19 @@ files = [
{file = "prompt_toolkit-3.0.47.tar.gz", hash = "sha256:1e1b29cb58080b1e69f207c893a1a7bf16d127a5c30c9d17a25a5d77792e5360"},
]
[[package]]
name = "psutil"
version = "6.0.0"
requires_python = "!=3.0.*,!=3.1.*,!=3.2.*,!=3.3.*,!=3.4.*,!=3.5.*,>=2.7"
summary = "Cross-platform lib for process and system monitoring in Python."
groups = ["default"]
marker = "python_version == \"3.11\""
files = [
{file = "psutil-6.0.0-cp36-abi3-manylinux_2_12_x86_64.manylinux2010_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:5fd9a97c8e94059b0ef54a7d4baf13b405011176c3b6ff257c247cae0d560ecd"},
{file = "psutil-6.0.0-cp38-abi3-macosx_11_0_arm64.whl", hash = "sha256:ffe7fc9b6b36beadc8c322f84e1caff51e8703b88eee1da46d1e3a6ae11b4fd0"},
{file = "psutil-6.0.0.tar.gz", hash = "sha256:8faae4f310b6d969fa26ca0545338b21f73c6b15db7c4a8d934a5482faa818f2"},
]
[[package]]
name = "ptyprocess"
version = "0.7.0"
@ -1057,6 +1129,17 @@ files = [
{file = "python-ldap-3.4.4.tar.gz", hash = "sha256:7edb0accec4e037797705f3a05cbf36a9fde50d08c8f67f2aef99a2628fab828"},
]
[[package]]
name = "python-stdnum"
version = "1.20"
summary = "Python module to handle standardized numbers and codes"
groups = ["default"]
marker = "python_version == \"3.11\""
files = [
{file = "python-stdnum-1.20.tar.gz", hash = "sha256:ad2a2cf2eb025de408210235f36b4ae31252de3186240ccaa8126e117cb82690"},
{file = "python_stdnum-1.20-py2.py3-none-any.whl", hash = "sha256:111008e10391d54fb2afad2a10df70d5cb0c6c0a7ec82fec6f022cb8712961d3"},
]
[[package]]
name = "pytz"
version = "2024.1"
@ -1220,6 +1303,20 @@ files = [
{file = "stack_data-0.6.3.tar.gz", hash = "sha256:836a778de4fec4dcd1dcd89ed8abff8a221f58308462e1c4aa2a3cf30148f0b9"},
]
[[package]]
name = "supervisor"
version = "4.2.5"
summary = "A system for controlling process state under UNIX"
groups = ["default"]
marker = "python_version == \"3.11\""
dependencies = [
"setuptools",
]
files = [
{file = "supervisor-4.2.5-py2.py3-none-any.whl", hash = "sha256:2ecaede32fc25af814696374b79e42644ecaba5c09494c51016ffda9602d0f08"},
{file = "supervisor-4.2.5.tar.gz", hash = "sha256:34761bae1a23c58192281a5115fb07fbf22c9b0133c08166beffc70fed3ebc12"},
]
[[package]]
name = "traitlets"
version = "5.14.3"

View file

@ -13,27 +13,13 @@ readme = "README.md"
# pdm update --unconstrained
dependencies = [
# Last Bumped: 2024-08-20
# Base Framework and Language Dependencies
############# Django / Core Libraries #############
"setuptools>=69.5.1",
"django>=5.0.4,<6.0",
"django-ninja>=1.1.0",
"django-extensions>=3.2.3",
"mypy-extensions>=1.0.0",
# Python Helper Libraries
"requests>=2.31.0",
"dateparser>=1.0.0",
"feedparser>=6.0.11",
"w3lib>=2.1.2",
"rich>=13.8.0",
"ulid-py>=1.1.0",
"typeid-python>=0.3.0",
# Feature-Specific Dependencies
"python-crontab>=3.0.0", # for: archivebox schedule
"croniter>=2.0.5", # for: archivebox schedule
"ipython>=8.23.0", # for: archivebox shell
# Extractor Dependencies
"yt-dlp>=2024.8.6", # for: media
# "playwright>=1.43.0; platform_machine != 'armv7l'", # WARNING: playwright doesn't have any sdist, causes trouble on build systems that refuse to install wheel-only packages
"channels[daphne]>=4.1.0",
"django-signal-webhooks>=0.3.0",
"django-admin-data-views>=0.3.1",
"django-object-actions>=4.2.0",
@ -41,6 +27,22 @@ dependencies = [
"django-pydantic-field>=0.3.9",
"django-jsonform>=2.22.0",
"django-stubs>=5.0.2",
"django-huey>=1.2.1",
"django-huey-monitor>=0.9.0",
############# Python Helper Libraries ############
"requests>=2.31.0",
"dateparser>=1.0.0",
"feedparser>=6.0.11",
"w3lib>=2.1.2",
"rich>=13.8.0",
"ulid-py>=1.1.0",
"typeid-python>=0.3.0",
"psutil>=6.0.0",
"supervisor>=4.2.5",
"python-crontab>=3.0.0", # for: archivebox schedule
"croniter>=2.0.5", # for: archivebox schedule
"ipython>=8.23.0", # for: archivebox shell
############# VENDORED LIBS ######################
# these can be safely omitted when installation subsystem does not provide these as packages (e.g. apt/debian)
# archivebox will automatically load fallback vendored copies bundled via archivebox/vendor/__init__.py
"pydantic-pkgr>=0.1.4",
@ -48,7 +50,8 @@ dependencies = [
"pocket@git+https://github.com/tapanpandita/pocket.git@v0.3.7",
"django-taggit==1.3.0",
"base32-crockford==0.3.0",
"channels[daphne]>=4.1.0",
############# Extractor Dependencies #############
"yt-dlp>=2024.8.6", # for: media
]
homepage = "https://github.com/ArchiveBox/ArchiveBox"