In this article we analyze how streaming tasks are initialized and executed in Flink.
AbstractInvokable
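AbstractInvokable is the parent class of every task that runs in a TaskManager. Everything a task does is executed from this class's invoke method: reading data from upstream, applying the user's processing logic (operators such as map and filter, user-written ProcessFunctions, and so on), and sending the processed data downstream.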
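AbstractInvokable has two methods related to task execution:
- invoke: the entry point that starts task execution. Subclasses must override this method.
- cancel: called when the task is canceled or the user terminates it.
Its two main subclasses are:
- BatchTask: the base class of all batch tasks.
- StreamTask: the base class of all streaming tasks.
Since our focus is stream processing, the rest of this article looks at StreamTask in detail.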
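Creating the AbstractInvokable
Before analyzing StreamTask, we need to understand where and how it is created.
Looking through the logic of the Task thread, we find that its invokable field is initialized in Task's doRun method: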
```java
// now load and instantiate the task's invokable code
invokable = loadAndInstantiateInvokable(userCodeClassLoader.asClassLoader(), nameOfInvokableClass, env);
```
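This line uses the user-code class loader (userCodeClassLoader) to load the target class, then calls its constructor that takes a single Environment argument to create the invokable object: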
```java
/**
 * Instantiates the given task invokable class, passing the given environment (and possibly
 * the initial task state) to the task's constructor.
 *
 * <p>The method will first try to instantiate the task via a constructor accepting both
 * the Environment and the TaskStateSnapshot. If no such constructor exists, and there is
 * no initial state, the method will fall back to the stateless convenience constructor that
 * accepts only the Environment.
 *
 * @param classLoader The classloader to load the class through.
 * @param className The name of the class to load.
 * @param environment The task environment.
 *
 * @return The instantiated invokable task object.
 *
 * @throws Throwable Forwards all exceptions that happen during initialization of the task.
 * Also throws an exception if the task class misses the necessary constructor.
 */
private static AbstractInvokable loadAndInstantiateInvokable(
        ClassLoader classLoader,
        String className,
        Environment environment) throws Throwable {

    final Class<? extends AbstractInvokable> invokableClass;
    try {
        // Load the class named className through the given classloader and cast it to AbstractInvokable
        invokableClass = Class.forName(className, true, classLoader)
                .asSubclass(AbstractInvokable.class);
    } catch (Throwable t) {
        throw new Exception("Could not load the task's invokable class.", t);
    }

    Constructor<? extends AbstractInvokable> statelessCtor;

    try {
        // Look up the constructor that takes a single Environment argument
        statelessCtor = invokableClass.getConstructor(Environment.class);
    } catch (NoSuchMethodException ee) {
        throw new FlinkException("Task misses proper constructor", ee);
    }

    // instantiate the class
    try {
        //noinspection ConstantConditions  --> cannot happen
        // Pass in the environment and create the new object
        return statelessCtor.newInstance(environment);
    } catch (InvocationTargetException e) {
        // directly forward exceptions from the eager initialization
        throw e.getTargetException();
    } catch (Exception e) {
        throw new FlinkException("Could not instantiate the task's invokable class.", e);
    }
}
```
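The loading steps above (load the class by name through a given class loader, check its type, look up the constructor that takes a single Environment, and unwrap exceptions thrown by that constructor) can be reproduced with plain Java reflection. The sketch below is self-contained: DemoEnvironment, DemoInvokable, DemoStreamTask and InvokableLoader are hypothetical stand-ins, not Flink classes, and only standard calls such as Class.forName, Class.asSubclass and Constructor.newInstance are used.

```java
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;

// Hypothetical stand-in for Flink's Environment: just carries a task name.
class DemoEnvironment {
    final String taskName;

    DemoEnvironment(String taskName) {
        this.taskName = taskName;
    }
}

// Hypothetical stand-in for AbstractInvokable.
abstract class DemoInvokable {
    protected final DemoEnvironment env;

    protected DemoInvokable(DemoEnvironment env) {
        this.env = env;
    }

    abstract String invoke();
}

// A concrete task; its public single-argument constructor is what the loader looks up.
class DemoStreamTask extends DemoInvokable {
    public DemoStreamTask(DemoEnvironment env) {
        super(env);
    }

    @Override
    String invoke() {
        return "running " + env.taskName;
    }
}

final class InvokableLoader {
    static DemoInvokable load(ClassLoader cl, String className, DemoEnvironment env) throws Exception {
        final Class<? extends DemoInvokable> clazz;
        try {
            // load and initialize the class, then check that it is a DemoInvokable
            clazz = Class.forName(className, true, cl).asSubclass(DemoInvokable.class);
        } catch (ClassNotFoundException | ClassCastException e) {
            throw new Exception("Could not load the task's invokable class.", e);
        }

        final Constructor<? extends DemoInvokable> ctor;
        try {
            // only a public constructor taking exactly one DemoEnvironment is accepted
            ctor = clazz.getConstructor(DemoEnvironment.class);
        } catch (NoSuchMethodException e) {
            throw new Exception("Task misses proper constructor", e);
        }

        try {
            return ctor.newInstance(env);
        } catch (InvocationTargetException e) {
            // forward whatever the constructor itself threw, unwrapped
            Throwable cause = e.getTargetException();
            if (cause instanceof Exception) {
                throw (Exception) cause;
            }
            if (cause instanceof Error) {
                throw (Error) cause;
            }
            throw new Exception(cause);
        }
    }
}
```

Loading a class that is not a DemoInvokable fails at asSubclass, which mirrors how a misconfigured invokable class name surfaces as a "could not load" error.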
StreamTask
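StreamTask is the base class of all streaming tasks. Tasks are deployed and executed by the TaskManager; a Task is a local unit of execution. Each Task contains one or more operators, all of which belong to the same OperatorChain.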
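As a rough illustration of what "several operators in one OperatorChain" means, the sketch below chains a map and a filter inside one task, so the first operator's output is simply a method call into the next one. In Flink, chained operators likewise pass records to each other by direct method calls instead of serializing them and sending them over the network, but the interfaces here (ChainOutput, ChainOperator, MapOperator, FilterOperator, ChainDemo) are hypothetical simplifications, not Flink's API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical downstream handle: the next operator, or the task's network output.
interface ChainOutput<T> {
    void collect(T record);
}

// Hypothetical minimal operator: handles one record and pushes results to `out`.
interface ChainOperator<IN, OUT> {
    void processElement(IN record, ChainOutput<OUT> out);
}

final class MapOperator<IN, OUT> implements ChainOperator<IN, OUT> {
    private final Function<IN, OUT> fn;

    MapOperator(Function<IN, OUT> fn) {
        this.fn = fn;
    }

    @Override
    public void processElement(IN record, ChainOutput<OUT> out) {
        out.collect(fn.apply(record));
    }
}

final class FilterOperator<T> implements ChainOperator<T, T> {
    private final Predicate<T> predicate;

    FilterOperator(Predicate<T> predicate) {
        this.predicate = predicate;
    }

    @Override
    public void processElement(T record, ChainOutput<T> out) {
        if (predicate.test(record)) {
            out.collect(record);
        }
    }
}

final class ChainDemo {
    // Runs map(String::length) -> filter(n > 2) inside one "task".
    static List<Integer> run(List<String> input) {
        List<Integer> sink = new ArrayList<>();
        ChainOperator<String, Integer> head = new MapOperator<>(String::length);
        ChainOperator<Integer, Integer> tail = new FilterOperator<>(n -> n > 2);
        ChainOutput<Integer> toSink = sink::add;
        // the head's output is just a method call into the tail operator
        ChainOutput<Integer> toTail = record -> tail.processElement(record, toSink);
        for (String s : input) {
            head.processElement(s, toTail);
        }
        return sink;
    }
}
```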
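The execution lifecycle of a StreamTask consists of:
- setInitialState: set the initial state of each operator. This corresponds to the initializeState method.
- Calling the invoke method.
The logic inside invoke can be broken down further into:
- Create the task-related configuration and the OperatorChain.
- Run the operators' setup logic.
- Run task-specific initialization.
- Load and initialize the operators' state.
- Call each operator's open method.
- Run the data processing logic inside each operator.
- Close the operators.
- Dispose of the operators.
- Clean up the task.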
The invoke method
This section analyzes invoke, the core execution logic of StreamTask. The method is shown below:
```java
/**
 * Starts the execution.
 *
 * <p>Must be overwritten by the concrete task implementation. This method
 * is called by the task manager when the actual execution of the task
 * starts.
 *
 * <p>All resources should be cleaned up when the method returns. Make sure
 * to guard the code with <code>try-finally</code> blocks where necessary.
 *
 * @throws Exception
 *         Tasks may forward their exceptions for the TaskManager to handle through failure/recovery.
 */
public final void invoke() throws Exception {
    try {
        // Preparation before the task runs
        beforeInvoke();

        // final check to exit early before starting to run
        // If the task has been canceled, exit by throwing an exception
        if (canceled) {
            throw new CancelTaskException();
        }

        // let the task do its work
        // Run the task logic, including the user's code
        runMailboxLoop();

        // if this left the run() method cleanly despite the fact that this was canceled,
        // make sure the "clean shutdown" is not attempted
        // Check again: if the task has been canceled, exit by throwing an exception
        if (canceled) {
            throw new CancelTaskException();
        }

        // Logic to run after the task has finished its work
        afterInvoke();
    }
    catch (Throwable invokeException) {
        failing = !canceled;
        try {
            cleanUpInvoke();
        }
        // TODO: investigate why Throwable instead of Exception is used here.
        catch (Throwable cleanUpException) {
            Throwable throwable = ExceptionUtils.firstOrSuppressed(cleanUpException, invokeException);
            ExceptionUtils.rethrowException(throwable);
        }
        ExceptionUtils.rethrowException(invokeException);
    }
    // Cleanup after invoke
    cleanUpInvoke();
}
```
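The control flow of invoke can be checked with a small self-contained model. In the sketch below, LifecycleTask is a hypothetical class, IllegalStateException stands in for CancelTaskException, and addSuppressed reproduces the effect of ExceptionUtils.firstOrSuppressed(cleanUpException, invokeException): if both the task body and the cleanup fail, the original failure is rethrown with the cleanup failure attached as a suppressed exception. Unlike Flink, it catches Exception rather than Throwable to stay simple.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of StreamTask.invoke(); each phase records itself in `trace`.
abstract class LifecycleTask {
    final List<String> trace = new ArrayList<>();
    volatile boolean canceled;

    protected void beforeInvoke() throws Exception { trace.add("beforeInvoke"); }

    protected abstract void runMailboxLoop() throws Exception;

    protected void afterInvoke() throws Exception { trace.add("afterInvoke"); }

    protected void cleanUpInvoke() throws Exception { trace.add("cleanUpInvoke"); }

    public final void invoke() throws Exception {
        try {
            beforeInvoke();
            if (canceled) {
                // stands in for CancelTaskException
                throw new IllegalStateException("task canceled");
            }
            runMailboxLoop();
            if (canceled) {
                throw new IllegalStateException("task canceled");
            }
            afterInvoke();
        } catch (Exception invokeException) {
            try {
                cleanUpInvoke();
            } catch (Exception cleanUpException) {
                // same outcome as ExceptionUtils.firstOrSuppressed(cleanUpException, invokeException):
                // the original failure wins, the cleanup failure is attached as suppressed
                invokeException.addSuppressed(cleanUpException);
            }
            throw invokeException;
        }
        cleanUpInvoke();
    }
}
```

On both the success path and the failure path, cleanUpInvoke runs exactly once.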
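The beforeInvoke method
beforeInvoke mainly initializes the task: it creates the OperatorChain, restores the recovered channel state, and prepares reading data from upstream and writing output downstream. In detail: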
```java
protected void beforeInvoke() throws Exception {
    disposedOperators = false;
    LOG.debug("Initializing {}.", getName());

    // Create the OperatorChain.
    // Operator chaining is an optimization applied when the JobGraph is generated:
    // multiple StreamNodes (data transformations) that satisfy the chaining conditions are merged
    // into one chain, and they are scheduled to run in the same StreamTask.
    operatorChain = new OperatorChain<>(this, recordWriter);
    // Get the first operator in the OperatorChain
    mainOperator = operatorChain.getMainOperator();

    // task specific initialization
    // Task-specific initialization work.
    // This is an abstract method; subclasses provide the concrete logic.
    init();

    // save the work of reloading state, etc, if the task is already canceled
    if (canceled) {
        throw new CancelTaskException();
    }

    // -------- Invoke --------
    LOG.debug("Invoking {}", getName());

    // we need to make sure that any triggers scheduled in open() cannot be
    // executed before all operators are opened
    // Task actions must go through the StreamTaskActionExecutor to avoid concurrent execution
    // that would interfere with checkpoints.
    // By default this executor is StreamTaskActionExecutor.IMMEDIATE, i.e. it runs actions directly in the current thread.
    actionExecutor.runThrowing(() -> {
        // Create a SequentialChannelStateReader to read the channel state saved by a checkpoint
        SequentialChannelStateReader reader =
                getEnvironment().getTaskStateManager().getSequentialChannelStateReader();
        // TODO: for UC rescaling, reenable notifyAndBlockOnCompletion for non-iterative jobs
        // Read the recovered output channel state into the ResultPartitionWriters
        reader.readOutputData(getEnvironment().getAllWriters(), false);

        // Initialize all operators in the OperatorChain:
        // call their initializeState (state initialization) and open (setup work) methods
        operatorChain.initializeStateAndOpenOperators(createStreamTaskStateInitializer());

        channelIOExecutor.execute(() -> {
            try {
                // Read the recovered InputGate channel state
                reader.readInputData(getEnvironment().getAllInputGates());
            } catch (Exception e) {
                asyncExceptionHandler.handleAsyncException("Unable to read channel state", e);
            }
        });

        for (InputGate inputGate : getEnvironment().getAllInputGates()) {
            // Runs after the InputGate's state has been consumed
            inputGate.getStateConsumedFuture().thenRun(() ->
                    // Executed in the task thread
                    mainMailboxExecutor.execute(
                            // Request the input partitions
                            inputGate::requestPartitions, "Input gate request partitions"));
        }
    });

    // Mark the task as running
    isRunning = true;
}
```
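The last part of beforeInvoke defers requestPartitions: it is neither called directly nor run on the channel I/O thread, but enqueued as a mail once the input gate's state has been consumed, so it eventually runs on the task thread. The sketch below models that hand-off with CompletableFuture.thenRun and a concurrent queue standing in for the mailbox. DeferredRequestDemo and the log strings are hypothetical, and the demo waits on the future returned by thenRun only to keep its output deterministic.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class DeferredRequestDemo {
    static List<String> run() throws Exception {
        final Thread taskThread = Thread.currentThread();
        final List<String> log = Collections.synchronizedList(new ArrayList<>());
        // mails may be enqueued from any thread but are executed only by the task thread
        final Queue<Runnable> mailbox = new ConcurrentLinkedQueue<>();

        // completed once the (simulated) recovered input channel state has been read
        final CompletableFuture<Void> stateConsumed = new CompletableFuture<>();
        // like getStateConsumedFuture().thenRun(...): do not request partitions here,
        // only enqueue a mail that will do it on the task thread
        CompletableFuture<Void> mailEnqueued = stateConsumed.thenRun(() -> mailbox.add(() ->
                log.add(Thread.currentThread() == taskThread
                        ? "requestPartitions@task"
                        : "requestPartitions@other")));

        ExecutorService channelIO = Executors.newSingleThreadExecutor();
        channelIO.execute(() -> {
            log.add("readInputData");
            stateConsumed.complete(null);
        });

        mailEnqueued.get(); // wait until the mail is in the mailbox
        channelIO.shutdown();
        channelIO.awaitTermination(5, TimeUnit.SECONDS);

        // the task thread drains its mailbox
        Runnable mail;
        while ((mail = mailbox.poll()) != null) {
            mail.run();
        }
        return log;
    }
}
```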
The runMailboxLoop method
runMailboxLoop starts the task's data input and processing logic:
```java
public void runMailboxLoop() throws Exception {
    mailboxProcessor.runMailboxLoop();
}
```
The MailboxProcessor is created in the StreamTask constructor:
```java
this.mailboxProcessor = new MailboxProcessor(this::processInput, mailbox, actionExecutor);
```
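mailboxProcessor.runMailboxLoop() can be understood as a loop running in the task's own thread: it processes pending mails from the mailbox (such as checkpoint triggers and timer callbacks), then calls processInput as the default action, and repeats. actionExecutor is not a thread pool; by default it runs actions directly in the current thread.
processInput reads data from upstream (through StreamTaskNetworkInput and the InputGate). This logic is covered in the article "Flink Source Code: Communication Between Nodes".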
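The mailbox model described above can be sketched as a single-threaded loop: drain pending mails, run the default action once, and repeat until the input is exhausted. MiniMailboxProcessor below is a hypothetical simplification; Flink's MailboxProcessor signals the end of input differently and has many more features.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;

final class MiniMailboxProcessor {
    private final Queue<Runnable> mailbox = new ArrayDeque<>();
    private final Iterator<String> input; // stands in for the network input
    final List<String> trace = new ArrayList<>();

    MiniMailboxProcessor(Iterator<String> input) {
        this.input = input;
    }

    // Other components (checkpointing, timers, ...) would enqueue work here.
    void submit(Runnable mail) {
        mailbox.add(mail);
    }

    // The default action, playing the role of StreamTask.processInput:
    // handle one record and report whether more input may follow.
    private boolean processInput() {
        if (!input.hasNext()) {
            return false;
        }
        trace.add("record:" + input.next());
        return true;
    }

    // Everything runs on the calling thread, so mails and records never run concurrently.
    void runMailboxLoop() {
        boolean moreInput = true;
        while (moreInput) {
            Runnable mail;
            while ((mail = mailbox.poll()) != null) {
                mail.run();
            }
            moreInput = processInput();
        }
    }
}
```

Because mails and the default action share one thread, operator state touched by a checkpoint mail never races with record processing, which is the main reason for the mailbox design.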
afterInvoke
afterInvoke is shown below. In short, it performs the cleanup after the task has finished its work, such as closing the operators.
```java
protected void afterInvoke() throws Exception {
    LOG.debug("Finished task {}", getName());
    getCompletionFuture().exceptionally(unused -> null).join();

    final CompletableFuture<Void> timersFinishedFuture = new CompletableFuture<>();

    // close all operators in a chain effect way
    // Close all operators in the OperatorChain,
    // calling each operator's close method from head to tail
    operatorChain.closeOperators(actionExecutor);

    // make sure no further checkpoint and notification actions happen.
    // at the same time, this makes sure that during any "regular" exit where still
    actionExecutor.runThrowing(() -> {

        // make sure no new timers can come
        // Stop the timer service
        FutureUtils.forward(timerService.quiesce(), timersFinishedFuture);

        // let mailbox execution reject all new letters from this point
        // Prepare to close the mailboxProcessor: it no longer accepts new mails
        mailboxProcessor.prepareClose();

        // only set the StreamTask to not running after all operators have been closed!
        // See FLINK-7430
        // Mark the task as no longer running
        isRunning = false;
    });
    // processes the remaining mails; no new mails can be enqueued
    // Process the mails that are still queued
    mailboxProcessor.drain();

    // make sure all timers finish
    // Wait until all timers have finished
    timersFinishedFuture.get();

    LOG.debug("Closed operators for task {}", getName());

    // make sure all buffered data is flushed
    // Flush all data still held in buffers
    operatorChain.flushOutputs();

    // make an attempt to dispose the operators such that failures in the dispose call
    // still let the computation fail
    // Dispose of all operators in the OperatorChain one by one (from head to tail)
    disposeAllOperators();
}
```
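The prepareClose/drain pair in afterInvoke follows a common shutdown pattern: first stop accepting new mails, then run the ones already queued so that no pending work (such as a timer callback) is lost. The sketch below illustrates it with a hypothetical ClosableMailbox whose OPEN/QUIESCED states are loosely modelled on Flink's mailbox; it is single-threaded and is not Flink's actual mailbox implementation.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;

final class ClosableMailbox {
    enum State { OPEN, QUIESCED }

    private final ArrayDeque<Runnable> queue = new ArrayDeque<>();
    private State state = State.OPEN;
    final List<String> trace = new ArrayList<>();

    void put(Runnable mail) {
        if (state != State.OPEN) {
            throw new IllegalStateException("Mailbox is quiesced, new mails are rejected");
        }
        queue.add(mail);
    }

    // Analogous to prepareClose(): stop accepting new mails, keep the queued ones.
    void prepareClose() {
        state = State.QUIESCED;
    }

    // Analogous to drain(): run whatever was enqueued before prepareClose(); returns the count.
    int drain() {
        int processed = 0;
        Runnable mail;
        while ((mail = queue.poll()) != null) {
            mail.run();
            processed++;
        }
        return processed;
    }
}
```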
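Subclasses of StreamTask
StreamTask is the parent class of all streaming tasks and is itself abstract. To handle different kinds of StreamOperator, StreamTask has several implementations. Some typical ones are:
- OneInputStreamTask: handles OneInputStreamOperator, i.e. a StreamOperator with a single input stream.
- TwoInputStreamTask: handles TwoInputStreamOperator, which has two input streams.
- MultipleInputStreamTask: handles MultipleInputStreamOperator, which has multiple input streams.
- SourceStreamTask: handles StreamSource, i.e. a data source.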
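The init method of OneInputStreamTask
Its init method creates the network input and the network output, then creates an inputProcessor that reads data from the network input, deserializes it, and passes it to the network output. Finally it registers the watermark metrics. The annotated code: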
```java
public void init() throws Exception {
    // Get the stream config of this task
    StreamConfig configuration = getConfiguration();
    // Get the number of network inputs
    int numberOfInputs = configuration.getNumberOfNetworkInputs();

    if (numberOfInputs > 0) {
        // Create a CheckpointedInputGate.
        // This kind of InputGate owns a CheckpointBarrierHandler that handles incoming CheckpointBarriers.
        CheckpointedInputGate inputGate = createCheckpointedInputGate();
        // Metrics: set up the counter of incoming records
        Counter numRecordsIn = setupNumRecordsInCounter(mainOperator);
        // Create the StreamTaskNetworkOutput,
        // which hands deserialized data to the task's processing logic
        DataOutput<IN> output = createDataOutput(numRecordsIn);
        // Create the StreamTaskNetworkInput, which wraps the CheckpointedInputGate,
        // reads the raw data received from the network and passes it to the deserializer
        StreamTaskInput<IN> input = createTaskInput(inputGate);

        // Read the input configuration.
        // Sorted inputs mean the data is grouped by key:
        // the task is fed the records of one key group at a time,
        // so records of different keys do not keep alternating.
        if (configuration.shouldSortInputs()) {
            checkState(
                    !configuration.isCheckpointingEnabled(),
                    "Checkpointing is not allowed with sorted inputs.");
            input = wrapWithSorted(input);
        }

        // Register the incoming-record counter metric
        getEnvironment()
                .getMetricGroup()
                .getIOMetricGroup()
                .reuseRecordsInputCounter(numRecordsIn);

        // Create the inputProcessor: it reads data from the network, deserializes it,
        // and passes it to output, which forwards it to the OperatorChain
        inputProcessor = new StreamOneInputProcessor<>(input, output, operatorChain);
    }
    // Register the input watermark metric
    mainOperator
            .getMetricGroup()
            .gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK, this.inputWatermarkGauge);
    // wrap watermark gauge since registered metrics must be unique
    getEnvironment()
            .getMetricGroup()
            .gauge(MetricNames.IO_CURRENT_INPUT_WATERMARK, this.inputWatermarkGauge::getValue);
}
```
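What sorted inputs give the task can be seen with a toy example: after sorting by key, records of the same key arrive consecutively, so the number of key switches drops. The sketch sorts a Java list in memory with a stable sort; SortedInputDemo and KeyedRecord are hypothetical, and how wrapWithSorted actually sorts the input is not shown here.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

final class SortedInputDemo {
    // Hypothetical keyed record, standing in for a StreamRecord carrying a key and a value.
    static final class KeyedRecord {
        final String key;
        final int value;

        KeyedRecord(String key, int value) {
            this.key = key;
            this.value = value;
        }
    }

    // Counts how often the key changes between consecutive records.
    static int keySwitches(List<KeyedRecord> records) {
        int switches = 0;
        for (int i = 1; i < records.size(); i++) {
            if (!records.get(i).key.equals(records.get(i - 1).key)) {
                switches++;
            }
        }
        return switches;
    }

    // In-memory stand-in for sorted inputs: order by key so each key's records are contiguous.
    // List.sort is stable, so records of the same key keep their arrival order.
    static List<KeyedRecord> sortByKey(List<KeyedRecord> input) {
        List<KeyedRecord> sorted = new ArrayList<>(input);
        sorted.sort(Comparator.comparing((KeyedRecord r) -> r.key));
        return sorted;
    }
}
```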
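The creation of the CheckpointedInputGate is covered in the article "Flink Source Code: Distributed Snapshots".
The init method of TwoInputStreamTask
Its initialization is similar to that of OneInputStreamTask, except that it needs to create two InputGates. TwoInputStreamTask corresponds to co-operators, i.e. operators with two input streams (such as CoFlatMap).
The init method of SourceStreamTask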
```java
protected void init() {
    // we check if the source is actually inducing the checkpoints, rather
    // than the trigger
    // Get the SourceFunction that produces the source data
    SourceFunction<?> source = mainOperator.getUserFunction();
    // If the source implements this interface, it does not trigger a checkpoint when the
    // CheckpointCoordinator's trigger message arrives; checkpoints are triggered by the input data instead
    if (source instanceof ExternallyInducedSource) {
        externallyInducedCheckpoints = true;

        // Create the checkpoint trigger hook
        ExternallyInducedSource.CheckpointTrigger triggerHook =
                new ExternallyInducedSource.CheckpointTrigger() {

            @Override
            public void triggerCheckpoint(long checkpointId) throws FlinkException {
                // TODO - we need to see how to derive those. We should probably not encode this in the
                // TODO - source's trigger message, but do a handshake in this task between the trigger
                // TODO - message from the master, and the source's trigger notification
                final CheckpointOptions checkpointOptions = CheckpointOptions.forCheckpointWithDefaultLocation(
                        configuration.isExactlyOnceCheckpointMode(),
                        configuration.isUnalignedCheckpointsEnabled(),
                        configuration.getAlignmentTimeout());
                final long timestamp = System.currentTimeMillis();

                final CheckpointMetaData checkpointMetaData = new CheckpointMetaData(checkpointId, timestamp);

                try {
                    SourceStreamTask.super.triggerCheckpointAsync(checkpointMetaData, checkpointOptions, false)
                            .get();
                }
                catch (RuntimeException e) {
                    throw e;
                }
                catch (Exception e) {
                    throw new FlinkException(e.getMessage(), e);
                }
            }
        };

        ((ExternallyInducedSource<?, ?>) source).setCheckpointTrigger(triggerHook);
    }
    // Register the checkpoint start delay metric
    getEnvironment()
            .getMetricGroup()
            .getIOMetricGroup()
            .gauge(MetricNames.CHECKPOINT_START_DELAY_TIME, this::getAsyncCheckpointStartDelayNanos);
}
```
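The essence of ExternallyInducedSource is an inversion of control: the task hands the source a trigger callback, and the source decides when to call it (for example, when it reads a marker from the external system). The sketch below mimics that with a hypothetical DemoCheckpointTrigger interface and a MarkerDrivenSource that treats items of the form barrier:&lt;id&gt; as checkpoint markers; the marker format is invented for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical counterpart of ExternallyInducedSource.CheckpointTrigger.
interface DemoCheckpointTrigger {
    void triggerCheckpoint(long checkpointId) throws Exception;
}

// Hypothetical source that decides by itself when a checkpoint happens:
// an item "barrier:<id>" read from the external system triggers checkpoint <id>.
final class MarkerDrivenSource {
    private static final String MARKER = "barrier:";
    private DemoCheckpointTrigger trigger;
    final List<String> emitted = new ArrayList<>();

    // Called once by the task during init, like setCheckpointTrigger(triggerHook).
    void setCheckpointTrigger(DemoCheckpointTrigger trigger) {
        this.trigger = trigger;
    }

    void consume(List<String> externalData) throws Exception {
        for (String item : externalData) {
            if (item.startsWith(MARKER)) {
                trigger.triggerCheckpoint(Long.parseLong(item.substring(MARKER.length())));
            } else {
                emitted.add(item);
            }
        }
    }
}
```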
How StreamTask gets data from upstream
The call chain through which StreamTask obtains data from upstream is:
- StreamTask.processInput
- inputProcessor.processInput
- StreamTaskNetworkInput.emitNext
- inputGate.pollNext
- inputChannel.getNextBuffer
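StreamTask obtains data from other upstream Tasks through InputGates. Each InputGate contains one or more InputChannels, which come in two kinds depending on whether the data travels over the network: RemoteInputChannel and LocalInputChannel. A RemoteInputChannel uses Netty to fetch data over the network from the upstream task's ResultSubPartition; it is used when this task and the upstream task run in different TaskManagers. A LocalInputChannel, by contrast, is used when this task and the upstream task run in the same TaskManager, so fetching data from the upstream task requires no network communication.
For a detailed analysis of this logic, see the article "Flink Source Code: Communication Between Nodes".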
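The relationship between an InputGate and its channels can be sketched as follows: the channel type is picked by comparing the producer's and the consumer's TaskManager, and the gate returns the next available buffer from any of its channels. GateChannel, LocalGateChannel, RemoteGateChannel and DemoInputGate are hypothetical; a real remote channel receives buffers through Netty, and Flink's InputGate tracks which channels have data available instead of scanning all of them.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Hypothetical channel: a queue of buffers (strings here) filled by the producer side.
abstract class GateChannel {
    final ArrayDeque<String> buffers = new ArrayDeque<>();

    Optional<String> getNextBuffer() {
        return Optional.ofNullable(buffers.poll());
    }

    abstract String kind();
}

// Would read directly from the upstream subpartition in the same process.
final class LocalGateChannel extends GateChannel {
    @Override
    String kind() { return "local"; }
}

// Would receive buffers over the network.
final class RemoteGateChannel extends GateChannel {
    @Override
    String kind() { return "remote"; }
}

final class DemoInputGate {
    private final List<GateChannel> channels = new ArrayList<>();

    // Local if producer and consumer run in the same TaskManager, remote otherwise.
    GateChannel addChannel(String producerTaskManager, String consumerTaskManager) {
        GateChannel channel = producerTaskManager.equals(consumerTaskManager)
                ? new LocalGateChannel()
                : new RemoteGateChannel();
        channels.add(channel);
        return channel;
    }

    // Returns the next available buffer from any channel, or empty if none has data.
    Optional<String> pollNext() {
        for (GateChannel channel : channels) {
            Optional<String> buffer = channel.getNextBuffer();
            if (buffer.isPresent()) {
                return buffer;
            }
        }
        return Optional.empty();
    }
}
```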
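Passing data to the OperatorChain
We start the analysis of this part from the processElement method of StreamTaskNetworkInput.
StreamTask's processInput method is the entry point of data processing. It calls the method of the same name on StreamOneInputProcessor, which asks StreamTaskNetworkInput to fetch data from the InputGate; because the mailbox loop calls processInput over and over, data is fetched continuously. The fetched binary data is first handed to the deserializer, which turns it into StreamElement objects (records, watermarks, and so on), and each element is then passed to processElement.
For an analysis of that logic, see the "Reading Data" section of the article "Flink Source Code: Communication Between Nodes".
Below is the processElement method, which lives in AbstractStreamTaskNetworkInput. The output parameter is actually a StreamTaskNetworkOutput object.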
```java
private void processElement(StreamElement recordOrMark, DataOutput<T> output) throws Exception {
    // First check the element type: a record, a watermark, a latency marker, or a stream status
    if (recordOrMark.isRecord()) {
        output.emitRecord(recordOrMark.asRecord());
    } else if (recordOrMark.isWatermark()) {
        statusWatermarkValve.inputWatermark(recordOrMark.asWatermark(), lastChannel, output);
    } else if (recordOrMark.isLatencyMarker()) {
        output.emitLatencyMarker(recordOrMark.asLatencyMarker());
    } else if (recordOrMark.isStreamStatus()) {
        statusWatermarkValve.inputStreamStatus(recordOrMark.asStreamStatus(), lastChannel, output);
    } else {
        throw new UnsupportedOperationException("Unknown type of StreamElement");
    }
}
```
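The dispatch in processElement, together with what the watermark valve does for watermarks, can be sketched as below. A valve of this kind keeps the latest watermark of each input channel and only forwards a watermark when the minimum across channels advances; that is why a watermark arriving on one channel alone does not move the task's event time. ElementDispatcher and MiniWatermarkValve are hypothetical simplifications: the real StatusWatermarkValve also excludes idle channels from the minimum, which is omitted here.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical element hierarchy standing in for StreamElement.
abstract class StreamElem {}

final class RecordElem extends StreamElem {
    final String value;
    RecordElem(String value) { this.value = value; }
}

final class WatermarkElem extends StreamElem {
    final long timestamp;
    WatermarkElem(long timestamp) { this.timestamp = timestamp; }
}

// Keeps one watermark per input channel and emits the minimum when it advances.
final class MiniWatermarkValve {
    private final long[] channelWatermarks;
    private long lastEmitted = Long.MIN_VALUE;

    MiniWatermarkValve(int numChannels) {
        channelWatermarks = new long[numChannels];
        Arrays.fill(channelWatermarks, Long.MIN_VALUE);
    }

    // Returns the new combined watermark, or null if it did not advance.
    Long inputWatermark(long timestamp, int channel) {
        if (timestamp > channelWatermarks[channel]) {
            channelWatermarks[channel] = timestamp;
        }
        long min = Long.MAX_VALUE;
        for (long w : channelWatermarks) {
            min = Math.min(min, w);
        }
        if (min > lastEmitted) {
            lastEmitted = min;
            return min;
        }
        return null;
    }
}

final class ElementDispatcher {
    final List<String> out = new ArrayList<>();
    private final MiniWatermarkValve valve;

    ElementDispatcher(int numChannels) {
        valve = new MiniWatermarkValve(numChannels);
    }

    // Same shape as processElement: branch on the element type.
    void processElement(StreamElem element, int channel) {
        if (element instanceof RecordElem) {
            out.add("record:" + ((RecordElem) element).value);
        } else if (element instanceof WatermarkElem) {
            Long combined = valve.inputWatermark(((WatermarkElem) element).timestamp, channel);
            if (combined != null) {
                out.add("watermark:" + combined);
            }
        } else {
            throw new UnsupportedOperationException("Unknown type of StreamElement");
        }
    }
}
```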
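StreamTaskNetworkOutput receives the deserialized data and sends it to the first operator of the OperatorChain. The version shown below is the one used by the two-input processor (note the TwoInputStreamOperator field); the one-input variant is analogous.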
```java
/**
 * The network data output implementation used for processing stream elements
 * from {@link StreamTaskNetworkInput} in two input selective processor.
 */
private static class StreamTaskNetworkOutput<T> extends AbstractDataOutput<T> {

    // The operator passed in at construction time is the OperatorChain's mainOperator, i.e. the first operator
    private final TwoInputStreamOperator<?, ?, ?> operator;

    /** The function way is only used for frequent record processing as for JIT optimization. */
    private final ThrowingConsumer<StreamRecord<T>, Exception> recordConsumer;

    private final WatermarkGauge inputWatermarkGauge;

    /** The input index to indicate how to process elements by two input operator. */
    private final int inputIndex;

    private final Counter numRecordsIn;

    private final StreamStatusTracker statusTracker;

    private StreamTaskNetworkOutput(
            TwoInputStreamOperator<?, ?, ?> operator,
            ThrowingConsumer<StreamRecord<T>, Exception> recordConsumer,
            StreamStatusMaintainer streamStatusMaintainer,
            WatermarkGauge inputWatermarkGauge,
            StreamStatusTracker statusTracker,
            int inputIndex,
            Counter numRecordsIn) {
        super(streamStatusMaintainer);

        this.operator = checkNotNull(operator);
        this.recordConsumer = checkNotNull(recordConsumer);
        this.inputWatermarkGauge = checkNotNull(inputWatermarkGauge);
        this.statusTracker = statusTracker;
        this.inputIndex = inputIndex;
        this.numRecordsIn = numRecordsIn;
    }

    // Emit a record
    @Override
    public void emitRecord(StreamRecord<T> record) throws Exception {
        numRecordsIn.inc();
        recordConsumer.accept(record);
    }

    // Emit a watermark
    @Override
    public void emitWatermark(Watermark watermark) throws Exception {
        inputWatermarkGauge.setCurrentWatermark(watermark.getTimestamp());
        if (inputIndex == 0) {
            operator.processWatermark1(watermark);
        } else {
            operator.processWatermark2(watermark);
        }
    }

    @Override
    public void emitStreamStatus(StreamStatus streamStatus) {
        final StreamStatus anotherStreamStatus;
        if (inputIndex == 0) {
            statusTracker.setFirstStatus(streamStatus);
            anotherStreamStatus = statusTracker.getSecondStatus();
        } else {
            statusTracker.setSecondStatus(streamStatus);
            anotherStreamStatus = statusTracker.getFirstStatus();
        }

        // check if we need to toggle the task's stream status
        if (!streamStatus.equals(streamStatusMaintainer.getStreamStatus())) {
            if (streamStatus.isActive()) {
                // we're no longer idle if at least one input has become active
                streamStatusMaintainer.toggleStreamStatus(StreamStatus.ACTIVE);
            } else if (anotherStreamStatus.isIdle()) {
                // we're idle once both inputs are idle
                streamStatusMaintainer.toggleStreamStatus(StreamStatus.IDLE);
            }
        }
    }

    // Emit a latency marker, used to measure how long data takes to pass through the Flink pipeline
    @Override
    public void emitLatencyMarker(LatencyMarker latencyMarker) throws Exception {
        if (inputIndex == 0) {
            operator.processLatencyMarker1(latencyMarker);
        } else {
            operator.processLatencyMarker2(latencyMarker);
        }
    }
}
```
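The idle/active rule in emitStreamStatus reduces to two cases: the task becomes active as soon as either input is active, and becomes idle only when both inputs are idle. The sketch below isolates that rule in a hypothetical TwoInputStatus class; it assumes both inputs and the task start out ACTIVE.

```java
final class TwoInputStatus {
    enum Status { ACTIVE, IDLE }

    // Assumption: both inputs and the task start out ACTIVE.
    private Status first = Status.ACTIVE;
    private Status second = Status.ACTIVE;
    private Status task = Status.ACTIVE;

    // Record the new status of one input and return the resulting task-level status.
    Status onStatus(int inputIndex, Status status) {
        final Status other;
        if (inputIndex == 0) {
            first = status;
            other = second;
        } else {
            second = status;
            other = first;
        }
        if (status != task) {
            if (status == Status.ACTIVE) {
                task = Status.ACTIVE; // one active input is enough
            } else if (other == Status.IDLE) {
                task = Status.IDLE;   // idle only once both inputs are idle
            }
        }
        return task;
    }
}
```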
The logic of OperatorChain will be analyzed in a separate post.
