Source code for roost._core.job

from __future__ import annotations

from datetime import datetime
from typing import Any

from pydantic import BaseModel, ConfigDict, Field


[docs] class Job(BaseModel): """A row from ``roost.jobs``. Public read-only view; created internally by the repo. Users construct their own typed argument models and pass them through ``args``. """ model_config = ConfigDict(frozen=True) id: int queue: str task: str args: dict[str, Any] = Field(default_factory=dict) state: str priority: int = 0 attempt: int = 0 max_attempts: int = 20 scheduled_at: datetime attempted_at: datetime | None = None completed_at: datetime | None = None cancelled_at: datetime | None = None discarded_at: datetime | None = None errors: list[dict[str, Any]] = Field(default_factory=list) unique_key: str | None = None inserted_at: datetime tags: list[str] = Field(default_factory=list) timeout_seconds: int | None = None cancel_requested: bool = False result: Any | None = None depends_on: list[int] = Field(default_factory=list) metadata: dict[str, Any] = Field(default_factory=dict)