Flink从入门到实战五[Window]-3-Window Function ReduceFunction

ReduceFunction 用于指定如何将两个元素合并为一个元素，两个输入元素与输出元素的类型必须相同。
ReduceFunction 是增量聚合的窗口函数：窗口每收到一个新元素，就立即将它与当前的聚合结果合并，因此窗口状态中只需保存一个聚合值，而不必缓存窗口内的全部元素。

代码示例：

// API sketch only: `...`, <key selector> and <window assigner> are placeholders, not real code.
DataStream<Tuple2<String, Long>> input = ...;

input
    .keyBy(<key selector>)
    .window(<window assigner>)
    .reduce(new ReduceFunction<Tuple2<String, Long>>() {
      // Merges two elements of the same key and window into one element of the same type:
      // keeps v1's f0 field and adds the two f1 fields together.
      public Tuple2<String, Long> reduce(Tuple2<String, Long> v1, Tuple2<String, Long> v2) {
        return new Tuple2<>(v1.f0, v1.f1 + v2.f1);
      }
    });
	

下面仍以用户交易数据为例，演示完整代码：

package org.itzhimei.window;

import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.walkthrough.common.entity.Transaction;
import org.apache.flink.walkthrough.common.source.TransactionSource;

/**
 * Window ReduceFunction example: every 5 seconds, totals the amount spent per account.
 *
 * <p>Transactions are keyed by account id and assigned to 5-second tumbling processing-time
 * windows. Inside each window the records are merged pairwise. Each merged record keeps the
 * account id and timestamp of the first (left) argument and carries the sum of both amounts.
 */
public class Window_3_ReduceFunction {

    public static void main(String[] args) throws Exception {
        // Pairwise merge: identity fields come from the left record, the amounts are added up.
        ReduceFunction<Transaction> sumAmounts = new ReduceFunction<Transaction>() {
            @Override
            public Transaction reduce(Transaction left, Transaction right) throws Exception {
                return new Transaction(
                        left.getAccountId(),
                        left.getTimestamp(),
                        left.getAmount() + right.getAmount());
            }
        };

        StreamExecutionEnvironment executionEnv = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Transaction> transactions = executionEnv
                .addSource(new TransactionSource())
                .name("Transaction");

        SingleOutputStreamOperator<Transaction> accountTotals = transactions
                .keyBy(Transaction::getAccountId)
                .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                .reduce(sumAmounts);

        // Emit each window's reduced result through the print sink.
        accountTotals.print();

        executionEnv.execute();
    }
}

/*
11> Transaction{accountId=1, timestamp=1546272000000, amount=567.87}
16> Transaction{accountId=2, timestamp=1546272360000, amount=726.23}
15> Transaction{accountId=3, timestamp=1546272720000, amount=112.15}
16> Transaction{accountId=5, timestamp=1546273440000, amount=208.85}
1> Transaction{accountId=4, timestamp=1546273080000, amount=478.75}

15> Transaction{accountId=3, timestamp=1546274520000, amount=2263.909}
1> Transaction{accountId=4, timestamp=1546274880000, amount=1995.5800000000002}
16> Transaction{accountId=5, timestamp=1546275240000, amount=2222.97}
11> Transaction{accountId=1, timestamp=1546275600000, amount=2650.4599999999996}
16> Transaction{accountId=2, timestamp=1546275960000, amount=3134.0299999999997}

11> Transaction{accountId=1, timestamp=1546291800000, amount=2841.87}
1> Transaction{accountId=4, timestamp=1546291080000, amount=2390.69}
15> Transaction{accountId=3, timestamp=1546290720000, amount=1921.8090000000002}
16> Transaction{accountId=5, timestamp=1546291440000, amount=2139.3799999999997}
16> Transaction{accountId=2, timestamp=1546292160000, amount=3110.68}
 */