API 迁移指南
从Flink 1.3+ 到 Flink 1.7
TypeSerializer 的变化
这部分主要与实现TypeSerializer接口来自定义序列化的用户有关。
原来的 TypeSerializerConfigSnapshot 抽象接口被弃用了, 并且将在将来完全删除,取而代之的是新的 TypeSerializerSnapshot. 详情请参考 Migrating from deprecated serializer snapshot APIs before Flink 1.7。
从 Flink 1.2 迁移到 Flink 1.3
自Flink 1.2以来,有一些API已被更改。大多数更改都记录在其特定文档中。以下是API更改的综合列表以及升级到Flink 1.3时迁移详细信息的链接。
TypeSerializer 接口变化
这主要适用于自定义 TypeSerializer接口的用户
从Flink 1.3开始,添加了两个与保存点恢复的串行器兼容性相关的其他方法。 有关如何实现这些方法的更多详细信息,请参阅 序列化升级和兼容性
ProcessFunction 是 RichFunction
从Flink 1.2, ProcessFunction 引入,并有了多种实现例如 RichProcessFunction 。 从Flink 1.3,开始RichProcessFunction 被移除了, 现在 ProcessFunction 始终是 RichFunction 并且可以访问运行时上下文。
Flink CEP 库API更改
Flink 1.3中的CEP库新增了许多新函数,请参阅 CEP迁移文档 。
Flink core 中移除了Logger的依赖
Flink1.3以后,用户可以选用自己期望的日志框架了,Flink移除了日志记录框架的依赖。
实例和快速入门的demo已经指定了日志记录器,不会有问题,其他项目,请确保添加日志依赖,比如Maven的 pom.xml,中需要增加
<dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>1.7.7</version></dependency><dependency><groupId>log4j</groupId><artifactId>log4j</artifactId><version>1.2.17</version></dependency>
从 Flink 1.1 到 Flink 1.2的迁移
正如 状态文档中所说,Flink 有两种状态: keyed 和 non-keyed 状态 (也被称作 operator 状态). 这两种类型都可用于 算子和用户定义的函数。文档将指导您完成从Flink 1.1函数代码迁移到Flink 1.2的过程,并介绍Flink 1.2中引入的一些重要内部更改,这些改变涉及到Flink 1.1中对齐窗口操作的弃用。 (请参阅 时间对齐窗口算子Aligned Processing Time Window Operators).
迁移有两个目标:
引入Flink1.2中引入的新函数,比如自适应(rescaling)
确保新Flink 1.2作业能够从Flink 1.1的保存点恢复执行
按照本指南操作可以把正在运行的Flink1.1作业迁移到Flink1.2中。前提需要在Flink1.1中使用 保存点 并把保存点作为Flink1.2作业的起点。这样,Flink1.2就可以从之前中断的位置恢复执行了。
用户函数示例
本文档其余部分使用 CountMapper 和 BufferingSink 函数作为示例。第一个函数是 keyed 状态,第二个不是 s non-keyed 状态,Flink 1.1中上述两个函数的代码如下:
public class CountMapper extends RichFlatMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>> {private transient ValueState<Integer> counter;private final int numberElements;public CountMapper(int numberElements) {this.numberElements = numberElements;}@Overridepublic void open(Configuration parameters) throws Exception {counter = getRuntimeContext().getState(new ValueStateDescriptor<>("counter", Integer.class, 0));}@Overridepublic void flatMap(Tuple2<String, Integer> value, Collector<Tuple2<String, Integer>> out) throws Exception {int count = counter.value() + 1;counter.update(count);if (count % numberElements == 0) {out.collect(Tuple2.of(value.f0, count));counter.update(0); // reset to 0}}}public class BufferingSink implements SinkFunction<Tuple2<String, Integer>>,Checkpointed<ArrayList<Tuple2<String, Integer>>> {private final int threshold;private ArrayList<Tuple2<String, Integer>> bufferedElements;BufferingSink(int threshold) {this.threshold = threshold;this.bufferedElements = new ArrayList<>();}@Overridepublic void invoke(Tuple2<String, Integer> value) throws Exception {bufferedElements.add(value);if (bufferedElements.size() == threshold) {for (Tuple2<String, Integer> element: bufferedElements) {// send it to the sink}bufferedElements.clear();}}@Overridepublic ArrayList<Tuple2<String, Integer>> snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {return bufferedElements;}@Overridepublic void restoreState(ArrayList<Tuple2<String, Integer>> state) throws Exception {bufferedElements.addAll(state);}}
CountMapper 是一个按表格分组输入(word, 1)的 RichFlatMapFunction , 函数为每个传入的key保存一个计数器 (ValueState<Integer> counter) 并且 ,如果某个单词的出现次数超过用户提供的阈值,则会发出一个包含单词本身和出现次数的元组。
BufferingSink 是一个 SinkFunction 接收方 ( CountMapper可能的输出) ,直到达到用户定义的最终状态之前会一直缓存数据,这可以避免频繁的对数据库或者是存储系统的操作,通常这些操作都是比较耗时或开销比较大的。为了以容错方式进行缓冲,缓冲数据元保存在列表(bufferedElements) 列表会被定期被检查点保存。
状态 API 迁移
要使用Flink 1.2的新函数,应修改上面的代码来完成新的状态抽象。完成这些更改后,就可以实现作业的并行度的修改(向上或向下扩展),并确保新版本的作业将从之前作业停止的位置开始执行。
Keyed State: 需要注意的是,如果代码中只有keyed state,那么Flink1.1的代码也适用于1.2版本,并且完全支持新函数和向下兼容。可以仅针代码格式进行更改,但这只是风格/习惯问题。
综上所述,本章我们重点阐述 non-keyed state的迁移
自适应和新状态抽象
第一个修改是 Checkpointed<T extends Serializable> 接口有了新的实现。在 Flink 1.2中,有状态函数可以实现更通用的 CheckpointedFunction 接口或 ListCheckpointed<T extends Serializable> 接口 ,和之前版本的 Checkpointed 类似。
在这两种情况中,非键合状态预期是一个 可序列化 的 List ,对象彼此独立,这样可以在自适应的时候重新分配,意味着, 这些对象是可以重新分区非被Keys化状态的最细粒度。例如,如果并行度为1的 BufferingSink 有 (test1, 2) 和 (test2, 2)两个数据,当并行度增加到2时, (test1, 2) 可能在task 0中,而 (test2, 2) 可能在task 1中。
更详细的信息可以参阅状态文档.
ListCheckpointed
ListCheckpointed 接口需要实现两个方法:
List<T> snapshotState(long checkpointId, long timestamp) throws Exception;void restoreState(List<T> state) throws Exception;
它的语义和之前的 Checkpointed 接口类似,唯一区别是,现在 snapshotState() 返回的是检查点对象列表, 如前所述, restoreState 必须在恢复的时候,处理这个列表。如果状态不是重新分区,可以随时返回 Collections.singletonList(MY_STATE) 的 snapshotState()。 更新的代码 BufferingSink 如下:
public class BufferingSinkListCheckpointed implementsSinkFunction<Tuple2<String, Integer>>,ListCheckpointed<Tuple2<String, Integer>>,CheckpointedRestoring<ArrayList<Tuple2<String, Integer>>> {private final int threshold;private transient ListState<Tuple2<String, Integer>> checkpointedState;private List<Tuple2<String, Integer>> bufferedElements;public BufferingSinkListCheckpointed(int threshold) {this.threshold = threshold;this.bufferedElements = new ArrayList<>();}@Overridepublic void invoke(Tuple2<String, Integer> value) throws Exception {this.bufferedElements.add(value);if (bufferedElements.size() == threshold) {for (Tuple2<String, Integer> element: bufferedElements) {// send it to the sink}bufferedElements.clear();}}@Overridepublic List<Tuple2<String, Integer>> snapshotState(long checkpointId, long timestamp) throws Exception {return this.bufferedElements;}@Overridepublic void restoreState(List<Tuple2<String, Integer>> state) throws Exception {if (!state.isEmpty()) {this.bufferedElements.addAll(state);}}@Overridepublic void restoreState(ArrayList<Tuple2<String, Integer>> state) throws Exception {// this is from the CheckpointedRestoring interface.this.bufferedElements.addAll(state);}}
更新后的函数也实现了 CheckpointedRestoring 接口。这是出于向后兼容性原因,更多细节将在本节末尾解释。
CheckpointedFunction
CheckpointedFunction 接口也需要实现这两个方法。
void snapshotState(FunctionSnapshotContext context) throws Exception;void initializeState(FunctionInitializationContext context) throws Exception;
在Flink 1.1中, 检查点执行会调用snapshotState() 方法,但是现在当用户每次初始化自定义函数时,会调用 initializeState() (对应 restoreState()) ,而不是在恢复的情况下调用,鉴于此, initializeState() 不仅是初始化不同类型状态的地方,而且还包括状态恢复逻辑。实现了 CheckpointedFunction 接口的 BufferingSink 代码如下所示
public class BufferingSink implements SinkFunction<Tuple2<String, Integer>>,CheckpointedFunction, CheckpointedRestoring<ArrayList<Tuple2<String, Integer>>> {private final int threshold;private transient ListState<Tuple2<String, Integer>> checkpointedState;private List<Tuple2<String, Integer>> bufferedElements;public BufferingSink(int threshold) {this.threshold = threshold;this.bufferedElements = new ArrayList<>();}@Overridepublic void invoke(Tuple2<String, Integer> value) throws Exception {bufferedElements.add(value);if (bufferedElements.size() == threshold) {for (Tuple2<String, Integer> element: bufferedElements) {// send it to the sink}bufferedElements.clear();}}@Overridepublic void snapshotState(FunctionSnapshotContext context) throws Exception {checkpointedState.clear();for (Tuple2<String, Integer> element : bufferedElements) {checkpointedState.add(element);}}@Overridepublic void initializeState(FunctionInitializationContext context) throws Exception {checkpointedState = context.getOperatorStateStore().getSerializableListState("buffered-elements");if (context.isRestored()) {for (Tuple2<String, Integer> element : checkpointedState.get()) {bufferedElements.add(element);}}}@Overridepublic void restoreState(ArrayList<Tuple2<String, Integer>> state) throws Exception {// this is from the CheckpointedRestoring interface.this.bufferedElements.addAll(state);}}
initializeState 方法是需要传入 FunctionInitializationContext,用于初始化non-keyed 状态的 “容器”,容器的类型是 ListState,供 non-keyed 状态的对象被检查点存储时使用:
this.checkpointedState = context.getOperatorStateStore().getSerializableListState("buffered-elements");
初始化之后,调用 isRestored() 方法可以获取当前是否在恢复。如果是 true, 表示正在恢复。
正如下面的代码所示,在状态初始化期间恢复 BufferingSink, ListState 中保存的变量可以被 snapshotState()使用, ListState 会清除掉之前检查点存储的对象,然后存储当前检查点的对象。
当然, keyed 状态也可以在 initializeState() 方法中初始化, 可以使用 FunctionInitializationContext 来完成初始化,而不是使用Flink1.1中的 RuntimeContext,如果CheckpointedFunction 要在 CountMapper 中使用该接口,则可以不使用 open() 方法, snapshotState() 和 initializeState() 方法如下所示:
public class CountMapper extends RichFlatMapFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>implements CheckpointedFunction {private transient ValueState<Integer> counter;private final int numberElements;public CountMapper(int numberElements) {this.numberElements = numberElements;}@Overridepublic void flatMap(Tuple2<String, Integer> value, Collector<Tuple2<String, Integer>> out) throws Exception {int count = counter.value() + 1;counter.update(count);if (count % numberElements == 0) {out.collect(Tuple2.of(value.f0, count));counter.update(0); // reset to 0}}@Overridepublic void snapshotState(FunctionSnapshotContext context) throws Exception {// all managed, nothing to do.}@Overridepublic void initializeState(FunctionInitializationContext context) throws Exception {counter = context.getKeyedStateStore().getState(new ValueStateDescriptor<>("counter", Integer.class, 0));}}
请注意, snapshotState() 方法为空,因为Flink本身负责在检查点时就会存储Keys化的对象
向后兼容Flink 1.1
到目前为止,我们已经了解如何修改函数来引入Flink 1.2的新函数。剩下的问题是“我可以确保我的修改后的(Flink 1.2)作业将从我从Flink 1.1运行的作业停止的位置开始吗?”。
答案是肯定的,而这样做的方式非常简单。对于被Keys化状态,什么都不需要做。Flink将负责从Flink 1.1恢复状态。对于非被Keys化状态,新函数必须像上面代码一样实现 CheckpointedRestoring 接口,还有个办法,需要熟悉Flink1.1的 restoreState() 和Checkpointed 接口,然后修改 BufferingSink, restoreState() 方法完成和之前一样的功能。
时间对齐窗口算子
在Flink 1.1中,只有在没有指定的触发器的时, timeWindow() 才会实例化特殊的数据类型 WindowOperator。 它可以是 AggregatingProcessingTimeWindowOperator 或 AccumulatingProcessingTimeWindowOperator.。这两个算子都可以称之为时间对齐窗口算子,因为它们假定输入数据是按顺序到达,当在处理时间中操作时,这是有效的,元素到达窗口操作时获得时间。算子仅限于使用内存状态,并且优化了用于存储数据的元素结构。
在Flink 1.2中,不推荐使用对齐窗口算子,并且所有窗口算子操作都通过泛型WindowOperator来实现。迁移不需要更改Flink 1.1作业的代码,因为Flink将读取Flink 1.1保存点中对齐的窗口 算子存储的状态,将其转换为与泛型相兼容的格式 WindowOperator,并使用通用的 WindowOperator。
虽然是已经弃用这个方法,但是在Flink 1.2 中仍然是可以使用的,通过特殊的 WindowAssigners 可以实现这个目的。 SlidingAlignedProcessingTimeWindows 和 TumblingAlignedProcessingTimeWindows assigners,分别是滑动窗口和滚动窗口,使用对齐窗口的Flink 1.2作业必须是一项新作业,因为在使用这些 算子时无法从Flink 1.1保存点恢复执行。
注意时间对齐窗口算子不提供自适应 而且 不向下兼容 Flink 1.1.
在Flink 1.2中使用对齐窗口 算子的代码如下所示:
// for tumbling windowsDataStream<Tuple2<String, Integer>> window1 = source.keyBy(0).window(TumblingAlignedProcessingTimeWindows.of(Time.of(1000, TimeUnit.MILLISECONDS))).apply(your-function)// for sliding windowsDataStream<Tuple2<String, Integer>> window1 = source.keyBy(0).window(SlidingAlignedProcessingTimeWindows.of(Time.seconds(1), Time.milliseconds(100))).apply(your-function)
// for tumbling windows val window1 = source.keyBy(0).window(TumblingAlignedProcessingTimeWindows.of(Time.of(1000, TimeUnit.MILLISECONDS))).apply(your-function)// for sliding windows val window2 = source.keyBy(0).window(SlidingAlignedProcessingTimeWindows.of(Time.seconds(1), Time.milliseconds(100))).apply(your-function)
