FastAPIΒΆ
from contextlib import asynccontextmanager
import asyncpg
from fastapi import FastAPI
from roost import AsyncRoost, job
@job("send_welcome_email")
async def send_welcome_email(user_id: int) -> None:
...
@asynccontextmanager
async def lifespan(app: FastAPI):
app.state.pool = await asyncpg.create_pool("postgresql://...")
app.state.roost = AsyncRoost("postgresql://...")
yield
await app.state.roost.close()
await app.state.pool.close()
app = FastAPI(lifespan=lifespan)
@app.post("/users")
async def create_user(email: str):
pool = app.state.pool
roost: AsyncRoost = app.state.roost
async with pool.acquire() as conn:
async with conn.transaction():
user_id = await conn.fetchval(
"INSERT INTO users (email) VALUES ($1) RETURNING id", email
)
await roost.enqueue(
send_welcome_email,
args={"user_id": user_id},
conn=conn,
)
return {"id": user_id}
Run a worker process alongside the API:
roost run --module myapp.tasks --queues default --concurrency 8