- Note
- Step 1: set the checkpoint interval
- Step 2: the default is DELETE_ON_CANCELLATION
  - https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/ops/state/checkpoints/#retained-checkpoints
  - ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION: checkpoint data is kept when the job is cancelled, so the job can later be restored from a checkpoint of your choice
  - ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION: checkpoint data is deleted when the job is cancelled; checkpoints are kept only when the job fails
- Step 3: restore the job from a retained checkpoint
- Maven dependencies
- Java code
- Kafka
Read data from Kafka and write it into MySQL
Note
Kafka issues
- The first time, I installed Kafka on Windows and ran into garbled Chinese characters
- I never got that fixed, so I switched to installing under WSL instead
After installing under WSL, Flink and the other programs could not connect to Kafka (a likely fix is sketched below)
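Most likely the broker was advertising `localhost`, which clients outside WSL cannot reach. A sketch of the usual remedy in `config/server.properties`, assuming the WSL instance is reachable from the host at 192.168.0.51 (the broker address used throughout this post):
```properties
# listen on all interfaces inside WSL
listeners=PLAINTEXT://0.0.0.0:9092
# advertise an address that clients outside WSL (Flink, the IDE) can actually reach
advertised.listeners=PLAINTEXT://192.168.0.51:9092
```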
Creating the job through the SQL Client and configuring how checkpoint data is retained (by default checkpoint data is kept only when a job fails, not when it is cancelled)
Step 1: set the checkpoint interval (`SET execution.checkpointing.interval = 3s;`, also part of the Flink SQL preparation below)
Step 2: the default is DELETE_ON_CANCELLATION
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/ops/state/checkpoints/#retained-checkpoints
ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION: checkpoint data is kept when the job is cancelled, so the job can later be restored from a checkpoint of your choice
ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION: checkpoint data is deleted when the job is cancelled; checkpoints are kept only when the job fails
```sql
SET 'execution.checkpointing.externalized-checkpoint-retention' = 'RETAIN_ON_CANCELLATION';
```
Step 3: restore the job from a retained checkpoint
```sql
SET 'execution.savepoint.path' = '/opt/flink/ck/549543d3e61cb3bc304caa2c2d9c28f9/chk-37';
```
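Putting the three steps together, a possible SQL Client session for resuming the job from a retained checkpoint (the `/opt/flink/ck` prefix presumably comes from `state.checkpoints.dir` in `flink-conf.yaml`; the `chk-37` directory is the example path from above):
```sql
SET 'execution.checkpointing.interval' = '3s';
SET 'execution.checkpointing.externalized-checkpoint-retention' = 'RETAIN_ON_CANCELLATION';
SET 'execution.savepoint.path' = '/opt/flink/ck/549543d3e61cb3bc304caa2c2d9c28f9/chk-37';
-- re-create kafka_source and mysql_sink (DDL in the Flink SQL section below), then resubmit:
insert into mysql_sink select * from kafka_source;
```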
The same settings in Java:
1. env.enableCheckpointing(3000);
2. env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

**The `'value.format' = 'csv'` problem**
1. When I sent data to Kafka in CSV format, the rows arriving in MySQL came out wrong: over-long values, truncation, out-of-order fields, and so on
2. Switching to JSON fixed it

# Flink SQL preparation
```sql
SET execution.checkpointing.interval = 3s;

-- source; scan.startup.mode can be latest-offset or earliest-offset
CREATE TABLE kafka_source (
  id int,
  name STRING,
  sex STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'flink-cdc',
  'properties.bootstrap.servers' = '192.168.0.51:9092',
  'properties.group.id' = 'test-consumer-group',
  'scan.startup.mode' = 'latest-offset',
  'value.format' = 'json',
  'value.json.fail-on-missing-field' = 'false',
  'value.fields-include' = 'ALL'
);

-- sink
CREATE TABLE mysql_sink (
  id INT,
  name STRING,
  sex STRING,
  primary key (id) not enforced
) WITH (
  'connector' = 'jdbc',
  'driver' = 'com.mysql.cj.jdbc.Driver',
  'url' = 'jdbc:mysql://192.168.0.51:3316/flink?serverTimezone=Asia/Shanghai&useSSL=false',
  'username' = 'root',
  'password' = 'root',
  'table-name' = 'mysql_sink'
);

insert into mysql_sink select * from kafka_source;

select * from kafka_source;
select * from mysql_sink;
```
Create the table in MySQL
```sql
CREATE DATABASE IF NOT EXISTS `flink` DEFAULT CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci;

CREATE TABLE `mysql_sink` (
  `id` int NOT NULL AUTO_INCREMENT,
  `name` varchar(255) COLLATE utf8mb4_general_ci DEFAULT NULL,
  `sex` varchar(255) COLLATE utf8mb4_general_ci DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=908036287 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;
```
Maven dependencies
```xml
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-walkthrough-common_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <!-- This dependency is provided, because it should not be packaged into the JAR file. -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>

    <!-- Add connector dependencies here. They must be in the default scope (compile). -->
    <!-- Example:
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    -->

    <!-- Add logging framework, to produce console output when running in the IDE. -->
    <!-- These dependencies are excluded from the application JAR by default. -->
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-slf4j-impl</artifactId>
        <version>${log4j.version}</version>
        <scope>runtime</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-api</artifactId>
        <version>${log4j.version}</version>
        <scope>runtime</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-core</artifactId>
        <version>${log4j.version}</version>
        <scope>runtime</scope>
    </dependency>

    <!-- Flink CDC -->
    <dependency>
        <groupId>com.ververica</groupId>
        <!-- add the dependency matching your database -->
        <artifactId>flink-sql-connector-mysql-cdc</artifactId>
        <!-- the dependency is available only for stable releases. -->
        <version>2.1.1</version>
        <scope>provided</scope>
    </dependency>

    <!-- Flink JDBC connector -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>com.google.code.gson</groupId>
        <artifactId>gson</artifactId>
        <version>2.8.9</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
</dependencies>
```
Java code
```java
package cn.tannn;

import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.SqlDialect;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * @author tn
 * @date 2022-02-09 09:58
 */
public class KafkaToMySQLWitchSQL {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Take a checkpoint every 3000 ms (the checkpoint interval)
        env.enableCheckpointing(3000);

        // Advanced options:
        // Use exactly-once mode (this is the default)
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        // Leave at least 1500 ms between checkpoints (minimum pause)
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1500);
        // A checkpoint must finish within one minute or it is discarded (checkpoint timeout)
        env.getCheckpointConfig().setCheckpointTimeout(60000);
        // Allow only one checkpoint at a time
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        // RETAIN_ON_CANCELLATION: keep checkpoint data when the job is cancelled,
        //   so the job can later be restored from a checkpoint of your choice
        // DELETE_ON_CANCELLATION: delete checkpoint data when the job is cancelled;
        //   checkpoints are kept only when the job fails
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        env.setParallelism(1);

        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);
        tableEnv.getConfig().setSqlDialect(SqlDialect.DEFAULT);
        // Set the job name
        tableEnv.getConfig().getConfiguration().setString("pipeline.name", "table_sql_job");

        // scan.startup.mode: latest-offset / earliest-offset
        String sourceDDL = "CREATE TABLE kafka_source ( " +
                " id int, " +
                " name STRING, " +
                " sex STRING " +
                ") WITH ( " +
                " 'connector' = 'kafka', " +
                " 'topic' = 'flink-cdc', " +
                " 'properties.bootstrap.servers' = '192.168.0.51:9092', " +
                " 'properties.group.id' = 'test-consumer-group', " +
                " 'scan.startup.mode' = 'latest-offset', " +
                " 'value.format' = 'json', " +
                " 'value.json.fail-on-missing-field' = 'false', " +
                " 'value.fields-include' = 'ALL' " +
                ")";

        String sinkDDL = "CREATE TABLE mysql_sink ( " +
                " id INT, " +
                " name STRING, " +
                " sex STRING, " +
                " primary key (id) not enforced " +
                ") WITH ( " +
                " 'connector' = 'jdbc', " +
                " 'driver' = 'com.mysql.cj.jdbc.Driver', " +
                " 'url' = 'jdbc:mysql://192.168.0.51:3316/flink?serverTimezone=Asia/Shanghai&useSSL=false', " +
                " 'username' = 'root', " +
                " 'password' = 'root', " +
                " 'table-name' = 'mysql_sink' " +
                ")";

        String transformDmlSQL = "insert into mysql_sink select * from kafka_source";

        tableEnv.executeSql(sourceDDL);
        tableEnv.executeSql(sinkDDL);
        tableEnv.executeSql(transformDmlSQL);
        // executeSql submits the job itself, so env.execute() is not needed here
        // env.execute("KafkaToMySQLWitchSQL");
    }
}
```
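One detail worth knowing: `executeSql` submits the INSERT job asynchronously and returns a `TableResult`, which is why the final `env.execute(...)` call is commented out. If the client process should block until the job terminates (for an unbounded Kafka source that means blocking indefinitely), a sketch replacing the last `executeSql` call, assuming Flink 1.12+:
```java
// import org.apache.flink.table.api.TableResult;
TableResult result = tableEnv.executeSql(transformDmlSQL);
// blocks until the job finishes; throws if the job fails
result.await();
```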
Kafka
Install and start
```shell
# STEP 1: GET KAFKA
$ tar -xzf kafka_2.13-3.1.0.tgz
$ cd kafka_2.13-3.1.0

# STEP 2: START THE KAFKA ENVIRONMENT
# NOTE: Your local environment must have Java 8+ installed.
# Run the following commands in order to start all services in the correct order:

# Start the ZooKeeper service
# Note: Soon, ZooKeeper will no longer be required by Apache Kafka.
$ bin/zookeeper-server-start.sh config/zookeeper.properties
$ bin/windows/zookeeper-server-start.bat config/zookeeper.properties  # windows

# Open another terminal session and run:
# Start the Kafka broker service (wait until ZooKeeper is fully up before running this)
$ bin/kafka-server-start.sh config/server.properties
$ bin/windows/kafka-server-start.bat config/server.properties  # windows
```
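If you would rather not keep two terminals open, the standard start scripts also accept a `-daemon` flag to run in the background (an optional variant, not used above):
```shell
$ bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
$ bin/kafka-server-start.sh -daemon config/server.properties
```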
Create a topic
```shell
# --replication-factor: number of replicas per partition
# --partitions: number of partitions
# --bootstrap-server: for a Kafka cluster, separate multiple addresses with commas
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic flink-cdc
bin/windows/kafka-topics.bat --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic flink-cdc
```
Show topic details
```shell
# Describe the given topic
bin/kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic flink-cdc
bin/windows/kafka-topics.bat --describe --bootstrap-server localhost:9092 --topic flink-cdc
```
List Kafka topics
```shell
bin/kafka-topics.sh --list --bootstrap-server localhost:9092
bin/windows/kafka-topics.bat --list --bootstrap-server localhost:9092
```
List consumer groups
```shell
bin/kafka-consumer-groups.sh --list --bootstrap-server localhost:9092
.\bin\windows\kafka-consumer-groups.bat --list --bootstrap-server localhost:9092
# Show group details
.\bin\windows\kafka-consumer-groups.bat --bootstrap-server localhost:9092 --group test-consumer-group --describe
```
Produce / consume data
```shell
# Produce
bin/kafka-console-producer.sh --topic flink-cdc --bootstrap-server localhost:9092
# Consume
bin/kafka-console-consumer.sh --topic flink-cdc --from-beginning --bootstrap-server localhost:9092
```
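The console producer reads records from stdin, so a test message can also be piped in non-interactively (same topic and broker as above):
```shell
echo '{"id": 1, "name": "谭宁1", "sex": "男"}' | bin/kafka-console-producer.sh --topic flink-cdc --bootstrap-server localhost:9092
```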
Sending data from the Kafka command line
{"id": 1, "name":"谭宁1", "sex": "男"}{"id": 2, "name":"谭宁2", "sex": "男"}{"id": 3, "name":"谭宁3", "sex": "男"}{"id": 4, "name":"谭宁4", "sex": "男"}{"id": 5, "name":"谭宁5", "sex": "男"}{"id": 6, "name":"谭宁6", "sex": "男"}{"id": 6, "name":"谭宁6", "sex": "女"}
Sending data to Kafka from Java
Code
```java
package cn.tan;

import com.alibaba.fastjson.JSONObject;
import com.github.javafaker.Faker;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.Properties;

/**
 * Kafka producer generating fake test data.
 *
 * @author tn
 * @date 2022-02-11 09:25
 */
public class Producer {

    private static final Faker FAKER = new Faker(Locale.CHINA);

    public static void main(String[] args) throws InterruptedException {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.0.51:9092");
        properties.put(ProducerConfig.ACKS_CONFIG, "all");
        properties.put(ProducerConfig.RETRIES_CONFIG, 0);
        properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        properties.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        // Key and value serializers
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);

        /*
         * topic: the topic name; it can be created in Kafka beforehand.
         *        If it does not exist yet, it is created automatically.
         * key:   the key the value is associated with, similar to a Map entry
         * value: the data to send, as a String
         */
        while (true) {
            Thread.sleep(1000);
            Map<String, Object> ka = new HashMap<>();
            // alternative: decimalBetween(1L, 10L).intValue()
            ka.put("id", FAKER.number().numberBetween(10, 1000000000));
            ka.put("name", FAKER.name().username());
            ka.put("sex", FAKER.address().city());
            String string = JSONObject.toJSONString(ka);
            producer.send(new ProducerRecord<>("flink-cdc", string));
            System.out.println(string);
        }
    }
}
```
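The loop above fires and forgets each record, so delivery failures go unnoticed. `KafkaProducer.send` also accepts a callback; a sketch that could replace the `producer.send(...)` line in the loop:
```java
// log delivery errors instead of silently dropping the record
producer.send(new ProducerRecord<>("flink-cdc", string), (metadata, exception) -> {
    if (exception != null) {
        exception.printStackTrace();
    } else {
        System.out.printf("sent %s-%d@%d%n", metadata.topic(), metadata.partition(), metadata.offset());
    }
});
```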
Dependencies
```xml
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.1.0</version>
</dependency>
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.78</version>
</dependency>
<!-- generate test data -->
<dependency>
    <groupId>com.github.javafaker</groupId>
    <artifactId>javafaker</artifactId>
    <version>0.17.2</version>
</dependency>
```
