1. 通过 SqlQueryExecution::analyzeQuery 方法生成 subPlan。
2. 根据获取到的 subPlan 信息，SqlQueryExecution::planDistribution 方法调用 DistributedExecutionPlanner 的 plan 方法，将子计划转换为对应的阶段执行计划（StageExecutionPlan）。
private void planDistribution(PlanRoot plan){// time distribution planning 时间分配计划stateMachine.beginDistributedPlanning();// plan the execution on the active nodes 计划活动节点上的执行DistributedExecutionPlanner distributedPlanner = new DistributedExecutionPlanner(splitManager, metadata);StageExecutionPlan outputStageExecutionPlan;Session session = stateMachine.getSession();Map<Integer, Integer> parallelSources;if (SystemSessionProperties.isSnapshotEnabled(session)) {// Snapshot: UNION statement creates situation where multiple table scan stages communicate with the// same ExchangeOperator, we need to be aware of that in order to adjust the taskCount on Markers coming from// those table scan stages to have successful snapshots.// The key represents the table scan stage's stageId and value represents how many table scan stages// are in parallel in total.//快照:UNION语句创建了多个表扫描阶段与同一个ExchangeOperator进行通信的情况,我们需要注意这一点,// 以便调整来自那些表扫描阶段的Marks上的taskCount以获得成功的快照。// 该键表示表扫描阶段的stageId,而值表示总共有多少个表扫描阶段并行。parallelSources = new HashMap<>();// Snapshot: need to plan different when snapshot is enabled. 启用快照时需要进行不同的计划// See the "plan" method for difference between the different modes. 
不同模式的区别见“计划”方法MarkerAnnouncer announcer = splitManager.getMarkerAnnouncer(session);outputStageExecutionPlan = distributedPlanner.plan(plan.getRoot(), session, SNAPSHOT, null, announcer.currentSnapshotId(), parallelSources);}else {parallelSources = null;// 将子计划生成对应的 阶段执行计划outputStageExecutionPlan = distributedPlanner.plan(plan.getRoot(), session, NORMAL, null, 0, null);}// 结束分发计划stateMachine.endDistributedPlanning();// ensure split sources are closed 确保分片数据源已经关闭stateMachine.addStateChangeListener(state -> {if (state.isDone()) {closeSplitSources(outputStageExecutionPlan);}});// if query was canceled, skip creating schedulerif (stateMachine.isDone()) {return;}// record output field 获取输出字段 -- 》 <字段名,字段类型> 映射关系stateMachine.setColumns(outputStageExecutionPlan.getFieldNames(), outputStageExecutionPlan.getFragment().getTypes());PartitioningHandle partitioningHandle = plan.getRoot().getFragment().getPartitioningScheme().getPartitioning().getHandle();OutputBuffers rootOutputBuffers = createInitialEmptyOutputBuffers(partitioningHandle).withBuffer(OUTPUT_BUFFER_ID, BROADCAST_PARTITION_ID).withNoMoreBufferIds();// build the stage execution objects (this doesn't schedule execution) 构建阶段执行对象(不是调度执行)SqlQueryScheduler scheduler = createSqlQueryScheduler(stateMachine,locationFactory,outputStageExecutionPlan,nodePartitioningManager,nodeScheduler,remoteTaskFactory,stateMachine.getSession(),plan.isSummarizeTaskInfos(),scheduleSplitBatchSize,queryExecutor,schedulerExecutor,failureDetector,rootOutputBuffers,nodeTaskMap,executionPolicy,schedulerStats,dynamicFilterService,heuristicIndexerManager,snapshotManager,null,parallelSources);queryScheduler.set(scheduler);// if query was canceled during scheduler creation, abort the scheduler// directly since the callback may have already firedif (stateMachine.isDone()) {scheduler.abort();queryScheduler.set(null);}}
