本文所涉及的代码:https://github.com/LiuSandy/agent-playground/tree/tutorial-01
同一套 Agent 代码,跑在不同的任务上
上一篇文章我们用 LangGraph 搭了一个能计算、能搜索的 Agent。它的执行方式很简单:LLM 拿到用户输入 → 判断是否需要调用工具 → 需要就调、不需要就直接回答 → 调完工具拿到结果再判断 → 一直循环到能回答为止。代码量不多,跑起来也干净利落。
这套代码跑简单任务效果很好。问天气、算个数、搜个文档,一两轮就结束了。但如果换一个任务呢?
调研三个主流 Python Web 框架(Django、Flask、FastAPI),从性能、生态、学习成本三个维度对比,输出一份报告。
这时候问题来了。这个任务暗含多个子目标——先分别调研每个框架,再归纳出三个维度的对比,最后组织成报告格式——子目标之间有明确的结构依赖。而原来的 Agent 是走一步看一步的:它只看当前这一刻的状态,决定"下一步该做什么"。面对多子目标的复杂任务,这种策略很容易失控:
- 搜了 Django → 觉得信息不够 → 又搜一次 Django → 搜到一半想起 FastAPI 还没搜 → 跳过去搜 FastAPI → 忘了 Django 查到哪了
- 最后输出的可能只有第一个框架的零散信息,但 Token 已经烧了好几千
像一个下棋只看眼前一步的人——每一步也许都不算错,但十步之后可能偏离目标很远。
问题不在于 Agent 写得不好,而在于"走一步看一步"这个策略本身的视野太窄。 不同复杂度的任务,需要不同的决策模式来应对。Agent 的世界里不是只有一种做法,而是三种——这正是 Agent 架构设计中最核心的一组选择:
Reactive — 走一步看一步,快速反应,适合简单任务
Deliberative — 先整体规划再逐步执行,适合明确的多步复杂任务
Hybrid — 先判断任务复杂度,再选择 Reactive 或 Deliberative,适合入口任务复杂度不稳定的场景
三种模式分别对应不同的任务复杂度,这篇文章就逐一拆解它们的原理、LangGraph 实现和适用场景,帮你在面对具体任务时做出正确的选择。
Reactive 模式
什么是 Reactive Agent?
Reactive Agent 的定义很简单:感知当前状态 → 推理下一步行动 → 执行 → 观察结果 → 再次感知,如此循环。整个过程没有全局规划,每一步的决策只依赖当前可用的信息。
类比一下,打乒乓球就是典型的 Reactive 行为——对手怎么打我就怎么回,我并不会提前预测三个回合之后球会在哪。球来了 → 判断轨迹 → 挥拍 → 看对手反应 → 下一轮。每一步都很快,反馈周期极短。
在 LangGraph 中,Reactive 模式对应的图结构就是我们上一篇文章构建的:
START → call_model → 条件判断 → call_tool → call_model → ... → END
这是一个简单的"推理-工具"循环,没有规划节点,没有反思步骤。LLM 就是那个边走边看的人。
ReAct 循环:Reactive 模式的核心
ReAct 是 Reactive 模式最经典的实现,全称是 Reasoning + Acting。它把 Agent 的每一步决策拆成三个子步骤:
- Thought(思考):LLM 分析当前需要什么信息,决定是否调用工具
- Action(行动):调用选定的工具,传入参数
- Observation(观察):接收工具返回的结果,作为下一轮思考的输入
用一个具体例子来看。用户问"北京今天天气怎么样,适合跑步吗?"
Step 1:
Thought: 用户想了解北京天气,我需要调用天气查询工具
Action: get_weather("北京")
Step 2:
Observation: 晴,25°C,PM2.5: 35
Thought: 天气晴朗,温度适宜,空气质量良好,很适合户外跑步
Step 3:
无需更多工具,最终回答:"北京今天晴,25°C,空气质量良好,非常适合跑步"
这个例子只用了两轮就结束了。关键点在于:Observation 是真实世界反馈——它不是 LLM 自己"脑补"的,而是工具真实返回的数据。每轮循环都在用外部数据纠正 LLM 可能产生的幻觉,这就是 ReAct 比纯生成更可靠的底层原因。
代码实现:把 ReAct 循环显式展示出来
在上一篇文章的代码基础上,我们把 ReAct 的三个步骤可视化出来。下面是完整的实现,文件结构沿用 agent-playground/ 项目:
# agent-playground/app/core/state.py
from typing import Annotated, TypedDict
from langchain_core.messages import BaseMessage
from langgraph.graph.message import add_messages
class AgentState(TypedDict):
messages: Annotated[list[BaseMessage], add_messages]
State 定义和上一篇文章完全一样,核心是 add_messages reducer 保证消息历史不会丢失。
# agent-playground/app/core/llm.py
import os
from dotenv import load_dotenv
from langchain_openai import ChatOpenAI
load_dotenv()
def create_llm(tools=None):
llm = ChatOpenAI(
model=os.getenv("DEEPSEEK_MODEL", "deepseek-chat"),
base_url=os.getenv("DEEPSEEK_BASE_URL", "https://api.deepseek.com/v1"),
api_key=os.getenv("DEEPSEEK_API_KEY", ""),
temperature=0,
)
if tools:
llm = llm.bind_tools(tools)
return llm
这里我们把 LLM 初始化抽成了一个工厂函数 create_llm,后面所有三种模式都复用它。
# agent-playground/app/core/graph.py
from langgraph.constants import END
from langgraph.graph import START, StateGraph
from langgraph.prebuilt import ToolNode
from .state import AgentState
def build_reactive_graph(llm, tools, max_steps=10):
def call_model(state: AgentState) -> dict:
response = llm.invoke(state["messages"])
step = state.get("step_count", 0) + 1
print(f"\n--- Step {step} ---")
if hasattr(response, "tool_calls") and response.tool_calls:
for tc in response.tool_calls:
print(f" Action: {tc['name']}({tc['args']})")
else:
print(f" Response: {response.content[:100]}...")
return {"messages": [response], "step_count": step}
def should_continue(state: AgentState) -> str:
last_message = state["messages"][-1]
if state.get("step_count", 0) >= max_steps:
return END
if hasattr(last_message, "tool_calls") and last_message.tool_calls:
return "call_tool"
return END
def observe_result(state: AgentState) -> dict:
last_message = state["messages"][-1]
print(f" Observation: {last_message.content[:100]}")
return {}
workflow = StateGraph(AgentState)
workflow.add_node("call_model", call_model)
workflow.add_node("call_tool", ToolNode(tools))
workflow.add_node("observe", observe_result)
workflow.add_edge(START, "call_model")
workflow.add_conditional_edges("call_model", should_continue, {
"call_tool": "call_tool",
END: END,
})
workflow.add_edge("call_tool", "observe")
workflow.add_edge("observe", "call_model")
return workflow.compile()
这段代码在上一篇文章的基础上加了两个东西:
- 步骤计数:
step_count跟踪执行了多少步,配合max_steps参数自动截断,防止死循环 - 观察节点:
observe节点负责打印工具返回的结果,让 ReAct 的 Thought → Action → Observation 三步在终端里清晰可视
图结构变成了:

observe 是一个纯日志节点,不修改 State,只是为了在执行日志里把 Observation 单独标出来。
# agent-playground/app/core/agent.py
from langchain_core.messages import HumanMessage
from .llm import create_llm
from .graph import build_reactive_graph
from .tools import get_tools
def create_reactive_agent():
tools = get_tools()
llm = create_llm(tools)
return build_reactive_graph(llm, tools)
def run_agent(agent, user_input: str):
result = agent.invoke({"messages": [HumanMessage(content=user_input)]})
return result["messages"][-1].content
create_reactive_agent 是 Reactive 模式的便利入口,把工具加载、LLM 初始化和图构建三步收拢到一起。
跑一下看看效果:
# agent-playground/main.py
from app.core.agent import create_reactive_agent, run_agent
agent = create_reactive_agent()
answer = run_agent(agent, "北京今天天气怎么样?")
print(f"\n最终回答: {answer}")
终端输出的日志会是这样的:
--- Step 1 ---
Action: search_tool({'query': '北京今天天气'})
--- Step 2 ---
Observation: 北京今天晴,气温22°C ~ 32°C,空气质量良
--- Step 3 ---
Response: 北京今天晴,气温在22°C到32°C之间...
Thought → Action → Observation 三步清晰可见,这就是 ReAct 循环的完整面貌。
适用场景与局限性
适用场景:
- 开放域问答、信息检索、天气查询、翻译等单步或少量步数就能完成的任务
- 工具调用场景:API 调用、数据库查询、文件操作
- 特点:反馈周期短、每一步都能看到效果,适合"边走边看"型任务
局限性:
- 缺乏全局规划:走一步看一步,可能"见树不见林"。面对多子目标的复杂任务,没有整体策略就容易陷入局部搜索
- 容易迷失方向:多步任务中可能出现重复劳动——反复调用同一个工具查相同的内容,或者在不同子目标之间无意义地跳来跳去
- 不适合需要预先判断任务复杂度的场景:ReAct 对"这个任务大概需要几步能完成"没有任何概念,它只会一直循环直到 LLM 觉得可以回答了(或者被
max_steps截断)
简单来说:Reactive 模式是 Agent 的默认选项。大多数日常任务用它就够,但当你发现 Agent 在面对复杂任务时反复绕圈、Token 烧了很多却答不到点子上——说明该考虑 Deliberative 了。
Deliberative 模式
什么是 Deliberative(Plan-and-Execute)?
Deliberative Agent 的核心思想:先整体规划,再逐步执行,执行中可反思和重规划。
这个和 Reactive 最大的区别在于决策的"视野"。Reactive 只看下一步,Deliberative 先在脑子里过一遍全局。类比的话,下象棋就是典型的 Deliberative 行为——你不会只看眼前的这一步,而是想好几步之后的局面,走一步之后再根据对手的回应调整策略。
Deliberative 模式通常包含三个阶段:
- 规划阶段(Plan):LLM 分析用户需求,把大任务拆解成有序的步骤列表
- 执行阶段(Execute):按计划逐步执行每一步,每步可能有自己的工具调用
- 反思阶段(Replan):检查已执行步骤的结果,判断是否需要调整后续计划
流程示意:
User → Planner → [Step1, Step2, Step3, ...]
↓
Executor(逐步执行)
↓
Replanner → 结果 OK?→ 结束
↓ 结果不行
调整计划,重新执行
三个阶段的循环保证了 Agent 既有全局视野,又能在执行过程中根据实际情况灵活调整。
代码实现:规划、执行、反思三步图
Deliberative 模式的图结构比 Reactive 复杂不少——它有三个核心节点,加上条件路由在它们之间切换。
先定义新的 State。Deliberative Agent 除了消息历史,还需要存储执行计划:
# agent-playground/app/core/state.py
from typing import Annotated, TypedDict
from langchain_core.messages import BaseMessage
from langgraph.graph.message import add_messages
class AgentState(TypedDict):
messages: Annotated[list[BaseMessage], add_messages]
step_count: int
class DeliberativeState(TypedDict):
messages: Annotated[list[BaseMessage], add_messages]
plan: list[str]
current_step: int
step_results: dict[int, str]
original_question: str
tool_rounds: int
plan 是一个字符串列表,每个元素是一个可执行步骤。current_step 记录当前执行到哪一步。step_results 存储每步的执行结果,供反思阶段参考。original_question 保存用户的原始问题,避免执行某个子步骤时丢掉全局上下文;tool_rounds 用来限制单个步骤内部最多调用几轮工具,防止某一步陷入循环。
Planner 节点负责生成初始计划:
# agent-playground/app/core/graph.py(追加到文件末尾)
import json
from langchain_core.messages import HumanMessage, SystemMessage, ToolMessage
from langgraph.constants import END
from langgraph.graph import START, StateGraph
from langgraph.prebuilt import ToolNode
from .state import DeliberativeState
PLANNER_PROMPT = SystemMessage(content="""你是一个任务分解专家。请将用户的请求分解为可执行的步骤列表。
规则:
1. 每个步骤应该是一个独立、可执行的操作
2. 步骤之间应该有明确的先后顺序
3. 返回格式为 JSON 数组,每个元素是一个步骤描述字符串
4. 如果任务简单,可能只有 1-2 个步骤
示例:
用户:"调研 Django 和 Flask 的性能差异"
输出:["搜索 Django 性能数据", "搜索 Flask 性能数据", "对比两者的性能差异并输出结论"]
""")
EXECUTOR_PROMPT = SystemMessage(content="""你正在执行一个任务计划中的某一个步骤。
当前步骤是你唯一需要关注的任务,请使用可用的工具完成它。
完成后,简洁地输出执行结果。""")
REPLANNER_PROMPT = SystemMessage(content="""你是一个任务审核专家。请检查以下已完成步骤的结果,判断是否需要调整后续计划。
已完成的步骤及其结果:
{step_results}
剩余步骤:
{remaining_steps}
如果需要调整计划(比如某步骤的结果表明后续方向需要改变),请返回更新后的剩余步骤 JSON 数组。
如果不需要调整,请返回原来的剩余步骤。""")
def build_deliberative_graph(plan_llm, execute_llm, tools, max_tool_rounds_per_step=2):
llm_with_tools = execute_llm.bind_tools(tools)
def parse_plan(content: str) -> list[str]:
if "```" in content:
content = content.split("```")[1]
if content.startswith("json"):
content = content[4:]
try:
plan = json.loads(content)
if isinstance(plan, list):
return [str(step).strip() for step in plan if str(step).strip()]
except json.JSONDecodeError:
pass
return [
line.strip("- ").strip()
for line in content.strip().split("\n")
if line.strip()
]
def current_tool_exchange(messages):
for idx in range(len(messages) - 1, -1, -1):
message = messages[idx]
if hasattr(message, "tool_calls") and message.tool_calls:
return messages[idx:]
return []
def current_tool_observations(messages) -> list[str]:
return [
message.content
for message in current_tool_exchange(messages)
if isinstance(message, ToolMessage)
]
def planner(state: DeliberativeState) -> dict:
user_msg = state["messages"][-1]
response = plan_llm.invoke([PLANNER_PROMPT, user_msg])
plan = parse_plan(response.content)
if not plan:
plan = [user_msg.content]
print(f"\n[Plan] {' → '.join(plan)}")
return {
"plan": plan,
"current_step": 0,
"step_results": {},
"original_question": user_msg.content,
"tool_rounds": 0,
}
def executor(state: DeliberativeState) -> dict:
step_idx = state["current_step"]
step_desc = state["plan"][step_idx]
print(f"\n[Execute] Step {step_idx + 1}/{len(state['plan'])}: {step_desc}")
previous_results = "\n".join(
f"{state['plan'][idx]}: {result}"
for idx, result in sorted(state["step_results"].items())
) or "暂无"
task_msg = HumanMessage(content=f"请完成以下任务步骤:{step_desc}")
context_msg = HumanMessage(content=(
f"用户原始问题:{state.get('original_question', '')}\n\n"
f"已完成步骤结果:\n{previous_results}"
))
messages = [EXECUTOR_PROMPT, context_msg, task_msg]
if isinstance(state["messages"][-1], ToolMessage):
messages.extend(current_tool_exchange(state["messages"]))
response = llm_with_tools.invoke(messages)
if hasattr(response, "tool_calls") and response.tool_calls:
for tool_call in response.tool_calls:
print(f" Action: {tool_call['name']}({tool_call['args']})")
else:
print(f" Result: {response.content[:100]}")
return {"messages": [response]}
def observe_tool_result(state: DeliberativeState) -> dict:
tool_messages = [
message for message in state["messages"]
if isinstance(message, ToolMessage)
]
if tool_messages:
observations = [message.content for message in tool_messages[-2:]]
print(f" Observation: {' | '.join(observations)[:160]}")
return {"tool_rounds": state.get("tool_rounds", 0) + 1}
def record_step_result(state: DeliberativeState) -> dict:
step_idx = state["current_step"]
response = state["messages"][-1]
new_results = dict(state["step_results"])
if isinstance(response, ToolMessage):
observations = current_tool_observations(state["messages"])
content = "\n".join(observations) or response.content
print(f" Result: {content[:100]}")
else:
content = response.content
new_results[step_idx] = content
return {
"step_results": new_results,
"current_step": step_idx + 1,
"tool_rounds": 0,
}
def replanner(state: DeliberativeState) -> dict:
if state["current_step"] >= len(state["plan"]):
return {} # 全部完成,不需要重规划
remaining = state["plan"][state["current_step"]:]
results_str = "\n".join(
f"Step {k+1}: {v}" for k, v in state["step_results"].items()
)
prompt = HumanMessage(content=REPLANNER_PROMPT.content.format(
step_results=results_str,
remaining_steps="\n".join(remaining),
))
response = plan_llm.invoke([prompt])
new_plan = parse_plan(response.content)
if new_plan and new_plan != remaining:
print(f"\n[Replan] 调整计划: {' → '.join(new_plan)}")
updated_plan = (
state["plan"][:state["current_step"]] + new_plan
)
return {"plan": updated_plan}
return {}
def should_continue_after_planning(state: DeliberativeState) -> str:
if state["current_step"] >= len(state["plan"]):
return "summarize"
return "executor"
def should_continue_after_executor(state: DeliberativeState) -> str:
last_message = state["messages"][-1]
if hasattr(last_message, "tool_calls") and last_message.tool_calls:
return "call_tool"
return "record_step_result"
def should_continue_after_observation(state: DeliberativeState) -> str:
if state.get("tool_rounds", 0) >= max_tool_rounds_per_step:
print(" Tool limit reached for this step; using current observations.")
return "record_step_result"
return "executor"
def should_continue_after_step(state: DeliberativeState) -> str:
if state["current_step"] >= len(state["plan"]):
return "summarize"
if state["current_step"] > 0 and state["current_step"] % 3 == 0:
return "replanner"
return "executor"
def summarize(state: DeliberativeState) -> dict:
results_str = "\n".join(
f"{state['plan'][k]}: {v}"
for k, v in sorted(state["step_results"].items())
)
summary_prompt = HumanMessage(content=(
"请根据以下执行结果,生成最终回答:\n\n" + results_str
))
response = plan_llm.invoke([summary_prompt])
print(f"\n[Final] {response.content[:100]}")
return {"messages": [response]}
workflow = StateGraph(DeliberativeState)
workflow.add_node("planner", planner)
workflow.add_node("executor", executor)
workflow.add_node("call_tool", ToolNode(tools))
workflow.add_node("observe_tool_result", observe_tool_result)
workflow.add_node("record_step_result", record_step_result)
workflow.add_node("replanner", replanner)
workflow.add_node("summarize", summarize)
workflow.add_edge(START, "planner")
workflow.add_conditional_edges("planner", should_continue_after_planning, {
"executor": "executor",
"summarize": "summarize",
})
workflow.add_conditional_edges("executor", should_continue_after_executor, {
"call_tool": "call_tool",
"record_step_result": "record_step_result",
})
workflow.add_edge("call_tool", "observe_tool_result")
workflow.add_conditional_edges("observe_tool_result", should_continue_after_observation, {
"executor": "executor",
"record_step_result": "record_step_result",
})
workflow.add_conditional_edges("record_step_result", should_continue_after_step, {
"executor": "executor",
"replanner": "replanner",
"summarize": "summarize",
})
workflow.add_edge("replanner", "executor")
workflow.add_edge("summarize", END)
return workflow.compile()
这段代码虽然长,但结构很清晰:
- Planner:用一个专门的 Prompt 让 LLM 拆解任务,输出 JSON 格式的步骤列表
- Executor:每次拿一个步骤描述,让绑定了工具的 LLM 判断是否需要调用工具
- ToolNode + Observation:如果 Executor 返回了
tool_calls,ToolNode负责真正执行工具,observe_tool_result负责打印观察结果 - Record Step Result:当当前步骤不再需要工具,或工具轮数达到上限时,把这一小步的结果写入
step_results - Replanner:每执行完 3 个计划步骤触发一次,把已完成步骤的结果给 LLM 审查,如果发现需要调整方向就更新后续计划
- Summarize:全部步骤执行完后,把所有结果汇总成最终答案
这里有一个容易混淆的点:execute_llm.bind_tools(tools) 只是让模型知道"有哪些工具可以调用",并允许它返回 tool_calls;真正执行工具的是 ToolNode(tools)。所以 Deliberative 模式不是简单地"规划后每步调一次 LLM",而是外层按计划推进,内层每个步骤仍然允许一小段工具调用循环。
图结构可以理解成:

和 Reactive 模式相比,关键区别在于多了两条"管理线":Planner 在开始时生成策略,Replanner 在中途调整策略。Executor 内部仍然保留工具调用能力,但它服务于"当前计划步骤",而不是无目标地一路往前走。
用起来也很简单,两个 LLM 分别负责规划和执行:
# agent-playground/app/core/agent.py(追加)
from .graph import build_deliberative_graph
def create_deliberative_agent():
tools = get_tools()
plan_llm = create_llm() # 负责规划,不需要工具
execute_llm = create_llm() # 负责执行,工具在 graph 内部绑定
return build_deliberative_graph(plan_llm, execute_llm, tools)
plan_llm 不需要绑工具——Planner 的工作是拆解任务,不是执行工具。execute_llm 需要工具能力,但绑定动作放在 build_deliberative_graph 内部完成:llm_with_tools = execute_llm.bind_tools(tools)。这样 graph 本身同时持有"会生成 tool_calls 的 LLM"和"真正执行工具的 ToolNode",职责更完整。
调用入口:
# agent-playground/main.py
from app.core.agent import create_deliberative_agent, run_agent
agent = create_deliberative_agent()
answer = run_agent(agent, "调研 Django 和 Flask 的性能差异")
print(f"\n最终回答: {answer}")
运行时会先看到 Planner 生成的步骤列表,然后 Executor 逐步执行。每个步骤内部,如果 LLM 判断需要外部信息,就会进入 call_tool → observe_tool_result → executor 的小循环;如果不再需要工具,就进入 record_step_result,推进到下一个计划步骤。每 3 个计划步骤触发一次 Replanner 检查方向,最后 Summarize 汇总输出。
适用场景与缺点
适用场景:
- 复杂多步任务:代码生成 → 运行测试 → 根据报错修复,这种有明确步骤依赖的任务
- 需要全局视野的任务:研究报告撰写、技术方案设计、竞品分析
- 任务依赖关系复杂:步骤之间有先后顺序,前一步的结果直接影响后一步怎么做
缺点:
- 规划本身有成本:Planner 节点会额外消耗 Token 和时间。对于简单任务(比如查个天气),先规划三步再执行是纯浪费
- 对简单任务"小题大做":用户的输入是"1+1 等于几",结果 Planner 拆成了"搜索加法定义 → 调用计算器验证 → 输出结果",三行代码能搞定的事花了 20 秒和 500 Token
- 计划可能本身就错了:如果 Planner 一开始就理解偏了,接下来的执行就是在错误的方向上努力,Replanner 也不一定能纠正回来
一句话总结:Deliberative 模式适合你明确知道这个任务很复杂的场景。如果任务的复杂度未知,还需要更灵活的模式——这就是 Hybrid 要解决的问题。
Hybrid 模式
什么是 Hybrid?
Hybrid 模式把 Reactive 的速度和 Deliberative 的全局规划能力放到同一张图里。它的核心思路不是"每个子任务都嵌套一个 Reactive Agent",而是:先判断任务复杂度,再选择合适的执行路径。
工作流程:
- Router 先判断任务复杂度:简单问题走 Reactive,复杂任务走 Deliberative
- 简单任务直接响应:比如查天气、算表达式、问一个事实,不需要先规划
- 复杂任务再进入规划流程:比如调研、对比、分析、写方案,交给 Planner 拆步骤,再按 Deliberative 流程执行
类比一下,这像一个客服系统的分诊台。用户只是问"订单什么时候发货?",直接查一下就回答;如果用户说"帮我分析这批订单为什么延迟,并给一个优化方案",才需要进入规划、执行、复盘的复杂流程。Hybrid 的价值就在这里:简单任务不小题大做,复杂任务不硬靠 Reactive 瞎跑。
代码实现要点
Hybrid 模式的核心是一个路由结构:入口先经过 router,然后根据 mode 进入 Reactive 分支或 Deliberative 分支。下面的代码沿用前面 Deliberative 部分抽出来的节点构造函数:_create_deliberative_nodes、_add_deliberative_nodes 和 _add_deliberative_edges。
# agent-playground/app/core/graph.py(追加,Hybrid 模式)
from langgraph.constants import END
from langgraph.graph import START, StateGraph
from langgraph.prebuilt import ToolNode
from .state import HybridState
def build_hybrid_graph(
plan_llm,
execute_llm,
tools,
max_steps=10,
max_tool_rounds_per_step=2,
):
reactive_llm_with_tools = execute_llm.bind_tools(tools)
deliberative_nodes = _create_deliberative_nodes(
plan_llm,
execute_llm.bind_tools(tools),
max_tool_rounds_per_step,
)
def router(state: HybridState) -> dict:
user_msg = state["messages"][-1]
content = user_msg.content.lower()
complex_markers = [
"调研", "研究", "分析", "比较", "对比", "规划", "计划",
"总结", "多步", "步骤", "方案", "benchmark", "performance",
"compare", "research", "analyze",
]
is_complex = len(user_msg.content) > 40 or any(
marker in content for marker in complex_markers
)
mode = "deliberative" if is_complex else "reactive"
print(f"\n[Hybrid] Route: {mode}")
return {"mode": mode, "original_question": user_msg.content}
def choose_mode(state: HybridState) -> str:
return state.get("mode", "reactive")
def reactive_call_model(state: HybridState) -> dict:
response = reactive_llm_with_tools.invoke(
[SYSTEM_PROMPT] + state["messages"]
)
step = state.get("step_count", 0) + 1
print(f"\n[Reactive] Step {step}")
if hasattr(response, "tool_calls") and response.tool_calls:
for tool_call in response.tool_calls:
print(f" Action: {tool_call['name']}({tool_call['args']})")
else:
print(f" Response: {response.content[:100]}...")
return {"messages": [response], "step_count": step}
def reactive_observe(state: HybridState) -> dict:
last_message = state["messages"][-1]
print(f" Observation: {last_message.content[:100]}")
return {}
def should_continue_reactive(state: HybridState) -> str:
last_message = state["messages"][-1]
if state.get("step_count", 0) >= max_steps:
return END
if hasattr(last_message, "tool_calls") and last_message.tool_calls:
return "reactive_call_tool"
return END
workflow = StateGraph(HybridState)
workflow.add_node("router", router)
workflow.add_node("reactive_call_model", reactive_call_model)
workflow.add_node("reactive_call_tool", ToolNode(tools))
workflow.add_node("reactive_observe", reactive_observe)
_add_deliberative_nodes(workflow, deliberative_nodes, tools)
workflow.add_edge(START, "router")
workflow.add_conditional_edges("router", choose_mode, {
"reactive": "reactive_call_model",
"deliberative": "planner",
})
workflow.add_conditional_edges("reactive_call_model", should_continue_reactive, {
"reactive_call_tool": "reactive_call_tool",
END: END,
})
workflow.add_edge("reactive_call_tool", "reactive_observe")
workflow.add_edge("reactive_observe", "reactive_call_model")
_add_deliberative_edges(workflow, deliberative_nodes)
return workflow.compile()
这段实现里,Hybrid 不是一个新的执行算法,而是把前面两种模式放进同一张图,再加一个入口路由。
- Reactive 分支:
reactive_call_model → reactive_call_tool → reactive_observe → reactive_call_model,适合简单、短链路任务 - Deliberative 分支:复用前面 Deliberative 的
planner / executor / call_tool / observe_tool_result / record_step_result / replanner / summarize - Router:根据输入长度和关键词做一个轻量判断,比如"调研、分析、对比、方案、research、compare"这类任务会被路由到 Deliberative
流程图如下

适用场景
- 任务复杂度不稳定:同一个入口既会收到"查一下天气",也会收到"调研三个框架并输出报告"
- 想控制简单任务成本:简单任务不进入 Planner,直接走 Reactive,少一次规划开销
- 想保留复杂任务质量:复杂任务自动切到 Deliberative,先拆计划再执行,不靠 Reactive 硬撑
对比总结与决策框架
三种模式不是"哪个更好"的关系,而是"哪个更适合当前任务"。
| 维度 | Reactive | Deliberative | Hybrid |
|---|---|---|---|
| 规划阶段 | 无 | 有(详细步骤) | 简单任务无,复杂任务有 |
| 执行方式 | 单步决策 | 按计划逐步执行 | Router 选择 Reactive 或 Deliberative |
| 反馈回路 | 每一步 | 每执行一轮 | 取决于被路由到哪条分支 |
| 响应速度 | 快 | 慢 | 中 |
| Token 消耗 | 低 | 高 | 中 |
| 简单任务表现 | 好 | 浪费 | 好(走 Reactive) |
| 复杂任务表现 | 差 | 好 | 好 |
| 实现复杂度 | 低 | 高 | 中 |
决策框架——拿到一个任务时,按这个顺序判断:
- 任务是简单的信息查询或单步操作? → 用 Reactive。不要多想,直接上 ReAct 循环
- 任务明显是多步骤的、步骤之间有明确的先后依赖? → 用 Deliberative。先让 Planner 拆出计划,再按计划执行
- 任务复杂度不确定,入口既可能收到简单问题也可能收到复杂任务? → 用 Hybrid。先路由,简单任务走 Reactive,复杂任务走 Deliberative
- 如果你不确定该选哪个——从 Reactive 开始,发现 Agent 在绕圈、迷失方向时再升级到 Hybrid
最后一句话总结:Reactive 是默认选项,当 Reactive 搞不定时再考虑更复杂的模式。不要为了"架构看起来高级"而选 Deliberative 或 Hybrid——你的 Token 和时间都是成本。
和"四种 Agent 架构"那篇是什么关系?
如果你读过前面的《四种主流的 Agent 架构:你写的是哪一种》,可能会发现一个问题:那篇文章讲的是 ReAct、Plan-Execute、Reflexion 和 Multi-Agent 四种架构,而这篇文章讲的是 Reactive、Deliberative 和 Hybrid 三种模式。它们不是两套互相冲突的分类,而是从不同维度看同一件事。
那篇文章更像是 Agent 架构全景图,关注的是 Agent 的循环机制怎么被扩展:ReAct 是最基础的"思考-行动-观察"循环;Plan-Execute 在循环前加了规划;Reflexion 在一次完整尝试后加了复盘和记忆;Multi-Agent 则把任务拆给多个角色协作。
这篇文章关注的是 单个 Agent 面对任务时的决策方式:Reactive 对应 ReAct,强调边走边看;Deliberative 对应 Plan-Execute,强调先规划再执行;Hybrid 则是工程实践里常见的折中——入口先判断任务复杂度,简单任务走 Reactive,复杂任务走 Deliberative。
所以可以这样理解:ReAct 和 Plan-Execute 基本能分别映射到 Reactive 和 Deliberative;Hybrid 是把两者组合起来的一种任务执行策略;而 Reflexion 和 Multi-Agent 更像可叠加的增强机制。你可以让一个 Reactive Agent 带 Reflexion,也可以让一个 Multi-Agent 系统里的每个角色分别采用 Reactive、Deliberative 或 Hybrid。