Skip to content

DCChains 动态编排链

DCChains 通过 LLM 生成 tf_plan 脚本来动态组合多个 Chain/Chains,适合处理包含分支、循环或重试逻辑的复杂任务。与 SeqChains 的固定顺序、ParallelChains 的并发执行相比,DCChains 把编排权交给可插拔的 orchestrate_chain 逻辑,使链路在运行时按需求重构。

核心特性

  • 定义位置tfrobot.brain.chain.chain_structures.dc_chains.DCChains
  • 继承关系:继承 Chains,保持 run / async_run 接口
  • 动态规划orchestrate_chain 返回 tf_plan 脚本,由 TFChainInterpreter 解析并执行
  • 结果聚合:所有子链的 ChainResult 累积到单一返回值,保持上下文与 Token 统计

主要配置

参数 类型 说明 默认值
name ClassVar[str] 链类型标识 "DCChains"
description ClassVar[str] 描述信息 动态编排 Chains 的链结构
abilities_purpose Optional[str] 提供给 LLM 的能力说明 “当前的思维链是DCChains...”
orchestrate_chain Callable[[ChainContext, DCChains, list[str], list[str]], str] 负责编排的回调,返回包裹 tf_plan 的字符串 default_orchestrate_chain
max_plan_times int 规划失败后最多重试次数 3

orchestrate_chain 回调

  1. 入参包含:当前链上下文、DCChains 实例、历史失败的 tf_plan 列表与错误信息。实现可以基于这些信息决定是否重新规划或降级策略。
  2. 返回值必须是形如 tf_plan\n...\n 的字符串,运行前 DCChains 会去除外层标记并交给 Lark + TFASTTransformer 解析。
  3. 默认实现 default_orchestrate_chain 使用 DeepSeek 模型生成脚本,自动把可用子链的输入/输出 schema 注入到模板中。

运行流程

  1. 上下文构造
  2. _run / _async_run 首先把传入的 current_inputconversationknowledgetoolsintermediate_msgs 封装到 ChainContext
  3. 初始化空的 ChainResult,用于累计执行结果与 Token 统计。

  4. 规划与重试

  5. 最多 max_plan_times 次循环调用 orchestrate_chain 获取最新脚本。
  6. 每次失败都会把原脚本与失败原因写入 failed_plans / failed_infos,供下一轮规划参考。

  7. 脚本解析执行

  8. 使用 Lark(PYTHON_GRAMMAR) 解析脚本,TFASTTransformer 转换为 AST。
  9. TFChainInterpreter 负责遍历 AST:根据脚本调用 chains_map 内注册的子链、执行工具、记录日志与 Token 使用。

  10. 结果合并

  11. interpreter.evaluate(...) 返回 ConsoleResult,其中包含此次执行的 ChainResult
  12. DCChains 调用 chain_res += console_res,触发 ChainResult.__iadd__

    • 合并 contentorigincurrent_generate 等字段;
    • 将所有子链的 usage 通过 merge_usages 汇总;
    • 把生成的 intermediate_msgstool_returns 附加到共享上下文。
  13. 成功返回或最终失败

  14. 任意一次执行成功即返回累计后的 ChainResult
  15. 超过 max_plan_times 仍失败则抛出 ValueError,并保留失败脚本供排查。

_CHAINS_INTERMEDIATE_RESULTS 的缓存传递

  1. 共享 current_input
  2. TFChainInterpreter 初始化时持有 current_input.additional_kwargs 引用。脚本中的子链若开启 catch_intermediate_chain_result,会在各自成功状态下把结构化数据写入 current_input.additional_kwargs["__chains_intermediate_results"]
  3. 因为所有子链共用同一个 current_input 对象,缓存会逐步累加。对于字符串输出的链,可配合 result_cache_key 指定唯一键名。

  4. 脚本内读取

  5. 规划脚本可在任意步骤调用其它子链前,先读取 _CHAINS_INTERMEDIATE_RESULTS(例如通过 DSL 中的工具或直接访问 additional_info)以复用结果,避免重复工具调用。

  6. 失败回滚策略

  7. 如果子链抛出异常导致本轮脚本失败,已写入 _CHAINS_INTERMEDIATE_RESULTS 的内容不会回滚;下一次重新规划时仍然可用,有助于构建渐进式求解流程。

usage 统计与合并

  1. Interpreter 层聚合
  2. TFChainInterpreter 在每次子链运行后把 ChainResult.usage 存入内部列表,并在 evaluate/aevaluate 收尾阶段调用 merge_usages 汇总 Token 消耗。

  3. DCChains 层累积

  4. DCChains 在 chain_res += console_res 时会将 interpreter 返回的 token_usage 继续累加到顶层 ChainResult.usage。因此调用方获取到的 result.usage 已经包含本轮脚本中所有子链与 LLM 调用的整体预算。

  5. 持续失败的记录

  6. 即便脚本失败并被加入 failed_plans,当轮执行的 Token 消耗也已被记录,方便后续分析问题或优化规划逻辑。

使用建议

  1. 脚本安全性:限制 orchestrate_chain 输出的 DSL 仅包含允许的函数/链名称,避免执行未授权的行为。
  2. 缓存命名约定:若多条链写入 _CHAINS_INTERMEDIATE_RESULTS,请为 result_cache_key 设计清晰的命名模式,防止键名冲突。
  3. 监控 usage:复杂脚本可能在规划失败后持续重试,建议结合 max_plan_times 与 Token 用量监控,必要时引入提前中断策略。
  4. 复用工具集:脚本可调用 tools列表中的工具。对于 DSL 中需要重复使用的能力,建议包裹为 Chain,再由 DCChains 统一调度,便于结果缓存与统计。
  5. 调试日志TFChainInterpreter 会把执行过程写入 logs,在 ChainResult 中可查看完整轨迹,对排错非常重要。

异步执行说明

  • _async_run 与同步版本逻辑一致,差异仅在使用 interpreter.aevaluate 与异步链的 async_run
  • 同样支持 _CHAINS_INTERMEDIATE_RESULTSusage 聚合,适用于需要并发调用子链或工具的场景。