跳转至

工作块

Agently Workflow的工作块结构

工作块结构说明:

  • 输入端点(Input Handles):输入端点为工作块提供了从上游接入连接关系的能力,帮助工作块确认上游工作流程的就绪情况,进而决定自己应该开始执行或是继续等待,同时,输入端点也能够帮助工作块获取从上游工作流程直接传递的数据,在工作块执行函数里通过inputs["端点名"]的方式即可读取
  • 执行函数(Chunk Executor):执行函数是工作块进行任务执行处理的核心部分,工作块的核心执行逻辑都通过执行函数进行编程和管理,在 Agently Workflow 中,执行函数是一个具有参数规范要求的函数,除了参数规范要求之外,您可以使用程序语言所支持的任何语法表达进行编程表达,引入任何其他模块,或是与其他程序结构在程序语言允许的范围内进行相互调用、信息传递,具有极高的灵活性
  • 输出端点(Output Handles):输出端点为工作块提供了向下游分发连接关系的能力,当工作块自身任务执行完毕后,工作块任务执行完毕的信号以及执行结果数据会通过输出端点向下游传递

工作块的基本创建方法

Agently Workflow 的工作块也是一个对象,绝大部分情况下需要归属于特定工作流,因此,您需要使用工作流对象提供的方法创建工作块对象:

创建工作块的基本方法
import Agently
workflow = Agently.Workflow()

def executor_function(inputs, storage):
    # 执行函数规范要求定义两个标准参数inputs和stroage
    # 参数inputs承接上游直接传递的数据
    # 参数storage管理单个工作流内共享的数据
    storage.set("input_data", inputs["default"])
    return "done"

my_chunk = workflow.schema.create_chunk( # 通过my_chunk变量存储创建好的工作块对象
    type="Normal", # 默认为Normal,可选值:Normal | Start | End | Condition | Loop
    title="save_input_data", # 可选,影响绘制流程图时展现的工作块名称
    executor=executor_function, # 必填,工作块执行函数
    handles={
        "inputs": [{ "handle": "input_handle_name" }]
        "outputs": [{ "handle": "output_handle_name" }]
    }, # 可选,定义工作块的输入和输出端点
)

Warning

先介绍工作块的基础创建方法更多是为了方便您理解我们后续提供的便利设计所依赖的基础设计,但我们并不建议和鼓励您使用基础创建方法来创建工作块,因为这将导致我们在框架中提供的很多便利语法失效,例如工作块不会被自动加入workflow.chunks工作块存储池,而.connect_to("工作块名")这样的简写方式也会失效等。

工作块的快速创建方法,根本停不下来的开发体验

看完上面的创建方法,可能有些朋友已经开始难受了,我还没开始写业务逻辑呢,光是定义一个工作块需要思考这么多事情?说好的 Agently 框架对保障开发者在开发过程中思路流畅、不被业务不相关信息干扰打断的极致追求呢?就这?!

别担心,上面那个只是基本方法,让我们简单做一下实际开发过程的分析:

  • 需要存储工作块对象: 在上面才说完工作块通常都需要归属于特定工作流,这里又需要自己用变量进行管理,太麻烦了,不能直接存进工作流对象里吗?
  • type: 这个参数大部分情况下都是使用"Normal"这个值,"Condition"和"Loop"通常是连接关系,用户不需要自己定义,而"Start"和"End"是工作流最基础的工作块,如果我们内置到工作流中,这个参数就不用填了
  • title: 大部分情况下用户都会直接给工作块取和executor_function的函数名一样的名字,如果能够取到函数名,这个参数也不用填了
  • handles: 仔细想想,通常我们在编写执行函数以及工作块连接关系的时候,才会逐渐清晰需要使用的端点情况,而对端点的最终定义表达,在定义连接关系的时候也会明确包含,为什么一定要在这里打断业务开发过程的思路去考虑这些细枝末节的东西?

对吧?整个过程分析下来,我们发现,除了executor_function之外,其他的信息都不需要在创建环节过多考虑,因此,我们为您提供了下面这种高效开发的工作块创建方式:

通过函数装饰器高效创建工作块
import Agently
workflow = Agently.Workflow()

@workflow.chunk()
def save_input_data(inputs, storage):
    storage.set("input_data", inputs["default"])
    return "done"

# -> workflow.chunks["save_input_data"]

使用工作流对象workflow内置的函数装饰器@workflow.chunk(),能够快速将一个函数定义为工作块,并置入工作流对象的工作块存储池workflow.chunks,工作块名和函数名保持一致,可以直接通过workflow.chunks["函数名"]的方式取出使用。

经过这样的优化,定义工作块的过程就变得非常轻松了吧?把您的注意力专注在捕捉思路里飞速闪现的业务处理逻辑灵感上吧,跟着灵感一个函数接着一个函数不被打断地写下去,根本停不下来,这是文思如泉涌一般无比顺畅的开发感受

使用工作流对象workflow内置的函数装饰器@workflow.chunk()进行工作块定义也是我们推荐的 Agently Workflow 开发实践方式。

Info

@workflow.chunk()装饰器也支持传入和workflow.schema.create_chunk()一致的任意参数,以满足开发者进一步定制修改的需要

工作块执行函数的定义规则

工作块执行函数是一类特殊的函数,需要遵循以下规则进行函数表达:

  • 输入参数
    • inputs:用于承接上游块发送过来的数据,inputs数据是字典结构,字典中的键与接收数据的输入端点相关,如果指定了输入端点,您就可以使用inputs["输入端点名"]获取数据,如果没有指定端点,数据将被传入到"default"键中,这也是最常见的情况,所以在代码中大量看到我们会使用inputs["default"]获取数据
    • storage:用于在整个工作流中进行数据共享,storage是一个数据存储器对象,你可以使用例如.set().get()等方式进行数据存取
  • 输出结果
    • return:在未指定输出端点时,通过函数的return输出的结果,将作为工作块的最终产出结果完整传递给下游,而如果指定了输出端点,则需要return时输出字典格式的数据,输出端点会查找字典中对应的键值,如果找到,则传递指定键值,否则传递None

Info

关于端点的知识,在本文档靠后的工作块的端点部分有更详细介绍

已定义工作块的获取和连接关系表达

Agently Workflow 为所有工作块对象都提供了表达连接关系的方法,如表达直接连接关系的.connect_to(),表达条件连接关系的.if_condition()等。

同时,如工作块的基本创建方法部分提到,如果使用workflow.schema.create_chunk()方法创建工作块对象,需要由开发者自行管理工作块对象的存储:

使用基本方法创建的工作块需要开发者自行存储和使用
"""使用变量存储定义好的工作块对象"""
from_chunk = workflow.schema.create_chunk(
    title="from_chunk",
    executor=from_chunk_executor,
)

to_chunk = workflow.schema.create_chunk(
    title="to_chunk",
    executor=to_chunk_executor,
)

"""使用工作块对象提供的连接方法进行工作块连接"""
from_chunk.connect_to(to_chunk)

而如工作块的快速创建方法部分提到,如果使用@workflow.chunk()装饰器定义的工作流,开发者将可以从工作流自带的工作块存储池workflow.chunks中找到已经定义的工作块并使用:

使用函数装饰器创建的工作块可以直接从workflow.chunks中取出
"""使用函数装饰器创建工作块对象"""
@workflow.chunk()
def from_chunk(inputs, storage):
    pass

@workflow.chunk()
def to_chunk(inputs, storage):
    pass

"""从工作块存储池中取出工作块对象直接使用"""
workflow.chunks["from_chunk"].connect_to(workflow.chunks["to_chunk"])

更多的连接关系表达将在连接关系部分进一步讨论。

预创建的工作块:START和END

为了方便开发者的使用,减少不必要的代码表达,我们已经将每个工作流都会用到的 开始工作块(START Chunk)结束工作块(END Chunk) 预先创建在workflow.chunks之中,您可以直接通过workflow.chunks["START"]workflow.chunks["END"]取出使用。

Tip

注:在更早期版本,我们将这两个工作块预创建为workflow.chunks["start"]workflow.chunks["end"],现在您同样可以继续使用,但"START"和"END"的命名是更规范的表达

同时,为了进一步简化开发者的表达,当您需要使用workflow.chunks["START"]作为起点进行连接关系表达时(这是最常见的工作流连接关系表达类型),您可以省略掉chunks["START"]部分,直接使用workflow进行连接,如:workflow.connect_to(workflow.chunks["to_chunk"])

嗯,对于减少不必要的开发过程表达,降低开发者开发过程负担这件事,我们是认真的!

工作块的端点

在介绍工作块结构的时候,您可能已经注意到,在 Agently Workflow 对工作块结构理解中,端点也是工作块结构的重要组成部分。

工作块端点的获取和连接关系表达

您可以通过workflow.chunks["chunk_name"].handle("handle_name")的方式使用工作块的特定端点,端点和工作块对象一样,可以使用各种表达连接关系的方法,端点也可以作为被连接关系指向的对象,如:

端点可以使用连接关系方法,也可以被连接关系方法指向
(
    # 将工作块from_chunk的输出端点output_handle
    workflow.chunks["from_chunk"].handle("output_handle")
        # 连接到工作块to_chunk的输入端点input_handle
        .connect_to(workflow.chunks["to_chunk"].handle("input_handle"))
)

输入端点同时承担接收上游任务完成信号和接收上游数据的双重责任

事实上,端点对于工作块的结构性而言非常重要,但在实际业务逻辑撰写时,大部分简单的工作流里只需要考虑工作块与工作块之间的简单连接,端点的存在感并不太强。当您发现自己需要使用端点的时候,您可能已经在编写一个非常复杂的工作流程了。

了解在 Agently Workflow 中,端点是如何影响工作块的执行顺序逻辑,以及如何与工作块执行函数配合进行数据传递的,将帮助您更好理解在 Agently Workflow 的设计理念里,工作流是如何运行的。

让我们看这样的一个例子:

如上图所示,在工作块的上游分别有Task 1Task 2两个任务,当任务完成后,分别会向下游传递{ "value": "1" }{ "value": "2" }两份数据。

当下游的承接工作块只使用一个输入端点承接Task 1Task 2(如CONDITION A所示)的时候,它的工作方式与下游的承接工作块使用两个输入端点分别承接Task 1Task 2(如CONDITION B所示)的时候会有哪些不同?

首先,我们需要理解 Agently Workflow 工作流执行逻辑的一个底层设计理念:如果一个工作块的全部输入端点都已经接到上游的任务完成信号,那么这个工作块就应该开始进行自己的执行工作

在这样的执行逻辑设计理念支撑下,在CONDITION A的情况下,两个上游任务Task 1Task 2在完成时,分别向承接工作块的唯一输入端点input_handle_a发送了一次任务完成信号,并分别发送了自己任务完成时需要向下游发送的数据,这会两次触发承接工作块的执行条件,让承接工作块执行两次:

CONDITION A演示
import Agently
workflow = Agently.Workflow()

@workflow.chunk()
def task_1(inputs, storage):
    return { "value": "1" }

@workflow.chunk()
def task_2(inputs, storage):
    return { "value": "2" }

@workflow.chunk()
def echo(inputs, storage):
    print("[Data Received]: ", str(inputs))
    return inputs

(
    workflow
        .connect_to(workflow.chunks["task_1"])
        .connect_to(workflow.chunks["echo"].handle("input_handle"))
)
(
    workflow
        .connect_to(workflow.chunks["task_2"])
        .connect_to(workflow.chunks["echo"].handle("input_handle"))
)
workflow.chunks["echo"].connect_to(workflow.chunks["END"])

result = workflow.start()
print(result)
运行结果
[Data Received]:  {'input_handle': {'value': '1'}}
[Data Received]:  {'input_handle': {'value': '2'}}
{'default': {'input_handle': {'value': '2'}}}
%%{ init: { 'flowchart': { 'curve': 'linear' }, 'theme': 'neutral' } }%%
%% Rendered By Agently %%
flowchart LR
classDef chunk_style fill:#fbfcdb,stroke:#666,stroke-width:1px,
classDef condition_chunk_style fill:#ECECFF,stroke:#9370DB,stroke-width:1px,
classDef loop_style fill:#f5f7fa,stroke:#666,stroke-width:1px,color:#333,stroke-dasharray: 5 5
    d100ae41-7ec3-439d-9ce5-e4e848423853("START"):::chunk_style -.-> |"* -->-- default"| e064e7c2-027f-4cb0-ae19-ad4c62795dee("task_1"):::chunk_style
    e064e7c2-027f-4cb0-ae19-ad4c62795dee("task_1"):::chunk_style -.-> |"* -->-- input_handle"| 57728f0f-68b3-4c7c-9a6c-f545dd9b5103("echo"):::chunk_style
    d100ae41-7ec3-439d-9ce5-e4e848423853("START"):::chunk_style -.-> |"* -->-- default"| 919cf394-7c4e-459a-812d-45c3b74ea264("task_2"):::chunk_style
    919cf394-7c4e-459a-812d-45c3b74ea264("task_2"):::chunk_style -.-> |"* -->-- input_handle"| 57728f0f-68b3-4c7c-9a6c-f545dd9b5103("echo"):::chunk_style
    57728f0f-68b3-4c7c-9a6c-f545dd9b5103("echo"):::chunk_style -.-> |"* -->-- default"| 7b13fed8-a5bf-4dd2-bcad-c0c5f32372c0("END"):::chunk_style

而在CONDITION B的情况下,上游任务Task 1在完成时向承接工作块的输入端点input_handle_a发送了任务完成信号和数据,但这时对于承接工作块而言,因为input_handle_b的存在,并不是所有的输入端点都接收到了上游的任务完成信号,承接工作块将继续等待直到上游任务Task 2也完成并向输入端点input_handle_b发送任务完成信号和数据才会开始自己的任务执行:

CONDITION B演示
import Agently
workflow = Agently.Workflow()

@workflow.chunk()
def task_1(inputs, storage):
    return { "value": "1" }

@workflow.chunk()
def task_2(inputs, storage):
    return { "value": "2" }

@workflow.chunk()
def echo(inputs, storage):
    print("[Data Received]: ", str(inputs))
    return inputs

(
    workflow
        .connect_to(workflow.chunks["task_1"])
        .connect_to(workflow.chunks["echo"].handle("input_handle_a"))
)
(
    workflow
        .connect_to(workflow.chunks["task_2"])
        .connect_to(workflow.chunks["echo"].handle("input_handle_b"))
)
workflow.chunks["echo"].connect_to(workflow.chunks["END"])

result = workflow.start()
print(result)
运行结果
[Data Received]:  {'input_handle_a': {'value': '1'}, 'input_handle_b': {'value': '2'}}
{'default': {'input_handle_a': {'value': '1'}, 'input_handle_b': {'value': '2'}}}
%%{ init: { 'flowchart': { 'curve': 'linear' }, 'theme': 'neutral' } }%%
%% Rendered By Agently %%
flowchart LR
classDef chunk_style fill:#fbfcdb,stroke:#666,stroke-width:1px,
classDef condition_chunk_style fill:#ECECFF,stroke:#9370DB,stroke-width:1px,
classDef loop_style fill:#f5f7fa,stroke:#666,stroke-width:1px,color:#333,stroke-dasharray: 5 5
    ca3a0c5d-e2f2-4593-bdc9-d045fd78a500("START"):::chunk_style -.-> |"* -->-- default"| a1a06785-4038-4f27-ac4f-b687713075df("task_1"):::chunk_style
    a1a06785-4038-4f27-ac4f-b687713075df("task_1"):::chunk_style -.-> |"* -->-- input_handle_a"| d8940420-c613-40a4-bd18-d1675aae18c8("echo"):::chunk_style
    ca3a0c5d-e2f2-4593-bdc9-d045fd78a500("START"):::chunk_style -.-> |"* -->-- default"| bbf420f0-0605-41b1-8263-d513bd3fe252("task_2"):::chunk_style
    bbf420f0-0605-41b1-8263-d513bd3fe252("task_2"):::chunk_style -.-> |"* -->-- input_handle_b"| d8940420-c613-40a4-bd18-d1675aae18c8("echo"):::chunk_style
    d8940420-c613-40a4-bd18-d1675aae18c8("echo"):::chunk_style -.-> |"* -->-- default"| da3e4227-607b-48b6-8970-0bca01a39cc8("END"):::chunk_style

可以看到,两段演示代码仅仅在连接端点上有微小差异,但对于运行逻辑却有非常显著的影响。开发者在复杂工作流编排时,需要选择合适的工作块连接方式。

输出端点主要承担对字典结构数据的分发责任

相较于输入端点,输出端点目前在工作流中对工作流执行逻辑的影响要小得多,因为在目前版本中,工作块执行函数的完成和输出是一次性的,通过函数的return进行表达及数据传递。而在这样的前提下,输出端点能做的仅仅是对字典类数据做进一步解析,把字典内的指定键的数据向下游传递,减少传递数据量,降低下游处理成本,仅此而已。事实上,在只有一个输出端点时,开发者更直觉地会在return时自行指定更少量准确的数据,而不是把一大块数据向下游传递,所以输出端点的存在感就更弱了。

下面举一个简单的例子说明输出端点在当前版本的作用:

输出端点在当前版本的作用
import Agently
workflow = Agently.Workflow()

@workflow.chunk()
def multi_output_handles(inputs, storage):
    return inputs["default"]

@workflow.chunk()
def print_a(inputs, storage):
    print(inputs["default"])
    return

@workflow.chunk()
def print_b(inputs, storage):
    print(inputs["default"])
    return

workflow.connect_to(workflow.chunks["multi_output_handles"])
(
    workflow.chunks["multi_output_handles"].handle("a")
        .connect_to(workflow.chunks["print_a"])
        #连接到END的不同输入端点确保两个任务都完成后才结束
        .connect_to(workflow.chunks["END"].handle("a"))
)
(
    workflow.chunks["multi_output_handles"].handle("b")
        .connect_to(workflow.chunks["print_b"])
        .connect_to(workflow.chunks["END"].handle("b"))
)

workflow.start({ "a": "Hello", "b": "World" })
运行结果
Hello
World
%%{ init: { 'flowchart': { 'curve': 'linear' }, 'theme': 'neutral' } }%%
%% Rendered By Agently %%
flowchart LR
classDef chunk_style fill:#fbfcdb,stroke:#666,stroke-width:1px,
classDef condition_chunk_style fill:#ECECFF,stroke:#9370DB,stroke-width:1px,
classDef loop_style fill:#f5f7fa,stroke:#666,stroke-width:1px,color:#333,stroke-dasharray: 5 5
    572ecfd6-4d2e-4db4-8496-1c9f0102b051("START"):::chunk_style -.-> |"* -->-- default"| b0ad66e6-7585-4ef3-a84e-47437d250416("multi_output_handles"):::chunk_style
    b0ad66e6-7585-4ef3-a84e-47437d250416("multi_output_handles"):::chunk_style -.-> |"a -->-- default"| 29c7a809-ceff-4e9b-b677-c6e8dd1d2021("print_a"):::chunk_style
    29c7a809-ceff-4e9b-b677-c6e8dd1d2021("print_a"):::chunk_style -.-> |"* -->-- a"| 529258a9-caad-4a58-aecc-c4c13eb3dc90("END"):::chunk_style
    b0ad66e6-7585-4ef3-a84e-47437d250416("multi_output_handles"):::chunk_style -.-> |"b -->-- default"| e53224b7-3040-4af8-bb64-8add6453192e("print_b"):::chunk_style
    e53224b7-3040-4af8-bb64-8add6453192e("print_b"):::chunk_style -.-> |"* -->-- b"| 529258a9-caad-4a58-aecc-c4c13eb3dc90("END"):::chunk_style

虽然现在看起来,输出端点的作用远没有输入端点的大,但试想另一个场景,如果我们允许异步+使用generator作为输出结果时,或是当我们尝试构建基于流式输出的工作流时,这样的分发是否就变得有意义了?这是我们目前仍在尝试探索的方向,欢迎一起探讨并完善设计。

在连接关系表达中隐藏的端点定义

在上面的代码样例中,可能您也发现了,我们并没有对工作块进行显性的端点定义,但我们在连接关系表达的过程中,描述了自己想要连接的端点,而这些端点就被自动创建了。

这也是我们为减轻开发者开发负担所做的一项减负设计。Enjoy it~

Info

如果您更习惯一切都预先定义,您仍然可以使用@workflow.chunks(handles = { "inputs": { "handle": "input_handle" }, "outputs": { "handle": "output_handle" } })的方式预先定义端点,但请注意,如果您定义了多个输入端点但没有给所有的输入端点连接可以完成的上游任务,这可能会导致工作块无法正确被触发执行