Flink从入门到实战五[Window]-7-ProcessWindowFunction基于AggregateFunction增量计算

ProcessWindowFunction是一个全量计算函数,也就是窗口内数据到达后不计算,而是等到窗口触发时,才对窗口内的所有数据统一进行计算。这就带来一个问题:当一个窗口的数据量非常大时,等到窗口触发才开始计算可能会很慢、比较耗时;而之前数据陆续到达时却没有做任何计算,白白浪费了时间。

ProcessWindowFunction也支持增量计算,可以将ReduceFunction或AggregateFunction函数和ProcessWindowFunction组合使用,这就实现了窗口既能增量计算,又能获取到额外的信息。

我们来看带AggregateFunction的增量窗口聚合demo。AggregateFunction的特点是输入类型(IN)、累加器类型(ACC,即中间计算结果)和输出类型(OUT)可以各不相同。
demo需求目标是:每5秒统计每个用户消费总交易笔数,总交易金额;最终输出:Window,账号ID,总交易笔数,总交易金额。

代码:

package org.itzhimei.window;

import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.flink.walkthrough.common.entity.Transaction;
import org.apache.flink.walkthrough.common.source.TransactionSource;

/**
 * Incremental window aggregation demo: an {@link AggregateFunction} combined with a
 * {@link ProcessWindowFunction}.
 *
 * <p>For every account, each 5-second tumbling processing-time window produces the number of
 * transactions and the sum of their amounts. The aggregate function folds each transaction
 * into a small accumulator as it arrives; the process window function then attaches the
 * window description to the result.
 *
 * <p>Output type: {@code Tuple4<String, Long, Integer, Double>} holding, in order, the window
 * (as a string), the account ID, the transaction count, and the total amount.
 */
public class Window_7_ProcessWindowFunctionWithAggregateFunction {

    /**
     * Builds and runs the streaming job: source, key by account ID, 5-second tumbling
     * window, incremental aggregate plus window function, print.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if job execution fails
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Transaction> dataStream = env.addSource(new TransactionSource()).name("Transaction");

        SingleOutputStreamOperator<Tuple4<String, Long, Integer, Double>> process = dataStream.keyBy(Transaction::getAccountId)
                .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                .aggregate(new My2AggregateFunction(), new My2ProcessWindowFunction());

        process.print();

        env.execute();
    }


    /**
     * Incrementally aggregates transactions into {@code (accountId, count, totalAmount)}.
     *
     * <p>The accumulator and the result share the same {@code Tuple3} shape:
     * {@code f0} = account ID of the aggregated transactions, {@code f1} = number of
     * transactions, {@code f2} = sum of amounts.
     */
    private static class My2AggregateFunction implements AggregateFunction<Transaction, Tuple3<Long, Integer, Double>, Tuple3<Long, Integer, Double>> {

        /**
         * Creates an empty accumulator. The account ID slot holds the placeholder {@code 0L}
         * until the first transaction is added.
         */
        @Override
        public Tuple3<Long, Integer, Double> createAccumulator() {
            return new Tuple3<>(0L, 0, 0D);
        }

        /**
         * Folds one transaction into the accumulator.
         *
         * @param value       the incoming transaction
         * @param accumulator the running aggregate
         * @return a new accumulator carrying the transaction's account ID, the count
         *         incremented by one, and the amount added to the running total
         */
        @Override
        public Tuple3<Long, Integer, Double> add(Transaction value, Tuple3<Long, Integer, Double> accumulator) {
            return new Tuple3<>(value.getAccountId(), accumulator.f1 + 1, accumulator.f2 + value.getAmount());
        }

        /**
         * Returns the accumulator unchanged; accumulator and result types are identical.
         */
        @Override
        public Tuple3<Long, Integer, Double> getResult(Tuple3<Long, Integer, Double> accumulator) {
            return accumulator;
        }

        /**
         * Merges two partial aggregates by summing their counts and totals.
         *
         * <p>The account ID is taken from whichever side has actually aggregated at least one
         * transaction. Blindly taking {@code a.f0} would yield the placeholder ID {@code 0L}
         * whenever {@code a} is still an empty accumulator.
         *
         * @param a the first partial aggregate
         * @param b the second partial aggregate
         * @return the combined aggregate
         */
        @Override
        public Tuple3<Long, Integer, Double> merge(Tuple3<Long, Integer, Double> a, Tuple3<Long, Integer, Double> b) {
            // A zero count means the side is still in its initial state and its f0 is only a placeholder.
            Long accountId = a.f1 > 0 ? a.f0 : b.f0;
            return new Tuple3<>(accountId, a.f1 + b.f1, a.f2 + b.f2);
        }
    }

    /**
     * Enriches the pre-aggregated result with the window it belongs to.
     *
     * <p>Input is the {@code (accountId, count, totalAmount)} tuple produced by
     * {@link My2AggregateFunction}; output is {@code (window, accountId, count, totalAmount)}.
     */
    private static class My2ProcessWindowFunction extends ProcessWindowFunction<Tuple3<Long, Integer, Double>, Tuple4<String, Long, Integer, Double>, Long, TimeWindow> {

        /**
         * Emits one record per key and window.
         *
         * @param accountId the window key, i.e. the account ID the stream was keyed by
         * @param context   gives access to the window being evaluated
         * @param elements  the window contents; only the first element is read, since the
         *                  upstream aggregate function is expected to supply a single
         *                  pre-aggregated value (see the Flink incremental aggregation docs)
         * @param out       collector for the enriched result
         */
        @Override
        public void process(Long accountId, Context context, Iterable<Tuple3<Long, Integer, Double>> elements, Collector<Tuple4<String, Long, Integer, Double>> out) throws Exception {
            Tuple3<Long, Integer, Double> aggregate = elements.iterator().next();
            String window = context.window().toString();
            // Use the key as the account ID: it is authoritative and does not depend on
            // how the accumulator tracked the ID internally.
            out.collect(new Tuple4<>(window, accountId, aggregate.f1, aggregate.f2));
        }
    }
}

/* 分别计算输出:Window,账号ID,总交易笔数,总交易金额
16> (TimeWindow{start=1646992805000, end=1646992810000},2,6,1994.6900000000003)
15> (TimeWindow{start=1646992805000, end=1646992810000},3,6,1305.839)
11> (TimeWindow{start=1646992805000, end=1646992810000},1,6,2114.09)
1> (TimeWindow{start=1646992805000, end=1646992810000},4,6,1407.2900000000002)
16> (TimeWindow{start=1646992805000, end=1646992810000},5,6,1451.54)

11> (TimeWindow{start=1646992810000, end=1646992815000},1,9,2546.19)
16> (TimeWindow{start=1646992810000, end=1646992815000},2,9,3257.2499999999995)
15> (TimeWindow{start=1646992810000, end=1646992815000},3,9,1504.909)
16> (TimeWindow{start=1646992810000, end=1646992815000},5,9,2352.3900000000003)
1> (TimeWindow{start=1646992810000, end=1646992815000},4,9,2410.14)

11> (TimeWindow{start=1646992815000, end=1646992820000},1,9,2654.66)
16> (TimeWindow{start=1646992815000, end=1646992820000},2,9,3255.29)
15> (TimeWindow{start=1646992815000, end=1646992820000},3,9,2375.259)
16> (TimeWindow{start=1646992815000, end=1646992820000},5,9,2304.27)
1> (TimeWindow{start=1646992815000, end=1646992820000},4,9,2123.44)
 */