
Orchestration

This guide explains how the runtime processes a user turn, manages state, runs tools, and emits checkpoints. It does not explain graph authoring or FastAPI route design in detail.

This guide is for developers who need to understand the runtime contract behind process_swarm_stream(...). It assumes that you already know the basic SwarmForge objects and want deeper control over execution.

After reading this guide, you should be able to:

  • understand the turn runner contract
  • trace the event and checkpoint flow
  • reason about handoffs, tool execution, and session state
  • identify the right extension points for persistence and state management

The orchestration package executes one user turn against a SwarmSession. You provide a turn runner for one agent step, usually backed by a real provider call, and the SDK manages routing, tools, reducers, and checkpoints around it.

Execution Model

  1. Create or load a SwarmSession.
  2. Call process_swarm_stream(...) with the user input.
  3. Provide a turn runner so the SDK can request one agent turn at a time.
  4. Consume emitted events and persisted checkpoints.

Runtime Responsibilities

  • per-agent histories and runtime context
  • automatic transfer_to_agent injection from graph edges
  • reducer-based variable application
  • queued-intent routing
  • live tool handler execution
  • tool call checkpointing
  • tool_mock_required events when a tool is called with neither a handler nor a mock

Runner Contract

turn_runner.run_turn(...) receives:

  • agent_node: The active SwarmNode.
  • contents: The agent-visible conversation history for the current turn.
  • config: An AgentTurnConfig containing the system instruction, available tools, visible variables, and behavior config.

The runner returns an AgentTurnResult with:

  • response_text
  • tool_calls
  • raw_response
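
As a rough illustration of this contract, the sketch below defines a runner that echoes the last message instead of calling a provider, so it runs without any credentials. EchoTurnRunner and the local AgentTurnResult dataclass are stand-ins, and treating run_turn as a coroutine that accepts these three keyword arguments is an assumption; in real code you would import the SDK's own AgentTurnResult and check its exact signature.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Any


@dataclass
class AgentTurnResult:
    """Local stand-in mirroring the three documented result fields."""
    response_text: str = ""
    tool_calls: list = field(default_factory=list)
    raw_response: Any = None


class EchoTurnRunner:
    """Hypothetical runner that skips the provider call and echoes the last message."""

    async def run_turn(self, agent_node, contents, config):
        # Assumption: contents is a list whose last item is the newest message.
        last = contents[-1] if contents else ""
        return AgentTurnResult(response_text=f"echo: {last}")


result = asyncio.run(
    EchoTurnRunner().run_turn(agent_node=None, contents=["hello"], config=None)
)
print(result.response_text)
```

A runner like this can be handy in unit tests, where the routing and checkpoint behavior matters more than the model output.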

All runnable examples on this page assume that you have already copied .env.example to .env, set MODEL_PROVIDER and LLM_MODEL, and added the matching API key.

Minimal multi-agent flow

python
import asyncio

from swarmforge.env import require_env_vars
from swarmforge.evaluation.provider import ModelConfig
from swarmforge.swarm import (
    InMemorySessionStore,
    SessionStateManager,
    SwarmDefinition,
    SwarmEdge,
    SwarmNode,
    SwarmSession,
    build_turn_runner,
    process_swarm_stream,
)


swarm = SwarmDefinition(
    id="support",
    name="Support Swarm",
    nodes=[
        SwarmNode(
            id="triage",
            node_key="triage",
            name="Triage",
            system_prompt=(
                "You triage support requests. "
                "Immediately call transfer_to_agent for billing issues once the route is clear."
            ),
            is_entry_node=True,
        ),
        SwarmNode(
            id="billing",
            node_key="billing",
            name="Billing",
            system_prompt="You handle billing.",
        ),
    ],
    edges=[
        SwarmEdge(
            id="triage->billing",
            source_node_id="triage",
            target_node_id="billing",
            handoff_description="Transfer billing issues after intent confirmation.",
            required_variables=["account_id"],
        )
    ],
)

session = SwarmSession(id="session-1", swarm=swarm)
store = InMemorySessionStore()


async def extract_required_variables(user_input="", **_kwargs):
    if "ACME-991" in user_input:
        return {"account_id": "ACME-991"}
    return {}


async def main():
    require_env_vars("MODEL_PROVIDER", "LLM_MODEL")
    turn_runner = build_turn_runner(ModelConfig())
    async for event in process_swarm_stream(
        session,
        "I need help with a charge on account ACME-991.",
        store=store,
        turn_runner=turn_runner,
        extract_required_variables=extract_required_variables,
    ):
        print(event)


asyncio.run(main())

Event Stream

  • open: Announces the starting agent for the turn.
  • tool_use: Reports model-requested tool calls.
  • handoff: Reports successful routing to another node.
  • handoff_blocked: Reports a blocked transfer when required variables are still missing. The runtime also appends a structured tool result for the blocked transfer_to_agent call, including a concrete rejection_reason, before asking the current agent for its next reply.
  • tool_mock_required: Reports a tool call that has neither a handler nor a mock.
  • chunk: Streams assistant text.
  • done: Emits the final assembled agent response.

With a tool-capable model, the runnable flow on this page typically emits:

text
open -> tool_use -> handoff -> chunk -> done

When a transfer is blocked, the visible flow still looks like open -> tool_use -> handoff_blocked -> chunk -> done, but the final chunk is generated after the agent receives the blocked transfer as a tool result in its turn history.
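
The blocked-transfer path can be pictured as a check of the edge's required variables against session state. The sketch below only approximates that decision: check_handoff and the shape of the dicts it returns are hypothetical, and the real rejection_reason text will differ.

```python
def check_handoff(required_variables, state):
    """Return a handoff or handoff_blocked event dict (hypothetical shape)."""
    # A variable counts as missing when it is absent or empty in session state.
    missing = [name for name in required_variables if not state.get(name)]
    if missing:
        return {
            "event": "handoff_blocked",
            "data": {
                "rejection_reason": "missing required variables: " + ", ".join(missing)
            },
        }
    resolved = {name: state[name] for name in required_variables}
    return {"event": "handoff", "data": {"resolvedVariables": resolved}}


blocked = check_handoff(["account_id"], {})
allowed = check_handoff(["account_id"], {"account_id": "ACME-991"})
print(blocked["event"], allowed["event"])
```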

The handoff event carries both routing and variable resolution:

json
{
    "event": "handoff",
    "data": {
        "fromNodeKey": "triage",
        "toNodeKey": "billing",
        "requiredVariables": ["account_id"],
        "resolvedVariables": {"account_id": "ACME-991"}
    }
}
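
A consumer usually branches on the event name. The sketch below assumes each event is a dict with event and data keys, as in the JSON above. The fake_stream generator and the text key on chunk payloads are invented stand-ins, so check the real payloads before relying on them.

```python
import asyncio


async def fake_stream():
    """Stand-in for process_swarm_stream(...); only the handoff payload is documented."""
    yield {"event": "open", "data": {}}
    yield {"event": "tool_use", "data": {}}
    yield {
        "event": "handoff",
        "data": {
            "fromNodeKey": "triage",
            "toNodeKey": "billing",
            "requiredVariables": ["account_id"],
            "resolvedVariables": {"account_id": "ACME-991"},
        },
    }
    yield {"event": "chunk", "data": {"text": "Looking into that charge."}}
    yield {"event": "done", "data": {}}


async def consume(stream):
    """Collect the event order, any handoff routes, and streamed text."""
    order, routes, text = [], [], []
    async for event in stream:
        name, data = event["event"], event.get("data", {})
        order.append(name)
        if name == "handoff":
            routes.append((data["fromNodeKey"], data["toNodeKey"]))
        elif name == "chunk":
            # Assumption: chunk payloads expose their text under a "text" key.
            text.append(data.get("text", ""))
    return order, routes, "".join(text)


order, routes, text = asyncio.run(consume(fake_stream()))
print(" -> ".join(order))
```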

Persistence Contract

SessionStore implementations back session and checkpoint persistence. The interface is:

  • get_session(session_id)
  • save_session(session)
  • append_checkpoint(checkpoint)
  • list_checkpoints(session_id)
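
For orientation, a dict-backed store with these four methods might look like the sketch below. Whether the real interface is async, and whether checkpoints expose a session_id attribute, are assumptions here. DictSessionStore is a hypothetical name, not part of the SDK.

```python
import asyncio
from types import SimpleNamespace


class DictSessionStore:
    """Hypothetical in-process store keyed by session id."""

    def __init__(self):
        self._sessions = {}
        self._checkpoints = {}

    async def get_session(self, session_id):
        # Returns None when the session has never been saved.
        return self._sessions.get(session_id)

    async def save_session(self, session):
        self._sessions[session.id] = session

    async def append_checkpoint(self, checkpoint):
        # Assumption: each checkpoint carries the id of the session it belongs to.
        self._checkpoints.setdefault(checkpoint.session_id, []).append(checkpoint)

    async def list_checkpoints(self, session_id):
        # Return a copy so callers cannot mutate the stored history.
        return list(self._checkpoints.get(session_id, []))


async def demo():
    store = DictSessionStore()
    await store.save_session(SimpleNamespace(id="session-1"))
    await store.append_checkpoint(
        SimpleNamespace(session_id="session-1", kind="tool_call")
    )
    session = await store.get_session("session-1")
    checkpoints = await store.list_checkpoints("session-1")
    return session, checkpoints


saved, checkpoints = asyncio.run(demo())
print(saved.id, len(checkpoints))
```

A durable implementation would swap the dicts for a database while keeping the same four entry points.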

State Management

SessionStateManager is the preferred runtime state abstraction for reducer-aware session data. GlobalVariableManager remains as a compatibility alias.

SessionStateManager is the recommended extension point when you need custom validation or normalization policies, because the same manager is shared by:

  • direct process_swarm_stream(...) usage
  • create_fastapi_app(...)
  • create_swarm_app(...)

The default behavior respects swarm variable reducer rules. If you need custom logic, subclass it and pass the instance into the runtime.

python
# Reuses swarm, session, store, and the imports from the minimal multi-agent flow.
class CustomState(SessionStateManager):
    def normalize_value(self, key, value, *, session, current_node=None):
        # Canonicalize account IDs so "acme-991" and "ACME-991" match.
        if key == "account_id":
            return str(value).strip().upper()
        return value

    def validate_value(self, key, value, *, session, current_node=None, reducer_rule):
        # Reject empty account IDs.
        if key == "account_id" and not value:
            raise ValueError("account_id cannot be empty")


state_manager = CustomState.from_swarm(swarm)


async def main():
    # "async for" is only valid inside a coroutine, so the stream is wrapped in main().
    async for event in process_swarm_stream(
        session,
        "I need help with account acme-991",
        store=store,
        turn_runner=build_turn_runner(ModelConfig()),
        global_variable_manager=state_manager,
    ):
        print(event)


asyncio.run(main())

When the caller already has request-scoped facts, you can apply them before the turn using the same manager contract the API uses:

python
state_manager.update_state(
    session,
    {"account_id": "ACME-991", "priority": "high"},
    current_node=session.current_node,
)
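
To make the idea of reducer-aware updates more concrete, here is a toy pipeline that normalizes, validates, and then merges each value. It is not the SDK's implementation: the rule names replace and append, the apply_update helper, and the order of the steps are assumptions chosen for illustration.

```python
def apply_update(state, updates, rules):
    """Merge updates into a copy of state using a per-key reducer rule."""
    merged = dict(state)
    for key, value in updates.items():
        # Normalize: trim surrounding whitespace from strings.
        if isinstance(value, str):
            value = value.strip()
        # Validate: refuse empty values.
        if value in ("", None):
            raise ValueError(f"{key} cannot be empty")
        rule = rules.get(key, "replace")
        if rule == "append":
            # Append keeps history by adding the new value to a fresh list.
            merged[key] = list(merged.get(key, [])) + [value]
        else:
            # Replace is last-write-wins.
            merged[key] = value
    return merged


state = {"account_id": "OLD-1", "notes": ["first"]}
rules = {"notes": "append"}
new_state = apply_update(
    state, {"account_id": " ACME-991 ", "notes": "second"}, rules
)
print(new_state)
```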

Dynamic System Prompt Context

For code-defined swarms, a node prompt can be a function instead of a static string.

python
from swarmforge.swarm import SwarmNode, SystemPromptContext


def billing_prompt(context: SystemPromptContext) -> str:
    account_id = context.state.get("account_id") or "unknown"
    return f"You handle billing issues. Current account context: {account_id}."


node = SwarmNode(
    id="billing",
    node_key="billing",
    name="Billing",
    system_prompt=billing_prompt,
    is_entry_node=True,
)

This keeps code-defined prompts dynamic without changing the JSON authoring format.

The important part is where account_id came from: you pass it in from outside the swarm, either through SwarmSession.from_state(...) in code or through the API state payload, and the runtime injects it into the prompt context.
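
One way to picture the resolution step is a helper that accepts either form of prompt. resolve_system_prompt and the PromptContext stand-in below are hypothetical, and the real SystemPromptContext may carry more fields than state.

```python
from dataclasses import dataclass, field


@dataclass
class PromptContext:
    """Stand-in exposing only the state mapping used in the example above."""
    state: dict = field(default_factory=dict)


def resolve_system_prompt(prompt, context):
    # Callables are invoked with the context; plain strings pass through unchanged.
    return prompt(context) if callable(prompt) else prompt


def billing_prompt(context):
    account_id = context.state.get("account_id") or "unknown"
    return f"You handle billing issues. Current account context: {account_id}."


with_state = resolve_system_prompt(
    billing_prompt, PromptContext({"account_id": "ACME-991"})
)
without_state = resolve_system_prompt(billing_prompt, PromptContext())
static = resolve_system_prompt("You handle billing.", PromptContext())
print(with_state)
```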

Single-agent mode

The same runtime works with one entry node and no edges. See Examples for the single-agent script.

Tool Execution Model

For Python usage, a node tool can execute real logic instead of returning only mock data. The simplest path is to attach a handler directly to a tool definition and use context plus tool_state for runtime dependencies. JSON schema is inferred automatically from the function signature, and the tool description is pulled from the function docstring when you do not provide one explicitly.

Handlers can be sync or async, receive tool args by name, and can optionally accept injected runtime-only arguments such as context, state, shared_state, session, agent_node, user_input, visible_state, or visible_global_variables.

python
from swarmforge.swarm import SwarmNode, function_tool


async def lookup_order(order_id: str, context=None, state=None):
    """Fetch order status for a customer order.

    Args:
        order_id: The order identifier.
    """

    return {
        "order_id": order_id,
        "account_id": context.state.get("account_id"),
        "tenant": (state or context.shared_state)["tenant"],
        "status": "shipped",
    }


node = SwarmNode(
    id="ops",
    node_key="ops",
    name="Ops",
    system_prompt="Use tools when an order lookup is needed.",
    enabled_tools=[function_tool(handler=lookup_order)],
    is_entry_node=True,
)

Pass tool_state=... into process_swarm_stream(...) when the handler needs runtime-only dependencies such as HTTP clients or credentials; the handler can receive that object through the injected state or shared_state parameter. This keeps external integration state outside the agent-visible session payload.
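
Name-based injection of this kind is commonly built on inspect.signature. The sketch below shows the general technique, not SwarmForge's actual dispatcher. call_handler and the runtime dict are hypothetical.

```python
import asyncio
import inspect


async def call_handler(handler, tool_args, runtime):
    """Call handler with model-supplied args plus any runtime values it declares."""
    params = inspect.signature(handler).parameters
    # Inject only the runtime-only names that appear in the handler signature.
    injected = {name: value for name, value in runtime.items() if name in params}
    result = handler(**tool_args, **injected)
    # Support both sync and async handlers.
    if inspect.isawaitable(result):
        result = await result
    return result


def lookup_order(order_id: str, state=None):
    return {"order_id": order_id, "tenant": state["tenant"]}


async def lookup_async(order_id: str):
    return {"order_id": order_id}


runtime = {"state": {"tenant": "acme"}, "user_input": "where is my order?"}
sync_result = asyncio.run(call_handler(lookup_order, {"order_id": "A-1"}, runtime))
async_result = asyncio.run(call_handler(lookup_async, {"order_id": "A-2"}, runtime))
print(sync_result, async_result)
```

Because injection is driven by parameter names, a handler that declares none of the runtime names receives only the model-supplied arguments.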
