Orchestration
This guide explains how the runtime processes a user turn, manages state, runs tools, and emits checkpoints. It does not explain graph authoring or FastAPI route design in detail.
This guide is for developers who need to understand the runtime contract behind process_swarm_stream(...). It assumes that you already know the basic SwarmForge objects and want deeper control over execution.
After reading this guide, you should be able to:
- understand the turn runner contract
- trace the event and checkpoint flow
- reason about handoffs, tool execution, and session state
- identify the right extension points for persistence and state management
The orchestration package executes one user turn against a SwarmSession. You provide a turn runner for one agent step, usually backed by a real provider call, and the SDK manages routing, tools, reducers, and checkpoints around it.
Execution Model
- Create or load a SwarmSession.
- Call process_swarm_stream(...) with the user input.
- Provide a turn runner so the SDK can request one agent turn at a time.
- Consume emitted events and persisted checkpoints.
Runtime Responsibilities
- per-agent histories and runtime context
- automatic transfer_to_agent injection from graph edges
- reducer-based variable application
- queued-intent routing
- live tool handler execution
- tool call checkpointing
- tool_mock_required events when a tool is called without a mock
Runner Contract
turn_runner.run_turn(...) receives:
- agent_node: The active SwarmNode.
- contents: The agent-visible conversation history for the current turn.
- config: An AgentTurnConfig containing the system instruction, available tools, visible variables, and behavior config.
The runner returns an AgentTurnResult with:
- response_text
- tool_calls
- raw_response
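As a rough sketch of this contract, the runner below returns a canned reply without calling a provider. The AgentTurnResult dataclass here is a local stand-in that mirrors the three documented fields, not the SDK class, and the async run_turn signature with these three parameters is an assumption. CannedTurnRunner is a hypothetical name. Check the SDK types before relying on any of it.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Any


@dataclass
class AgentTurnResult:
    """Local stand-in mirroring the documented result fields (not the SDK class)."""

    response_text: str
    tool_calls: list = field(default_factory=list)
    raw_response: Any = None


class CannedTurnRunner:
    """Hypothetical runner that answers every turn with fixed text."""

    def __init__(self, reply: str):
        self.reply = reply

    # Assumption: the runtime awaits run_turn and passes these three arguments.
    async def run_turn(self, agent_node, contents, config):
        return AgentTurnResult(
            response_text=f"[{agent_node}] {self.reply}",
            tool_calls=[],      # this sketch never requests a tool
            raw_response=None,  # a real runner would keep the provider payload here
        )


result = asyncio.run(
    CannedTurnRunner("How can I help?").run_turn("triage", contents=[], config=None)
)
print(result.response_text)
```

A runner like this can be useful in tests, because it exercises routing and checkpoint logic without network calls.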
All runnable examples on this page assume that you have already copied .env.example to .env, set MODEL_PROVIDER and LLM_MODEL, and added the matching API key.
Minimal multi-agent flow
python
import asyncio
import json

from swarmforge.env import require_env_vars
from swarmforge.evaluation.provider import ModelConfig
from swarmforge.swarm import (
    InMemorySessionStore,
    SessionStateManager,
    SwarmDefinition,
    SwarmEdge,
    SwarmNode,
    SwarmSession,
    build_turn_runner,
    process_swarm_stream,
)

# Two nodes: triage is the entry point and can hand off to billing.
swarm = SwarmDefinition(
    id="support",
    name="Support Swarm",
    nodes=[
        SwarmNode(
            id="triage",
            node_key="triage",
            name="Triage",
            system_prompt=(
                "You triage support requests. "
                "Immediately call transfer_to_agent for billing issues once the route is clear."
            ),
            is_entry_node=True,
        ),
        SwarmNode(
            id="billing",
            node_key="billing",
            name="Billing",
            system_prompt="You handle billing.",
        ),
    ],
    edges=[
        # The handoff is only allowed once account_id is known.
        SwarmEdge(
            id="triage->billing",
            source_node_id="triage",
            target_node_id="billing",
            handoff_description="Transfer billing issues after intent confirmation.",
            required_variables=["account_id"],
        )
    ],
)

session = SwarmSession(id="session-1", swarm=swarm)
store = InMemorySessionStore()


async def extract_required_variables(user_input="", **_kwargs):
    # Toy extractor: recognizes one hard-coded account ID in the user input.
    if "ACME-991" in user_input:
        return {"account_id": "ACME-991"}
    return {}


async def main():
    require_env_vars("MODEL_PROVIDER", "LLM_MODEL")
    turn_runner = build_turn_runner(ModelConfig())
    async for event in process_swarm_stream(
        session,
        "I need help with a charge on account ACME-991.",
        store=store,
        turn_runner=turn_runner,
        extract_required_variables=extract_required_variables,
    ):
        print(event)


asyncio.run(main())

Event Stream
- open: Announces the starting agent for the turn.
- tool_use: Reports model-requested tool calls.
- handoff: Reports successful routing to another node.
- handoff_blocked: Reports a blocked transfer when required variables are still missing. The runtime also appends a structured tool result for the blocked transfer_to_agent call, including a concrete rejection_reason, before asking the current agent for its next reply.
- tool_mock_required: Reports a tool call that has neither a handler nor a mock.
- chunk: Streams assistant text.
- done: Emits the final assembled agent response.
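The difference between handoff and handoff_blocked comes down to whether every required variable on the edge has a value. The function below is a minimal sketch of that gate, not the runtime's implementation. The handoff payload follows the field names used in the handoff example on this page. The handoff_blocked payload, including missingVariables and where rejection_reason sits, is hypothetical, as is the resolve_handoff name.

```python
def resolve_handoff(from_key, to_key, required_variables, state):
    """Return a handoff or handoff_blocked event dict for one transfer request."""
    # Keep only the required variables that currently have a value.
    resolved = {
        key: state[key] for key in required_variables if state.get(key) is not None
    }
    missing = [key for key in required_variables if key not in resolved]

    if missing:
        # Hypothetical payload: the real blocked event may use other field names.
        return {
            "event": "handoff_blocked",
            "data": {
                "fromNodeKey": from_key,
                "toNodeKey": to_key,
                "missingVariables": missing,
                "rejection_reason": "missing required variables: " + ", ".join(missing),
            },
        }

    return {
        "event": "handoff",
        "data": {
            "fromNodeKey": from_key,
            "toNodeKey": to_key,
            "requiredVariables": list(required_variables),
            "resolvedVariables": resolved,
        },
    }


ok = resolve_handoff("triage", "billing", ["account_id"], {"account_id": "ACME-991"})
blocked = resolve_handoff("triage", "billing", ["account_id"], {})
print(ok["event"], blocked["event"], blocked["data"]["rejection_reason"])
```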
With a tool-capable model, the runnable flow on this page typically emits:
text
open -> tool_use -> handoff -> chunk -> done

When a transfer is blocked, the visible flow still looks like open -> tool_use -> handoff_blocked -> chunk -> done, but the final chunk is generated after the agent receives the blocked transfer as a tool result in its turn history.
The handoff event carries both routing and variable resolution:
json
{
  "event": "handoff",
  "data": {
    "fromNodeKey": "triage",
    "toNodeKey": "billing",
    "requiredVariables": ["account_id"],
    "resolvedVariables": {"account_id": "ACME-991"}
  }
}

Persistence Contract
SessionStore implementations back session and checkpoint persistence. The interface is:
- get_session(session_id)
- save_session(session)
- append_checkpoint(checkpoint)
- list_checkpoints(session_id)
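To make the four methods concrete, here is a minimal dict-backed sketch. It does not subclass the SDK's SessionStore. DictSessionStore is a hypothetical name, the methods are assumed to be async, and checkpoints are assumed to expose a session_id attribute. The demo uses SimpleNamespace objects in place of real sessions and checkpoints.

```python
import asyncio
from types import SimpleNamespace


class DictSessionStore:
    """Hypothetical in-process store exposing the four documented methods."""

    def __init__(self):
        self._sessions = {}
        self._checkpoints = {}

    async def get_session(self, session_id):
        # Returns None when the session has never been saved.
        return self._sessions.get(session_id)

    async def save_session(self, session):
        self._sessions[session.id] = session

    async def append_checkpoint(self, checkpoint):
        # Assumption: each checkpoint carries the id of its session.
        self._checkpoints.setdefault(checkpoint.session_id, []).append(checkpoint)

    async def list_checkpoints(self, session_id):
        # Return a copy so callers cannot mutate the stored list.
        return list(self._checkpoints.get(session_id, []))


async def demo():
    store = DictSessionStore()
    await store.save_session(SimpleNamespace(id="session-1"))
    await store.append_checkpoint(SimpleNamespace(session_id="session-1", step=1))
    loaded = await store.get_session("session-1")
    checkpoints = await store.list_checkpoints("session-1")
    return loaded.id, len(checkpoints)


summary = asyncio.run(demo())
print(summary)
```

A database-backed store would keep the same four entry points and replace the dictionaries with queries.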
State Management
SessionStateManager is the preferred runtime state abstraction for reducer-aware session data. GlobalVariableManager remains as a compatibility alias.
It is the recommended extension point when you need custom validation or normalization policies because it is shared by:
- direct process_swarm_stream(...) usage
- create_fastapi_app(...)
- create_swarm_app(...)
The default behavior respects swarm variable reducer rules. If you need custom logic, subclass it and pass the instance into the runtime.
python
# Continues the minimal multi-agent flow: reuses its imports, swarm, session, and store.
class CustomState(SessionStateManager):
    def normalize_value(self, key, value, *, session, current_node=None):
        # Canonicalize account IDs: trim whitespace and upper-case.
        if key == "account_id":
            return str(value).strip().upper()
        return value

    def validate_value(self, key, value, *, session, current_node=None, reducer_rule):
        # Reject empty account IDs.
        if key == "account_id" and not value:
            raise ValueError("account_id cannot be empty")


state_manager = CustomState.from_swarm(swarm)


async def main():
    # async for is only valid inside a coroutine, so the loop is wrapped in main().
    async for event in process_swarm_stream(
        session,
        "I need help with account acme-991",
        store=store,
        turn_runner=build_turn_runner(ModelConfig()),
        global_variable_manager=state_manager,
    ):
        print(event)


asyncio.run(main())

When the caller already has request-scoped facts, you can apply them before the turn using the same manager contract the API uses:
python
state_manager.update_state(
    session,
    {"account_id": "ACME-991", "priority": "high"},
    current_node=session.current_node,
)

Dynamic System Prompt Context
For code-defined swarms, a node prompt can be a function instead of a static string.
python
from swarmforge.swarm import SwarmNode, SystemPromptContext


def billing_prompt(context: SystemPromptContext) -> str:
    # Fall back to "unknown" when no account_id has been supplied yet.
    account_id = context.state.get("account_id") or "unknown"
    return f"You handle billing issues. Current account context: {account_id}."


node = SwarmNode(
    id="billing",
    node_key="billing",
    name="Billing",
    system_prompt=billing_prompt,
    is_entry_node=True,
)

This keeps code-defined prompts dynamic without changing the JSON authoring format.
The important part is where account_id came from: you pass it in from outside the swarm, either through SwarmSession.from_state(...) in code or through the API state payload, and the runtime injects it into the prompt context.
Single-agent mode
The same runtime works with one entry node and no edges. See Examples for the single-agent script.
Tool Execution Model
For Python usage, a node tool can execute real logic instead of returning only mock data. The simplest path is to attach a handler directly to a tool definition and use context plus tool_state for runtime dependencies. JSON schema is inferred automatically from the function signature, and the tool description is pulled from the function docstring when you do not provide one explicitly.
Handlers can be sync or async, receive tool args by name, and can optionally accept injected runtime-only arguments such as context, state, shared_state, session, agent_node, user_input, visible_state, or visible_global_variables.
python
from swarmforge.swarm import SwarmNode, function_tool


async def lookup_order(order_id: str, context=None, state=None):
    """Fetch order status for a customer order.

    Args:
        order_id: The order identifier.
    """
    # context and state are injected by the runtime, not supplied by the model.
    return {
        "order_id": order_id,
        "account_id": context.state.get("account_id"),
        "tenant": (state or context.shared_state)["tenant"],
        "status": "shipped",
    }


node = SwarmNode(
    id="ops",
    node_key="ops",
    name="Ops",
    system_prompt="Use tools when an order lookup is needed.",
    enabled_tools=[function_tool(handler=lookup_order)],
    is_entry_node=True,
)

Pass tool_state=... into process_swarm_stream(...) when the handler needs runtime-only dependencies such as HTTP clients or credentials; the handler can receive that object through the injected state or shared_state parameter. This keeps external integration state outside the agent-visible session payload.
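One common way to support optional injected arguments is to inspect the handler signature and pass only the runtime values it declares. The sketch below illustrates that general technique with the standard inspect module. It is not SwarmForge's implementation, and call_handler, lookup_tenant, and lookup_async are hypothetical names.

```python
import asyncio
import inspect


async def call_handler(handler, tool_args, runtime_values):
    """Call handler with its tool args plus only the runtime values it declares."""
    params = inspect.signature(handler).parameters
    injected = {
        name: value
        for name, value in runtime_values.items()
        # Skip anything the handler does not accept or the model already supplied.
        if name in params and name not in tool_args
    }
    result = handler(**tool_args, **injected)
    # Support both sync and async handlers.
    if inspect.isawaitable(result):
        result = await result
    return result


def lookup_tenant(order_id: str, state=None):
    return {"order_id": order_id, "tenant": state["tenant"]}


async def lookup_async(order_id: str, user_input=None):
    return {"order_id": order_id, "asked": user_input}


runtime = {
    "state": {"tenant": "acme"},
    "user_input": "where is my order?",
    "session": object(),  # never requested by either handler, so never passed
}
sync_result = asyncio.run(call_handler(lookup_tenant, {"order_id": "A1"}, runtime))
async_result = asyncio.run(call_handler(lookup_async, {"order_id": "A1"}, runtime))
print(sync_result, async_result)
```

Filtering by declared parameter names means a handler only sees the runtime objects it asks for, which keeps simple handlers free of unused arguments.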