# Source code for roost._core.repo

"""All DB I/O lives here. Single source of truth for SQL.

Both the async (asyncpg) and sync (psycopg) facades route through this module.
The two flavors share the same SQL with placeholder syntax differences only
(``$N`` for asyncpg, ``%s`` for psycopg).

The most important contract: ``enqueue_*`` always takes the caller's
connection / cursor. We never open one ourselves.
"""

from __future__ import annotations

import json
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from typing import TYPE_CHECKING, Any, cast

from roost._core.job import Job
from roost.exceptions import JobNotFoundError

if TYPE_CHECKING:  # pragma: no cover
    import asyncpg
    import psycopg


# ---------------------------------------------------------------------------
# SQL strings — placeholder agnostic where possible
# ---------------------------------------------------------------------------

# Column list shared by every INSERT statement in this module. Its order must
# match the positional parameters ($1..$11, or the first eleven %s) used in
# each VALUES clause.
_ENQUEUE_BASE_COLS = (
    "task, args, queue, priority, max_attempts, scheduled_at, "
    "unique_key, tags, timeout_seconds, depends_on, metadata"
)

# scheduled_at defaults to server-side now() when caller passes NULL. We
# avoid Python-side ``datetime.now()`` for the default to dodge clock drift
# between the client host and the Postgres server (Docker testcontainers,
# k8s pods, etc.) which can leave newly enqueued rows briefly invisible to
# ``WHERE scheduled_at <= now()``.
#
# Plain insert (asyncpg ``$N`` placeholders): always inserts one row and
# returns its id.
_INSERT_PLAIN_PG = f"""
INSERT INTO roost.jobs ({_ENQUEUE_BASE_COLS})
VALUES (
    $1, $2::jsonb, $3, $4, $5,
    COALESCE($6::timestamptz, now()),
    $7, $8::text[], $9, $10::bigint[], $11::jsonb
)
RETURNING id
"""

# Insert-or-find for jobs carrying a unique_key (asyncpg placeholders).
# The ``attempted`` CTE tries the insert and does nothing when it conflicts
# on unique_key among rows in an active state (presumably backed by a partial
# unique index with the same predicate — confirm against the migrations).
# The second arm of the UNION ALL returns the id of the already-active row,
# and only when the CTE inserted nothing. ``inserted`` tells the caller which
# arm produced the row. $7 (unique_key) is referenced in both arms.
# NOTE(review): if the conflicting row was committed by a concurrent
# transaction after this statement's snapshot was taken, both arms can come
# back empty — callers should be prepared for a no-row result.
_INSERT_UNIQUE_PG = f"""
WITH attempted AS (
    INSERT INTO roost.jobs ({_ENQUEUE_BASE_COLS})
    VALUES (
        $1, $2::jsonb, $3, $4, $5,
        COALESCE($6::timestamptz, now()),
        $7, $8::text[], $9, $10::bigint[], $11::jsonb
    )
    ON CONFLICT (unique_key)
      WHERE unique_key IS NOT NULL AND state IN ('available','executing','retryable')
    DO NOTHING
    RETURNING id, true AS inserted
)
SELECT id, inserted FROM attempted
UNION ALL
SELECT j.id, false AS inserted
  FROM roost.jobs j
 WHERE j.unique_key = $7
   AND j.state IN ('available','executing','retryable')
   AND NOT EXISTS (SELECT 1 FROM attempted)
 LIMIT 1
"""

# psycopg flavor of the plain insert: same statement with ``%s`` placeholders.
_INSERT_PLAIN_PSY = f"""
INSERT INTO roost.jobs ({_ENQUEUE_BASE_COLS})
VALUES (
    %s, %s::jsonb, %s, %s, %s,
    COALESCE(%s::timestamptz, now()),
    %s, %s::text[], %s, %s::bigint[], %s::jsonb
)
RETURNING id
"""

# psycopg flavor of the insert-or-find statement. ``%s`` placeholders are
# purely positional, so unique_key must be supplied twice: once in VALUES and
# once more for the lookup arm (twelve parameters in total).
_INSERT_UNIQUE_PSY = f"""
WITH attempted AS (
    INSERT INTO roost.jobs ({_ENQUEUE_BASE_COLS})
    VALUES (
        %s, %s::jsonb, %s, %s, %s,
        COALESCE(%s::timestamptz, now()),
        %s, %s::text[], %s, %s::bigint[], %s::jsonb
    )
    ON CONFLICT (unique_key)
      WHERE unique_key IS NOT NULL AND state IN ('available','executing','retryable')
    DO NOTHING
    RETURNING id, true AS inserted
)
SELECT id, inserted FROM attempted
UNION ALL
SELECT j.id, false AS inserted
  FROM roost.jobs j
 WHERE j.unique_key = %s
   AND j.state IN ('available','executing','retryable')
   AND NOT EXISTS (SELECT 1 FROM attempted)
 LIMIT 1
"""

# Pick the ids of up to $2 runnable jobs from the queues listed in $1.
# A job is runnable when it is 'available', its scheduled_at has passed, its
# queue has no paused_at set in roost.queues, and every id in depends_on
# refers to a job whose state is 'completed'. Rows come back ordered by
# priority, scheduled_at, id and are locked FOR UPDATE; SKIP LOCKED skips
# rows already locked by another transaction.
_PICK_AVAILABLE_FAST_PG = """
SELECT j.id
  FROM roost.jobs j
 WHERE j.state = 'available'
   AND j.queue = ANY($1::text[])
   AND j.scheduled_at <= now()
   AND j.queue NOT IN (SELECT name FROM roost.queues WHERE paused_at IS NOT NULL)
   AND (
       cardinality(j.depends_on) = 0
       OR NOT EXISTS (
           SELECT 1 FROM roost.jobs p
            WHERE p.id = ANY(j.depends_on)
              AND p.state <> 'completed'
       )
   )
 ORDER BY j.priority ASC, j.scheduled_at ASC, j.id ASC
 FOR UPDATE SKIP LOCKED
 LIMIT $2
"""

# Same eligibility rules as the fast path, plus per-task limits supplied as
# three parallel arrays: $3 task names, $4 rate_per_minute, $5 max_concurrency.
#   * ``limits``     — the arrays unnested into rows.
#   * ``candidates`` — eligible jobs LEFT JOINed to their limits (tasks with
#     no entry get NULL limits), with exec_now = jobs of that task currently
#     'executing' and rate_now = jobs of that task attempted in the last minute.
#   * ``ranked``     — rn_task numbers the candidates within each task in
#     pick order.
#   * ``allowed``    — keeps a candidate only while the current count plus its
#     rank stays within each non-NULL limit, so a batch cannot overshoot a cap.
# The final SELECT re-reads roost.jobs to take the row locks and apply LIMIT $2.
_PICK_AVAILABLE_THROTTLED_PG = """
WITH limits AS (
    SELECT t.task,
           t.rate_per_minute,
           t.max_concurrency
      FROM unnest($3::text[], $4::int[], $5::int[])
        AS t(task, rate_per_minute, max_concurrency)
), candidates AS (
    SELECT j.id,
           j.task,
           j.priority,
           j.scheduled_at,
           l.rate_per_minute,
           l.max_concurrency,
           (SELECT COUNT(*) FROM roost.jobs e
             WHERE e.task = j.task AND e.state = 'executing') AS exec_now,
           (SELECT COUNT(*) FROM roost.jobs r
             WHERE r.task = j.task
               AND r.attempted_at >= now() - interval '1 minute') AS rate_now
      FROM roost.jobs j
      LEFT JOIN limits l ON l.task = j.task
     WHERE j.state = 'available'
       AND j.queue = ANY($1::text[])
       AND j.scheduled_at <= now()
       AND j.queue NOT IN (SELECT name FROM roost.queues WHERE paused_at IS NOT NULL)
       AND (
           cardinality(j.depends_on) = 0
           OR NOT EXISTS (
               SELECT 1 FROM roost.jobs p
                WHERE p.id = ANY(j.depends_on)
                  AND p.state <> 'completed'
           )
       )
), ranked AS (
    SELECT *,
           ROW_NUMBER() OVER (PARTITION BY task ORDER BY priority, scheduled_at, id) AS rn_task
      FROM candidates
), allowed AS (
    SELECT id, priority, scheduled_at
      FROM ranked
     WHERE (max_concurrency IS NULL OR exec_now + rn_task <= max_concurrency)
       AND (rate_per_minute IS NULL OR rate_now + rn_task <= rate_per_minute)
)
SELECT j.id
  FROM roost.jobs j
  JOIN allowed a ON a.id = j.id
 WHERE j.state = 'available'
 ORDER BY j.priority ASC, j.scheduled_at ASC, j.id ASC
 FOR UPDATE SKIP LOCKED
 LIMIT $2
"""

# Two-step claim: pick ids with FOR UPDATE SKIP LOCKED, then UPDATE by ids.
# Splitting SELECT from UPDATE keeps the contention surface tiny — the
# UPDATE no longer scans the queue, just hits the picked ids. The locks
# taken by FOR UPDATE are held by the outer transaction until we commit,
# so the UPDATE is race-safe.
#
# Parameters: $1 = ids to claim, $2 = worker id. Each claimed row moves to
# 'executing', has its attempt counter incremented and attempted_at stamped.
# An empty-string worker id leaves metadata untouched; otherwise
# ``worker_id`` is merged into the metadata object. RETURNING * hands back
# the full updated rows (in no guaranteed order).
_CLAIM_BY_IDS_PG = """
UPDATE roost.jobs
   SET state = 'executing',
       attempt = attempt + 1,
       attempted_at = now(),
       metadata = CASE WHEN $2 = '' THEN metadata
                       ELSE metadata || jsonb_build_object('worker_id', $2::text)
                  END
 WHERE id = ANY($1::bigint[])
RETURNING *
"""

# Mark job $1 completed and stamp completed_at. $2 is an optional JSON
# result; when it is NULL the existing result column is kept.
_MARK_COMPLETED_PG = """
UPDATE roost.jobs
   SET state = 'completed',
       completed_at = now(),
       result = COALESCE($2::jsonb, result)
 WHERE id = $1
"""

# Move job $1 to 'retryable' with its next run time $2. $3 is a JSON array of
# new error entries appended to ``errors``; $4 is the cap. The subquery
# re-aggregates the combined array in its original order while OFFSET drops
# the oldest entries, so at most the last $4 entries survive.
_MARK_RETRYABLE_PG = """
UPDATE roost.jobs
   SET state = 'retryable',
       scheduled_at = $2,
       errors = (
           SELECT COALESCE(jsonb_agg(e ORDER BY ord), '[]'::jsonb) FROM (
               SELECT e, ord FROM jsonb_array_elements(errors || $3::jsonb)
                   WITH ORDINALITY AS t(e, ord)
                ORDER BY ord
               OFFSET GREATEST(
                   0,
                   jsonb_array_length(errors || $3::jsonb) - $4::int
               )
           ) trimmed(e, ord)
       )
 WHERE id = $1
"""

# Move job $1 to 'discarded' and stamp discarded_at. $2 is a JSON array of
# new error entries and $3 the cap; the error trimming works exactly as in
# _MARK_RETRYABLE_PG.
_MARK_DISCARDED_PG = """
UPDATE roost.jobs
   SET state = 'discarded',
       discarded_at = now(),
       errors = (
           SELECT COALESCE(jsonb_agg(e ORDER BY ord), '[]'::jsonb) FROM (
               SELECT e, ord FROM jsonb_array_elements(errors || $2::jsonb)
                   WITH ORDINALITY AS t(e, ord)
                ORDER BY ord
               OFFSET GREATEST(
                   0,
                   jsonb_array_length(errors || $2::jsonb) - $3::int
               )
           ) trimmed(e, ord)
       )
 WHERE id = $1
"""

# Put job $1 back to 'available' at scheduled_at $2. The attempt counter is
# left as it is.
_RESET_TO_AVAILABLE_PG = """
UPDATE roost.jobs
   SET state = 'available',
       scheduled_at = $2
 WHERE id = $1
"""

# Like _RESET_TO_AVAILABLE_PG, but also decrements attempt (never below 0),
# undoing the increment applied by _CLAIM_BY_IDS_PG.
_SNOOZE_PG = """
UPDATE roost.jobs
   SET state = 'available',
       scheduled_at = $2,
       attempt = GREATEST(attempt - 1, 0)
 WHERE id = $1
"""

# Promote every 'retryable' job whose scheduled_at has passed to 'available'.
_PROMOTE_RETRYABLE_PG = """
UPDATE roost.jobs
   SET state = 'available'
 WHERE state = 'retryable'
   AND scheduled_at <= now()
"""

# Manual retry of job $1: make it 'available' immediately. Only rows in
# retryable / discarded / cancelled / completed state match; the id is
# returned when a row was updated, otherwise no row comes back. The attempt
# counter is not reset.
_RETRY_BY_ID_PG = """
UPDATE roost.jobs
   SET state = 'available',
       scheduled_at = now()
 WHERE id = $1
   AND state IN ('retryable', 'discarded', 'cancelled', 'completed')
RETURNING id
"""

# Move job $1 to the 'cancelled' state and stamp cancelled_at. Only
# available / retryable / executing rows match; returns the id on success.
_CANCEL_BY_ID_PG = """
UPDATE roost.jobs
   SET state = 'cancelled',
       cancelled_at = now()
 WHERE id = $1
   AND state IN ('available', 'retryable', 'executing')
RETURNING id
"""

# Job counts grouped by (queue, state), ordered by queue then state.
_STATUS_COUNTS_PG = """
SELECT queue, state, COUNT(*)::bigint AS n
  FROM roost.jobs
 GROUP BY queue, state
 ORDER BY queue, state
"""

# Recover jobs stuck in 'executing' whose attempted_at is older than the
# interval $1. Jobs that have used up max_attempts become 'discarded' (with
# discarded_at stamped); the others become 'retryable' with scheduled_at set
# to now(). $2 is a JSON array of error entries appended to ``errors``
# (no cap is applied here). Returns (id, new state) per recovered job.
_REAP_ORPHANS_PG = """
WITH stale AS (
    SELECT id
      FROM roost.jobs
     WHERE state = 'executing'
       AND attempted_at < now() - ($1::interval)
     FOR UPDATE SKIP LOCKED
)
UPDATE roost.jobs j
   SET state = CASE WHEN j.attempt >= j.max_attempts THEN 'discarded' ELSE 'retryable' END,
       discarded_at = CASE WHEN j.attempt >= j.max_attempts THEN now() ELSE j.discarded_at END,
       scheduled_at = CASE WHEN j.attempt >= j.max_attempts THEN j.scheduled_at ELSE now() END,
       errors = j.errors || $2::jsonb
  FROM stale
 WHERE j.id = stale.id
RETURNING j.id, j.state
"""

# Worker heartbeat upsert keyed on worker id ($1). A new worker row records
# hostname ($2), pid ($3), queues ($4) and metadata ($5); an existing row only
# refreshes last_seen_at, queues and metadata — hostname and pid are kept.
_HEARTBEAT_UPSERT_PG = """
INSERT INTO roost.workers (id, hostname, pid, queues, last_seen_at, metadata)
VALUES ($1, $2, $3, $4, now(), $5::jsonb)
ON CONFLICT (id) DO UPDATE
   SET last_seen_at = now(),
       queues = EXCLUDED.queues,
       metadata = EXCLUDED.metadata
"""

# Remove the worker row with id $1.
_WORKER_DEREGISTER_PG = """
DELETE FROM roost.workers WHERE id = $1
"""

# Remove worker rows whose last heartbeat is older than the interval $1.
_WORKER_GC_PG = """
DELETE FROM roost.workers
 WHERE last_seen_at < now() - ($1::interval)
"""

# All registered workers, most recently seen first.
_LIST_WORKERS_PG = """
SELECT id, hostname, pid, queues, started_at, last_seen_at, metadata
  FROM roost.workers
 ORDER BY last_seen_at DESC
"""

# Pause queue $1. Creates the queue row if needed; when the queue is already
# paused the original paused_at is preserved (COALESCE keeps the old value).
_QUEUE_PAUSE_PG = """
INSERT INTO roost.queues (name, paused_at, updated_at)
VALUES ($1, now(), now())
ON CONFLICT (name) DO UPDATE
   SET paused_at = COALESCE(roost.queues.paused_at, EXCLUDED.paused_at),
       updated_at = now()
"""

# Resume queue $1 by clearing paused_at (creating the queue row if needed).
_QUEUE_RESUME_PG = """
INSERT INTO roost.queues (name, paused_at, updated_at)
VALUES ($1, NULL, now())
ON CONFLICT (name) DO UPDATE
   SET paused_at = NULL,
       updated_at = now()
"""

# All known queues ordered by name.
_QUEUE_LIST_PG = """
SELECT name, paused_at, metadata, updated_at
  FROM roost.queues
 ORDER BY name
"""

# Flag job $1 with cancel_requested = true. Only available / retryable /
# executing rows match; returns (id, state) so the caller can tell whether
# the job is currently running.
_REQUEST_CANCEL_PG = """
UPDATE roost.jobs
   SET cancel_requested = true
 WHERE id = $1
   AND state IN ('available', 'retryable', 'executing')
RETURNING id, state
"""

# Make every 'discarded' job runnable again: available now, attempt reset to
# 0 and the cancel flag cleared.
_REQUEUE_DISCARDED_PG = """
UPDATE roost.jobs
   SET state = 'available',
       scheduled_at = now(),
       attempt = 0,
       cancel_requested = false
 WHERE state = 'discarded'
"""

# Per-row statement for bulk enqueue. Same columns and parameters as the
# single-row inserts; a row whose unique_key collides with an active job is
# silently skipped. Nothing is returned.
_BULK_INSERT_PG = f"""
INSERT INTO roost.jobs ({_ENQUEUE_BASE_COLS})
VALUES (
    $1, $2::jsonb, $3, $4, $5,
    COALESCE($6::timestamptz, now()),
    $7, $8::text[], $9, $10::bigint[], $11::jsonb
)
ON CONFLICT (unique_key)
  WHERE unique_key IS NOT NULL AND state IN ('available','executing','retryable')
DO NOTHING
"""

# Move terminal jobs (completed / discarded / cancelled) whose terminal
# timestamp is older than the interval $1 into roost.jobs_archive. The DELETE
# and the INSERT happen in one statement; the ids of the moved rows are
# returned.
_ARCHIVE_TERMINAL_PG = """
WITH moved AS (
    DELETE FROM roost.jobs
     WHERE state IN ('completed', 'discarded', 'cancelled')
       AND COALESCE(completed_at, discarded_at, cancelled_at) < now() - ($1::interval)
    RETURNING *
)
INSERT INTO roost.jobs_archive (
    id, queue, task, args, state, priority, attempt, max_attempts,
    scheduled_at, attempted_at, completed_at, cancelled_at, discarded_at,
    errors, unique_key, inserted_at, tags, timeout_seconds, cancel_requested,
    result, depends_on, metadata
)
SELECT
    id, queue, task, args, state, priority, attempt, max_attempts,
    scheduled_at, attempted_at, completed_at, cancelled_at, discarded_at,
    errors, unique_key, inserted_at, tags, timeout_seconds, cancel_requested,
    result, depends_on, metadata
  FROM moved
RETURNING id
"""

# Cancel 'available' jobs that can never run because at least one job listed
# in depends_on ended 'discarded' or 'cancelled'. $1 is a JSON array of error
# entries appended to ``errors``. Returns the ids of the cancelled jobs.
_CANCEL_BLOCKED_DEPENDENTS_PG = """
WITH blocked AS (
    SELECT j.id
      FROM roost.jobs j
     WHERE j.state = 'available'
       AND cardinality(j.depends_on) > 0
       AND EXISTS (
           SELECT 1 FROM roost.jobs p
            WHERE p.id = ANY(j.depends_on)
              AND p.state IN ('discarded', 'cancelled')
       )
     FOR UPDATE SKIP LOCKED
)
UPDATE roost.jobs j
   SET state = 'cancelled',
       cancelled_at = now(),
       errors = j.errors || $1::jsonb
  FROM blocked
 WHERE j.id = blocked.id
RETURNING j.id
"""

# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------


def _utcnow() -> datetime:
    """Return the current moment as a timezone-aware UTC ``datetime``."""
    current = datetime.now(timezone.utc)
    return current


def _coerce_scheduled_at(scheduled_at: datetime | None) -> datetime | None:
    """Normalize a caller-supplied schedule time.

    ``None`` passes through unchanged (the SQL falls back to server-side
    ``now()``). A naive ``datetime`` is labelled as UTC without shifting its
    wall-clock fields; an aware ``datetime`` is returned as-is.
    """
    if scheduled_at is not None and scheduled_at.tzinfo is None:
        return scheduled_at.replace(tzinfo=timezone.utc)
    return scheduled_at


def _args_dict(args: dict[str, Any] | None) -> dict[str, Any]:
    """Return ``args`` itself when it is non-empty, otherwise a new empty dict."""
    if args:
        return args
    return {}


def _args_json(args: dict[str, Any] | None) -> str:
    """Serialize ``args`` to compact JSON text.

    A missing or empty mapping serializes as ``{}``. Values ``json`` cannot
    encode natively are converted with ``str``.
    """
    payload = args if args else {}
    return json.dumps(payload, separators=(",", ":"), default=str)


def _record_to_job(record: Any) -> Job:
    """Build a :class:`Job` from a database row.

    ``record`` is copied into a plain ``dict`` (anything ``dict()`` accepts,
    e.g. an asyncpg Record or a psycopg row mapping) and then validated by
    ``Job.model_validate``.
    """
    return Job.model_validate(dict(record))


# ---------------------------------------------------------------------------
# Async (asyncpg)
# ---------------------------------------------------------------------------


async def init_connection(conn: asyncpg.Connection) -> None:
    """Install JSON codecs on ``conn`` so JSON columns decode to Python objects.

    Registers a text-format codec for ``jsonb`` and then for ``json`` (both in
    ``pg_catalog``) that encodes with ``json.dumps`` and decodes with
    ``json.loads``.

    Call this on every asyncpg connection used to fetch from ``roost.jobs``,
    or pass it as ``init=`` when constructing a :class:`asyncpg.Pool`.
    """
    # Both types take exactly the same codec options; jsonb is registered first.
    for pg_type in ("jsonb", "json"):
        await conn.set_type_codec(
            pg_type,
            schema="pg_catalog",
            format="text",
            encoder=json.dumps,
            decoder=json.loads,
        )


async def apply_schema_async(conn: asyncpg.Connection) -> None:
    """Run all pending migrations on ``conn`` so the schema is current."""
    # Imported at call time rather than at module import.
    from roost._core.migrations import apply_pending_async as run_pending

    await run_pending(conn)


async def enqueue_async(
    conn: asyncpg.Connection,
    *,
    task: str,
    args: dict[str, Any] | None = None,
    queue: str = "default",
    priority: int = 0,
    max_attempts: int = 20,
    scheduled_at: datetime | None = None,
    unique_key: str | None = None,
    tags: list[str] | None = None,
    timeout_seconds: int | None = None,
    depends_on: list[int] | None = None,
    metadata: dict[str, Any] | None = None,
) -> int:
    """Insert a job using ``conn`` — typically the caller's transaction.

    Args:
        conn: Caller-owned asyncpg connection; no connection is opened here.
        task: Task name stored on the job.
        args: Job arguments; ``None`` is stored as an empty object.
        queue: Target queue name.
        priority: Job priority (workers pick in ascending order).
        max_attempts: Attempt budget stored on the job.
        scheduled_at: Earliest run time. ``None`` lets the server use
            ``now()``; a naive value is treated as UTC.
        unique_key: When set, an already-active job with the same key is
            returned instead of inserting a duplicate.
        tags: Optional tags; ``None`` is stored as an empty array.
        timeout_seconds: Optional per-job timeout.
        depends_on: Ids of parent jobs; values are coerced to ``int``.
        metadata: Optional metadata; ``None`` is stored as an empty object.

    Returns:
        The id of the inserted (or existing, on unique conflict) job.

    Raises:
        RuntimeError: If the database returns no row for the insert.
    """
    # Both statements take the same eleven positional parameters; for the
    # plain insert ``unique_key`` is simply None.
    params = (
        task,
        _args_dict(args),
        queue,
        priority,
        max_attempts,
        _coerce_scheduled_at(scheduled_at),
        unique_key,
        list(tags or []),
        timeout_seconds,
        [int(x) for x in (depends_on or [])],
        _args_dict(metadata),
    )

    if unique_key is None:
        row = await conn.fetchrow(_INSERT_PLAIN_PG, *params)
        if row is None:
            # Explicit check instead of ``assert`` so it survives ``python -O``.
            raise RuntimeError("INSERT ... RETURNING id produced no row")
        return cast(int, row["id"])

    # The insert-or-find statement can come back empty when a concurrent
    # transaction commits the conflicting row after this statement's snapshot
    # was taken: the INSERT is skipped, yet the lookup arm cannot see the new
    # row. Re-running the statement takes a fresh snapshot (under READ
    # COMMITTED), so a small bounded retry resolves the race.
    unique_attempts = 3
    for _ in range(unique_attempts):
        row = await conn.fetchrow(_INSERT_UNIQUE_PG, *params)
        if row is not None:
            return cast(int, row["id"])
    raise RuntimeError(
        f"could not insert or locate job with unique_key {unique_key!r} "
        f"after {unique_attempts} attempts"
    )


[docs] @dataclass(frozen=True, slots=True) class JobInsert: """Bulk-enqueue payload.""" task: str args: dict[str, Any] | None = None queue: str = "default" priority: int = 0 max_attempts: int = 20 scheduled_at: datetime | None = None unique_key: str | None = None tags: list[str] | None = None timeout_seconds: int | None = None depends_on: list[int] | None = None metadata: dict[str, Any] | None = None
def _affected_rows(status: Any) -> int:
    """Extract the row count from an asyncpg command tag such as ``"UPDATE 3"``.

    Returns 0 when ``status`` is not a string or does not end in an integer.
    """
    try:
        return int(status.rsplit(" ", 1)[-1])
    except (ValueError, AttributeError):
        return 0


async def enqueue_many_async(conn: asyncpg.Connection, jobs: list[JobInsert]) -> int:
    """Bulk-insert jobs via ``executemany``.

    Returns the number of submitted rows. The conflict policy matches
    :func:`enqueue_async` — duplicates against active ``unique_key`` rows are
    silently skipped (so the count is "submitted", not "unique inserted").

    Note: ids are not returned. If you need the id back, use ``enqueue_async``.
    """
    if not jobs:
        return 0
    rows = [
        (
            j.task,
            _args_dict(j.args),
            j.queue,
            j.priority,
            j.max_attempts,
            _coerce_scheduled_at(j.scheduled_at),
            j.unique_key,
            list(j.tags or []),
            j.timeout_seconds,
            [int(x) for x in (j.depends_on or [])],
            _args_dict(j.metadata),
        )
        for j in jobs
    ]
    await conn.executemany(_BULK_INSERT_PG, rows)
    return len(rows)


async def fetch_available_async(
    conn: asyncpg.Connection,
    queues: list[str],
    limit: int,
    *,
    task_limits: dict[str, tuple[int | None, int | None]] | None = None,
    worker_id: str = "",
) -> list[Job]:
    """Pick up to ``limit`` jobs from the listed queues and mark them ``executing``.

    ``task_limits`` is ``{task: (rate_per_minute, max_concurrency)}`` — pass
    ``None`` for either field to skip that gate. Tasks not in the mapping are
    unrestricted.

    ``worker_id`` is stamped into ``metadata.worker_id`` of every claimed job
    so the dashboard can show "what's running on which worker". Pass ``""``
    to skip the tag (e.g. test helpers).

    Returns the claimed jobs in pick order (priority, scheduled_at, id). An
    empty ``queues`` list or a non-positive ``limit`` yields ``[]`` without
    touching the database.
    """
    # Nothing can match: skip the round-trip (and avoid sending a negative
    # LIMIT, which Postgres rejects).
    if not queues or limit <= 0:
        return []

    # Two-step claim inside a transaction so the FOR UPDATE locks survive
    # until we run the UPDATE. We split SELECT + UPDATE because asyncpg on
    # PG 15/16-alpine can silently drop rows from multi-row RETURNING in a
    # modifying CTE.
    async with conn.transaction():
        if not task_limits:
            picked = await conn.fetch(_PICK_AVAILABLE_FAST_PG, queues, limit)
        else:
            # The throttled query takes the limits as three parallel arrays.
            tasks_a: list[str] = []
            rates: list[int | None] = []
            concs: list[int | None] = []
            for task, (rate, conc) in task_limits.items():
                tasks_a.append(task)
                rates.append(rate)
                concs.append(conc)
            picked = await conn.fetch(
                _PICK_AVAILABLE_THROTTLED_PG, queues, limit, tasks_a, rates, concs
            )
        if not picked:
            return []
        ids = [int(r["id"]) for r in picked]
        claimed = await conn.fetch(_CLAIM_BY_IDS_PG, ids, worker_id or "")
        # UPDATE ... RETURNING carries no ordering guarantee; restore the
        # order the pick query produced so callers see highest priority first.
        rank = {job_id: pos for pos, job_id in enumerate(ids)}
        ordered = sorted(claimed, key=lambda r: rank.get(int(r["id"]), len(rank)))
        return [_record_to_job(r) for r in ordered]


async def mark_completed_async(
    conn: asyncpg.Connection, job_id: int, *, result: Any | None = None
) -> None:
    """Mark ``job_id`` completed; optionally store ``result`` in the row."""
    await conn.execute(_MARK_COMPLETED_PG, job_id, result)


# Default number of error entries kept per job by the mark_* helpers below.
DEFAULT_ERROR_CAP = 20


async def mark_retryable_async(
    conn: asyncpg.Connection,
    job_id: int,
    scheduled_at: datetime,
    error: dict[str, Any],
    *,
    error_cap: int = DEFAULT_ERROR_CAP,
) -> None:
    """Move ``job_id`` to ``retryable`` at ``scheduled_at`` and record ``error``.

    Only the most recent ``error_cap`` error entries are kept.
    """
    await conn.execute(_MARK_RETRYABLE_PG, job_id, scheduled_at, [error], error_cap)


async def mark_discarded_async(
    conn: asyncpg.Connection,
    job_id: int,
    error: dict[str, Any],
    *,
    error_cap: int = DEFAULT_ERROR_CAP,
) -> None:
    """Move ``job_id`` to ``discarded`` and record ``error``.

    Only the most recent ``error_cap`` error entries are kept.
    """
    await conn.execute(_MARK_DISCARDED_PG, job_id, [error], error_cap)


async def reset_to_available_async(
    conn: asyncpg.Connection, job_id: int, scheduled_at: datetime
) -> None:
    """Put ``job_id`` back to ``available`` at ``scheduled_at``."""
    await conn.execute(_RESET_TO_AVAILABLE_PG, job_id, scheduled_at)


async def snooze_async(conn: asyncpg.Connection, job_id: int, scheduled_at: datetime) -> None:
    """Reschedule ``job_id`` to ``scheduled_at`` and give back one attempt."""
    await conn.execute(_SNOOZE_PG, job_id, scheduled_at)


async def promote_retryable_async(conn: asyncpg.Connection) -> int:
    """Promote due ``retryable`` jobs to ``available``; return how many moved."""
    # asyncpg returns "UPDATE N"
    return _affected_rows(await conn.execute(_PROMOTE_RETRYABLE_PG))


async def retry_job_async(conn: asyncpg.Connection, job_id: int) -> None:
    """Make ``job_id`` runnable again right now.

    Raises:
        JobNotFoundError: If the job does not exist or is not in a state that
            can be retried.
    """
    row = await conn.fetchrow(_RETRY_BY_ID_PG, job_id)
    if row is None:
        raise JobNotFoundError(f"job {job_id} not found or not in a retryable state")


async def cancel_job_async(conn: asyncpg.Connection, job_id: int) -> None:
    """Mark a job cancelled and request in-flight cancellation if executing.

    The cancel flag is always set. A job that is ``available`` or
    ``retryable`` is additionally moved to ``cancelled`` right away, so it
    never runs; an ``executing`` job keeps its state and only carries the
    flag. A NOTIFY on ``roost_cancel_requested`` is expected to let running
    workers react (it is not issued here — presumably a database trigger;
    confirm against the migrations).

    Raises:
        JobNotFoundError: If the job does not exist or is already terminal.
    """
    async with conn.transaction():
        row = await conn.fetchrow(_REQUEST_CANCEL_PG, job_id)
        if row is None:
            raise JobNotFoundError(f"job {job_id} not found or not cancellable")
        # If not currently executing, also flip the row to cancelled now.
        if row["state"] != "executing":
            await conn.execute(_CANCEL_BY_ID_PG, job_id)


async def request_cancel_async(conn: asyncpg.Connection, job_id: int) -> str | None:
    """Set ``cancel_requested = true`` and return the current state.

    Returns ``None`` if the job does not exist or is already terminal.
    """
    row = await conn.fetchrow(_REQUEST_CANCEL_PG, job_id)
    if row is None:
        return None
    return cast(str, row["state"])


async def finalize_cancel_async(conn: asyncpg.Connection, job_id: int) -> None:
    """Move an in-flight job into the ``cancelled`` terminal state."""
    await conn.execute(_CANCEL_BY_ID_PG, job_id)


async def status_counts_async(
    conn: asyncpg.Connection,
) -> list[tuple[str, str, int]]:
    """Return ``(queue, state, count)`` tuples for every queue/state pair."""
    rows = await conn.fetch(_STATUS_COUNTS_PG)
    return [(r["queue"], r["state"], int(r["n"])) for r in rows]


async def pause_queue_async(conn: asyncpg.Connection, name: str) -> None:
    """Pause the queue ``name``."""
    await conn.execute(_QUEUE_PAUSE_PG, name)


async def resume_queue_async(conn: asyncpg.Connection, name: str) -> None:
    """Resume the queue ``name``."""
    await conn.execute(_QUEUE_RESUME_PG, name)


async def list_queues_async(
    conn: asyncpg.Connection,
) -> list[tuple[str, datetime | None]]:
    """Return ``(name, paused_at)`` for every known queue."""
    rows = await conn.fetch(_QUEUE_LIST_PG)
    return [(r["name"], r["paused_at"]) for r in rows]


async def list_workers_async(conn: asyncpg.Connection) -> list[dict[str, Any]]:
    """Return every registered worker as a dict, most recently seen first."""
    rows = await conn.fetch(_LIST_WORKERS_PG)
    return [dict(r) for r in rows]


async def requeue_discarded_async(conn: asyncpg.Connection) -> int:
    """Make every ``discarded`` job available again; return how many moved."""
    return _affected_rows(await conn.execute(_REQUEUE_DISCARDED_PG))


async def cron_try_lock_async(conn: asyncpg.Connection, key: int) -> bool:
    """Try to take the advisory lock ``key``; return whether it was acquired."""
    val = await conn.fetchval("SELECT pg_try_advisory_lock($1)", key)
    return bool(val)


async def explain_job_async(conn: asyncpg.Connection, job_id: int) -> dict[str, Any]:
    """Diagnose why a job hasn't started yet.

    Returns ``{"found": False}`` when the job does not exist. Otherwise the
    dict contains:

    ``found`` — always ``True``.
    ``id`` / ``task`` / ``queue`` / ``state`` / ``scheduled_at`` — copied
    from the row.
    ``state_terminal`` — true when the state is ``completed``, ``discarded``
    or ``cancelled``.
    ``scheduled_in_future`` — true if ``scheduled_at > now()``.
    ``queue_paused`` — true if the queue has a row in ``roost.queues`` with
    ``paused_at IS NOT NULL``.
    ``waiting_on_parents`` — list of parent ids that haven't completed.
    ``executing_count`` — jobs of the same task currently ``executing``.
    ``attempted_last_minute`` — jobs of the same task attempted within the
    last minute.

    Note: this helper doesn't know the registry's per-task limits, so it
    returns the raw counts; callers compare them against the limits to decide
    whether the job is rate- or concurrency-limited.
    """
    row = await conn.fetchrow(
        """
        SELECT j.id,
               j.task,
               j.queue,
               j.state,
               j.scheduled_at,
               j.depends_on,
               (j.scheduled_at > now()) AS scheduled_in_future,
               EXISTS(
                   SELECT 1 FROM roost.queues q
                    WHERE q.name = j.queue AND q.paused_at IS NOT NULL
               ) AS queue_paused,
               (
                   SELECT COALESCE(array_agg(p.id ORDER BY p.id), ARRAY[]::bigint[])
                     FROM roost.jobs p
                    WHERE p.id = ANY(j.depends_on)
                      AND p.state <> 'completed'
               ) AS waiting_on_parents,
               (
                   SELECT COUNT(*) FROM roost.jobs jt
                    WHERE jt.task = j.task AND jt.state = 'executing'
               ) AS executing_count,
               (
                   SELECT COUNT(*) FROM roost.jobs jt
                    WHERE jt.task = j.task
                      AND jt.attempted_at >= now() - interval '1 minute'
               ) AS attempted_last_minute
          FROM roost.jobs j
         WHERE j.id = $1
        """,
        job_id,
    )
    if row is None:
        return {"found": False}
    waiting = list(row["waiting_on_parents"] or [])
    return {
        "found": True,
        "id": int(row["id"]),
        "task": row["task"],
        "queue": row["queue"],
        "state": row["state"],
        "state_terminal": row["state"] in {"completed", "discarded", "cancelled"},
        "scheduled_at": row["scheduled_at"],
        "scheduled_in_future": bool(row["scheduled_in_future"]),
        "queue_paused": bool(row["queue_paused"]),
        "waiting_on_parents": [int(p) for p in waiting],
        "executing_count": int(row["executing_count"]),
        "attempted_last_minute": int(row["attempted_last_minute"]),
    }


async def list_cron_runs_async(conn: asyncpg.Connection) -> list[dict[str, Any]]:
    """Return the cron entries we've ever fired, with their last-run timestamps.

    Each dict carries ``name``, ``last_run_at``, ``run_count`` (jobs in
    ``roost.jobs`` whose ``unique_key`` starts with ``cron:<name>:``) and
    ``last_state`` (state of the newest such job) — useful in the dashboard to
    confirm a cron is actually firing.
    """
    rows = await conn.fetch(
        """
        SELECT cr.name,
               cr.last_run_at,
               (
                   SELECT COUNT(*) FROM roost.jobs j
                    WHERE j.unique_key LIKE 'cron:' || cr.name || ':%'
               ) AS run_count,
               (
                   SELECT j.state FROM roost.jobs j
                    WHERE j.unique_key LIKE 'cron:' || cr.name || ':%'
                    ORDER BY j.id DESC
                    LIMIT 1
               ) AS last_state
          FROM roost.cron_runs cr
         ORDER BY cr.name
        """
    )
    return [dict(r) for r in rows]


async def list_jobs_for_worker_async(
    conn: asyncpg.Connection, worker_id: str, limit: int = 20
) -> list[dict[str, Any]]:
    """Recent jobs that are/were running on the named worker.

    Matches on ``metadata->>'worker_id'``, which is stamped when a job is
    claimed with a non-empty worker id. Jobs claimed without one are not
    returned. Newest job ids come first, up to ``limit`` rows.
    """
    rows = await conn.fetch(
        """
        SELECT id, queue, task, state, attempt, max_attempts,
               attempted_at, completed_at, errors
          FROM roost.jobs
         WHERE metadata->>'worker_id' = $1
         ORDER BY id DESC
         LIMIT $2
        """,
        worker_id,
        limit,
    )
    return [dict(r) for r in rows]


async def throughput_buckets_async(
    conn: asyncpg.Connection, *, minutes: int = 60, bucket_seconds: int = 60
) -> list[dict[str, Any]]:
    """Counts of completed and discarded jobs per ``bucket_seconds``-wide bucket.

    Used for the overview sparkline. Returns one row (``bucket_start``,
    ``completed``, ``discarded``) per bucket start between
    ``now() - minutes minutes`` and ``now()`` inclusive, even if zero jobs
    landed in it. Cancelled jobs are not counted.
    """
    rows = await conn.fetch(
        """
        WITH spans AS (
            SELECT generate_series(
                date_trunc('second', now()) - ($1::int * interval '1 minute'),
                date_trunc('second', now()),
                ($2::int * interval '1 second')
            ) AS bucket_start
        )
        SELECT s.bucket_start,
               (SELECT COUNT(*) FROM roost.jobs j
                 WHERE j.completed_at >= s.bucket_start
                   AND j.completed_at < s.bucket_start + ($2::int * interval '1 second')
                   AND j.state = 'completed') AS completed,
               (SELECT COUNT(*) FROM roost.jobs j
                 WHERE j.discarded_at >= s.bucket_start
                   AND j.discarded_at < s.bucket_start + ($2::int * interval '1 second')) AS discarded
          FROM spans s
         ORDER BY s.bucket_start
        """,
        minutes,
        bucket_seconds,
    )
    return [dict(r) for r in rows]


async def cron_unlock_async(conn: asyncpg.Connection, key: int) -> None:
    """Release the advisory lock ``key``."""
    await conn.execute("SELECT pg_advisory_unlock($1)", key)


async def clear_old_results_async(conn: asyncpg.Connection, *, older_than_seconds: float) -> int:
    """Null out ``result`` for completed jobs older than the cutoff.

    Useful as a privacy/space hygiene measure when handlers store sensitive
    or large return values. The row stays — only ``result`` is cleared.
    A negative ``older_than_seconds`` is treated as 0. Returns the number of
    rows updated.
    """
    interval = timedelta(seconds=max(older_than_seconds, 0.0))
    res = await conn.execute(
        """
        UPDATE roost.jobs
           SET result = NULL
         WHERE state = 'completed'
           AND result IS NOT NULL
           AND completed_at < now() - ($1::interval)
        """,
        interval,
    )
    return _affected_rows(res)


async def archive_terminal_async(conn: asyncpg.Connection, *, older_than_seconds: float) -> int:
    """Move ``completed/discarded/cancelled`` jobs older than the cutoff to the archive table.

    A negative ``older_than_seconds`` is treated as 0. Returns the count of
    moved rows.
    """
    interval = timedelta(seconds=max(older_than_seconds, 0.0))
    rows = await conn.fetch(_ARCHIVE_TERMINAL_PG, interval)
    return len(rows)


async def cancel_blocked_dependents_async(
    conn: asyncpg.Connection,
) -> list[int]:
    """Cancel jobs whose parents ended in ``discarded`` or ``cancelled``.

    Returns the list of cancelled ids.
    """
    error_payload = [
        {
            "at": _utcnow().isoformat(),
            "error": "BlockedDependency: a parent job ended in a non-completed state",
            "trace": "",
        }
    ]
    rows = await conn.fetch(_CANCEL_BLOCKED_DEPENDENTS_PG, error_payload)
    return [int(r["id"]) for r in rows]


async def reap_orphans_async(
    conn: asyncpg.Connection, *, stale_after_seconds: float
) -> list[tuple[int, str]]:
    """Recover jobs stuck in ``executing`` past the staleness window.

    Returns ``[(job_id, new_state), …]``. Jobs whose attempt count is at
    ``max_attempts`` go to ``discarded``; the rest go to ``retryable`` and
    are scheduled to run again immediately. A negative
    ``stale_after_seconds`` is treated as 0.
    """
    interval = timedelta(seconds=max(stale_after_seconds, 0.0))
    error_payload = [
        {
            "at": _utcnow().isoformat(),
            "error": "WorkerCrash: job left in executing state past staleness window",
            "trace": "",
        }
    ]
    rows = await conn.fetch(_REAP_ORPHANS_PG, interval, error_payload)
    return [(int(r["id"]), str(r["state"])) for r in rows]


async def heartbeat_async(
    conn: asyncpg.Connection,
    *,
    worker_id: str,
    hostname: str,
    pid: int,
    queues: list[str],
    metadata: dict[str, Any] | None = None,
) -> None:
    """Register the worker or refresh its ``last_seen_at``, queues and metadata."""
    await conn.execute(
        _HEARTBEAT_UPSERT_PG,
        worker_id,
        hostname,
        pid,
        queues,
        metadata or {},
    )


async def deregister_worker_async(conn: asyncpg.Connection, worker_id: str) -> None:
    """Delete the worker row for ``worker_id``."""
    await conn.execute(_WORKER_DEREGISTER_PG, worker_id)


async def gc_workers_async(conn: asyncpg.Connection, *, stale_after_seconds: float) -> int:
    """Delete workers not seen within the window; return how many were removed.

    A negative ``stale_after_seconds`` is treated as 0.
    """
    interval = timedelta(seconds=max(stale_after_seconds, 0.0))
    return _affected_rows(await conn.execute(_WORKER_GC_PG, interval))


async def cron_should_run_async(conn: asyncpg.Connection, name: str, due_at: datetime) -> bool:
    """Atomically claim the next due slot for a cron entry.

    Returns True iff the caller should enqueue a job for ``due_at``. Uses an
    UPSERT against ``roost.cron_runs`` keyed on ``name`` and bumps
    ``last_run_at`` to ``due_at`` only when the existing value is older.
    """
    row = await conn.fetchrow(
        """
        INSERT INTO roost.cron_runs (name, last_run_at)
        VALUES ($1, $2)
        ON CONFLICT (name) DO UPDATE
           SET last_run_at = EXCLUDED.last_run_at
         WHERE roost.cron_runs.last_run_at < EXCLUDED.last_run_at
        RETURNING last_run_at
        """,
        name,
        due_at,
    )
    # No row comes back when the stored last_run_at is already >= due_at.
    return row is not None


# ---------------------------------------------------------------------------
# Sync (psycopg)
# ---------------------------------------------------------------------------


def apply_schema_sync(conn: psycopg.Connection[Any]) -> None:
    """Bring the schema fully up to date by running pending migrations."""
    from roost._core.migrations import apply_pending_sync

    apply_pending_sync(conn)


def enqueue_sync(
    conn: psycopg.Connection[Any],
    *,
    task: str,
    args: dict[str, Any] | None = None,
    queue: str = "default",
    priority: int = 0,
    max_attempts: int = 20,
    scheduled_at: datetime | None = None,
    unique_key: str | None = None,
    tags: list[str] | None = None,
    timeout_seconds: int | None = None,
    depends_on: list[int] | None = None,
    metadata: dict[str, Any] | None = None,
) -> int:
    """Insert a job using the caller's psycopg connection.

    Synchronous counterpart of :func:`enqueue_async`; JSON values are sent as
    text and cast server-side.

    Returns:
        The id of the inserted (or existing, on unique conflict) job.

    Raises:
        RuntimeError: If the database returns no row for the insert.
    """
    params = (
        task,
        _args_json(args),
        queue,
        priority,
        max_attempts,
        _coerce_scheduled_at(scheduled_at),
        unique_key,
        list(tags or []),
        timeout_seconds,
        [int(x) for x in (depends_on or [])],
        _args_json(metadata),
    )
    with conn.cursor() as cur:
        if unique_key is None:
            cur.execute(_INSERT_PLAIN_PSY, params)
            row = cur.fetchone()
            if row is None:
                # Explicit check instead of ``assert`` so it survives ``python -O``.
                raise RuntimeError("INSERT ... RETURNING id produced no row")
            return cast(int, row[0])

        # The statement can come back empty when a concurrent transaction
        # commits the conflicting row after this statement's snapshot was
        # taken; re-running it takes a fresh snapshot under READ COMMITTED.
        unique_attempts = 3
        for _ in range(unique_attempts):
            # %s placeholders are positional, so unique_key is passed a
            # second time for the lookup arm of the statement.
            cur.execute(_INSERT_UNIQUE_PSY, (*params, unique_key))
            row = cur.fetchone()
            if row is not None:
                return cast(int, row[0])
    raise RuntimeError(
        f"could not insert or locate job with unique_key {unique_key!r} "
        f"after {unique_attempts} attempts"
    )


def status_counts_sync(
    conn: psycopg.Connection[Any],
) -> list[tuple[str, str, int]]:
    """Return ``(queue, state, count)`` tuples for every queue/state pair."""
    with conn.cursor() as cur:
        cur.execute(_STATUS_COUNTS_PG)
        return [(r[0], r[1], int(r[2])) for r in cur.fetchall()]


def retry_job_sync(conn: psycopg.Connection[Any], job_id: int) -> None:
    """Make ``job_id`` runnable again right now.

    Raises:
        JobNotFoundError: If the job does not exist or is not in a state that
            can be retried.
    """
    with conn.cursor() as cur:
        # The shared SQL uses asyncpg's $1; swap in psycopg's placeholder.
        cur.execute(_RETRY_BY_ID_PG.replace("$1", "%s"), (job_id,))
        if cur.fetchone() is None:
            raise JobNotFoundError(f"job {job_id} not found or not in a retryable state")


def cancel_job_sync(conn: psycopg.Connection[Any], job_id: int) -> None:
    """Set the cancel flag on ``job_id`` and cancel it outright unless executing.

    Raises:
        JobNotFoundError: If the job does not exist or is already terminal.
    """
    with conn.cursor() as cur:
        cur.execute(_REQUEST_CANCEL_PG.replace("$1", "%s"), (job_id,))
        row = cur.fetchone()
        if row is None:
            raise JobNotFoundError(f"job {job_id} not found or not cancellable")
        # row is (id, state); jobs not currently executing are cancelled now.
        if row[1] != "executing":
            cur.execute(_CANCEL_BY_ID_PG.replace("$1", "%s"), (job_id,))


def pause_queue_sync(conn: psycopg.Connection[Any], name: str) -> None:
    """Pause the queue ``name``."""
    with conn.cursor() as cur:
        cur.execute(_QUEUE_PAUSE_PG.replace("$1", "%s"), (name,))


def resume_queue_sync(conn: psycopg.Connection[Any], name: str) -> None:
    """Resume the queue ``name``."""
    with conn.cursor() as cur:
        cur.execute(_QUEUE_RESUME_PG.replace("$1", "%s"), (name,))


def list_queues_sync(
    conn: psycopg.Connection[Any],
) -> list[tuple[str, datetime | None]]:
    """Return ``(name, paused_at)`` for every known queue."""
    with conn.cursor() as cur:
        cur.execute(_QUEUE_LIST_PG)
        return [(r[0], r[1]) for r in cur.fetchall()]


def list_workers_sync(conn: psycopg.Connection[Any]) -> list[dict[str, Any]]:
    """Return every registered worker as a dict, most recently seen first."""
    with conn.cursor() as cur:
        cur.execute(_LIST_WORKERS_PG)
        cols = [d[0] for d in cur.description] if cur.description else []
        return [dict(zip(cols, r, strict=False)) for r in cur.fetchall()]


def requeue_discarded_sync(conn: psycopg.Connection[Any]) -> int:
    """Make every ``discarded`` job available again; return how many moved."""
    with conn.cursor() as cur:
        cur.execute(_REQUEUE_DISCARDED_PG)
        return cur.rowcount or 0