简介
FlinkKafkaConsumer为Flink消费Kafka数据的连接器。在Flink中的角色为数据源。
FlinkKafkaConsumer的继承结构
如下图所示:
我们发现FlinkKafkaConsumer继承自FlinkKafkaConsumerBase。FlinkKafkaConsumerBase又实现了SourceFunction和RichFunction接口。接下来我们重点分析它的open和run方法。
FlinkKafkaConsumerBase的open方法
该方法包含的内容为FlinkKafkaConsumer的初始化逻辑。
首先设置提交offset的模式
// determine the offset commit modethis.offsetCommitMode = OffsetCommitModes.fromConfiguration(getIsAutoCommitEnabled(),enableCommitOnCheckpoints,((StreamingRuntimeContext) getRuntimeContext()).isCheckpointingEnabled());
OffsetCommitMode是一个枚举类型,具有如下三个值:
- DISABLED:完全禁用offset提交。
- ON_CHECKPOINTS:当checkpoint完成的时候再提交offset。
- KAFKA_PERIODIC:周期性提交offset。
判断OffsetCommitMode的逻辑封装在了OffsetCommitModes.fromConfiguration方法中。该方法的代码如下:
/*** Determine the offset commit mode using several configuration values.** @param enableAutoCommit whether or not auto committing is enabled in the provided Kafka properties.* @param enableCommitOnCheckpoint whether or not committing on checkpoints is enabled.* @param enableCheckpointing whether or not checkpoint is enabled for the consumer.** @return the offset commit mode to use, based on the configuration values.*/public static OffsetCommitMode fromConfiguration(boolean enableAutoCommit,boolean enableCommitOnCheckpoint,boolean enableCheckpointing) {if (enableCheckpointing) {// if checkpointing is enabled, the mode depends only on whether committing on checkpoints is enabledreturn (enableCommitOnCheckpoint) ? OffsetCommitMode.ON_CHECKPOINTS : OffsetCommitMode.DISABLED;} else {// else, the mode depends only on whether auto committing is enabled in the provided Kafka propertiesreturn (enableAutoCommit) ? OffsetCommitMode.KAFKA_PERIODIC : OffsetCommitMode.DISABLED;
这段代码逻辑可以总结为:
- 如果启用了checkpoint,并且启用了checkpoint完成时提交offset,返回ON_CHECKPOINTS。
- 如果未启用checkpoint,但是启用了自动提交,返回KAFKA_PERIODIC。
- 其他情况都返回DISABLED。 ```java // create the partition discoverer this.partitionDiscoverer = createPartitionDiscoverer( topicsDescriptor, getRuntimeContext().getIndexOfThisSubtask(), getRuntimeContext().getNumberOfParallelSubtasks()); this.partitionDiscoverer.open();
subscribedPartitionsToStartOffsets = new HashMap<>();
final List
这段代码中topicsDescriptor为fixedTopics和topicPattern的封装。其中fixedTopics明确指定了topic的名称,称为固定topic。topicPattern为匹配topic名称的正则表达式,用于分区发现。**createPartitionDiscoverer**方法创建了一个KafkaPartitionDiscoverer对象,主要负责Kafka分区发现。partitionDiscoverer.open()方法创建出一个KafkaConsumer。**subscribedPartitionsToStartOffsets** 为已订阅的分区列表,这里将它初始化。<br />partitionDiscoverer.discoverPartitions()用户获取所有fixedTopics和匹配topicPattern的Topic包含的所有分区信息。该部分代码稍后分析。接下来**open**方法的代码结构如下:```javaif (restoredState != null) {// 从快照恢复逻辑...} else {// 直接启动逻辑...}
如果consumer是从快照恢复的,restoredState不为空。反之restoredState为空。
我们首先分析一下从快照恢复的逻辑。代码如下:
// 如果restoredState没有存储某一分区的状态// 需要重头消费该分区for (KafkaTopicPartition partition : allPartitions) {if (!restoredState.containsKey(partition)) {restoredState.put(partition, KafkaTopicPartitionStateSentinel.EARLIEST_OFFSET);}}for (Map.Entry<KafkaTopicPartition, Long> restoredStateEntry : restoredState.entrySet()) {// seed the partition discoverer with the union state while filtering out// restored partitions that should not be subscribed by this subtask// 此处可过滤掉不归该task负责的kafka分区if (KafkaTopicPartitionAssigner.assign(restoredStateEntry.getKey(), getRuntimeContext().getNumberOfParallelSubtasks())== getRuntimeContext().getIndexOfThisSubtask()){subscribedPartitionsToStartOffsets.put(restoredStateEntry.getKey(), restoredStateEntry.getValue());}}// 依照分区发现配置的topic正则表达式过滤分区if (filterRestoredPartitionsWithCurrentTopicsDescriptor) {// 过滤掉topic名称不符合topicsDescriptor的topicPattern的分区subscribedPartitionsToStartOffsets.entrySet().removeIf(entry -> {if (!topicsDescriptor.isMatchingTopic(entry.getKey().getTopic())) {LOG.warn("{} is removed from subscribed partitions since it is no longer associated with topics descriptor of current execution.",entry.getKey());return true;}return false;});}
接下来我们分析下Consumer直接启动的逻辑(不从快照恢复)。
在此之前需要了解下StartupMode这个枚举类型。该枚举类型有5个值:
- GROUP_OFFSETS:从保存在zookeeper或者是Kafka broker的对应消费者组提交的offset开始消费,这个是默认的配置
- EARLIEST:尽可能从最早的offset开始消费
- LATEST:从最近的offset开始消费
- TIMESTAMP:从用户提供的timestamp处开始消费
- SPECIFIC_OFFSETS:从用户提供的offset处开始消费
然后,Comsumer使用分区发现工具来获取初始的分区。根据StartupMode来设置它们的起始消费offset。
我们先看SPECIFIC_OFFSETS这种情况。
case SPECIFIC_OFFSETS:// 如果没有配置具体从哪个offset开始消费,程序抛出异常if (specificStartupOffsets == null) {throw new IllegalStateException("Startup mode for the consumer set to " + StartupMode.SPECIFIC_OFFSETS +", but no specific offsets were specified.");}for (KafkaTopicPartition seedPartition : allPartitions) {// 获取每个分区指定的消费起始offsetLong specificOffset = specificStartupOffsets.get(seedPartition);if (specificOffset != null) {// since the specified offsets represent the next record to read, we subtract// it by one so that the initial state of the consumer will be correct// 如果分区配置了offset,设置从offset开始消费subscribedPartitionsToStartOffsets.put(seedPartition, specificOffset - 1);} else {// default to group offset behaviour if the user-provided specific offsets// do not contain a value for this partition// 如果分区没有配置offset,设置从GROUP_OFFSET开始消费subscribedPartitionsToStartOffsets.put(seedPartition, KafkaTopicPartitionStateSentinel.GROUP_OFFSET);}}break;
如果采用TIMESTAMP模式,逻辑如下所示:
case TIMESTAMP:// 如果没有配置timestamp,程序报错退出if (startupOffsetsTimestamp == null) {throw new IllegalStateException("Startup mode for the consumer set to " + StartupMode.TIMESTAMP +", but no startup timestamp was specified.");}// 根据timestamp获取分区的offset// 遍历这些分区for (Map.Entry<KafkaTopicPartition, Long> partitionToOffset: fetchOffsetsWithTimestamp(allPartitions, startupOffsetsTimestamp).entrySet()) {// 如果无offset,使用LATEST_OFFSET// 如果获取到了offset,从这个offset开始消费subscribedPartitionsToStartOffsets.put(partitionToOffset.getKey(),(partitionToOffset.getValue() == null)// if an offset cannot be retrieved for a partition with the given timestamp,// we default to using the latest offset for the partition? KafkaTopicPartitionStateSentinel.LATEST_OFFSET// since the specified offsets represent the next record to read, we subtract// it by one so that the initial state of the consumer will be correct: partitionToOffset.getValue() - 1);}break;// 其他情况,使用KafkaTopicPartitionStateSentinel类对应的值作为offsetdefault:for (KafkaTopicPartition seedPartition : allPartitions) {subscribedPartitionsToStartOffsets.put(seedPartition, startupMode.getStateSentinel());}
接下来的if语句段负责打印一些日志信息。这里就不再分析了。
FlinkKafkaConsumerBase的run方法
run方法包含了从KafkaConsumer消费数据,和向Flink下游发送数据的逻辑。
首先检查open方法中初始化的subscribedPartitionsToStartOffsets是否为null。
if (subscribedPartitionsToStartOffsets == null) {throw new Exception("The partitions were not set for the consumer");}
接下来配置成功commit和失败commit数量的监控。
// initialize commit metrics and default offset callback method// 设置成功提交计数监控this.successfulCommits = this.getRuntimeContext().getMetricGroup().counter(COMMITS_SUCCEEDED_METRICS_COUNTER);// 设置失败提交计数监控this.failedCommits = this.getRuntimeContext().getMetricGroup().counter(COMMITS_FAILED_METRICS_COUNTER);// 获取子任务indexfinal int subtaskIndex = this.getRuntimeContext().getIndexOfThisSubtask();// 注册一个提交时的回调函数this.offsetCommitCallback = new KafkaCommitCallback() {@Overridepublic void onSuccess() {// 提交成功,成功提交计数器加一successfulCommits.inc();}@Overridepublic void onException(Throwable cause) {LOG.warn(String.format("Consumer subtask %d failed async Kafka commit.", subtaskIndex), cause);// 提交失败,失败提交计数器加一failedCommits.inc();}};
接下来判断subscribedPartitionsToStartOffsets集合是否为空。如果为空,标记数据源的状态为暂时空闲。
if (subscribedPartitionsToStartOffsets.isEmpty()) {sourceContext.markAsTemporarilyIdle();}
下面是获取数据的过程。这里创建了一个KafkaFetcher,负责借助KafkaConsumer API从Kafka broker获取数据。
this.kafkaFetcher = createFetcher(sourceContext,subscribedPartitionsToStartOffsets,watermarkStrategy,(StreamingRuntimeContext) getRuntimeContext(),offsetCommitMode,getRuntimeContext().getMetricGroup().addGroup(KAFKA_CONSUMER_METRICS_GROUP),useMetrics);
接下来检测running变量的状态。如果没有running,直接返回。
if (!running) {return;}
最后是根据分区发现间隔时间的配置来确定是否启动分区的定时发现任务。
if (discoveryIntervalMillis == PARTITION_DISCOVERY_DISABLED) {// 直接启动获取数据任务kafkaFetcher.runFetchLoop();} else {// 否则,启动定期分区发现任务和数据获取任务runWithPartitionDiscovery();}
最后我们分析下runWithPartitionDiscovery方法。代码如下:
private void runWithPartitionDiscovery() throws Exception {final AtomicReference<Exception> discoveryLoopErrorRef = new AtomicReference<>();// 启动分区发现定时任务createAndStartDiscoveryLoop(discoveryLoopErrorRef);// 启动kafka broker数据获取任务kafkaFetcher.runFetchLoop();// make sure that the partition discoverer is waked up so that// the discoveryLoopThread exits// 使partitionDiscoverer.discoverPartitions()抛异常// 能够从discoveryLoopThread 返回partitionDiscoverer.wakeup();// 等待discoveryLoopThread 执行完毕joinDiscoveryLoopThread();// rethrow any fetcher errorsfinal Exception discoveryLoopError = discoveryLoopErrorRef.get();if (discoveryLoopError != null) {throw new RuntimeException(discoveryLoopError);}}
我们再跟踪下看看如何启动分区发现定时任务的。
private void createAndStartDiscoveryLoop(AtomicReference<Exception> discoveryLoopErrorRef) {// 创建一个discoveryLoop线程discoveryLoopThread = new Thread(() -> {try {// --------------------- partition discovery loop ---------------------// throughout the loop, we always eagerly check if we are still running before// performing the next operation, so that we can escape the loop as soon as possiblewhile (running) {if (LOG.isDebugEnabled()) {LOG.debug("Consumer subtask {} is trying to discover new partitions ...", getRuntimeContext().getIndexOfThisSubtask());}final List<KafkaTopicPartition> discoveredPartitions;try {// 尝试发现新分区,如果方法抛出异常,退出循环discoveredPartitions = partitionDiscoverer.discoverPartitions();} catch (AbstractPartitionDiscoverer.WakeupException | AbstractPartitionDiscoverer.ClosedException e) {// the partition discoverer may have been closed or woken up before or during the discovery;// this would only happen if the consumer was canceled; simply escape the loopbreak;}// no need to add the discovered partitions if we were closed during the meantime// 如果没有发现新的分区,或者数据源已关闭之时,没必要再添加新分区if (running && !discoveredPartitions.isEmpty()) {kafkaFetcher.addDiscoveredPartitions(discoveredPartitions);}// do not waste any time sleeping if we're not running anymoreif (running && discoveryIntervalMillis != 0) {try {// 睡眠discoveryIntervalMillis时间Thread.sleep(discoveryIntervalMillis);} catch (InterruptedException iex) {// may be interrupted if the consumer was canceled midway; simply escape the loopbreak;}}}} catch (Exception e) {discoveryLoopErrorRef.set(e);} finally {// calling cancel will also let the fetcher loop escape// (if not running, cancel() was already called)if (running) {cancel();}}}, "Kafka Partition Discovery for " + getRuntimeContext().getTaskNameWithSubtasks());// 启动分区发现定时任务线程discoveryLoopThread.start();}
我们再详细研究下上述方法中partitionDiscoverer.discoverPartitions()的调用,即发现分区的执行过程。代码如下:
public List<KafkaTopicPartition> discoverPartitions() throws WakeupException, ClosedException {// 确保没有关闭数据源,也没有wakeupif (!closed && !wakeup) {try {List<KafkaTopicPartition> newDiscoveredPartitions;// (1) get all possible partitions, based on whether we are subscribed to fixed topics or a topic pattern// 如果配置了fixedTopic,获取这些topic的分区if (topicsDescriptor.isFixedTopics()) {newDiscoveredPartitions = getAllPartitionsForTopics(topicsDescriptor.getFixedTopics());} else {// 如果没有配置fixedTopic// 1. 获取所有topicList<String> matchedTopics = getAllTopics();// retain topics that match the pattern// 2. 逐个排除名字不是fixedTopic,或名字不匹配topicPattern的topicIterator<String> iter = matchedTopics.iterator();while (iter.hasNext()) {if (!topicsDescriptor.isMatchingTopic(iter.next())) {iter.remove();}}if (matchedTopics.size() != 0) {// get partitions only for matched topics// 3. 如果有匹配的topic,获取他们的分区newDiscoveredPartitions = getAllPartitionsForTopics(matchedTopics);} else {// 否则newDiscoveredPartitions 设置为nullnewDiscoveredPartitions = null;}}// (2) eliminate partition that are old partitions or should not be subscribed by this subtaskif (newDiscoveredPartitions == null || newDiscoveredPartitions.isEmpty()) {throw new RuntimeException("Unable to retrieve any partitions with KafkaTopicsDescriptor: " + topicsDescriptor);} else {Iterator<KafkaTopicPartition> iter = newDiscoveredPartitions.iterator();KafkaTopicPartition nextPartition;while (iter.hasNext()) {nextPartition = iter.next();// 分区存入discoveredPartitions集合中// 返回值为分区是否归当前task消费if (!setAndCheckDiscoveredPartition(nextPartition)) {iter.remove();}}}return newDiscoveredPartitions;} catch (WakeupException e) {// the actual topic / partition metadata fetching methods// may be woken up midway; reset the wakeup flag and rethrowwakeup = false;throw e;}} else if (!closed && wakeup) {// may have been woken up before the method callwakeup = false;throw new WakeupException();} else {throw new ClosedException();}}
kafkaFetcher的runFetchLoop方法
此方法为FlinkKafkaConsumer获取数据的主入口,通过一个循环来不断获取kafka broker的数据。
public void runFetchLoop() throws Exception {try {// kick off the actual Kafka consumer// 启动kafka消费线程,定期从kafkaConsumer拉取数据并转交给handover对象consumerThread.start();while (running) {// this blocks until we get the next records// it automatically re-throws exceptions encountered in the consumer thread// 获取handover中的数据// 如果此时consumerThread尚未把数据交给handover,该方法会阻塞final ConsumerRecords<byte[], byte[]> records = handover.pollNext();// get the records for each topic partitionfor (KafkaTopicPartitionState<T, TopicPartition> partition : subscribedPartitionStates()) {// 获取属于该分区的recordsList<ConsumerRecord<byte[], byte[]>> partitionRecords =records.records(partition.getKafkaPartitionHandle());partitionConsumerRecordsHandler(partitionRecords, partition);}}}finally {// this signals the consumer thread that no more work is to be doneconsumerThread.shutdown();}// on a clean exit, wait for the runner threadtry {consumerThread.join();}catch (InterruptedException e) {// may be the result of a wake-up interruption after an exception.// we ignore this here and only restore the interruption stateThread.currentThread().interrupt();}}
此方法中的collect kafka数据的逻辑在partitionConsumerRecordsHandler中。我们查看下它的代码:
protected void partitionConsumerRecordsHandler(List<ConsumerRecord<byte[], byte[]>> partitionRecords,KafkaTopicPartitionState<T, TopicPartition> partition) throws Exception {// 反序列化Kafka record为bean// 此deserializer需要实现KafkaDeserializationSchema接口for (ConsumerRecord<byte[], byte[]> record : partitionRecords) {deserializer.deserialize(record, kafkaCollector);// emit the actual records. this also updates offset state atomically and emits// watermarks// 发送数据,更新offset,生成timestamp和watermarkemitRecordsWithTimestamps(kafkaCollector.getRecords(),partition,record.offset(),record.timestamp());// 如果数据源已到末尾,停止fetcher循环if (kafkaCollector.isEndOfStreamSignalled()) {// end of stream signaledrunning = false;break;}}}
它调用了emitRecordWithTimestamp方法,继续查看。
protected void emitRecordsWithTimestamps(Queue<T> records,KafkaTopicPartitionState<T, KPH> partitionState,long offset,long kafkaEventTimestamp) {// emit the records, using the checkpoint lock to guarantee// atomicity of record emission and offset state updatesynchronized (checkpointLock) {T record;while ((record = records.poll()) != null) {// 此处调用SourceFunction中的sourceContext// 数据源收集元素逻辑在此long timestamp = partitionState.extractTimestamp(record, kafkaEventTimestamp);sourceContext.collectWithTimestamp(record, timestamp);// this might emit a watermark, so do it after emitting the record// 可能定时发送水位线partitionState.onEvent(record, timestamp);}partitionState.setOffset(offset);}}
KafkaConsumerThread
KafkaConsumerThread负责在单独的线程中从Kafka中拉取数据到handover。这里我们分析下它的run方法中获取数据的部分。
// main fetch loopwhile (running) {// check if there is something to commit// 检查是否则commit过程中if (!commitInProgress) {// get and reset the work-to-be committed, so we don't repeatedly commit the same// 获取需要提交的offset值,以及commit回调函数// 获取完毕之后需要设置为null,防止反复提交final Tuple2<Map<TopicPartition, OffsetAndMetadata>, KafkaCommitCallback> commitOffsetsAndCallback =nextOffsetsToCommit.getAndSet(null);if (commitOffsetsAndCallback != null) {log.debug("Sending async offset commit request to Kafka broker");// also record that a commit is already in progress// the order here matters! first set the flag, then send the commit command.// 开始提交过程commitInProgress = true;// 异步提交offsetconsumer.commitAsync(commitOffsetsAndCallback.f0, new CommitCallback(commitOffsetsAndCallback.f1));}}// 为consumer指定新的分区// 由于分区发现功能的存在,consumer需要添加新发现的分区,否则poll数据会报错try {if (hasAssignedPartitions) {newPartitions = unassignedPartitionsQueue.pollBatch();}else {// if no assigned partitions block until we get at least one// instead of hot spinning this loop. We rely on a fact that// unassignedPartitionsQueue will be closed on a shutdown, so// we don't block indefinitelynewPartitions = unassignedPartitionsQueue.getBatchBlocking();}if (newPartitions != null) {reassignPartitions(newPartitions);}} catch (AbortedReassignmentException e) {continue;}if (!hasAssignedPartitions) {// Without assigned partitions KafkaConsumer.poll will throw an exceptioncontinue;}// get the next batch of records, unless we did not manage to hand the old batch overif (records == null) {try {// 从consumer拉取数据// 这里的pollTimeout可以通过配置flink.poll-timeout参数修改// pollTimeout默认值为100msrecords = consumer.poll(pollTimeout);}catch (WakeupException we) {continue;}}try {// 将数据交给handoverhandover.produce(records);records = null;}catch (Handover.WakeupException e) {// fall through the loop}}// end main fetch loop}catch (Throwable t) {// let the main thread know and exit// it may be that this exception comes because the main thread closed the handover, in// which case the below reporting is irrelevant, but does not hurt eitherhandover.reportError(t);}finally {// make sure the handover is closed if it is not already closed or has an errorhandover.close();// make sure the KafkaConsumer is closedtry {consumer.close();}catch (Throwable t) {log.warn("Error while closing Kafka consumer", t);}}
checkpoint流程
checkpoint流程大部分代码为状态的读写。这里为大家总结下主要的流程,不分析具体的代码。
snapshotState方法
FlinkKafkaConsumerBase的snapshotState方法包含snapshot的流程。包含如下:
- 如果KafkaFetcher尚未初始化完毕。需要保存已订阅的topic连同他们的初始offset。
- 如果KafkaFetcher已初始化完毕,调用fetcher的snapshotCurrentState方法。
- 如果offsetCommitMode为ON_CHECKPOINTS类型,还需要将topic和offset写入到pendingOffsetsToCommit集合中。该集合用于checkpoint成功的时候向Kafka broker提交offset。(offsetCommitMode不为ON_CHECKPOINTS和DISABLED的时候,使用的是自动提交offset的模式)
notifyCheckpointComplete方法
在所有的operator都快照成功的时候,会向JobManager的CheckpointCoordinator发送确认消息,然后coordinator会通知各个operator checkpoint已经完成。(详细请参见Flink 源码之快照
)为了保证保证数据不会被遗漏和重复消费,ON_CHECKPOINTS模式运行的FlinkKafkaConsumer只能在这个时候提交offset到kafka consumer。调用notifyCheckpointComplete的时候通知kafka consumer,将checkpoint之时保存的各个分区的offset提交给kafka broker。从而保证数据的一致性。
