Skip to content

ParallelChain 并行思维链

ParallelChain(ParallelChains 类)用于同时调度多个 Chain/Chains 实例并行运行,将各自的输出合并成单一结果。相比串行链,ParallelChain 更适合“同题多解”“多角色协作”或需要并行探索不同方案的场景。

类概览

  • 定义位置tfrobot.brain.chain.chain_structures.parallel_chains.ParallelChains
  • 继承关系:继承自 Chains,提供 run / async_run 接口
  • 核心能力:并行执行子链、自动合并 ChainResult
  • 输出规范:通过 merge_schemas 合并所有子链的 output_json_schema

配置参数

参数 类型 说明 默认值
name ClassVar[str] 类名常量,用于注册链类型 "ParallelChain"
description ClassVar[str] 类描述常量,概括并行执行特性 ParallelChain allow multichain work together...
abilities_purpose Optional[str] 对并行链目标的说明,可展示给 LLM 作为上下文 “当前的思维链是并行链...”
chains list[Chain \| Chains] 需要并行执行的子链列表,可继续嵌套结构化链 []

参数细化说明

  1. chains 列表
  2. 所有子链会同步触发;同步版本顺序遍历,异步版本使用 asyncio.gather
  3. 传入的 current_inputconversationelementsknowledgetools 在各子链之间共享,方便统一上下文管理。

  4. abilities_purpose

  5. 默认描述强调“并行执行并合并结果”。实际使用时建议结合业务语境重写,使大模型能理解每个并行节点的职责。

运行流程

  1. 上下文准备
  2. 创建 ChainContext,把当前输入、会话、知识、工具等打包传入。
  3. 初始化空的 ChainResult,用于累积所有子链输出、使用量等信息。

  4. 触发子链

  5. 同步版在循环中执行 chain.run(...);异步版构造协程列表后用 asyncio.gather 并行拉起。
  6. 为避免并发修改 intermediate_msgs,传入子链的 intermediate_msgscopy.deepcopy 一份,仅用于该链内部使用。

  7. 结果汇总 (|=)

  8. 使用 chain_res |= child_result 调用 ChainResult.__ior__ 合并结果。此操作会同时更新:

    • origin:记录子链原始输出,多个结果会被并集化;
    • content:保持结构化结果的合并(按 ChainResult 内的实现);
    • usage:通过 merge_usages 统计各子链 Token 消耗;
    • intermediate_msgs / tool_results:在不相互污染的前提下合并保存。
  9. 异常处理

  10. 子链执行失败不会中断整体流程,而是记录 warnings.warn,继续合并其它子链结果。
  11. 调用方可通过 ChainResult.warnings 或日志定位失败链路。

Chain 运算结果的缓存与传递

  1. 当前输入级缓存
  2. 所有子链共享同一个 current_input 对象,因此若子链开启 catch_intermediate_chain_result,则会把结构化输出写入 current_input.additional_kwargs[_CHAINS_INTERMEDIATE_RESULTS]
  3. 因为 intermediate_msgs 在并行阶段是深拷贝,子链间无法通过消息列表互通结果;跨链数据主要依赖 _CHAINS_INTERMEDIATE_RESULTS 或自定义的 result_cache_key

  4. 合并后的 ChainResult

  5. ChainResult.__ior__ 会把各子链最新的 current_generatetool_returns 等运行产物统一挂载,调用方可在最终结果中查看每个子链的产出。
  6. 若需要区分不同子链的缓存,可在各链 result_cache_key 中使用唯一键名,然后在后处理阶段读取 _CHAINS_INTERMEDIATE_RESULTS 中的对应字段。

  7. Token 使用统计

  8. 累积的 usage 字段会体现每个并行分支合计的 Token 消耗,便于在上层做预算或限流控制。

使用建议

  1. 并行探索方案:适合“多专家投票”“多种提示词尝试”类场景,再由外部逻辑读取合并后的结构化结果进行仲裁。
  2. 控制上下文污染:如需在并行链之间共享工具输出,应在链内部把必要数据写入 _CHAINS_INTERMEDIATE_RESULTS,避免直接修改 intermediate_msgs 带来的竞争风险。
  3. 谨慎处理异常:虽然异常被捕获,但建议在测试环境中监控 warnings 并为关键链设置 validate_result,确保失败时有补救措施。
  4. Schema 合并:当各子链输出 JSON 时,确保字段之间不存在冲突;merge_schemas(strict=True) 会在冲突时抛错,应提前设计好输出结构。

异步支持

  • ParallelChains._async_run 通过 asyncio.gather 并发执行所有子链的 async_run
  • 异常以 return_exceptions=True 的方式收集,不会阻断其它协程;最终仍按同步逻辑合并结果。

与基础 Chain 的关系

  • 上下文共享:ParallelChain 与基础 Chain 共用 ChainContext,因而可以直接复用现有链实例。
  • 结果拼接:相比单独运行多个链,ParallelChain 的 |= 操作提供了统一的结果聚合方式,使上层调用者只需处理一次 ChainResult
  • 配合 SeqChain:可以在串行链中嵌套并行链(或反之),实现“分支探索→汇总→下一阶段处理”的复杂流程。