Flink从入门到实战四[DataStream API]-15-Transform聚合算子Reduce

DataStream API没有reduce和sum这类聚合操作的方法,因为Flink设计中,数据必须先分组才能做聚合操作。
所以一般操作是对DataStream做keyBy,得到KeyedStream,然后调用KeyedStream API上的reduce、sum等聚合操作方法。

Reduce在DataStream API中的作用是聚合操作,其聚合逻辑是将当前数据和上一条聚合结果数据一起给到用户,由用户自定义聚合方式。
方法API:
public interface ReduceFunction extends Function, Serializable {
T reduce(T value1, T value2) throws Exception;
}
这种聚合方式不同于KeyBy的sum、min、max等,Reduce的逻辑和Java Lambda中的Reduce实现原理是一样的。

代码:

package org.itzhimei.transform;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

import java.util.Arrays;

/**
 * Reduce KeyedStream → DataStream
 *
 */
public class Transform_5_Reduce {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<String> dataStreamSource = env.fromCollection(
                Arrays.asList("hello flink",
                        "hello java",
                        "hi program",
                        "hello",
                        "java"));

        // 1、分词
        // 2、按每个单词作为key分组
        // 3、使用reduce统计每个单词出现次数
        SingleOutputStreamOperator<Tuple2<String, Integer>> reduce = dataStreamSource.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
                String[] s1 = s.split(" ");
                for (String s2 : s1) {
                    collector.collect(new Tuple2<>(s2, 1));
                }
            }
        }).keyBy(item -> item.f0)
            .reduce(new ReduceFunction<Tuple2<String, Integer>>() {
                @Override
                public Tuple2<String, Integer> reduce(Tuple2<String, Integer> t1, Tuple2<String, Integer> t2) throws Exception {
                    System.out.println("---t1.f0:"+t1.f0 + ",---t2.f0:"+t2.f0);
                    System.out.println("---t1.f1:"+t1.f1 + ",---t2.f1:"+t2.f1);
                    return new Tuple2<String, Integer>(t1.f0,t1.f1+t2.f1);
                }
            });

        reduce.print();

        env.execute();
    }
}

/* 输出
(hello,1)
(flink,1)
---t1.f0:hello,---t2.f0:hello
---t1.f1:1,---t2.f1:1
(hello,2)
(java,1)
(hi,1)
(program,1)
---t1.f0:hello,---t2.f0:hello
---t1.f1:2,---t2.f1:1
(hello,3)
---t1.f0:java,---t2.f0:java
---t1.f1:1,---t2.f1:1
(java,2)
 */

基于上面的代码,核心逻辑分析:
我们来演示一下,假设无限流流通过keyBy将hello的单词,分5次数据到key为hello的分组,那么5次的逻辑是:
1(hello-1) 2(hello-1) 3(hello-1) 4(hello-1) 5(hello-1)
第一次没有之前的数据,就是null+Tuple2(hello,1)进行计算
第二次基于前一次计算的结果,就是Tuple2(hello,1)+Tuple2(hello,1)进行计算
第三次基于前一次计算的结果,就是Tuple2(hello,2)+Tuple2(hello,1)进行计算
第四次基于前一次计算的结果,就是Tuple2(hello,3)+Tuple2(hello,1)进行计算
第五次基于前一次计算的结果,就是Tuple2(hello,4)+Tuple2(hello,1)进行计算
最后输出的结果是hello-5。