跳转至

数据传递方式

为了保证能够在不同场景下能够高效、准确地进行数据传递, Agently Workflow 为您提供了多种数据传递方式。

工作块间数据传递

如上图所示,工作块间的数据传递主要经历了以下步骤:

① from_chunk工作块执行函数的return指令同时传递了两个信息,一是执行函数的最终执行结果,二是执行函数的执行完成信号,当return指令被执行时,执行函数的执行完成信号和最终执行结果沿着③from_chunk的输出端点(output handle)(默认为"default"端点)-④to_chunk的输入端点(input handle)(默认为"default"端点)传递,最终执行结果将被放入② to_chunk工作块执行函数的inputs参数与输入端点同名的键(默认为"default")中,但这时to_chunk的执行函数还不会被启动,直到to_chunk的全部输入端点都接收到执行完成信号后,to_chunk的执行函数才会被启动。

在实际开发中,对只有一个输出端点的上游工作块和只有一个输入端点的下游工作块进行连接关系表达时,输出端点和输入端点不需要被显性指定,以减轻开发者的表达负担,在这时,默认的输出端点和输入端点将会被使用,return给出的完整最终执行结果将从默认的输出端点原样输出,并传递到下游工作块执行函数的inputs参数的"default键"中,可以通过inputs["default"]完整取出。

关于执行函数触发条件与输入端点的关系的详细说明请点击这里回顾

对于工作流的第一个工作块的inputs,您可以通过在workflow.start()方法中传入数据的方式进行传递。

工作块间数据传递
import Agently
workflow = Agently.Workflow()

@workflow.chunk()
def from_chunk(inputs, storage):
    print("Init Data: ", inputs["default"])
    return "HI"

@workflow.chunk()
def to_chunk(inputs, storage):
    print("from_chunk passed data: ", inputs["default"])
    return

(
    workflow
        .connect_to("from_chunk")
        .connect_to("to_chunk")
        .connect_to("END")
)

workflow.start("This is init data.")
运行结果
Init Data:  This is init data.
from_chunk passed data:  HI

工作流内共享数据存储(storage)

工作块执行函数的第二个参数storage,是整个工作流内共享的数据存储管理器,它可以通过以下方式被使用:

  1. 在工作流开始执行时:当工作流开始执行时(即workflow.start()方法被调用时),您可以使用可选参数storage对工作流内共享数据存储进行初始化赋值,如workflow.start(storage={ "msg": "hi" }),注意,storage的初始化值必须为字典类型
  2. 在工作块执行函数中:通过执行函数的storage参数的.get("<键名>")方法从工作流内共享数据存储空间中取出特定键的值,通过.set("<键名>", <值>)方法向工作流内共享数据存储空间中写入特定键的值
  3. 在工作流结束时:当工作流执行完毕后,您可以通过workflow.storage.get()方法获取工作流内共享数据存储空间中的所有数据,或是通过workfow.storage.get("<键名>")方法获取特定键的值
工作流内共享数据存储
import Agently
workflow = Agently.Workflow()

@workflow.chunk()
def get_and_set_storage(inputs, storage):
    init_storage = storage.get("init")
    print("init storage: ", init_storage)
    storage.set("workflow_update", "Agently Workflow")
    return

workflow.connect_to("get_and_set_storage").connect_to("END")

workflow.start(storage={"init": "Hello"})
print("from workflow: ", workflow.storage.get("workflow_update"))
运行结果
init storage:  Hello
from workflow:  Agently Workflow

跨工作流的共享数据存储(public_storage)

在某些情况下,您可能还会需要在多个工作流之间进行数据共享,Agently Workflow 也为您提供了相应的解决方案。

Agently.Workflow.public_storage是Workflow类的静态属性,这个属性指向同一个数据存储管理器对象,因为静态属性的特性,所有工作流实例都可以通过共享同一个数据存储管理器对象,从而访问同一个公共数据存储空间。

跨工作流的共享数据存储
import Agently
workflow1 = Agently.Workflow()
workflow2 = Agently.Workflow()

@workflow1.chunk()
def set_public_storage(inputs, storage):
    # 您可以使用Agently.Workflow类的public_storage属性
    Agently.Workflow.public_storage.set("msg1", "Hello")
    # 您也可以使用workflow1实例的public_storage属性
    Agently.Workflow.public_storage.set("msg2", "Hi")
    return

@workflow2.chunk()
def get_public_storage(inputs, storage):
    print("msg1: ", Agently.Workflow.public_storage.get("msg1"))
    print("msg2: ", workflow2.public_storage.get("msg2"))
    return

workflow1.connect_to("set_public_storage").connect_to("END")
workflow2.connect_to("get_public_storage").connect_to("END")
workflow1.start()
workflow2.start()
运行结果
msg1:  Hello
msg2:  Hi