Flink兼容Apache Hadoop MapReduce的接口,因此可以使用面向MapReduce的代码。
你可以:
- Flink中使用Hadoop
Writable数据类型(Data type). - 使用Hadoop
InputFormat作为数据源(DataSource). - 使用Hadoop
OutputFormat作为 数据落地(DataSink). - 使用Hadoop
Mapper作为 FlatMapFunction. - 使用Hadoop
Reducer作为 GroupReduceFunction.
这篇文档展示如何在Flink中使用现存的Hadoop MapReduce代码。可以参考 连接其他系统 来了解如何从Hadoop支持的文件系统中读取数据。
- This will be replaced by the TOC {:toc}
项目配置
支持Hadoop的输入输出(input/output)格式是flink-java和flink-scala的maven模块的一部分,这两部分是在编写Flink任务时经常需要用到的。 mapred和mapreduce 的api代码分别在org.apache.flink.api.java.hadoop和org.apache.flink.api.scala.hadoop以及一个额外的子包中。
对Hadoop MapReduce的支持是在flink-hadoop-compatibility的maven模块中。代码具体在org.apache.flink.hadoopcompatibility包中。
如果想要重复使用Mappers and Reducers, 需要在maven中的pom.xml中添加下面依赖:
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-hadoop-compatibility{{ site.scala_version_suffix }}</artifactId><version>{{site.version}}</version></dependency>
使用Hadoop数据类型
Flink支持所有的Hadoop Writable 和 WritableComparable 数据类型, 不用额外添加Hadoop Compatibility 依赖。 可以参考Programming Guide了解如何使用Hadoop数据类型(Hadoop data type)。
使用Hadoop输入格式
可以使用Hadoop输入格式来创建数据源,具体是调用 ExecutionEnvironment 的 readHadoopFile 或 createHadoopInput方法。 前者用于来自FileInputFormat的输入格式, 后者用于普通的输入格式。
创建的数据集包含的是一个“键-值”2元组,“值”是从Hadoop输入格式获得的数值。
下面的例子介绍如何使用Hadoop的 TextInputFormat。
使用Hadoop输出格式
Flink提供兼容Hadoop输出格式(Hadoop OutputFormat)的封装。支持任何实现org.apache.hadoop.mapred.OutputFormat接口或者继承org.apache.hadoop.mapreduce.OutputFormat的类。输出格式的封装需要的输入是“键值对”形式。他们将会交给Hadoop输出格式处理。
下面的例子介绍如何使用Hadoop的 TextOutputFormat。
使用Hadoop Mappers和Reducers
Hadoop Mappers 语法上等价于Flink的FlatMapFunctions,Hadoop Reducers语法上等价于Flink的GroupReduceFunctions。 Flink同样封装了Hadoop MapReduce的Mapper and Reducer接口的实现。 用户可以在Flink程序中复用Hadoop的Mappers and Reducers。 这时,仅仅org.apache.hadoop.mapred的Mapper and Reducer接口被支持。
The wrappers take a DataSet<Tuple2<KEYIN,VALUEIN>> as input and produce a DataSet<Tuple2<KEYOUT,VALUEOUT>> as output where KEYIN and KEYOUT are the keys and VALUEIN and VALUEOUT are the values of the Hadoop key-value pairs that are processed by the Hadoop functions. For Reducers, Flink offers a wrapper for a GroupReduceFunction with (HadoopReduceCombineFunction) and without a Combiner (HadoopReduceFunction). The wrappers accept an optional JobConf object to configure the Hadoop Mapper or Reducer.
封装函数用DataSet<Tuple2<KEYIN,VALUEIN>>作为输入, 产生DataSet<Tuple2<KEYOUT,VALUEOUT>>作为输出, 其中KEYIN和KEYOUT是“键” ,VALUEIN 和VALUEOUT 是“值”,它们是Hadoop函数处理的键值对。 对于Reducers,Flink将GroupReduceFunction封装成HadoopReduceCombineFunction,但没有Combiner(HadoopReduceFunction)。 封装函数接收可选的JobConf对象来配置Hadoop的Mapper or Reducer。
Flink的方法封装有
org.apache.flink.hadoopcompatibility.mapred.HadoopMapFunction,org.apache.flink.hadoopcompatibility.mapred.HadoopReduceFunction, andorg.apache.flink.hadoopcompatibility.mapred.HadoopReduceCombineFunction. 他们可以被用于FlatMapFunctions或GroupReduceFunctions.
下面的例子介绍如何使用Hadoop的Mapper和Reducer 。
// 获取待处理数据DataSet<Tuple2<Text, LongWritable>> text = [...]DataSet<Tuple2<Text, LongWritable>> result = text// 使用Hadoop Mapper (Tokenizer)作为Map函数.flatMap(new HadoopMapFunction<LongWritable, Text, Text, LongWritable>(new Tokenizer())).groupBy(0)// 使用Hadoop Reducer (Counter)作为Reduce函数.reduceGroup(new HadoopReduceCombineFunction<Text, LongWritable, Text, LongWritable>(new Counter(), new Counter()));
需要注意: Reducer封装处理由Flink中的groupBy()定义的groups。 它并不考虑任何在JobConf定义的自定义的分区器(partitioners), 排序(sort)或分组(grouping)的比较器。
完整Hadoop WordCount示例
下面给出一个完整的使用Hadoop 数据类型, InputFormat/OutputFormat/Mapper/Reducer的示例。
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();// 创建和初始化Hadoop TextInputFormat.Job job = Job.getInstance();HadoopInputFormat<LongWritable, Text> hadoopIF =new HadoopInputFormat<LongWritable, Text>(new TextInputFormat(), LongWritable.class, Text.class, job);TextInputFormat.addInputPath(job, new Path(inputPath));// 从Hadoop TextInputFormat读取数据.DataSet<Tuple2<LongWritable, Text>> text = env.createInput(hadoopIF);DataSet<Tuple2<Text, LongWritable>> result = text// 使用Hadoop Mapper (Tokenizer)作为Map函数.flatMap(new HadoopMapFunction<LongWritable, Text, Text, LongWritable>(new Tokenizer())).groupBy(0)// 使用Hadoop Reducer (Counter)作为Reduce函数.reduceGroup(new HadoopReduceCombineFunction<Text, LongWritable, Text, LongWritable>(new Counter(), new Counter()));// 创建和初始化Hadoop TextOutputFormat.HadoopOutputFormat<Text, IntWritable> hadoopOF =new HadoopOutputFormat<Text, IntWritable>(new TextOutputFormat<Text, IntWritable>(), job);hadoopOF.getConfiguration().set("mapreduce.output.textoutputformat.separator", " ");TextOutputFormat.setOutputPath(job, new Path(outputPath));// 使用Hadoop TextOutputFormat输出结果.result.output(hadoopOF);// 执行程序env.execute("Hadoop WordCount");
