Flink从入门到实战五[Window]-4-Window Function AggregateFunction

AggregateFunction是功能更强的ReduceFunction:ReduceFunction要求输入、中间结果和输出都是同一种类型,而AggregateFunction允许输入类型(IN)、累加器类型(ACC)和输出类型(OUT)各不相同。
AggregateFunction和ReduceFunction一样,也是一个增量计算窗口函数。

代码1-每5秒统计每个用户总交易笔数:

package org.itzhimei.window;

import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.walkthrough.common.entity.Transaction;
import org.apache.flink.walkthrough.common.source.TransactionSource;
import scala.Tuple2;

/**
 * Window AggregateFunction example.
 *
 * <p>Counts the transactions of each account in 5-second tumbling processing-time windows
 * and prints one {@code (accountId, transactionCount)} pair per account and window.
 * Input type: {@code Transaction}; accumulator and output type: {@code Tuple2<Long, Integer>}.
 */
public class Window_4_AggregateFunction {

    /**
     * Builds and runs the streaming job.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the job fails to execute
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Transaction> dataStream = env.addSource(new TransactionSource()).name("Transaction");

        SingleOutputStreamOperator<Tuple2<Long, Integer>> aggregate = dataStream.keyBy(Transaction::getAccountId)
                .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                .aggregate(new AggregateFunction<Transaction, Tuple2<Long, Integer>, Tuple2<Long, Integer>>() {

                    /** Starts every window with a placeholder account id and a count of zero. */
                    @Override
                    public Tuple2<Long, Integer> createAccumulator() {
                        return new Tuple2<>(0L, 0);
                    }

                    /** Records the account id of the incoming transaction and increments the count. */
                    @Override
                    public Tuple2<Long, Integer> add(Transaction value, Tuple2<Long, Integer> accumulator) {
                        return new Tuple2<>(value.getAccountId(), accumulator._2 + 1);
                    }

                    /** The accumulator already has the output shape, so it is emitted as is. */
                    @Override
                    public Tuple2<Long, Integer> getResult(Tuple2<Long, Integer> accumulator) {
                        return accumulator;
                    }

                    /**
                     * Combines two partial counts for the same key.
                     *
                     * <p>Tumbling windows are not merging windows, so this is not expected to be
                     * called in this job; it is implemented (instead of returning {@code null})
                     * so the function stays correct if reused with a merging assigner such as
                     * session windows.
                     */
                    @Override
                    public Tuple2<Long, Integer> merge(Tuple2<Long, Integer> a, Tuple2<Long, Integer> b) {
                        // An accumulator that has seen no element still carries the placeholder
                        // id 0L, so take the id from the side that actually counted something.
                        Long accountId = a._2 > 0 ? a._1 : b._1;
                        return new Tuple2<>(accountId, a._2 + b._2);
                    }
                });

        aggregate.print();

        env.execute();
    }
}

/* 分别计算输出:账号ID,总交易笔数
15> (3,6)
1> (4,6)
11> (1,6)
16> (2,6)
16> (5,5)

15> (3,9)
11> (1,9)
1> (4,9)
16> (5,9)
16> (2,9)

1> (4,9)
16> (5,9)
16> (2,9)
15> (3,9)
11> (1,9)
 */

代码2-每5秒统计每个用户总交易笔数,总交易金额:

package org.itzhimei.window;

import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.walkthrough.common.entity.Transaction;
import org.apache.flink.walkthrough.common.source.TransactionSource;

/**
 * Window AggregateFunction example.
 *
 * <p>For each account, computes the number of transactions and the summed transaction amount
 * in 5-second tumbling processing-time windows.
 * Output type: {@code Tuple3<Long, Integer, Double>} = (accountId, transactionCount, totalAmount).
 */
public class Window_5_AggregateFunction {

    /**
     * Builds and runs the streaming job.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the job fails to execute
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Transaction> dataStream = env.addSource(new TransactionSource()).name("Transaction");

        SingleOutputStreamOperator<Tuple3<Long, Integer, Double>> aggregate = dataStream.keyBy(Transaction::getAccountId)
                .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                .aggregate(new AggregateFunction<Transaction, Tuple3<Long, Integer, Double>, Tuple3<Long, Integer, Double>>() {

                    /** Starts every window with a placeholder account id, zero count and zero amount. */
                    @Override
                    public Tuple3<Long, Integer, Double> createAccumulator() {
                        return new Tuple3<>(0L, 0, 0D);
                    }

                    /** Records the account id, increments the count and adds the transaction amount. */
                    @Override
                    public Tuple3<Long, Integer, Double> add(Transaction value, Tuple3<Long, Integer, Double> accumulator) {
                        return new Tuple3<>(
                                value.getAccountId(),
                                accumulator.f1 + 1,
                                accumulator.f2 + value.getAmount());
                    }

                    /** The accumulator already has the output shape, so it is emitted as is. */
                    @Override
                    public Tuple3<Long, Integer, Double> getResult(Tuple3<Long, Integer, Double> accumulator) {
                        return accumulator;
                    }

                    /**
                     * Combines two partial aggregates for the same key by summing counts and amounts.
                     *
                     * <p>Tumbling windows are not merging windows, so this is not expected to be
                     * called in this job; it is implemented (instead of returning {@code null})
                     * so the function stays correct if reused with a merging assigner such as
                     * session windows.
                     */
                    @Override
                    public Tuple3<Long, Integer, Double> merge(Tuple3<Long, Integer, Double> a, Tuple3<Long, Integer, Double> b) {
                        // An accumulator that has seen no element still carries the placeholder
                        // id 0L, so take the id from the side that actually counted something.
                        Long accountId = a.f1 > 0 ? a.f0 : b.f0;
                        return new Tuple3<>(accountId, a.f1 + b.f1, a.f2 + b.f2);
                    }
                });

        aggregate.print();

        env.execute();
    }
}

/* 分别计算输出:账号ID,总交易笔数,总交易金额
15> (3,8,1573.919)
1> (4,8,1930.8300000000002)
11> (1,8,2444.94)
16> (2,8,2724.71)
16> (5,7,1703.9099999999999)

16> (5,9,2179.45)
1> (4,9,2150.74)
16> (2,9,3011.9299999999994)
15> (3,9,2256.1389999999997)
11> (1,9,2755.37)

11> (1,9,2973.9799999999996)
1> (4,9,2274.38)
16> (5,9,2352.3900000000003)
15> (3,9,2227.899)
16> (2,9,3228.9899999999993)
 */