Operator chain条件
一共4块包含7条原则,全部满足则可以chain到一起,具体如下。
1.上下游满足双射(单映射又是满映射)
- 上下游的并行度一致
- 下游节点的入度为1 (也就是说下游节点没有来自其他节点的输入)
两个节点间数据分区方式是ForwardPartitioner
2.SlotSharingGroup
-
3.ChainingStrategy
下游节点的 chain 策略为 ALWAYS(可以与上下游链接,map、flatmap、filter等默认是ALWAYS)
上游节点的 chain 策略为 ALWAYS 或 HEAD(只能与下游链接,不能与上游链接,Source默认是HEAD)
4.全局配置
-
Chain规则源码
public static boolean isChainable(StreamEdge edge, StreamGraph streamGraph) {StreamNode upStreamVertex = streamGraph.getSourceVertex(edge);StreamNode downStreamVertex = streamGraph.getTargetVertex(edge);StreamOperatorFactory<?> headOperator = upStreamVertex.getOperatorFactory();StreamOperatorFactory<?> outOperator = downStreamVertex.getOperatorFactory();return downStreamVertex.getInEdges().size() == 1&& outOperator != null&& headOperator != null&& upStreamVertex.isSameSlotSharingGroup(downStreamVertex)&& outOperator.getChainingStrategy() == ChainingStrategy.ALWAYS&& (headOperator.getChainingStrategy() == ChainingStrategy.HEAD ||headOperator.getChainingStrategy() == ChainingStrategy.ALWAYS)&& (edge.getPartitioner() instanceof ForwardPartitioner)&& edge.getShuffleMode() != ShuffleMode.BATCH&& upStreamVertex.getParallelism() == downStreamVertex.getParallelism()&& streamGraph.isChainingEnabled();}
Chain策略(ChainingStrategy)
一共就三种ALWAYS、NEVER、HEAD
- ALWAYS:可以与上下游链接,map、flatmap、filter等默认是ALWAYS。
- HEAD:只能与下游链接,不能与上游链接,Source默认是HEAD。
NEVER:上下游都不链接,算子自己独享一个Task。
public enum ChainingStrategy {/*** Operators will be eagerly chained whenever possible.** <p>To optimize performance, it is generally a good practice to allow maximal* chaining and increase operator parallelism.*/ALWAYS,/*** The operator will not be chained to the preceding or succeeding operators.*/NEVER,/*** The operator will not be chained to the predecessor, but successors may chain to this* operator.*/HEAD}
设置Chain
通过调用StreamExecutionEnvironment.disableOperatorChaining()来全局禁用chaining。
map().startNewChain().map// 独占Taskmap().disableChaining()//map().slotSharingGroup("group-name")
方法设置Chain的基本原理是通过改变Chain策略,具体对应如下。
startNewChain() //对应的策略是后面算子 ChainingStrategy.HEADdisableChaining() //对应的策略是 ChainingStrategy.NEVER
内部优化选项
Chained 的 Operators 之间数据默认序列化后拷贝传递。
通过下面代码可以开启对象重用,关闭深拷贝。
env.getConfig().enableObjectReuse();
注意:
慎用!必须要确保下游Function只有一种,或者下游的Function均不会改变对象内部的值。否则可能会有线程安全的问题。官方建议阅读源码 org.apache.flink.streaming.runtime.tasks.OperatorChain 与 org.apache.flink.streaming.runtime.tasks.OperatorChain.CopyingChainingOutput。完全了解 reuse 内部机制后之后再使用。用不好会出现线程安全问题。
