ParallelChain 并行思维链¶
ParallelChain(ParallelChains 类)用于同时调度多个 Chain/Chains 实例并行运行,将各自的输出合并成单一结果。相比串行链,ParallelChain 更适合“同题多解”“多角色协作”或需要并行探索不同方案的场景。
类概览¶
- 定义位置:
tfrobot.brain.chain.chain_structures.parallel_chains.ParallelChains - 继承关系:继承自
Chains,提供run/async_run接口 - 核心能力:并行执行子链、自动合并
ChainResult - 输出规范:通过
merge_schemas合并所有子链的output_json_schema
配置参数¶
| 参数 | 类型 | 说明 | 默认值 |
|---|---|---|---|
name |
ClassVar[str] |
类名常量,用于注册链类型 | "ParallelChain" |
description |
ClassVar[str] |
类描述常量,概括并行执行特性 | ParallelChain allow multichain work together... |
abilities_purpose |
Optional[str] |
对并行链目标的说明,可展示给 LLM 作为上下文 | “当前的思维链是并行链...” |
chains |
list[Chain \| Chains] |
需要并行执行的子链列表,可继续嵌套结构化链 | [] |
参数细化说明¶
chains列表:- 所有子链会同步触发;同步版本顺序遍历,异步版本使用
asyncio.gather。 -
传入的
current_input、conversation、elements、knowledge、tools在各子链之间共享,方便统一上下文管理。 -
abilities_purpose: - 默认描述强调“并行执行并合并结果”。实际使用时建议结合业务语境重写,使大模型能理解每个并行节点的职责。
运行流程¶
- 上下文准备:
- 创建
ChainContext,把当前输入、会话、知识、工具等打包传入。 -
初始化空的
ChainResult,用于累积所有子链输出、使用量等信息。 -
触发子链:
- 同步版在循环中执行
chain.run(...);异步版构造协程列表后用asyncio.gather并行拉起。 -
为避免并发修改
intermediate_msgs,传入子链的intermediate_msgs会copy.deepcopy一份,仅用于该链内部使用。 -
结果汇总 (
|=): -
使用
chain_res |= child_result调用ChainResult.__ior__合并结果。此操作会同时更新:origin:记录子链原始输出,多个结果会被并集化;content:保持结构化结果的合并(按ChainResult内的实现);usage:通过merge_usages统计各子链 Token 消耗;intermediate_msgs/tool_results:在不相互污染的前提下合并保存。
-
异常处理:
- 子链执行失败不会中断整体流程,而是记录
warnings.warn,继续合并其它子链结果。 - 调用方可通过
ChainResult.warnings或日志定位失败链路。
Chain 运算结果的缓存与传递¶
- 当前输入级缓存:
- 所有子链共享同一个
current_input对象,因此若子链开启catch_intermediate_chain_result,则会把结构化输出写入current_input.additional_kwargs[_CHAINS_INTERMEDIATE_RESULTS]。 -
因为
intermediate_msgs在并行阶段是深拷贝,子链间无法通过消息列表互通结果;跨链数据主要依赖_CHAINS_INTERMEDIATE_RESULTS或自定义的result_cache_key。 -
合并后的 ChainResult:
ChainResult.__ior__会把各子链最新的current_generate、tool_returns等运行产物统一挂载,调用方可在最终结果中查看每个子链的产出。-
若需要区分不同子链的缓存,可在各链
result_cache_key中使用唯一键名,然后在后处理阶段读取_CHAINS_INTERMEDIATE_RESULTS中的对应字段。 -
Token 使用统计:
- 累积的
usage字段会体现每个并行分支合计的 Token 消耗,便于在上层做预算或限流控制。
使用建议¶
- 并行探索方案:适合“多专家投票”“多种提示词尝试”类场景,再由外部逻辑读取合并后的结构化结果进行仲裁。
- 控制上下文污染:如需在并行链之间共享工具输出,应在链内部把必要数据写入
_CHAINS_INTERMEDIATE_RESULTS,避免直接修改intermediate_msgs带来的竞争风险。 - 谨慎处理异常:虽然异常被捕获,但建议在测试环境中监控
warnings并为关键链设置validate_result,确保失败时有补救措施。 - Schema 合并:当各子链输出 JSON 时,确保字段之间不存在冲突;
merge_schemas(strict=True)会在冲突时抛错,应提前设计好输出结构。
异步支持¶
ParallelChains._async_run通过asyncio.gather并发执行所有子链的async_run。- 异常以
return_exceptions=True的方式收集,不会阻断其它协程;最终仍按同步逻辑合并结果。
与基础 Chain 的关系¶
- 上下文共享:ParallelChain 与基础
Chain共用ChainContext,因而可以直接复用现有链实例。 - 结果拼接:相比单独运行多个链,ParallelChain 的
|=操作提供了统一的结果聚合方式,使上层调用者只需处理一次ChainResult。 - 配合 SeqChain:可以在串行链中嵌套并行链(或反之),实现“分支探索→汇总→下一阶段处理”的复杂流程。