快速入门指南

介绍

FlinkML 旨在从您的数据中学习一个简单的过程,抽象出来通常带有大数据学习任务的复杂性。 在这个快速入门指南中,我们将展示使用 FlinkML 解决一个简单的监督学习问题是多么的容易。 但是首先要介绍一些基础知识,如果你已经熟悉机器学习(ML),请随时跳过接下来的几行。

如 Murphy [1] 所定义的,机器学习(ML)用于检测数据中的模式,并使用这些学习到的模式来预测未来。 我们可以将大多数机器学习(ML)算法分为两大类:监督学习和无监督学习。

  • 监督学习 涉及从一个输入(特征)集合到一个输出集合学习一个函数(映射)。 学习是通过使用我们用来近似映射函数的(输入,输出)对训练集来完成的。监督学习问题进一步分为分类问题和回归问题。在分类问题中,我们尝试预测样例属于的类,例如用户是否要点击广告。另一方面,回归问题是要预测(实际的)数值,这个数值通常称为因变量,例如明天的温度是多少。

  • 无监督学习 用来发现数据中的模式和规律。 一个例子是聚类,我们尝试从描述性的特征中发现数据分组。 无监督学习也可用于特征选择,例如通过 主成分分析(principal components analysis) 进行特征选择。

连接 FlinkML

为了在您的项目中使用 FlinkML ,首先您必须建立一个 Flink 程序({{ site.baseurl }}/dev/linking_with_flink.html)。 . 接下来,您必须将 FlinkML 的依赖添加到项目的 pom.xml 中:

  1. <dependency>
  2. <groupId>org.apache.flink</groupId>
  3. <artifactId>flink-ml{{ site.scala_version_suffix }}</artifactId>
  4. <version>{{site.version }}</version>
  5. </dependency>

加载数据

要加载与 FlinkML 一起使用的数据,我们可以使用 Flink 的 ETL 功能,或者使用处理诸如 LibSVM 格式的格式化数据的专门方法。 对于监督学习问题,通常使用 LabeledVector 类来表示 (标记,特征) 样例。 LabeledVector 对象将具有表示样例特征的 FlinkML Vector 成员,以及表示标记的 Double 成员,该标记可能是分类问题中的类,也可以是回归问题的因变量。

例如,我们可以使用 Haberman’s Survival 数据集,您可以从 UCI 机器学习数据库下载这个数据集。 该数据集“包含了对乳腺癌手术患者的存活进行研究的病例”。 数据来自逗号分隔的文件,前3列是特征,最后一列是类,第4列表示患者是否存活5年以上(标记1),或者5年内死亡(标记2)。 您可以查看 UCI 页面了解有关数据的更多信息。

我们可以先把数据加载为一个 DataSet[String]

  1. import org.apache.flink.api.scala._
  2. val env = ExecutionEnvironment.getExecutionEnvironment
  3. val survival = env.readCsvFile[(String, String, String, String)]("/path/to/haberman.data")

我们现在可以将数据转换成 DataSet[LabeledVector] 。 这将允许我们使用 FlinkML 分类算法的数据集。 我们知道数据集的第四个元素是类标记,其余的是特征,所以我们可以像这样构建 LabeledVector 元素:

  1. import org.apache.flink.ml.common.LabeledVector
  2. import org.apache.flink.ml.math.DenseVector
  3. val survivalLV = survival
  4. .map{tuple =>
  5. val list = tuple.productIterator.toList
  6. val numList = list.map(_.asInstanceOf[String].toDouble)
  7. LabeledVector(numList(3), DenseVector(numList.take(3).toArray))
  8. }

然后,我们可以使用这些数据来训练一个学习器。然而,我们将使用另一个数据集来示例建立学习器;这将让我们展示如何导入其他数据集格式。

LibSVM 文件

机器学习数据集的通用格式是 LibSVM 格式,并且可以在 LibSVM 数据集网站中找到使用该格式的多个数据集。 FlinkML 提供了通过 MLUtils 对象的 readLibSVM 函数加载 LibSVM 格式的数据集的实用程序。 您还可以使用 writeLibSVM 函数以 LibSVM 格式保存数据集。 让我们导入 svmguide1 数据集。 您可以在这里下载训练集测试集。 这是一个二进制分类数据集,由 Hsu 等人 [3] 在他们的实用支持向量机(SVM)指南中使用。 它包含4个数字特征和它的类标记。

我们可以简单地使用下面的代码导入数据集:

  1. import org.apache.flink.ml.MLUtils
  2. val astroTrain: DataSet[LabeledVector] = MLUtils.readLibSVM(env, "/path/to/svmguide1")
  3. val astroTest: DataSet[(Vector, Double)] = MLUtils.readLibSVM(env, "/path/to/svmguide1.t")
  4. .map(x => (x.vector, x.label))

它给了我们两个 DataSet 对象,我们会在下面的章节中使用这两个对象来生成一个分类器。

分类

一旦我们导入了数据集,我们可以训练一个 预测模型 ,如线性 SVM 分类器。 我们可以为分类器设置多个参数。 这里我们设置 Blocks 参数,它用于通过底层CoCoA算法 [2] 来分割输入。 正则化参数确定应用的 $l_2$ 正则化值,用于避免过拟合。 步长确定权重向量更新到下一个权重向量值的贡献。 此参数设置初始步长。

  1. import org.apache.flink.ml.classification.SVM
  2. val svm = SVM()
  3. .setBlocks(env.getParallelism)
  4. .setIterations(100)
  5. .setRegularization(0.001)
  6. .setStepsize(0.1)
  7. .setSeed(42)
  8. svm.fit(astroTrain)

我们现在可以对测试集进行预测,并使用 evaluate 函数创建(真值,预测)对。

  1. val evaluationPairs: DataSet[(Double, Double)] = svm.evaluate(astroTest)

接下来,我们将看到我们如何预处理我们的数据,并使用 FlinkML 的机器学习管道功能。

数据预处理和管道

在使用 SVM 分类时,经常被鼓励 [3] 的预处理步骤是将输入特征缩放到 [0,1] 范围,以避免极值特征的影响。 FlinkML 有一些转换器,例如被用于预处理数据的 MinMaxScaler 。 FlinkML 的一个关键特征是将 转换器预测模型 链接在一起的能力。 这样我们可以运行相同的转换流程,并且以直接的和类型安全的方式对训练和测试数据进行预测。您可以在管道文档中阅读更多关于FlinkML管道系统的信息。

我们首先为数据集中的特征创建一个归一化转换,并将其链接到一个新的 SVM 分类器。

  1. import org.apache.flink.ml.preprocessing.MinMaxScaler
  2. val scaler = MinMaxScaler()
  3. val scaledSVM = scaler.chainPredictor(svm)

我们现在可以使用我们新创建的管道来对测试集进行预测。 首先我们再次调用 fit 函数来训练缩放器和 SVM 分类器。 然后测试集的数据将被自动收敛,之后传递给 SVM 进行预测。

  1. scaledSVM.fit(astroTrain)
  2. val evaluationPairsScaled: DataSet[(Double, Double)] = scaledSVM.evaluate(astroTest)

收敛的输入应该会给我们更好的预测表现。

下一步

这个快速入门指南是一个对于 FlinkML 基础概念的介绍,但是你能做更多的事情。我们建议您查看 FlinkML 文档,尝试不同的算法。一个入门的好方法是用自己喜欢的来自于 UCI 机器学习库的数据集和 LibSVM 数据集进行试验。通过与其他数据科学家竞赛,从 KaggleDrivenData 这样的网站处理一个有趣的问题也是一种极好的学习方式。如果您想提供一些新的算法,请查看我们的贡献指南