Sunday, April 5

By the end of this tutorial, you’ll have a working orchestrator agent that spawns specialized Claude subagents, passes context between them, and recovers gracefully when one fails — all in plain Python with no framework magic hiding the important parts. Claude subagent orchestration is where single-agent workflows stop being sufficient and real production complexity begins.

Most tutorials show you a single Claude API call doing everything. That breaks down fast when tasks require parallelism, specialized prompts per domain, or outputs that exceed what one context window can usefully hold. The orchestrator/subagent pattern solves this — but the devil is in how you pass context, handle failures, and avoid token bloat cascading through your agent tree.

  1. Install dependencies — set up the Anthropic client and async primitives
  2. Define the subagent contract — typed inputs and outputs for reliable delegation
  3. Build the orchestrator — spawn subagents, collect results, manage context
  4. Add parallel execution — run independent subagents concurrently with asyncio
  5. Implement failure recovery — retry logic, fallbacks, and partial result handling
  6. Wire up result synthesis — merge subagent outputs back to a final answer

Step 1: Install Dependencies

You need the Anthropic SDK and tenacity for retry logic. That’s it — no LangChain, no extra framework overhead.

pip install anthropic tenacity python-dotenv

Set your API key in a .env file:

ANTHROPIC_API_KEY=sk-ant-...

Step 2: Define the Subagent Contract

The single biggest mistake in multi-agent systems is treating subagent I/O as free-form text. You end up with the orchestrator trying to parse outputs it can’t reliably structure. Use typed dataclasses and force JSON output from every subagent — this gives you something you can actually act on programmatically.

from dataclasses import dataclass
from typing import Optional, Any
import json

@dataclass
class SubagentTask:
    task_id: str
    agent_role: str          # e.g., "researcher", "coder", "reviewer"
    instructions: str        # the specific task this subagent must do
    context: dict            # shared data from the orchestrator
    output_schema: dict      # JSON schema the subagent must conform to

@dataclass
class SubagentResult:
    task_id: str
    success: bool
    data: Optional[dict]     # structured output
    error: Optional[str]
    tokens_used: int

The output_schema field is critical. You pass it into the subagent’s system prompt so Claude knows exactly what JSON structure to return. This is the same pattern covered in our guide on reducing LLM hallucinations with structured outputs — forcing a schema at the prompt level catches most output drift before it propagates upstream.

Step 3: Build the Orchestrator

The orchestrator is itself a Claude call. It receives the high-level task, breaks it into subtasks, and decides which subagent handles each one. The key is keeping the orchestrator’s context lean — it doesn’t need to see full subagent outputs, just summaries.

import anthropic
import os
from dotenv import load_dotenv

load_dotenv()
client = anthropic.Anthropic(api_key=os.getenv("ANTHROPIC_API_KEY"))

ORCHESTRATOR_SYSTEM = """You are a task orchestration agent. Given a complex task, 
decompose it into subtasks and assign each to a specialist subagent.

Return ONLY valid JSON in this structure:
{
  "task_summary": "string",
  "subtasks": [
    {
      "task_id": "unique_id",
      "agent_role": "researcher|coder|reviewer|writer",
      "instructions": "specific instructions for this subagent",
      "depends_on": []  // task_ids this task must wait for
    }
  ]
}"""

def orchestrate(user_task: str, context: dict = {}) -> dict:
    response = client.messages.create(
        model="claude-opus-4-5",   # orchestrator gets the smart model
        max_tokens=1024,
        system=ORCHESTRATOR_SYSTEM,
        messages=[{
            "role": "user",
            "content": f"Task: {user_task}\n\nContext: {json.dumps(context)}"
        }]
    )
    
    raw = response.content[0].text
    # Strip markdown code fences if Claude adds them
    raw = raw.strip().removeprefix("```json").removesuffix("```").strip()
    return json.loads(raw)

Note: orchestrator runs on claude-opus-4-5 because decomposition quality matters. Subagents run on claude-haiku-4-5 for most tasks — roughly $0.00025 per 1K input tokens vs $0.015 for Opus. On a workflow with 6 subagents averaging 800 input tokens each, that’s about $0.0012 in subagent costs versus $0.12 if you used Opus throughout. The tier separation pays for itself immediately at scale.

Step 4: Add Parallel Execution

Subtasks without dependencies can run concurrently. This is where asyncio earns its place — sequential subagent calls on a 6-task workflow with 2s average latency takes 12 seconds. Parallel drops that to the longest chain.

import asyncio

SUBAGENT_SYSTEM_TEMPLATE = """You are a specialist {role} agent. 
Complete the assigned task and return ONLY valid JSON matching this schema:
{schema}

Be precise. Do not include explanations outside the JSON structure."""

async def run_subagent(task: SubagentTask) -> SubagentResult:
    """Run a single subagent call asynchronously."""
    schema_str = json.dumps(task.output_schema, indent=2)
    system = SUBAGENT_SYSTEM_TEMPLATE.format(
        role=task.agent_role,
        schema=schema_str
    )
    
    # Inject parent context as a compressed summary — not raw dump
    context_str = summarize_context(task.context)  # see below
    
    loop = asyncio.get_event_loop()
    response = await loop.run_in_executor(None, lambda: client.messages.create(
        model="claude-haiku-4-5",   # cheaper model for subagents
        max_tokens=2048,
        system=system,
        messages=[{
            "role": "user",
            "content": f"Task: {task.instructions}\n\nContext: {context_str}"
        }]
    ))
    
    raw = response.content[0].text.strip()
    raw = raw.removeprefix("```json").removesuffix("```").strip()
    
    try:
        data = json.loads(raw)
        return SubagentResult(
            task_id=task.task_id,
            success=True,
            data=data,
            error=None,
            tokens_used=response.usage.input_tokens + response.usage.output_tokens
        )
    except json.JSONDecodeError as e:
        return SubagentResult(
            task_id=task.task_id,
            success=False,
            data=None,
            error=f"JSON parse failed: {e}. Raw output: {raw[:200]}",
            tokens_used=response.usage.input_tokens + response.usage.output_tokens
        )

def summarize_context(context: dict) -> str:
    """Keep context under 500 tokens — don't pass everything to every subagent."""
    if not context:
        return "No additional context."
    # Only pass keys relevant to the subagent, not the full shared state
    relevant = {k: v for k, v in context.items() if not k.startswith("_internal")}
    summary = json.dumps(relevant, indent=2)
    # Hard cap — truncate if necessary
    if len(summary) > 2000:
        summary = summary[:2000] + "\n... [truncated]"
    return summary

The summarize_context function is not optional. Without it, every subagent gets the full accumulated state from all previous steps, and your context window fills up fast. The pattern of scoping context to what each agent actually needs is the same principle behind persistent memory architecture for Claude agents — selective retrieval beats dumping everything.

Step 5: Implement Failure Recovery

Subagents fail. JSON parse errors, API timeouts, rate limits, and occasional model refusals are all real. You need retry logic with exponential backoff and a graceful fallback when retries are exhausted. See our detailed breakdown of LLM fallback and retry patterns in production for the full picture — here’s the practical implementation for the subagent context:

from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type
import anthropic

@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=2, max=10),
    retry=retry_if_exception_type((anthropic.APIError, json.JSONDecodeError))
)
async def run_subagent_with_retry(task: SubagentTask) -> SubagentResult:
    return await run_subagent(task)

async def execute_dag(subtasks: list[dict], shared_context: dict) -> dict[str, SubagentResult]:
    """Execute subtasks respecting dependencies (simple DAG execution)."""
    results = {}
    pending = {t["task_id"]: t for t in subtasks}
    
    while pending:
        # Find tasks whose dependencies are all resolved
        ready = [
            t for t in pending.values()
            if all(dep in results for dep in t.get("depends_on", []))
        ]
        
        if not ready:
            # Circular dependency or all remaining tasks blocked by failed deps
            for task_id in pending:
                results[task_id] = SubagentResult(
                    task_id=task_id,
                    success=False,
                    data=None,
                    error="Blocked by failed upstream dependency",
                    tokens_used=0
                )
            break
        
        # Run all ready tasks in parallel
        tasks_to_run = [
            SubagentTask(
                task_id=t["task_id"],
                agent_role=t["agent_role"],
                instructions=t["instructions"],
                context={
                    **shared_context,
                    # Inject upstream results this task depends on
                    "upstream_results": {
                        dep: results[dep].data 
                        for dep in t.get("depends_on", []) 
                        if results.get(dep) and results[dep].success
                    }
                },
                output_schema={"result": "string", "confidence": "number", "notes": "string"}
            )
            for t in ready
        ]
        
        batch_results = await asyncio.gather(
            *[run_subagent_with_retry(task) for task in tasks_to_run],
            return_exceptions=True
        )
        
        for task, result in zip(tasks_to_run, batch_results):
            if isinstance(result, Exception):
                results[task.task_id] = SubagentResult(
                    task_id=task.task_id,
                    success=False,
                    data=None,
                    error=str(result),
                    tokens_used=0
                )
            else:
                results[task.task_id] = result
            pending.pop(task.task_id)
    
    return results

Step 6: Wire Up Result Synthesis

Once subagents complete, the orchestrator synthesizes their outputs into a final answer. The synthesis call only sees summaries of successful subagent results — failed ones get flagged explicitly so the orchestrator can reason about partial completion.

def synthesize_results(
    original_task: str, 
    subtask_plan: dict,
    results: dict[str, SubagentResult]
) -> str:
    """Final orchestrator call to merge subagent outputs."""
    
    successful = {tid: r.data for tid, r in results.items() if r.success}
    failed = {tid: r.error for tid, r in results.items() if not r.success}
    
    synthesis_prompt = f"""Original task: {original_task}

Subtask plan: {json.dumps(subtask_plan['task_summary'])}

Successful subagent outputs:
{json.dumps(successful, indent=2)}

Failed subagents (handle gracefully):
{json.dumps(failed, indent=2) if failed else "None"}

Synthesize the successful outputs into a complete response. 
If some subtasks failed, note what's incomplete and provide the best answer possible from available data."""
    
    response = client.messages.create(
        model="claude-opus-4-5",
        max_tokens=2048,
        messages=[{"role": "user", "content": synthesis_prompt}]
    )
    
    total_tokens = sum(r.tokens_used for r in results.values())
    print(f"Total subagent tokens used: {total_tokens}")
    
    return response.content[0].text

# Full pipeline
async def run_orchestrated_task(user_task: str) -> str:
    plan = orchestrate(user_task)
    results = await execute_dag(plan["subtasks"], {"original_task": user_task})
    return synthesize_results(user_task, plan, results)

Common Errors

1. Orchestrator returns invalid JSON

Claude sometimes wraps JSON in markdown fences (` “`json `) or adds a preamble sentence. The removeprefix/removesuffix calls above handle fences. For the preamble issue, add this to your orchestrator system prompt: “Return ONLY the JSON object. No explanatory text before or after. No markdown formatting.” If it still happens, use a regex extraction as last resort: re.search(r'\{.*\}', raw, re.DOTALL).

2. Context explosion across subagent calls

If you pass full conversation history or raw upstream outputs to every subagent, you’ll hit token limits on long workflows and rack up unnecessary costs. Always compress context. For results from upstream tasks, pass only the fields the downstream agent actually needs — not the full result object. On a 10-task workflow, this alone can cut per-run costs by 40-60%.

3. Silent dependency deadlocks

If your DAG executor encounters a cycle or all remaining tasks depend on a failed task, it can hang. The execute_dag implementation above breaks the loop explicitly — but add logging when you hit the “blocked” condition so you can diagnose it. In production, set a hard timeout on the whole execute_dag call using asyncio.wait_for(execute_dag(...), timeout=120).

What to Build Next

The natural extension here is adding tool use to individual subagents — let the researcher subagent call a web search API, let the coder subagent write and execute code in a sandbox. The Claude tool use with Python guide walks through exactly how to attach tools to specific subagents without exposing them to the orchestrator (which shouldn’t care about implementation details). You can also add an observability layer — Langfuse or Helicone work well here — to trace which subagent is consuming the most tokens and where failures cluster.

Bottom line by reader type: If you’re a solo founder building a prototype, start with two subagent roles max and sequential execution — parallel adds complexity you don’t need yet. If you’re on a team shipping a production workflow with 5+ subtasks, the DAG executor and typed contracts above are worth the upfront investment. The Claude subagent orchestration pattern scales cleanly; the ad-hoc single-prompt approach does not.

Frequently Asked Questions

How do I pass context between Claude subagents without exceeding the context window?

Only pass what each subagent actually needs — not the full shared state. Extract relevant fields from upstream results and summarize long outputs before injecting them downstream. Hard-cap context injections at around 2000 characters per subagent call and use a dedicated summarization step for large outputs from previous agents.

Which Claude model should the orchestrator use versus subagents?

Use claude-opus-4-5 for orchestration (decomposition quality matters) and claude-haiku-4-5 for most subagent tasks where the instructions are specific and well-scoped. Use Sonnet for subagents doing complex reasoning or code generation. The cost difference between an all-Haiku stack and an all-Opus stack is roughly 60x — tier selection is the highest-leverage cost lever in multi-agent systems.

Can I run Claude subagents in parallel, and does it violate rate limits?

Yes, asyncio parallel execution works well with the Anthropic API. Rate limits apply per-organization (not per-request), so parallel subagents consume your rate budget simultaneously. If you’re hitting limits on large workflows, add a semaphore to cap concurrent subagent calls: asyncio.Semaphore(5) limits parallelism to 5 simultaneous calls.

What’s the difference between Claude subagent orchestration and LangChain agents?

LangChain agents use a single model with tool calls in a loop — useful for sequential tool use but awkward for parallel specialist subagents. The pattern here gives each subagent its own system prompt, model tier, and output schema, which produces more consistent specialist behavior. Plain Python also gives you full control over retry logic and context management without framework abstractions getting in the way.

How do I handle a subagent that keeps returning malformed JSON?

Three layers: first, strengthen the system prompt with explicit “return ONLY valid JSON” instructions; second, use regex extraction as a fallback to pull JSON from surrounding text; third, retry with a repair prompt that shows Claude the malformed output and asks it to fix the JSON. If all three fail, mark the subagent result as failed and let the synthesizer handle partial completion gracefully.

Put this into practice

Browse our directory of Claude Code agents — ready-to-use agents for development, automation, and data workflows.

Browse Agents →

Editorial note: API pricing, model capabilities, and tool features change frequently — always verify current details on the vendor’s website before building in production. Code examples are tested at time of writing; pin your dependency versions to avoid breaking changes. Some links in this article may be affiliate links — we may earn a commission if you sign up, at no extra cost to you.


Share.
Leave A Reply