1.整合思路
HBase与MapReduce整合时,有三种情形:HBase作为MapReduce的数据流向;HBase作为MapReduce的数据来源;HBase同时作为MapReduce的数据来源与数据流向。当HBase作为数据来源时,如分析HBase里的数据,自定义Mapper需继承TableMapper,实质上是使用TableInputFormat取得数据。当HBase作为数据流向时,如从HDFS里向HBase里导入数据,自定义Reducer需继承TableReducer,实际上是使用TableOutputFormat进行格式化输出。同时,需要调用TableMapReduceUtil类的静态方法initTableMapperJob来标示作为数据输入来源的HBase表名和自定义Mapper类,用TableMapReduceUtil类的静态方法initTableReducerJob来标示作为数据输出流向的HBase表名和自定义Reducer类。
2.例子
有如下数据存于music1.txt文件中,每一条代表一首音乐的一次播放记录,我们需要统计每首音乐的播放次数
1_song1_2016-1-11 song1 singer1 man slow pc2_song2_2016-1-11 song2 singer2 woman slow ios3_song3_2016-1-11 song3 singer3 man quick andriod4_song4_2016-1-11 song4 singer4 woman slow ios5_song5_2016-1-11 song5 singer5 man quick pc6_song6_2016-1-11 song6 singer6 woman quick ios7_song7_2016-1-11 song7 singer7 man quick andriod8_song8_2016-1-11 song8 singer8 woman slow pc9_song9_2016-1-11 song9 singer9 woman slow ios10_song4_2016-1-11 song4 singer4 woman slow ios11_song6_2016-1-11 song6 singer6 woman quick ios12_song6_2016-1-11 song6 singer6 woman quick ios13_song3_2016-1-11 song3 singer3 man quick andriod14_song2_2016-1-11 song2 singer2 woman slow ios
我这里使用上一篇文章完成的HBase单元测试的方法帮我生成这张表
@Testvoid createTable() throws IOException {hBaseService.createTable("music","HBASE_ROW_KEY","info");hBaseService.insertOrUpdate("music","1_song1_2016-1-11","info","name","song1");hBaseService.insertOrUpdate("music","1_song1_2016-1-11","info","singer","singer1");hBaseService.insertOrUpdate("music","1_song1_2016-1-11","info","gender","man");hBaseService.insertOrUpdate("music","1_song1_2016-1-11","info","ryghme","slow");hBaseService.insertOrUpdate("music","1_song1_2016-1-11","info","terminal","pc");hBaseService.insertOrUpdate("music","2_song2_2016-1-11","info","name","song2");hBaseService.insertOrUpdate("music","2_song2_2016-1-11","info","singer","singer2");hBaseService.insertOrUpdate("music","2_song2_2016-1-11","info","gender","woman");hBaseService.insertOrUpdate("music","2_song2_2016-1-11","info","ryghme","slow");hBaseService.insertOrUpdate("music","2_song2_2016-1-11","info","terminal","ios");hBaseService.insertOrUpdate("music","3_song3_2016-1-11","info","name","song3");hBaseService.insertOrUpdate("music","3_song3_2016-1-11","info","singer","singer3");hBaseService.insertOrUpdate("music","3_song3_2016-1-11","info","gender","man");hBaseService.insertOrUpdate("music","3_song3_2016-1-11","info","ryghme","quick");hBaseService.insertOrUpdate("music","3_song3_2016-1-11","info","terminal","andriod");hBaseService.insertOrUpdate("music","4_song4_2016-1-11","info","name","song4");hBaseService.insertOrUpdate("music","4_song4_2016-1-11","info","singer","singer4");hBaseService.insertOrUpdate("music","4_song4_2016-1-11","info","gender","woman");hBaseService.insertOrUpdate("music","4_song4_2016-1-11","info","ryghme","slow");hBaseService.insertOrUpdate("music","4_song4_2016-1-11","info","terminal","ios");hBaseService.insertOrUpdate("music","5_song5_2016-1-11","info","name","song5");hBaseService.insertOrUpdate("music","5_song5_2016-1-11","info","singer","singer5");hBaseService.insertOrUpdate("music","5_song5_2016-1-11","info","gender","man");hBaseService.insertOrUpdate("music","5_song5_2016-1-11","info","ryghme","quick");hBaseService.insertOrUpdate("music","5_song5_2016-1-11","info","terminal","pc");hBaseService.insertOrUpdate("music","6_song6_2016-1-11","info","name","song6");hBaseService.insertOrUpdate("music","6_song6_2016-1-11","info","singer","singer6");hBaseService.insertOrUpdate("music","6_song6_2016-1-11","info","gender","woman");hBaseService.insertOrUpdate("music","6_song6_2016-1-11","info","ryghme","quick");hBaseService.insertOrUpdate("music","6_song6_2016-1-11","info","terminal","ios");hBaseService.insertOrUpdate("music","7_song7_2016-1-11","info","name","song7");hBaseService.insertOrUpdate("music","7_song7_2016-1-11","info","singer","singer7");hBaseService.insertOrUpdate("music","7_song7_2016-1-11","info","gender","man");hBaseService.insertOrUpdate("music","7_song7_2016-1-11","info","ryghme","quick");hBaseService.insertOrUpdate("music","7_song7_2016-1-11","info","terminal","andriod");hBaseService.insertOrUpdate("music","8_song8_2016-1-11","info","name","song8");hBaseService.insertOrUpdate("music","8_song8_2016-1-11","info","singer","singer8");hBaseService.insertOrUpdate("music","8_song8_2016-1-11","info","gender","woman");hBaseService.insertOrUpdate("music","8_song8_2016-1-11","info","ryghme","slow");hBaseService.insertOrUpdate("music","8_song8_2016-1-11","info","terminal","pc");hBaseService.insertOrUpdate("music","9_song9_2016-1-11","info","name","song9");hBaseService.insertOrUpdate("music","9_song9_2016-1-11","info","singer","singer9");hBaseService.insertOrUpdate("music","9_song9_2016-1-11","info","gender","woman");hBaseService.insertOrUpdate("music","9_song9_2016-1-11","info","ryghme","slow");hBaseService.insertOrUpdate("music","9_song9_2016-1-11","info","terminal","ios");hBaseService.insertOrUpdate("music","10_song4_2016-1-11","info","name","song4");hBaseService.insertOrUpdate("music","10_song4_2016-1-11","info","singer","singer4");hBaseService.insertOrUpdate("music","10_song4_2016-1-11","info","gender","woman");hBaseService.insertOrUpdate("music","10_song4_2016-1-11","info","ryghme","slow");hBaseService.insertOrUpdate("music","10_song4_2016-1-11","info","terminal","ios");hBaseService.insertOrUpdate("music","11_song6_2016-1-11","info","name","song6");hBaseService.insertOrUpdate("music","11_song6_2016-1-11","info","singer","singer6");hBaseService.insertOrUpdate("music","11_song6_2016-1-11","info","gender","woman");hBaseService.insertOrUpdate("music","11_song6_2016-1-11","info","ryghme","quick");hBaseService.insertOrUpdate("music","11_song6_2016-1-11","info","terminal","ios");hBaseService.insertOrUpdate("music","12_song6_2016-1-11","info","name","song6");hBaseService.insertOrUpdate("music","12_song6_2016-1-11","info","singer","singer6");hBaseService.insertOrUpdate("music","12_song6_2016-1-11","info","gender","woman");hBaseService.insertOrUpdate("music","12_song6_2016-1-11","info","ryghme","quick");hBaseService.insertOrUpdate("music","12_song6_2016-1-11","info","terminal","ios");hBaseService.insertOrUpdate("music","13_song3_2016-1-11","info","name","song3");hBaseService.insertOrUpdate("music","13_song3_2016-1-11","info","singer","singer3");hBaseService.insertOrUpdate("music","13_song3_2016-1-11","info","gender","man");hBaseService.insertOrUpdate("music","13_song3_2016-1-11","info","ryghme","quick");hBaseService.insertOrUpdate("music","13_song3_2016-1-11","info","terminal","andriod");hBaseService.insertOrUpdate("music","14_song2_2016-1-11","info","name","song2");hBaseService.insertOrUpdate("music","14_song2_2016-1-11","info","singer","singer2");hBaseService.insertOrUpdate("music","14_song2_2016-1-11","info","gender","woman");hBaseService.insertOrUpdate("music","14_song2_2016-1-11","info","ryghme","slow");hBaseService.insertOrUpdate("music","14_song2_2016-1-11","info","terminal","ios");}
进入hbase shell,创建namelist表
create 'namelist','details'
编写mapper、reducer、runner,建议导入/usr/local/hbase/lib中的所有jar包,我这里根据之前整合Hadoop的项目中又新导入如下两包
<!--HBase--><dependency><groupId>org.apache.hbase</groupId><artifactId>hbase-client</artifactId><version>2.4.12</version></dependency><dependency><groupId>org.apache.hbase</groupId><artifactId>hbase-zookeeper</artifactId><version>2.4.12</version></dependency>
public class MusicMapper extends TableMapper<Text, IntWritable> {@Overrideprotected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {List<Cell> cells = value.listCells();for (Cell cell : cells) {context.write(new Text(Bytes.toString(CellUtil.cloneValue(cell))),new IntWritable(1));}}}
public class MusicReducer extends TableReducer<Text, IntWritable,Text> {@Overrideprotected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {int playCount = 0;for (IntWritable value : values) {playCount += value.get();}Put put = new Put(Bytes.toBytes(key.toString()));put.addColumn(Bytes.toBytes("details"),Bytes.toBytes("rank"),Bytes.toBytes(String.valueOf(playCount)));context.write(key,put);}}
public class MusicRunner {public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {Configuration configuration = HBaseConfiguration.create();configuration.set("hbase.rootdir","hdfs://hadoop0:9000/hbase");configuration.set("hbase.zookeeper.quorum","hadoop0");Job job = Job.getInstance(configuration,"top-music");//MapReduce程序基本配置job.setJarByClass(MusicRunner.class);job.setNumReduceTasks(1);//为music表设置过滤条件,只保留歌名nameScan scan = new Scan();scan.addColumn(Bytes.toBytes("info"),Bytes.toBytes("name"));//使用HBase提供的工具设置jobTableMapReduceUtil.initTableMapperJob("music",scan, MusicMapper.class, Text.class, IntWritable.class,job);TableMapReduceUtil.initTableReducerJob("namelist", MusicReducer.class,job);job.waitForCompletion(true);System.out.println("执行成功,统计结果保存在namelist表中");}}
执行main方法,我们在hbase shell中查看
这样一个简单的HBase整合MapReduce的例子就完成了,根据自己的业务场景进行修改即可~

