Skip to content

Chains 结构类文档

Chains Structure Class Documentation

Chains 主要设计用来表达对一组 Chain 的组合调用,可以串行、并行、树分叉、图循环等。它解决了多个 Chain 之间的调度、数据传递、上下文刷新和结果合并等问题。 The Chains class is mainly designed to express the combined invocation of a group of Chain instances, which can be serial, parallel, tree-branched, graph-looped, etc. It addresses issues such as scheduling, data transfer, context refreshing, and result merging among multiple Chain instances.

Bases: TFNeuralModel, ABC

Chains主要设计用来表达对一组Chain的组合调用,可以串行、并行、树分叉、图循环等

Chains is mainly designed to express the combination of a group of Chains, which can be called in series, parallel, tree branching, graph looping, etc.

Chains主要解决的问题如下:

  1. Chains内不同Chain的调用顺序或者调度规则问题,比如是串行调度还是并行调度或者有专门的dispatch规则
  2. Chains内不同Chain之间的数据传递问题,比如上一个Chain的输出作为下一个Chain的输入,Additional_kwargs是否合并等
  3. 不同Chain调度过程中,elements/knowledge/tools的刷新问题,是否需要在每个Chain调度前刷新,以什么规则刷新
  4. Chain最终生成的ChainResult的合并问题,比如是直接合并还是有特殊的合并规则

以上4点问题,不同的选择或者方案会有不同的Chains实现,这里的Chains主要是为了解决这些问题,提供一个通用的Chains实现

Attributes:

Name Type Description
chains SerializeAsAny[list[Union[Chain, Chains]]]

链列表,会串行调用

tags property

tags: list[str]

获取当前Chains的tags,会从所有子Chain中提取tags,合并后去重

Returns:

Type Description
list[str]

list[str]: 当前Chains的tags

additional_kwargs_schema property

additional_kwargs_schema: Optional[JsonSchemaValue]

通过此属性获取可以供此LLM正常运行的CurrentInput.AdditionalKwargs的JsonSchema要求。必须满足此要求,才可以正常渲染相关Prompt, 否则会触发异常。但需要注意,异常不一定会被抛出,为了最大化保证用户体验,Prompt渲染异常会被跳过。但仍然需要通过此字段的能力,来提醒用户 或者前置系统,按此要求来准备相关元数据。

Returns:

Type Description
Optional[JsonSchemaValue]

Optional[JsonSchemaValue]: 所有Prompt的additional_kwargs的JsonSchema | The JsonSchema of all prompts' additional_kwargs

output_json_schema abstractmethod property

output_json_schema: JsonSchemaValue

返回当前Chain的输出JSON Schema

Returns:

Name Type Description
JsonSchemaValue JsonSchemaValue

当前Chain的输出JSON Schema

run

run(current_input: UserAndAssMsg, conversation: Optional[list[BaseMessage]] = None, elements: Optional[list[DocElement]] = None, knowledge: Optional[str] = None, tools: Optional[list[BaseTool]] = None, intermediate_msgs: Optional[list[BaseMessage]] = None) -> ChainResult

Complete chat | 生成聊天结果

  1. 如果当前Chains配置了validate_result函数,则会进入self-ask模式,self-ask模式会有自迭代次数限制,如果超过最大迭代次数,则会返回当前结果
  2. 如果当前Chains未配置validate_result函数,则只进行单次调用,不会进行self-ask

Parameters:

Name Type Description Default
current_input BaseMessage

current input | 当前输入

required
conversation Optional[list[BaseMessage]]

Conversation history | 会话历史

None
elements Optional[list[DocElement]]

Document elements | 文档元素

None
knowledge Optional[str]

Related knowledge | 相关知识

None
tools Optional[list[BaseTool]]

Available tool list | 可用的工具列表

None
intermediate_msgs Optional[list[BaseMessage]]

Intermediate messages, it will be generated during llm work in chain, list tool's response or other system message, those messages will not save into memory, but usefully during chain work | 中间消息,它将在链式工作中生成,列表工具的响应或其他系统消息,这些消息不会保存到记忆体中, 但在链式工作中非常有用

None

Returns:

Name Type Description
ChainResult ChainResult

The result of completion | 完成的结果

Source code in tfrobot/brain/chain/chain_structures/base.py
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
def run(
    self,
    current_input: UserAndAssMsg,
    conversation: Optional[list[BaseMessage]] = None,
    elements: Optional[list[DocElement]] = None,
    knowledge: Optional[str] = None,
    tools: Optional[list[BaseTool]] = None,
    intermediate_msgs: Optional[list[BaseMessage]] = None,
) -> ChainResult:
    """
    Complete chat | 生成聊天结果

    1. 如果当前Chains配置了validate_result函数,则会进入self-ask模式,self-ask模式会有自迭代次数限制,如果超过最大迭代次数,则会返回当前结果
    2. 如果当前Chains未配置validate_result函数,则只进行单次调用,不会进行self-ask

    Args:
        current_input (BaseMessage): current input | 当前输入
        conversation (Optional[list[BaseMessage]]): Conversation history | 会话历史
        elements (Optional[list[DocElement]]): Document elements | 文档元素
        knowledge (Optional[str]): Related knowledge | 相关知识
        tools (Optional[list[BaseTool]]): Available tool list | 可用的工具列表
        intermediate_msgs (Optional[list[BaseMessage]]): Intermediate messages, it will be generated during llm work
            in chain, list tool's response or other system message, those messages will not save into memory, but
            usefully during chain work | 中间消息,它将在链式工作中生成,列表工具的响应或其他系统消息,这些消息不会保存到记忆体中,
            但在链式工作中非常有用

    Returns:
        ChainResult: The result of completion | 完成的结果
    """
    if not self.validate_result:
        return self._run(
            current_input=current_input,
            conversation=conversation,
            elements=elements,
            knowledge=knowledge,
            tools=tools,
            intermediate_msgs=intermediate_msgs,
        )
    else:
        chain_res = self._run(
            current_input=current_input,
            conversation=conversation,
            elements=elements,
            knowledge=knowledge,
            tools=tools,
            intermediate_msgs=intermediate_msgs,
        )
        for _ in range(self.max_iter):
            is_valid, ask = self.validate_result(chain_res)
            if is_valid:
                break
            if ask:
                if intermediate_msgs is None:
                    intermediate_msgs = []  # pragma: no cover
                intermediate_msgs.append(ask)
            chain_res += self._run(
                current_input=current_input,
                conversation=conversation,
                elements=elements,
                knowledge=knowledge,
                tools=tools,
                intermediate_msgs=intermediate_msgs,
            )
        return chain_res

async_run async

async_run(current_input: UserAndAssMsg, conversation: Optional[list[BaseMessage]] = None, elements: Optional[list[DocElement]] = None, knowledge: Optional[str] = None, tools: Optional[list[BaseTool]] = None, intermediate_msgs: Optional[list[BaseMessage]] = None) -> ChainResult

提供与BaseChain相同的async_run方法,用于调用Chains | Provide the same async_run method as BaseChain for calling Chains

Complete chat async | 异步生成聊天结果

Parameters:

Name Type Description Default
current_input BaseMessage

current input | 当前输入

required
conversation Optional[list[BaseMessage]]

Conversation history | 会话历史

None
elements Optional[list[DocElement]]

Document elements | 文档元素

None
knowledge Optional[str]

Related knowledge | 相关知识

None
tools Optional[list[BaseTool]]

Available tool list | 可用的工具列表

None
intermediate_msgs Optional[list[BaseMessage]]

Intermediate messages, it will be generated during llm work in chain, list tool's response or other system message, those messages will not save into memory, but usefully during chain work | 中间消息,它将在链式工作中生成,列表工具的响应或其他系统消息,这些消息不会保存到记忆体中, 但在链式工作中非常有用

None

Returns:

Name Type Description
ChainResult ChainResult

The result of completion | 完成的结果

Source code in tfrobot/brain/chain/chain_structures/base.py
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
async def async_run(
    self,
    current_input: UserAndAssMsg,
    conversation: Optional[list[BaseMessage]] = None,
    elements: Optional[list[DocElement]] = None,
    knowledge: Optional[str] = None,
    tools: Optional[list[BaseTool]] = None,
    intermediate_msgs: Optional[list[BaseMessage]] = None,
) -> ChainResult:
    """
    提供与BaseChain相同的async_run方法,用于调用Chains | Provide the same async_run method as BaseChain for calling Chains

    Complete chat async | 异步生成聊天结果

    Args:
        current_input (BaseMessage): current input | 当前输入
        conversation (Optional[list[BaseMessage]]): Conversation history | 会话历史
        elements (Optional[list[DocElement]]): Document elements | 文档元素
        knowledge (Optional[str]): Related knowledge | 相关知识
        tools (Optional[list[BaseTool]]): Available tool list | 可用的工具列表
        intermediate_msgs (Optional[list[BaseMessage]]): Intermediate messages, it will be generated during llm work
            in chain, list tool's response or other system message, those messages will not save into memory, but
            usefully during chain work | 中间消息,它将在链式工作中生成,列表工具的响应或其他系统消息,这些消息不会保存到记忆体中,
            但在链式工作中非常有用

    Returns:
        ChainResult: The result of completion | 完成的结果
    """
    if not self.validate_result:
        return await self._async_run(
            current_input=current_input,
            conversation=conversation,
            elements=elements,
            knowledge=knowledge,
            tools=tools,
            intermediate_msgs=intermediate_msgs,
        )
    else:
        chain_res = await self._async_run(
            current_input=current_input,
            conversation=conversation,
            elements=elements,
            knowledge=knowledge,
            tools=tools,
            intermediate_msgs=intermediate_msgs,
        )
        for _ in range(self.max_iter):
            is_valid, ask = self.validate_result(chain_res)
            if is_valid:
                break
            if ask:
                if intermediate_msgs is None:
                    intermediate_msgs = []
                intermediate_msgs.append(ask)
            chain_res += await self._async_run(
                current_input=current_input,
                conversation=conversation,
                elements=elements,
                knowledge=knowledge,
                tools=tools,
                intermediate_msgs=intermediate_msgs,
            )
        return chain_res

append_chain

append_chain(chain: Any) -> None

添加Chain到Chains中

Parameters:

Name Type Description Default
chain BaseChain | Chains

Chain实例

required

Returns:

Type Description
None

None

Source code in tfrobot/brain/chain/chain_structures/base.py
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
def append_chain(self, chain: Any) -> None:
    """
    添加Chain到Chains中

    Args:
        chain(BaseChain | Chains): Chain实例

    Returns:
        None
    """
    if not isinstance(chain, (Chain, Chains)):
        raise ValueError("The chain must be a BaseChain or Chains instance.")
    if self._neural:
        self._neural.register(chain)
    self.chains.append(chain)

insert_chain

insert_chain(index: int, chain: Any) -> None

在指定位置添加Chain到Chains中

Parameters:

Name Type Description Default
index int

位置

required
chain Chain | Chains

Chain实例

required

Returns:

Type Description
None

None

Source code in tfrobot/brain/chain/chain_structures/base.py
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
def insert_chain(self, index: int, chain: Any) -> None:
    """
    在指定位置添加Chain到Chains中

    Args:
        index(int): 位置
        chain(Chain | Chains): Chain实例

    Returns:
        None
    """
    if not isinstance(chain, (Chain, Chains)):
        raise ValueError("The chain must be a BaseChain or Chains instance.")
    if self._neural:
        self._neural.register(chain)
    self.chains.insert(index, chain)

remove_chain

remove_chain(chain: Any) -> None

从Chains中移除Chain

Parameters:

Name Type Description Default
chain Chain | Chains

Chain实例

required

Returns:

Type Description
None

None

Source code in tfrobot/brain/chain/chain_structures/base.py
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
def remove_chain(self, chain: Any) -> None:
    """
    从Chains中移除Chain

    Args:
        chain(Chain | Chains): Chain实例

    Returns:
        None
    """
    if not isinstance(chain, (Chain, Chains)):
        raise ValueError("The chain must be a BaseChain or Chains instance.")
    if chain not in self.chains:
        raise ValueError("The chain is not in the chains.")
    if self._neural:
        chain.disconnect_from_neural(self._neural)
    self.chains.remove(chain)

clear_chains

clear_chains() -> None

清空Chains

Returns:

Type Description
None

None

Source code in tfrobot/brain/chain/chain_structures/base.py
429
430
431
432
433
434
435
436
437
438
439
def clear_chains(self) -> None:
    """
    清空Chains

    Returns:
        None
    """
    for c in self.chains:
        if self._neural:
            c.disconnect_from_neural(self._neural)
    self.chains.clear()

pop_chain

pop_chain() -> Optional[Union[Chain, Chains]]

弹出最后一个Chain

Returns:

Type Description
Optional[Union[Chain, Chains]]

None

Source code in tfrobot/brain/chain/chain_structures/base.py
441
442
443
444
445
446
447
448
449
450
451
452
def pop_chain(self) -> Optional[Union[Chain, "Chains"]]:
    """
    弹出最后一个Chain

    Returns:
        None
    """
    if self.chains:
        if self._neural:
            self.chains[-1].disconnect_from_neural(self._neural)
        return self.chains.pop()
    return None

connect_to_neural

connect_to_neural(neural: Neural) -> None

实现NeuralProtocol协议,向Neural注册自己

Parameters:

Name Type Description Default
neural Neural

Neural实例

required

Returns:

Type Description
None

None

Source code in tfrobot/brain/chain/chain_structures/base.py
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
def connect_to_neural(self, neural: Neural) -> None:
    """
    实现NeuralProtocol协议,向Neural注册自己

    Args:
        neural(Neural): Neural实例

    Returns:
        None
    """
    if self._neural and self._neural is not neural:
        raise ValueError("The neural is already registered by another neural.")  # pragma: no cover
    self._neural = neural
    for c in self.chains:
        self._neural.register(c)

disconnect_from_neural

disconnect_from_neural(neural: Neural) -> None

实现NeuralProtocol协议,从Neural注销自己

Parameters:

Name Type Description Default
neural Neural

Neural实例

required

Returns:

Type Description
None

None

Source code in tfrobot/brain/chain/chain_structures/base.py
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
def disconnect_from_neural(self, neural: Neural) -> None:
    """
    实现NeuralProtocol协议,从Neural注销自己

    Args:
        neural(Neural): Neural实例

    Returns:
        None
    """
    if self._neural is not neural:  # pragma: no cover
        raise ValueError("The neural is not the same as the registered neural.")  # pragma: no cover
    for c in self.chains:
        self._neural.unregister(c)
    self._neural = None

get_instance_description

get_instance_description(visited: Optional[dict] = None, index_prefix: Optional[str] = None) -> str

实例描述。用于描述当前Chains实例的作用。因为有些时候需要大模型或者其它工具进行动态判断使用什么Chains进行操作。而这个动态属性就是为了表达 这个实例的特性与能力范围。

实例描述尽量简短,明确地表达当前实例的能力

Parameters:

Name Type Description Default
visited Optional[dict]

已访问的Chains实例,用于检查是否有循环引用。key为Chain实例,value为索引位置

None
index_prefix Optional[str]

索引前缀,用于表达当前实例在Chains中的索引位置

None

Returns:

Name Type Description
str str

默认返回当前cls.description,子类如有需要进行override

Source code in tfrobot/brain/chain/chain_structures/base.py
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
def get_instance_description(self, visited: Optional[dict] = None, index_prefix: Optional[str] = None) -> str:
    """
    实例描述。用于描述当前Chains实例的作用。因为有些时候需要大模型或者其它工具进行动态判断使用什么Chains进行操作。而这个动态属性就是为了表达
    这个实例的特性与能力范围。

    实例描述尽量简短,明确地表达当前实例的能力

    Args:
        visited(Optional[dict]): 已访问的Chains实例,用于检查是否有循环引用。key为Chain实例,value为索引位置
        index_prefix(Optional[str]): 索引前缀,用于表达当前实例在Chains中的索引位置

    Returns:
        str: 默认返回当前cls.description,子类如有需要进行override
    """
    des = self.abilities_purpose if self.abilities_purpose else "当前是一个思维链的集合,有以下Chain可用:"
    indent = "\t"
    visited = dict() if visited is None else visited
    for index, c in enumerate(self.chains):
        des += "\n\n---\n\n"
        current_index = f"{index_prefix}-{index}" if index_prefix else f"{index}"
        # 检查是否已访问,避免无限递归
        if c in visited:
            # TODO 这里需要添加一个循环引用可以参考的 Index,方便用户理解这个引用与哪个链重合,可以去查找。
            return indent + "<循环引用,已省略> 请参考: " + visited[c]
        visited.update({c: current_index})
        sub_des = (
            c.instance_description
            if isinstance(c, BaseChain)
            else c.get_instance_description(
                visited, index_prefix=f"{index_prefix}-{index}" if index_prefix else f"{index}"
            )
        )
        des += indent + "index: " + current_index + "\n"
        des += "\n".join([(indent + line) if line else line for line in sub_des.split("\n")])
    return des