JoinOperator 算子用于将多路输入数据连接成单个输入数据。比如说,如果有两个父节点,你可以通过Join算子将其连接为一个数据分支。
JoinOperator的构建方式如下
通过组合函数来构建
from dbgpt.core.awel import DAG, JoinOperatorwith DAG("awel_join_operator") as dag:task = JoinOperator(combine_function=lambda x, y: x + y)
例子
两数之和
在awel_tutorial目录下创建一个名称为join_operator_sum_number.py 的文件,代码如下
import asynciofrom dbgpt.core.awel import (DAG, JoinOperator, MapOperator, InputOperator, SimpleCallDataInputSource)with DAG("sum_numbers_dag") as dag:# Create a input task to receive data from call_datainput_task = InputOperator(input_source=SimpleCallDataInputSource())task1 = MapOperator(map_function=lambda x: x["t1"])task2 = MapOperator(map_function=lambda x: x["t2"])sum_task = JoinOperator(combine_function=lambda x, y: x + y)input_task >> task1 >> sum_taskinput_task >> task2 >> sum_taskif asyncio.run(sum_task.call(call_data={"t1": 5, "t2": 8})) == 13:print("Success!")else:print("Failed")
通过如下命令进行运行, 并观察输出:
poetry run python awel_tutorial/join_operator_sum_numbers.py> Success!
DAG流程图如下

