Flink从入门到实战六[Watermark]-1-Watermark是什么

要了解Watermark,要先了解Flink中的时间语义,Flink中的时间分为三种:
Event Time 事件时间:业务数据中的数据产生的时间
Ingestion Time 摄取时间:数据到达Flink中的时间
Processing Time 处理时间:Flink处理数据的时间

Watermarks是什么
我们使用Flink处理实时数据时,数据不会完全按照发生的顺序进入到Flink中,当一个计算窗口触发计算,属于这个窗口的数据,可能迟迟没有进入到Flink中参与计算。
Watermark就是用来解决乱序事件和迟到时间的一个机制,能够控制接入的数据何时触发计算,让程序可以等待一定时间,来获取迟到数据。
Watermark使用的是数据中的事件时间来计算更新的,并且必须单调递增,以确保任务的事件时间在向前推进,而不是在后退。

理解Watermark
理想情况下,数据按照时间发生的时间依次进入到Flink中,被分配到对应的窗口进行计算,如下:

	35 32 || 28 25 23 || 19 17 13 || 7 5 1  →→  Flink 

每个数字的值都代表一个时间发生的时间,值越大代表时间发生之间越靠后,数字从右到左,是按照从小到大的顺序到达Flink程序的。

实际情况则是数据没有按照实际发生的事件时间的先后顺序到达Flink参与计算,如下:

	35 32 ||  25 19 23 || 28 17 5 || 13 7  1  →→  Flink 

Watermarks的产生
Source生成watermaker,经过map转换,带到下游的算子中,算子则根据来到的数据中附带的Watermark,更新算子自身的Watermark。

Watermarks的计算方式
watermaker = max EventTime – LateTime,即:watermaker = 最大事件时间-允许的最大延时时间。
watermaker根据数据的时间计算生成最新的watermaker,从而来判断是否有窗口触发了执行条件。
注意:LateTime越大,延时越高。

窗口触发计算的条件
窗口触发条件 Current watermaker > Window EndTime

Watermarks生成方式
Periodic Watermarks(周期生成,常用) :Based on Event Time
Punctuated Watermarks (标记生成):Based on something in the event stream

Watermarks API
早期版本写法(1.11之前):

public SingleOutputStreamOperator<T> assignTimestampsAndWatermarks(AssignerWithPunctuatedWatermarks<T> timestampAndWatermarkAssigner)
public SingleOutputStreamOperator<T> assignTimestampsAndWatermarks(AssignerWithPeriodicWatermarks<T> timestampAndWatermarkAssigner)

代码例:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
DataStream<Transaction> stream = env.readFile("123.txt");
SingleOutputStreamOperator<Transaction> amount1 = dataStream.assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<Transaction>(Time.milliseconds(1000)) {
            @Override
            public long extractTimestamp(Transaction element) {
                return element.getTimestamp();
            }
        })
                .keyBy(Transaction::getAccountId)
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .sum("amount");

这里直接新建了一个匿名类,继承自抽象类BoundedOutOfOrdernessTimestampExtractor,BoundedOutOfOrdernessTimestampExtractor又实现了AssignerWithPeriodicWatermarks。
BoundedOutOfOrdernessTimestampExtractor有一个抽象方法需要实现,就是指定对象的事件时间

public long extractTimestamp(Transaction element) {
	return element.getTimestamp();
}

新版本的写法(例如我使用的1.14.3):

public SingleOutputStreamOperator<T> assignTimestampsAndWatermarks(WatermarkStrategy<T> watermarkStrategy)

代码例:

WatermarkStrategy<Transaction> strategy = WatermarkStrategy
                .<Transaction>forBoundedOutOfOrderness(Duration.ofSeconds(20))
                .withTimestampAssigner((event, timestamp) -> event.getTimestamp());