"""Sync public facade — for Django / Flask / plain-Python codebases.
Uses ``psycopg`` directly. Never drives async-from-sync via ``asyncio.run``
inside library code — that's a footgun and the explicit non-goal.
"""
from __future__ import annotations
from collections.abc import Callable, Iterator
from contextlib import contextmanager
from datetime import datetime
from typing import TYPE_CHECKING, Any
from pydantic import BaseModel
from roost import observability
from roost._core import repo
from roost.async_api import _apply_task_defaults
from roost.decorators import DEFAULT_HANDLERS, HandlerRegistry, task_name
if TYPE_CHECKING: # pragma: no cover
import psycopg
def _coerce_args(args: dict[str, Any] | BaseModel | None) -> dict[str, Any]:
if args is None:
return {}
if isinstance(args, BaseModel):
return args.model_dump()
return args
[docs]
class Roost:
"""Synchronous entry point.
Most users only call :meth:`enqueue` from this class — workers run via
the CLI or :class:`roost.AsyncRoost.worker` (the worker loop itself is
asyncio-based; sync code can drive it via ``asyncio.run``).
"""
def __init__(
self,
dsn: str,
*,
registry: HandlerRegistry | None = None,
) -> None:
self.dsn = dsn
self.registry = registry or DEFAULT_HANDLERS
@contextmanager
def _connect(self) -> Iterator[psycopg.Connection[Any]]:
import psycopg
with psycopg.connect(self.dsn) as conn:
yield conn
# ------------------------------------------------------------------
# schema
# ------------------------------------------------------------------
[docs]
def setup_schema(self, conn: psycopg.Connection[Any] | None = None) -> None:
if conn is not None:
repo.apply_schema_sync(conn)
conn.commit()
return
with self._connect() as managed:
repo.apply_schema_sync(managed)
managed.commit()
# ------------------------------------------------------------------
# enqueue
# ------------------------------------------------------------------
[docs]
def enqueue(
self,
task: str | Callable[..., Any],
*,
args: dict[str, Any] | BaseModel | None = None,
queue: str = "default",
priority: int = 0,
max_attempts: int = 20,
scheduled_at: datetime | None = None,
unique_key: str | None = None,
tags: list[str] | None = None,
timeout_seconds: int | None = None,
depends_on: list[int] | None = None,
metadata: dict[str, Any] | None = None,
conn: psycopg.Connection[Any] | None = None,
) -> int:
name = task_name(task) if callable(task) else task
args_dict = observability.inject_trace_context(_coerce_args(args))
spec = self.registry.get(name)
if spec is not None:
queue, priority, max_attempts, tags, timeout_seconds = _apply_task_defaults(
spec.defaults,
queue=queue,
priority=priority,
max_attempts=max_attempts,
tags=tags,
timeout_seconds=timeout_seconds,
queue_default="default",
priority_default=0,
max_attempts_default=20,
)
observability.JOBS_ENQUEUED.labels(queue=queue, task=name).inc()
kwargs: dict[str, Any] = dict(
task=name,
args=args_dict,
queue=queue,
priority=priority,
max_attempts=max_attempts,
scheduled_at=scheduled_at,
unique_key=unique_key,
tags=tags,
timeout_seconds=timeout_seconds,
depends_on=depends_on,
metadata=metadata,
)
if conn is not None:
return repo.enqueue_sync(conn, **kwargs)
with self._connect() as managed:
try:
job_id = repo.enqueue_sync(managed, **kwargs)
managed.commit()
return job_id
except Exception:
managed.rollback()
raise
# ------------------------------------------------------------------
# admin
# ------------------------------------------------------------------
[docs]
def status(self) -> list[tuple[str, str, int]]:
with self._connect() as conn:
return repo.status_counts_sync(conn)
[docs]
def retry(self, job_id: int) -> None:
with self._connect() as conn:
repo.retry_job_sync(conn, job_id)
conn.commit()
[docs]
def cancel(self, job_id: int) -> None:
with self._connect() as conn:
repo.cancel_job_sync(conn, job_id)
conn.commit()
[docs]
def pause_queue(self, name: str) -> None:
with self._connect() as conn:
repo.pause_queue_sync(conn, name)
conn.commit()
[docs]
def resume_queue(self, name: str) -> None:
with self._connect() as conn:
repo.resume_queue_sync(conn, name)
conn.commit()
[docs]
def list_queues(self) -> list[tuple[str, datetime | None]]:
with self._connect() as conn:
return repo.list_queues_sync(conn)
[docs]
def list_workers(self) -> list[dict[str, Any]]:
with self._connect() as conn:
return repo.list_workers_sync(conn)
[docs]
def requeue_discarded(self) -> int:
with self._connect() as conn:
n = repo.requeue_discarded_sync(conn)
conn.commit()
return n