Java class name: com.alibaba.alink.operator.stream.recommendation.RecommendationRankingStreamOp
Python class name: RecommendationRankingStreamOp
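Description
This component ranks the results of the recall stage. For each input row, it scores every recalled item in the MTable column (mTableCol) with a ranking model, sorts the items by the score column (rankingCol) with the highest scores first, and outputs the top N items (topN). You customize the ranking algorithm by building a PipelineModel and passing it to the operator. The code example shows how.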
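The ranking step can be pictured with the following conceptual sketch in plain Python. It is not Alink's implementation. It assumes the column-oriented MTable JSON layout that appears in the code example (a `data` object mapping each column to a list of values, plus a `schema` string). The names `rank_mtable` and `score_fn` are hypothetical, and `score_fn` stands in for the fitted PipelineModel.

```python
import json
from typing import Callable, Dict


def rank_mtable(mtable_json: str,
                score_fn: Callable[[Dict[str, object]], float],
                top_n: int = 10,
                score_col: str = "score") -> str:
    """Hypothetical helper: score each item row of a columnar MTable JSON string,
    keep the top_n highest-scoring rows, and append the score as a DOUBLE column."""
    mtable = json.loads(mtable_json)
    data = mtable["data"]
    cols = list(data.keys())
    n_rows = len(data[cols[0]]) if cols else 0
    # Rebuild one dict per item from the column-oriented layout.
    rows = [{c: data[c][i] for c in cols} for i in range(n_rows)]
    scored = [(score_fn(r), r) for r in rows]
    # Highest score first; Python's sort stays stable with reverse=True, so ties keep input order.
    scored.sort(key=lambda pair: pair[0], reverse=True)
    kept = scored[:top_n]
    out_data = {c: [r[c] for _, r in kept] for c in cols}
    out_data[score_col] = [s for s, _ in kept]
    out_schema = mtable["schema"] + "," + score_col + " DOUBLE"
    return json.dumps({"data": out_data, "schema": out_schema}, separators=(",", ":"))


# Toy scores, made up for illustration only.
toy_scores = {18: 0.9, 19: 0.2, 88: 0.7}
ilist = '{"data":{"iid":[18,19,88]},"schema":"iid INT"}'
print(rank_mtable(ilist, lambda r: toy_scores[r["iid"]], top_n=2))
# {"data":{"iid":[18,88],"score":[0.9,0.7]},"schema":"iid INT,score DOUBLE"}
```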
Parameters
| Name | Description | Type | Required? | Value range | Default |
|---|---|---|---|---|---|
| mTableCol | Name of the column that holds the recalled items as an MTable | String | ✓ | Selected column type is [M_TABLE] | |
| modelFilePath | Model file path | String | | | null |
| outputCol | Name of the output result column; optional | String | | | null |
| rankingCol | Score column used for sorting | String | | | null |
| reservedCols | Reserved columns (input columns kept in the output) | String[] | | | null |
| topN | Number of top-ranked items to output | Integer | | [1, +inf) | 10 |
| modelStreamFilePath | File path of the model stream | String | | | null |
| modelStreamScanInterval | Interval for scanning the model path, in seconds | Integer | | | 10 |
| modelStreamStartTime | Start time of the model stream. By default, reading starts from the current time. Use the yyyy-mm-dd hh:mm:ss.fffffffff format; see Timestamp.valueOf(String s) for details | String | | | null |
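modelStreamStartTime is passed as a string in the format `java.sql.Timestamp.valueOf` accepts. Below is a minimal sketch for producing such a string from a Python `datetime`. The helper name `to_model_stream_start_time` is hypothetical. Because `datetime` only stores microseconds, the last three digits of the nanosecond field are always zero.

```python
from datetime import datetime


def to_model_stream_start_time(dt: datetime) -> str:
    """Hypothetical helper: format dt as yyyy-mm-dd hh:mm:ss.fffffffff
    (nine fractional digits, i.e. nanoseconds), as Timestamp.valueOf expects."""
    # Convert microseconds to nanoseconds and zero-pad to nine digits.
    return dt.strftime("%Y-%m-%d %H:%M:%S") + ".{:09d}".format(dt.microsecond * 1000)


print(to_model_stream_start_time(datetime(2021, 6, 1, 8, 30, 0)))
# 2021-06-01 08:30:00.000000000
```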
Example
Python code
from pyalink.alink import *

import pandas as pd

useLocalEnv(1)

# Stream to rank: one user row whose "ilist" column holds the recalled items as an MTable JSON string
data = pd.DataFrame([["u6", "0.0 1.0", 0.0, 1.0, 1, "{\"data\":{\"iid\":[18,19,88]},\"schema\":\"iid INT\"}"]])
predData = StreamOperator.fromDataframe(data, schemaStr='uid string, uf string, f0 double, f1 double, labels int, ilist string')
# Convert the JSON string column into an MTable column
predData = predData.link(ToMTableStreamOp().setSelectedCol("ilist"))

# Training data for the ranking model: one row per (user, item) pair
data = pd.DataFrame([
    ["u0", "1.0 1.0", 1.0, 1.0, 1, 18],
    ["u1", "1.0 1.0", 1.0, 1.0, 0, 19],
    ["u2", "1.0 0.0", 1.0, 0.0, 1, 88],
    ["u3", "1.0 0.0", 1.0, 0.0, 0, 18],
    ["u4", "0.0 1.0", 0.0, 1.0, 1, 88],
    ["u5", "0.0 1.0", 0.0, 1.0, 0, 19],
    ["u6", "0.0 1.0", 0.0, 1.0, 1, 88]
])
# iid is declared int to match the "iid INT" schema of the MTable being ranked
trainData = BatchOperator.fromDataframe(data, schemaStr='uid string, uf string, f0 double, f1 double, labels int, iid int')

oneHotCols = ["uid", "f0", "f1", "iid"]
multiHotCols = ["uf"]

# Ranking pipeline: feature encoding, logistic regression, then the value stored under label "1" in the prediction detail as "score"
pipe = Pipeline() \
    .add(OneHotEncoder()
         .setSelectedCols(oneHotCols)
         .setOutputCols(["ovec"])) \
    .add(MultiHotEncoder().setDelimiter(" ")
         .setSelectedCols(multiHotCols)
         .setOutputCols(["mvec"])) \
    .add(VectorAssembler()
         .setSelectedCols(["ovec", "mvec"])
         .setOutputCol("vec")) \
    .add(LogisticRegression()
         .setVectorCol("vec")
         .setLabelCol("labels")
         .setReservedCols(["uid", "iid"])
         .setPredictionDetailCol("detail")
         .setPredictionCol("pred")) \
    .add(JsonValue()
         .setSelectedCol("detail")
         .setJsonPath(["$.1"])
         .setOutputCols(["score"]))

lrModel = pipe.fit(trainData)

# Rank the items in "ilist" with the fitted pipeline model and keep the top 2 by "score"
rank = RecommendationRankingStreamOp(lrModel.save()) \
    .setMTableCol("ilist") \
    .setOutputCol("il") \
    .setTopN(2) \
    .setRankingCol("score") \
    .setReservedCols(["uid", "labels"])

rank.linkFrom(predData).print()
StreamOperator.execute()
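The `ilist` value in the example is a JSON string that ToMTableStreamOp converts into an MTable. Judging from that string, the layout is column-oriented: a `data` object mapping each column to a list of values, plus a `schema` string. The helper below generates such strings from a list of recalled item ids. It is a sketch that assumes a single INT column. The name `to_mtable_json` is hypothetical, and the helper only reproduces the layout seen in the example, not the full MTable JSON format.

```python
import json
from typing import List


def to_mtable_json(item_ids: List[int], col: str = "iid") -> str:
    """Hypothetical helper: build {"data": {<col>: [...]}, "schema": "<col> INT"}."""
    payload = {"data": {col: list(item_ids)}, "schema": col + " INT"}
    # Compact separators reproduce the whitespace-free string used in the example.
    return json.dumps(payload, separators=(",", ":"))


print(to_mtable_json([18, 19, 88]))
# {"data":{"iid":[18,19,88]},"schema":"iid INT"}
```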
Java code
import org.apache.flink.types.Row;

import com.alibaba.alink.operator.batch.BatchOperator;
import com.alibaba.alink.operator.batch.source.MemSourceBatchOp;
import com.alibaba.alink.operator.stream.StreamOperator;
import com.alibaba.alink.operator.stream.recommendation.RecommendationRankingStreamOp;
import com.alibaba.alink.operator.stream.source.MemSourceStreamOp;
import com.alibaba.alink.pipeline.Pipeline;
import com.alibaba.alink.pipeline.classification.LogisticRegression;
import com.alibaba.alink.pipeline.dataproc.JsonValue;
import com.alibaba.alink.pipeline.dataproc.vector.VectorAssembler;
import com.alibaba.alink.pipeline.feature.MultiHotEncoder;
import com.alibaba.alink.pipeline.feature.OneHotEncoder;
import org.junit.Test;

import java.util.Arrays;

public class RecommendationRankingTest {
    @Test
    public void test() throws Exception {
        // Stream to rank: one user row whose "ilist" column holds the recalled items as an MTable JSON string
        Row[] predArray = new Row[] {
            Row.of("u6", "0.0 1.0", 0.0, 1.0, 1, "{\"data\":{\"iid\":[18,19,88]},"
                + "\"schema\":\"iid INT\"}")
        };
        // Training data for the ranking model: one row per (user, item) pair
        Row[] trainArray = new Row[] {
            Row.of("u0", "1.0 1.0", 1.0, 1.0, 1, 18),
            Row.of("u1", "1.0 1.0", 1.0, 1.0, 0, 19),
            Row.of("u2", "1.0 0.0", 1.0, 0.0, 1, 88),
            Row.of("u3", "1.0 0.0", 1.0, 0.0, 1, 18),
            Row.of("u4", "0.0 1.0", 0.0, 1.0, 1, 88),
            Row.of("u5", "0.0 1.0", 0.0, 1.0, 1, 19),
            Row.of("u6", "0.0 1.0", 0.0, 1.0, 1, 88)
        };
        BatchOperator <?> trainData = new MemSourceBatchOp(Arrays.asList(trainArray),
            new String[] {"uid", "uf", "f0", "f1", "labels", "iid"});
        StreamOperator <?> predData = new MemSourceStreamOp(Arrays.asList(predArray),
            new String[] {"uid", "uf", "f0", "f1", "labels", "ilist"});
        String[] oneHotCols = new String[] {"uid", "f0", "f1", "iid"};
        String[] multiHotCols = new String[] {"uf"};
        // Ranking pipeline: feature encoding, logistic regression, then the value stored under label "1" in the prediction detail as "score"
        Pipeline pipe = new Pipeline()
            .add(new OneHotEncoder().setSelectedCols(oneHotCols).setOutputCols("ovec"))
            .add(new MultiHotEncoder().setDelimiter(" ").setSelectedCols(multiHotCols).setOutputCols("mvec"))
            .add(new VectorAssembler().setSelectedCols("ovec", "mvec").setOutputCol("vec"))
            .add(new LogisticRegression().setVectorCol("vec").setLabelCol("labels")
                .setReservedCols("uid", "iid").setPredictionDetailCol("detail").setPredictionCol("pred"))
            .add(new JsonValue().setSelectedCol("detail").setJsonPath("$.1").setOutputCols("score"));
        // Rank the items in "ilist" with the fitted pipeline model and keep the top 2 by "score"
        RecommendationRankingStreamOp rank = new RecommendationRankingStreamOp(pipe.fit(trainData).save())
            .setMTableCol("ilist")
            .setOutputCol("ilist")
            .setTopN(2)
            .setRankingCol("score")
            .setReservedCols("uid", "labels");
        rank.linkFrom(predData).print();
        StreamOperator.execute();
    }
}
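In both examples, JsonValue with the JSON path `$.1` turns the LogisticRegression prediction detail into the `score` column that rankingCol points to. The sketch below assumes the detail column is a JSON object mapping each label value to its predicted probability. Under that assumption, `$.1` picks the probability of label 1, so the items are ranked by their predicted chance of a positive label. It is a plain-Python stand-in, not the JsonValue implementation. The function name `positive_score` and the sample detail string are hypothetical.

```python
import json


def positive_score(detail_json: str, positive_label: str = "1") -> float:
    """Hypothetical stand-in for JsonValue with path "$.1": read the value under key "1"."""
    detail = json.loads(detail_json)
    return float(detail[positive_label])


# Made-up detail string, for illustration only.
print(positive_score('{"0":0.25,"1":0.75}'))
# 0.75
```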
Output
| uid | uf | f0 | f1 | labels | ilist |
|---|---|---|---|---|---|
| u6 | 0.0 1.0 | 0.0000 | 1.0000 | 1 | {"data":{"iid":[18,88],"score":[0.9999999999999553,0.9999999999999472]},"schema":"iid INT,score DOUBLE"} |
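Each input row comes back with a single MTable value that holds the kept items, with the score column appended to the item schema. Because of setTopN(2), only two of the three recalled items (18, 19, 88) remain. The sketch below shows one way to consume that value downstream in plain Python. It assumes the column-oriented layout shown in the output and copies the output value as printed.

```python
import json

# The "ilist" output value, copied from the result table.
out = ('{"data":{"iid":[18,88],"score":[0.9999999999999553,0.9999999999999472]},'
       '"schema":"iid INT,score DOUBLE"}')

mtable = json.loads(out)
# Pair each item id with its score; the list is already ordered highest score first.
ranked = list(zip(mtable["data"]["iid"], mtable["data"]["score"]))
for position, (iid, score) in enumerate(ranked, start=1):
    print(position, iid, score)
```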
