API reference

Public API

Roost — Postgres-backed background job queue for Python.

Public API:

from roost import AsyncRoost, Roost, job, cron

The async path uses asyncpg; the sync path uses psycopg. Both share one schema and a single SQL surface defined in roost._core.repo.

class roost.AsyncRoost(dsn, *, registry=None)

Bases: object

Entry point for async apps.

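Lazily opens an internal asyncpg.Pool on first use. Pass conn= to enqueue() to run the insert on the caller’s connection, inside the caller’s transaction, so the job is committed or rolled back together with the caller’s own writes. This is the core primitive of the API.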

Parameters:
  • dsn (str)

  • registry (HandlerRegistry | None)

async close()
Return type:

None

async setup_schema(conn=None)

Apply the migration SQL. Idempotent.

Return type:

None

Parameters:

conn (Connection | None)

async enqueue(task, *, args=None, queue='default', priority=0, max_attempts=20, scheduled_at=None, unique_key=None, tags=None, timeout_seconds=None, depends_on=None, metadata=None, conn=None)

Insert a job. Pass conn= to enqueue inside the caller’s transaction.

task may be a registered task name or a function decorated with @job(...). args accepts a dict or a Pydantic model; models are dumped via model_dump() so types like UUID and datetime round-trip cleanly. metadata is an out-of-band JSONB column for trace IDs, request IDs, and tenant IDs that are not handler input.

Return type:

int

Parameters:
  • task (str | Callable[[...], Any])

  • args (dict[str, Any] | BaseModel | None)

  • queue (str)

  • priority (int)

  • max_attempts (int)

  • scheduled_at (datetime | None)

  • unique_key (str | None)

  • tags (list[str] | None)

  • timeout_seconds (int | None)

  • depends_on (list[int] | None)

  • metadata (dict[str, Any] | None)

  • conn (Connection | None)
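The point of conn= is that the job row and the caller's own writes succeed or fail together. The following is a minimal sketch of that idea only, using sqlite3 from the standard library as a stand-in for Postgres. The orders and jobs tables and the place_order helper are hypothetical and are not part of Roost.

```python
import json
import sqlite3


def place_order(conn: sqlite3.Connection, order_id: int, fail: bool = False) -> None:
    """Write a business row and a job row in one transaction (hypothetical helper)."""
    with conn:  # commits on success, rolls back if the body raises
        conn.execute("INSERT INTO orders (id) VALUES (?)", (order_id,))
        conn.execute(
            "INSERT INTO jobs (task, args) VALUES (?, ?)",
            ("send_receipt", json.dumps({"order_id": order_id})),
        )
        if fail:
            raise RuntimeError("payment declined")


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (id INTEGER PRIMARY KEY)")
conn.execute("CREATE TABLE jobs (id INTEGER PRIMARY KEY, task TEXT, args TEXT)")
conn.commit()

place_order(conn, 1)  # both rows are committed
try:
    place_order(conn, 2, fail=True)  # both rows are rolled back
except RuntimeError:
    pass

job_count = conn.execute("SELECT count(*) FROM jobs").fetchone()[0]
order_count = conn.execute("SELECT count(*) FROM orders").fetchone()[0]
```

Only the first order leaves a job behind; the failed order leaves neither an order row nor an orphaned job.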

async enqueue_many(jobs, *, conn=None)

Bulk-insert in a single round-trip. Returns the submitted count.

Return type:

int

Parameters:
  • jobs (list[JobInsert])

  • conn (Connection | None)

async status()
Return type:

list[tuple[str, str, int]]

async retry(job_id)
Return type:

None

Parameters:

job_id (int)

async cancel(job_id)
Return type:

None

Parameters:

job_id (int)

async pause_queue(name)
Return type:

None

Parameters:

name (str)

async resume_queue(name)
Return type:

None

Parameters:

name (str)

async list_queues()
Return type:

list[tuple[str, datetime | None]]

async list_workers()
Return type:

list[dict[str, Any]]

async requeue_discarded()
Return type:

int

async wait_for(job_id, *, timeout=30.0, poll_interval=1.0, raise_on_failure=True)

Block until job_id reaches a terminal state.

Returns a roost.JobOutcome. By default raises roost.JobFailed when the job ended in discarded or cancelled (set raise_on_failure=False to suppress).

Return type:

Any

Parameters:
  • job_id (int)

  • timeout (float | None)

  • poll_interval (float)

  • raise_on_failure (bool)
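The timeout and poll_interval parameters suggest a polling loop. A rough sketch of such a loop follows, assuming the terminal states are completed, discarded, and cancelled. The poll_until_terminal function and the fetch_state callable are hypothetical stand-ins and do not reflect how Roost implements wait_for().

```python
from __future__ import annotations

import asyncio
import time
from typing import Awaitable, Callable

TERMINAL_STATES = {"completed", "discarded", "cancelled"}  # assumed terminal set


async def poll_until_terminal(
    fetch_state: Callable[[], Awaitable[str]],
    *,
    timeout: float | None = 30.0,
    poll_interval: float = 1.0,
) -> str:
    """Poll fetch_state until it reports a terminal state or the timeout passes."""
    deadline = None if timeout is None else time.monotonic() + timeout
    while True:
        state = await fetch_state()
        if state in TERMINAL_STATES:
            return state
        if deadline is not None and time.monotonic() >= deadline:
            raise TimeoutError(f"job still {state!r} after {timeout}s")
        await asyncio.sleep(poll_interval)


async def demo() -> str:
    states = iter(["available", "executing", "completed"])

    async def fetch_state() -> str:  # hypothetical stand-in for a database lookup
        return next(states)

    return await poll_until_terminal(fetch_state, timeout=5.0, poll_interval=0.01)


final_state = asyncio.run(demo())
```

Passing timeout=None in this sketch waits indefinitely, which matches the float | None type of the documented parameter.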

worker(*, queues=('default',), concurrency=4, prefetch=None, poll_interval=1.0, retry_strategy=None, run_cron=True, heartbeat_interval=15.0, orphan_reaper_interval=30.0, orphan_stale_after=300.0, shutdown_timeout=30.0, listen_reconnect_delay=1.0, error_cap=20, archive_after_seconds=None, archive_interval=60.0, result_ttl_seconds=None, startup_max_retries=30, startup_retry_delay=1.0, hooks=None)

Construct a Worker bound to this Roost’s DSN.

Return type:

Worker

Parameters:
  • queues (Iterable[str])

  • concurrency (int)

  • prefetch (int | None)

  • poll_interval (float)

  • retry_strategy (BackoffStrategy | None)

  • run_cron (bool)

  • heartbeat_interval (float)

  • orphan_reaper_interval (float)

  • orphan_stale_after (float)

  • shutdown_timeout (float)

  • listen_reconnect_delay (float)

  • error_cap (int)

  • archive_after_seconds (float | None)

  • archive_interval (float)

  • result_ttl_seconds (float | None)

  • startup_max_retries (int)

  • startup_retry_delay (float)

  • hooks (Any | None)

class roost.BackoffStrategy(*args, **kwargs)

Bases: Protocol

Maps an attempt number (1-indexed) to seconds-until-next-attempt.

class roost.CronEntry(name, expression, task, args=<factory>, queue='default', priority=0, max_attempts=20, timezone_name=None)

Bases: object

Parameters:
  • name (str)

  • expression (str)

  • task (str)

  • args (dict[str, Any])

  • queue (str)

  • priority (int)

  • max_attempts (int)

  • timezone_name (str | None)

name: str
expression: str
task: str
args: dict[str, Any]
queue: str = 'default'
priority: int = 0
max_attempts: int = 20
timezone_name: str | None = None

next_after(now)
Return type:

datetime

Parameters:

now (datetime)

previous_or_at(now)
Return type:

datetime

Parameters:

now (datetime)

exception roost.DuplicateUniqueJobError

Bases: RoostError

Raised when an enqueue conflicts with an active job sharing the same unique_key.

code: ClassVar[str] = 'roost.duplicate-unique-job'
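One common way to enforce "unique among active jobs" in a relational database is a partial unique index. The sketch below shows the idea with sqlite3 as a stand-in for Postgres. The table layout, the index name, and the choice of available, executing, and retryable as the "active" states are all assumptions, and this is not necessarily how Roost enforces the constraint.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE jobs (id INTEGER PRIMARY KEY, unique_key TEXT, state TEXT)")
# Uniqueness applies only while a job is in an assumed "active" state.
conn.execute(
    "CREATE UNIQUE INDEX jobs_active_unique_key ON jobs (unique_key) "
    "WHERE unique_key IS NOT NULL "
    "AND state IN ('available', 'executing', 'retryable')"
)


def insert_job(unique_key: str) -> int:
    """Insert an 'available' job and return its id (hypothetical helper)."""
    cur = conn.execute(
        "INSERT INTO jobs (unique_key, state) VALUES (?, 'available')", (unique_key,)
    )
    return cur.lastrowid


first_id = insert_job("report:2024-06")
try:
    insert_job("report:2024-06")  # same key while the first job is still active
    duplicate_rejected = False
except sqlite3.IntegrityError:
    duplicate_rejected = True

# Once the first job leaves the active states, the key can be reused.
conn.execute("UPDATE jobs SET state = 'completed' WHERE id = ?", (first_id,))
second_id = insert_job("report:2024-06")
```

In application code the equivalent situation would surface as DuplicateUniqueJobError rather than a raw database error.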
class roost.HandlerRegistry

Bases: object

register(name, func, *, args_model=None, defaults=None)
Return type:

None

Parameters:
  • name (str)

  • func (Callable[[...], Any])

  • args_model (type[BaseModel] | None)

  • defaults (TaskDefaults | None)

get(name)
Return type:

HandlerSpec | None

Parameters:

name (str)

specs()

Return every registered HandlerSpec, sorted by name.

Useful for building admin UIs or generating manifests:

for spec in roost_handlers.specs():
    print(spec.name, spec.defaults.queue, spec.args_model)

Return type:

list[HandlerSpec]

names()
Return type:

list[str]

clear()
Return type:

None

class roost.Hooks(before_job=None, after_job=None)

Bases: object

Bundle of optional hooks invoked around every handler call.

Both hooks are async. The signature is (job, *, ctx) for before_job and (job, *, result, error, ctx) for after_job. Either can be omitted; only set the ones you need.

Parameters:
  • before_job (Callable[[...], Awaitable[None]] | None)

  • after_job (Callable[[...], Awaitable[None]] | None)

before_job: Callable[[...], Awaitable[None]] | None = None
after_job: Callable[[...], Awaitable[None]] | None = None
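A sketch of hook functions with the two documented signatures follows. The run_with_hooks driver is hypothetical and only illustrates the call order around one handler invocation. A plain dict stands in for roost.Job so the example runs without a database.

```python
from __future__ import annotations

import asyncio
from typing import Any

events: list[str] = []


async def before_job(job: Any, *, ctx: Any) -> None:
    events.append(f"start {job['task']}")


async def after_job(job: Any, *, result: Any, error: BaseException | None, ctx: Any) -> None:
    outcome = "failed" if error is not None else "ok"
    events.append(f"end {job['task']} {outcome}")


async def run_with_hooks(job: Any, handler: Any, ctx: Any) -> Any:
    """Hypothetical driver: call the hooks around one handler invocation."""
    await before_job(job, ctx=ctx)
    result, error = None, None
    try:
        result = await handler(**job["args"])
        return result
    except Exception as exc:
        error = exc
        raise
    finally:
        # after_job sees either the result or the error, never both.
        await after_job(job, result=result, error=error, ctx=ctx)


async def add(a: int, b: int) -> int:
    return a + b


job = {"task": "add", "args": {"a": 2, "b": 3}}  # a dict stands in for roost.Job
total = asyncio.run(run_with_hooks(job, add, ctx={}))
```

With the real classes, the two functions would presumably be passed as Hooks(before_job=before_job, after_job=after_job) to the worker's hooks= parameter.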
class roost.Job(**data)

Bases: BaseModel

A row from roost.jobs.

Public read-only view; created internally by the repo. Users construct their own typed argument models and pass them through args.

Parameters:
  • id (int)

  • queue (str)

  • task (str)

  • args (dict[str, Any])

  • state (str)

  • priority (int)

  • attempt (int)

  • max_attempts (int)

  • scheduled_at (datetime)

  • attempted_at (datetime | None)

  • completed_at (datetime | None)

  • cancelled_at (datetime | None)

  • discarded_at (datetime | None)

  • errors (list[dict[str, Any]])

  • unique_key (str | None)

  • inserted_at (datetime)

  • tags (list[str])

  • timeout_seconds (int | None)

  • cancel_requested (bool)

  • result (Any | None)

  • depends_on (list[int])

  • metadata (dict[str, Any])

model_config: ClassVar[ConfigDict] = {'frozen': True}

Configuration for the model; should be a dictionary conforming to pydantic.config.ConfigDict.

id: int
queue: str
task: str
args: dict[str, Any]
state: str
priority: int
attempt: int
max_attempts: int
scheduled_at: datetime
attempted_at: datetime | None
completed_at: datetime | None
cancelled_at: datetime | None
discarded_at: datetime | None
errors: list[dict[str, Any]]
unique_key: str | None
inserted_at: datetime
tags: list[str]
timeout_seconds: int | None
cancel_requested: bool
result: Any | None
depends_on: list[int]
metadata: dict[str, Any]

exception roost.JobFailed(job_id, state, errors=None)

Bases: RuntimeError

Raised when the awaited job ended in discarded or cancelled.

Parameters:
  • job_id (int)

  • state (str)

  • errors (list[dict[str, Any]] | None)

code: ClassVar[str] = 'roost.job-failed'

class roost.JobInsert(task, args=None, queue='default', priority=0, max_attempts=20, scheduled_at=None, unique_key=None, tags=None, timeout_seconds=None, depends_on=None, metadata=None)

Bases: object

Bulk-enqueue payload.

Parameters:
  • task (str)

  • args (dict[str, Any] | None)

  • queue (str)

  • priority (int)

  • max_attempts (int)

  • scheduled_at (datetime | None)

  • unique_key (str | None)

  • tags (list[str] | None)

  • timeout_seconds (int | None)

  • depends_on (list[int] | None)

  • metadata (dict[str, Any] | None)

task: str
args: dict[str, Any] | None
queue: str
priority: int
max_attempts: int
scheduled_at: datetime | None
unique_key: str | None
tags: list[str] | None
timeout_seconds: int | None
depends_on: list[int] | None
metadata: dict[str, Any] | None

exception roost.JobNotFoundError

Bases: RoostError

Raised when an admin operation targets a job_id that does not exist.

code: ClassVar[str] = 'roost.job-not-found'

class roost.JobOutcome(id, state, result, errors)

Bases: object

Parameters:
  • id (int)

  • state (str)

  • result (Any | None)

  • errors (list[dict[str, Any]])

id: int
state: str
result: Any | None
errors: list[dict[str, Any]]

class roost.JobState(*values)

Bases: str, Enum

AVAILABLE = 'available'
EXECUTING = 'executing'
RETRYABLE = 'retryable'
COMPLETED = 'completed'
DISCARDED = 'discarded'
CANCELLED = 'cancelled'
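Because JobState derives from both str and Enum, its members compare equal to the plain strings stored in Job.state. The sketch below mirrors the enum locally so it runs without Roost installed. The TERMINAL set is an assumption inferred from the wait_for() description and is not a documented constant.

```python
from enum import Enum


class JobState(str, Enum):
    """Local mirror of the documented enum, so the example runs without Roost."""

    AVAILABLE = "available"
    EXECUTING = "executing"
    RETRYABLE = "retryable"
    COMPLETED = "completed"
    DISCARDED = "discarded"
    CANCELLED = "cancelled"


# Assumption: these are the states from which a job no longer moves on its own.
TERMINAL = frozenset({JobState.COMPLETED, JobState.DISCARDED, JobState.CANCELLED})

raw_state = "discarded"  # e.g. the plain string held in Job.state
parsed = JobState(raw_state)  # look a member up by its value
is_terminal = parsed in TERMINAL
equals_string = JobState.COMPLETED == "completed"  # str mixin makes this True
```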
exception roost.JobTimeoutError

Bases: TimeoutError

Raised when wait_for() exceeds its timeout.

code: ClassVar[str] = 'roost.job-timeout'

class roost.Roost(dsn, *, registry=None)

Bases: object

Synchronous entry point.

Most users only call enqueue() from this class. Workers run via the CLI or roost.AsyncRoost.worker(); the worker loop itself is asyncio-based, and sync code can drive it via asyncio.run().

Parameters:
  • dsn (str)

  • registry (HandlerRegistry | None)

setup_schema(conn=None)
Return type:

None

Parameters:

conn (Connection[Any] | None)

enqueue(task, *, args=None, queue='default', priority=0, max_attempts=20, scheduled_at=None, unique_key=None, tags=None, timeout_seconds=None, depends_on=None, metadata=None, conn=None)
Return type:

int

Parameters:
  • task (str | Callable[[...], Any])

  • args (dict[str, Any] | BaseModel | None)

  • queue (str)

  • priority (int)

  • max_attempts (int)

  • scheduled_at (datetime | None)

  • unique_key (str | None)

  • tags (list[str] | None)

  • timeout_seconds (int | None)

  • depends_on (list[int] | None)

  • metadata (dict[str, Any] | None)

  • conn (Connection[Any] | None)

status()
Return type:

list[tuple[str, str, int]]

retry(job_id)
Return type:

None

Parameters:

job_id (int)

cancel(job_id)
Return type:

None

Parameters:

job_id (int)

pause_queue(name)
Return type:

None

Parameters:

name (str)

resume_queue(name)
Return type:

None

Parameters:

name (str)

list_queues()
Return type:

list[tuple[str, datetime | None]]

list_workers()
Return type:

list[dict[str, Any]]

requeue_discarded()
Return type:

int

exception roost.RoostError

Bases: Exception

Base class for all Roost errors. Carries a stable code.

code: ClassVar[str] = 'roost.error'

exception roost.SnoozeJob(seconds)

Bases: RoostError

Raise inside a job handler to reschedule the job without counting it as a failure.

Parameters:

seconds (float)

Return type:

None

code: ClassVar[str] = 'roost.snooze-job'
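To illustrate what "without counting it as a failure" can mean in practice, here is a toy model of one worker step. The Snooze exception, FakeJob, and run_handler are hypothetical stand-ins. How Roost actually tracks attempts and rescheduling is not specified here, so the bookkeeping below is an assumption.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Callable


class Snooze(Exception):
    """Stand-in for roost.SnoozeJob: carries the requested delay in seconds."""

    def __init__(self, seconds: float) -> None:
        super().__init__(f"snooze for {seconds}s")
        self.seconds = seconds


@dataclass
class FakeJob:
    attempt: int = 0
    state: str = "available"
    delay: float = 0.0


def run_handler(job: FakeJob, handler: Callable[[], None]) -> None:
    """Hypothetical worker step: a snooze reschedules without spending an attempt."""
    try:
        handler()
    except Snooze as snooze:
        job.state, job.delay = "available", snooze.seconds
    except Exception:
        job.attempt += 1
        job.state = "retryable"
    else:
        job.attempt += 1
        job.state = "completed"


def not_ready() -> None:
    raise Snooze(30.0)  # e.g. an upstream resource is not available yet


def broken() -> None:
    raise ValueError("boom")


snoozed, failed = FakeJob(), FakeJob()
run_handler(snoozed, not_ready)
run_handler(failed, broken)
```

The snoozed job keeps its attempt budget, while the genuinely failing job consumes one attempt.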
class roost.TaskDefaults(queue=None, priority=None, max_attempts=None, tags=None, timeout_seconds=None, rate_per_minute=None, max_concurrency=None)

Bases: object

Per-task enqueue defaults declared on @job(...).

The first five (queue through timeout_seconds) are merged into enqueue calls; explicit kwargs always win.

The throttling fields (rate_per_minute, max_concurrency) are enforced at fetch time by the worker. Workers read the registry and pass the limits into the fetch SQL.

Parameters:
  • queue (str | None)

  • priority (int | None)

  • max_attempts (int | None)

  • tags (tuple[str, ...] | None)

  • timeout_seconds (int | None)

  • rate_per_minute (int | None)

  • max_concurrency (int | None)

queue: str | None = None
priority: int | None = None
max_attempts: int | None = None
tags: tuple[str, ...] | None = None
timeout_seconds: int | None = None
rate_per_minute: int | None = None
max_concurrency: int | None = None
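The "explicit kwargs always win" rule can be sketched as a two-step merge. The Defaults dataclass and merge_enqueue_kwargs function below are hypothetical illustrations of the rule and are not Roost's internal code.

```python
from __future__ import annotations

from dataclasses import asdict, dataclass
from typing import Any


@dataclass(frozen=True)
class Defaults:
    """Stand-in for the five enqueue-related fields of TaskDefaults."""

    queue: str | None = None
    priority: int | None = None
    max_attempts: int | None = None
    tags: tuple[str, ...] | None = None
    timeout_seconds: int | None = None


def merge_enqueue_kwargs(defaults: Defaults, explicit: dict[str, Any]) -> dict[str, Any]:
    """Start from the non-None task defaults, then let explicit kwargs override."""
    merged = {key: value for key, value in asdict(defaults).items() if value is not None}
    merged.update(explicit)
    return merged


email_defaults = Defaults(queue="mail", priority=5, timeout_seconds=60)
merged = merge_enqueue_kwargs(email_defaults, {"priority": 1})
```

Here the task-level queue and timeout_seconds survive, while the caller's priority=1 replaces the default of 5.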
exception roost.UnknownTaskError

Bases: RoostError

Raised when a worker pulls a job whose task name is not in the registry.

code: ClassVar[str] = 'roost.unknown-task'

class roost.Worker(dsn, *, queues=('default',), concurrency=4, prefetch=None, poll_interval=1.0, retry_strategy=None, registry=None, run_cron=True, heartbeat_interval=15.0, orphan_reaper_interval=30.0, orphan_stale_after=300.0, shutdown_timeout=30.0, listen_reconnect_delay=1.0, error_cap=20, archive_after_seconds=None, archive_interval=60.0, result_ttl_seconds=None, startup_max_retries=30, startup_retry_delay=1.0, hooks=None)

Bases: object

A single-process Roost worker.

Multiple workers can run against the same database; concurrency control is enforced by FOR UPDATE SKIP LOCKED at the SQL level, so a row claimed by one worker is skipped by the others.

Parameters:
  • dsn (str)

  • queues (Iterable[str])

  • concurrency (int)

  • prefetch (int | None)

  • poll_interval (float)

  • retry_strategy (BackoffStrategy | None)

  • registry (HandlerRegistry | None)

  • run_cron (bool)

  • heartbeat_interval (float)

  • orphan_reaper_interval (float)

  • orphan_stale_after (float)

  • shutdown_timeout (float)

  • listen_reconnect_delay (float)

  • error_cap (int)

  • archive_after_seconds (float | None)

  • archive_interval (float)

  • result_ttl_seconds (float | None)

  • startup_max_retries (int)

  • startup_retry_delay (float)

  • hooks (Hooks | None)

async run()
Return type:

None

async run_once()

Drain every currently available job and exit. Returns the number of jobs processed.

Skips the cron scheduler, archive loop, and orphan reaper. It is intended for one-shot invocations (CI smoke tests, cron-style runs of roost run --once, programmatic test helpers).

Heartbeats and the listen connection still come up briefly. The worker still claims rows via FOR UPDATE SKIP LOCKED, so it is safe to run alongside other workers.

Return type:

int

request_stop()
Return type:

None

install_signal_handlers(loop)
Return type:

None

Parameters:

loop (AbstractEventLoop)

exception roost.WorkerShutdown

Bases: RoostError

Raised internally to interrupt the worker loop on graceful shutdown.

code: ClassVar[str] = 'roost.worker-shutdown'

roost.cron(expression, *, name=None, queue='default', args=None, priority=0, max_attempts=20, timezone=None, handler_registry=None)

Register a function as a cron handler under expression.

timezone accepts an IANA name ("America/Los_Angeles", "Europe/Berlin"). Defaults to UTC. The cron expression is then interpreted in that local timezone, including DST.

Return type:

Callable[[TypeVar(F, bound= Callable[..., Any])], TypeVar(F, bound= Callable[..., Any])]

Parameters:
  • expression (str)

  • name (str | None)

  • queue (str)

  • args (dict[str, Any] | None)

  • priority (int)

  • max_attempts (int)

  • timezone (str | None)

  • handler_registry (HandlerRegistry | None)
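What "interpreted in that local timezone, including DST" implies can be seen with the standard-library zoneinfo module. The sketch assumes a hypothetical daily 09:00 schedule in America/Los_Angeles and shows that the same wall-clock time maps to different UTC instants on either side of the 2024 spring-forward date (March 10). It does not call Roost.

```python
from datetime import datetime, timezone
from zoneinfo import ZoneInfo

la = ZoneInfo("America/Los_Angeles")

# 09:00 local time on the day before and the day after the DST change.
before_dst = datetime(2024, 3, 9, 9, 0, tzinfo=la)   # Pacific Standard Time, UTC-8
after_dst = datetime(2024, 3, 11, 9, 0, tzinfo=la)   # Pacific Daylight Time, UTC-7

before_utc = before_dst.astimezone(timezone.utc)
after_utc = after_dst.astimezone(timezone.utc)
```

A schedule pinned to UTC would instead drift by one hour in local time across the change.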

roost.exponential(base=2.0, *, jitter=True, cap=86400)

Returns base ** attempt seconds, optionally jittered and capped at cap.

Defaults to Oban’s behavior: 2, 4, 8, 16, ... seconds, capped at one day (86400 seconds).

Return type:

BackoffStrategy

Parameters:
  • base (float)

  • jitter (bool)

  • cap (float)
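The schedule described above can be reproduced with a few lines of plain Python. The exponential_sketch function is a hypothetical re-implementation for illustration. In particular, the jitter shape (a random factor between 0.5 and 1.0) is an assumption, since the jitter formula is not stated here.

```python
import random
from typing import Callable


def exponential_sketch(
    base: float = 2.0, *, jitter: bool = True, cap: float = 86400
) -> Callable[[int], float]:
    """Return a function mapping a 1-indexed attempt to a delay in seconds."""

    def delay(attempt: int) -> float:
        seconds = min(base**attempt, cap)
        if jitter:
            # Assumed jitter shape: scale by a random factor in [0.5, 1.0].
            seconds *= random.uniform(0.5, 1.0)
        return seconds

    return delay


no_jitter = exponential_sketch(jitter=False)
delays = [no_jitter(attempt) for attempt in range(1, 5)]  # attempts 1 through 4
capped = no_jitter(20)  # 2 ** 20 seconds exceeds one day, so the cap applies
```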

roost.fixed(seconds=60.0)

Always wait seconds.

Return type:

BackoffStrategy

Parameters:

seconds (float)

roost.job(name, *, args_model=None, queue=None, priority=None, max_attempts=None, tags=None, timeout_seconds=None, rate_per_minute=None, max_concurrency=None, registry=None)

Register the decorated function as the handler for the task name.

Per-task defaults (queue, priority, max_attempts, tags, timeout_seconds) are applied to every enqueue of this task unless the caller passes an explicit kwarg.

Pass args_model= (a Pydantic model) to validate enqueued args at handler-call time.

The decorated function is returned untouched — it can still be called directly in tests.

Return type:

Callable[[TypeVar(F, bound= Callable[..., Any])], TypeVar(F, bound= Callable[..., Any])]

Parameters:
  • name (str)

  • args_model (type[BaseModel] | None)

  • queue (str | None)

  • priority (int | None)

  • max_attempts (int | None)

  • tags (list[str] | tuple[str, ...] | None)

  • timeout_seconds (int | None)

  • rate_per_minute (int | None)

  • max_concurrency (int | None)

  • registry (HandlerRegistry | None)
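The "returned untouched" behavior is the usual registering-decorator pattern. A minimal sketch follows, in which job_sketch and the handlers dict are hypothetical stand-ins for @job and HandlerRegistry.

```python
from __future__ import annotations

from typing import Any, Callable, TypeVar

F = TypeVar("F", bound=Callable[..., Any])

handlers: dict[str, Callable[..., Any]] = {}  # stand-in for a HandlerRegistry


def job_sketch(name: str) -> Callable[[F], F]:
    """Record func under name and hand the very same function object back."""

    def decorator(func: F) -> F:
        handlers[name] = func
        return func  # no wrapper, so direct calls behave exactly as before

    return decorator


@job_sketch("emails.welcome")
def send_welcome(user_id: int) -> str:
    return f"welcome sent to {user_id}"


direct_result = send_welcome(42)  # callable in tests without any queue
```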

roost.linear(step=60.0, *, jitter=False)

Linear growth: step * attempt seconds.

Return type:

BackoffStrategy

Parameters:
  • step (float)

  • jitter (bool)
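For a sense of how linear and fixed backoff differ over a job's lifetime, the two formulas can be compared directly. The functions below are hypothetical plain-Python equivalents of the documented formulas with jitter disabled. They do not call Roost.

```python
def linear_delay(attempt: int, step: float = 60.0) -> float:
    """step * attempt seconds, for a 1-indexed attempt."""
    return step * attempt


def fixed_delay(attempt: int, seconds: float = 60.0) -> float:
    """Always the same delay, regardless of the attempt number."""
    return seconds


attempts = range(1, 6)  # the first five retries
linear_total = sum(linear_delay(a) for a in attempts)  # 60 + 120 + 180 + 240 + 300
fixed_total = sum(fixed_delay(a) for a in attempts)    # 5 * 60
```

Over five retries the linear schedule waits three times as long in total as the fixed one, which matters when choosing max_attempts.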
