Building a real-time AI agent API with Celery and SSE
If an API triggers a long-running AI agent, the first architectural mistake is usually the same: treating the whole interaction like a normal request-response cycle.
That model breaks down fast.
An agent run can take seconds or minutes. It may call tools, generate partial text, emit intermediate status, hit rate limits, retry external calls, or stop in a non-terminal state that still matters to the user. If the frontend only gets a final payload, the interface feels blind. If the worker talks directly to the browser, the backend loses control of delivery, replay, and state.
A better model is this:
- the API accepts a command to start a run
- the API persists a run record and enqueues a Celery task
- the Celery worker runs the agent
- the worker emits structured events as the run progresses
- the backend exposes those events as a real-time stream to the browser
- the frontend reconstructs the visible message state from the event stream
The practical transport for that last step is usually Server-Sent Events, or SSE.
This post is a technical 101 for building that architecture.
The shape of the system
At a high level, you want five separate responsibilities:
- HTTP API: accepts commands like “start this run”
- task queue: decouples the request from execution
- worker: runs the agent and emits events
- event pipeline: stores events durably and fans them out live
- stream endpoint: exposes the event stream to the frontend
```mermaid
flowchart LR
    U[Frontend] -->|POST /runs| A[API]
    A -->|insert run| D[(Postgres)]
    A -->|enqueue task| Q[(Broker)]
    Q --> C[Celery worker]
    C -->|append event| E[(Event store)]
    C -->|publish live event| P[(Pub/Sub or Stream)]
    U -->|GET /runs/:id/events| S[SSE endpoint]
    S -->|replay history| E
    S -->|subscribe live| P
    S --> U
```
That split matters.
The API should not block while the agent runs. The worker should not be responsible for browser connections. The frontend should not depend on in-memory worker state. And the live stream should not be your only source of truth.
Why SSE fits this problem well
SSE is usually the simplest correct option for one-way updates from server to browser.
The browser opens an HTTP connection and receives frames that look like this:
```
event: message_start
data: {"type":"message_start","message":{"id":"chatcompl_013sbdtFMNVoJE1xcQRByca6","type":"message","role":"assistant","uuid":"019cd15f-decd-7295-82b1-8fbf348bccbf","content":[],"trace_id":"e103789d7401b29262f91fcf695a4ce6","request_id":"req_011CYs4p3vJMhTo8o4JbvC6m"}}

event: content_block_delta
data: {"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":" Hi"}}
```
That is a strong fit for AI agent output because most of the traffic is server-to-client, not bidirectional.
Use WebSockets if the browser must constantly push control messages into the live run itself. Use SSE if the browser mostly needs a reliable stream of updates and can send commands separately over normal HTTP.
For many agent UIs, SSE is enough.
The minimum data model
Before thinking about streaming, define the persisted objects.
A minimal model is:
Runs
One row per agent execution.
Suggested fields:
- `id`
- `conversation_id` or `thread_id`
- `status` (`queued`, `running`, `completed`, `failed`, `cancelled`)
- `requested_by`
- `created_at`, `started_at`, `finished_at`
- `error_code`, `error_message`
Run events
One append-only row per streamed event.
Suggested fields:
- `id` or a monotonic sequence number
- `run_id`
- `event_name`
- `event_index` or `sequence`
- `payload` as JSON
- `created_at`
Messages
Optional, but useful if you want a final normalized message table separate from raw event history.
You can always rebuild the final assistant message from the event stream, but many systems still materialize final messages for querying and rendering history quickly.
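As a sketch of why the event log alone is sufficient, a final assistant message can be rebuilt by folding stored events in sequence order. The event shapes follow the stream format used in this post; the helper itself is illustrative, not a fixed API:

```python
def rebuild_message(events: list[dict]) -> dict:
    """Fold an ordered list of run_events rows into a final message dict."""
    message: dict = {}
    blocks: list[dict] = []
    for event in sorted(events, key=lambda e: e["sequence"]):
        payload = event["payload"]
        if payload["type"] == "message_start":
            # The message shell arrives first; content fills in as blocks stream.
            message = {**payload["message"], "content": blocks}
        elif payload["type"] == "content_block_start":
            blocks.insert(payload["index"], dict(payload["content_block"]))
        elif payload["type"] == "content_block_delta":
            delta = payload["delta"]
            if delta["type"] == "text_delta":
                blocks[payload["index"]]["text"] += delta["text"]
        elif payload["type"] == "message_delta":
            # Stop reason and other metadata land on the message itself.
            message.update(payload["delta"])
    return message
```

The same fold can power both a backfill job that materializes the `messages` table and a debugging tool that replays any historical run.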
The core rule: events must be durable before they are live
Do not build this around pub/sub alone.
Pure pub/sub gives you live delivery, but it does not give you replay. If the browser reconnects, if the page opens late, or if the SSE process restarts, those events are gone.
The safer pattern is:
- worker creates a canonical event
- worker writes it to a durable event store
- worker publishes it to a live channel
- SSE consumers replay missing history from the event store and then continue with live events
That durable event store can be:
- Postgres table
- Redis Streams
- Kafka
- another append-only log
For most application teams, Postgres plus Redis pub/sub or Redis Streams is enough.
The lifecycle of a run
A robust request path looks like this.
1. Frontend starts a run
The browser sends something like:
```http
POST /api/runs
Content-Type: application/json

{
  "conversation_id": "conv_123",
  "prompt": "Explain the last deployment failure",
  "model": "gpt-5"
}
```
2. API creates the run record and enqueues Celery
The API should:
- validate the request
- create the `runs` row with status `queued`
- create any user message records if needed
- enqueue a Celery task with `run_id`
- return immediately with the run identifier
Example response:
```json
{
  "run_id": "run_01jnt8b4q8x0g2m7k4b7j5s9r2",
  "status": "queued",
  "events_url": "/api/runs/run_01jnt8b4q8x0g2m7k4b7j5s9r2/events"
}
```
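That API step can be sketched as a plain function. Here `insert_run` and `enqueue_run` are hypothetical stand-ins for your persistence layer and for Celery's `run_agent.delay(run_id)`:

```python
import uuid


def start_run(request: dict, insert_run, enqueue_run) -> dict:
    """Validate, persist a queued run, enqueue the task, and return immediately."""
    if not request.get("prompt"):
        raise ValueError("prompt is required")
    run_id = f"run_{uuid.uuid4().hex}"
    insert_run({
        "id": run_id,
        "conversation_id": request.get("conversation_id"),
        "status": "queued",
    })
    enqueue_run(run_id)  # e.g. run_agent.delay(run_id) with Celery
    return {
        "run_id": run_id,
        "status": "queued",
        "events_url": f"/api/runs/{run_id}/events",
    }
```

The key property is that nothing here waits on the agent: the handler's latency is one insert and one enqueue.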
3. Frontend opens the SSE stream
The browser then subscribes:
```js
const source = new EventSource(`/api/runs/${runId}/events`);

source.addEventListener('message_start', (event) => {
  const payload = JSON.parse(event.data);
  // initialize assistant message in UI state
});

source.addEventListener('content_block_delta', (event) => {
  const payload = JSON.parse(event.data);
  // append token text to the right content block
});

source.addEventListener('message_stop', () => {
  source.close();
});
```
That split is cleaner than trying to hold the original POST request open.
4. Celery worker runs the agent
Once dequeued, the worker:
- marks the run `running`
- starts the AI agent
- listens to agent callbacks or streaming tokens
- normalizes them into your public event schema
- persists and publishes each event
- marks the run complete or failed at the end
What the worker should actually emit
An event schema in the style of the frames shown earlier is already close to what you want.
It distinguishes:
- message lifecycle
- content block lifecycle
- incremental deltas
- stop reason
- limit information
That is much better than a single generic token event.
A stream like this is useful because the frontend can rebuild the exact assistant message progressively:
```
event: message_start
data: {"type":"message_start","message":{"id":"chatcompl_...","uuid":"019cd15f...","role":"assistant","content":[]}}

event: content_block_start
data: {"type":"content_block_start","index":0,"content_block":{"type":"text","text":"","citations":[]}}

event: content_block_delta
data: {"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":" Hi"}}

event: content_block_delta
data: {"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":"! How"}}

event: content_block_delta
data: {"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":" can I help you today?"}}

event: content_block_stop
data: {"type":"content_block_stop","index":0}

event: message_delta
data: {"type":"message_delta","delta":{"stop_reason":"end_turn"}}

event: message_stop
data: {"type":"message_stop"}
```
That stream tells the UI exactly what to do:
- create the assistant message
- create content block `0`
- append text as deltas arrive
- finalize the block
- finalize the message
Normalizing the agent output inside the worker
This is one of the most important design decisions in the system.
Do not leak raw provider responses directly to the frontend.
The worker should map provider-specific events into a stable internal event contract. That contract is what the browser consumes and what the event store persists.
Why this matters:
- providers change their streaming schemas
- tool call events differ across vendors
- you may switch models later
- your frontend should not care whether the tokens came from provider A or provider B
The worker is the translation boundary.
A simplified Python sketch:
```python
from dataclasses import dataclass
from datetime import datetime, timezone


@dataclass
class RunEvent:
    run_id: str
    sequence: int
    event_name: str
    payload: dict
    created_at: datetime


def emit_event(run_id: str, sequence: int, event_name: str, payload: dict) -> None:
    event = RunEvent(
        run_id=run_id,
        sequence=sequence,
        event_name=event_name,
        payload=payload,
        created_at=datetime.now(timezone.utc),
    )
    save_event(event)          # durable first
    publish_live_event(event)  # then live


@app.task(bind=True)
def run_agent(self, run_id: str) -> None:
    sequence = 0
    mark_run_running(run_id)
    for provider_event in stream_agent(run_id):
        normalized = normalize_provider_event(provider_event)
        emit_event(run_id, sequence, normalized["type"], normalized)
        sequence += 1
    mark_run_completed(run_id)
```
The important part is not the exact code. The important part is that normalize_provider_event() produces a schema your own system owns.
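A hedged sketch of what `normalize_provider_event()` might look like. The provider-side field names (`kind`, `block`, `text`) are hypothetical, not any real vendor's schema; the output shapes are the internal contract used throughout this post:

```python
def normalize_provider_event(provider_event: dict) -> dict:
    """Map a (hypothetical) provider streaming event onto the internal contract."""
    kind = provider_event.get("kind")
    if kind == "token":
        return {
            "type": "content_block_delta",
            "index": provider_event.get("block", 0),
            "delta": {"type": "text_delta", "text": provider_event["text"]},
        }
    if kind == "done":
        return {"type": "message_stop"}
    # Pass through anything already shaped like the internal contract.
    if "type" in provider_event:
        return provider_event
    raise ValueError(f"unmapped provider event: {kind!r}")
```

Raising on unmapped events, rather than silently dropping them, is deliberate: it surfaces provider schema changes in worker logs instead of in a subtly broken UI.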
The SSE endpoint should support replay, not just live fan-out
This is where many implementations stay too thin.
A solid SSE endpoint does two things:
- replays stored events the client missed
- keeps the connection open for new events
That means it should support Last-Event-ID or an explicit cursor.
A good model is:
- each emitted event gets a strictly increasing `sequence`
- SSE frames use `id: <sequence>`
- on reconnect, the browser sends `Last-Event-ID`
- the server loads all events with sequence greater than that ID
- after replay, the server subscribes to the live channel
That lets the UI recover from refreshes, mobile network changes, and server restarts.
An SSE frame should look like this:
```
id: 42
event: content_block_delta
data: {"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":" world"}}
```
Without IDs, reconnect logic becomes fragile.
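A minimal sketch of the frame format and the replay step, assuming a hypothetical `load_events_after(run_id, cursor)` helper over the event store:

```python
import json


def format_sse_frame(event_name: str, sequence: int, payload: dict) -> str:
    """Render one SSE frame; the trailing blank line terminates the frame."""
    data = json.dumps(payload, separators=(",", ":"))
    return f"id: {sequence}\nevent: {event_name}\ndata: {data}\n\n"


def replay_frames(run_id: str, last_event_id: int, load_events_after) -> str:
    """Replay every stored event the client has not seen yet, in order."""
    frames = [
        format_sse_frame(e["event_name"], e["sequence"], e["payload"])
        for e in load_events_after(run_id, last_event_id)
    ]
    # After replay, the endpoint subscribes to the live channel and keeps streaming.
    return "".join(frames)
```

In a real endpoint these frames are yielded from a streaming response rather than joined into one string, but the wire format is the same.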
A practical backend layout
One durable pattern is:
- Postgres for `runs` and `run_events`
- Redis as Celery broker and as live fan-out channel
- Celery worker for agent execution
- web app for both HTTP and SSE
```mermaid
flowchart TD
    FE[Frontend] -->|POST /api/runs| API[Web API]
    API --> PG[(Postgres)]
    API --> RB[(Redis broker)]
    RB --> CW[Celery worker]
    CW -->|update run status| PG
    CW -->|append run_events| PG
    CW -->|"publish run:{id}"| RL[(Redis live channel)]
    FE -->|GET /api/runs/:id/events| SSE[SSE endpoint]
    SSE -->|read history| PG
    SSE -->|subscribe live| RL
    SSE --> FE
```
Why this works well:
- Postgres gives history, ordering, and recovery
- Redis gives low-latency fan-out
- Celery already fits teams using Python async work queues
- the frontend only needs standard browser SSE support
What the frontend state model should look like
The browser should not treat the stream as plain text. It should treat it as state transitions.
A minimal reducer-style state can be:
```ts
type Message = {
  id: string;
  role: 'assistant' | 'user';
  content: Array<{ type: 'text'; text: string }>;
  stopReason?: string | null;
};

type RunState = {
  messages: Record<string, Message>;
  orderedMessageIds: string[];
  status: 'queued' | 'running' | 'completed' | 'failed';
};
```
Then each SSE event applies a deterministic change.
Examples:
- `message_start`: insert the assistant message shell
- `content_block_start`: create block `index`
- `content_block_delta`: append text to block `index`
- `message_delta`: update stop reason or other metadata
- `message_stop`: mark the message complete
That approach is much more maintainable than concatenating every incoming token onto one string.
Why the worker should not push directly to the browser
It is tempting to let the Celery worker talk to WebSockets or SSE clients directly.
That usually creates the wrong ownership boundary.
Problems with that model:
- workers are not reliable connection managers
- scaling browser fan-out inside workers is messy
- replay is hard
- authorization gets duplicated
- if the worker crashes, you may lose transient output with no persisted trail
The worker should produce events. The web layer should deliver them.
That separation is one of the main design constraints that keeps the system sane.
Failure handling you need from day one
If the system is real, not a demo, handle these cases explicitly.
Worker crashes mid-run
Persist an error event and mark the run failed or interrupted. The frontend should receive a terminal event like:
```json
{
  "type": "run_failed",
  "error": {
    "code": "worker_crash",
    "message": "The worker terminated before the run completed."
  }
}
```
Browser reconnects
Use event IDs and replay from durable storage.
Frontend subscribes late
The SSE endpoint should replay prior events first.
Duplicate events
If the client reconnects around the live boundary, it may see replayed data and then live data close together. Event IDs and sequence numbers let the frontend drop duplicates safely.
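Deduplication on any consumer reduces to tracking the highest sequence number already applied. A sketch (the class name is illustrative):

```python
class DedupCursor:
    """Drops replayed or duplicated events by sequence number."""

    def __init__(self) -> None:
        self.last_applied = -1

    def should_apply(self, sequence: int) -> bool:
        if sequence <= self.last_applied:
            return False  # already seen via replay or a reconnect overlap
        self.last_applied = sequence
        return True
```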
Run cancellation
Support a POST /api/runs/:id/cancel path that marks the run as cancelling and propagates that intent into the worker. A cancellation is still an eventful state transition and should appear in the stream.
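Inside the worker, cancellation is usually cooperative: check a flag between steps rather than killing the process. A sketch, where `is_cancel_requested` is a hypothetical lookup against the run row or a Redis key:

```python
def run_steps(run_id: str, steps, is_cancel_requested, emit) -> str:
    """Execute agent steps, honouring a cancellation request between steps."""
    for step in steps:
        if is_cancel_requested(run_id):
            # Cancellation is itself an event: persist and stream it like any other.
            emit({"type": "run_cancelled", "run_id": run_id})
            return "cancelled"
        step()
    emit({"type": "run_completed", "run_id": run_id})
    return "completed"
```

The check-between-steps granularity means a long tool call finishes before cancellation takes effect; that trade-off is usually acceptable and far simpler than forcibly terminating tasks.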
How to think about ordering
Order is more important than people assume.
If these arrive out of order:
- `content_block_delta`
- `content_block_start`
- `message_stop`
your UI state becomes nonsense.
So define ordering rules early:
- one run has one monotonic event sequence
- every persisted event gets the next sequence number
- SSE IDs use that same sequence
- frontend reducers process strictly in sequence order
This matters even more if the agent can emit tool results, partial assistant text, and status updates in parallel.
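SSE delivers frames in order on a single connection, but if the producing side can emit concurrently, a small hold-back buffer keeps the consumer strictly sequential. A sketch (the class name is illustrative):

```python
class SequenceBuffer:
    """Holds out-of-order events and releases them in strict sequence order."""

    def __init__(self) -> None:
        self.next_sequence = 0
        self.pending: dict[int, dict] = {}

    def push(self, event: dict) -> list[dict]:
        """Buffer one event; return every event now ready to apply, in order."""
        self.pending[event["sequence"]] = event
        ready = []
        while self.next_sequence in self.pending:
            ready.append(self.pending.pop(self.next_sequence))
            self.next_sequence += 1
        return ready
```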
A minimal API contract
A clean HTTP surface can stay small.
| Endpoint | Purpose |
|---|---|
| `POST /api/runs` | create and enqueue a run |
| `GET /api/runs/:id` | fetch current run metadata |
| `GET /api/runs/:id/events` | stream or replay SSE events |
| `POST /api/runs/:id/cancel` | request cancellation |
And your event contract should stay explicit:
| Event | Meaning |
|---|---|
| `message_start` | assistant message shell created |
| `content_block_start` | content block initialized |
| `content_block_delta` | partial text or block delta arrived |
| `content_block_stop` | content block finalized |
| `message_delta` | message metadata changed |
| `message_stop` | message finalized |
| `run_failed` | run terminated with error |
| `run_completed` | run reached terminal success state |
That is enough to start. You can always add tool events later.
Security and multi-tenant boundaries
Do not let any authenticated user subscribe to arbitrary run IDs.
The SSE endpoint has to authorize the run exactly like the normal API would. In practice:
- validate the caller owns the conversation or workspace
- scope every run to a tenant or account
- do not use guessable run IDs
- keep provider request IDs and traces if useful, but do not leak secrets or raw tool payloads blindly
Streaming endpoints often get treated like infrastructure plumbing. They still need normal application authorization.
What a production-ready event emitter usually adds
The basic design above works. A better version usually adds these fields to every payload:
- `run_id`
- `sequence`
- `created_at`
- `trace_id`
- `message_id` when applicable
- a `final` flag for terminal states
That makes debugging much easier across API logs, worker logs, and frontend state.
A more operational payload envelope might look like this:
```json
{
  "type": "content_block_delta",
  "run_id": "run_01jnt8b4q8x0g2m7k4b7j5s9r2",
  "sequence": 7,
  "created_at": "2026-03-09T06:53:54.472349Z",
  "trace_id": "e103789d7401b29262f91fcf695a4ce6",
  "index": 0,
  "delta": {
    "type": "text_delta",
    "text": " can I help you today?"
  }
}
```
That envelope is easier to replay, debug, and audit than a payload that only contains content.
Where most teams overcomplicate this
They usually do one of two things:
- build a fully custom real-time system before the event model is stable
- skip durability and rely on live sockets only
Both are mistakes.
You do not need a distributed streaming platform to begin. You need:
- a stable event contract
- durable event storage
- a worker that emits normalized events
- an SSE endpoint that supports replay
- a frontend reducer that applies events predictably
That architecture scales farther than most teams expect.
A practical mental model
If you remember only one thing, make it this:
The AI agent run is not one response. It is a timeline of state transitions.
Celery executes that timeline. The event store records it. SSE delivers it. The frontend renders it.
That framing makes the system easier to reason about.
Instead of asking, “How do I stream tokens from a worker to the browser?” ask this:
“How do I turn agent execution into an ordered event log with live delivery and replay?”
That is the architecture you actually want.