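0. Prerequisites
0.1 Install TDengine on the current node.
0.2 Copy taos-jdbcdriver-2.0.18.jar into ${NIFI_HOME}/lib on every NiFi node.
0.3 Make sure the jar version matches the TDengine version.
0.4 Add the directory that contains libtaos.so to the library path:
LD_LIBRARY_PATH=/usr/lib
export LD_LIBRARY_PATH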
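A quick way to check step 0.4 is a small Groovy snippet that looks for libtaos.so on the library path. This is only a sketch: the helper name findLibrary is made up for illustration, and it inspects the environment of whichever process runs it, so it only tells you about NiFi if it is run inside NiFi (or in a shell with the same environment).

```groovy
// Hypothetical helper: return the first file named libName found in the
// directories of a path list such as LD_LIBRARY_PATH, or null if none has it.
File findLibrary(String pathList, String libName) {
    if (!pathList) return null
    pathList.split(File.pathSeparator)
            .findAll { it }                       // skip empty entries
            .collect { new File(it, libName) }
            .find { it.isFile() }
}

def hit = findLibrary(System.getenv("LD_LIBRARY_PATH"), "libtaos.so")
println(hit ? "libtaos.so found at ${hit}" : "libtaos.so not found on LD_LIBRARY_PATH")
```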
1. Groovy1:
1.1 For a given database, fetch all super tables in it and emit one (db, super table name) line per table. The Groovy script is as follows:
// Imports
import java.nio.charset.StandardCharsets
import java.sql.Connection
import java.sql.DriverManager
import java.sql.PreparedStatement
import java.sql.ResultSet
import java.sql.SQLException
import org.apache.nifi.processor.io.OutputStreamCallback

// This processor has no upstream connection, so create a new flow file
// instead of calling session.get()
flowFile = session.create()

// Open a JDBC connection to TDengine (TAOS)
def getConnect(String jdbcUrl) {
    Class.forName("com.taosdata.jdbc.TSDBDriver")
    Connection conn = DriverManager.getConnection(jdbcUrl)
    if (null == conn) {
        throw new SQLException("Failed to connect to the database; check the configuration")
    }
    return conn
}

String db = "mhs" // name of the database to extract

Connection conn = null
PreparedStatement ps = null
ResultSet rs = null
try {
    // Build the URL and connect
    String url = String.format("jdbc:TAOS://10.168.1.15:6030/%s?user=root&password=taosdata", db)
    conn = getConnect(url)
    // List all super tables in the database
    ps = conn.prepareStatement("show stables;")
    rs = ps.executeQuery()
    // Collect one "db,super table" line per result row
    StringBuilder sb = new StringBuilder()
    while (rs.next()) {
        sb.append(db).append(",").append(rs.getString(1)).append("\r\n")
    }
    // Write the result into the flow file
    flowFile = session.write(flowFile, { outputStream ->
        outputStream.write(sb.toString().getBytes(StandardCharsets.UTF_8))
    } as OutputStreamCallback)
    // flowFile --> success
    session.transfer(flowFile, REL_SUCCESS)
} catch (Exception e) {
    log.error("Failed to list super tables", e)
    // flowFile --> failure
    session.transfer(flowFile, REL_FAILURE)
} finally {
    rs?.close()
    ps?.close()
    conn?.close()
}

1.2 Route the failure relationship to /dev/null or auto-terminate it.
1.3 Increase Run Schedule so that the script runs only once.

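2. SplitText
2.1 Split the content into single lines, each holding (db, super table name).
2.2 Route splits to the next Groovy processor; route failure and original to /dev/null or auto-terminate them.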
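To make the record format concrete, here is a minimal Groovy sketch of what this split step amounts to, assuming SplitText uses a Line Split Count of 1 (the processor settings are not shown in this note). The helper name parseRecords and the table names in the sample payload are made up for illustration. Each line is trimmed so that the "\r" written by Groovy1 does not end up in the table name.

```groovy
// Hypothetical helper: turn the multi-line payload written by Groovy1
// into one [db, superTable] pair per non-empty line.
List<List<String>> parseRecords(String payload) {
    payload.readLines()
           .collect { it.trim() }                 // drop stray "\r" and spaces
           .findAll { it }                        // skip empty lines
           .collect { it.split(",").collect { f -> f.trim() } }
}

// Sample payload with made-up super table names
def records = parseRecords("mhs,meters\r\nmhs,sensors\r\n")
records.each { println "db=${it[0]} superTable=${it[1]}" }
```

In the real flow each line becomes its own flow file; the list here only stands in for that sequence of splits.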
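3. Groovy2:
3.1 Take (db, super table name), query all child tables of that super table, and emit (db, child table name, super table name). The Groovy script is as follows: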
// Imports
import java.nio.charset.StandardCharsets
import java.sql.Connection
import java.sql.DriverManager
import java.sql.PreparedStatement
import java.sql.ResultSet
import java.sql.SQLException
import org.apache.commons.io.IOUtils
import org.apache.nifi.processor.io.InputStreamCallback
import org.apache.nifi.processor.io.OutputStreamCallback

// Get the incoming flow file
flowFile = session.get()
if (!flowFile) return

// Open a JDBC connection to TDengine (TAOS)
def getConnect(String jdbcUrl) {
    Class.forName("com.taosdata.jdbc.TSDBDriver")
    Connection conn = DriverManager.getConnection(jdbcUrl)
    if (null == conn) {
        throw new SQLException("Failed to connect to the database; check the configuration")
    }
    return conn
}

// Read the "db,super table" line; trim the trailing line break
def text = ''
session.read(flowFile, { inputStream ->
    text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
} as InputStreamCallback)
String[] data = text.trim().split(",")

Connection conn = null
PreparedStatement ps = null
ResultSet rs = null
try {
    // Build the URL and connect
    String url = String.format("jdbc:TAOS://10.168.1.15:6030/%s?user=root&password=taosdata", data[0])
    conn = getConnect(url)
    // List the child tables of the super table (no quotes around the statement)
    String sql = String.format("select tbname from %s.%s", data[0], data[1])
    ps = conn.prepareStatement(sql)
    rs = ps.executeQuery()
    // Collect one "db,child table,super table" line per result row
    StringBuilder sb = new StringBuilder()
    while (rs.next()) {
        sb.append(data[0]).append(",")
          .append(rs.getString(1)).append(",")
          .append(data[1]).append("\r\n")
    }
    // Replace the flow file content with the result
    flowFile = session.write(flowFile, { outputStream ->
        outputStream.write(sb.toString().getBytes(StandardCharsets.UTF_8))
    } as OutputStreamCallback)
    // flowFile --> success
    session.transfer(flowFile, REL_SUCCESS)
} catch (Exception e) {
    log.error("Failed to list child tables for " + text, e)
    // flowFile --> failure
    session.transfer(flowFile, REL_FAILURE)
} finally {
    rs?.close()
    ps?.close()
    conn?.close()
}
3.2 Route failure to /dev/null or auto-terminate it, as in step 1.2. Route success to SplitText.
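Because the query in this step is assembled from flow file content, it can help to validate the names before formatting them into SQL. The sketch below is one way to do that, assuming database and table names are plain identifiers (letters, digits, underscore); that naming rule and the helper name buildChildTableQuery are assumptions, not something this note or TDengine prescribes.

```groovy
// Hypothetical helper: build the child-table query for one "db,superTable"
// record, rejecting names that are not plain identifiers (assumed rule).
String buildChildTableQuery(String record) {
    def parts = record.trim().split(",")
    if (parts.length != 2) {
        throw new IllegalArgumentException("expected 'db,superTable' but got: ${record}")
    }
    parts.each { name ->
        if (!(name ==~ /[A-Za-z_][A-Za-z0-9_]*/)) {
            throw new IllegalArgumentException("unsafe identifier: ${name}")
        }
    }
    return "select tbname from ${parts[0]}.${parts[1]}"
}

// "meters" is a made-up super table name
println buildChildTableQuery("mhs,meters\r\n")
```

Inside the processor, an IllegalArgumentException raised here would be a natural reason to route the flow file to failure.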
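4. SplitText:
4.1 Split the content into single lines, each holding (db, child table name, super table name).
4.2 Route splits to the next Groovy processor; route failure and original to /dev/null or auto-terminate them.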
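5. Groovy3:
5.1 Take (db, child table name, super table name), query the data in the child table, and emit the rows as content (format: "t1,v1;t2,v2;t3,v3;"). The (db, child table name, super table name) triple is passed along as a flow file attribute. The Groovy script is as follows: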
// Imports
import java.nio.charset.StandardCharsets
import java.sql.Connection
import java.sql.DriverManager
import java.sql.PreparedStatement
import java.sql.ResultSet
import java.sql.SQLException
import org.apache.commons.io.IOUtils
import org.apache.nifi.processor.io.InputStreamCallback
import org.apache.nifi.processor.io.OutputStreamCallback

// Get the incoming flow file
flowFile = session.get()
if (!flowFile) return

// Read the "db,child table,super table" line; trim the trailing line break
def text = ''
session.read(flowFile, { inputStream ->
    text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
} as InputStreamCallback)
text = text.trim()
String[] data = text.split(",")

// Open a JDBC connection to TDengine (TAOS)
def getConnect(String jdbcUrl) {
    Class.forName("com.taosdata.jdbc.TSDBDriver")
    Connection conn = DriverManager.getConnection(jdbcUrl)
    if (null == conn) {
        throw new SQLException("Failed to connect to the database; check the configuration")
    }
    return conn
}

Connection conn = null
PreparedStatement ps = null
ResultSet rs = null
try {
    // Build the URL and connect
    String url = String.format("jdbc:TAOS://10.168.1.15:6030/%s?user=root&password=taosdata", data[0])
    conn = getConnect(url)
    // Read every row of the child table
    String sql = String.format("select * from %s.%s;", data[0], data[1])
    ps = conn.prepareStatement(sql)
    rs = ps.executeQuery()
    // Collect the rows as "timestamp,value;"
    StringBuilder sb = new StringBuilder()
    while (rs.next()) {
        sb.append(rs.getTimestamp("point_time"))
        sb.append(",")
        sb.append(rs.getDouble("value_double"))
        /* sb.append(",")
           sb.append(rs.getString("tagname")) */
        sb.append(";")
    }
    // Pass (db, child table, super table) along as an attribute
    flowFile = session.putAttribute(flowFile, 'DBAndTable', text)
    // Replace the flow file content with the rows
    flowFile = session.write(flowFile, { outputStream ->
        outputStream.write(sb.toString().getBytes(StandardCharsets.UTF_8))
    } as OutputStreamCallback)
    // flowFile --> success
    session.transfer(flowFile, REL_SUCCESS)
} catch (Exception e) {
    log.error("Failed to read child table " + text, e)
    // flowFile --> failure
    session.transfer(flowFile, REL_FAILURE)
} finally {
    rs?.close()
    ps?.close()
    conn?.close()
}
5.2 Route success to Groovy4; route failure to /dev/null or auto-terminate it, as in step 1.2.
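The "t1,v1;t2,v2;" payload that travels from Groovy3 to Groovy4 can be sketched as a pair of encode/decode helpers. The names encodeRows and decodeRows are hypothetical, the timestamps are sample values, and the decoder assumes every row contains at least one comma and that values never contain ";".

```groovy
// Hypothetical helpers for the "t1,v1;t2,v2;" payload format.

// Join [timestamp, value] pairs into the payload string.
String encodeRows(List<List> rows) {
    rows.collect { "${it[0]},${it[1]};" }.join("")
}

// Split the payload back into [timestamp, value] string pairs.
// Splits each row at its first comma only.
List<List<String>> decodeRows(String payload) {
    payload.split(";")
           .findAll { it.trim() }                 // ignore empty trailing pieces
           .collect { row ->
               int comma = row.indexOf(",")
               [row.substring(0, comma), row.substring(comma + 1)]
           }
}

def payload = encodeRows([["2021-01-01 00:00:00.0", 1.5d], ["2021-01-01 00:00:01.0", 2.0d]])
println payload
println decodeRows(payload)
```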
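6. Groovy4:
6.1 Take the flow file from Groovy3, split its content on ";", and insert the rows in batches: every 190 rows are combined into one INSERT statement and executed. The script is as follows: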
// Imports
import java.nio.charset.StandardCharsets
import java.sql.Connection
import java.sql.DriverManager
import java.sql.PreparedStatement
import java.sql.SQLException
import org.apache.commons.io.IOUtils
import org.apache.nifi.processor.io.InputStreamCallback

// Get the incoming flow file
flowFile = session.get()
if (!flowFile) return

// Read the "t1,v1;t2,v2;" payload
def text = ''
session.read(flowFile, { inputStream ->
    text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
} as InputStreamCallback)
// Split into rows, ignoring empty pieces
List<String> rows = text.split(";").findAll { it.trim() }

// Read the attribute set by Groovy3: (db, child table, super table)
String DBAndTable = flowFile.getAttribute('DBAndTable')
String[] dbTb = DBAndTable.trim().split(",")

// Open a JDBC connection to TDengine (TAOS)
def getConnect(String jdbcUrl) {
    Class.forName("com.taosdata.jdbc.TSDBDriver")
    Connection conn = DriverManager.getConnection(jdbcUrl)
    if (null == conn) {
        throw new SQLException("Failed to connect to the database; check the configuration")
    }
    return conn
}

// This form creates the child table if it does not exist yet
String prefix = String.format("INSERT INTO %s USING %s TAGS ('%s') VALUES ", dbTb[1], dbTb[2], dbTb[1])
// If every child table already exists, this shorter form is enough:
// String prefix = String.format("INSERT INTO %s VALUES ", dbTb[1])

int batchSize = 190 // rows per INSERT statement

Connection conn = null
try {
    String url = String.format("jdbc:TAOS://10.168.1.15:6030/%s?user=root&password=taosdata", dbTb[0])
    conn = getConnect(url)
    // Execute one INSERT per batch; the last batch may be shorter
    for (int start = 0; start < rows.size(); start += batchSize) {
        int end = Math.min(start + batchSize, rows.size())
        // Start each batch from the bare prefix so statements do not pile up
        StringBuilder sql = new StringBuilder(prefix)
        for (int j = start; j < end; j++) {
            String[] str = rows[j].split(",")
            sql.append(String.format("('%s',%s),", str[0], str[1]))
        }
        // Replace the trailing comma with the statement terminator
        sql.setLength(sql.length() - 1)
        sql.append(";")
        PreparedStatement ps = conn.prepareStatement(sql.toString())
        try {
            ps.executeUpdate()
        } finally {
            ps.close()
        }
    }
    // flowFile --> success
    session.transfer(flowFile, REL_SUCCESS)
} catch (Exception e) {
    log.error("Failed to insert rows for " + DBAndTable, e)
    // flowFile --> failure
    session.transfer(flowFile, REL_FAILURE)
} finally {
    conn?.close()
}
6.2 Route success and failure to /dev/null or auto-terminate them, as in step 1.2.
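The batching rule in step 6.1 can be tried outside NiFi with a small standalone sketch. The helper name buildInsertBatches and the sample table names are made up; the statement shape follows the INSERT ... USING ... TAGS form used in the Groovy4 script, with the child table name doubling as the tag value. It uses Groovy's collate to cut the row list into chunks of at most 190.

```groovy
// Hypothetical helper: build one INSERT statement per batch of rows.
// Each row is a [timestamp, value] pair of strings.
List<String> buildInsertBatches(String child, String superTable,
                                List<List<String>> rows, int batchSize = 190) {
    String prefix = "INSERT INTO ${child} USING ${superTable} TAGS ('${child}') VALUES "
    rows.collate(batchSize).collect { batch ->
        // Every statement starts from the bare prefix
        prefix + batch.collect { "('${it[0]}',${it[1]})" }.join(",") + ";"
    }
}

// 381 sample rows give batches of 190, 190 and 1
def rows = (1..381).collect { ["t" + it, "" + it] }
def statements = buildInsertBatches("d1", "st", rows)
println "${statements.size()} statements, last: ${statements.last()}"
```

Building every statement from the unchanged prefix keeps each batch independent, so a row count that is not a multiple of 190 needs no special case.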
7. Overview:

8. For cross-cluster transfer, see Kafka -> NiFi -> TD
8.1 Steps 1 to 5 run on cluster 1; step 6 runs on cluster 2.
