一、源端
1、GenerateFlowFile
1.1 配置
a) Custom Text:填写POST请求的Json串(需将 Unique FlowFiles 设为 false,Custom Text 才会生效)
b) Run Schedule:控制请求频率
2、InvokeHTTP
a) Remote URL:请求地址
b) Content-Type:请求头,json类型
3、SplitJson
4、OutPut
二、目标端
1、NifiFlow
2、ExecuteGroovyScript
a) Script Body:自定义脚本
b) TDUrl:TD连接地址
c) Script Body脚本如下:
import groovy.json.JsonSlurper
import java.nio.charset.StandardCharsets
import org.apache.commons.io.IOUtils
import java.sql.Connection
import java.sql.PreparedStatement
import java.sql.SQLException
import java.sql.DriverManager

/*
 * ExecuteGroovyScript body.
 *
 * Pulls queued flow files, parses each one as a JSON object with the fields
 * TagFullName, Time, Value and Type, and writes them to the database behind
 * the TDUrl property using multi-table INSERT statements of BATCH_SIZE rows.
 *
 * Routing: when every batch has been written, all flow files go to
 * REL_SUCCESS. Any exception propagates out of the script, so the processor's
 * own failure handling decides what happens to the flow files.
 */

// Upper bound on the number of flow files taken from the queue per run.
final int MAX_FLOW_FILES = 100000
// Number of rows combined into one INSERT statement.
final int BATCH_SIZE = 50

// Fetch the queued flow files; nothing to do when the queue is empty.
flowFileList = session.get(MAX_FLOW_FILES)
if (!flowFileList) return

/**
 * Opens a JDBC connection through the TSDB driver.
 * (Alternative driver class noted by the original author:
 * com.taosdata.jdbc.rs.RestfulDriver.)
 *
 * @param jdbcUrl JDBC connection URL
 * @return an open connection, never null
 * @throws SQLException if the driver manager fails or yields no connection
 */
def getConnect(String jdbcUrl) {
    Class.forName("com.taosdata.jdbc.TSDBDriver")
    Connection conn = DriverManager.getConnection(jdbcUrl)
    if (null == conn) {
        throw new SQLException("Database Create Error")
    }
    return conn
}

// Connection URL comes from the processor property named TDUrl.
String url = TDUrl
Connection conn = getConnect(url)
def jsonSlurper = new JsonSlurper()

try {
    // collate() yields consecutive, non-empty chunks of at most BATCH_SIZE files.
    for (batch in flowFileList.collate(BATCH_SIZE)) {
        StringBuilder sql = new StringBuilder("INSERT INTO ")

        for (flowFile in batch) {
            String text = ''
            session.read(flowFile, { inputStream ->
                text = IOUtils.toString(inputStream, StandardCharsets.UTF_8)
            } as InputStreamCallback)

            def data = jsonSlurper.parseText(text)
            assert data instanceof Map

            // The sub-table name is built from the first two dot-separated
            // segments of TagFullName, joined with '_'.
            String tagName = data.TagFullName
            String[] tagNameList = tagName.split("\\.")
            String newTagName = tagNameList[0] + "_" + tagNameList[1]
            String pointTime = data.Time
            Double valueDouble = data.Value
            Integer type = data.Type

            // NOTE(review): values from the flow file are interpolated directly
            // into the SQL text, and the second TAGS value is emitted unquoted —
            // confirm the input is trusted and matches the shmstb tag types.
            String valueSql = String.format("%s USING shmstb TAGS (%s,%s) VALUES ('%s',%s) ", newTagName, type, newTagName, pointTime, valueDouble )
            sql.append(valueSql)
        }

        sql.append(";")

        // Close each statement as soon as it has run, before the connection.
        PreparedStatement ps = conn.prepareStatement(sql.toString())
        try {
            ps.executeUpdate()
        } finally {
            ps.close()
        }
    }
} finally {
    // Always release the connection, even when a batch fails.
    conn.close()
}

/*
 * Optional (disabled in the original script): emit the executed SQL as a new
 * flow file. To enable, collect every executed statement into a buffer
 * (one per line, separated by "\r\n") and write it out:
 *
 *   newFlowFile = session.create()
 *   session.write(newFlowFile, { outputStream ->
 *       outputStream.write(buffer.toString().getBytes(StandardCharsets.UTF_8))
 *   } as OutputStreamCallback)
 *   session.transfer(newFlowFile, REL_SUCCESS)
 */

// Every batch was written: route the processed flow files to success.
session.transfer(flowFileList, REL_SUCCESS)
