一、源端
1、结构展示
1.1 外层
1.2 内层
2、PROCESS
2.1 ExecuteGroovyScript
a) 描述
b) SETTINGS
c) SCHEDULING
d) PROPERTIES

{"config": {"mode": "Cluster","userName": "","password": "Dsjpt@2021","jedisPoolConfig": {"maxIdle": "10","minIdle": "5","maxWaitMillis": "5000","soTimeout": "10000","maxTotal": "50","maxAttempts": "5"},"hostAndPortList": ["10.83.68.151:6379","10.83.68.152:6379","10.83.68.153:6379","10.83.68.154:6379","10.83.68.155:6379","10.83.68.156:6379","10.83.68.157:6379","10.83.68.158:6379","10.83.68.159:6379"]},"keyLocalFilePath": "/opt/dms_configs/zhoushan/redis_point.txt","keyList": ["HB_GD_CLPLCF_FJ_P1_L1_001_AI0001","HB_GD_CLPLCF_FJ_P1_L1_001_AI0002","HB_GD_CLPLCF_FJ_P1_L1_001_AI0003"]}
二、目标端
1、结构展示
1.1 外层
1.2 内层
2、PROCESS
2.1 NifiFLow
a) 描述
-
b) PROPERTIES
2.2 ExecuteGroovyScript
a) 描述
-
b) SETTINGS
c) SCHEDULING
d) PROPERTIES
三、附页
1、附1页
集群模式get脚本 ```groovy import groovy.json.JsonSlurper import groovy.json.JsonOutput import org.apache.commons.io.IOUtils import java.nio.charset.StandardCharsets import redis.clients.jedis.JedisPoolConfig import redis.clients.jedis.JedisCluster import redis.clients.jedis.HostAndPort;
/ { “config”: { “mode”: “Cluster”, “function”: “get”, “userName”: “”, “password”: “Dsjpt@2021”, “jedisPoolConfig”: { “maxIdle”: “10”, “minIdle”: “5”, “maxWaitMillis”: “5000”, “soTimeout”: “10000”, “maxTotal”: “50”, “maxAttempts”: “5” }, “hostAndPortList”: [ “10.83.68.151:6379”, “10.83.68.152:6379”, “10.83.68.153:6379”, “10.83.68.154:6379”, “10.83.68.155:6379”, “10.83.68.156:6379”, “10.83.68.157:6379”, “10.83.68.158:6379”, “10.83.68.159:6379” ] }, “keyLocalFilePath”: “/opt/dms_configs/zhoushan/redis_point.txt”, “keyList”: [ “HB_GD_CLPLCF_FJ_P1_L1_001_AI0001” ] } /
JsonSlurper jsonSlurper = new JsonSlurper(); def jsonInfo = jsonSlurper.parseText(Info as String); JedisCluster jedisCluster = getJedisClusterResource(jsonInfo);
List
keyList.each {key-> String val = jedisCluster.get(key); if (val != null) { writeFlowFile(key, val) } }
jedisCluster.close(); JedisCluster getJedisClusterResource(def config) { def jedisConfig = config.config.jedisPoolConfig; //创建数据库连接池 JedisPoolConfig jedisPoolConfig = new JedisPoolConfig(); jedisPoolConfig.setMaxIdle(Integer.valueOf(jedisConfig.maxIdle)); // 设置连接redis的最大空闲数 jedisPoolConfig.setMinIdle(Integer.valueOf(jedisConfig.minIdle)); // 设置连接redis的最小空闲数 jedisPoolConfig.setMaxTotal(Integer.valueOf(jedisConfig.maxTotal)); // 设置redis连接最大客户端数 jedisPoolConfig.setMaxWaitMillis(Integer.valueOf(jedisConfig.maxWaitMillis)); // 设置连接redis-超时时间
// 获取集群ip列表Set<HostAndPort> nodes = new HashSet<>();config.config.hostAndPortList.each {nodeStr ->String[] hostAndPort = nodeStr.split(":");nodes.add(new HostAndPort(hostAndPort[0], Integer.valueOf(hostAndPort[1])));}// 获取配置Integer connectionTimeout = Integer.valueOf(jedisConfig.maxWaitMillis);Integer soTimeout = Integer.valueOf(jedisConfig.soTimeout);Integer maxAttempts = Integer.valueOf(jedisConfig.maxAttempts);// 获取密码String password = config.config.password;// 返回连接对象return new JedisCluster(nodes, connectionTimeout, soTimeout, maxAttempts, password, jedisPoolConfig);
}
def writeFlowFile(String key, String value) { newFlowFile = session.create(); session.write(newFlowFile, { outputStream -> outputStream.write(value.getBytes(StandardCharsets.UTF_8)) } as OutputStreamCallback); session.putAttribute(newFlowFile, ‘KEY’, key); session.transfer(newFlowFile, REL_SUCCESS); }
def writeFlowFile(String str) { newFlowFile = session.create(); session.write(newFlowFile, { outputStream -> outputStream.write(str.getBytes(StandardCharsets.UTF_8)) } as OutputStreamCallback); session.transfer(newFlowFile, REL_SUCCESS); }
List
// 创建流对象def fileReader = new FileReader(filePath)BufferedReader br = new BufferedReader(fileReader);// 定义字符串,保存读取的一行文字String key = null;// 循环读取,读取到最后返回nullwhile ((key = br.readLine())!=null) {keys.add(key);}fileReader.close();// 释放资源br.close();return keys;
}
2、附2页1. 集群模式set脚本```groovyimport groovy.json.JsonSlurperimport groovy.json.JsonOutputimport org.apache.commons.io.IOUtilsimport java.nio.charset.StandardCharsetsimport redis.clients.jedis.JedisPoolConfigimport redis.clients.jedis.JedisClusterimport redis.clients.jedis.HostAndPort;/*{"config": {"mode": "Cluster","function": "get","userName": "","password": "Dsjpt@2021","jedisPoolConfig": {"maxIdle": "10","minIdle": "5","maxWaitMillis": "5000","soTimeout": "10000","maxTotal": "50","maxAttempts": "5"},"hostAndPortList": ["10.83.68.151:6379","10.83.68.152:6379","10.83.68.153:6379","10.83.68.154:6379","10.83.68.155:6379","10.83.68.156:6379","10.83.68.157:6379","10.83.68.158:6379","10.83.68.159:6379"]}}*/JsonSlurper jsonSlurper = new JsonSlurper();//String json = Info;def jsonInfo = jsonSlurper.parseText(Info as String);JedisCluster jedisCluster = getJedisClusterResource(jsonInfo);def flowFileList = session.get(10000);if (flowFileList.isEmpty()) {session.transfer(flowFileList, REL_FAILURE);return;};for (flowFile in flowFileList) {if (flowFile != null) {jedisCluster.set(flowFile.getAttribute('KEY'), readFlowFile(flowFile));}}jedisCluster.close();session.transfer(flowFileList, REL_SUCCESS)JedisCluster getJedisClusterResource(def config) {def jedisConfig = config.config.jedisPoolConfig;//创建数据库连接池JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();jedisPoolConfig.setMaxIdle(Integer.valueOf(jedisConfig.maxIdle)); // 设置连接redis的最大空闲数jedisPoolConfig.setMinIdle(Integer.valueOf(jedisConfig.minIdle)); // 设置连接redis的最小空闲数jedisPoolConfig.setMaxTotal(Integer.valueOf(jedisConfig.maxTotal)); // 设置redis连接最大客户端数jedisPoolConfig.setMaxWaitMillis(Integer.valueOf(jedisConfig.maxWaitMillis)); // 设置连接redis-超时时间// 获取集群ip列表Set<HostAndPort> nodes = new HashSet<>();config.config.hostAndPortList.each {nodeStr ->String[] hostAndPort = nodeStr.split(":");nodes.add(new HostAndPort(hostAndPort[0], Integer.valueOf(hostAndPort[1])));}// 获取配置Integer connectionTimeout = Integer.valueOf(jedisConfig.maxWaitMillis);Integer soTimeout = Integer.valueOf(jedisConfig.soTimeout);Integer maxAttempts = Integer.valueOf(jedisConfig.maxAttempts);// 获取密码String password = config.config.password;// 返回连接对象return new JedisCluster(nodes, connectionTimeout, soTimeout, maxAttempts, password, jedisPoolConfig);}String readFlowFile(def flowFile) {String content = ""session.read(flowFile, { inputStream ->content = IOUtils.toString(inputStream, StandardCharsets.UTF_8)} as InputStreamCallback)return content}
