As described in timestamps and watermark handling, Flink provides abstractions that allow the programmer to assign their own timestamps and emit their own watermarks. More specifically, one can do so by implementing one of the AssignerWithPeriodicWatermarks
and AssignerWithPunctuatedWatermarks
interfaces, depending on the use case. In a nutshell, the first will emit watermarks periodically, while the second does so based on some property of the incoming records, e.g. whenever a special element is encountered in the stream.
如时间戳和水印处理中所述,flink提供了允许程序员分配自己的时间戳并发出自己的水印的抽象。更具体地,可以通过实现AssignerWithPeriodicWatermarks具有周期性水印的分配器
和 AssignerWithPunctuatedWatermarks 具有标点符号的水印
接口中的一个来实现,这取决于使用情况。简而言之,第一将周期性地发射水印,而第二则基于输入记录的某些属性,例如每当在流中遇到特殊元素时。
为了进一步简化此类任务的编程工作,FLink附带了一些预先实现的时间戳分配器。本节提供了它们的列表。除了它们的开箱即用功能之外,它们的实现可以作为定制实现的示例。
Assigners with ascending timestamps 具有升序时间戳的分配方
生成周期水印最简单的特例是给定源任务所看到的时间戳按升序发生的情况。在这种情况下,当前时间戳始终可以充当水印,因为不会有更早的时间戳到达。
请注意,仅有必要时间戳是升序 per parallel data source task。例如,如果在特定的设置中,一个KAFKA分区由一个并行数据源实例读取,则仅有必要在每个Kafka分区中递增时间戳。每当并行流被混洗、统一、连接或合并时,flink的水印合并机制将产生正确的水印。
DataStream<MyEvent> stream = ...
DataStream<MyEvent> withTimestampsAndWatermarks =
stream.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<MyEvent>() {
@Override
public long extractAscendingTimestamp(MyEvent element) {
return element.getCreationTime();
}
});
val stream: DataStream[MyEvent] = ...
val withTimestampsAndWatermarks = stream.assignAscendingTimestamps( _.getCreationTime )
Assigners allowing a fixed amount of lateness 指定人允许一定数量的迟到
周期性水印生成的另一个示例是当水印滞后于在流中看到的最大(事件时间)时间戳之后的预定时间量时。这种情况包括预先知道在流中可能遇到的最大延迟的情形,例如,当创建包含具有在用于测试的固定时间段内扩展的时间戳的元素的自定义源时。对于这些情况,flink提供了BoundedOutOfOrdernessTimestampExtractor
,该“TimeStampExtractor”作为“MaxouToForested”的参数,即允许在计算给定窗口的最终结果时忽略元素之前的最大时间量。迟到对应于“t-t_w”的结果,其中“t”是元素的(事件时间)时间戳,而“t_w”是前一水印的(event-time)时间戳。如果“LAYLE>;0”则该元素被认为是延迟的,并且默认情况下在计算其相应窗口的作业结果时被忽略。有关使用延迟元素的详细信息,请参阅有关允许的迟到的文档。
DataStream<MyEvent> stream = ...
DataStream<MyEvent> withTimestampsAndWatermarks =
stream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<MyEvent>(Time.seconds(10)) {
@Override
public long extractTimestamp(MyEvent element) {
return element.getCreationTime();
}
});
val stream: DataStream[MyEvent] = ...
val withTimestampsAndWatermarks = stream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[MyEvent](docs_1.7_Time.seconds(10))( _.getCreationTime ))