API reference¶
Public API¶
Roost — Postgres-backed background job queue for Python.
Public API:
from roost import AsyncRoost, Roost, job, cron
The async path uses asyncpg; the sync path uses psycopg. They share schema
and a single SQL surface defined in roost._core.repo.
- class roost.AsyncRoost(dsn, *, registry=None)[source]¶
Bases:
objectEntry point for async apps.
Lazily opens an internal
asyncpg.Poolon first use. Passconn=toenqueue()to participate in the caller’s transaction — that’s the load-bearing primitive.- Parameters:
dsn (str)
registry (HandlerRegistry | None)
- async setup_schema(conn=None)[source]¶
Apply the migration SQL. Idempotent.
- Return type:
None- Parameters:
conn (Connection | None)
- async enqueue(task, *, args=None, queue='default', priority=0, max_attempts=20, scheduled_at=None, unique_key=None, tags=None, timeout_seconds=None, depends_on=None, metadata=None, conn=None)[source]¶
Insert a job. Pass
conn=to enqueue inside the caller’s txn.taskmay be a registered task name or a function decorated with@job(...).argsaccepts a dict or a Pydantic model — models are dumped viamodel_dump()so types likeUUIDanddatetimeround-trip cleanly.metadatais an out-of-band JSONB column for trace ids / request ids / tenant ids that aren’t handler input.- Return type:
int- Parameters:
task (str | Callable[[...], Any])
args (dict[str, Any] | BaseModel | None)
queue (str)
priority (int)
max_attempts (int)
scheduled_at (datetime | None)
unique_key (str | None)
tags (list[str] | None)
timeout_seconds (int | None)
depends_on (list[int] | None)
metadata (dict[str, Any] | None)
conn (Connection | None)
- async enqueue_many(jobs, *, conn=None)[source]¶
Bulk-insert in a single round-trip. Returns the submitted count.
- Return type:
int- Parameters:
jobs (list[JobInsert])
conn (Connection | None)
- async wait_for(job_id, *, timeout=30.0, poll_interval=1.0, raise_on_failure=True)[source]¶
Block until
job_idreaches a terminal state.Returns a
roost.JobOutcome. By default raisesroost.JobFailedwhen the job ended indiscardedorcancelled(setraise_on_failure=Falseto suppress).- Return type:
Any- Parameters:
job_id (int)
timeout (float | None)
poll_interval (float)
raise_on_failure (bool)
- worker(*, queues=('default',), concurrency=4, prefetch=None, poll_interval=1.0, retry_strategy=None, run_cron=True, heartbeat_interval=15.0, orphan_reaper_interval=30.0, orphan_stale_after=300.0, shutdown_timeout=30.0, listen_reconnect_delay=1.0, error_cap=20, archive_after_seconds=None, archive_interval=60.0, result_ttl_seconds=None, startup_max_retries=30, startup_retry_delay=1.0, hooks=None)[source]¶
Construct a
Workerbound to this Roost’s DSN.- Return type:
- Parameters:
queues (Iterable[str])
concurrency (int)
prefetch (int | None)
poll_interval (float)
retry_strategy (BackoffStrategy | None)
run_cron (bool)
heartbeat_interval (float)
orphan_reaper_interval (float)
orphan_stale_after (float)
shutdown_timeout (float)
listen_reconnect_delay (float)
error_cap (int)
archive_after_seconds (float | None)
archive_interval (float)
result_ttl_seconds (float | None)
startup_max_retries (int)
startup_retry_delay (float)
hooks (Any | None)
- class roost.BackoffStrategy(*args, **kwargs)[source]¶
Bases:
ProtocolMaps an attempt number (1-indexed) to seconds-until-next-attempt.
- class roost.CronEntry(name, expression, task, args=<factory>, queue='default', priority=0, max_attempts=20, timezone_name=None)[source]¶
Bases:
object- Parameters:
name (str)
expression (str)
task (str)
args (dict[str, Any])
queue (str)
priority (int)
max_attempts (int)
timezone_name (str | None)
- name: str¶
- expression: str¶
- task: str¶
- args: dict[str, Any]¶
- queue: str = 'default'¶
- priority: int = 0¶
- max_attempts: int = 20¶
- timezone_name: str | None = None¶
- exception roost.DuplicateUniqueJobError[source]¶
Bases:
RoostErrorRaised when an enqueue conflicts with an active job sharing the same unique_key.
- code: ClassVar[str] = 'roost.duplicate-unique-job'¶
- class roost.HandlerRegistry[source]¶
Bases:
object- register(name, func, *, args_model=None, defaults=None)[source]¶
- Return type:
None- Parameters:
name (str)
func (Callable[[...], Any])
args_model (type[BaseModel] | None)
defaults (TaskDefaults | None)
- class roost.Hooks(before_job=None, after_job=None)[source]¶
Bases:
objectBundle of optional hooks invoked around every handler call.
Both hooks are async. The signature is
(job, *, ctx)forbefore_joband(job, *, result, error, ctx)forafter_job. Either can be omitted; only set the ones you need.- Parameters:
before_job (Callable[[...], Awaitable[None]] | None)
after_job (Callable[[...], Awaitable[None]] | None)
- before_job: Callable[[...], Awaitable[None]] | None = None¶
- after_job: Callable[[...], Awaitable[None]] | None = None¶
- class roost.Job(**data)[source]¶
Bases:
BaseModelA row from
roost.jobs.Public read-only view; created internally by the repo. Users construct their own typed argument models and pass them through
args.- Parameters:
id (int)
queue (str)
task (str)
args (dict[str, Any])
state (str)
priority (int)
attempt (int)
max_attempts (int)
scheduled_at (datetime)
attempted_at (datetime | None)
completed_at (datetime | None)
cancelled_at (datetime | None)
discarded_at (datetime | None)
errors (list[dict[str, Any]])
unique_key (str | None)
inserted_at (datetime)
tags (list[str])
timeout_seconds (int | None)
cancel_requested (bool)
result (Any | None)
depends_on (list[int])
metadata (dict[str, Any])
- model_config: ClassVar[ConfigDict] = {'frozen': True}¶
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
- id: int¶
- queue: str¶
- task: str¶
- args: dict[str, Any]¶
- state: str¶
- priority: int¶
- attempt: int¶
- max_attempts: int¶
- scheduled_at: datetime¶
- attempted_at: datetime | None¶
- completed_at: datetime | None¶
- cancelled_at: datetime | None¶
- discarded_at: datetime | None¶
- errors: list[dict[str, Any]]¶
- unique_key: str | None¶
- inserted_at: datetime¶
- tags: list[str]¶
- timeout_seconds: int | None¶
- cancel_requested: bool¶
- result: Any | None¶
- depends_on: list[int]¶
- metadata: dict[str, Any]¶
- exception roost.JobFailed(job_id, state, errors=None)[source]¶
Bases:
RuntimeErrorRaised when the awaited job ended in
discardedorcancelled.- Parameters:
job_id (int)
state (str)
errors (list[dict[str, Any]] | None)
- code: ClassVar[str] = 'roost.job-failed'¶
- class roost.JobInsert(task, args=None, queue='default', priority=0, max_attempts=20, scheduled_at=None, unique_key=None, tags=None, timeout_seconds=None, depends_on=None, metadata=None)[source]¶
Bases:
objectBulk-enqueue payload.
- Parameters:
task (str)
args (dict[str, Any] | None)
queue (str)
priority (int)
max_attempts (int)
scheduled_at (datetime | None)
unique_key (str | None)
tags (list[str] | None)
timeout_seconds (int | None)
depends_on (list[int] | None)
metadata (dict[str, Any] | None)
- task: str¶
- args: dict[str, Any] | None¶
- queue: str¶
- priority: int¶
- max_attempts: int¶
- scheduled_at: datetime | None¶
- unique_key: str | None¶
- tags: list[str] | None¶
- timeout_seconds: int | None¶
- depends_on: list[int] | None¶
- metadata: dict[str, Any] | None¶
- exception roost.JobNotFoundError[source]¶
Bases:
RoostErrorRaised when an admin operation targets a job_id that does not exist.
- code: ClassVar[str] = 'roost.job-not-found'¶
- class roost.JobOutcome(id, state, result, errors)[source]¶
Bases:
object- Parameters:
id (int)
state (str)
result (Any | None)
errors (list[dict[str, Any]])
- id: int¶
- state: str¶
- result: Any | None¶
- errors: list[dict[str, Any]]¶
- class roost.JobState(*values)[source]¶
Bases:
str,Enum- AVAILABLE = 'available'¶
- EXECUTING = 'executing'¶
- RETRYABLE = 'retryable'¶
- COMPLETED = 'completed'¶
- DISCARDED = 'discarded'¶
- CANCELLED = 'cancelled'¶
- exception roost.JobTimeoutError[source]¶
Bases:
TimeoutErrorRaised when
wait_for_asyncexceeds its timeout.- code: ClassVar[str] = 'roost.job-timeout'¶
- class roost.Roost(dsn, *, registry=None)[source]¶
Bases:
objectSynchronous entry point.
Most users only call
enqueue()from this class — workers run via the CLI orroost.AsyncRoost.worker(the worker loop itself is asyncio-based; sync code can drive it viaasyncio.run).- Parameters:
dsn (str)
registry (HandlerRegistry | None)
- enqueue(task, *, args=None, queue='default', priority=0, max_attempts=20, scheduled_at=None, unique_key=None, tags=None, timeout_seconds=None, depends_on=None, metadata=None, conn=None)[source]¶
- Return type:
int- Parameters:
task (str | Callable[[...], Any])
args (dict[str, Any] | BaseModel | None)
queue (str)
priority (int)
max_attempts (int)
scheduled_at (datetime | None)
unique_key (str | None)
tags (list[str] | None)
timeout_seconds (int | None)
depends_on (list[int] | None)
metadata (dict[str, Any] | None)
conn (Connection[Any] | None)
- exception roost.RoostError[source]¶
Bases:
ExceptionBase class for all Roost errors. Carries a stable
code.- code: ClassVar[str] = 'roost.error'¶
- exception roost.SnoozeJob(seconds)[source]¶
Bases:
RoostErrorRaise inside a job handler to reschedule the job without counting it as a failure.
- Parameters:
seconds (float)
- Return type:
None
- code: ClassVar[str] = 'roost.snooze-job'¶
- class roost.TaskDefaults(queue=None, priority=None, max_attempts=None, tags=None, timeout_seconds=None, rate_per_minute=None, max_concurrency=None)[source]¶
Bases:
objectPer-task enqueue defaults declared on
@job(...).The first five (
queue…timeout_seconds) are merged intoenqueuecalls — explicit kwargs always win.The throttling fields (
rate_per_minute,max_concurrency) are enforced at fetch time by the worker. Workers read the registry and pass the limits into the fetch SQL.- Parameters:
queue (str | None)
priority (int | None)
max_attempts (int | None)
tags (tuple[str, ...] | None)
timeout_seconds (int | None)
rate_per_minute (int | None)
max_concurrency (int | None)
- queue: str | None = None¶
- priority: int | None = None¶
- max_attempts: int | None = None¶
- tags: tuple[str, ...] | None = None¶
- timeout_seconds: int | None = None¶
- rate_per_minute: int | None = None¶
- max_concurrency: int | None = None¶
- exception roost.UnknownTaskError[source]¶
Bases:
RoostErrorRaised when a worker pulls a job whose task name is not in the registry.
- code: ClassVar[str] = 'roost.unknown-task'¶
- class roost.Worker(dsn, *, queues=('default',), concurrency=4, prefetch=None, poll_interval=1.0, retry_strategy=None, registry=None, run_cron=True, heartbeat_interval=15.0, orphan_reaper_interval=30.0, orphan_stale_after=300.0, shutdown_timeout=30.0, listen_reconnect_delay=1.0, error_cap=20, archive_after_seconds=None, archive_interval=60.0, result_ttl_seconds=None, startup_max_retries=30, startup_retry_delay=1.0, hooks=None)[source]¶
Bases:
objectA single-process Roost worker.
Multiple workers can run against the same database — concurrency control is enforced by
FOR UPDATE SKIP LOCKEDat the SQL level.- Parameters:
dsn (str)
queues (Iterable[str])
concurrency (int)
prefetch (int | None)
poll_interval (float)
retry_strategy (BackoffStrategy | None)
registry (HandlerRegistry | None)
run_cron (bool)
heartbeat_interval (float)
orphan_reaper_interval (float)
orphan_stale_after (float)
shutdown_timeout (float)
listen_reconnect_delay (float)
error_cap (int)
archive_after_seconds (float | None)
archive_interval (float)
result_ttl_seconds (float | None)
startup_max_retries (int)
startup_retry_delay (float)
hooks (Hooks | None)
- async run_once()[source]¶
Drain every currently-available job and exit. Returns count processed.
Skips the cron scheduler, archive loop, and orphan reaper — this is intended for one-shot invocations (CI smokes, cron-style runs of
roost run --once, programmatic test helpers).Heartbeats and the listen connection still come up briefly; the worker still claims rows via
FOR UPDATE SKIP LOCKEDso it’s safe to run alongside other workers.- Return type:
int
- exception roost.WorkerShutdown[source]¶
Bases:
RoostErrorRaised internally to interrupt the worker loop on graceful shutdown.
- code: ClassVar[str] = 'roost.worker-shutdown'¶
- roost.cron(expression, *, name=None, queue='default', args=None, priority=0, max_attempts=20, timezone=None, handler_registry=None)[source]¶
Register a function as a cron handler under
expression.timezoneaccepts an IANA name ("America/Los_Angeles","Europe/Berlin"). Defaults to UTC. The cron expression is then interpreted in that local timezone, including DST.- Return type:
Callable[[TypeVar(F, bound=Callable[...,Any])],TypeVar(F, bound=Callable[...,Any])]- Parameters:
expression (str)
name (str | None)
queue (str)
args (dict[str, Any] | None)
priority (int)
max_attempts (int)
timezone (str | None)
handler_registry (HandlerRegistry | None)
- roost.exponential(base=2.0, *, jitter=True, cap=86400)[source]¶
base ** attemptseconds, optionally jittered, capped atcap.Defaults to Oban’s behavior:
2, 4, 8, 16, …capped at one day.- Return type:
- Parameters:
base (float)
jitter (bool)
cap (float)
- roost.job(name, *, args_model=None, queue=None, priority=None, max_attempts=None, tags=None, timeout_seconds=None, rate_per_minute=None, max_concurrency=None, registry=None)[source]¶
Register
funcas the handler for the taskname.Per-task defaults (
queue,priority,max_attempts,tags,timeout_seconds) are applied to every enqueue of this task unless the caller passes an explicit kwarg.Pass
args_model=(a Pydantic model) to validate enqueued args at handler-call time.The decorated function is returned untouched — it can still be called directly in tests.
- Return type:
Callable[[TypeVar(F, bound=Callable[...,Any])],TypeVar(F, bound=Callable[...,Any])]- Parameters:
name (str)
args_model (type[BaseModel] | None)
queue (str | None)
priority (int | None)
max_attempts (int | None)
tags (list[str] | tuple[str, ...] | None)
timeout_seconds (int | None)
rate_per_minute (int | None)
max_concurrency (int | None)
registry (HandlerRegistry | None)
Worker¶
- class roost.Worker(dsn, *, queues=('default',), concurrency=4, prefetch=None, poll_interval=1.0, retry_strategy=None, registry=None, run_cron=True, heartbeat_interval=15.0, orphan_reaper_interval=30.0, orphan_stale_after=300.0, shutdown_timeout=30.0, listen_reconnect_delay=1.0, error_cap=20, archive_after_seconds=None, archive_interval=60.0, result_ttl_seconds=None, startup_max_retries=30, startup_retry_delay=1.0, hooks=None)[source]¶
A single-process Roost worker.
Multiple workers can run against the same database — concurrency control is enforced by
FOR UPDATE SKIP LOCKEDat the SQL level.- Parameters:
dsn (str)
queues (Iterable[str])
concurrency (int)
prefetch (int | None)
poll_interval (float)
retry_strategy (BackoffStrategy | None)
registry (HandlerRegistry | None)
run_cron (bool)
heartbeat_interval (float)
orphan_reaper_interval (float)
orphan_stale_after (float)
shutdown_timeout (float)
listen_reconnect_delay (float)
error_cap (int)
archive_after_seconds (float | None)
archive_interval (float)
result_ttl_seconds (float | None)
startup_max_retries (int)
startup_retry_delay (float)
hooks (Hooks | None)
- async run_once()[source]¶
Drain every currently-available job and exit. Returns count processed.
Skips the cron scheduler, archive loop, and orphan reaper — this is intended for one-shot invocations (CI smokes, cron-style runs of
roost run --once, programmatic test helpers).Heartbeats and the listen connection still come up briefly; the worker still claims rows via
FOR UPDATE SKIP LOCKEDso it’s safe to run alongside other workers.- Return type:
int
Exceptions¶
- exception roost.UnknownTaskError[source]¶
Raised when a worker pulls a job whose task name is not in the registry.
- exception roost.JobNotFoundError[source]¶
Raised when an admin operation targets a job_id that does not exist.
- exception roost.DuplicateUniqueJobError[source]¶
Raised when an enqueue conflicts with an active job sharing the same unique_key.