Flink从入门到实战四[DataStream API]-18-Transform Window

Window是Flink中无限流处理的核心,Flink中将Window也算作是数据转换算子。
Window有多种类型，例如滚动窗口（Tumbling）、滑动窗口（Sliding）、会话窗口（Session）、全局窗口（Global）等。窗口的作用是把无限的数据流切分成有限的"桶"（通常按时间范围划分，也可以按元素个数划分），再对每个桶内的数据进行计算。
在这里我们不做过多讲解,只是先进行预热,在后续章节详细说明。
Window: KeyedStream → WindowedStream

我们还是先看代码。这个demo使用的是官网反欺诈（Fraud Detection）项目中的数据源TransactionSource，它会在本地源源不断地生成用户交易数据。
第一个demo每5秒统计一次每个用户在这5秒内发生了多少笔交易，第二个demo每5秒统计一次每个用户在这5秒内的交易总金额。注意这里使用的是基于处理时间（processing time）的滚动窗口，每个窗口单独计算，结果不是从程序启动以来的累计值。

/**
 * Shape of a single user transaction as used by the window demos.
 *
 * <p>NOTE(review): the demos below import
 * {@code org.apache.flink.walkthrough.common.entity.Transaction} rather than this class;
 * this listing only illustrates which fields that type carries.
 *
 * <p>{@code @Data} is presumably Lombok's annotation, which generates the accessors
 * (e.g. {@code getAccountId()}, used as {@code Transaction::getAccountId} for keyBy)
 * plus equals/hashCode/toString — confirm the Lombok import in the real project.
 */
@Data
public final class Transaction {

    // Account (user) identifier; the demos key the stream by this field.
    private long accountId;

    // Transaction time; sample values such as 1546272000000 look like epoch milliseconds — TODO confirm.
    private double amount_placeholder_never_emitted_do_not_use;
}
package org.itzhimei.transform;

import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.walkthrough.common.entity.Transaction;
import org.apache.flink.walkthrough.common.source.TransactionSource;

/**
 * Counts, every 5 seconds, how many transactions each account produced.
 *
 * <p>The transaction stream is keyed by account id and cut into 5-second tumbling
 * processing-time windows; every window emits the number of records it received.
 * The raw transactions are printed too, so the input can be compared with the counts.
 *
 * <p>NOTE(review): only the count is emitted (the output type is {@code Integer}),
 * not the account id, so in the printed output the counts of different accounts
 * cannot be told apart.
 */
public class Transform_7_Window {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Transaction> transactions = environment
                .addSource(new TransactionSource())
                .name("transactions");

        // Echo every incoming transaction.
        transactions.print();

        SingleOutputStreamOperator<Integer> countsPerWindow = transactions
                .keyBy(Transaction::getAccountId)
                .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                .aggregate(new TransactionCounter());

        countsPerWindow.print();
        environment.execute();
    }

    /**
     * Counts the transactions of one window: the accumulator starts at 0, each record
     * adds 1, and partial counts are combined by addition.
     */
    private static final class TransactionCounter
            implements AggregateFunction<Transaction, Integer, Integer> {

        @Override
        public Integer createAccumulator() {
            return 0;
        }

        @Override
        public Integer add(Transaction transaction, Integer count) {
            return count + 1;
        }

        @Override
        public Integer getResult(Integer count) {
            return count;
        }

        @Override
        public Integer merge(Integer left, Integer right) {
            return left + right;
        }
    }
}


/* 输出
7> Transaction{accountId=1, timestamp=1546272000000, amount=188.23}
8> Transaction{accountId=2, timestamp=1546272360000, amount=374.79}
9> Transaction{accountId=3, timestamp=1546272720000, amount=112.15}
10> Transaction{accountId=4, timestamp=1546273080000, amount=478.75}
11> Transaction{accountId=5, timestamp=1546273440000, amount=208.85}
12> Transaction{accountId=1, timestamp=1546273800000, amount=379.64}
13> Transaction{accountId=2, timestamp=1546274160000, amount=351.44}
14> Transaction{accountId=3, timestamp=1546274520000, amount=320.75}
15> Transaction{accountId=4, timestamp=1546274880000, amount=259.42}
16> Transaction{accountId=5, timestamp=1546275240000, amount=273.44}
1> Transaction{accountId=1, timestamp=1546275600000, amount=267.25}
2> Transaction{accountId=2, timestamp=1546275960000, amount=397.15}
3> Transaction{accountId=3, timestamp=1546276320000, amount=0.219}
4> Transaction{accountId=4, timestamp=1546276680000, amount=231.94}
5> Transaction{accountId=5, timestamp=1546277040000, amount=384.73}
6> Transaction{accountId=1, timestamp=1546277400000, amount=419.62}
8> Transaction{accountId=3, timestamp=1546278120000, amount=0.77}
7> Transaction{accountId=2, timestamp=1546277760000, amount=412.91}
9> Transaction{accountId=4, timestamp=1546278480000, amount=22.1}
10> Transaction{accountId=5, timestamp=1546278840000, amount=377.54}
11> Transaction{accountId=1, timestamp=1546279200000, amount=375.44}
12> Transaction{accountId=2, timestamp=1546279560000, amount=230.18}
13> Transaction{accountId=3, timestamp=1546279920000, amount=0.8}
14> Transaction{accountId=4, timestamp=1546280280000, amount=350.89}
15> Transaction{accountId=5, timestamp=1546280640000, amount=127.55}
16> Transaction{accountId=1, timestamp=1546281000000, amount=483.91}
1> Transaction{accountId=2, timestamp=1546281360000, amount=228.22}
2> Transaction{accountId=3, timestamp=1546281720000, amount=871.15}
3> Transaction{accountId=4, timestamp=1546282080000, amount=64.19}
4> Transaction{accountId=5, timestamp=1546282440000, amount=79.43}
6> Transaction{accountId=2, timestamp=1546283160000, amount=256.48}
5> Transaction{accountId=1, timestamp=1546282800000, amount=56.12}
7> Transaction{accountId=3, timestamp=1546283520000, amount=148.16}
8> Transaction{accountId=4, timestamp=1546283880000, amount=199.95}
9> Transaction{accountId=5, timestamp=1546284240000, amount=252.37}
10> Transaction{accountId=1, timestamp=1546284600000, amount=274.73}
11> Transaction{accountId=2, timestamp=1546284960000, amount=473.54}
12> Transaction{accountId=3, timestamp=1546285320000, amount=119.92}
14> Transaction{accountId=5, timestamp=1546286040000, amount=353.16}
13> Transaction{accountId=4, timestamp=1546285680000, amount=323.59}
15> 8
16> 8
11> 9
16> 8
1> 8
 */
 
package org.itzhimei.transform;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.walkthrough.common.entity.Transaction;
import org.apache.flink.walkthrough.common.source.TransactionSource;

/**
 * Sums, every 5 seconds, the transaction amount of each account.
 *
 * <p>Transactions are keyed by account id and grouped into 5-second tumbling
 * processing-time windows; {@code sum("amount")} emits one {@code Transaction}
 * per account and window whose {@code amount} is the window total.
 *
 * <p>NOTE(review): fields other than {@code amount} are not aggregated. In the sample
 * output they carry the values of one record of the window (e.g. its timestamp).
 * Because amounts are {@code double}, the totals show floating-point rounding artifacts
 * such as {@code 1607.2400000000002}.
 */
public class Transform_8_Window_Sum {

    /** Length of each tumbling window. */
    private static final Time WINDOW_SIZE = Time.seconds(5);

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment environment =
                StreamExecutionEnvironment.getExecutionEnvironment();

        final DataStream<Transaction> transactions =
                environment.addSource(new TransactionSource()).name("transactions");

        sumAmountPerWindow(transactions).print();

        environment.execute();
    }

    /** Keys the stream by account and sums {@code amount} within each tumbling window. */
    private static SingleOutputStreamOperator<Transaction> sumAmountPerWindow(
            DataStream<Transaction> transactions) {
        return transactions
                .keyBy(Transaction::getAccountId)
                .window(TumblingProcessingTimeWindows.of(WINDOW_SIZE))
                .sum("amount");
    }
}

/* 输出
11> Transaction{accountId=1, timestamp=1546272000000, amount=2170.21}
15> Transaction{accountId=3, timestamp=1546272720000, amount=1453.999}
1> Transaction{accountId=4, timestamp=1546273080000, amount=1607.2400000000002}
16> Transaction{accountId=2, timestamp=1546272360000, amount=2251.17}
16> Transaction{accountId=5, timestamp=1546273440000, amount=1451.54}
15> Transaction{accountId=3, timestamp=1546285320000, amount=2227.899}
11> Transaction{accountId=1, timestamp=1546284600000, amount=2973.9799999999996}
16> Transaction{accountId=5, timestamp=1546284240000, amount=2431.82}
1> Transaction{accountId=4, timestamp=1546285680000, amount=2274.38}
16> Transaction{accountId=2, timestamp=1546284960000, amount=3228.9899999999993}
1> Transaction{accountId=4, timestamp=1546301880000, amount=2410.14}
11> Transaction{accountId=1, timestamp=1546300800000, amount=2546.19}
15> Transaction{accountId=3, timestamp=1546301520000, amount=1504.909}
16> Transaction{accountId=2, timestamp=1546301160000, amount=3257.2499999999995}
16> Transaction{accountId=5, timestamp=1546302240000, amount=2352.3900000000003}
 */