前面的内容我们已经了解了Watermark是什么,简单来说Watermark就是用来解决乱序事件和迟到时间的一个机制,能够控制接入的数据何时触发计算,让程序可以等待一定时间,来获取迟到数据。
Flink在计算时如果使用事件时间,则必须要指定Watermark,接下来我们来看一下Watermark的API。
使用 Flink API 时需要设置一个同时包含 TimestampAssigner 和 WatermarkGenerator 的 WatermarkStrategy。
WatermarkStrategy 工具类中也提供了许多常用的 watermark 策略,并且用户也可以在某些必要场景下构建自己的 watermark 策略。WatermarkStrategy 接口如下:
public interface WatermarkStrategy<T>
extends TimestampAssignerSupplier<T>, WatermarkGeneratorSupplier<T>{
/**
* 根据策略实例化一个可分配时间戳的 {@link TimestampAssigner}。
*/
@Override
TimestampAssigner<T> createTimestampAssigner(TimestampAssignerSupplier.Context context);
/**
* 根据策略实例化一个 watermark 生成器。
*/
@Override
WatermarkGenerator<T> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context);
}
例如Flink中的一个Watermark实现类源码:
final class WatermarkStrategyWithTimestampAssigner<T> implements WatermarkStrategy<T> {
private static final long serialVersionUID = 1L;
private final WatermarkStrategy<T> baseStrategy;
private final TimestampAssignerSupplier<T> timestampAssigner;
WatermarkStrategyWithTimestampAssigner(
WatermarkStrategy<T> baseStrategy, TimestampAssignerSupplier<T> timestampAssigner) {
this.baseStrategy = baseStrategy;
this.timestampAssigner = timestampAssigner;
}
@Override
public TimestampAssigner<T> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
return timestampAssigner.createTimestampAssigner(context);
}
@Override
public WatermarkGenerator<T> createWatermarkGenerator(
WatermarkGeneratorSupplier.Context context) {
return baseStrategy.createWatermarkGenerator(context);
}
}
通常情况下,我们不用自己实现此接口,而是可以使用 WatermarkStrategy 工具类中通用的 watermark策略,或者可以使用这个工具类将自定义的 TimestampAssigner 与 WatermarkGenerator 进行绑定。例如,你想要使用有界无序(bounded-out-of-orderness)watermark 生成器和一个 lambda 表达式作为时间戳分配器,那么可以按照如下方式实现:
WatermarkStrategy<Transaction> strategy = WatermarkStrategy
.<Transaction>forBoundedOutOfOrderness(Duration.ofSeconds(20))
.withTimestampAssigner((event, timestamp) -> event.getTimestamp());
我们再分别看一下新老版本的用法
早期版本写法(1.11之前):
public SingleOutputStreamOperator<T> assignTimestampsAndWatermarks(AssignerWithPunctuatedWatermarks<T> timestampAndWatermarkAssigner)
public SingleOutputStreamOperator<T> assignTimestampsAndWatermarks(AssignerWithPeriodicWatermarks<T> timestampAndWatermarkAssigner)
代码例:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
DataStream<Transaction> stream = env.readFile("123.txt");
SingleOutputStreamOperator<Transaction> amount1 = dataStream.assignTimestampsAndWatermarks(
new BoundedOutOfOrdernessTimestampExtractor<Transaction>(Time.milliseconds(1000)) {
@Override
public long extractTimestamp(Transaction element) {
return element.getTimestamp();
}
})
.keyBy(Transaction::getAccountId)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
.sum("amount");
这里直接新建了一个匿名类,继承自抽象类BoundedOutOfOrdernessTimestampExtractor,BoundedOutOfOrdernessTimestampExtractor又实现了AssignerWithPeriodicWatermarks。
BoundedOutOfOrdernessTimestampExtractor有一个抽象方法需要实现,就是指定对象的事件时间
public long extractTimestamp(Transaction element) {
return element.getTimestamp();
}
新版本的写法(例如我使用的1.14.3):
public SingleOutputStreamOperator<T> assignTimestampsAndWatermarks(WatermarkStrategy<T> watermarkStrategy)
代码例:
WatermarkStrategy<Transaction> strategy = WatermarkStrategy
.<Transaction>forBoundedOutOfOrderness(Duration.ofSeconds(20))
.withTimestampAssigner((event, timestamp) -> event.getTimestamp());