Java 类名:com.alibaba.alink.operator.stream.dataproc.LookupRedisStreamOp
Python 类名:LookupRedisStreamOp
功能介绍
支持数据查找功能,支持多个key的查找,并将查找后的结果中的value列添加到待查询数据后面。
功能类似于 LookUpStreamOp ,不同的是被查找的数据存储在 Redis 中。
参数说明
| 名称 | 中文名称 | 描述 | 类型 | 是否必须? | 取值范围 | 默认值 |
|---|---|---|---|---|---|---|
| outputSchemaStr | Schema | Schema。格式为”colname coltype[, colname2, coltype2[, …]]”,例如”f0 string, f1 bigint, f2 double” | String | ✓ | ||
| pluginVersion | 插件版本号 | 插件版本号 | String | ✓ | ||
| selectedCols | 选择的列名 | 计算列对应的列名列表 | String[] | ✓ | ||
| clusterMode | Not available! | Not available! | Boolean | false | ||
| databaseIndex | Not available! | Not available! | Long | |||
| redisIP | Not available! | Not available! | String | |||
| redisIPs | Not available! | Not available! | String[] | |||
| redisPassword | Not available! | Not available! | String | |||
| redisPort | Not available! | Not available! | Integer | 6379 | ||
| reservedCols | 算法保留列名 | 算法保留列 | String[] | null | ||
| timeout | Not available! | Not available! | Integer |
代码示例
以下代码仅用于示意,可能需要修改部分代码或者配置环境后才能正常运行!
Python 代码
df = pd.DataFrame([["id001", 123, 45.6, "str"]])inOp = BatchOperator.fromDataframe(df, schemaStr='id string, col0 bigint, col1 double, col2 string')redisIP = "*"redisPort = 26379RedisSinkBatchOp()\.setRedisIP(redisIP)\.setRedisPort(redisPort)\.setKeyCols(["id"])\.setPluginVersion("2.9.0")\.setValueCols(["col0", "col1", "col2"])\.linkFrom(inOp)BatchOperator.execute()df2 = pd.DataFrame([["id001"]])needToLookup = StreamOperator.fromDataframe(df2, schemaStr="id string")LookupRedisStreamOp()\.setRedisIP(redisIP)\.setRedisPort(redisPort)\.setPluginVersion("2.9.0")\.setSelectedCols(["id"])\.setOutputSchemaStr("col0 bigint, col1 double, col2 string")\.linkFrom(needToLookup)\.print()StreamOperator.execute()
Java 代码
import org.apache.flink.types.Row;import com.alibaba.alink.common.AlinkGlobalConfiguration;import com.alibaba.alink.operator.batch.BatchOperator;import com.alibaba.alink.operator.batch.sink.RedisSinkBatchOp;import com.alibaba.alink.operator.batch.source.MemSourceBatchOp;import com.alibaba.alink.operator.stream.StreamOperator;import com.alibaba.alink.operator.stream.dataproc.LookupRedisStreamOp;import com.alibaba.alink.operator.stream.source.MemSourceStreamOp;import com.alibaba.alink.testutil.AlinkTestBase;import org.junit.Test;import java.util.Collections;public class LookupRedisStreamOpTest extends AlinkTestBase {@Testpublic void map() throws Exception {String redisIP = "*";int redisPort = 26379;MemSourceBatchOp memSourceBatchOp = new MemSourceBatchOp(Collections.singletonList(Row.of("id001", 123L, 45.6, "str")),"id string, col0 bigint, col1 double, col2 string");new RedisSinkBatchOp().setRedisIP(redisIP).setRedisPort(redisPort).setKeyCols("id").setPluginVersion("2.9.0").setValueCols("col0", "col1", "col2").linkFrom(memSourceBatchOp);BatchOperator.execute();MemSourceStreamOp needToLookup = new MemSourceStreamOp(Collections.singletonList(Row.of("id001")),"id string");new LookupRedisStreamOp().setRedisIP(redisIP).setRedisPort(redisPort).setPluginVersion("2.9.0").setSelectedCols("id").setOutputSchemaStr("col0 bigint, col1 double, col2 string").linkFrom(needToLookup).print();StreamOperator.execute();}}
运行结果
| id | col0 | col1 | col2 |
|———-+———+————-+———|
| id001 | 123 | 45.6000 | str |
