AI Agent 架构设计指南:从原理到生产实践
> 2026 年,AI 行业最大的变化不是模型更大、推理更快——而是 AI 从「回答问题的工具」变成了「干活的同事」。OpenAI 在 2026 年 3 月发布 Agents SDK,Google 在 4 月发布 A2A 协议,Anthropic 的 MCP 生态月下载量突破 9,700 万——它们指向同一件事:Agent 时代来了。 > > Gartner 预测到 2026 年底 40% 的企业应用会包含 AI Agent。但大多数开发者对 Agent 的理解还停留在「让 AI 多调几个工具」的阶段。本文要做的是:把 Agent 架构从头到尾讲清楚,让你能设计、能实现、能上线。
目录:
1. Agent 核心架构模式
1.1 什么是 Agent
Agent = LLM + 工具 + 记忆 + 自主决策。
Chatbot 的流程是:用户提问 → AI 回答。Agent 的流程是:用户给目标 → AI 思考 → 选择工具 → 执行 → 观察结果 → 再思考 → 再执行 → 直到目标达成。
这个「思考—行动—观察」循环,就是一切 Agent 架构的基础。
1.2 ReAct(Reasoning + Acting)
最经典的模式,也是大多数 Agent 实现的起点。
┌─────────────────────────────────────┐
│ ReAct 循环 │
│ │
│ User Goal ──→ Thought ──→ Action │
│ ↑ │ │
│ │ ↓ │
│ └── Observation │
│ │
│ 如果 Observation 表明目标已达成 → 返回结果 │
│ 否则 → 继续循环 │
└─────────────────────────────────────┘
Python 骨架实现:
import json
from typing import Any
class ReActAgent:
def __init__(self, llm, tools: dict):
self.llm = llm # LLM 调用接口
self.tools = tools # {"tool_name": callable}
self.max_steps = 10
def run(self, goal: str) -> str:
context = [{"role": "user", "content": goal}]
for step in range(self.max_steps):
# 1. 思考 + 决策
response = self.llm.chat(context, tools_schema=self._tool_schemas())
# 2. 如果模型直接回复(无工具调用),结束
if response.content and not response.tool_calls:
return response.content
# 3. 执行工具调用
for tool_call in response.tool_calls:
tool_name = tool_call.name
tool_args = json.loads(tool_call.arguments)
result = self.tools[tool_name](**tool_args)
# 4. 将观察结果加入上下文
context.append({
"role": "tool",
"tool_call_id": tool_call.id,
"content": str(result)
})
# 5. 将 AI 的思考也加入上下文
context.append(response.message)
return "Agent 达到最大步数,任务未完成"
def _tool_schemas(self) -> list[dict]:
"""将工具转换为 LLM 可理解的 JSON Schema"""
return [
{
"name": name,
"description": func.__doc__ or "",
"parameters": {"type": "object", "properties": {}}
}
for name, func in self.tools.items()
]
适用场景:
- 需要多步推理的任务(代码审查、故障排查)
- 工具调用结果不确定的情况(搜索、数据分析)
- 交互式任务(用户中途可干预)
局限:
- 没有全局规划,容易「走一步看一步」失去方向
- 步数多了之后上下文膨胀
- 不适合需要严格顺序执行的场景
1.3 Plan-Execute
先规划再执行。适合步骤明确的复杂任务。
┌──────────────┐ ┌──────────────────┐
│ Planner │ │ Executor │
│ (LLM Call) │ │ (ReAct Agent) │
│ │ │ │
│ Goal ──→ │ │ Step 1 ──→ ✓ │
│ Plan: │────→│ Step 2 ──→ ✓ │
│ 1. 分析需求 │ │ Step 3 ──→ ✗ │
│ 2. 设计方案 │ │ (Replan) │
│ 3. 编写代码 │ │ Step 3'──→ ✓ │
│ 4. 测试验证 │ │ → Result │
└──────────────┘ └──────────────────┘
Python 骨架实现:
class PlanExecuteAgent:
def __init__(self, llm, tools: dict):
self.llm = llm
self.tools = tools
self.executor = ReActAgent(llm, tools)
def run(self, goal: str) -> str:
# 1. 规划阶段
plan_prompt = f"""
为完成以下目标,制定一个详细的步骤计划。
目标:{goal}
可用工具:{', '.join(self.tools.keys())}
返回 JSON 格式:
{{"steps": ["步骤1", "步骤2", ...]}}
"""
plan_response = self.llm.chat([{"role": "user", "content": plan_prompt}])
plan = json.loads(plan_response.content)
# 2. 逐步执行
results = []
for i, step in enumerate(plan["steps"]):
step_result = self.executor.run(step)
results.append({"step": step, "result": step_result})
# 如果某步失败,重新规划剩余步骤
if "失败" in step_result or "错误" in step_result:
remaining = plan["steps"][i+1:]
replan = self._replan(goal, step, step_result, remaining)
plan["steps"] = plan["steps"][:i+1] + replan
return self._summarize(goal, results)
def _replan(self, goal, failed_step, error, remaining):
"""某步失败后重新规划"""
prompt = f"步骤 '{failed_step}' 失败:{error}。剩余步骤:{remaining}。重新规划剩余步骤。"
response = self.llm.chat([{"role": "user", "content": prompt}])
return json.loads(response.content)["steps"]
适用场景:
- 需求明确的开发任务(实现一个功能)
- 部署流水线(构建→测试→部署→验证)
- 数据分析工作流
局限:
- 初始规划可能不准确
- 规划本身需要一次 LLM 调用,增加延迟和成本
- 动态环境(如实时数据)不适合静态规划
1.4 Router Agent
一个主 Agent 根据任务类型路由到不同的专业 Agent:
┌──────────┐
│ Router │
│ Agent │
└────┬─────┘
│
┌────────┼────────┐
↓ ↓ ↓
┌─────────┐ ┌──────┐ ┌────────┐
│ Coder │ │Search│ │ Devops │
│ Agent │ │Agent │ │ Agent │
└─────────┘ └──────┘ └────────┘
class RouterAgent:
def __init__(self, llm, specialists: dict[str, ReActAgent]):
self.llm = llm
self.specialists = specialists
def run(self, task: str) -> str:
# 1. 分类任务
categories = "\n".join(f"- {name}: {agent.description}" for name, agent in self.specialists.items())
route_prompt = f"""
任务:{task}
可用专家:
{categories}
返回 JSON: {{"expert": "专家名称", "reason": "选择理由"}}
"""
response = self.llm.chat([{"role": "user", "content": route_prompt}])
route = json.loads(response.content)
# 2. 路由到专家
expert = self.specialists[route["expert"]]
return expert.run(task)
1.5 Multi-Agent Orchestra
多个 Agent 并行或串行协作。最复杂但最强大的模式:
┌──────────────┐
│ Orchestrator │
└──────┬───────┘
│
┌────────────────┼────────────────┐
↓ ↓ ↓
┌──────────┐ ┌──────────┐ ┌──────────┐
│ Architect│ │ Coder │ │ Reviewer │
│ Agent │ │ Agent │ │ Agent │
└──────────┘ └──────────┘ └──────────┘
│ │ │
└────────────────┼────────────────┘
↓
┌──────────┐
│ Result │
└──────────┘
详细实现见第 6 章。
2. 工具调用系统设计
2.1 Function Calling vs MCP
| 维度 | Function Calling | MCP |
|---|---|---|
| 标准化程度 | 厂商各自定义 | 开放协议,跨厂商通用 |
| 工具描述 | 在每次 API 调用中传 | Server 注册时声明 |
| 可移植性 | 绑定到特定 LLM | 一次实现,所有 MCP 客户端可用 |
| 部署 | LLM 内部 | 独立进程,可远程 |
| 安全性 | 依赖 LLM 厂商 | 进程隔离 + OAuth 2.1 |
选择建议:
- 个人项目 / 快速原型:Function Calling 更快
- 团队项目 / 需要复用:MCP 是更好的投资
- 两者可以并存——在 MCP Server 中封装 Function Calling 逻辑
2.2 工具描述工程
LLM 只能通过你的工具描述来判断「这个工具干什么」。描述质量 = 调用准确率。
好的工具描述示例:
# ❌ 差
def search(query: str) -> list:
"""搜索"""
# ✅ 好
def search_knowledge_base(query: str, top_k: int = 5, filters: dict = None) -> list:
"""在内部知识库中搜索与 query 语义相关的内容。
返回 top_k 条最相关的结果,每项包含 {title, content, score, url}。
支持按 filters = {category: "技术文档", date_range: "2026-01-01,2026-06-01"} 过滤。
当用户问到公司内部政策、技术方案或历史决策时优先使用此工具。
"""
描述最佳实践:
- 说清楚输入输出:参数类型、格式、示例
- 说清楚何时使用:比「做什么」更重要——直接告诉 LLM 什么场景该调用
- 写明限制条件:超时时间、最大返回数、权限要求
- 给参数提供默认值:减少 LLM 决策负担
2.3 工具分组与权限
class ToolRegistry:
def __init__(self):
self._tools: dict[str, dict] = {}
def register(self, name: str, func, group: str, risk_level: str):
self._tools[name] = {
"func": func,
"group": group,
"risk_level": risk_level, # "safe" | "write" | "dangerous"
}
def get_safe_tools(self) -> list:
"""只读操作,可以自动执行"""
return [t for t in self._tools.values() if t["risk_level"] == "safe"]
def get_write_tools(self) -> list:
"""写操作,需要用户确认"""
return [t for t in self._tools.values() if t["risk_level"] == "write"]
def get_dangerous_tools(self) -> list:
"""高危操作,需要额外审批"""
return [t for t in self._tools.values() if t["risk_level"] == "dangerous"]
# 使用示例
registry = ToolRegistry()
registry.register("read_file", read_file, group="filesystem", risk_level="safe")
registry.register("write_file", write_file, group="filesystem", risk_level="write")
registry.register("execute_sql", execute_sql, group="database", risk_level="dangerous")
2.4 错误处理与重试
import asyncio
from functools import wraps
def tool_retry(max_retries: int = 3, backoff: float = 2.0):
"""工具调用重试装饰器"""
def decorator(func):
@wraps(func)
async def wrapper(*args, **kwargs):
last_error = None
for attempt in range(max_retries):
try:
return await func(*args, **kwargs)
except Exception as e:
last_error = e
if attempt < max_retries - 1:
wait = backoff ** attempt
print(f"⚠️ 工具 {func.__name__} 失败 (attempt {attempt+1}/{max_retries}), {wait}s 后重试: {e}")
await asyncio.sleep(wait)
# 所有重试失败后返回结构化错误
return {
"error": str(last_error),
"tool": func.__name__,
"retries_exhausted": True
}
return wrapper
return decorator
3. 记忆系统架构
3.1 三层记忆模型
┌─────────────────────────────────────────────┐
│ 记忆系统架构 │
│ │
│ ┌───────────────────────────────────────┐ │
│ │ 短期记忆 (Conversation) │ │
│ │ 对话历史、当前上下文 │ │
│ │ 容量:~100K tokens │ │
│ │ 生命周期:单次会话 │ │
│ └───────────────────────────────────────┘ │
│ ↕ │
│ ┌───────────────────────────────────────┐ │
│ │ 工作记忆 (Scratchpad) │ │
│ │ 中间结果、当前计划、草稿 │ │
│ │ 容量:~10K tokens │ │
│ │ 生命周期:单次任务 │ │
│ └───────────────────────────────────────┘ │
│ ↕ │
│ ┌───────────────────────────────────────┐ │
│ │ 长期记忆 (Vector DB) │ │
│ │ 知识、经验、用户偏好 │ │
│ │ 容量:无限(受限于存储) │ │
│ │ 生命周期:永久 │ │
│ └───────────────────────────────────────┘ │
└─────────────────────────────────────────────┘
3.2 短期记忆管理
核心挑战:上下文窗口有限,长时间对话需要压缩。
class ShortTermMemory:
def __init__(self, max_tokens: int = 100_000):
self.max_tokens = max_tokens
self.messages: list = []
def add(self, message: dict):
self.messages.append(message)
self._maybe_compress()
def _maybe_compress(self):
"""当 token 数接近上限时压缩旧消息"""
total = self._count_tokens()
if total < self.max_tokens * 0.8:
return
# 保留最近 10 条消息不变,压缩更早的消息
recent = self.messages[-10:]
older = self.messages[:-10]
if older:
summary = self._summarize(older)
self.messages = [{"role": "system", "content": f"此前对话摘要:{summary}"}] + recent
def _summarize(self, messages: list) -> str:
"""调用 LLM 生成摘要"""
# 实际实现:调用一个轻量模型做摘要
return f"共 {len(messages)} 条历史消息(已压缩)"
def _count_tokens(self) -> int:
return sum(len(str(m.get("content", ""))) // 4 for m in self.messages)
3.3 长期记忆(pgvector 实现)
import psycopg2
from pgvector.psycopg2 import register_vector
class LongTermMemory:
def __init__(self, conn_string: str, embedding_model):
self.conn = psycopg2.connect(conn_string)
register_vector(self.conn)
self.embed = embedding_model
self._create_table()
def _create_table(self):
with self.conn.cursor() as cur:
cur.execute("""
CREATE TABLE IF NOT EXISTS agent_memory (
id SERIAL PRIMARY KEY,
content TEXT NOT NULL,
embedding vector(1536),
memory_type TEXT DEFAULT 'fact',
importance FLOAT DEFAULT 0.5,
created_at TIMESTAMP DEFAULT NOW(),
last_accessed TIMESTAMP DEFAULT NOW(),
access_count INT DEFAULT 0
);
CREATE INDEX IF NOT EXISTS agent_memory_embedding_idx
ON agent_memory USING ivfflat (embedding vector_cosine_ops);
""")
self.conn.commit()
def store(self, content: str, memory_type: str = "fact", importance: float = 0.5):
"""存储一条记忆"""
embedding = self.embed(content)
with self.conn.cursor() as cur:
cur.execute(
"""INSERT INTO agent_memory (content, embedding, memory_type, importance)
VALUES (%s, %s, %s, %s)""",
(content, embedding, memory_type, importance)
)
self.conn.commit()
def recall(self, query: str, top_k: int = 5) -> list[str]:
"""语义检索相关记忆"""
query_embedding = self.embed(query)
with self.conn.cursor() as cur:
cur.execute("""
SELECT content, 1 - (embedding <=> %s) AS similarity
FROM agent_memory
ORDER BY embedding <=> %s
LIMIT %s
""", (query_embedding, query_embedding, top_k))
results = cur.fetchall()
# 更新访问记录
for r in results:
with self.conn.cursor() as cur:
cur.execute(
"UPDATE agent_memory SET last_accessed = NOW(), access_count = access_count + 1 WHERE content = %s",
(r[0],)
)
self.conn.commit()
return [r[0] for r in results]
def forget(self, days_inactive: int = 30):
"""遗忘不活跃且不重要的记忆"""
with self.conn.cursor() as cur:
cur.execute("""
DELETE FROM agent_memory
WHERE last_accessed < NOW() - INTERVAL '%s days'
AND importance < 0.3
""", (days_inactive,))
self.conn.commit()
3.4 工作记忆(Scratchpad)
class Scratchpad:
"""临时工作区,存储当前任务中间结果"""
def __init__(self):
self.notes: dict[str, Any] = {}
self.plan: list[str] = []
self.current_step: int = 0
def set_plan(self, steps: list[str]):
self.plan = steps
self.current_step = 0
def note(self, key: str, value: Any):
self.notes[key] = value
def get_context_for_llm(self) -> str:
"""生成给 LLM 的工作记忆上下文"""
lines = []
if self.plan:
progress = "\n".join(
f" {'✅' if i < self.current_step else '⬜' if i > self.current_step else '🔄'} {step}"
for i, step in enumerate(self.plan)
)
lines.append(f"**当前计划**:\n{progress}")
if self.notes:
lines.append(f"**工作笔记**: {json.dumps(self.notes, ensure_ascii=False)}")
return "\n".join(lines)
4. 多 Agent 协作
4.1 A2A(Agent-to-Agent)协议
Google 于 2025 年 4 月发布的 Agent 间通信协议,2025 年 6 月捐赠给 Linux Foundation。
Agent A Agent B
│ │
│ 1. 发现 Agent Card │
│ ◄────────────────────── │ (/.well-known/agent-card.json)
│ │
│ 2. 发送任务 │
│ ───────────────────────→ │ (JSON-RPC: tasks/send)
│ │
│ 3. 轮询状态 │
│ ───────────────────────→ │ (JSON-RPC: tasks/get)
│ ◄────────────────────── │
│ │
│ 4. 接收结果 │
│ ◄────────────────────── │ (SSE stream / callback)
A2A Agent Card 示例:
{
"name": "CodeReviewerAgent",
"description": "审查代码变更,检查 Bug、安全问题和代码规范",
"url": "https://agents.company.com/code-reviewer",
"capabilities": {
"streaming": true,
"pushNotifications": true
},
"skills": [
{
"name": "review_pr",
"description": "审查指定的 Pull Request",
"inputSchema": {
"type": "object",
"properties": {
"repo": {"type": "string"},
"pr_number": {"type": "integer"}
}
}
}
]
}
4.2 MCP vs A2A
| 维度 | MCP | A2A |
|---|---|---|
| 方向 | Agent ↔ 工具(垂直) | Agent ↔ Agent(水平) |
| 核心问题 | 「我能用什么工具?」 | 「谁应该处理这个任务?」 |
| 协议层 | JSON-RPC 2.0 | JSON-RPC 2.0 + Agent Cards |
| 治理 | Linux Foundation | Linux Foundation |
它们不是竞争对手——是搭档。 多个 Agent 通过 A2A 协作,每个 Agent 内部通过 MCP 调用工具。
4.3 任务路由策略
class TaskRouter:
"""根据 Agent 能力自动路由任务"""
def __init__(self, agents: dict[str, dict]):
self.agents = agents # {name: {agent, capabilities, cost_per_token}}
def route(self, task: str) -> str:
# 1. 匹配能力
candidates = []
for name, info in self.agents.items():
if self._can_handle(task, info["capabilities"]):
candidates.append((name, info))
if not candidates:
return self._fallback(task)
# 2. 选择最优 Agent(能力匹配度 + 成本)
best = min(candidates, key=lambda c: c[1]["cost_per_token"])
return best[1]["agent"].run(task)
def _can_handle(self, task: str, capabilities: list[str]) -> bool:
task_lower = task.lower()
return any(cap.lower() in task_lower for cap in capabilities)
5. 框架选型指南
5.1 主流框架对比
| 框架 | 理念 | 语言 | Stars | 适合场景 |
|---|---|---|---|---|
| LangGraph | 状态图驱动的 Agent 编排 | Python/JS | ~25K | 复杂工作流,需要精细控制 |
| CrewAI | 角色扮演式多 Agent | Python | ~28K | 快速原型,非技术用户友好 |
| AutoGen | 对话驱动(微软) | Python | ~40K | 微软生态,企业级需求 |
| 自研 | 完全定制 | Any | — | 特殊需求,极致性能 |
5.2 选型决策矩阵
| 你的情况 | 推荐 | 理由 |
|---|---|---|
| 快速验证想法,不想写太多代码 | CrewAI | 声明式定义 Agent 角色,几行代码就能跑 |
| 需要精细控制执行流程 | LangGraph | 状态图可视化,分支/循环/条件全支持 |
| 微软 / Azure 生态 | AutoGen | 与 Azure AI 深度集成 |
| 特殊需求、极致性能、不想被框架限制 | 自研 | 30 行核心代码就能写一个 ReAct Agent |
一个实用建议:先用 CrewAI 或 LangGraph 做原型验证想法,确认 Agent 设计合理后,如果需要更高的性能或定制化,再迁移到自研方案。不要一开始就自研——你会花 80% 的时间解决框架已经解决过的问题。
6. 从零构建多 Agent 系统
6.1 项目:AI 代码助手
三个 Agent 协作完成代码任务:
用户需求
│
↓
┌──────────────┐
│ Architect │ 分析需求,输出技术方案
│ Agent │
└──────┬───────┘
│ 技术方案
↓
┌──────────────┐
│ Coder │ 根据方案编写代码
│ Agent │
└──────┬───────┘
│ 代码
↓
┌──────────────┐
│ Reviewer │ 审查代码,提出修改意见
│ Agent │
└──────┬───────┘
│ 审查通过的代码
↓
最终结果
6.2 完整实现
import json
import asyncio
from dataclasses import dataclass
@dataclass
class AgentConfig:
name: str
role: str
model: str
temperature: float = 0.3
class BaseAgent:
"""Agent 基类"""
def __init__(self, config: AgentConfig, llm, tools: dict = None):
self.config = config
self.llm = llm
self.tools = tools or {}
self.memory = ShortTermMemory()
async def think(self, task: str) -> str:
system_prompt = f"""你是 {self.config.name},{self.config.role}。
请根据任务要求,输出你的工作结果。
如果需要使用工具,在回复中明确说明使用哪个工具和参数。
"""
response = await self.llm.chat([
{"role": "system", "content": system_prompt},
{"role": "user", "content": task}
])
return response.content
class ArchitectAgent(BaseAgent):
"""需求分析 Agent"""
def __init__(self, llm):
super().__init__(
AgentConfig(
name="Architect",
role="技术架构师,负责分析需求并输出详细的技术方案",
model="claude-sonnet-4-6"
),
llm
)
async def design(self, requirement: str) -> dict:
prompt = f"""
请分析以下需求,输出一个详细的技术方案。
需求:{requirement}
输出 JSON 格式:
{{
"summary": "需求概述(一句话)",
"architecture": "架构方案描述",
"tech_stack": {{"frontend": "...", "backend": "...", "database": "..."}},
"api_design": ["端点1 描述", "端点2 描述"],
"components": ["组件1 描述", "组件2 描述"],
"risks": ["风险1", "风险2"],
"implementation_steps": ["步骤1", "步骤2", ...]
}}
"""
result = await self.think(prompt)
return json.loads(result)
class CoderAgent(BaseAgent):
"""编码 Agent"""
def __init__(self, llm, tools: dict):
super().__init__(
AgentConfig(
name="Coder",
role="高级软件工程师,根据技术方案编写高质量代码",
model="claude-sonnet-4-6"
),
llm,
tools
)
async def implement(self, design: dict) -> str:
prompt = f"""
根据以下技术方案编写完整代码:
架构方案:{design.get('architecture')}
技术栈:{json.dumps(design.get('tech_stack', {}), ensure_ascii=False)}
组件列表:{json.dumps(design.get('components', []), ensure_ascii=False)}
实施步骤:{json.dumps(design.get('implementation_steps', []), ensure_ascii=False)}
要求:
1. 编写完整可运行的代码
2. 包含必要的类型注解和注释
3. 每个文件用 ```language:path 标记
"""
return await self.think(prompt)
class ReviewerAgent(BaseAgent):
"""代码审查 Agent"""
def __init__(self, llm):
super().__init__(
AgentConfig(
name="Reviewer",
role="资深代码审查者,检查 Bug、安全问题和代码规范",
model="claude-sonnet-4-6"
),
llm
)
async def review(self, code: str, design: dict) -> dict:
prompt = f"""
审查以下代码,对照原始需求和技术方案进行检查。
原始需求方案:{json.dumps(design, ensure_ascii=False)[:2000]}
代码:
{code[:8000]}
检查项:
1. 是否实现了所有需求功能
2. 是否有安全漏洞
3. 是否有明显的 Bug
4. 代码风格是否一致
5. 是否有性能问题
输出 JSON 格式:
{{
"approved": true/false,
"score": 1-10,
"issues": [
{{"severity": "critical|major|minor", "file": "...", "line": 0, "description": "..."}}
],
"suggestions": ["建议1", "建议2"]
}}
"""
result = await self.think(prompt)
return json.loads(result)
class CodeAssistantOrchestrator:
"""编排器:协调三个 Agent 完成代码任务"""
def __init__(self, llm, tools: dict = None):
self.architect = ArchitectAgent(llm)
self.coder = CoderAgent(llm, tools)
self.reviewer = ReviewerAgent(llm)
self.max_review_rounds = 3
async def run(self, requirement: str) -> dict:
print(f"📋 收到需求: {requirement}\n")
# 1. 架构设计
print("🏗️ Architect Agent 开始分析需求...")
design = await self.architect.design(requirement)
print(f" 方案概要: {design.get('summary')}")
# 2. 编码
print("💻 Coder Agent 开始编写代码...")
code = await self.coder.implement(design)
print(f" 代码行数: {len(code.splitlines())}")
# 3. 审查(最多 3 轮)
for round_num in range(self.max_review_rounds):
print(f"🔍 Reviewer Agent 第 {round_num+1} 轮审查...")
review = await self.reviewer.review(code, design)
if review["approved"]:
print(f"✅ 审查通过!评分: {review['score']}/10")
return {
"requirement": requirement,
"design": design,
"code": code,
"review": review,
"review_rounds": round_num + 1,
"status": "approved"
}
print(f"⚠️ 发现 {len(review['issues'])} 个问题,Coder 开始修复...")
# Coder 根据审查意见修改代码
fix_prompt = f"""
根据审查意见修改代码:
审查意见:{json.dumps(review['issues'], ensure_ascii=False)}
建议:{json.dumps(review.get('suggestions', []), ensure_ascii=False)}
原代码:
{code[:8000]}
"""
code = await self.coder.think(fix_prompt)
print(f" 修复后代码行数: {len(code.splitlines())}")
return {
"requirement": requirement,
"design": design,
"code": code,
"review": review,
"review_rounds": self.max_review_rounds,
"status": "max_rounds_exceeded"
}
# 使用示例
async def main():
from anthropic import AsyncAnthropic
llm = AsyncAnthropic() # 需要设置 ANTHROPIC_API_KEY
assistant = CodeAssistantOrchestrator(llm)
result = await assistant.run("实现一个用户注册和登录的 REST API,支持 JWT 认证")
if result["status"] == "approved":
print(f"\n🎉 最终代码已通过审查!")
# 保存代码到文件
with open("output/generated_code.txt", "w") as f:
f.write(result["code"])
if __name__ == "__main__":
asyncio.run(main())
7. 生产环境考量
7.1 安全
| 层面 | 措施 | 实现 |
|---|---|---|
| 工具权限 | 分级(safe/write/dangerous) | ToolRegistry |
| 执行沙箱 | Docker/WebAssembly 隔离 | docker run --read-only --network=none agent-executor |
| Prompt 注入防护 | 输入清洗 + 输出验证 | 正则过滤 + 结构校验 |
| 审批流 | 敏感操作人审 | 危险工具调用前暂停等待人工确认 |
| 密钥管理 | 不在 Prompt 中传 API Key | 使用环境变量 + Secrets Manager |
7.2 成本控制
class CostTracker:
def __init__(self, budget: float):
self.budget = budget
self.spent = 0.0
# 模型价格(USD / 1M tokens)
self.pricing = {
"claude-sonnet-4-6": {"input": 3.0, "output": 15.0},
"claude-haiku-4-5": {"input": 0.8, "output": 4.0},
"deepseek-v4-pro": {"input": 0.55, "output": 2.19},
}
def track(self, model: str, input_tokens: int, output_tokens: int):
price = self.pricing.get(model, {"input": 1.0, "output": 5.0})
cost = (input_tokens / 1e6 * price["input"] +
output_tokens / 1e6 * price["output"])
self.spent += cost
if self.spent > self.budget * 0.8:
print(f"⚠️ 预算使用已达 80% (${self.spent:.2f}/${self.budget:.2f})")
if self.spent >= self.budget:
raise BudgetExceededError(f"预算耗尽: ${self.spent:.2f}")
def suggest_model(self, task_complexity: str) -> str:
"""根据任务复杂度和预算情况推荐模型"""
if self.spent > self.budget * 0.7:
return "claude-haiku-4-5" # 预算紧张,用便宜模型
if task_complexity == "simple":
return "claude-haiku-4-5"
if task_complexity == "complex":
return "claude-sonnet-4-6"
return "claude-haiku-4-5"
分层模型策略:路由 Agent 用 Haiku(便宜快速),核心推理用 Sonnet(推理能力强),简单文本处理用 DeepSeek(极低成本)。
7.3 监控与可观测性
import time
from contextlib import contextmanager
class AgentMetrics:
def __init__(self):
self.tool_calls: list[dict] = []
self.llm_calls: list[dict] = []
self.task_results: list[dict] = []
@contextmanager
def track_tool_call(self, tool_name: str):
start = time.time()
try:
yield
self.tool_calls.append({
"tool": tool_name,
"duration_ms": (time.time() - start) * 1000,
"status": "success"
})
except Exception as e:
self.tool_calls.append({
"tool": tool_name,
"duration_ms": (time.time() - start) * 1000,
"status": "error",
"error": str(e)
})
def report(self) -> dict:
return {
"total_llm_calls": len(self.llm_calls),
"total_tool_calls": len(self.tool_calls),
"tool_success_rate": sum(1 for t in self.tool_calls if t["status"] == "success") / max(len(self.tool_calls), 1),
"avg_tool_duration_ms": sum(t["duration_ms"] for t in self.tool_calls) / max(len(self.tool_calls), 1),
}
7.4 可靠性模式
class ReliableAgent:
def __init__(self, agent, max_retries: int = 3):
self.agent = agent
self.max_retries = max_retries
async def run(self, task: str) -> str:
for attempt in range(self.max_retries):
try:
# 设置超时
result = await asyncio.wait_for(
self.agent.run(task),
timeout=300 # 5 分钟
)
return result
except asyncio.TimeoutError:
print(f"⚠️ 超时 (attempt {attempt+1}),使用更简单的策略重试...")
task = f"{task}\n\n注意:上次执行超时,请用更简单的方案完成。"
except Exception as e:
print(f"❌ 错误 (attempt {attempt+1}): {e}")
if attempt == self.max_retries - 1:
return f"任务失败(已重试 {self.max_retries} 次): {e}"
await asyncio.sleep(2 ** attempt)
8. 未来趋势
| 趋势 | 时间窗口 | 影响 |
|---|---|---|
| MCP + A2A 深度融合 | 2026 H2 | 一个 Agent 既能用 MCP 调工具,又能通过 A2A 协作 |
| Agent UI(AUI) | 2026-2027 | 从 Chat UI 进化为 Agent 专属界面 |
| 安全沙箱标准化 | 2027 H1 | Docker/WebAssembly 成为 Agent 运行标准 |
| Agent 市场 | 2027 | 类似 App Store 的 Agent 发现和付费生态 |
| 自主 Agent 经济 | 2027+ | Agent 自主完成完整业务闭环 |
> 写在最后:Agent 架构不是什么魔法——它是 LLM、工具调用、记忆系统和多 Agent 协作的组合工程。本文提供的代码骨架足以搭建一个生产可用的 Agent 系统。最重要的不是选哪个框架,而是想清楚:Agent 在你的产品中解决什么问题?它的失败模式是什么?你怎么优雅地降级? > > 回答好这三个问题,你就是 Agent 时代的合格架构师。
版权声明
本文原创发布于 zhiqu.ac,未经书面许可禁止全文转载、采集、商用;转载必须完整标注原文链接、作者。
0 条评论