Flink从入门到实战五[Window]-10-Window Trigger

Window Trigger
触发器(Trigger)决定了何时启动Window Function来处理窗口中的数据以及何时将窗口内的数据清理。
Flink自带的Trigger在大多数场景下已经能够满足需求;如果不能满足,则可以自定义触发器来控制窗口何时触发计算。

Flink 内置 Window Trigger
ProcessingTimeTrigger 触发频率:一次触发,基于ProcessingTime 触发,当机器时间大于窗口结束时间时触发
EventTimeTrigger 触发频率:一次触发,基于EventTime,当Watermark 大于窗口结束时间触发
ContinuousProcessingTimeTrigger 触发频率:多次触发,基于 ProcessingTime 的固定时间间隔触发
ContinuousEventTimeTrigger 触发频率:多次触发,基于 EventTime 的固定时间间隔触发
CountTrigger 触发频率:多次触发,基于 Element 的固定条数触发
DeltaTrigger 触发频率:多次触发,基于本次 Element和上次触发 Trigger 的 Element 做Delta 计算,超过指定 Threshold 后触发
PurgingTrigger 对 Trigger 的封装实现,用于 Trigger 触发后额外清理中间状态数据

自定义Trigger
自定义的Trigger需要继承Trigger抽象类,按需实现以下方法:

.trigger(new Trigger<Transaction, TimeWindow>() {
                    /**
                     * Called for every element that is added to the window.
                     *
                     * <p>Always return a {@link TriggerResult} value, never {@code null}:
                     * {@code CONTINUE} is the "do nothing" answer to use until real logic is added.
                     */
                    @Override
                    public TriggerResult onElement(Transaction element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
                        return TriggerResult.CONTINUE;
                    }

                    /**
                     * Called when a processing-time timer registered through the trigger context fires.
                     */
                    @Override
                    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
                        return TriggerResult.CONTINUE;
                    }

                    /**
                     * Called when an event-time timer registered through the trigger context fires.
                     */
                    @Override
                    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
                        return TriggerResult.CONTINUE;
                    }

                    /**
                     * Called when the window is removed; delete any timers or state
                     * this trigger registered for the window here.
                     */
                    @Override
                    public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
                        // Nothing to clean up: this skeleton registers no timers and keeps no state.
                    }
                })

TriggerResult中包含四个枚举值:
CONTINUE:表示对窗口不执行任何操作。
FIRE:表示对窗口中的数据按照窗口函数中的逻辑进行计算,并将结果输出。注意计算完成后,窗口中的数据并不会被清除,将会被保留。
PURGE:表示将窗口中的数据和窗口清除。
FIRE_AND_PURGE:表示先将数据进行计算,输出结果,然后将窗口中的数据和窗口进行清除。

代码:

package com.itzhimei.window;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.walkthrough.common.entity.Transaction;
import org.apache.flink.walkthrough.common.source.TransactionSource;

import java.time.Duration;

/**
 * Demonstrates a custom window trigger.
 *
 * <p>The trigger fires the window computation for every incoming element. Records printed
 * with the "source" prefix can be compared against the records printed with the "amount"
 * prefix to check the result.
 */
public class Window_10_Trigger {

    /**
     * Builds and runs the streaming job.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the job fails to execute
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Raw input, printed as-is so it can be compared with the aggregated output.
        DataStream<Transaction> transactions =
                env.addSource(new TransactionSource()).name("transactions");
        transactions.print("source");

        // Event time is taken from the transaction itself; 20 seconds of out-of-orderness allowed.
        WatermarkStrategy<Transaction> watermarks = WatermarkStrategy
                .<Transaction>forBoundedOutOfOrderness(Duration.ofSeconds(20))
                .withTimestampAssigner((event, timestamp) -> event.getTimestamp());

        SingleOutputStreamOperator<Transaction> summed = transactions
                .assignTimestampsAndWatermarks(watermarks)
                .keyBy(Transaction::getAccountId)
                .window(TumblingEventTimeWindows.of(Time.seconds(15)))
                .trigger(new FireOnEveryElementTrigger())
                .sum("amount");

        summed.print("amount");
        env.execute();
    }

    /**
     * Trigger that fires the window function each time an element arrives and
     * ignores both processing-time and event-time timer callbacks.
     */
    private static final class FireOnEveryElementTrigger extends Trigger<Transaction, TimeWindow> {

        private static final long serialVersionUID = 1L;

        /** Fires the window for every element added to it. */
        @Override
        public TriggerResult onElement(
                Transaction element, long timestamp, TimeWindow window, TriggerContext ctx)
                throws Exception {
            return TriggerResult.FIRE;
        }

        /** Processing-time timers never cause a firing. */
        @Override
        public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx)
                throws Exception {
            return TriggerResult.CONTINUE;
        }

        /** Event-time timers never cause a firing. */
        @Override
        public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx)
                throws Exception {
            return TriggerResult.CONTINUE;
        }

        /** No timers or state are registered by this trigger, so there is nothing to clear. */
        @Override
        public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
            // intentionally empty
        }
    }
}

/* 每一条数据都触发窗口执行
source:2> Transaction{accountId=1, timestamp=1546272000000, amount=188.23}
amount:11> Transaction{accountId=1, timestamp=1546272000000, amount=188.23}

source:3> Transaction{accountId=2, timestamp=1546272360000, amount=374.79}
amount:16> Transaction{accountId=2, timestamp=1546272360000, amount=374.79}

source:4> Transaction{accountId=3, timestamp=1546272720000, amount=112.15}
amount:15> Transaction{accountId=3, timestamp=1546272720000, amount=112.15}

source:5> Transaction{accountId=4, timestamp=1546273080000, amount=478.75}
amount:1> Transaction{accountId=4, timestamp=1546273080000, amount=478.75}

source:6> Transaction{accountId=5, timestamp=1546273440000, amount=208.85}
amount:16> Transaction{accountId=5, timestamp=1546273440000, amount=208.85}

source:7> Transaction{accountId=1, timestamp=1546273800000, amount=379.64}
amount:11> Transaction{accountId=1, timestamp=1546273800000, amount=379.64}

source:8> Transaction{accountId=2, timestamp=1546274160000, amount=351.44}
amount:16> Transaction{accountId=2, timestamp=1546274160000, amount=351.44}

source:9> Transaction{accountId=3, timestamp=1546274520000, amount=320.75}
amount:15> Transaction{accountId=3, timestamp=1546274520000, amount=320.75}

 */	
	

看EventTimeTrigger的源码:

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;

/**
 * A {@link Trigger} for {@link TimeWindow}s that fires when the watermark reaches the
 * window's maximum timestamp.
 *
 * @see org.apache.flink.streaming.api.watermark.Watermark
 */
@PublicEvolving
public class EventTimeTrigger extends Trigger<Object, TimeWindow> {
    private static final long serialVersionUID = 1L;

    /** Instances are obtained through {@link #create()}. */
    private EventTimeTrigger() {}

    /**
     * Fires immediately when the current watermark has already reached the window's maximum
     * timestamp; otherwise registers an event-time timer for that timestamp and waits.
     */
    @Override
    public TriggerResult onElement(
            Object element, long timestamp, TimeWindow window, TriggerContext ctx)
            throws Exception {
        final long windowEnd = window.maxTimestamp();
        if (ctx.getCurrentWatermark() >= windowEnd) {
            // The watermark is already at or past the end of the window: evaluate right away.
            return TriggerResult.FIRE;
        }
        ctx.registerEventTimeTimer(windowEnd);
        return TriggerResult.CONTINUE;
    }

    /** Fires only when the timer timestamp equals the window's maximum timestamp. */
    @Override
    public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) {
        if (time == window.maxTimestamp()) {
            return TriggerResult.FIRE;
        }
        return TriggerResult.CONTINUE;
    }

    /** Processing-time timers are ignored by this trigger. */
    @Override
    public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx)
            throws Exception {
        return TriggerResult.CONTINUE;
    }

    /** Deletes the event-time timer set for the window's maximum timestamp. */
    @Override
    public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
        ctx.deleteEventTimeTimer(window.maxTimestamp());
    }

    /** This trigger supports merging. */
    @Override
    public boolean canMerge() {
        return true;
    }

    /**
     * Registers a timer for the merged window unless the watermark has already reached its
     * maximum timestamp. This mirrors {@code onElement()}: in that case {@code onElement()}
     * fires directly, and an additional timer would make the window fire twice.
     */
    @Override
    public void onMerge(TimeWindow window, OnMergeContext ctx) {
        final long mergedEnd = window.maxTimestamp();
        if (ctx.getCurrentWatermark() >= mergedEnd) {
            return;
        }
        ctx.registerEventTimeTimer(mergedEnd);
    }

    @Override
    public String toString() {
        return "EventTimeTrigger()";
    }

    /**
     * Creates an event-time trigger that fires once the watermark reaches the window's
     * maximum timestamp.
     *
     * <p>An element that arrives after the watermark has reached that timestamp causes an
     * immediate firing (see {@code onElement()}).
     */
    public static EventTimeTrigger create() {
        return new EventTimeTrigger();
    }
}