Flink从入门到实战五[Window]-5-Window Function ProcessWindowFunction

ProcessWindowFunction官方解释:A ProcessWindowFunction gets an Iterable containing all the elements of the window, and a Context object with access to time and state information, which enables it to provide more flexibility than other window functions. This comes at the cost of performance and resource consumption, because elements cannot be incrementally aggregated but instead need to be buffered internally until the window is considered ready for processing.

简单来说:ProcessWindowFunction 可以拿到一个包含窗口内全部元素的 Iterable(可迭代对象),以及一个能访问时间和状态信息的上下文(Context),因此比其它窗口函数更加灵活。代价是性能和资源消耗更高:数据无法增量聚合,必须先全部缓存在内部状态中,等窗口触发时再统一计算。

ProcessWindowFunction 示例:基于 5 秒的处理时间滚动窗口,统计每个用户的总交易笔数和总交易金额,同时输出用户ID和窗口信息:

/**
 * Window ProcessWindowFunction demo.
 *
 * <p>Every 5 seconds (tumbling processing-time window), computes for each account the total number
 * of transactions and the total transaction amount.
 *
 * <p>Output type: {@code Tuple4<String, Long, Integer, Double>}, with fields
 * (window description, account id, transaction count, total amount).
 */
public class Window_6_ProcessWindowFunction {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Transaction> dataStream = env.addSource(new TransactionSource()).name("Transaction");

        SingleOutputStreamOperator<Tuple4<String, Long, Integer, Double>> process = dataStream
                .keyBy(Transaction::getAccountId)
                .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                .process(new ProcessWindowFunction<Transaction, Tuple4<String, Long, Integer, Double>, Long, TimeWindow>() {
                    /**
                     * Summarizes all transactions buffered for one account in one window.
                     *
                     * @param accountId the window key, i.e. the value of {@code Transaction::getAccountId}
                     *     shared by every element in {@code elements}
                     * @param context gives access to the window being evaluated
                     * @param elements all transactions of this account that fell into the window
                     * @param out receives one (window, accountId, count, totalAmount) tuple per firing
                     */
                    @Override
                    public void process(Long accountId, Context context, Iterable<Transaction> elements,
                                        Collector<Tuple4<String, Long, Integer, Double>> out) throws Exception {
                        int count = 0;
                        double amount = 0d;
                        for (Transaction t : elements) {
                            count++;
                            // Plain double summation may show binary rounding noise in the total
                            // (e.g. ...00000002); switch to BigDecimal if exact decimal sums are needed.
                            amount += t.getAmount();
                        }
                        // Use the key instead of re-reading the id from each element: the key is already
                        // the account id, and this avoids a placeholder value when no element is read.
                        String window = context.window().toString();
                        out.collect(new Tuple4<>(window, accountId, count, amount));
                    }
                });

        process.print();

        env.execute();
    }
}

/* 输出字段依次为:窗口(Window)、账号ID、总交易笔数、总交易金额;行首的 "15>" 等前缀是 print() 所在并行子任务的编号
15> (TimeWindow{start=1646989610000, end=1646989615000},3,8,1573.919)
16> (TimeWindow{start=1646989610000, end=1646989615000},2,9,3005.64)
11> (TimeWindow{start=1646989610000, end=1646989615000},1,9,2656.84)
1> (TimeWindow{start=1646989610000, end=1646989615000},4,8,1930.8300000000002)
16> (TimeWindow{start=1646989610000, end=1646989615000},5,8,2057.0699999999997)

16> (TimeWindow{start=1646989615000, end=1646989620000},5,9,2078.66)
15> (TimeWindow{start=1646989615000, end=1646989620000},3,9,2256.1389999999997)
11> (TimeWindow{start=1646989615000, end=1646989620000},1,9,2818.2)
16> (TimeWindow{start=1646989615000, end=1646989620000},2,9,3204.54)
1> (TimeWindow{start=1646989615000, end=1646989620000},4,9,2150.74)
 */
	
	

ProcessWindowFunction 源码(节选,省略了部分注解和方法):

public abstract class ProcessWindowFunction<IN, OUT, KEY, W extends Window> implements Function {

    /**
     * Evaluates the window and outputs none or several elements.
     *
     * @param key The key for which this window is evaluated.
     * @param context The context in which the window is being evaluated.
     * @param elements The elements in the window being evaluated.
     * @param out A collector for emitting elements.
     *
     * @throws Exception The function may throw exceptions to fail the program and trigger recovery.
     */
    public abstract void process(
            KEY key,
            Context context,
            Iterable<IN> elements,
            Collector<OUT> out) throws Exception;

    /**
     * Hook for cleaning up per-window state when a window is cleared. The default implementation
     * does nothing; override it when {@link Context#windowState()} is used.
     *
     * @param context The context of the window being cleared.
     *
     * @throws Exception The function may throw exceptions to fail the program and trigger recovery.
     */
    public void clear(Context context) throws Exception {}

    /**
     * The context holding window metadata.
     */
    public abstract class Context implements java.io.Serializable {
        /**
         * Returns the window that is being evaluated.
         */
        public abstract W window();

        /** Returns the current processing time. */
        public abstract long currentProcessingTime();

        /** Returns the current event-time watermark. */
        public abstract long currentWatermark();

        /**
         * State accessor for per-key and per-window state.
         *
         * <p><b>NOTE:</b>If you use per-window state you have to ensure that you clean it up
         * by implementing {@link ProcessWindowFunction#clear(Context)}.
         */
        public abstract KeyedStateStore windowState();

        /**
         * State accessor for per-key global state.
         */
        public abstract KeyedStateStore globalState();
    }

}