Flink from Beginner to Practice 5 [Window]-11-Window Evictor

Window Evictor
An evictor can remove elements from a window after the trigger fires, either before or after the window function is applied.

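Flink ships with three built-in evictors. By default, each of them applies its logic before the window function:
CountEvictor: keeps up to a given number of elements in the window and discards the rest, starting from the beginning of the window buffer.
TimeEvictor: keeps the elements within a time span. Given an interval, it takes the largest timestamp among the window's elements and removes the elements older than that timestamp minus the interval.
DeltaEvictor: computes the delta between the last element in the window buffer and each of the other elements, and keeps only the elements whose delta is below the given threshold.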
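The three policies above can be modelled in plain Java without any Flink dependency. This is only a rough sketch for intuition, not Flink's implementation: the class `EvictionPolicySketch`, the `Element` type, and the method names are made up for illustration, and the exact boundary handling (strict versus non-strict comparison at the cutoff and at the threshold) is an assumption.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Plain-Java model of the three eviction policies (hypothetical, not Flink code). */
final class EvictionPolicySketch {

    /** A buffered element: event timestamp in milliseconds plus a numeric value. */
    static final class Element {
        final long timestamp;
        final int value;

        Element(long timestamp, int value) {
            this.timestamp = timestamp;
            this.value = value;
        }
    }

    /** CountEvictor-like: keep at most maxCount elements, dropping from the front. */
    static List<Element> keepLastN(List<Element> buffer, int maxCount) {
        int from = Math.max(0, buffer.size() - maxCount);
        return new ArrayList<>(buffer.subList(from, buffer.size()));
    }

    /** TimeEvictor-like: keep elements newer than (largest timestamp - keepMillis). */
    static List<Element> keepRecent(List<Element> buffer, long keepMillis) {
        List<Element> kept = new ArrayList<>();
        if (buffer.isEmpty()) {
            return kept;
        }
        long maxTs = Long.MIN_VALUE;
        for (Element e : buffer) {
            maxTs = Math.max(maxTs, e.timestamp);
        }
        long cutoff = maxTs - keepMillis;
        for (Element e : buffer) {
            if (e.timestamp > cutoff) { // assumption: an element exactly at the cutoff is dropped
                kept.add(e);
            }
        }
        return kept;
    }

    /** DeltaEvictor-like: keep elements whose delta to the last element is below threshold. */
    static List<Element> keepWithinDelta(List<Element> buffer, double threshold) {
        List<Element> kept = new ArrayList<>();
        if (buffer.isEmpty()) {
            return kept;
        }
        Element last = buffer.get(buffer.size() - 1);
        for (Element e : buffer) {
            double delta = last.value - e.value; // "new minus old" delta
            if (delta < threshold) {
                kept.add(e);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        List<Element> buffer = Arrays.asList(
                new Element(0L, 10),
                new Element(60_000L, 50),
                new Element(120_000L, 52),
                new Element(180_000L, 55));

        System.out.println(keepLastN(buffer, 2).size());          // last two elements
        System.out.println(keepRecent(buffer, 90_000L).size());   // newer than 90 s before the newest
        System.out.println(keepWithinDelta(buffer, 10.0).size()); // values within 10 of the last one
    }
}
```

Each method returns a new list instead of mutating the buffer, which keeps the sketch easy to test. Flink's evictors remove elements in place through the iterator they are handed.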

Usage:

windowStream.evictor(CountEvictor.of(5))
windowStream.evictor(TimeEvictor.of(Time.of(5, TimeUnit.SECONDS)))
windowStream.evictor(DeltaEvictor.of(threshold, new DeltaFunction<Tuple2<String, Integer>>() {
    @Override
    public double getDelta(Tuple2<String, Integer> oldDataPoint, Tuple2<String, Integer> newDataPoint) {
        // Delta between the newest element and an older one, based on field f1
        return newDataPoint.f1 - oldDataPoint.f1;
    }
}, evictAfter))

Code:

package com.itzhimei.window;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.evictors.TimeEvictor;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.walkthrough.common.entity.Transaction;
import org.apache.flink.walkthrough.common.source.TransactionSource;

import java.time.Duration;
import java.util.concurrent.TimeUnit;

/**
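 * Use TimeEvictor to evict data before the window computation runs.
 * Goal: within each window, drop the elements that are more than 12 minutes
 * older than the newest element, then compute the sum on what remains.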
 *
 */
public class Window_11_Evictor {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Transaction> dataStream = env
                .addSource(new TransactionSource())
                .name("transactions");

        WatermarkStrategy<Transaction> strategy = WatermarkStrategy
                .<Transaction>forBoundedOutOfOrderness(Duration.ofMinutes(15))
                .withTimestampAssigner((event, timestamp) -> event.getTimestamp());

        SingleOutputStreamOperator<Transaction> amount = dataStream.assignTimestampsAndWatermarks(strategy)
                .keyBy(Transaction::getAccountId)
                .window(TumblingEventTimeWindows.of(Time.hours(1)))
                .evictor(TimeEvictor.of(Time.of(12, TimeUnit.MINUTES)))
                .sum("amount");

        amount.print("amount");
        env.execute();

    }
}

/*
With the line .evictor(TimeEvictor.of(Time.of(12, TimeUnit.MINUTES))) commented out,
output without eviction:
amount:15> Transaction{accountId=3, timestamp=1546272720000, amount=432.9}
amount:11> Transaction{accountId=1, timestamp=1546272000000, amount=567.87}
amount:1> Transaction{accountId=4, timestamp=1546273080000, amount=738.1700000000001}
amount:16> Transaction{accountId=2, timestamp=1546272360000, amount=726.23}
amount:16> Transaction{accountId=5, timestamp=1546273440000, amount=482.28999999999996}
amount:1> Transaction{accountId=4, timestamp=1546276680000, amount=254.04}
amount:15> Transaction{accountId=3, timestamp=1546276320000, amount=0.989}
amount:16> Transaction{accountId=2, timestamp=1546275960000, amount=810.06}
amount:16> Transaction{accountId=5, timestamp=1546277040000, amount=762.27}
amount:11> Transaction{accountId=1, timestamp=1546275600000, amount=686.87}
amount:16> Transaction{accountId=2, timestamp=1546279560000, amount=458.4}
amount:15> Transaction{accountId=3, timestamp=1546279920000, amount=871.9499999999999}
amount:11> Transaction{accountId=1, timestamp=1546279200000, amount=859.35}
amount:1> Transaction{accountId=4, timestamp=1546280280000, amount=415.08}
amount:16> Transaction{accountId=5, timestamp=1546280640000, amount=206.98000000000002}
amount:16> Transaction{accountId=2, timestamp=1546283160000, amount=730.02}



Output with eviction:
amount:15> Transaction{accountId=3, timestamp=1546274520000, amount=320.75}
amount:16> Transaction{accountId=2, timestamp=1546274160000, amount=351.44}
amount:11> Transaction{accountId=1, timestamp=1546273800000, amount=379.64}
amount:1> Transaction{accountId=4, timestamp=1546274880000, amount=259.42}
amount:16> Transaction{accountId=5, timestamp=1546275240000, amount=273.44}
amount:1> Transaction{accountId=4, timestamp=1546278480000, amount=22.1}
amount:11> Transaction{accountId=1, timestamp=1546277400000, amount=419.62}
amount:16> Transaction{accountId=2, timestamp=1546277760000, amount=412.91}
amount:15> Transaction{accountId=3, timestamp=1546278120000, amount=0.77}
amount:16> Transaction{accountId=5, timestamp=1546278840000, amount=377.54}
amount:1> Transaction{accountId=4, timestamp=1546282080000, amount=64.19}
amount:11> Transaction{accountId=1, timestamp=1546281000000, amount=483.91}
amount:15> Transaction{accountId=3, timestamp=1546281720000, amount=871.15}
amount:16> Transaction{accountId=2, timestamp=1546281360000, amount=228.22}
amount:16> Transaction{accountId=5, timestamp=1546282440000, amount=79.43}
amount:16> Transaction{accountId=2, timestamp=1546284960000, amount=473.54}
amount:1> Transaction{accountId=4, timestamp=1546285680000, amount=323.59}
amount:15> Transaction{accountId=3, timestamp=1546285320000, amount=119.92}
amount:11> Transaction{accountId=1, timestamp=1546284600000, amount=274.73}
amount:16> Transaction{accountId=5, timestamp=1546286040000, amount=353.16}
amount:16> Transaction{accountId=2, timestamp=1546288560000, amount=479.83}
amount:16> Transaction{accountId=5, timestamp=1546289640000, amount=292.44}
amount:11> Transaction{accountId=1, timestamp=1546288200000, amount=373.26}
amount:1> Transaction{accountId=4, timestamp=1546289280000, amount=83.64}
amount:15> Transaction{accountId=3, timestamp=1546288920000, amount=454.25}
 */
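One way to read the two listings is to compare the first accountId=1 line in each. The small check below uses only numbers copied from the output above. It assumes that the hourly tumbling windows start at multiples of one hour (no offset) and that sum("amount") carries the non-summed fields, such as the timestamp, over from the first element it sees in the window. That second point is not guaranteed behaviour. The class and method names are made up for illustration.

```java
/** Arithmetic check on the accountId=1 lines of the two listings (illustrative only). */
final class EvictionOutputCheck {

    /** Start of the tumbling window containing ts, assuming no window offset. */
    static long windowStart(long ts, long sizeMs) {
        return ts - (ts % sizeMs);
    }

    /** Whole minutes between two millisecond timestamps. */
    static long minutesBetween(long earlierTs, long laterTs) {
        return (laterTs - earlierTs) / 60_000L;
    }

    public static void main(String[] args) {
        long hour = 60L * 60L * 1000L;
        long tsWithoutEviction = 1546272000000L; // accountId=1, amount=567.87
        long tsWithEviction = 1546273800000L;    // accountId=1, amount=379.64

        // Both timestamps fall into the same one-hour window.
        System.out.println(
                windowStart(tsWithoutEviction, hour) == windowStart(tsWithEviction, hour));

        // Gap between the two reported timestamps, in minutes.
        System.out.println(minutesBetween(tsWithoutEviction, tsWithEviction));

        // Portion of the un-evicted sum that is missing from the evicted result.
        System.out.println(567.87 - 379.64);
    }
}
```

Under these assumptions, the two lines describe the same window. The reported timestamp moves 30 minutes later because the earlier elements were evicted before the sum ran, and the amount drops by roughly 188.23, which would be the contribution of the evicted elements.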
 

You can also define a custom evictor, for example:

.evictor(new Evictor<Transaction, TimeWindow>() {
	@Override
	public void evictBefore(Iterable<TimestampedValue<Transaction>> elements, int size, TimeWindow window, EvictorContext evictorContext) {
		
	}

	@Override
	public void evictAfter(Iterable<TimestampedValue<Transaction>> elements, int size, TimeWindow window, EvictorContext evictorContext) {

	}
})

A custom evictor is simply an implementation of the Evictor interface, so it must implement the interface's two methods: evictBefore and evictAfter.

public interface Evictor<T, W extends Window> extends Serializable {

    /**
     * Optionally evicts elements. Called before windowing function.
     *
     * @param elements The elements currently in the pane.
     * @param size The current number of elements in the pane.
     * @param window The {@link Window}
     * @param evictorContext The context for the Evictor
     */
    void evictBefore(
            Iterable<TimestampedValue<T>> elements,
            int size,
            W window,
            EvictorContext evictorContext);

    /**
     * Optionally evicts elements. Called after windowing function.
     *
     * @param elements The elements currently in the pane.
     * @param size The current number of elements in the pane.
     * @param window The {@link Window}
     * @param evictorContext The context for the Evictor
     */
    void evictAfter(
            Iterable<TimestampedValue<T>> elements,
            int size,
            W window,
            EvictorContext evictorContext);

    /** A context object that is given to {@link Evictor} methods. */
    interface EvictorContext {

        /** Returns the current processing time. */
        long getCurrentProcessingTime();

        /**
         * Returns the metric group for this {@link Evictor}. This is the same metric group that
         * would be returned from {@link RuntimeContext#getMetricGroup()} in a user function.
         *
         * <p>You must not call methods that create metric objects (such as {@link
         * MetricGroup#counter(int)} multiple times but instead call once and store the metric
         * object in a field.
         */
        MetricGroup getMetricGroup();

        /** Returns the current watermark time. */
        long getCurrentWatermark();
    }
}
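To make the call order concrete without pulling in Flink, the sketch below uses a hypothetical stand-in interface, `SimpleEvictor`, that keeps only the two callbacks. The real interface passes `Iterable<TimestampedValue<T>>` plus the window and an `EvictorContext`; those are left out here. `MinAmountEvictor` and `fire` are likewise invented for illustration. The sketch shows an evictor that removes small amounts through `Iterator.remove()` before a summing "window function" runs.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

/** Plain-Java model of a custom evictor and the order in which it is called. */
final class CustomEvictorSketch {

    /** Hypothetical, simplified stand-in for Flink's Evictor interface. */
    interface SimpleEvictor<T> {
        void evictBefore(Iterable<T> elements, int size);

        void evictAfter(Iterable<T> elements, int size);
    }

    /** Drops amounts below a minimum before the window function; does nothing afterwards. */
    static final class MinAmountEvictor implements SimpleEvictor<Double> {
        private final double minAmount;

        MinAmountEvictor(double minAmount) {
            this.minAmount = minAmount;
        }

        @Override
        public void evictBefore(Iterable<Double> elements, int size) {
            // Evict in place by removing through the iterator.
            for (Iterator<Double> it = elements.iterator(); it.hasNext(); ) {
                if (it.next() < minAmount) {
                    it.remove();
                }
            }
        }

        @Override
        public void evictAfter(Iterable<Double> elements, int size) {
            // Nothing to evict after the window function in this sketch.
        }
    }

    /** Mimics one window firing: evictBefore, then the window function (a sum), then evictAfter. */
    static double fire(List<Double> buffer, SimpleEvictor<Double> evictor) {
        evictor.evictBefore(buffer, buffer.size());
        double sum = 0.0;
        for (double amount : buffer) {
            sum += amount;
        }
        evictor.evictAfter(buffer, buffer.size());
        return sum;
    }

    public static void main(String[] args) {
        List<Double> buffer = new ArrayList<>(Arrays.asList(5.0, 120.0, 0.5, 80.0));
        System.out.println(fire(buffer, new MinAmountEvictor(1.0))); // prints 205.0
        System.out.println(buffer);                                  // prints [5.0, 120.0, 80.0]
    }
}
```

Putting the filter in evictBefore means the window function never sees the removed elements. The same logic in evictAfter would leave the current result unchanged and only affect what stays in the buffer afterwards.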