Preface
In this article we analyze the execution flow of an entire Flink job, starting from the entry point of typical Flink user code.
We use WordCount as the example:
public static void main(String[] args) throws Exception {
    // Checking input parameters
    final MultipleParameterTool params = MultipleParameterTool.fromArgs(args);

    // set up the execution environment
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // make parameters available in the web interface
    env.getConfig().setGlobalJobParameters(params);

    // get input data
    DataStream<String> text = null;
    if (params.has("input")) {
        // union all the inputs from text files
        for (String input : params.getMultiParameterRequired("input")) {
            if (text == null) {
                text = env.readTextFile(input);
            } else {
                text = text.union(env.readTextFile(input));
            }
        }
        Preconditions.checkNotNull(text, "Input DataStream should not be null.");
    } else {
        System.out.println("Executing WordCount example with default input data set.");
        System.out.println("Use --input to specify file input.");
        // get default test text data
        text = env.fromElements(WordCountData.WORDS);
    }

    DataStream<Tuple2<String, Integer>> counts =
            // split up the lines in pairs (2-tuples) containing: (word,1)
            text.flatMap(new Tokenizer())
                    // group by the tuple field "0" and sum up tuple field "1"
                    .keyBy(value -> value.f0)
                    .sum(1);

    // emit result
    if (params.has("output")) {
        counts.writeAsText(params.get("output"));
    } else {
        System.out.println("Printing result to stdout. Use --output to specify output path.");
        counts.print();
    }

    // execute program
    env.execute("Streaming WordCount");
}
Creating the ExecutionEnvironment
The first step in Flink user code is usually to obtain an execution environment:
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Let's begin the analysis with the getExecutionEnvironment method:
public static StreamExecutionEnvironment getExecutionEnvironment() {
    return getExecutionEnvironment(new Configuration());
}

// delegates to getExecutionEnvironment(Configuration configuration)
public static StreamExecutionEnvironment getExecutionEnvironment(Configuration configuration) {
    return Utils.resolveFactory(threadLocalContextEnvironmentFactory, contextEnvironmentFactory)
            .map(factory -> factory.createExecutionEnvironment(configuration)) // create the context execution environment
            .orElseGet(() -> StreamExecutionEnvironment.createLocalEnvironment(configuration)); // create a local execution environment
}
The Utils.resolveFactory method takes two arguments: a factory held in a ThreadLocal and a static factory. If the ThreadLocal contains a factory, that factory is returned; otherwise the static factory is returned.
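Its implementation is essentially the following (a minimal sketch reconstructed from the behavior just described; the exact class location and annotations in the Flink source may differ):

import java.util.Optional;

public static <T> Optional<T> resolveFactory(ThreadLocal<T> threadLocalFactory, T staticFactory) {
    final T localFactory = threadLocalFactory.get();
    // prefer the thread-local factory, fall back to the static one
    final T factory = localFactory == null ? staticFactory : localFactory;
    return Optional.ofNullable(factory);
}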
If neither factory yields an ExecutionEnvironment, StreamExecutionEnvironment.createLocalEnvironment(configuration) is called instead, which creates a LocalStreamEnvironment. This means local execution mode: all tasks run in the same JVM.
If the user submits the Flink job to a cluster from the command line (see 03-flink源码任务提交流程分析), the program runs through the following call chain:
- CliFrontend.executeProgram
- ClientUtils.executeProgram
- StreamContextEnvironment.setAsContext
- StreamExecutionEnvironment.initializeContextEnvironment
Through these calls, a StreamExecutionEnvironmentFactory is created and installed into StreamExecutionEnvironment's threadLocalContextEnvironmentFactory and contextEnvironmentFactory fields. Calling this factory's createExecutionEnvironment method then produces a StreamContextEnvironment, the execution environment for submitting jobs to a remote Flink cluster.
Running the job with the execute method
With the execution environment in place, we now analyze the execute method, which launches the Flink job:
// no-arg variant
public JobExecutionResult execute() throws Exception {
    // getJobName reads the job name from the pipeline.name configuration option;
    // if it is not configured, the default name "Flink Streaming Job" is used
    return execute(getJobName());
}

// variant taking a job name
public JobExecutionResult execute(String jobName) throws Exception {
    Preconditions.checkNotNull(jobName, "Streaming Job name should not be null.");
    return execute(getStreamGraph(jobName));
}
Before the job is executed, the getStreamGraph method converts the user program into a StreamGraph. The StreamGraph generation process will be covered in a later article; here we continue with the execute method.
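As an aside, the StreamGraph can be inspected without running the job: StreamExecutionEnvironment exposes the plan as JSON through getExecutionPlan(), which builds the StreamGraph internally. A small usage sketch, assuming env is the WordCount environment from above:

// build the pipeline first (source, flatMap, keyBy, sum, sink), then:
// the returned JSON can be rendered with the Flink plan visualizer
System.out.println(env.getExecutionPlan());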
From here on, the behavior of execute depends on which ExecutionEnvironment is in use.
StreamContextEnvironment's execute method:
@Override
public JobExecutionResult execute(StreamGraph streamGraph) throws Exception {
    // execute the job asynchronously
    final JobClient jobClient = executeAsync(streamGraph);
    // get the configured job listeners
    final List<JobListener> jobListeners = getJobListeners();

    try {
        // fetch the job execution result and notify each job listener
        final JobExecutionResult jobExecutionResult = getJobExecutionResult(jobClient);
        jobListeners.forEach(jobListener -> jobListener.onJobExecuted(jobExecutionResult, null));
        return jobExecutionResult;
    } catch (Throwable t) {
        jobListeners.forEach(jobListener ->
                jobListener.onJobExecuted(null, ExceptionUtils.stripExecutionException(t)));
        ExceptionUtils.rethrowException(t);
        // never reached, only make javac happy
        return null;
    }
}
LocalStreamEnvironment's execute method:
@Override
public JobExecutionResult execute(StreamGraph streamGraph) throws Exception {
    return super.execute(streamGraph);
}
It simply delegates to the execute method of its parent class, which is StreamExecutionEnvironment:
/**
 * Triggers the program execution. The environment will execute all parts of
 * the program that have resulted in a "sink" operation. Sink operations are
 * for example printing results or forwarding them to a message queue.
 *
 * @param streamGraph the stream graph representing the transformations
 * @return The result of the job execution, containing elapsed time and accumulators.
 * @throws Exception which occurs during job execution.
 */
@Internal
public JobExecutionResult execute(StreamGraph streamGraph) throws Exception {
    // execute the job asynchronously
    final JobClient jobClient = executeAsync(streamGraph);

    try {
        final JobExecutionResult jobExecutionResult;

        // in attached mode the client has to stay alive,
        // so we wait synchronously for the job execution result
        if (configuration.getBoolean(DeploymentOptions.ATTACHED)) {
            jobExecutionResult = jobClient.getJobExecutionResult().get();
        } else {
            // detached mode does not wait for the result
            jobExecutionResult = new DetachedJobExecutionResult(jobClient.getJobID());
        }

        // notify each JobListener
        jobListeners.forEach(jobListener -> jobListener.onJobExecuted(jobExecutionResult, null));

        return jobExecutionResult;
    } catch (Throwable t) {
        // get() on the JobExecutionResult Future will throw an ExecutionException. This
        // behaviour was largely not there in Flink versions before the PipelineExecutor
        // refactoring so we should strip that exception.
        Throwable strippedException = ExceptionUtils.stripExecutionException(t);

        jobListeners.forEach(jobListener -> {
            jobListener.onJobExecuted(null, strippedException);
        });
        ExceptionUtils.rethrowException(strippedException);

        // never reached, only make javac happy
        return null;
    }
}
Next we analyze StreamContextEnvironment's executeAsync method:
@Override
public JobClient executeAsync(StreamGraph streamGraph) throws Exception {
    // ensure execute/executeAsync is not called more than once on the same environment
    validateAllowedExecution();
    // call the parent class's executeAsync method,
    // the same one LocalStreamEnvironment ends up calling
    final JobClient jobClient = super.executeAsync(streamGraph);

    if (!suppressSysout) {
        System.out.println("Job has been submitted with JobID " + jobClient.getJobID());
    }

    return jobClient;
}
At this point the executeAsync paths of the two environments converge.
Below is the executeAsync method of the StreamExecutionEnvironment class:
/**
 * Triggers the program execution asynchronously. The environment will execute all parts of
 * the program that have resulted in a "sink" operation. Sink operations are
 * for example printing results or forwarding them to a message queue.
 *
 * @param streamGraph the stream graph representing the transformations
 * @return A {@link JobClient} that can be used to communicate with the submitted job, completed on submission succeeded.
 * @throws Exception which occurs during job execution.
 */
@Internal
public JobClient executeAsync(StreamGraph streamGraph) throws Exception {
    // the StreamGraph must not be null
    checkNotNull(streamGraph, "StreamGraph cannot be null.");
    // the deployment target must not be null either;
    // the target is the mode the job runs in, e.g. local, remote, YARN or Kubernetes
    checkNotNull(configuration.get(DeploymentOptions.TARGET), "No execution.target specified in your configuration file.");

    // pick a suitable executor factory based on the configuration
    final PipelineExecutorFactory executorFactory =
            executorServiceLoader.getExecutorFactory(configuration);

    checkNotNull(
            executorFactory,
            "Cannot find compatible factory for specified execution.target (=%s)",
            configuration.get(DeploymentOptions.TARGET));

    // obtain the executor from the factory and run the StreamGraph containing the user job
    CompletableFuture<JobClient> jobClientFuture = executorFactory
            .getExecutor(configuration) // get the executor matching the configuration
            .execute(streamGraph, configuration, userClassloader); // execute the job with the StreamGraph, configuration and classloader

    try {
        // notify each job listener that the job has been submitted
        JobClient jobClient = jobClientFuture.get();
        jobListeners.forEach(jobListener -> jobListener.onJobSubmitted(jobClient, null));
        return jobClient;
    } catch (ExecutionException executionException) {
        final Throwable strippedException = ExceptionUtils.stripExecutionException(executionException);
        jobListeners.forEach(jobListener -> jobListener.onJobSubmitted(null, strippedException));

        throw new FlinkException(
                String.format("Failed to execute job '%s'.", streamGraph.getJobName()),
                strippedException);
    }
}
Here the logic diverges again depending on the execution environment. The branching point is executorServiceLoader.getExecutorFactory(configuration): different environments obtain different PipelineExecutorFactory instances. When a LocalStreamEnvironment or a StreamContextEnvironment is created, the executorServiceLoader variable passed in is a DefaultExecutorServiceLoader.
DefaultExecutorServiceLoader's getExecutorFactory method is shown below:
public PipelineExecutorFactory getExecutorFactory(final Configuration configuration) {
    checkNotNull(configuration);

    // load and instantiate the PipelineExecutorFactory implementations listed in the service files
    final ServiceLoader<PipelineExecutorFactory> loader =
            ServiceLoader.load(PipelineExecutorFactory.class);

    final List<PipelineExecutorFactory> compatibleFactories = new ArrayList<>();
    final Iterator<PipelineExecutorFactory> factories = loader.iterator();
    while (factories.hasNext()) {
        try {
            // iterate over all loaded factories and keep only those
            // compatible with the configuration
            final PipelineExecutorFactory factory = factories.next();
            if (factory != null && factory.isCompatibleWith(configuration)) {
                compatibleFactories.add(factory);
            }
        } catch (Throwable e) {
            if (e.getCause() instanceof NoClassDefFoundError) {
                LOG.info("Could not load factory due to missing dependencies.");
            } else {
                throw e;
            }
        }
    }

    // if more than one compatible factory is found, fail with an error message
    if (compatibleFactories.size() > 1) {
        final String configStr =
                configuration.toMap().entrySet().stream()
                        .map(e -> e.getKey() + "=" + e.getValue())
                        .collect(Collectors.joining("\n"));
        throw new IllegalStateException(
                "Multiple compatible client factories found for:\n" + configStr + ".");
    }

    // if no compatible factory is found, throw as well
    if (compatibleFactories.isEmpty()) {
        throw new IllegalStateException("No ExecutorFactory found to execute the application.");
    }

    // return the single compatible factory
    return compatibleFactories.get(0);
}
This method uses the Java SPI (Service Provider Interface) mechanism: based on the service files under META-INF/services, it dynamically loads and instantiates the implementations of PipelineExecutorFactory.
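For readers unfamiliar with SPI, here is a self-contained sketch of the mechanism outside of Flink (Greeter, ConsoleGreeter and Main are made-up names for illustration, not Flink classes):

// com/example/Greeter.java
package com.example;

public interface Greeter {
    String greet(String name);
}

// com/example/ConsoleGreeter.java
package com.example;

public class ConsoleGreeter implements Greeter {
    @Override
    public String greet(String name) {
        return "Hello, " + name;
    }
}

// META-INF/services/com.example.Greeter is a plain text file on the classpath
// listing one implementation class per line:
//     com.example.ConsoleGreeter

// com/example/Main.java
package com.example;

import java.util.ServiceLoader;

public class Main {
    public static void main(String[] args) {
        // ServiceLoader scans META-INF/services and instantiates every listed
        // implementation, exactly as DefaultExecutorServiceLoader does for
        // PipelineExecutorFactory
        for (Greeter greeter : ServiceLoader.load(Greeter.class)) {
            System.out.println(greeter.greet("Flink"));
        }
    }
}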
PipelineExecutorFactory and PipelineExecutor
PipelineExecutorFactory
Searching the source tree for org.apache.flink.core.execution.PipelineExecutorFactory service files, we find three, located in the flink-clients, flink-yarn and flink-kubernetes subprojects. Let's go through them one by one.
The org.apache.flink.core.execution.PipelineExecutorFactory file in flink-clients contains:
org.apache.flink.client.deployment.executors.RemoteExecutorFactory
org.apache.flink.client.deployment.executors.LocalExecutorFactory
In other words, with the flink-clients dependency on the classpath, RemoteExecutorFactory and LocalExecutorFactory instances will be created.
Let's look at their isCompatibleWith methods in turn.
RemoteExecutorFactory's isCompatibleWith method:
@Override
public boolean isCompatibleWith(final Configuration configuration) {
    // compatible when execution.target=remote is configured
    return RemoteExecutor.NAME.equalsIgnoreCase(configuration.get(DeploymentOptions.TARGET));
}
RemoteExecutorFactory is chosen only when execution.target is configured as remote.
LocalExecutorFactory's isCompatibleWith method:
@Override
public boolean isCompatibleWith(final Configuration configuration) {
    // compatible when execution.target=local is configured
    return LocalExecutor.NAME.equalsIgnoreCase(configuration.get(DeploymentOptions.TARGET));
}
LocalExecutorFactory is chosen only when execution.target is configured as local.
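To tie this back to user code: execution.target can be set in flink-conf.yaml, with the -t option of the flink run command, or programmatically. A hypothetical sketch of the programmatic route (the class name is made up):

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class TargetSelectionExample {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // makes DefaultExecutorServiceLoader select LocalExecutorFactory
        conf.set(DeploymentOptions.TARGET, "local");
        // LocalExecutor only supports attached execution (see below)
        conf.set(DeploymentOptions.ATTACHED, true);

        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment(conf);
        // ... build the pipeline and call env.execute() as usual
    }
}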
The org.apache.flink.core.execution.PipelineExecutorFactory file in flink-yarn contains:
org.apache.flink.yarn.executors.YarnJobClusterExecutorFactory
org.apache.flink.yarn.executors.YarnSessionClusterExecutorFactory
Similar to the above; here we only state the configuration value each factory is compatible with, without repeating the isCompatibleWith code:
- YarnJobClusterExecutorFactory: requires execution.target to be yarn-per-job
- YarnSessionClusterExecutorFactory: requires execution.target to be yarn-session
The org.apache.flink.core.execution.PipelineExecutorFactory file in flink-kubernetes contains:
org.apache.flink.kubernetes.executors.KubernetesSessionClusterExecutorFactory
KubernetesSessionClusterExecutorFactory requires execution.target to be configured as kubernetes-session.
PipelineExecutor
Next we focus on two executors: LocalExecutor and RemoteExecutor. The logic that creates them is straightforward, so we skip it here.
A PipelineExecutor executes a job through its execute method, which takes three parameters:
- pipeline: the job to execute, here the StreamGraph.
- configuration: the job configuration.
- userCodeClassloader: the classloader for user code. It is separate from the classloader Flink itself uses because classes loaded by different user jobs may conflict with each other, and user classes may also conflict with classes loaded by the Flink framework. To avoid such conflicts, user code is loaded with its own classloader, as sketched below.
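To illustrate the idea behind this isolation, below is a minimal child-first classloader sketch. Flink's actual implementation is more elaborate (see FlinkUserCodeClassLoaders) and its behavior is controlled by the classloader.resolve-order option:

import java.net.URL;
import java.net.URLClassLoader;

// Child-first classloader: tries the user jars before delegating to the parent,
// so user classes shadow framework classes with the same name.
public class ChildFirstClassLoader extends URLClassLoader {

    public ChildFirstClassLoader(URL[] userJarUrls, ClassLoader parent) {
        super(userJarUrls, parent);
    }

    @Override
    protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
        synchronized (getClassLoadingLock(name)) {
            Class<?> c = findLoadedClass(name);
            if (c == null) {
                try {
                    c = findClass(name); // look in the user jars first
                } catch (ClassNotFoundException e) {
                    c = super.loadClass(name, false); // fall back to the parent
                }
            }
            if (resolve) {
                resolveClass(c);
            }
            return c;
        }
    }
}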
LocalExecutor
LocalExecutor executes the job locally. Its execute method is shown below:
public CompletableFuture<JobClient> execute(Pipeline pipeline, Configuration configuration, ClassLoader userCodeClassloader) throws Exception {
    // sanity checks
    checkNotNull(pipeline);
    checkNotNull(configuration);

    Configuration effectiveConfig = new Configuration();
    effectiveConfig.addAll(this.configuration);
    effectiveConfig.addAll(configuration);

    // we only support attached execution with the local executor.
    checkState(configuration.getBoolean(DeploymentOptions.ATTACHED));

    // convert the StreamGraph to a JobGraph
    final JobGraph jobGraph = getJobGraph(pipeline, effectiveConfig);

    // create a MiniCluster and submit the job via MiniCluster's submitJob method
    return PerJobMiniClusterFactory.createWithFactory(effectiveConfig, miniClusterFactory)
            .submitJob(jobGraph, userCodeClassloader);
}
Before the job is submitted, the StreamGraph must be converted into a JobGraph. How the JobGraph is generated will be covered in a later article.
At the end of the method, PerJobMiniClusterFactory's submitJob method is called. PerJobMiniClusterFactory operates on a MiniCluster object, which, as the name suggests, is a "miniature cluster": the whole job runs locally inside it.
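MiniCluster can also be driven directly, which is roughly what Flink's own tests do. A minimal sketch, assuming a JobGraph has already been produced:

import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.runtime.minicluster.MiniClusterConfiguration;

static void runOnMiniCluster(JobGraph jobGraph) throws Exception {
    MiniClusterConfiguration config = new MiniClusterConfiguration.Builder()
            .setNumTaskManagers(1)
            .setNumSlotsPerTaskManager(4)
            .build();

    // MiniCluster implements AutoCloseable, so try-with-resources shuts it down
    try (MiniCluster miniCluster = new MiniCluster(config)) {
        miniCluster.start();
        // blocks until the job finishes, like attached execution
        miniCluster.executeJobBlocking(jobGraph);
    }
}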
PerJobMiniClusterFactory's submitJob method:
/**
 * Starts a {@link MiniCluster} and submits a job.
 */
public CompletableFuture<JobClient> submitJob(JobGraph jobGraph, ClassLoader userCodeClassloader) throws Exception {
    // build the MiniCluster configuration, using the job's maximum parallelism
    MiniClusterConfiguration miniClusterConfig = getMiniClusterConfig(jobGraph.getMaximumParallelism());
    // create a MiniCluster
    MiniCluster miniCluster = miniClusterFactory.apply(miniClusterConfig);
    // start the MiniCluster
    miniCluster.start();

    return miniCluster
            .submitJob(jobGraph)
            .thenApplyAsync(FunctionUtils.uncheckedFunction(submissionResult -> {
                org.apache.flink.client.ClientUtils.waitUntilJobInitializationFinished(
                        () -> miniCluster.getJobStatus(submissionResult.getJobID()).get(),
                        () -> miniCluster.requestJobResult(submissionResult.getJobID()).get(),
                        userCodeClassloader);
                return submissionResult;
            }))
            .thenApply(result -> new MiniClusterJobClient(
                    result.getJobID(),
                    miniCluster,
                    userCodeClassloader,
                    MiniClusterJobClient.JobFinalizationBehavior.SHUTDOWN_CLUSTER))
            .whenComplete((ignored, throwable) -> {
                if (throwable != null) {
                    // We failed to create the JobClient and must shutdown to ensure cleanup.
                    shutDownCluster(miniCluster);
                }
            })
            .thenApply(Function.identity());
}
RemoteExecutor
RemoteExecutor's execute method lives in its parent class, AbstractSessionClusterExecutor. The annotated code is shown below:
@Override
public CompletableFuture<JobClient> execute(@Nonnull final Pipeline pipeline, @Nonnull final Configuration configuration, @Nonnull final ClassLoader userCodeClassloader) throws Exception {
    // as before, generate the JobGraph first
    final JobGraph jobGraph = PipelineExecutorUtils.getJobGraph(pipeline, configuration);

    try (final ClusterDescriptor<ClusterID> clusterDescriptor = clusterClientFactory.createClusterDescriptor(configuration)) {
        // obtain the ID of the remote cluster
        final ClusterID clusterID = clusterClientFactory.getClusterId(configuration);
        checkState(clusterID != null);

        final ClusterClientProvider<ClusterID> clusterClientProvider = clusterDescriptor.retrieve(clusterID);
        // create the ClusterClient, used to communicate with the remote cluster and submit the job
        ClusterClient<ClusterID> clusterClient = clusterClientProvider.getClusterClient();
        return clusterClient
                .submitJob(jobGraph) // submit the job
                .thenApplyAsync(FunctionUtils.uncheckedFunction(jobId -> {
                    ClientUtils.waitUntilJobInitializationFinished(
                            () -> clusterClient.getJobStatus(jobId).get(),
                            () -> clusterClient.requestJobResult(jobId).get(),
                            userCodeClassloader);
                    return jobId;
                }))
                .thenApplyAsync(jobID -> (JobClient) new ClusterClientJobClientAdapter<>(
                        clusterClientProvider,
                        jobID,
                        userCodeClassloader))
                .whenComplete((ignored1, ignored2) -> clusterClient.close());
    }
}
Depending on the cluster type, ClusterClient has two subclasses: MiniClusterClient and RestClusterClient. MiniClusterClient is used to communicate with a MiniCluster; its submitJob method simply delegates to MiniCluster's submitJob.
RestClusterClient communicates with the remote cluster through HTTP REST requests. Its submitJob method is shown below:
@Override
public CompletableFuture<JobID> submitJob(@Nonnull JobGraph jobGraph) {
    CompletableFuture<java.nio.file.Path> jobGraphFileFuture = CompletableFuture.supplyAsync(() -> {
        try {
            // create a temporary file
            final java.nio.file.Path jobGraphFile = Files.createTempFile("flink-jobgraph", ".bin");
            // serialize the JobGraph into it
            try (ObjectOutputStream objectOut = new ObjectOutputStream(Files.newOutputStream(jobGraphFile))) {
                objectOut.writeObject(jobGraph);
            }
            // return the file path
            return jobGraphFile;
        } catch (IOException e) {
            throw new CompletionException(new FlinkException("Failed to serialize JobGraph.", e));
        }
    }, executorService);

    // runs after the JobGraph has been written to the file
    CompletableFuture<Tuple2<JobSubmitRequestBody, Collection<FileUpload>>> requestFuture = jobGraphFileFuture.thenApply(jobGraphFile -> {
        // names of the jar files
        List<String> jarFileNames = new ArrayList<>(8);
        // distributed cache files
        List<JobSubmitRequestBody.DistributedCacheFile> artifactFileNames = new ArrayList<>(8);
        // files that need to be uploaded
        Collection<FileUpload> filesToUpload = new ArrayList<>(8);

        // first add the serialized JobGraph file
        filesToUpload.add(new FileUpload(jobGraphFile, RestConstants.CONTENT_TYPE_BINARY));

        // collect the user jar paths from the JobGraph and add them to the upload list
        for (Path jar : jobGraph.getUserJars()) {
            jarFileNames.add(jar.getName());
            filesToUpload.add(new FileUpload(Paths.get(jar.toUri()), RestConstants.CONTENT_TYPE_JAR));
        }

        for (Map.Entry<String, DistributedCache.DistributedCacheEntry> artifacts : jobGraph.getUserArtifacts().entrySet()) {
            final Path artifactFilePath = new Path(artifacts.getValue().filePath);
            try {
                // Only local artifacts need to be uploaded;
                // other files required by the user job are handled here as well
                if (!artifactFilePath.getFileSystem().isDistributedFS()) {
                    artifactFileNames.add(new JobSubmitRequestBody.DistributedCacheFile(artifacts.getKey(), artifactFilePath.getName()));
                    filesToUpload.add(new FileUpload(Paths.get(artifacts.getValue().filePath), RestConstants.CONTENT_TYPE_BINARY));
                }
            } catch (IOException e) {
                throw new CompletionException(new FlinkException("Failed to get the FileSystem of artifact " + artifactFilePath + ".", e));
            }
        }

        // build the job submission request body
        final JobSubmitRequestBody requestBody = new JobSubmitRequestBody(
                jobGraphFile.getFileName().toString(),
                jarFileNames,
                artifactFileNames);

        // return the request body together with the files to upload
        return Tuple2.of(requestBody, Collections.unmodifiableCollection(filesToUpload));
    });

    // send the request once it has been built
    final CompletableFuture<JobSubmitResponseBody> submissionFuture = requestFuture.thenCompose(
            requestAndFileUploads -> sendRetriableRequest(
                    JobSubmitHeaders.getInstance(),
                    EmptyMessageParameters.getInstance(),
                    requestAndFileUploads.f0,
                    requestAndFileUploads.f1,
                    isConnectionProblemOrServiceUnavailable()));

    // delete the temporary JobGraph file after the request has been sent
    submissionFuture
            .thenCombine(jobGraphFileFuture, (ignored, jobGraphFile) -> jobGraphFile)
            .thenAccept(jobGraphFile -> {
                try {
                    Files.delete(jobGraphFile);
                } catch (IOException e) {
                    LOG.warn("Could not delete temporary file {}.", jobGraphFile, e);
                }
            });

    return submissionFuture
            .thenApply(ignore -> jobGraph.getJobID())
            .exceptionally((Throwable throwable) -> {
                throw new CompletionException(new JobSubmissionException(jobGraph.getJobID(), "Failed to submit JobGraph.", ExceptionUtils.stripCompletionException(throwable)));
            });
}
Since the JobManager and TaskManager of a remote cluster are deployed and started separately, which is not closely related to this article, we do not go into further detail here.
MiniCluster
We briefly introduced MiniCluster above; it executes jobs in the local environment.
The start method
The logic that starts a MiniCluster lives in its start method.
/**
 * Starts the mini cluster, based on the configured properties.
 *
 * @throws Exception This method passes on any exception that occurs during the startup of
 * the mini cluster.
 */
public void start() throws Exception {
    synchronized (lock) {
        checkState(!running, "MiniCluster is already running");

        LOG.info("Starting Flink Mini Cluster");
        LOG.debug("Using configuration {}", miniClusterConfiguration);

        final Configuration configuration = miniClusterConfiguration.getConfiguration();
        // whether the MiniCluster components share a single RPC service
        final boolean useSingleRpcService = miniClusterConfiguration.getRpcServiceSharing() == RpcServiceSharing.SHARED;

        try {
            // initialize IO-related settings, e.g. whether to overwrite results and create output directories
            initializeIOFormatClasses(configuration);

            LOG.info("Starting Metrics Registry");
            // create the metrics registry
            metricRegistry = createMetricRegistry(configuration);

            // bring up all the RPC services
            LOG.info("Starting RPC Service(s)");

            final RpcServiceFactory dispatcherResourceManagerComponentRpcServiceFactory;
            final RpcService metricQueryServiceRpcService;

            // use a shared RPC service
            if (useSingleRpcService) {
                // we always need the 'commonRpcService' for auxiliary calls
                // create a local RPC service
                commonRpcService = createLocalRpcService(configuration);
                // create the common RPC service factory
                final CommonRpcServiceFactory commonRpcServiceFactory = new CommonRpcServiceFactory(commonRpcService);
                // the TaskManager RPC service factory reuses the common factory
                taskManagerRpcServiceFactory = commonRpcServiceFactory;
                dispatcherResourceManagerComponentRpcServiceFactory = commonRpcServiceFactory;
                // start the metrics query RPC service
                metricQueryServiceRpcService = MetricUtils.startLocalMetricsRpcService(configuration);
            } else {
                // start a new service per component, possibly with custom bind addresses
                // without a shared RPC service, read the JobManager and TaskManager addresses and port ranges
                final String jobManagerExternalAddress = miniClusterConfiguration.getJobManagerExternalAddress();
                final String taskManagerExternalAddress = miniClusterConfiguration.getTaskManagerExternalAddress();
                final String jobManagerExternalPortRange = miniClusterConfiguration.getJobManagerExternalPortRange();
                final String taskManagerExternalPortRange = miniClusterConfiguration.getTaskManagerExternalPortRange();
                final String jobManagerBindAddress = miniClusterConfiguration.getJobManagerBindAddress();
                final String taskManagerBindAddress = miniClusterConfiguration.getTaskManagerBindAddress();

                // create a dedicated factory and service for each component
                dispatcherResourceManagerComponentRpcServiceFactory =
                        new DedicatedRpcServiceFactory(
                                configuration,
                                jobManagerExternalAddress,
                                jobManagerExternalPortRange,
                                jobManagerBindAddress);
                taskManagerRpcServiceFactory =
                        new DedicatedRpcServiceFactory(
                                configuration,
                                taskManagerExternalAddress,
                                taskManagerExternalPortRange,
                                taskManagerBindAddress);

                // we always need the 'commonRpcService' for auxiliary calls
                // bind to the JobManager address with port 0
                commonRpcService = createRemoteRpcService(configuration, jobManagerBindAddress, 0);
                metricQueryServiceRpcService = MetricUtils.startRemoteMetricsRpcService(configuration, commonRpcService.getAddress());
            }

            // start the metrics query service
            metricRegistry.startQueryService(metricQueryServiceRpcService, null);

            // create the process metric group
            processMetricGroup = MetricUtils.instantiateProcessMetricGroup(
                    metricRegistry,
                    RpcUtils.getHostname(commonRpcService),
                    ConfigurationUtils.getSystemResourceMetricsProbingInterval(configuration));

            // create the IO thread pool
            ioExecutor = Executors.newFixedThreadPool(
                    ClusterEntrypointUtils.getPoolSize(configuration),
                    new ExecutorThreadFactory("mini-cluster-io"));
            // create the high availability services
            haServices = createHighAvailabilityServices(configuration, ioExecutor);

            // start the BlobServer
            blobServer = new BlobServer(configuration, haServices.createBlobStore());
            blobServer.start();

            // create the heartbeat services
            heartbeatServices = HeartbeatServices.fromConfiguration(configuration);

            // create the blob cache service
            blobCacheService = new BlobCacheService(
                    configuration, haServices.createBlobStore(), new InetSocketAddress(InetAddress.getLocalHost(), blobServer.getPort()));

            // start the TaskManagers
            startTaskManagers();

            // create the metrics query service retriever
            MetricQueryServiceRetriever metricQueryServiceRetriever = new RpcMetricQueryServiceRetriever(metricRegistry.getMetricQueryServiceRpcService());

            // create the Dispatcher and ResourceManager; they run in the same process
            setupDispatcherResourceManagerComponents(configuration, dispatcherResourceManagerComponentRpcServiceFactory, metricQueryServiceRetriever);

            // create the ResourceManager leader retrieval service
            resourceManagerLeaderRetriever = haServices.getResourceManagerLeaderRetriever();
            dispatcherLeaderRetriever = haServices.getDispatcherLeaderRetriever();
            clusterRestEndpointLeaderRetrievalService = haServices.getClusterRestEndpointLeaderRetriever();

            // create the Dispatcher gateway retriever
            dispatcherGatewayRetriever = new RpcGatewayRetriever<>(
                    commonRpcService,
                    DispatcherGateway.class,
                    DispatcherId::fromUuid,
                    new ExponentialBackoffRetryStrategy(21, Duration.ofMillis(5L), Duration.ofMillis(20L)));
            resourceManagerGatewayRetriever = new RpcGatewayRetriever<>(
                    commonRpcService,
                    ResourceManagerGateway.class,
                    ResourceManagerId::fromUuid,
                    new ExponentialBackoffRetryStrategy(21, Duration.ofMillis(5L), Duration.ofMillis(20L)));
            // create the WebMonitor leader retriever
            webMonitorLeaderRetriever = new LeaderRetriever();

            // start these services
            resourceManagerLeaderRetriever.start(resourceManagerGatewayRetriever);
            dispatcherLeaderRetriever.start(dispatcherGatewayRetriever);
            clusterRestEndpointLeaderRetrievalService.start(webMonitorLeaderRetriever);
        } catch (Exception e) {
            // cleanup everything
            try {
                close();
            } catch (Exception ee) {
                e.addSuppressed(ee);
            }
            throw e;
        }

        // create a new termination future
        terminationFuture = new CompletableFuture<>();

        // now officially mark this as running
        running = true;

        LOG.info("Flink Mini Cluster started successfully");
    }
}
The walkthrough above covers the MiniCluster startup process. Next we look at how the TaskManagers are started, in the startTaskManagers method.
@GuardedBy("lock")private void startTaskManagers() throws Exception {final int numTaskManagers = miniClusterConfiguration.getNumTaskManagers();// 启动task manager的个数LOG.info("Starting {} TaskManger(s)", numTaskManagers);for (int i = 0; i < numTaskManagers; i++) {startTaskExecutor();}}
Following startTaskExecutor:
@VisibleForTesting
void startTaskExecutor() throws Exception {
    synchronized (lock) {
        final Configuration configuration = miniClusterConfiguration.getConfiguration();

        // create the TaskExecutor
        final TaskExecutor taskExecutor = TaskManagerRunner.startTaskManager(
                configuration,
                new ResourceID(UUID.randomUUID().toString()),
                taskManagerRpcServiceFactory.createRpcService(),
                haServices,
                heartbeatServices,
                metricRegistry,
                blobCacheService,
                useLocalCommunication(),
                ExternalResourceInfoProvider.NO_EXTERNAL_RESOURCES,
                taskManagerTerminatingFatalErrorHandlerFactory.create(taskManagers.size()));

        // start the TaskExecutor
        taskExecutor.start();
        taskManagers.add(taskExecutor);
    }
}
The loop creates one TaskExecutor per configured TaskManager. At this point the TaskManagers are up and running.
Dispatcher
The Dispatcher is responsible for accepting job submissions and creating JobManagers.
Dispatcher has two subclasses, MiniDispatcher and StandaloneDispatcher, used for submitting jobs to a MiniCluster and to standalone clusters, respectively. StandaloneDispatcher is the simpler of the two: it does not override any method of its parent class Dispatcher.
Next we analyze MiniDispatcher's submitJob method.
@Override
public CompletableFuture<Acknowledge> submitJob(JobGraph jobGraph, Time timeout) {
    // call Dispatcher's submitJob method
    final CompletableFuture<Acknowledge> acknowledgeCompletableFuture = super.submitJob(jobGraph, timeout);

    acknowledgeCompletableFuture.whenComplete(
        (Acknowledge ignored, Throwable throwable) -> {
            if (throwable != null) {
                onFatalError(new FlinkException(
                    "Failed to submit job " + jobGraph.getJobID() + " in job mode.",
                    throwable));
            }
        });

    return acknowledgeCompletableFuture;
}
Continuing with the submitJob method of the parent class Dispatcher:
@Override
public CompletableFuture<Acknowledge> submitJob(JobGraph jobGraph, Time timeout) {
    log.info("Received JobGraph submission {} ({}).", jobGraph.getJobID(), jobGraph.getName());

    try {
        // check whether the job is a duplicate submission
        if (isDuplicateJob(jobGraph.getJobID())) {
            return FutureUtils.completedExceptionally(
                new DuplicateJobSubmissionException(jobGraph.getJobID()));
        // check whether only part of the vertices have resources configured
        } else if (isPartialResourceConfigured(jobGraph)) {
            return FutureUtils.completedExceptionally(
                new JobSubmissionException(jobGraph.getJobID(), "Currently jobs is not supported if parts of the vertices have " +
                        "resources configured. The limitation will be removed in future versions."));
        } else {
            // submit the job internally
            return internalSubmitJob(jobGraph);
        }
    } catch (FlinkException e) {
        return FutureUtils.completedExceptionally(e);
    }
}
The flow then reaches internalSubmitJob:
private CompletableFuture<Acknowledge> internalSubmitJob(JobGraph jobGraph) {
    log.info("Submitting job {} ({}).", jobGraph.getJobID(), jobGraph.getName());

    // call persistAndRunJob
    final CompletableFuture<Acknowledge> persistAndRunFuture = waitForTerminatingJob(jobGraph.getJobID(), jobGraph, this::persistAndRunJob)
        .thenApply(ignored -> Acknowledge.get());

    return persistAndRunFuture.handleAsync((acknowledge, throwable) -> {
        if (throwable != null) {
            cleanUpJobData(jobGraph.getJobID(), true);

            ClusterEntryPointExceptionUtils.tryEnrichClusterEntryPointError(throwable);
            final Throwable strippedThrowable = ExceptionUtils.stripCompletionException(throwable);
            log.error("Failed to submit job {}.", jobGraph.getJobID(), strippedThrowable);
            throw new CompletionException(
                new JobSubmissionException(jobGraph.getJobID(), "Failed to submit job.", strippedThrowable));
        } else {
            return acknowledge;
        }
    }, ioExecutor);
}
Following persistAndRunJob:
private void persistAndRunJob(JobGraph jobGraph) throws Exception {
    // persist the JobGraph
    jobGraphWriter.putJobGraph(jobGraph);
    // call runJob
    runJob(jobGraph, ExecutionType.SUBMISSION);
}
Now the runJob method:
private void runJob(JobGraph jobGraph, ExecutionType executionType) {
    Preconditions.checkState(!runningJobs.containsKey(jobGraph.getJobID()));
    // job initialization timestamp
    long initializationTimestamp = System.currentTimeMillis();
    // invoke the JobManager startup logic
    CompletableFuture<JobManagerRunner> jobManagerRunnerFuture = createJobManagerRunner(jobGraph, initializationTimestamp);

    DispatcherJob dispatcherJob = DispatcherJob.createFor(
            jobManagerRunnerFuture,
            jobGraph.getJobID(),
            jobGraph.getName(),
            initializationTimestamp);
    // store the current job in the runningJobs map
    runningJobs.put(jobGraph.getJobID(), dispatcherJob);

    final JobID jobId = jobGraph.getJobID();

    // handle the job submission result and perform the cleanup
    final CompletableFuture<CleanupJobState> cleanupJobStateFuture = dispatcherJob
        .getResultFuture()
        .handleAsync((dispatcherJobResult, throwable) -> {
            Preconditions.checkState(runningJobs.get(jobId) == dispatcherJob, "The job entry in runningJobs must be bound to the lifetime of the DispatcherJob.");

            if (dispatcherJobResult != null) {
                return handleDispatcherJobResult(jobId, dispatcherJobResult, executionType);
            } else {
                return dispatcherJobFailed(jobId, throwable);
            }
        }, getMainThreadExecutor());

    // terminate the job after the cleanup has completed
    final CompletableFuture<Void> jobTerminationFuture = cleanupJobStateFuture
        .thenApply(cleanupJobState -> removeJob(jobId, cleanupJobState))
        .thenCompose(Function.identity());

    FutureUtils.assertNoException(jobTerminationFuture);
    registerDispatcherJobTerminationFuture(jobId, jobTerminationFuture);
}
This is the last step of the MiniCluster submission flow: creating the JobManagerRunner.
The createJobManagerRunner method:
CompletableFuture<JobManagerRunner> createJobManagerRunner(JobGraph jobGraph, long initializationTimestamp) {
    // obtain the RPC service
    final RpcService rpcService = getRpcService();
    return CompletableFuture.supplyAsync(
        () -> {
            try {
                // create a JobManager runner, passing in the JobGraph and the other arguments
                JobManagerRunner runner = jobManagerRunnerFactory.createJobManagerRunner(
                        jobGraph,
                        configuration,
                        rpcService,
                        highAvailabilityServices,
                        heartbeatServices,
                        jobManagerSharedServices,
                        new DefaultJobManagerJobMetricGroupFactory(jobManagerMetricGroup),
                        fatalErrorHandler,
                        initializationTimestamp);
                // start the JobManager runner
                runner.start();
                return runner;
            } catch (Exception e) {
                throw new CompletionException(new JobInitializationException(jobGraph.getJobID(), "Could not instantiate JobManager.", e));
            }
        },
        ioExecutor); // do not use main thread executor. Otherwise, Dispatcher is blocked on JobManager creation
}
Next we analyze the JobManager startup logic.
JobManagerRunner and JobManagerRunnerFactory
JobManagerRunnerFactory
We analyze the createJobManagerRunner method of DefaultJobManagerRunnerFactory, the only implementation of JobManagerRunnerFactory.
@Override
public JobManagerRunner createJobManagerRunner(
        JobGraph jobGraph,
        Configuration configuration,
        RpcService rpcService,
        HighAvailabilityServices highAvailabilityServices,
        HeartbeatServices heartbeatServices,
        JobManagerSharedServices jobManagerServices,
        JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory,
        FatalErrorHandler fatalErrorHandler,
        long initializationTimestamp) throws Exception {
    // create the JobMaster configuration
    final JobMasterConfiguration jobMasterConfiguration = JobMasterConfiguration.fromConfiguration(configuration);

    // create the factories for the SlotPool service and the scheduler,
    // used to create the SlotPoolService and the SchedulerNG
    final SlotPoolFactory slotPoolFactory = SlotPoolFactory.fromConfiguration(configuration);
    final SchedulerNGFactory schedulerNGFactory = SchedulerNGFactoryFactory.createSchedulerNGFactory(configuration);
    final ShuffleMaster<?> shuffleMaster = ShuffleServiceLoader.loadShuffleServiceFactory(configuration).createShuffleMaster(configuration);

    // create the JobMaster service factory;
    // the JobMaster service exposes the JobMaster address and the gateway for communicating with it
    final JobMasterServiceFactory jobMasterFactory = new DefaultJobMasterServiceFactory(
            jobMasterConfiguration,
            slotPoolFactory,
            rpcService,
            highAvailabilityServices,
            jobManagerServices,
            heartbeatServices,
            jobManagerJobMetricGroupFactory,
            fatalErrorHandler,
            schedulerNGFactory,
            shuffleMaster);

    // create the JobManagerRunnerImpl
    return new JobManagerRunnerImpl(
            jobGraph,
            jobMasterFactory,
            highAvailabilityServices,
            jobManagerServices.getLibraryCacheManager().registerClassLoaderLease(jobGraph.getJobID()),
            jobManagerServices.getScheduledExecutorService(),
            fatalErrorHandler,
            initializationTimestamp);
}
JobManagerRunner
JobManagerRunner is used to start the JobManager. When created, it obtains the user code classloader, the RunningJobsRegistry (which tracks a job's scheduling status: pending, running or done) and the leader election service.
Next the JobManager needs to be started. Let's look at the start method:
@Override
public void start() throws Exception {
    try {
        leaderElectionService.start(this);
    } catch (Exception e) {
        log.error("Could not start the JobManager because the leader election service did not start.", e);
        throw new Exception("Could not start the leader election service.", e);
    }
}
start has exactly one task: starting the leader election service, which kicks off the leader election process. A detailed analysis of leader election will follow in a later article.
Once a JobManagerRunner instance is granted leadership, its grantLeadership method is invoked. grantLeadership belongs to the LeaderContender interface, which every participant in leader election must implement; JobManagerRunner is no exception.
Let's look at grantLeadership:
@Override
public void grantLeadership(final UUID leaderSessionID) {
    synchronized (lock) {
        if (shutdown) {
            log.debug("JobManagerRunner cannot be granted leadership because it is already shut down.");
            return;
        }

        leadershipOperation = leadershipOperation.thenCompose(
            (ignored) -> {
                synchronized (lock) {
                    return verifyJobSchedulingStatusAndStartJobManager(leaderSessionID);
                }
            });

        handleException(leadershipOperation, "Could not start the job manager.");
    }
}
Next comes verifying the job scheduling status and starting the JobManager, which happens in verifyJobSchedulingStatusAndStartJobManager:
private CompletableFuture<Void> verifyJobSchedulingStatusAndStartJobManager(UUID leaderSessionId) {
    // check the job scheduling status
    final CompletableFuture<JobSchedulingStatus> jobSchedulingStatusFuture = getJobSchedulingStatus();

    return jobSchedulingStatusFuture.thenCompose(
        jobSchedulingStatus -> {
            // if the job has already finished, run the job-already-done logic
            if (jobSchedulingStatus == JobSchedulingStatus.DONE) {
                return jobAlreadyDone();
            } else {
                // otherwise, start the JobMaster
                return startJobMaster(leaderSessionId);
            }
        });
}
After this long journey, it is finally time to start the JobMaster:
private CompletionStage<Void> startJobMaster(UUID leaderSessionId) {
    log.info("JobManager runner for job {} ({}) was granted leadership with session id {} at {}.",
        jobGraph.getName(), jobGraph.getJobID(), leaderSessionId, jobMasterService.getAddress());

    try {
        // first mark the job as running
        runningJobsRegistry.setJobRunning(jobGraph.getJobID());
    } catch (IOException e) {
        return FutureUtils.completedExceptionally(
            new FlinkException(
                String.format("Failed to set the job %s to running in the running jobs registry.", jobGraph.getJobID()),
                e));
    }

    final CompletableFuture<Acknowledge> startFuture;
    try {
        // then start the JobMaster service
        startFuture = jobMasterService.start(new JobMasterId(leaderSessionId));
    } catch (Exception e) {
        return FutureUtils.completedExceptionally(
            new FlinkException("Failed to start the JobMaster.", e));
    }

    final CompletableFuture<JobMasterGateway> currentLeaderGatewayFuture = leaderGatewayFuture;
    return startFuture.thenAcceptAsync(
        (Acknowledge ack) -> confirmLeaderSessionIdIfStillLeader(
            leaderSessionId,
            jobMasterService.getAddress(),
            currentLeaderGatewayFuture),
        executor);
}
At this point the execution flow has moved into the JobMaster object.
JobMaster
The JobMaster is responsible for executing a single JobGraph.
public CompletableFuture<Acknowledge> start(final JobMasterId newJobMasterId) throws Exception {
    // make sure we receive RPC and async calls
    start();

    return callAsyncWithoutFencing(() -> startJobExecution(newJobMasterId), RpcUtils.INF_TIMEOUT);
}
This start method first calls the RPC endpoint's start method so the JobMaster can receive RPC and async calls, then triggers startJobExecution.
startJobExecution starts the JobMaster services and kicks off job scheduling:
private Acknowledge startJobExecution(JobMasterId newJobMasterId) throws Exception {
    // must run in the main thread
    validateRunsInMainThread();

    checkNotNull(newJobMasterId, "The new JobMasterId must not be null.");

    if (Objects.equals(getFencingToken(), newJobMasterId)) {
        log.info("Already started the job execution with JobMasterId {}.", newJobMasterId);
        return Acknowledge.get();
    }

    setNewFencingToken(newJobMasterId);

    startJobMasterServices();

    log.info("Starting execution of job {} ({}) under job master id {}.", jobGraph.getName(), jobGraph.getJobID(), newJobMasterId);

    resetAndStartScheduler();

    return Acknowledge.get();
}
startJobMasterServices creates the TaskManager heartbeat manager, starts the SlotPool and establishes the connection to the ResourceManager leader (the ResourceManager goes through leader election as well):
private void startJobMasterServices() throws Exception {
    startHeartbeatServices();

    // start the slot pool make sure the slot pool now accepts messages for this leader
    slotPool.start(getFencingToken(), getAddress(), getMainThreadExecutor());

    //TODO: Remove once the ZooKeeperLeaderRetrieval returns the stored address upon start
    // try to reconnect to previously known leader
    reconnectToResourceManager(new FlinkException("Starting JobMaster component."));

    // job is ready to go, try to establish connection with resource manager
    //   - activate leader retrieval for the resource manager
    //   - on notification of the leader, the connection will be established and
    //     the slot pool will start requesting slots
    resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener());
}
We now return to the resetAndStartScheduler method mentioned above:
private void resetAndStartScheduler() throws Exception {
    validateRunsInMainThread();

    final CompletableFuture<Void> schedulerAssignedFuture;

    if (schedulerNG.requestJobStatus() == JobStatus.CREATED) {
        schedulerAssignedFuture = CompletableFuture.completedFuture(null);
        schedulerNG.setMainThreadExecutor(getMainThreadExecutor());
    } else {
        suspendAndClearSchedulerFields(new FlinkException("ExecutionGraph is being reset in order to be rescheduled."));
        final JobManagerJobMetricGroup newJobManagerJobMetricGroup = jobMetricGroupFactory.create(jobGraph);
        final SchedulerNG newScheduler = createScheduler(executionDeploymentTracker, newJobManagerJobMetricGroup);

        schedulerAssignedFuture = schedulerNG.getTerminationFuture().handle(
            (ignored, throwable) -> {
                newScheduler.setMainThreadExecutor(getMainThreadExecutor());
                assignScheduler(newScheduler, newJobManagerJobMetricGroup);
                return null;
            });
    }

    FutureUtils.assertNoException(schedulerAssignedFuture.thenRun(this::startScheduling));
}
At the end it calls startScheduling; let's look at that method:
private void startScheduling() {
    checkState(jobStatusListener == null);
    // register self as job status change listener
    jobStatusListener = new JobManagerJobStatusListener();
    schedulerNG.registerJobStatusListener(jobStatusListener);

    schedulerNG.startScheduling();
}
SchedulerNG
SchedulerNG is Flink's scheduler interface. It is responsible for building the ExecutionGraph from the JobGraph and then scheduling the job for execution.
The following analysis is based on the default implementation, DefaultScheduler. Its parent class is SchedulerBase, which converts the JobGraph into an ExecutionGraph during initialization.
@Override
public final void startScheduling() {
    mainThreadExecutor.assertRunningInMainThread();
    registerJobMetrics();
    startAllOperatorCoordinators();
    startSchedulingInternal();
}
Back to the scheduling logic: SchedulerBase's startScheduling method above calls startSchedulingInternal.
@Override
protected void startSchedulingInternal() {
    log.info("Starting scheduling with scheduling strategy [{}]", schedulingStrategy.getClass().getName());
    // transition the ExecutionGraph's JobStatus to RUNNING
    prepareExecutionGraphForNgScheduling();
    // invoke the scheduling strategy's startScheduling method
    schedulingStrategy.startScheduling();
}
Next we look at the startScheduling method of PipelinedRegionSchedulingStrategy, the sole implementation of SchedulingStrategy in this version:
@Override
public void startScheduling() {
    final Set<SchedulingPipelinedRegion> sourceRegions = IterableUtils.toStream(schedulingTopology.getAllPipelinedRegions())
        .filter(region -> !region.getConsumedResults().iterator().hasNext())
        .collect(Collectors.toSet());
    maybeScheduleRegions(sourceRegions);
}
This method first builds the sourceRegions set: it fetches all pipelined regions and keeps only those without any consumed result (the data a region receives from upstream regions is called its consumed results; the data it emits to downstream regions its produced results). What remains are therefore the source regions of the pipelined topology.
Next comes maybeScheduleRegions:
private void maybeScheduleRegions(final Set<SchedulingPipelinedRegion> regions) {
    final List<SchedulingPipelinedRegion> regionsSorted =
        SchedulingStrategyUtils.sortPipelinedRegionsInTopologicalOrder(schedulingTopology, regions);
    for (SchedulingPipelinedRegion region : regionsSorted) {
        maybeScheduleRegion(region);
    }
}
This method sorts the regions in topological order and then calls maybeScheduleRegion on each of them.
private void maybeScheduleRegion(final SchedulingPipelinedRegion region) {
    // return if any consumed result of the region is not yet in CONSUMABLE state (i.e. its data cannot be consumed yet)
    if (!areRegionInputsAllConsumable(region)) {
        return;
    }

    // all vertices of the region must be in CREATED state
    checkState(areRegionVerticesAllInCreatedState(region), "BUG: trying to schedule a region which is not in CREATED state");

    // create the deployment options for the execution vertices
    final List<ExecutionVertexDeploymentOption> vertexDeploymentOptions =
        SchedulingStrategyUtils.createExecutionVertexDeploymentOptions(
            regionVerticesSorted.get(region),
            id -> deploymentOption);
    // allocate resources (slots) for the execution vertices and deploy them
    schedulerOperations.allocateSlotsAndDeploy(vertexDeploymentOptions);
}
The SchedulerOperations interface is what a SchedulingStrategy uses to carry out its scheduling decisions. It has a single implementation, DefaultScheduler:
@Override
public void allocateSlotsAndDeploy(final List<ExecutionVertexDeploymentOption> executionVertexDeploymentOptions) {
    // every ExecutionVertex must be in ExecutionState.CREATED
    validateDeploymentOptions(executionVertexDeploymentOptions);

    // index the deployment options by ExecutionVertexID
    final Map<ExecutionVertexID, ExecutionVertexDeploymentOption> deploymentOptionsByVertex =
        groupDeploymentOptionsByVertexId(executionVertexDeploymentOptions);

    // collect all ExecutionVertexIDs
    final List<ExecutionVertexID> verticesToDeploy = executionVertexDeploymentOptions.stream()
        .map(ExecutionVertexDeploymentOption::getExecutionVertexId)
        .collect(Collectors.toList());

    // record a version for each ExecutionVertex, starting at 1L
    final Map<ExecutionVertexID, ExecutionVertexVersion> requiredVersionByVertex =
        executionVertexVersioner.recordVertexModifications(verticesToDeploy);

    // switch each ExecutionVertex to ExecutionState.SCHEDULED
    transitionToScheduled(verticesToDeploy);

    // allocate a slot, i.e. the resources needed for execution, for every vertex
    final List<SlotExecutionVertexAssignment> slotExecutionVertexAssignments =
        allocateSlots(executionVertexDeploymentOptions);

    // create the deployment handles, i.e. wrappers around
    // ExecutionVertexVersion, ExecutionVertexDeploymentOption and SlotExecutionVertexAssignment
    final List<DeploymentHandle> deploymentHandles = createDeploymentHandles(
        requiredVersionByVertex,
        deploymentOptionsByVertex,
        slotExecutionVertexAssignments);

    // wait for all vertices to receive their resources and be deployed
    waitForAllSlotsAndDeploy(deploymentHandles);
}
We continue with waitForAllSlotsAndDeploy:
private void waitForAllSlotsAndDeploy(final List<DeploymentHandle> deploymentHandles) {
    FutureUtils.assertNoException(
        assignAllResources(deploymentHandles).handle(deployAll(deploymentHandles)));
}
assignAllResources assigns the required resources to every vertex; deployAll then deploys all of the vertices.
private BiFunction<Void, Throwable, Void> deployAll(final List<DeploymentHandle> deploymentHandles) {
    return (ignored, throwable) -> {
        propagateIfNonNull(throwable);
        for (final DeploymentHandle deploymentHandle : deploymentHandles) {
            final SlotExecutionVertexAssignment slotExecutionVertexAssignment = deploymentHandle.getSlotExecutionVertexAssignment();
            final CompletableFuture<LogicalSlot> slotAssigned = slotExecutionVertexAssignment.getLogicalSlotFuture();
            // the slot assignment must already be complete
            checkState(slotAssigned.isDone());

            // run deployOrHandleError once the slot has been assigned
            FutureUtils.assertNoException(
                slotAssigned.handle(deployOrHandleError(deploymentHandle)));
        }
        return null;
    };
}
Next, the deployOrHandleError method:
private BiFunction<Object, Throwable, Void> deployOrHandleError(final DeploymentHandle deploymentHandle) {
    final ExecutionVertexVersion requiredVertexVersion = deploymentHandle.getRequiredVertexVersion();
    final ExecutionVertexID executionVertexId = requiredVertexVersion.getExecutionVertexId();

    return (ignored, throwable) -> {
        // check whether the ExecutionVertex version has been modified;
        // if so, the vertex has been taken over by another deployment and this one is abandoned
        if (executionVertexVersioner.isModified(requiredVertexVersion)) {
            log.debug("Refusing to deploy execution vertex {} because this deployment was " +
                "superseded by another deployment", executionVertexId);
            return null;
        }

        if (throwable == null) {
            // deploy the ExecutionVertex
            deployTaskSafe(executionVertexId);
        } else {
            handleTaskDeploymentFailure(executionVertexId, throwable);
        }
        return null;
    };
}
This leads to deployTaskSafe:
private void deployTaskSafe(final ExecutionVertexID executionVertexId) {
    try {
        final ExecutionVertex executionVertex = getExecutionVertex(executionVertexId);
        // look up the ExecutionVertex and deploy it
        executionVertexOperations.deploy(executionVertex);
    } catch (Throwable e) {
        handleTaskDeploymentFailure(executionVertexId, e);
    }
}
The deploy method of DefaultExecutionVertexOperations, the only implementation of ExecutionVertexOperations:
@Override
public void deploy(final ExecutionVertex executionVertex) throws JobException {
    executionVertex.deploy();
}
It calls the ExecutionVertex's own deploy method.
ExecutionVertex
ExecutionVertex's deploy method is as follows:
public void deploy() throws JobException {
    currentExecution.deploy();
}
Here currentExecution is an Execution object. Every attempt to execute an ExecutionVertex creates a new Execution, and the currentExecution field holds the most recently created one.
Execution's deploy method, with comments, is shown below:
/**
 * Deploys the execution to the previously assigned resource.
 *
 * @throws JobException if the execution cannot be deployed to the assigned resource
 */
public void deploy() throws JobException {
    assertRunningInJobMasterMainThread();

    final LogicalSlot slot = assignedResource;

    checkNotNull(slot, "In order to deploy the execution we first have to assign a resource via tryAssignResource.");

    // Check if the TaskManager died in the meantime
    // This only speeds up the response to TaskManagers failing concurrently to deployments.
    // The more general check is the rpcTimeout of the deployment call
    // the resource must still be alive
    if (!slot.isAlive()) {
        throw new JobException("Target slot (TaskManager) for deployment is no longer alive.");
    }

    // make sure exactly one deployment call happens from the correct state
    // note: the transition from CREATED to DEPLOYING is for testing purposes only
    ExecutionState previous = this.state;
    // transition from SCHEDULED or CREATED to DEPLOYING
    if (previous == SCHEDULED || previous == CREATED) {
        if (!transitionState(previous, DEPLOYING)) {
            // race condition, someone else beat us to the deploying call.
            // this should actually not happen and indicates a race somewhere else
            throw new IllegalStateException("Cannot deploy task: Concurrent deployment call race.");
        }
    } else {
        // vertex may have been cancelled, or it was already scheduled
        throw new IllegalStateException("The vertex must be in CREATED or SCHEDULED state to be deployed. Found state " + previous);
    }

    // check that the slot has in fact been assigned to this Execution
    if (this != slot.getPayload()) {
        throw new IllegalStateException(
            String.format("The execution %s has not been assigned to the assigned slot.", this));
    }

    try {
        // race double check, did we fail/cancel and do we need to release the slot?
        // verify once more that the state is still DEPLOYING
        if (this.state != DEPLOYING) {
            slot.releaseSlot(new FlinkException("Actual state of execution " + this + " (" + state + ") does not match expected state DEPLOYING."));
            return;
        }

        LOG.info("Deploying {} (attempt #{}) with attempt id {} to {} with allocation id {}", vertex.getTaskNameWithSubtaskIndex(),
            attemptNumber, vertex.getCurrentExecutionAttempt().getAttemptId(), getAssignedResourceLocation(), slot.getAllocationId());

        if (taskRestore != null) {
            checkState(taskRestore.getTaskStateSnapshot().getSubtaskStateMappings().stream().allMatch(entry ->
                    entry.getValue().getInputRescalingDescriptor().equals(InflightDataRescalingDescriptor.NO_RESCALE) &&
                    entry.getValue().getOutputRescalingDescriptor().equals(InflightDataRescalingDescriptor.NO_RESCALE)),
                "Rescaling from unaligned checkpoint is not yet supported.");
        }

        // create the task deployment descriptor, used to create the Task
        final TaskDeploymentDescriptor deployment = TaskDeploymentDescriptorFactory
            .fromExecutionVertex(vertex, attemptNumber)
            .createDeploymentDescriptor(
                slot.getAllocationId(),
                slot.getPhysicalSlotNumber(),
                taskRestore,
                producedPartitions.values());

        // null taskRestore to let it be GC'ed
        taskRestore = null;

        // obtain the TaskManagerGateway of the slot's owner,
        // used to communicate with the TaskManager
        final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();

        final ComponentMainThreadExecutor jobMasterMainThreadExecutor =
            vertex.getExecutionGraph().getJobMasterMainThreadExecutor();

        getVertex().notifyPendingDeployment(this);
        // We run the submission in the future executor so that the serialization of large TDDs does not block
        // the main thread and sync back to the main thread once submission is completed.
        // RPC call that tells the TaskManager to create a Task and run this Execution
        CompletableFuture.supplyAsync(() -> taskManagerGateway.submitTask(deployment, rpcTimeout), executor)
            .thenCompose(Function.identity())
            .whenCompleteAsync(
                (ack, failure) -> {
                    if (failure == null) {
                        vertex.notifyCompletedDeployment(this);
                    } else {
                        if (failure instanceof TimeoutException) {
                            String taskname = vertex.getTaskNameWithSubtaskIndex() + " (" + attemptId + ')';

                            markFailed(new Exception(
                                "Cannot deploy task " + taskname + " - TaskManager (" + getAssignedResourceLocation()
                                    + ") not responding after a rpcTimeout of " + rpcTimeout, failure));
                        } else {
                            markFailed(failure);
                        }
                    }
                },
                jobMasterMainThreadExecutor);
    } catch (Throwable t) {
        markFailed(t);

        if (isLegacyScheduling()) {
            ExceptionUtils.rethrow(t);
        }
    }
}
After this long and winding journey, we have finally reached the point where the TaskManager executes the task. The creation and execution of the Task deserve a separate article. With that, our analysis of the Flink job execution flow is complete.
