Skip to main content
This document covers detailed usage examples and component references for multi-agent orchestration in AgentScope.

Orchestration Paradigms

AgentScope supports two primary orchestration paradigms for building multi-agent applications:

Master-Worker Pattern

A central agent coordinates and delegates tasks to specialized workers. Best for task decomposition, routing, and hierarchical workflows.

Conversation / SOP / Workflow

Agents communicate peer-to-peer via Msg objects. Best for multi-agent discussions, simulations, and message-driven pipelines.

Master-Worker Pattern

In the master-worker pattern, a central agent (master) coordinates and delegates tasks to subordinate agents (workers). The master agent controls the execution flow and decides which worker should handle each task. Key characteristics:
  • Centralized control and decision-making
  • Explicit task delegation and routing
  • Workers typically don’t communicate directly with each other
  • Clear hierarchical structure
Use cases:
  • Task decomposition and parallel execution
  • Specialized agents for different domains (e.g., code generation, web search, data analysis)
  • Dynamic agent creation based on task requirements
  • Complex workflows requiring orchestration logic
Implementation approaches:
  • Explicit routing: Use structured output or tool calls to route tasks to different agents
  • Agent-as-tool: Wrap sub-agents as tool functions that the master agent can invoke

Conversation/SOP/Workflow Pattern

In the conversation pattern, agents communicate by broadcasting and passing Msg objects among themselves. There’s no central controller — agents interact peer-to-peer, and the execution flow emerges from their interactions. Key characteristics:
  • Decentralized communication
  • Message-driven coordination
  • Agents observe and respond to each other’s messages
  • Flexible, dynamic interaction patterns
Use cases:
  • Multi-agent discussions and debates
  • Collaborative problem-solving
  • Simulations with multiple autonomous entities
  • Scenarios requiring peer-to-peer communication
Core tools:
  • MsgHub: Automatically broadcasts messages among a group of agents
  • Pipeline: Provides structured execution patterns (sequential, fanout)

Combining Both Paradigms

A single application can use both paradigms. For example, a master agent might orchestrate multiple conversation groups, or agents within a conversation might delegate specialized tasks to worker agents.

Master-Worker Pattern

Explicit Routing

Route user queries to different downstream agents based on the query content. Approach 1: Structured Output
from pydantic import BaseModel, Field
from typing import Literal

class RoutingChoice(BaseModel):
    your_choice: Literal["Content Generation", "Programming", "Search", None] = Field(
        description="Choose the follow-up task",
    )

msg_res = await router(msg_user, structured_model=RoutingChoice)
# msg_res.metadata["your_choice"] == "Programming"
Approach 2: Tool Calls Wrap downstream agents as tool functions:
from agentscope.tool import Toolkit, ToolResponse

async def generate_python(demand: str) -> ToolResponse:
    """Generate Python code based on the demand."""
    python_agent = ReActAgent(name="PythonAgent", ...)
    res = await python_agent(Msg("user", demand, "user"))
    return ToolResponse(content=res.get_content_blocks("text"))

toolkit = Toolkit()
toolkit.register_tool_function(generate_python)

router = ReActAgent(name="Router", toolkit=toolkit, ...)
await router(msg_user)  # Automatically routes to generate_python if needed

Agent-as-Tool

Wrap entire agents as tool functions to enable dynamic agent creation and delegation. The master agent invokes these tools to create and execute worker agents.
from agentscope.tool import Toolkit, ToolResponse, execute_python_code

async def create_worker(task_description: str) -> ToolResponse:
    """Create a worker agent to finish the given task.

    Args:
        task_description: The description of the task to be done by the worker,
            should contain all the necessary information.

    Returns:
        ToolResponse containing the worker's result.
    """
    # Create a toolkit for the worker
    toolkit = Toolkit()
    toolkit.register_tool_function(execute_python_code)

    # Create a worker agent with specific capabilities
    worker = ReActAgent(
        name="Worker",
        sys_prompt="You're a worker agent. Finish the given task using your tools.",
        model=DashScopeChatModel(...),
        formatter=DashScopeChatFormatter(),
        toolkit=toolkit,
    )

    # Execute the task
    res = await worker(Msg("user", task_description, "user"))
    return ToolResponse(content=res.get_content_blocks("text"))


# The master agent uses create_worker as a tool
toolkit = Toolkit()
toolkit.register_tool_function(create_worker)

master = ReActAgent(
    name="Master",
    sys_prompt="Decompose tasks and create workers to finish them.",
    toolkit=toolkit,
    ...
)

await master(Msg("user", "Execute hello world in Python", "user"))
Key benefits:
  • Dynamic worker creation based on task requirements
  • Workers can have different capabilities and tools
  • Master agent focuses on planning and coordination
  • Workers are isolated and can run concurrently

Conversation/SOP/Workflow Pattern

MsgHub

MsgHub is an async context manager that automatically broadcasts messages among a group of agents:
import asyncio
import os
from agentscope.agent import ReActAgent
from agentscope.formatter import DashScopeMultiAgentFormatter
from agentscope.message import Msg
from agentscope.model import DashScopeChatModel
from agentscope.pipeline import MsgHub


def create_agent(name, age, career):
    return ReActAgent(
        name=name,
        sys_prompt=f"You're {name}, a {age}-year-old {career}",
        model=DashScopeChatModel(
            model_name="qwen-max",
            api_key=os.environ["DASHSCOPE_API_KEY"],
        ),
        formatter=DashScopeMultiAgentFormatter(),
    )


alice = create_agent("Alice", 50, "teacher")
bob = create_agent("Bob", 35, "engineer")
charlie = create_agent("Charlie", 28, "designer")


async def example_broadcast():
    async with MsgHub(
        participants=[alice, bob, charlie],
        announcement=Msg(
            "user",
            "Now introduce yourself in one sentence.",
            "user",
        ),
    ) as hub:
        await alice()
        await bob()
        await charlie()

asyncio.run(example_broadcast())
Example output:
Alice: Hello, I'm Alice, a 50-year-old teacher with a passion for education.
Bob: Hello, I'm Bob, a 35-year-old engineer who enjoys solving complex problems.
Charlie: Hi, I'm Charlie, a 28-year-old designer with a keen eye for aesthetics.
Dynamic participant management:
async with MsgHub(participants=[alice]) as hub:
    hub.add(bob)            # Add new participant
    hub.delete(alice)       # Remove participant
    await hub.broadcast(    # Manually broadcast a message
        Msg("system", "Topic changed!", "system"),
    )

MsgHub Parameter Reference

from agentscope.pipeline import MsgHub

async with MsgHub(
    participants=[alice, bob, charlie],
    announcement=Msg("system", "Let's begin.", "system"),
    enable_auto_broadcast=True,
    name="meeting-room",
) as hub:
    await alice()   # Bob and Charlie auto-receive Alice's reply
    await bob()     # Alice and Charlie auto-receive Bob's reply
ParameterDefaultDescription
participantsList of agents to include
announcementNoneMessage(s) to broadcast on enter
enable_auto_broadcastTrueAuto-broadcast replies to all participants
namerandom UUIDName identifier for this hub
Methods:
MethodDescription
hub.add(agent)Add one or more agents as participants
hub.delete(agent)Remove one or more agents
hub.broadcast(msg)Manually broadcast a message to all participants
hub.set_auto_broadcast(bool)Enable/disable auto-broadcast
How it works: When entering the context, MsgHub registers each participant as a subscriber of all other participants. When any participant generates a reply via __call__, the reply message is automatically sent to all other participants via their observe() method. On exit, all subscriptions are cleaned up.
Newly added participants (via hub.add()) will not receive previous messages — only future ones.
When enable_auto_broadcast=False, MsgHub only broadcasts via the announcement parameter and the broadcast() method. This is useful when you want fine-grained control over message routing.

Pipeline

Sequential Pipeline

Execute agents in order, passing output from one to the next.
from agentscope.pipeline import sequential_pipeline, SequentialPipeline

# Functional style
msg = await sequential_pipeline(agents=[alice, bob, charlie], msg=initial_msg)

# Class-based (reusable)
pipeline = SequentialPipeline(agents=[alice, bob, charlie])
msg = await pipeline(msg=initial_msg)
ParameterDescription
agentsList of agents to execute in order
msgInitial input message (can be None)
Behavior: Equivalent to msg = await alice(msg); msg = await bob(msg); msg = await charlie(msg).

Fanout Pipeline

Distribute the same input to multiple agents and collect responses.
from agentscope.pipeline import fanout_pipeline, FanoutPipeline

# Functional style
msgs = await fanout_pipeline(
    agents=[alice, bob, charlie],
    msg=input_msg,
    enable_gather=True,
)

# Class-based (reusable)
pipeline = FanoutPipeline(agents=[alice, bob, charlie])
msgs = await pipeline(msg=input_msg)
ParameterDefaultDescription
agentsList of agents to receive the input
msgInput message (deep-copied for each agent)
enable_gatherTrueTrue: concurrent via asyncio.gather(). False: sequential
Returns: A list of Msg objects, one from each agent.
Choose enable_gather=True for performance (parallel I/O), or False for deterministic ordering.

Stream Printing Messages

Convert an agent’s internal print messages into an async generator for streaming to a web UI or other consumers.
from agentscope.pipeline import stream_printing_messages

agent.set_console_output_enabled(False)  # Avoid duplicate output

async for msg, last in stream_printing_messages(
    agents=[agent],
    coroutine_task=agent(Msg("user", "Hello!", "user")),
):
    print(msg.get_text_content(), end="\r")
    if last:
        print()  # Final message
ParameterDescription
agentsList of agents whose print messages to capture
coroutine_taskThe coroutine to execute while capturing messages
queueOptional custom asyncio.Queue (uses agents’ shared queue by default)
end_signalSignal string for end of stream (default: "[END]")
yield_speechIf True, yields (msg, last, speech) tuples including audio data
How messages are identified: Messages with the same msg.id are considered the same message being updated (streaming). The content is accumulative (not delta), so each yield contains the latest full content.

ChatRoom

from agentscope.pipeline import ChatRoom

chat_room = ChatRoom(agents=[agent1, agent2])
await chat_room.start(outgoing_queue)   # Connect all agents
await chat_room.stop()                  # Disconnect all agents
Parameter / MethodDescription
ChatRoom(agents)Initialize with a list of RealtimeAgent instances
await start(outgoing_queue)Connect all agents, start the internal forwarding loop
await stop()Disconnect all agents, cancel the forwarding loop
await handle_input(event)Forward a ClientEvent from the frontend to all agents
Internal forwarding loop: The ChatRoom maintains a central asyncio.Queue. When a ServerEvent is received from any agent, it is forwarded to the outgoing_queue (for the frontend) and broadcast to all other agents (excluding the sender, identified by agent_id). When a ClientEvent is received, it is distributed to all agents via handle_input().

Formatter Reference

For an introduction to Chat vs MultiAgent formatters and how to choose one, see Model — Formatter.

Formatter Table

ProviderChat FormatterMultiAgent Formatter
DashScopeDashScopeChatFormatterDashScopeMultiAgentFormatter
OpenAIOpenAIChatFormatterOpenAIMultiAgentFormatter
AnthropicAnthropicChatFormatterAnthropicMultiAgentFormatter
GeminiGeminiChatFormatterGeminiMultiAgentFormatter
OllamaOllamaChatFormatterOllamaMultiAgentFormatter
DeepSeekDeepSeekChatFormatterDeepSeekMultiAgentFormatter
All formatters are importable from agentscope.formatter.

Formatting Example

Example of how DashScopeMultiAgentFormatter transforms messages:
# Input messages
[
    Msg("system", "You're Bob.", "system"),
    Msg("Alice", "Hi!", "user"),
    Msg("Bob", "Nice to meet you.", "assistant"),
    Msg("Charlie", "Me too!", "assistant"),
]

# Formatted output for LLM
[
    {"role": "system", "content": "You're Bob."},
    {"role": "user", "content": "# Conversation History\nThe content between <history></history> tags contains your conversation history\n<history>\nAlice: Hi!\nBob: Nice to meet you.\nCharlie: Me too!\n</history>"},
]
The system message is preserved as-is. All other messages are combined into a &lt;history&gt; section within a single user message, with each speaker’s name prefixed to their text.

Workflow Pattern Examples

This section provides practical code examples demonstrating how to implement common multi-agent orchestration patterns using both paradigms.

Conversation/SOP Pattern Examples

User-Agent Conversation (Chatbot)

The simplest pattern — a user and an agent take turns:
import asyncio
import os
from agentscope.agent import ReActAgent, UserAgent
from agentscope.formatter import DashScopeChatFormatter
from agentscope.model import DashScopeChatModel

friday = ReActAgent(
    name="Friday",
    sys_prompt="You're a helpful assistant named Friday",
    model=DashScopeChatModel(
        model_name="qwen-max",
        api_key=os.environ["DASHSCOPE_API_KEY"],
    ),
    formatter=DashScopeChatFormatter(),  # "Chat" formatter for two-party conversation
)
user = UserAgent(name="User")

async def run_chatbot():
    msg = None
    while True:
        msg = await friday(msg)
        msg = await user(msg)
        if msg.get_text_content() == "exit":
            break

asyncio.run(run_chatbot())
Use ChatFormatter (e.g., DashScopeChatFormatter) for user-agent conversations — it uses the role field to distinguish user and assistant. Use MultiAgentFormatter when more than two agents are involved.

Multi-Agent Discussion

When more than two agents are involved, use MultiAgentFormatter (e.g., DashScopeMultiAgentFormatter) and MsgHub:
from agentscope.formatter import DashScopeMultiAgentFormatter

# MultiAgentFormatter wraps all messages into a single user message
# with name prefixes, so the LLM can distinguish different speakers
formatter = DashScopeMultiAgentFormatter()
The formatter converts multi-party history like this:
Alice: Hi!
Bob: Nice to meet you guys.
Charlie: Me too!
into a single user message with XML-tagged history, suitable for LLM APIs that only support user/assistant roles. Combined with MsgHub, a multi-agent discussion is simply:
async with MsgHub(
    [alice, bob, charlie],
    announcement=Msg("system", "Introduce yourselves.", "system"),
):
    await alice()
    await bob()
    await charlie()

Multi-Agent Debate

Multiple agents discuss a topic in rounds, with a moderator deciding when consensus is reached:
from pydantic import BaseModel, Field

class JudgeModel(BaseModel):
    finished: bool = Field(description="Whether the debate is finished.")
    correct_answer: str | None = Field(default=None)


async def run_debate():
    while True:
        # Debaters discuss within MsgHub
        async with MsgHub(participants=[alice, bob, moderator]):
            await alice(Msg("user", "Present your viewpoint.", "user"))
            await bob(Msg("user", "Present your counter-argument.", "user"))

        # Moderator judges outside MsgHub (debaters don't need to see the verdict)
        msg_judge = await moderator(
            Msg("user", "Can you determine the correct answer?", "user"),
            structured_model=JudgeModel,
        )

        if msg_judge.metadata.get("finished"):
            print("Answer:", msg_judge.metadata.get("correct_answer"))
            break

asyncio.run(run_debate())

Concurrent Agents

Use asyncio.gather() to run agents concurrently:
async def run_concurrent():
    agent1 = ReActAgent(name="Agent1", ...)
    agent2 = ReActAgent(name="Agent2", ...)

    results = await asyncio.gather(
        agent1(Msg("user", "Task A", "user")),
        agent2(Msg("user", "Task B", "user")),
    )

asyncio.run(run_concurrent())
Both agents start simultaneously and run in parallel (since LLM API calls are I/O-bound, asyncio handles them efficiently).
Combine MsgHub with sequential_pipeline or fanout_pipeline for more complex workflows.

Realtime Voice Chat Room

For realtime voice agent scenarios, ChatRoom orchestrates multiple RealtimeAgent instances sharing a session:
from agentscope.pipeline import ChatRoom

chat_room = ChatRoom(agents=[agent1, agent2])
await chat_room.start(outgoing_queue)   # Connect all agents, start forwarding loop

# ... handle events ...

await chat_room.stop()                  # Disconnect all agents, cancel forwarding loop
Unlike MsgHub (which works with text-based agents), ChatRoom handles ServerEvents and ClientEvents in the realtime voice pipeline. When one agent generates a response, ChatRoom forwards it both to the frontend and to other agents (excluding the sender).

Master-Worker Pattern Examples

Routing

Route user queries to different downstream agents based on the query content. Two approaches: Approach 1: Structured Output
from pydantic import BaseModel, Field
from typing import Literal

class RoutingChoice(BaseModel):
    your_choice: Literal["Content Generation", "Programming", "Search", None] = Field(
        description="Choose the follow-up task",
    )

msg_res = await router(msg_user, structured_model=RoutingChoice)
# msg_res.metadata["your_choice"] == "Programming"
Approach 2: Tool Calls Wrap downstream agents as tool functions:
from agentscope.tool import Toolkit, ToolResponse

async def generate_python(demand: str) -> ToolResponse:
    """Generate Python code based on the demand."""
    python_agent = ReActAgent(name="PythonAgent", ...)
    res = await python_agent(Msg("user", demand, "user"))
    return ToolResponse(content=res.get_content_blocks("text"))

toolkit = Toolkit()
toolkit.register_tool_function(generate_python)

router = ReActAgent(name="Router", toolkit=toolkit, ...)
await router(msg_user)  # Automatically routes to generate_python if needed

Orchestrator-Workers (Handoffs)

An orchestrator decomposes tasks and dynamically creates worker agents:
from agentscope.tool import execute_python_code

async def create_worker(task_description: str) -> ToolResponse:
    """Create a worker to finish the given task."""
    toolkit = Toolkit()
    toolkit.register_tool_function(execute_python_code)

    worker = ReActAgent(
        name="Worker",
        sys_prompt="You're a worker agent. Finish the given task.",
        model=DashScopeChatModel(...),
        formatter=DashScopeChatFormatter(),
        toolkit=toolkit,
    )
    res = await worker(Msg("user", task_description, "user"))
    return ToolResponse(content=res.get_content_blocks("text"))


# The orchestrator uses create_worker as a tool
toolkit = Toolkit()
toolkit.register_tool_function(create_worker)

orchestrator = ReActAgent(
    name="Orchestrator",
    sys_prompt="Decompose the task and create workers to finish them.",
    toolkit=toolkit,
    ...
)
await orchestrator(Msg("user", "Execute hello world in Python", "user"))