Flink从入门到实战六[Watermark]-5-Flink自带的Watermark

Flink自带的Watermark类,最主要的就是BoundedOutOfOrdernessWatermarks。
BoundedOutOfOrdernessWatermarks实现了WatermarkGenerator接口,实现了两个方法:onEvent和onPeriodicEmit。
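BoundedOutOfOrdernessWatermarks的源码中最大无序时间是由构造器传入的,和我们自定义的写法基本相同。需要注意的是,源码将maxTimestamp初始化为Long.MIN_VALUE + outOfOrdernessMillis + 1,这样在收到任何事件之前发出的Watermark恰好是Long.MIN_VALUE,不会因为减法溢出而得到错误的值。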

@Public
public class BoundedOutOfOrdernessWatermarks<T> implements WatermarkGenerator<T> {

    /** Out-of-orderness bound in milliseconds; subtracted from the highest timestamp on emit. */
    private final long outOfOrdernessMillis;

    /** Highest event timestamp observed up to now. */
    private long maxTimestamp;

    /**
     * Builds a watermark generator that lags the highest seen timestamp by the given bound.
     *
     * @param maxOutOfOrderness The out-of-orderness bound; must be non-null and non-negative.
     */
    public BoundedOutOfOrdernessWatermarks(Duration maxOutOfOrderness) {
        checkNotNull(maxOutOfOrderness, "maxOutOfOrderness");
        checkArgument(!maxOutOfOrderness.isNegative(), "maxOutOfOrderness cannot be negative");

        final long boundMillis = maxOutOfOrderness.toMillis();
        this.outOfOrdernessMillis = boundMillis;

        // Seed the maximum so that (maxTimestamp - bound - 1) evaluates to Long.MIN_VALUE
        // until a real event raises it.
        this.maxTimestamp = Long.MIN_VALUE + boundMillis + 1;
    }

    // ------------------------------------------------------------------------

    /**
     * Records the event's timestamp if it is larger than any timestamp recorded so far.
     * Nothing is written to {@code output} here.
     */
    @Override
    public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
        if (eventTimestamp > maxTimestamp) {
            maxTimestamp = eventTimestamp;
        }
    }

    /**
     * Emits a watermark equal to the highest recorded timestamp minus the bound, minus one
     * millisecond.
     */
    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        final long watermarkMillis = maxTimestamp - outOfOrdernessMillis - 1;
        output.emitWatermark(new Watermark(watermarkMillis));
    }
}

我们基于前面章节和本章节上面的内容,来自定义一个Watermark生成器,并应用,代码如下:

package com.itzhimei.watermark;

import org.apache.flink.api.common.eventtime.*;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.walkthrough.common.entity.Transaction;
import org.apache.flink.walkthrough.common.source.TransactionSource;

/**
 * Demo job that uses a custom watermark generator.
 *
 * <p>Sums the transaction amount per account over one-hour tumbling event-time windows.
 * The watermark trails the highest event timestamp seen by one hour.
 */
public class Watermark_2_Define {

    /**
     * Builds and runs the job: reads transactions, assigns timestamps and watermarks with
     * {@link MyWatermarkGenerator}, keys by account id, and sums {@code amount} per
     * one-hour event-time window. Both the raw source and the windowed sums are printed.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if job execution fails
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Transaction> dataStream = env
                .addSource(new TransactionSource())
                .name("transactions");
        dataStream.print("source");

        // The event timestamp is taken from the transaction itself.
        WatermarkStrategy<Transaction> strategy = WatermarkStrategy
                .forGenerator((ctx) -> new MyWatermarkGenerator())
                .withTimestampAssigner((event, timestamp) -> event.getTimestamp());

        SingleOutputStreamOperator<Transaction> amount = dataStream.assignTimestampsAndWatermarks(strategy)
                .keyBy(Transaction::getAccountId)
                .window(TumblingEventTimeWindows.of(Time.hours(1)))
                .sum("amount");

        amount.print("amount");
        env.execute();

    }

    /**
     * Watermark generator that trails the highest observed event timestamp by a fixed
     * one-hour bound.
     */
    private static class MyWatermarkGenerator implements WatermarkGenerator<Transaction> {

        /** Out-of-orderness bound: one hour, in milliseconds. */
        private static final long MAX_OUT_OF_ORDERNESS_MILLIS = 60L * 60L * 1000L;

        /**
         * Highest event timestamp seen so far.
         *
         * <p>Seeded so that the watermark computed in {@link #onPeriodicEmit} is exactly
         * {@code Long.MIN_VALUE} until the first event arrives. Starting from plain
         * {@code Long.MIN_VALUE} would make the subtraction wrap around to a value near
         * {@code Long.MAX_VALUE} if a periodic emit happens before any event.
         */
        private long currentMaxTimestamp = Long.MIN_VALUE + MAX_OUT_OF_ORDERNESS_MILLIS + 1;

        /**
         * Logs the timestamps of the incoming transaction and raises the running maximum
         * if this event is newer. No watermark is emitted from here.
         */
        @Override
        public void onEvent(Transaction event, long eventTimestamp, WatermarkOutput output) {
            System.out.println("Transaction Timestamp:" + event.getTimestamp() + "  eventTimestamp:" + eventTimestamp);
            currentMaxTimestamp = Math.max(currentMaxTimestamp, eventTimestamp);
        }

        /**
         * Emits a watermark of (highest timestamp - bound - 1 ms).
         */
        @Override
        public void onPeriodicEmit(WatermarkOutput output) {
            output.emitWatermark(new Watermark(currentMaxTimestamp - MAX_OUT_OF_ORDERNESS_MILLIS - 1));
        }

    }
}

/* 输出
Transaction Timestamp:1546272000000  eventTimestamp:1546272000000
Transaction Timestamp:1546272360000  eventTimestamp:1546272360000
source:13> Transaction{accountId=1, timestamp=1546272000000, amount=188.23}
Transaction Timestamp:1546272720000  eventTimestamp:1546272720000
source:14> Transaction{accountId=2, timestamp=1546272360000, amount=374.79}
Transaction Timestamp:1546273080000  eventTimestamp:1546273080000
source:15> Transaction{accountId=3, timestamp=1546272720000, amount=112.15}
source:16> Transaction{accountId=4, timestamp=1546273080000, amount=478.75}
Transaction Timestamp:1546273440000  eventTimestamp:1546273440000
source:1> Transaction{accountId=5, timestamp=1546273440000, amount=208.85}
Transaction Timestamp:1546273800000  eventTimestamp:1546273800000
Transaction Timestamp:1546274160000  eventTimestamp:1546274160000
source:2> Transaction{accountId=1, timestamp=1546273800000, amount=379.64}
source:3> Transaction{accountId=2, timestamp=1546274160000, amount=351.44}
Transaction Timestamp:1546274520000  eventTimestamp:1546274520000
Transaction Timestamp:1546274880000  eventTimestamp:1546274880000
source:4> Transaction{accountId=3, timestamp=1546274520000, amount=320.75}
source:5> Transaction{accountId=4, timestamp=1546274880000, amount=259.42}
Transaction Timestamp:1546275240000  eventTimestamp:1546275240000
Transaction Timestamp:1546275600000  eventTimestamp:1546275600000
source:6> Transaction{accountId=5, timestamp=1546275240000, amount=273.44}
Transaction Timestamp:1546275960000  eventTimestamp:1546275960000
source:7> Transaction{accountId=1, timestamp=1546275600000, amount=267.25}
Transaction Timestamp:1546276320000  eventTimestamp:1546276320000
source:8> Transaction{accountId=2, timestamp=1546275960000, amount=397.15}
source:9> Transaction{accountId=3, timestamp=1546276320000, amount=0.219}
Transaction Timestamp:1546276680000  eventTimestamp:1546276680000
Transaction Timestamp:1546277040000  eventTimestamp:1546277040000
source:10> Transaction{accountId=4, timestamp=1546276680000, amount=231.94}
Transaction Timestamp:1546277400000  eventTimestamp:1546277400000
source:11> Transaction{accountId=5, timestamp=1546277040000, amount=384.73}
Transaction Timestamp:1546277760000  eventTimestamp:1546277760000
source:12> Transaction{accountId=1, timestamp=1546277400000, amount=419.62}
Transaction Timestamp:1546278120000  eventTimestamp:1546278120000
source:13> Transaction{accountId=2, timestamp=1546277760000, amount=412.91}
source:14> Transaction{accountId=3, timestamp=1546278120000, amount=0.77}
source:15> Transaction{accountId=4, timestamp=1546278480000, amount=22.1}
Transaction Timestamp:1546278480000  eventTimestamp:1546278480000
Transaction Timestamp:1546278840000  eventTimestamp:1546278840000
Transaction Timestamp:1546279200000  eventTimestamp:1546279200000
source:16> Transaction{accountId=5, timestamp=1546278840000, amount=377.54}
Transaction Timestamp:1546279560000  eventTimestamp:1546279560000
source:1> Transaction{accountId=1, timestamp=1546279200000, amount=375.44}
source:2> Transaction{accountId=2, timestamp=1546279560000, amount=230.18}
 */