Flink从入门到实战六[Watermark]-2-Watermark API深入理解

前面的内容我们已经了解了Watermark是什么,简单来说Watermark就是用来解决乱序事件和迟到时间的一个机制,能够控制接入的数据何时触发计算,让程序可以等待一定时间,来获取迟到数据。

Flink在计算时如果使用事件时间,则必须要指定Watermark,接下来我们来看一下Watermark的API。

使用 Flink API 时需要设置一个同时包含 TimestampAssigner 和 WatermarkGenerator 的 WatermarkStrategy。
WatermarkStrategy 工具类中也提供了许多常用的 watermark 策略,并且用户也可以在某些必要场景下构建自己的 watermark 策略。WatermarkStrategy 接口如下:

public interface WatermarkStrategy<T> 
	extends TimestampAssignerSupplier<T>, WatermarkGeneratorSupplier<T>{

	/**
	 * 根据策略实例化一个可分配时间戳的 {@link TimestampAssigner}。
	 */
	@Override
	TimestampAssigner<T> createTimestampAssigner(TimestampAssignerSupplier.Context context);

	/**
	 * 根据策略实例化一个 watermark 生成器。
	 */
	@Override
	WatermarkGenerator<T> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context);
}

例如Flink中的一个Watermark实现类源码:

final class WatermarkStrategyWithTimestampAssigner<T> implements WatermarkStrategy<T> {

    private static final long serialVersionUID = 1L;

    private final WatermarkStrategy<T> baseStrategy;
    private final TimestampAssignerSupplier<T> timestampAssigner;

    WatermarkStrategyWithTimestampAssigner(
            WatermarkStrategy<T> baseStrategy, TimestampAssignerSupplier<T> timestampAssigner) {
        this.baseStrategy = baseStrategy;
        this.timestampAssigner = timestampAssigner;
    }

    @Override
    public TimestampAssigner<T> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
        return timestampAssigner.createTimestampAssigner(context);
    }

    @Override
    public WatermarkGenerator<T> createWatermarkGenerator(
            WatermarkGeneratorSupplier.Context context) {
        return baseStrategy.createWatermarkGenerator(context);
    }
}	

通常情况下,我们不用自己实现此接口,而是可以使用 WatermarkStrategy 工具类中通用的 watermark策略,或者可以使用这个工具类将自定义的 TimestampAssigner 与 WatermarkGenerator 进行绑定。例如,你想要使用有界无序(bounded-out-of-orderness)watermark 生成器和一个 lambda 表达式作为时间戳分配器,那么可以按照如下方式实现:

WatermarkStrategy<Transaction> strategy = WatermarkStrategy
                .<Transaction>forBoundedOutOfOrderness(Duration.ofSeconds(20))
                .withTimestampAssigner((event, timestamp) -> event.getTimestamp());	

我们再分别看一下新老版本的用法

早期版本写法(1.11之前):

public SingleOutputStreamOperator<T> assignTimestampsAndWatermarks(AssignerWithPunctuatedWatermarks<T> timestampAndWatermarkAssigner)
public SingleOutputStreamOperator<T> assignTimestampsAndWatermarks(AssignerWithPeriodicWatermarks<T> timestampAndWatermarkAssigner)
	

代码例:

	StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
	env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
	DataStream<Transaction> stream = env.readFile("123.txt");
	SingleOutputStreamOperator<Transaction> amount1 = dataStream.assignTimestampsAndWatermarks(
                new BoundedOutOfOrdernessTimestampExtractor<Transaction>(Time.milliseconds(1000)) {
            @Override
            public long extractTimestamp(Transaction element) {
                return element.getTimestamp();
            }
        })
                .keyBy(Transaction::getAccountId)
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .sum("amount");

这里直接新建了一个匿名类,继承自抽象类BoundedOutOfOrdernessTimestampExtractor,BoundedOutOfOrdernessTimestampExtractor又实现了AssignerWithPeriodicWatermarks。
BoundedOutOfOrdernessTimestampExtractor有一个抽象方法需要实现,就是指定对象的事件时间

	public long extractTimestamp(Transaction element) {
		return element.getTimestamp();
	}

新版本的写法(例如我使用的1.14.3):

public SingleOutputStreamOperator<T> assignTimestampsAndWatermarks(WatermarkStrategy<T> watermarkStrategy)

代码例:

WatermarkStrategy<Transaction> strategy = WatermarkStrategy
                .<Transaction>forBoundedOutOfOrderness(Duration.ofSeconds(20))
                .withTimestampAssigner((event, timestamp) -> event.getTimestamp());