Sunday, April 5

By the end of this tutorial, you’ll have a working Claude agent served through a Starlette 1.0 application — with proper async handling, streaming responses over Server-Sent Events, and a WebSocket endpoint for interactive sessions. The Claude agents Starlette integration pattern we’re building here is what I’d actually deploy to production, not a toy demo.

Starlette is the right choice here for a specific reason: it’s the ASGI foundation under FastAPI, which means you get all the async primitives without FastAPI’s dependency injection overhead. For pure agent backends where you need maximum throughput on streaming responses, that matters. At Claude Sonnet 4.5 pricing (~$0.003/1K input tokens, ~$0.015/1K output tokens), your server cost is trivial compared to model cost, so you want infrastructure that stays out of the way.

  1. Install dependencies — Set up Starlette 1.0, Anthropic SDK, and ASGI server
  2. Configure the Anthropic client — Async client setup with proper lifecycle management
  3. Build the base agent route — POST endpoint with full response handling
  4. Add streaming via SSE — Real-time token streaming without polling
  5. Wire up a WebSocket agent session — Stateful multi-turn conversations
  6. Add middleware and error handling — Rate limiting, CORS, and graceful failures

Step 1: Install Dependencies

You need Starlette 1.0 specifically — not whatever version comes bundled with FastAPI. The 1.0 release cleaned up the routing API and stabilized the lifespan protocol.

pip install "starlette==1.0.0" "anthropic>=0.28.0" "uvicorn[standard]>=0.30.0" "python-dotenv>=1.0.0"

Pin these versions in your requirements.txt. The Anthropic SDK broke the streaming interface between 0.25 and 0.28 — if you’re on an older version, stream() works differently than what’s shown below.
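
If you want the pin to be checked at startup rather than trusted, a small helper can compare installed versions against the minimums above. This is a sketch under assumptions: meets_minimum and check_dependency are hypothetical names, and the comparison only understands plain dotted numeric versions such as 0.28.0 (no pre-release suffixes).

```python
from importlib.metadata import PackageNotFoundError, version


def meets_minimum(installed: str, minimum: str) -> bool:
    """Compare plain dotted versions numerically, e.g. "0.28.1" >= "0.28.0"."""
    def as_tuple(v: str) -> tuple:
        return tuple(int(part) for part in v.split("."))
    return as_tuple(installed) >= as_tuple(minimum)


def check_dependency(package: str, minimum: str) -> str:
    """Return a short status string instead of raising, so it can be logged."""
    try:
        installed = version(package)
    except PackageNotFoundError:
        return f"{package}: not installed"
    try:
        ok = meets_minimum(installed, minimum)
    except ValueError:
        # Version string has a non-numeric part (e.g. "1.0.0rc1")
        return f"{package}: {installed} (could not parse)"
    status = "ok" if ok else f"too old, need >= {minimum}"
    return f"{package}: {installed} ({status})"
```

Calling check_dependency("anthropic", "0.28.0") inside the lifespan startup would surface a stale SDK in the logs before the first request fails.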

Step 2: Configure the Anthropic Client with Starlette Lifespan

The critical thing most tutorials get wrong: don’t instantiate the Anthropic client at module level. Use Starlette’s lifespan context manager to create it once at startup and share it across requests. This gives you proper connection pooling and clean shutdown.

from contextlib import asynccontextmanager
from starlette.applications import Starlette
from starlette.requests import Request
from starlette.responses import JSONResponse
from starlette.routing import Route, WebSocketRoute
import anthropic
import os

# Shared state container: a module-level dict that the lifespan handler fills at startup
app_state = {}

@asynccontextmanager
async def lifespan(app):
    # Startup: create async client once, reuse across all requests
    app_state["claude"] = anthropic.AsyncAnthropic(
        api_key=os.environ["ANTHROPIC_API_KEY"],
        max_retries=3,  # built-in retry on 429 rate limits / 529 overloaded errors
    )
    yield
    # Shutdown: close the underlying httpx session cleanly
    await app_state["claude"].close()

The AsyncAnthropic client uses httpx under the hood. Without explicit lifecycle management, you’ll see “unclosed client session” warnings in production logs — or worse, connection pool exhaustion under load.
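
To see the startup and shutdown ordering without touching the network, the same pattern can be run against a stand-in client. Everything here is illustrative: FakeClient is a hypothetical substitute for anthropic.AsyncAnthropic, and wrapping the yield in try/finally is an addition that makes the close step run even when the server exits on an error.

```python
import asyncio
from contextlib import asynccontextmanager

events = []  # records the order of lifecycle steps for illustration
state = {}


class FakeClient:
    """Hypothetical stand-in for anthropic.AsyncAnthropic (no network access)."""

    def __init__(self):
        self.closed = False
        events.append("created")

    async def close(self):
        self.closed = True
        events.append("closed")


@asynccontextmanager
async def lifespan(app):
    state["claude"] = FakeClient()  # startup: runs once, before any request
    try:
        yield
    finally:
        # shutdown: runs even if the body raised
        await state["claude"].close()


async def main():
    async with lifespan(app=None):
        events.append("serving")  # requests would be handled here
    return events


result = asyncio.run(main())
```

After the run, result holds the three steps in order: created, serving, closed.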

Step 3: Build the Base Agent Route

Start with a non-streaming POST endpoint that waits for the model to finish and returns the full response. This is your baseline before adding streaming.

async def agent_endpoint(request: Request) -> JSONResponse:
    body = await request.json()
    
    user_message = body.get("message", "")
    system_prompt = body.get("system", "You are a helpful assistant.")
    
    if not user_message:
        return JSONResponse({"error": "message field required"}, status_code=422)
    
    client: anthropic.AsyncAnthropic = app_state["claude"]
    
    response = await client.messages.create(
        model="claude-sonnet-4-5",
        max_tokens=1024,
        system=system_prompt,
        messages=[{"role": "user", "content": user_message}]
    )
    
    return JSONResponse({
        "content": response.content[0].text,
        "input_tokens": response.usage.input_tokens,
        "output_tokens": response.usage.output_tokens,
        # Useful for cost tracking: ~$0.003 per 1K input + $0.015 per 1K output
        "estimated_cost_usd": (
            response.usage.input_tokens / 1000 * 0.003 +
            response.usage.output_tokens / 1000 * 0.015
        )
    })

Always return token counts. When you’re debugging why a request cost $0.12 instead of $0.01, you’ll want that data in your logs immediately.
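
The cost arithmetic is easier to reuse and test when it lives in its own function. A minimal sketch, assuming the per-1K rates quoted in this article still apply (check current pricing before relying on them); estimate_cost_usd is a hypothetical helper name.

```python
INPUT_USD_PER_1K = 0.003   # rate quoted in this article; verify before use
OUTPUT_USD_PER_1K = 0.015


def estimate_cost_usd(input_tokens: int, output_tokens: int) -> float:
    """Estimated request cost in USD, rounded to 6 decimal places."""
    cost = (
        input_tokens / 1000 * INPUT_USD_PER_1K
        + output_tokens / 1000 * OUTPUT_USD_PER_1K
    )
    return round(cost, 6)


# 2,000 input tokens and 500 output tokens:
# 2.0 * 0.003 + 0.5 * 0.015 = 0.006 + 0.0075 = 0.0135
example = estimate_cost_usd(2000, 500)
```

Output tokens cost five times as much as input tokens at these rates, so a long completion dominates the bill even when the prompt is large.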

Step 4: Add Streaming via Server-Sent Events

This is where Starlette earns its place. For agent responses that take 5-15 seconds, SSE is dramatically better UX than waiting. See also our deep-dive on streaming Claude agent responses in production for the broader patterns around backpressure and client reconnects.

from starlette.responses import StreamingResponse

async def stream_agent_endpoint(request: Request):
    body = await request.json()
    user_message = body.get("message", "")
    system_prompt = body.get("system", "You are a helpful assistant.")
    
    client: anthropic.AsyncAnthropic = app_state["claude"]
    
    async def event_generator():
        # SSE format: "data: {payload}\n\n"
        try:
            async with client.messages.stream(
                model="claude-sonnet-4-5",
                max_tokens=1024,
                system=system_prompt,
                messages=[{"role": "user", "content": user_message}]
            ) as stream:
                async for text in stream.text_stream:
                    # Escape newlines — SSE treats bare \n as field separator
                    escaped = text.replace("\n", "\\n")
                    yield f"data: {escaped}\n\n"
                
                # Send final usage stats as a terminal event
                final = await stream.get_final_message()
                yield f"event: done\ndata: {final.usage.output_tokens}\n\n"
                
        except anthropic.APIStatusError as e:
            yield f"event: error\ndata: {e.message}\n\n"
    
    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "X-Accel-Buffering": "no",  # critical if behind nginx
        }
    )

The nginx buffering header is the one that burns people most. Without it, nginx buffers the entire SSE stream before sending — completely defeating the purpose. If you’re on a cloud platform like Railway or Render, check their docs for equivalent proxy settings.
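
An alternative to escaping newlines is to let the SSE framing carry them: the event-stream format allows several data: lines in one event, and the client joins them with a newline. The helper below is a sketch of that approach, not the code used above; format_sse is a hypothetical name.

```python
from typing import Optional


def format_sse(data: str, event: Optional[str] = None) -> str:
    """Build one SSE frame; multi-line data becomes several data: lines."""
    lines = []
    if event is not None:
        lines.append(f"event: {event}")
    # Normalize line endings, then emit one data: line per text line
    for part in data.replace("\r\n", "\n").replace("\r", "\n").split("\n"):
        lines.append(f"data: {part}")
    # A blank line terminates the frame
    return "\n".join(lines) + "\n\n"
```

With this helper the generator can yield format_sse(text) for each chunk and format_sse(str(count), event="done") at the end, and the browser's EventSource receives the original newlines without any unescaping step.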

Step 5: Wire Up a WebSocket Agent Session

WebSockets give you true bidirectional communication for multi-turn agent sessions. Unlike SSE, you can send follow-up messages without establishing a new connection — which is what you want for interactive agents.

from starlette.websockets import WebSocket, WebSocketDisconnect
import json

async def websocket_agent(websocket: WebSocket):
    await websocket.accept()
    client: anthropic.AsyncAnthropic = app_state["claude"]
    
    # Conversation history persists for the lifetime of this connection
    conversation_history = []
    
    try:
        while True:
            raw = await websocket.receive_text()
            payload = json.loads(raw)
            user_message = payload.get("message", "")
            
            # Append user turn to running history
            conversation_history.append({
                "role": "user",
                "content": user_message
            })
            
            # Stream the response token by token back through the WebSocket
            full_response = ""
            async with client.messages.stream(
                model="claude-sonnet-4-5",
                max_tokens=1024,
                messages=conversation_history
            ) as stream:
                async for text in stream.text_stream:
                    await websocket.send_json({"type": "token", "text": text})
                    full_response += text
            
            # Append assistant turn so next message has full context
            conversation_history.append({
                "role": "assistant",
                "content": full_response
            })
            await websocket.send_json({"type": "done"})
            
    except WebSocketDisconnect:
        pass  # Client closed normally — no need to log this
    except Exception as e:
        await websocket.send_json({"type": "error", "message": str(e)})
        await websocket.close()

One thing to watch: conversation_history lives in memory on that connection. For anything beyond a demo, you’ll want to serialize this to Redis so users can reconnect without losing context. We cover this pattern in detail in the article on persistent memory architecture for Claude agents.
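
The serialization step itself is small. The sketch below uses a plain dict as a stand-in for Redis so it runs anywhere; the key format, the MAX_MESSAGES cap, and the function names are all assumptions for illustration. With a real Redis client, the two dict operations would become set and get calls.

```python
import json

MAX_MESSAGES = 40  # hypothetical cap to bound token usage per request

fake_store = {}  # stand-in for Redis: maps key -> JSON string


def history_key(session_id: str) -> str:
    return f"session:{session_id}:history"


def save_history(session_id: str, history: list) -> None:
    trimmed = history[-MAX_MESSAGES:]
    # Keep the conversation starting on a user turn after trimming
    while trimmed and trimmed[0]["role"] != "user":
        trimmed = trimmed[1:]
    fake_store[history_key(session_id)] = json.dumps(trimmed)


def load_history(session_id: str) -> list:
    raw = fake_store.get(history_key(session_id))
    return json.loads(raw) if raw else []
```

In the WebSocket handler, load_history would run once after accept() and save_history after each assistant turn is appended.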

Step 6: Add Middleware and Wire Everything Together

from starlette.middleware import Middleware
from starlette.middleware.cors import CORSMiddleware

routes = [
    Route("/agent", agent_endpoint, methods=["POST"]),
    Route("/agent/stream", stream_agent_endpoint, methods=["POST"]),
    WebSocketRoute("/agent/ws", websocket_agent),
]

middleware = [
    Middleware(
        CORSMiddleware,
        allow_origins=["https://yourdomain.com"],  # don't use ["*"] in prod
        allow_methods=["POST"],
        allow_headers=["Content-Type", "Authorization"],
    )
]

app = Starlette(
    routes=routes,
    middleware=middleware,
    lifespan=lifespan,
)
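
Run it with uvicorn. Adding --workers is fine for the POST and SSE routes, but WebSocket sessions keep their history in a single worker’s memory, so a client that reconnects to a different worker starts with empty context.

uvicorn main:app --host 0.0.0.0 --port 8000 --loop uvloop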

If you need structured outputs from your agent — JSON responses with validated schemas — the approach in structured output with Claude: JSON, XML, and regex-based validation patterns integrates cleanly into the agent_endpoint response handler here.

Common Errors

1. “RuntimeError: Timeout context manager should be used inside a task”

This happens when you create the AsyncAnthropic client outside the event loop — typically at module import time. Fix: always initialize inside the lifespan context manager as shown above. The async httpx client must be created in the same event loop that will use it.

2. SSE stream works locally, returns 200 with empty body in production

Almost always a proxy buffering issue. Check three things: (1) the X-Accel-Buffering: no header, (2) whether your cloud platform’s load balancer has a response timeout shorter than your stream duration, (3) whether Cache-Control: no-cache is actually reaching the client. For rate limiting and backoff when the API itself starts dropping connections, see rate limiting and backoff strategies for Claude API in production.

3. WebSocket disconnects after ~60 seconds idle

Cloud platforms (AWS ALB, Cloudflare, Render) typically close idle connections after a default timeout of roughly 60 seconds. Either send a keepalive message every 30 seconds or raise the platform timeout. Add this to your WebSocket handler:

import asyncio

# In the websocket_agent loop, replace the receive_text() call with:
try:
    raw = await asyncio.wait_for(websocket.receive_text(), timeout=30.0)
except asyncio.TimeoutError:
    # No client message for 30 seconds: send a keepalive and wait again
    await websocket.send_json({"type": "ping"})
    continue

What to Build Next

The natural extension here is adding tool use to the WebSocket session — giving your agent the ability to call external APIs mid-conversation and stream the results back. The architecture maps cleanly: intercept stop_reason == "tool_use" in the stream, execute the tool, then re-enter the stream loop with the tool result. For the orchestration patterns that make this maintainable at scale, the article on building Claude subagents that delegate work gives you the delegation model to structure it properly.
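
The "execute the tool" step can be sketched independently of the streaming code. The function below assumes the assistant's content blocks have already been converted to plain dicts, and that each tool is a regular Python callable registered by name; run_tools and get_weather are hypothetical names. It returns the user message of tool_result blocks that gets appended to the history before calling the API again.

```python
def run_tools(assistant_content: list, tools: dict) -> dict:
    """Turn tool_use blocks into one user message of tool_result blocks."""
    results = []
    for block in assistant_content:
        if block.get("type") != "tool_use":
            continue  # skip text blocks
        handler = tools.get(block["name"])
        if handler is None:
            results.append({
                "type": "tool_result",
                "tool_use_id": block["id"],
                "content": f"unknown tool: {block['name']}",
                "is_error": True,
            })
            continue
        output = handler(**block["input"])
        results.append({
            "type": "tool_result",
            "tool_use_id": block["id"],
            "content": str(output),
        })
    return {"role": "user", "content": results}


def get_weather(city: str) -> str:  # hypothetical tool
    return f"Sunny in {city}"


assistant_content = [
    {"type": "text", "text": "Let me check."},
    {"type": "tool_use", "id": "toolu_01", "name": "get_weather",
     "input": {"city": "Oslo"}},
]
message = run_tools(assistant_content, {"get_weather": get_weather})
```

Each tool_result carries the id of the tool_use block it answers, which is how the model matches results to requests when several tools are called in one turn.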

Solo founders and small teams: deploy this on a single Fly.io machine with a persistent volume for SQLite conversation history. You don’t need Kubernetes for a Claude agent backend serving under 100 concurrent users.

Teams building multi-tenant products: move conversation history to Redis with per-user key namespacing, put a reverse proxy in front for auth, and consider whether you actually need WebSockets or whether SSE covers 90% of your use case with less infrastructure complexity.

Enterprise / high-throughput builds: the Claude agents Starlette integration pattern here scales well horizontally — just externalize all state. Each Starlette worker is stateless except for the lifespan-managed client, so you can run as many instances as you need behind a load balancer.

Frequently Asked Questions

Why use Starlette instead of FastAPI for Claude agent backends?

FastAPI adds dependency injection, automatic validation, and OpenAPI generation on top of Starlette — all useful, but all overhead. For pure agent API backends where you control the request shape and want to minimize latency on streaming responses, raw Starlette gives you the same async primitives with fewer moving parts. If your agent backend also serves a complex REST API with many routes, use FastAPI; for a focused agent endpoint, Starlette is cleaner.

Can I run multiple Starlette workers with WebSocket agent sessions?

Not with in-memory conversation history. If a client reconnects to a different worker, their history is gone. The fix is externalizing session state to Redis and keying it by a session ID the client sends in the WebSocket handshake header. With that in place, you can run as many workers as you need with uvicorn --workers 4.

How do I handle Claude API rate limits in a Starlette streaming endpoint?

Wrap your client.messages.stream() call in a try/except catching anthropic.RateLimitError, then yield an SSE error event and close the stream. For production, implement token bucket rate limiting at the Starlette middleware level before you even hit the Anthropic API — this protects both your quota and your users from confusing mid-stream failures.
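
For reference, the core of a token bucket fits in a few lines. This is a minimal single-process sketch, not a drop-in middleware: the TokenBucket class and its clock parameter (injectable so the logic can be tested without sleeping) are assumptions for illustration, and a multi-worker deployment would need the counters in shared storage.

```python
import time


class TokenBucket:
    """Allow bursts of up to `capacity` requests, refilled at `rate` per second."""

    def __init__(self, rate: float, capacity: int, clock=time.monotonic):
        self.rate = rate
        self.capacity = capacity
        self.clock = clock
        self.tokens = float(capacity)
        self.updated = clock()

    def allow(self) -> bool:
        now = self.clock()
        # Refill in proportion to elapsed time, never above capacity
        elapsed = now - self.updated
        self.tokens = min(self.capacity, self.tokens + elapsed * self.rate)
        self.updated = now
        if self.tokens >= 1:
            self.tokens -= 1
            return True
        return False
```

A middleware would keep one bucket per client key (API key or IP address) and return a 429 response when allow() is False, before the request ever reaches the Anthropic client.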

What’s the correct way to pass conversation history in a multi-turn Starlette agent?

Build a list of {"role": "user"/"assistant", "content": "..."} dicts and pass it as the messages parameter on each API call. The key mistake is not appending the assistant’s response back to history after each turn: Claude needs to see its own prior responses to maintain coherent context. Keep the list in the WebSocket handler scope for single-connection sessions, or serialize to persistent storage for cross-session memory.

How do I test SSE streaming endpoints locally?

Use curl -N http://localhost:8000/agent/stream -d '{"message":"hello"}' -H "Content-Type: application/json" — the -N flag disables buffering so you see tokens as they arrive. For programmatic testing in pytest, use httpx.AsyncClient with client.stream("POST", url, json=payload) and iterate over response.aiter_lines().
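
In a test, the raw lines still need to be turned back into events. The parser below is a small sketch for that purpose; parse_sse is a hypothetical helper that handles only the event: and data: fields this tutorial emits and ignores everything else.

```python
from typing import Iterable, Iterator, Tuple


def parse_sse(lines: Iterable[str]) -> Iterator[Tuple[str, str]]:
    """Yield (event, data) pairs from decoded SSE lines (no trailing newlines)."""
    event, data = "message", []  # "message" is the default event type
    for line in lines:
        if line == "":  # a blank line ends the current frame
            if data:
                yield event, "\n".join(data)
            event, data = "message", []
        elif line.startswith("event:"):
            event = line[len("event:"):].strip()
        elif line.startswith("data:"):
            value = line[len("data:"):]
            # Drop the single optional space after the colon
            data.append(value[1:] if value.startswith(" ") else value)


frames = list(parse_sse([
    "data: Hel", "",
    "data: lo", "",
    "event: done", "data: 42", "",
]))
```

Collect the lines from response.aiter_lines() into a list, pass them to parse_sse, and assert on the resulting pairs, for example that the last frame has the done event.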

Editorial note: API pricing, model capabilities, and tool features change frequently — always verify current details on the vendor’s website before building in production. Code examples are tested at time of writing; pin your dependency versions to avoid breaking changes. Some links in this article may be affiliate links — we may earn a commission if you sign up, at no extra cost to you.

