ReduceStreamOperator converts streaming data into non-streaming data: it combines the items of a stream into a single value.
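To make the idea of reducing a stream concrete, here is a minimal sketch in plain `asyncio`, with no DB-GPT imports. It is an illustration of the concept only, not the library's implementation; `numbers`, `reduce_stream`, and `add` are hypothetical names introduced for this example, and returning `None` for an empty stream is an assumption of the sketch.

```python
import asyncio
from typing import AsyncIterator, Awaitable, Callable, Optional, TypeVar

T = TypeVar("T")


async def numbers(n: int) -> AsyncIterator[int]:
    """Hypothetical producer: yields 0 .. n-1 one item at a time."""
    for i in range(n):
        yield i


async def reduce_stream(
    stream: AsyncIterator[T],
    func: Callable[[T, T], Awaitable[T]],
) -> Optional[T]:
    """Fold a stream into one value; returns None for an empty stream."""
    result: Optional[T] = None
    first = True
    async for item in stream:
        if first:
            # The first item seeds the accumulator.
            result, first = item, False
        else:
            # Combine the accumulated value with the next item.
            result = await func(result, item)
    return result


async def add(x: int, y: int) -> int:
    return x + y


if __name__ == "__main__":
    print(asyncio.run(reduce_stream(numbers(5), add)))  # 0+1+2+3+4 = 10
```

The caller receives one ordinary value once the stream is exhausted, which is what "non-streaming" means here.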
There are two ways to use the ReduceStreamOperator.
Build a ReduceStreamOperator from a reduce function
    from dbgpt.core.awel import DAG, ReduceStreamOperator

    with DAG("awel_reduce_operator") as dag:
        task = ReduceStreamOperator(reduce_function=lambda x, y: x + y)
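The two-argument function passed as `reduce_function` above can be tried on its own. The sketch below uses the standard library's `functools.reduce` on an in-memory list as a stand-in for a stream; this is an assumption made for illustration, and the sample values are invented. It shows that the same `x + y` function sums numbers and also concatenates string chunks.

```python
from functools import reduce

# The same two-argument function used as reduce_function above.
combine = lambda x, y: x + y

# A list stands in for the stream here (illustrative assumption).
total = reduce(combine, [0, 1, 2, 3, 4])
text = reduce(combine, ["Hel", "lo, ", "AWEL"])

print(total)  # 10
print(text)   # Hello, AWEL
```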
Implement a custom ReduceStreamOperator
    from dbgpt.core.awel import DAG, ReduceStreamOperator


    class MySumOperator(ReduceStreamOperator[int, int]):
        async def reduce(self, x: int, y: int) -> int:
            return x + y


    with DAG("awel_reduce_operator") as dag:
        task = MySumOperator()
Example
In the awel_tutorial directory, create a file named reduce_operator_sum_numbers.py with the following code:
    import asyncio
    from typing import AsyncIterator

    from dbgpt.core.awel import DAG, ReduceStreamOperator, StreamifyAbsOperator


    class NumberProducerOperator(StreamifyAbsOperator[int, int]):
        """Create a stream of numbers from 0 to `n-1`"""

        async def streamify(self, n: int) -> AsyncIterator[int]:
            for i in range(n):
                yield i


    class MySumOperator(ReduceStreamOperator[int, int]):
        async def reduce(self, x: int, y: int) -> int:
            return x + y


    with DAG("sum_numbers_dag") as dag:
        task = NumberProducerOperator()
        sum_task = MySumOperator()
        # Feed the producer's stream into the reduce operator.
        task >> sum_task

    # call_data is the input of the producer, i.e. n.
    o1 = asyncio.run(sum_task.call(call_data=5))
    if o1 == sum(range(5)):
        print(f"Success! n is 5, sum is {o1}")
    else:
        print("Failed")

    o2 = asyncio.run(sum_task.call(call_data=10))
    if o2 == sum(range(10)):
        print(f"Success! n is 10, sum is {o2}")
    else:
        print("Failed")
Run the following command to see the output:
    poetry run python awel_tutorial/reduce_operator_sum_numbers.py

    Success! n is 5, sum is 10
    Success! n is 10, sum is 45
