更新记录
| 时间 | 更新内容 | 更新人 |
|---|---|---|
| 2022-10-28 | 文档创建 | 付飞 |
Hello World
1. 引入脚本架依赖
<dependency><groupId>com.zxelec</groupId><artifactId>flink-scaffold</artifactId><version>1.0</version></dependency>
2. 定义启动类
package com.zxelec.demo;import com.zxelec.demo.process.SizeMap;import com.zxelec.scaffold.annotation.Flow;import com.zxelec.scaffold.annotation.Sink;import com.zxelec.scaffold.annotation.Source;import com.zxelec.scaffold.annotation.Stream;import com.zxelec.scaffold.annotation.Transform;import com.zxelec.scaffold.app.Application;import org.apache.flink.connector.kafka.source.KafkaSource;import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;@Flow(streams = {@Stream(sources = {@Source(sourceFunction = KafkaSource.class,configPrefix = "demo")},transforms = {@Transform(transformFunction = SizeMap.class)},sinks = {@Sink(sinkFunction = PrintSinkFunction.class)})})public class HelloWorldApplication extends Application {}
SizeMap.java
package com.zxelec.demo.process;import org.apache.flink.api.common.functions.MapFunction;public class SizeMap implements MapFunction<String, Integer> {@Overridepublic Integer map(String json) throws Exception {return json.length();}}
3. 定义配置文件
# 标识是否为必须及默认值app.name=HelloWorld# 基础环境配置env.checkpoint.interval=30env.checkpoint.directory=file:///Users/felix/Desktop/ckenv.restart.attempts=3env.restart.delay.interval=10# kafkademo.name=car_streamdemo.bootstrap.servers=192.168.1.249:9092demo.topic=car_streamdemo.group.id=demo_group_01demo.auto.offset.reset=earliestdemo.enable.auto.commit=false
4. 启动参数
--MainClass com.zxelec.demo.HelloWorldApplication --configPath ${application.properties文件路径}
完成以上步骤即可启动一个读取kafka数据,统计每条数据长度,输出打印至控制台的Flink应用程序!
注解标签
@Flow
@Flow用于定义一个Flink应用程序。 #### mainArgsConfig 通过启动参数mainArgs传入参数值的解析类,将参数值放入应用程序配置容器ParameterTool中,供配置型增强算子(ConfigMapFunction等)获取做业务处理。 解析类需要实现MainArgsConfig接口,接口定义如下:
@Flow(mainArgsConfig = MainArgsConfig.class,streams = {@Stream(),...})
java
/**
* @author: Felix
* @description: 解析main args传参
* @date: 15:45 2022/10/25
* @version: 1.0
*/
public interface MainArgsConfig {
/**
* 解析配置项
* @param args
* @return
*/
HashMap<String, String> process(String mainArgs);
}
#### streams
用于定义一个或多个Flink流处理节点@Stream(),脚本架解析时会根据定义顺序处理。当一个节点@Stream数据源依赖其它节点@Stream过程流时,请注意定义的顺序。
### @Stream
java
@Stream(
sources = {
@Source(),
...
},
transforms = {
@Transform(),
...
},
sinks = {
@Sink(),
...
}
)
@Stream用于定义一个Flink流处理节点
#### sources
用于定义一个或多个数据源@Source()
transforms
用于定义一个或多个数据转换算子@Transform()
sinks
用于定义一个或多个输出源@Sink()
@Source
@Source用于定义一个具体的数据源 #### sourceFunction 获取所需数据源class类型。脚本架提供了大部分的常用数据源,数据源class + 配置项的规则使用。如需自定义数据源,实现Flink框架提供的SourceFunction接口或其抽象子类即可。 #### configPrefix 用于数据源实例中区分获取需要的配置项。(注意:自定义数据源暂未支持此功能) #### streamName 所需数据源为某Flink处理节点的中间流标识名称。(@Transform注解定义的streamNames属性值为中间流标识名)
@Source(sourceFunction = KafkaSource.class,configPrefix = "",streamName = "")/*** KafkaSource为脚手架提供的数据源* 配置项为:* {configPrefix}.name=* {configPrefix}.bootstrap.servers=* {configPrefix}.topic=* {configPrefix}.group.id=* {configPrefix}.auto.offset.reset=* {configPrefix}.enable.auto.commit=**/
@Transform
@Transform(transformFunction = Map.class,configPrefix = {},streamNames = {},isEnable = {})
� @Transform用于定义一个转换算子。因一个转换算子可应用到多条数据源上,故其属性配置项常为数组形式,以便针对不同数据源提供准确的定义。
transformFunction
转换算子class类型。(见转换算子章节)
configPrefix
用于算子实例中获取需要的配置项。
streamNames
转换后的数据流标识名,当其它流处理节点需要使用时,可进行定义。
isEnable
在脚手架解析时,根据此配置读取应用配置${isEnable}.enable,判断是否启用该转换算子进行处理。(例:数据源为一个,isEable = {“map”},则此转换会读取应用配置项中 map.enable 判断是否进行流转换操作)
@Sink
@Sink用于定义一个具体的输出源。
@Sink(sinkFunction = PrintSinkFunction.classconfigPrefix = {})
sinkFunction
获取所需输出源class类型。目前脚手架只提供了JDBC连接方式,继承ConfigSinkFunction
�并实现SinkFactory接口,使用样例如下。
package com.zxelec.relation.component;import com.zxelec.relation.entity.Relation;import com.zxelec.scaffold.component.ConfigSinkFunction;import com.zxelec.scaffold.component.SinkFactory;import org.apache.flink.api.java.utils.ParameterTool;import org.apache.flink.connector.jdbc.JdbcConnectionOptions;import org.apache.flink.connector.jdbc.JdbcExecutionOptions;import org.apache.flink.connector.jdbc.JdbcSink;import org.apache.flink.streaming.api.functions.sink.SinkFunction;/*** @author: Felix* @description: 关系散存存储* @date: 15:45 2022/9/27* @version: 1.0*/public class RelationSink extends ConfigSinkFunction implements SinkFactory {private String ip;private String port;private String database;private String username;private String password;private Integer dataBatchSize;private Integer dataBatchIntervalMs;public RelationSink(ParameterTool parameterTool, String configPrefix) {super(parameterTool, configPrefix);this.ip = parameterTool.get("database.ip");this.port = parameterTool.get("database.port");this.database = parameterTool.get("database");this.username = parameterTool.get("database.user");this.password = parameterTool.get("database.password");this.dataBatchSize = parameterTool.getInt(configPrefix + ".data.batch.size");this.dataBatchIntervalMs = parameterTool.getInt(configPrefix + ".data.batch.interval.ms");}@Overridepublic SinkFunction createSinkFunction() {SinkFunction<Relation> sink = JdbcSink.sink("INSERT INTO relation VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",(statement, relation) -> {statement.setString(1, relation.getLayoutObjectId());statement.setString(2, relation.getAccompanyObjectId());statement.setLong(3, relation.getLayoutEventTime()/1000);statement.setLong(4, relation.getAccompanyEventTime()/1000);statement.setString(5, relation.getLayoutAddressId());statement.setString(6, relation.getAccompanyAddressId());statement.setString(7, relation.getLayoutObjectType());statement.setString(8, relation.getAccompanyObjectType());statement.setString(9, relation.getGid());statement.setString(10, relation.getLayoutRemark());statement.setString(11, relation.getAccompanyRemark());statement.setLong(12, System.currentTimeMillis()/1000);},JdbcExecutionOptions.builder().withBatchSize(dataBatchSize).withBatchIntervalMs(dataBatchIntervalMs).withMaxRetries(5).build(),new JdbcConnectionOptions.JdbcConnectionOptionsBuilder().withUrl("jdbc:clickhouse://" + ip + ":" + port + "/" + database + "?autoReconnect=true&maxReconnects=5&initialTimeout=10&connectTimeout=30000&socketTimeout=30000").withDriverName("com.clickhouse.jdbc.ClickHouseDriver").withUsername(username).withPassword(password).build());return sink;}}
configPrefix
用于输出源实例中获取需要的配置项。
启动命令
—MainClass {应用启动类全类名} —configPath {配置文件绝对路径} —mainArgs {main参数}�样例:
—MainClass com.zxelec.demo.HelloWorldApplication —configPath /Users/felix/config/application.properties —mainArgs {\”name\”:”felix”}�
脚本架组件
Source
Kafka
sourceFunction = KafkaSource.class/*** {configPrefix}.name=car_stream //kafka流名称,用于定义flink查看指标的标识名* {configPrefix}.bootstrap.servers=192.168.1.249:9092* {configPrefix}.topic=car_stream* {configPrefix}.group.id=demo01* {configPrefix}.auto.offset.reset=earliest* {configPrefix}.enable.auto.commit=false*/
Transform
原生算子
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/dev/datastream/operators/overview/
Config增加型算子
config增加型算子在原生算子的基础上,可获取应用程序启动时加载的配置信息,包括应用环境配置及业务配置。使用时继承相应的配置算子即可。使用样例如下:
package com.zxelec.demo.process;import com.zxelec.scaffold.component.ConfigMapFunction;import org.apache.flink.api.java.utils.ParameterTool;public class DemoConfigMap extends ConfigMapFunction<String, String> {public DemoConfigMap(ParameterTool parameterTool, String configPrefix) {super(parameterTool, configPrefix);// parameterTool为应用程序配置容器// configPrefix为算子流程定义时的名称}@Overridepublic String map(String o) throws Exception {return null;}}
目前脚手架已支持的增加型算子有:
ConfigFilterFunction
�ConfigFlatMapFunction
�ConfigKeyedCoProcessFunction
�ConfigKeyedProcessFunction
�ConfigKeySelector
�ConfigMapFunction
�ConfigProcessJoinFunction
�ConfigSerializableTimestampAssigner
ConfigSinkFunction�
Sink
等待更新…
环境配置项
# 应用程序名称app.name=HelloWorld# checkpoint触发时间间隔env.checkpoint.interval=30# checkpoint存储目录env.checkpoint.directory=file:///Users/reality/Desktop/ck# 失败重试次数env.restart.attempts=3# 失败重试间隔env.restart.delay.interval=10# 等待更新...
