Generating Timestamps / Watermarks 生成时间戳/水印
本节与事件时间上运行的程序相关。有关event time, processing time, 和 ingestion time 的介绍,请参阅介绍事件时间
要使用 event time,流程序需要相应地设置 time characteristic 。
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
Assigning Timestamps 分配时间戳
为了处理 event time,Flink需要知道事件 timestamps ,这意味着流中的每个元素都需要有其事件时间戳 assigned。这通常是通过从元素中的某个字段访问/提取时间戳来完成的。
时间戳分配与生成水印同时进行,水印可以告诉系统事件时间的进展。
有两种分配时间戳和生成水印的方法:
- 直接在数据流源中
- 通过时间戳分配器/水印生成器:在FLink中,时间戳分配器还定义要发射的水印
注意时间戳和水印都被指定为自1970-01-01T00:00:00Z的Java时期以来的毫秒。
Source Functions with Timestamps and Watermarks 具有时间戳和水印的源函数
流源可以直接为它们生成的元素分配时间戳,还可以发出水印。完成此操作后,不需要时间戳分配器。注意,如果使用时间戳分配程序,则源提供的任何时间戳和水印都将被覆盖。
要将时间戳直接分配给源中的一个元素,源必须在 SourceContext
上使用 collectWithTimestamp(...)
方法。要生成水印,源必须调用emitWatermark(Watermark)
函数。
以下是分配时间戳并生成水印的 (non-checkpointed) Source的简单示例:
@Override
public void run(SourceContext<MyType> ctx) throws Exception {
while (/* condition */) {
MyType next = getNext();
ctx.collectWithTimestamp(next, next.getEventTimestamp());
if (next.hasWatermarkTime()) {
ctx.emitWatermark(new Watermark(next.getWatermarkTime()));
}
}
}
override def run(ctx: SourceContext[MyType]): Unit = {
while (/* condition */) {
val next: MyType = getNext()
ctx.collectWithTimestamp(next, next.eventTimestamp)
if (next.hasWatermarkTime) {
ctx.emitWatermark(new Watermark(next.getWatermarkTime))
}
}
}
Timestamp Assigners / Watermark Generators 时间戳分配器/水印生成器
时间戳分配者获取一个流并生成一个具有时间戳元素和水印的新流。如果原始流已经具有时间戳和/或水印,则时间戳分配程序将覆盖它们。
时间戳分配程序通常是在数据源之后立即指定的,但并不严格要求这样做。例如,一个常见的模式是在时间戳分配器之前解析(MapFunction)和过滤器(FilterFunction)。在任何情况下,时间戳分配程序都需要在事件时间上的第一个操作(例如第一个窗口操作)之前指定。作为特例,当使用Kafka作为流作业的源时,Flink允许指定源(或使用者)内部的时间戳分配器/水印发射器。有关如何这样做的更多信息,可以在Kafka连接器documentation.]中找到。
注: 本部分的其余部分介绍了程序员必须实施的主要接口,以便创建自己的时间戳提取器/水印发射器。要查看带有FLink的预实现提取器,请参阅预定义的时间戳提取器/水印发射器页面。
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
DataStream<MyEvent> stream = env.readFile(
myFormat, myFilePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 100,
FilePathFilter.createDefaultFilter(), typeInfo);
DataStream<MyEvent> withTimestampsAndWatermarks = stream
.filter( event -> event.severity() == WARNING )
.assignTimestampsAndWatermarks(new MyTimestampsAndWatermarks());
withTimestampsAndWatermarks
.keyBy( (event) -> event.getGroup() )
.timeWindow(Time.seconds(10))
.reduce( (a, b) -> a.add(b) )
.addSink(...);
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val stream: DataStream[MyEvent] = env.readFile(
myFormat, myFilePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 100,
FilePathFilter.createDefaultFilter())
val withTimestampsAndWatermarks: DataStream[MyEvent] = stream
.filter( _.severity == WARNING )
.assignTimestampsAndWatermarks(new MyTimestampsAndWatermarks())
withTimestampsAndWatermarks
.keyBy( _.getGroup )
.timeWindow(Time.seconds(10))
.reduce( (a, b) => a.add(b) )
.addSink(...)
With Periodic Watermarks 带有周期性水印
AssignerWithPeriodicWatermarks
分配时间戳并定期生成水印(可能取决于流元素,也可能完全取决于处理时间)。
生成水印的间隔(每 n 毫秒)是通过ExecutionConfig.setAutoWatermarkInterval(...)
定义的。如果返回的水印是非null且大于先前的水印,则将每次调用分配器的getCurrentWatermark()
方法,并且将发出新的水印。
这里我们展示了两个使用周期性水印生成的时间戳分配程序的简单示例。请注意,Flink附带了BoundedOutOfOrdernessTimestampExtractor
,类似于下面所示的BoundedOutOfOrdernessGenerator
,您可以阅读有关here.的内容。
/**
* This generator generates watermarks assuming that elements arrive out of order,
* but only to a certain degree. The latest elements for a certain timestamp t will arrive
* at most n milliseconds after the earliest elements for timestamp t.
* 当元件无序到达时,该发生器产生水印,
* 但仅在一定程度上。某个时间戳T的最新元素将到达
* 在时间戳T最早的元素之后的至多N毫秒。
*/
public class BoundedOutOfOrdernessGenerator implements AssignerWithPeriodicWatermarks<MyEvent> {
private final long maxOutOfOrderness = 3500; // 3.5 seconds
private long currentMaxTimestamp;
@Override
public long extractTimestamp(MyEvent element, long previousElementTimestamp) {
long timestamp = element.getCreationTime();
currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
return timestamp;
}
@Override
public Watermark getCurrentWatermark() {
// return the watermark as current highest timestamp minus the out-of-orderness bound
return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
}
}
/**
* This generator generates watermarks that are lagging behind processing time by a fixed amount.
* It assumes that elements arrive in Flink after a bounded delay.
* 该生成器生成落后于处理时间的水印。
* 假定元素在有界延迟之后到达Flink。
*/
public class TimeLagWatermarkGenerator implements AssignerWithPeriodicWatermarks<MyEvent> {
private final long maxTimeLag = 5000; // 5 seconds
@Override
public long extractTimestamp(MyEvent element, long previousElementTimestamp) {
return element.getCreationTime();
}
@Override
public Watermark getCurrentWatermark() {
// return the watermark as current time minus the maximum time lag
return new Watermark(System.currentTimeMillis() - maxTimeLag);
}
}
/**
* This generator generates watermarks assuming that elements arrive out of order,
* but only to a certain degree. The latest elements for a certain timestamp t will arrive
* at most n milliseconds after the earliest elements for timestamp t.
* 这个生成器产生水印,假设元素到达不正常,
* 但只在一定程度上。某个时间戳t的最新元素将在时间戳t的最早元素之后最多n毫秒到达。
*/
class BoundedOutOfOrdernessGenerator extends AssignerWithPeriodicWatermarks[MyEvent] {
val maxOutOfOrderness = 3500L // 3.5 seconds
var currentMaxTimestamp: Long = _
override def extractTimestamp(element: MyEvent, previousElementTimestamp: Long): Long = {
val timestamp = element.getCreationTime()
currentMaxTimestamp = max(timestamp, currentMaxTimestamp)
timestamp
}
override def getCurrentWatermark(): Watermark = {
// return the watermark as current highest timestamp minus the out-of-orderness bound
new Watermark(currentMaxTimestamp - maxOutOfOrderness)
}
}
/**
* This generator generates watermarks that are lagging behind processing time by a fixed amount.
* It assumes that elements arrive in Flink after a bounded delay.
* 该生成器生成落后于处理时间的水印。
* 假定元素在有界延迟之后到达Flink。
*/
class TimeLagWatermarkGenerator extends AssignerWithPeriodicWatermarks[MyEvent] {
val maxTimeLag = 5000L // 5 seconds
override def extractTimestamp(element: MyEvent, previousElementTimestamp: Long): Long = {
element.getCreationTime
}
override def getCurrentWatermark(): Watermark = {
// return the watermark as current time minus the maximum time lag
new Watermark(System.currentTimeMillis() - maxTimeLag)
}
}
With Punctuated Watermarks 加上标点符号
若要在某一事件表明可能生成新水印时生成水印,请使用 AssignerWithPunctuatedWatermarks
。对于这个类,Flink将首先调用extractTimestamp(...)
方法为元素分配一个时间戳,然后立即调用该元素上的checkAndGetNextWatermark(...)
方法。
checkAndGetNextWatermark(...)
方法通过了在extractTimestamp(...)
方法中分配的时间戳,并可以决定它是否希望生成水印。每当checkAndGetNextWatermark(...)
方法返回非零水印,并且该水印大于最新的前一个水印时,将发射新的水印。
public class PunctuatedAssigner implements AssignerWithPunctuatedWatermarks<MyEvent> {
@Override
public long extractTimestamp(MyEvent element, long previousElementTimestamp) {
return element.getCreationTime();
}
@Override
public Watermark checkAndGetNextWatermark(MyEvent lastElement, long extractedTimestamp) {
return lastElement.hasWatermarkMarker() ? new Watermark(extractedTimestamp) : null;
}
}
class PunctuatedAssigner extends AssignerWithPunctuatedWatermarks[MyEvent] {
override def extractTimestamp(element: MyEvent, previousElementTimestamp: Long): Long = {
element.getCreationTime
}
override def checkAndGetNextWatermark(lastElement: MyEvent, extractedTimestamp: Long): Watermark = {
if (lastElement.hasWatermarkMarker()) new Watermark(extractedTimestamp) else null
}
}
注意: 可以在每个单个事件上生成水印。然而,由于每个水印导致一些下游的计算,过多的水印会降低性能。
Timestamps per Kafka Partition 每个卡夫卡分区的##时间戳
当使用ApacheKafka作为数据源时,每个Kafka分区可能有一个简单的事件时间模式(升序时间戳或超出顺序的界限)。然而,当使用Kafka的流时,多个分区常常被并行地消耗,将事件与分区交织在一起,并破坏每个分区模式(这是Kafka的消费者客户端工作方式固有的)。
在这种情况下,您可以使用flink的卡夫卡分区感知水印生成.使用该特性,在Kafka使用者内部生成水印,每个Kafka分区,并且每个分区的水印以与流洗牌上合并水印相同的方式合并。
例如,如果事件时间戳是严格上升的每卡夫卡分区,生成每个分区的水印与上升的时间戳水印generator将导致完美的整体水印。
下图展示了如何使用per-Kafka分割水印生成,以及在这种情况下水印如何通过流数据流传播。
FlinkKafkaConsumer09<MyType> kafkaSource = new FlinkKafkaConsumer09<>("myTopic", schema, props);
kafkaSource.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<MyType>() {
@Override
public long extractAscendingTimestamp(MyType element) {
return element.eventTimestamp();
}
});
DataStream<MyType> stream = env.addSource(kafkaSource);
val kafkaSource = new FlinkKafkaConsumer09[MyType]("myTopic", schema, props)
kafkaSource.assignTimestampsAndWatermarks(new AscendingTimestampExtractor[MyType] {
def extractAscendingTimestamp(element: MyType): Long = element.eventTimestamp
})
val stream: DataStream[MyType] = env.addSource(kafkaSource)
