1、前置条件:
1.1 集群1有kafka和nifi
1.2 集群2有nifi和TD
2、集群1消费kafka
2.1 添加process group
2.2 进入process group添加consumerkafka和output!
port
2.3 consumerKafka配置如下:

3、集群2消费集群1
3.1 集群2 添加remote process group

3.2 remote process group配置如下:(urls=集群1下nifi地址,多个地址以逗号隔开,传输协议为HTTP)
3.3 添加ExecuteGroovyScript处理器,脚本如下:
import groovy.json.JsonSlurperimport java.nio.charset.StandardCharsetsimport org.apache.commons.io.IOUtilsimport java.sql.Connectionimport java.sql.PreparedStatementimport java.sql.SQLExceptionimport java.sql.DriverManagerflowFileList=session.get(150)if (!flowFileList)return// 声明容器存储结果// StringBuffer stringBuffer = new StringBuffer()// 创建方法实现连接TAOS_TDenginedef getConnect(String jdbcUrl){Class.forName("com.taosdata.jdbc.TSDBDriver")Connection conn = DriverManager.getConnection(jdbcUrl)if(null == conn){throw new SQLException("数据库创建失败,请检查配置信息")session.transfer(flowFile, REL_FAILURE)}return conn}//判断获取到的150个flowFile不为空if ( ! flowFileList.isEmpty() ) {// 调用方法注入url获取连接String url = String.format("jdbc:TAOS://%s/%s?user=root&password=taosdata", IPPort, DBName)// 这种执行方式需要表存在(先创建表)String sql = String.format("INSERT INTO ")Connection conn = getConnect(url)flowFileList.each { // 遍历flowFile ->if (flowFile != null) {String text = ''session.read(flowFile, {inputStream ->text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)} as InputStreamCallback)String[] data = text.split(",")String[] lists = data[0].split("\\.")String newTagName=lists[0].concat("_").concat(lists[1])// 拼接sql语句String valueSql = String.format("%s.%s VALUES ('%s',%s) ", DBName, newTagName, data[1], data[2] )sql = sql.concat(valueSql)}}sql = sql.concat(";")// 执行ps = conn.prepareStatement(sql)ps.executeUpdate()ps.close()// closeif ( ! conn ) { conn.close() }if ( ! ps ) { ps.close() }}/*newFlowFile = session.create()stringBuffer.append(sql)stringBuffer.append(url)// 写入newFlowFile中session.write(newFlowFile, {outputStream ->outputStream.write(stringBuffer.toString().getBytes(StandardCharsets.UTF_8))} as OutputStreamCallback)// newFlowFile --> successsession.transfer(newFlowFile, REL_SUCCESS)*/// flowFile --> successsession.transfer(flowFileList,REL_FAILURE)
3.4 详细如下:

4、在nifi上用groovy脚本调用TDjdbc注意事项
4.1 需要把对应jdbc.jar包cp到${NIFI_HOME}/lib下
4.2 libtaos.so文件设置环境变量,如下:
LD_LIBRARY_PATH=/usr/libexport LD_LIBRARY_PATH
