I. Preparation
1. Environment
NiFi: 1.9.0
HBase: 2.1.1
2. Create the HBase test tables
# Table creation, variant 1
create 'USER', 'info', 'data'
# Table creation, variant 2 (column families keep up to 3 versions)
create 'USER', {NAME => 'info', VERSIONS => '3'}, {NAME => 'data', VERSIONS => '3'}
create 'USER_02', {NAME => 'info', VERSIONS => '3'}, {NAME => 'data', VERSIONS => '3'}
# Insert data into the USER table: for row key rk0001, add the column qualifier name to column family info with the value zhangsan
put 'USER', 'rk0001', 'info:name', 'zhangsan'
put 'USER', 'rk0001', 'info:gender', 'female'
put 'USER', 'rk0001', 'info:age', 20
put 'USER', 'rk0001', 'data:pic', 'picture'
put 'USER', 'rk0002', 'info:name', 'lisi'
put 'USER', 'rk0002', 'info:gender', 'female'
put 'USER', 'rk0002', 'info:age', 18
put 'USER', 'rk0002', 'data:pic', 'picture'
put 'USER', 'rk0003', 'info:name', 'wangwu'
put 'USER', 'rk0003', 'info:gender', 'female'
put 'USER', 'rk0003', 'info:age', 18
put 'USER', 'rk0003', 'data:pic', 'picture'
# Get all data for row key rk0001 in the USER table
get 'USER', 'rk0001'
# Scan all data in the USER table
scan 'USER'
# Remove all data from the USER table
truncate 'USER'
II. Flow overview
III. ExecuteGroovyScript
1. Configuration details
2. Script
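import java.nio.charset.StandardCharsets
import org.apache.nifi.processor.io.OutputStreamCallback

// Read the comma-separated list of table names
// (TBList is expected to be set as a property on this processor)
String tbList = TBList
// Split the list into individual names
String[] names = tbList.split(",")

// Emit one flow file per table name
for (String name : names) {
    // Skip empty entries; no flow file exists yet, so there is nothing to route to failure
    if (name.trim().isEmpty()) {
        continue
    }
    def flowFile = session.create()
    // Store the table name in the TBName attribute
    flowFile = session.putAttribute(flowFile, 'TBName', name)
    // Write the table name into the flow file content
    flowFile = session.write(flowFile, { outputStream ->
        outputStream.write(name.getBytes(StandardCharsets.UTF_8))
    } as OutputStreamCallback)
    // flowFile --> success
    session.transfer(flowFile, REL_SUCCESS)
}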
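The splitting step can be tried outside NiFi. The following is a minimal sketch, assuming a hypothetical helper `splitTableNames` (it is not part of the flow) and an illustrative input `USER,,USER_02`; it only shows which names would each end up in their own flow file.

```groovy
// Hypothetical helper, for illustration only: returns the table names
// that would each become one flow file.
List<String> splitTableNames(String tbList) {
    // Split on commas and drop blank entries
    tbList.split(',').toList().findAll { !it.trim().isEmpty() }
}

// Illustrative input with an empty entry in the middle
def names = splitTableNames('USER,,USER_02')
names.each { println "TBName=${it}" }
```

Blank entries are dropped before any flow file is created, so there is never an empty flow file to route anywhere.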
IV. ScanHBase
1. Configuration details
2. JSON format of the result
[{
  "row": "rk0001",
  "cells": [
    {"fam": "data", "qual": "pic", "val": "picture", "ts": 1636511060633},
    {"fam": "info", "qual": "age", "val": "20", "ts": 1636511059579},
    {"fam": "info", "qual": "gender", "val": "female", "ts": 1636511059540},
    {"fam": "info", "qual": "name", "val": "zhangsan", "ts": 1636511059469}
  ]
}]
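To make the structure of this result concrete, here is a small sketch that reads it with `JsonSlurper`. The input is a shortened copy of the sample (two of the four cells), and the variable names are chosen for illustration only.

```groovy
import groovy.json.JsonSlurper

// Shortened copy of the sample result (two of the four cells)
def sample = '''[{"row": "rk0001", "cells": [
  {"fam": "data", "qual": "pic", "val": "picture", "ts": 1636511060633},
  {"fam": "info", "qual": "name", "val": "zhangsan", "ts": 1636511059469}]}]'''

def rows = new JsonSlurper().parseText(sample)

// Each array element is one HBase row; each cell carries the column family,
// qualifier, value and timestamp
def lines = rows.collectMany { r ->
    r.cells.collect { c -> "${r.row} ${c.fam}:${c.qual} = ${c.val} (ts ${c.ts})".toString() }
}
lines.each { println it }
```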
V. HBase_2_ClientService
1. Configuration details
VI. ExecuteGroovyScript
1. Configuration details
2. Script
import groovy.json.JsonOutput
import groovy.json.JsonSlurper
import java.nio.charset.StandardCharsets
import org.apache.commons.io.IOUtils
import org.apache.nifi.processor.io.InputStreamCallback
import org.apache.nifi.processor.io.OutputStreamCallback

// Fetch flow files in batches of up to 150
def flowFileList = session.get(150)
if (!flowFileList) return

flowFileList.each { flowFile ->
    if (!flowFile) {
        return
    }
    // Read the ScanHBase JSON result from the flow file content
    String content = ''
    session.read(flowFile, { inputStream ->
        content = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
    } as InputStreamCallback)
    def data = new JsonSlurper().parseText(content)
    data.each { item ->
        // Row key of the current row
        def row = item.row
        item.cells.each { cell ->
            def newFlowFile = session.create()
            // Set the attribute CF (column family name)
            newFlowFile = session.putAttribute(newFlowFile, 'CF', cell.fam)
            // Pass on the attribute TBName (HBase table name)
            newFlowFile = session.putAttribute(newFlowFile, 'TBName', flowFile.getAttribute('TBName'))
            // Build one JSON document per cell; JsonOutput escapes special characters in the values
            String json = JsonOutput.toJson([row: row, (cell.qual): cell.val])
            // Write the JSON into newFlowFile
            newFlowFile = session.write(newFlowFile, { outputStream ->
                outputStream.write(json.getBytes(StandardCharsets.UTF_8))
            } as OutputStreamCallback)
            // newFlowFile --> success
            session.transfer(newFlowFile, REL_SUCCESS)
        }
    }
}
// flowFile --> failure (the original flow files, now that they have been split)
session.transfer(flowFileList, REL_FAILURE)
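The per-cell split performed by this script can be checked without a running flow. The sketch below assumes a hypothetical helper `splitCells` (not part of the flow) that returns, for every cell, the CF attribute and the JSON content the resulting flow file would carry. It builds the JSON with `JsonOutput.toJson`, which is one way to do it; the input is a one-cell excerpt of the sample scan result.

```groovy
import groovy.json.JsonOutput
import groovy.json.JsonSlurper

// Hypothetical helper, for illustration only: one entry per cell, holding the
// CF attribute and the JSON content of the corresponding flow file.
List<Map> splitCells(String scanJson) {
    new JsonSlurper().parseText(scanJson).collectMany { r ->
        r.cells.collect { c ->
            [CF: c.fam, content: JsonOutput.toJson([row: r.row, (c.qual): c.val])]
        }
    }
}

// One-cell excerpt of the sample scan result
def scanResult = '[{"row":"rk0001","cells":[{"fam":"info","qual":"age","val":"20","ts":1636511059579}]}]'
splitCells(scanResult).each { println "CF=${it.CF} content=${it.content}" }
// prints: CF=info content={"row":"rk0001","age":"20"}
```

Each resulting document holds the row key plus exactly one column, while the column family travels separately in the CF attribute.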
VII. PutHBaseJson
1. Configuration details
VIII. Problems
ScanHBase reports an error:
14:56:00 CST ERROR ScanHBase[id=0dbd7ef0-017d-1000-82a0-c67b6329ebff] Unable to fetch rows from HBase table HBASE DEV PI B 2016 due to Cannot invoke method public abstract void org.apache.nifi.hbase.HBaseClientService.scan(java.lang.String,java.lang.String,java.lang.String,java.lang.String,java.lang.Long,java.lang.Long,java.lang.Integer,java.lang.Boolean,java.util.Collection,java.util.List,org.apache.nifi.hbase.scan.ResultHandler) throws java.io.IOException on Controller Service with identifier 0dc740db-017d-1000-cfbb-0c8bf1119ac0 because the Controller Service's State is currently ENABLING: org.apache.nifi.controller.service.ControllerServiceDisabledException: Cannot invoke method public abstract void org.apache.nifi.hbase.HBaseClientService.scan(java.lang.String,java.lang.String,java.lang.String,java.lang.String,java.lang.Long,java.lang.Long,java.lang.Integer,java.lang.Boolean,java.util.Collection,java.util.List,org.apache.nifi.hbase.scan.ResultHandler) throws java.io.IOException on Controller Service with identifier 0dc740db-017d-1000-cfbb-0c8bf1119ac0 because the Controller Service's State is currently ENABLING

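Diagnosis:
1. A likely cause of the error above is an incorrect Zookeeper ZNode Parent path in HBase_2_ClientService, which leaves the controller service in the ENABLING state.
2. The correct Zookeeper ZNode Parent value is the Zookeeper Base Path shown on the HBase Master UI.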
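On the HBase side, this path is normally governed by the `zookeeper.znode.parent` property in hbase-site.xml, which defaults to `/hbase` when it is not set. As a cross-check against the Master UI, the sketch below reads that property from the XML text. The helper name `znodeParent` and the sample value `/hbase-unsecure` are assumptions made for illustration; they do not come from the cluster described here.

```groovy
import javax.xml.parsers.DocumentBuilderFactory

// Hypothetical helper, for illustration only: returns the value of
// zookeeper.znode.parent from the text of an hbase-site.xml file.
String znodeParent(String hbaseSiteXml) {
    def doc = DocumentBuilderFactory.newInstance().newDocumentBuilder()
            .parse(new ByteArrayInputStream(hbaseSiteXml.getBytes('UTF-8')))
    def props = doc.getElementsByTagName('property')
    for (int i = 0; i < props.length; i++) {
        def p = props.item(i)
        def name = p.getElementsByTagName('name').item(0)?.textContent
        if (name == 'zookeeper.znode.parent') {
            return p.getElementsByTagName('value').item(0)?.textContent
        }
    }
    // HBase falls back to /hbase when the property is absent
    return '/hbase'
}

// Illustrative configuration; the value is an assumed example
def xml = '<configuration><property><name>zookeeper.znode.parent</name><value>/hbase-unsecure</value></property></configuration>'
println znodeParent(xml)
```

The value printed here should match both the Zookeeper Base Path on the HBase Master UI and the Zookeeper ZNode Parent configured in HBase_2_ClientService.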
