Flink从入门到实战六[Watermark]-3-Watermark的使用

WatermarkStrategy 可以在 Flink 应用程序中的两处使用:第一种是直接在数据源(Source)上使用;第二种是在非数据源的操作(例如 filter)之后使用。我们平时开发基本都使用第一种。

使用方法例如(下面的示例是在 filter 操作之后分配时间戳和 Watermark,即第二种用法):

// Obtain the execution environment for the streaming job.
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Build the input stream by reading myFilePath with myFormat in PROCESS_CONTINUOUSLY mode.
// NOTE(review): the literal 100 is presumably the re-scan interval in milliseconds — confirm against the readFile API.
DataStream<MyEvent> stream = env.readFile(
		myFormat, myFilePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 100,
		FilePathFilter.createDefaultFilter(), typeInfo);

// Keep only events whose severity equals WARNING, then attach the watermark strategy.
// The strategy is applied after the filter operator here, not directly on the source.
// <watermark strategy> is a placeholder for a concrete WatermarkStrategy instance.
DataStream<MyEvent> withTimestampsAndWatermarks = stream
		.filter( event -> event.severity() == WARNING )
		.assignTimestampsAndWatermarks(<watermark strategy>);

// Key by group, collect into 10-second tumbling event-time windows, combine the
// elements of each window pairwise with add(), and send the result to a sink.
// addSink(...) is a placeholder; the actual sink is not shown.
withTimestampsAndWatermarks
		.keyBy( (event) -> event.getGroup() )
		.window(TumblingEventTimeWindows.of(Time.seconds(10)))
		.reduce( (a, b) -> a.add(b) )
		.addSink(...);

使用 Flink 内置的 Watermark 生成策略(WatermarkStrategy.forBoundedOutOfOrderness)进行数据计算,代码如下:

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.walkthrough.common.entity.Transaction;
import org.apache.flink.walkthrough.common.source.TransactionSource;

import java.time.Duration;

/**
 * Watermark example.
 *
 * <p>Sums the {@code amount} field of transactions per account id over tumbling
 * event-time windows of one hour, using a bounded-out-of-orderness watermark
 * strategy with a bound of 15 minutes.
 */
public class Watermark {

    /**
     * Builds the streaming job and executes it.
     *
     * @param args command-line arguments (not used)
     * @throws Exception if job execution fails
     */
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment environment =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // Raw transaction stream; every record is printed with the "source" prefix.
        final DataStream<Transaction> transactions =
                environment.addSource(new TransactionSource()).name("transactions");
        transactions.print("source");

        // Attach event timestamps and watermarks directly to the source stream.
        final SingleOutputStreamOperator<Transaction> timestamped =
                transactions.assignTimestampsAndWatermarks(outOfOrderStrategy());

        // Per-account sum of "amount" within each one-hour tumbling event-time window.
        final SingleOutputStreamOperator<Transaction> hourlyTotals = timestamped
                .keyBy(Transaction::getAccountId)
                .window(TumblingEventTimeWindows.of(Time.hours(1)))
                .sum("amount");
        hourlyTotals.print("amount");

        environment.execute();
    }

    /**
     * Creates the watermark strategy used by the job: bounded out-of-orderness of
     * 15 minutes, with the event timestamp taken from the transaction itself.
     *
     * @return the watermark strategy for {@code Transaction} records
     */
    private static WatermarkStrategy<Transaction> outOfOrderStrategy() {
        final WatermarkStrategy<Transaction> bounded =
                WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofMinutes(15));
        return bounded.withTimestampAssigner((txn, recordTimestamp) -> txn.getTimestamp());
    }
}

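/* 输出(注意:下面每条 amount 结果累加了同一账户跨度 1.5 小时的 4 笔交易,例如 accountId=1:188.23+379.64+267.25+419.62=1254.74,与 2 小时的滚动窗口 Time.hours(2) 相符;若按上面代码的 Time.hours(1) 运行,每个窗口只会累加 1 小时内的交易,结果会不同)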
source:14> Transaction{accountId=1, timestamp=1546272000000, amount=188.23}
source:15> Transaction{accountId=2, timestamp=1546272360000, amount=374.79}
source:16> Transaction{accountId=3, timestamp=1546272720000, amount=112.15}
source:1> Transaction{accountId=4, timestamp=1546273080000, amount=478.75}
source:2> Transaction{accountId=5, timestamp=1546273440000, amount=208.85}
source:3> Transaction{accountId=1, timestamp=1546273800000, amount=379.64}
source:4> Transaction{accountId=2, timestamp=1546274160000, amount=351.44}
source:5> Transaction{accountId=3, timestamp=1546274520000, amount=320.75}
source:6> Transaction{accountId=4, timestamp=1546274880000, amount=259.42}
source:7> Transaction{accountId=5, timestamp=1546275240000, amount=273.44}
source:8> Transaction{accountId=1, timestamp=1546275600000, amount=267.25}
source:9> Transaction{accountId=2, timestamp=1546275960000, amount=397.15}
source:10> Transaction{accountId=3, timestamp=1546276320000, amount=0.219}
source:11> Transaction{accountId=4, timestamp=1546276680000, amount=231.94}
source:12> Transaction{accountId=5, timestamp=1546277040000, amount=384.73}
source:13> Transaction{accountId=1, timestamp=1546277400000, amount=419.62}
source:14> Transaction{accountId=2, timestamp=1546277760000, amount=412.91}
source:15> Transaction{accountId=3, timestamp=1546278120000, amount=0.77}
source:16> Transaction{accountId=4, timestamp=1546278480000, amount=22.1}
source:1> Transaction{accountId=5, timestamp=1546278840000, amount=377.54}
source:2> Transaction{accountId=1, timestamp=1546279200000, amount=375.44}
source:3> Transaction{accountId=2, timestamp=1546279560000, amount=230.18}
source:4> Transaction{accountId=3, timestamp=1546279920000, amount=0.8}
source:5> Transaction{accountId=4, timestamp=1546280280000, amount=350.89}
amount:11> Transaction{accountId=1, timestamp=1546272000000, amount=1254.74}
amount:1> Transaction{accountId=4, timestamp=1546273080000, amount=992.2100000000002}
amount:15> Transaction{accountId=3, timestamp=1546272720000, amount=433.88899999999995}
amount:16> Transaction{accountId=2, timestamp=1546272360000, amount=1536.2900000000002}
amount:16> Transaction{accountId=5, timestamp=1546273440000, amount=1244.56}
source:6> Transaction{accountId=5, timestamp=1546280640000, amount=127.55}
source:7> Transaction{accountId=1, timestamp=1546281000000, amount=483.91}
source:8> Transaction{accountId=2, timestamp=1546281360000, amount=228.22}
source:9> Transaction{accountId=3, timestamp=1546281720000, amount=871.15}
source:10> Transaction{accountId=4, timestamp=1546282080000, amount=64.19}
source:11> Transaction{accountId=5, timestamp=1546282440000, amount=79.43}
source:12> Transaction{accountId=1, timestamp=1546282800000, amount=56.12}
source:13> Transaction{accountId=2, timestamp=1546283160000, amount=256.48}
source:14> Transaction{accountId=3, timestamp=1546283520000, amount=148.16}
source:15> Transaction{accountId=4, timestamp=1546283880000, amount=199.95}
 */