The core concepts and deployment strategies for transforming agents into scalable, production-ready services
Agent as Service (AaaS) represents the core mission of AgentScope Runtime: transforming agent applications into deployable, scalable services that can be accessed through standardized APIs and interfaces, just like any other production service.
AgentScope Runtime is a full-stack runtime for AI agents designed to solve two core challenges: efficient deployment & serving and secure sandboxed execution. In short: AgentScope Runtime = Tool Sandboxing + AaaS APIs + Scalable Deployment + Full-stack Observability (Logs/Traces) + Framework Compatibility.
This example demonstrates how to create an agent API server using AgentScope's ReActAgent and AgentApp. To run a minimal AgentScope agent with AgentScope Runtime, you generally need to implement:
Define lifespan – Use contextlib.asynccontextmanager to manage resource initialization (e.g., state services) at startup and cleanup on exit.
@app.query(framework="agentscope") – Core logic for handling requests; it must use stream_printing_messages to yield (msg, last) pairs for streaming output
agent_app.py
```python
# -*- coding: utf-8 -*-
# Shared configuration for all deployment methods
import asyncio
import os
from contextlib import asynccontextmanager

from fastapi import FastAPI

from agentscope.agent import ReActAgent
from agentscope.formatter import DashScopeChatFormatter
from agentscope.memory import InMemoryMemory
from agentscope.model import DashScopeChatModel
from agentscope.pipeline import stream_printing_messages
from agentscope.session import RedisSession
from agentscope.tool import Toolkit, execute_python_code

from agentscope_runtime.engine.app import AgentApp
from agentscope_runtime.engine.schemas.agent_schemas import AgentRequest


# Define lifecycle
@asynccontextmanager
async def lifespan(app: FastAPI):
    # Startup phase
    import fakeredis

    fake_redis = fakeredis.aioredis.FakeRedis(decode_responses=True)
    # NOTE: This FakeRedis instance is for development/testing only.
    # In production, replace it with your own Redis client/connection
    # (e.g., aioredis.Redis)
    app.state.session = RedisSession(
        connection_pool=fake_redis.connection_pool,
    )
    try:
        yield
    finally:
        print("AgentApp is shutting down...")


# Pass the defined lifespan to AgentApp
app = AgentApp(
    app_name="Friday",
    app_description="A helpful assistant",
    lifespan=lifespan,
)


# Define request-handling logic
@app.query(framework="agentscope")
async def query_func(
    self,
    msgs,
    request: AgentRequest = None,
    **kwargs,
):
    assert kwargs is not None, "kwargs is required for query_func"
    session_id = request.session_id
    user_id = request.user_id

    toolkit = Toolkit()
    toolkit.register_tool_function(execute_python_code)

    agent = ReActAgent(
        name="Friday",
        model=DashScopeChatModel(
            "qwen-turbo",
            api_key=os.getenv("DASHSCOPE_API_KEY"),
            enable_thinking=True,
            stream=True,
        ),
        sys_prompt="You're a helpful assistant named Friday.",
        toolkit=toolkit,
        memory=InMemoryMemory(),
        formatter=DashScopeChatFormatter(),
    )

    # Load agent state
    await app.state.session.load_session_state(
        session_id=session_id,
        user_id=user_id,
        agent=agent,
    )
    try:
        async for msg, last in stream_printing_messages(
            agents=[agent],
            coroutine_task=agent(msgs),
        ):
            yield msg, last
    except asyncio.CancelledError:
        # Interruption logic
        print(f"Task {session_id} was manually interrupted.")
        await agent.interrupt()
        raise
    finally:
        # Save agent state
        await app.state.session.save_session_state(
            session_id=session_id,
            user_id=user_id,
            agent=agent,
        )


# Create AgentApp with multiple endpoints
@app.post("/stop")
async def stop_task(request: AgentRequest):
    # Endpoint to trigger task interruption
    await app.stop_chat(
        user_id=request.user_id,
        session_id=request.session_id,
    )
    return {
        "status": "success",
        "message": "Interrupt signal broadcasted.",
    }


@app.endpoint("/sync")
def sync_handler(request: AgentRequest):
    return {"status": "ok", "payload": request}


@app.endpoint("/async")
async def async_handler(request: AgentRequest):
    return {"status": "ok", "payload": request}


@app.endpoint("/stream_async")
async def stream_async_handler(request: AgentRequest):
    for i in range(5):
        yield f"async chunk {i}, with request payload {request}\n"


@app.endpoint("/stream_sync")
def stream_sync_handler(request: AgentRequest):
    for i in range(5):
        yield f"sync chunk {i}, with request payload {request}\n"


@app.task("/task", queue="celery1")
def task_handler(request: AgentRequest):
    import time

    time.sleep(30)
    return {"status": "ok", "payload": request}


@app.task("/atask")
async def atask_handler(request: AgentRequest):
    import asyncio

    await asyncio.sleep(15)
    return {"status": "ok", "payload": request}


print("✅ Agent and endpoints configured successfully")
```
The above configuration is shared across all deployment methods below. Each method will show only the deployment-specific code.
Once you have grasped the concepts and completed the quickstart, deployment is the bridge that turns experimental prototypes into reliable services. Its significance rests on three core pillars:
Connect to real workloads: Moving agents from notebooks or scripts into a continuously running environment is the only way to serve real users, tools, and data.
Gain operational stability: Runtime offers standardized lifecycles, health checks, and scaling hooks that simplify monitoring and rollback.
Reuse the ecosystem: A unified deployment approach lets you reuse memory, sandbox, state, and other foundational services instead of rebuilding them per project.
Once deployed, you can test the endpoints using curl or Python.

Using curl:
```shell
# Test health endpoint
curl http://localhost:8080/health

# Call sync endpoint
curl -X POST http://localhost:8080/sync \
  -H "Content-Type: application/json" \
  -d '{"input": [{"role": "user", "content": [{"type": "text", "text": "What is the weather in Beijing?"}]}], "session_id": "123"}'

# Call streaming endpoint
curl -X POST http://localhost:8080/stream_sync \
  -H "Content-Type: application/json" \
  -d '{"input": [{"role": "user", "content": [{"type": "text", "text": "What is the weather in Beijing?"}]}], "session_id": "123"}'

# Submit a task
curl -X POST http://localhost:8080/task \
  -H "Content-Type: application/json" \
  -d '{"input": [{"role": "user", "content": [{"type": "text", "text": "What is the weather in Beijing?"}]}], "session_id": "123"}'
```
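The same requests can be issued from Python with only the standard library. The sketch below mirrors the payload shape and endpoint paths from the curl commands above; the base URL assumes the locally running server (nothing is called at import time, so adapt freely):

```python
import json
import urllib.request

BASE_URL = "http://localhost:8080"  # assumption: the local AgentApp server


def build_payload(text: str, session_id: str) -> dict:
    """Build the request body used by the curl examples above."""
    return {
        "input": [
            {
                "role": "user",
                "content": [{"type": "text", "text": text}],
            },
        ],
        "session_id": session_id,
    }


def call_endpoint(path: str, text: str, session_id: str = "123") -> bytes:
    """POST the payload to an AgentApp endpoint and return the raw response."""
    req = urllib.request.Request(
        BASE_URL + path,
        data=json.dumps(build_payload(text, session_id)).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(req) as resp:
        return resp.read()


# Example (requires a running server):
# print(call_endpoint("/sync", "What is the weather in Beijing?"))
```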
Using OpenAI SDK:
```python
from openai import OpenAI

client = OpenAI(base_url="http://0.0.0.0:8080/compatible-mode/v1")

response = client.responses.create(
    model="any_name",
    input="What is the weather in Beijing?",
)
print(response)
```
```shell
# Ensure environment variables are set
export DASHSCOPE_API_KEY="your-dashscope-api-key"
export ALIBABA_CLOUD_ACCESS_KEY_ID="your-access-key-id"
export ALIBABA_CLOUD_ACCESS_KEY_SECRET="your-access-key-secret"
export MODELSTUDIO_WORKSPACE_ID="your-workspace-id"

# Optional: Set this if you are using STS temporary credentials (Recommended)
export ALIBABA_CLOUD_SECURITY_TOKEN="your-sts-token"

# Optional OSS-specific credentials
export OSS_ACCESS_KEY_ID="your-oss-access-key-id"
export OSS_ACCESS_KEY_SECRET="your-oss-access-key-secret"
export OSS_SESSION_TOKEN="your-oss-sts-token"
```
Best for: Enterprise users who need to deploy on Alibaba Cloud PAI platform, leveraging LangStudio for project management and EAS (Elastic Algorithm Service) for service deployment.
If using a RAM user account, the PAI Developer Role must be assigned
An OSS bucket must be configured for storing build artifacts
(Optional) VPC with public network access if using DashScope models
Services deployed to PAI EAS have no public network access by default. If using DashScope models, configure a VPC with public network access. Reference: Configure Network Connectivity
For PAI deployment, configuration files are recommended for clarity and maintainability.

Method 1: Using Configuration File (Recommended)
```shell
# Navigate to example directory
cd examples/deployments/pai_deploy

# Deploy using config file
agentscope deploy pai --config deploy_config.yaml

# Deploy with CLI overrides
agentscope deploy pai --config deploy_config.yaml --name new-service-name
```
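The contents of deploy_config.yaml depend on your AgentScope Runtime version; the `--name` override above implies at least a service-name field. As a rough sketch (all keys here are assumptions — consult the example directory's config file for the authoritative schema):

```yaml
# deploy_config.yaml (sketch — key names are assumptions, not the official schema)
name: my-pai-agent-service   # overridden by `--name new-service-name` on the CLI
```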
Method 9: Serverless Deployment with Function Compute (FC)
Best for: Alibaba Cloud users who need to deploy agents to the Function Compute (FC) service with automated build, upload, and deployment workflows. FC provides a true serverless experience with pay-per-use pricing and automatic scaling.
```python
FCConfig(
    access_key_id="your-access-key-id",
    access_key_secret="your-access-key-secret",
    account_id="your-account-id",
    region_id="cn-hangzhou",  # Supported regions: cn-hangzhou, cn-beijing, etc.
    cpu=2.0,      # CPU cores
    memory=2048,  # Memory in MB
    disk=512,     # Disk in MB
)
```