Window是Flink中无限流处理的核心,Flink中将Window也算作是数据转换算子。
Window有多种类型,例如滚动窗口、滑动窗口、Session窗口、全局窗口等。窗口的作用是把无限的数据流切分成有限的"桶"(通常按时间范围划分,也可以按数据条数划分),然后对每个桶内的数据分别进行计算。
在这里我们不做过多讲解,只是先进行预热,在后续章节详细说明。
Window: KeyedStream → WindowedStream
我们还是先看代码。这里的demo使用的是Flink官网的反欺诈(Fraud Detection)示例项目,数据源是在本地源源不断地生成的用户交易数据。
第一个demo每5秒统计一次每个用户发生了多少笔交易,第二个demo每5秒统计一次每个用户交易的总金额。
/**
 * A single transaction record, mirroring the entity from the Flink fraud-detection
 * walkthrough ({@code org.apache.flink.walkthrough.common.entity.Transaction}).
 *
 * <p>The class follows Flink's POJO rules: it is a public class with a public no-arg
 * constructor and a getter/setter pair for every field. That is what lets a job address
 * a field by name, e.g. {@code sum("amount")}.
 *
 * <p>The members are written out explicitly instead of being generated by Lombok's
 * {@code @Data}. This avoids needing an annotation processor, and it makes
 * {@link #toString()} print the {@code Transaction{accountId=..., timestamp=..., amount=...}}
 * format that appears in the sample output.
 */
public final class Transaction {

    private long accountId;
    // NOTE(review): sample values look like epoch milliseconds — confirm against the source.
    private long timestamp;
    private double amount;

    /** Creates an empty transaction (Flink's POJO rules require a public no-arg constructor). */
    public Transaction() {}

    /**
     * Creates a fully populated transaction.
     *
     * @param accountId id of the account that made the transaction
     * @param timestamp time of the transaction
     * @param amount    transaction amount
     */
    public Transaction(long accountId, long timestamp, double amount) {
        this.accountId = accountId;
        this.timestamp = timestamp;
        this.amount = amount;
    }

    public long getAccountId() {
        return accountId;
    }

    public void setAccountId(long accountId) {
        this.accountId = accountId;
    }

    public long getTimestamp() {
        return timestamp;
    }

    public void setTimestamp(long timestamp) {
        this.timestamp = timestamp;
    }

    public double getAmount() {
        return amount;
    }

    public void setAmount(double amount) {
        this.amount = amount;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        Transaction that = (Transaction) o;
        // Double.compare gives a total order (NaN == NaN, -0.0 != 0.0), consistent with hashCode.
        return accountId == that.accountId
                && timestamp == that.timestamp
                && Double.compare(amount, that.amount) == 0;
    }

    @Override
    public int hashCode() {
        int result = Long.hashCode(accountId);
        result = 31 * result + Long.hashCode(timestamp);
        result = 31 * result + Double.hashCode(amount);
        return result;
    }

    @Override
    public String toString() {
        return "Transaction{accountId=" + accountId
                + ", timestamp=" + timestamp
                + ", amount=" + amount + '}';
    }
}
package org.itzhimei.transform;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.walkthrough.common.entity.Transaction;
import org.apache.flink.walkthrough.common.source.TransactionSource;
/**
 * Counts, every 5 seconds, how many transactions each account made.
 *
 * <p>The transaction stream is keyed by account id and cut into 5-second tumbling
 * processing-time windows. A {@link AggregateFunction} then counts the records in each
 * window incrementally. Both the raw transactions and the per-window counts are printed.
 * Only the count is emitted, so the printed result does not show which account it
 * belongs to.
 */
public class Transform_7_Window {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Endless, locally generated transactions from the Flink walkthrough source.
        final DataStream<Transaction> transactions =
                env.addSource(new TransactionSource()).name("transactions");
        transactions.print();

        final SingleOutputStreamOperator<Integer> countPerWindow =
                transactions
                        .keyBy(Transaction::getAccountId)
                        .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                        .aggregate(new TransactionCounter());
        countPerWindow.print();

        env.execute();
    }

    /**
     * Incremental counter: the accumulator starts at zero, each transaction adds one,
     * partial counts merge by addition, and the final accumulator is the result.
     */
    private static final class TransactionCounter
            implements AggregateFunction<Transaction, Integer, Integer> {

        @Override
        public Integer createAccumulator() {
            return 0;
        }

        @Override
        public Integer add(Transaction transaction, Integer runningCount) {
            return runningCount + 1;
        }

        @Override
        public Integer getResult(Integer finalCount) {
            return finalCount;
        }

        @Override
        public Integer merge(Integer left, Integer right) {
            return left + right;
        }
    }
}
/* 输出
7> Transaction{accountId=1, timestamp=1546272000000, amount=188.23}
8> Transaction{accountId=2, timestamp=1546272360000, amount=374.79}
9> Transaction{accountId=3, timestamp=1546272720000, amount=112.15}
10> Transaction{accountId=4, timestamp=1546273080000, amount=478.75}
11> Transaction{accountId=5, timestamp=1546273440000, amount=208.85}
12> Transaction{accountId=1, timestamp=1546273800000, amount=379.64}
13> Transaction{accountId=2, timestamp=1546274160000, amount=351.44}
14> Transaction{accountId=3, timestamp=1546274520000, amount=320.75}
15> Transaction{accountId=4, timestamp=1546274880000, amount=259.42}
16> Transaction{accountId=5, timestamp=1546275240000, amount=273.44}
1> Transaction{accountId=1, timestamp=1546275600000, amount=267.25}
2> Transaction{accountId=2, timestamp=1546275960000, amount=397.15}
3> Transaction{accountId=3, timestamp=1546276320000, amount=0.219}
4> Transaction{accountId=4, timestamp=1546276680000, amount=231.94}
5> Transaction{accountId=5, timestamp=1546277040000, amount=384.73}
6> Transaction{accountId=1, timestamp=1546277400000, amount=419.62}
8> Transaction{accountId=3, timestamp=1546278120000, amount=0.77}
7> Transaction{accountId=2, timestamp=1546277760000, amount=412.91}
9> Transaction{accountId=4, timestamp=1546278480000, amount=22.1}
10> Transaction{accountId=5, timestamp=1546278840000, amount=377.54}
11> Transaction{accountId=1, timestamp=1546279200000, amount=375.44}
12> Transaction{accountId=2, timestamp=1546279560000, amount=230.18}
13> Transaction{accountId=3, timestamp=1546279920000, amount=0.8}
14> Transaction{accountId=4, timestamp=1546280280000, amount=350.89}
15> Transaction{accountId=5, timestamp=1546280640000, amount=127.55}
16> Transaction{accountId=1, timestamp=1546281000000, amount=483.91}
1> Transaction{accountId=2, timestamp=1546281360000, amount=228.22}
2> Transaction{accountId=3, timestamp=1546281720000, amount=871.15}
3> Transaction{accountId=4, timestamp=1546282080000, amount=64.19}
4> Transaction{accountId=5, timestamp=1546282440000, amount=79.43}
6> Transaction{accountId=2, timestamp=1546283160000, amount=256.48}
5> Transaction{accountId=1, timestamp=1546282800000, amount=56.12}
7> Transaction{accountId=3, timestamp=1546283520000, amount=148.16}
8> Transaction{accountId=4, timestamp=1546283880000, amount=199.95}
9> Transaction{accountId=5, timestamp=1546284240000, amount=252.37}
10> Transaction{accountId=1, timestamp=1546284600000, amount=274.73}
11> Transaction{accountId=2, timestamp=1546284960000, amount=473.54}
12> Transaction{accountId=3, timestamp=1546285320000, amount=119.92}
14> Transaction{accountId=5, timestamp=1546286040000, amount=353.16}
13> Transaction{accountId=4, timestamp=1546285680000, amount=323.59}
15> 8
16> 8
11> 9
16> 8
1> 8
*/
package org.itzhimei.transform;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.walkthrough.common.entity.Transaction;
import org.apache.flink.walkthrough.common.source.TransactionSource;
/**
 * Sums, every 5 seconds, the transaction amounts of each account.
 *
 * <p>The transaction stream is keyed by account id and cut into 5-second tumbling
 * processing-time windows. The built-in {@code sum("amount")} aggregation then adds up
 * the {@code amount} field in each window. The sample output below shows that the
 * emitted record keeps the other fields (such as {@code timestamp}) from one of the
 * window's input records, while {@code amount} holds the total.
 */
public class Transform_8_Window_Sum {

    /** Name of the POJO field that the window aggregation sums. */
    private static final String AMOUNT_FIELD = "amount";

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        final DataStream<Transaction> transactions =
                env.addSource(new TransactionSource()).name("transactions");

        sumAmountPerAccountWindow(transactions).print();

        env.execute();
    }

    /** Keys by account and sums {@code amount} over 5-second tumbling processing-time windows. */
    private static SingleOutputStreamOperator<Transaction> sumAmountPerAccountWindow(
            DataStream<Transaction> transactions) {
        return transactions
                .keyBy(Transaction::getAccountId)
                .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                .sum(AMOUNT_FIELD);
    }
}
/* 输出
11> Transaction{accountId=1, timestamp=1546272000000, amount=2170.21}
15> Transaction{accountId=3, timestamp=1546272720000, amount=1453.999}
1> Transaction{accountId=4, timestamp=1546273080000, amount=1607.2400000000002}
16> Transaction{accountId=2, timestamp=1546272360000, amount=2251.17}
16> Transaction{accountId=5, timestamp=1546273440000, amount=1451.54}
15> Transaction{accountId=3, timestamp=1546285320000, amount=2227.899}
11> Transaction{accountId=1, timestamp=1546284600000, amount=2973.9799999999996}
16> Transaction{accountId=5, timestamp=1546284240000, amount=2431.82}
1> Transaction{accountId=4, timestamp=1546285680000, amount=2274.38}
16> Transaction{accountId=2, timestamp=1546284960000, amount=3228.9899999999993}
1> Transaction{accountId=4, timestamp=1546301880000, amount=2410.14}
11> Transaction{accountId=1, timestamp=1546300800000, amount=2546.19}
15> Transaction{accountId=3, timestamp=1546301520000, amount=1504.909}
16> Transaction{accountId=2, timestamp=1546301160000, amount=3257.2499999999995}
16> Transaction{accountId=5, timestamp=1546302240000, amount=2352.3900000000003}
*/