在某些算法中,可能需要为数据集数据元分配唯一标识符。本文档展示了如何将DataSetUtils用于此目的。
压缩密集索引
zipWithIndex为数据元分配连续标签,接收数据集作为输入并返回(unique id, initial value)2元组的新数据集。此过程需要两次传递,首先计算然后标记数据元,并且由于计数的同步而不能流水线化。该替代方案zipWithUniqueId以流水线方式工作,并且在唯一标签足够时是优选的。例如,以下代码:
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();env.setParallelism(2);DataSet<String> in = env.fromElements("A", "B", "C", "D", "E", "F", "G", "H");DataSet<Tuple2<Long, String>> result = DataSetUtils.zipWithIndex(in);result.writeAsCsv(resultPath, "\n", ",");env.execute();
import org.apache.flink.api.scala._val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironmentenv.setParallelism(2)val input: DataSet[String] = env.fromElements("A", "B", "C", "D", "E", "F", "G", "H")val result: DataSet[(Long, String)] = input.zipWithIndexresult.writeAsCsv(resultPath, "\n", ",")env.execute()
from flink.plan.Environment import get_environmentenv = get_environment()env.set_parallelism(2)input = env.from_elements("A", "B", "C", "D", "E", "F", "G", "H")result = input.zip_with_index()result.write_text(result_path)env.execute()
可以产生元组:(0,G),(1,H),(2,A),(3,B),(4,C),(5,D),(6,E),(7, F)
使用唯一标识符进行压缩
在许多情况下,可能不需要分配连续标签。 zipWithUniqueId以流水线方式工作,加快标签分配过程。此方法接收数据集作为输入,并返回(unique id, initial value)2元组的新数据集。例如,以下代码:
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();env.setParallelism(2);DataSet<String> in = env.fromElements("A", "B", "C", "D", "E", "F", "G", "H");DataSet<Tuple2<Long, String>> result = DataSetUtils.zipWithUniqueId(in);result.writeAsCsv(resultPath, "\n", ",");env.execute();
import org.apache.flink.api.scala._val env: ExecutionEnvironment = ExecutionEnvironment.getExecutionEnvironmentenv.setParallelism(2)val input: DataSet[String] = env.fromElements("A", "B", "C", "D", "E", "F", "G", "H")val result: DataSet[(Long, String)] = input.zipWithUniqueIdresult.writeAsCsv(resultPath, "\n", ",")env.execute()
可以产生元组:(0,G),(1,A),(2,H),(3,B),(5,C),(7,D),(9,E),(11, F)
