前面演示的Window都是基于时间的,我们再来看一个基于滑动计数的Window demo。
demo实现的目标很简单:基于滑动计数窗口,以1笔交易为滑动步长,计算每个用户最近3笔交易的交易总金额。
代码:
package com.itzhimei.window;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.walkthrough.common.entity.Transaction;
import org.apache.flink.walkthrough.common.source.TransactionSource;
/**
 * Sliding count-window demo.
 *
 * <p>Keys the transaction stream by account id and applies a count window of
 * size 3 with a slide of 1, summing the {@code amount} field, so a running
 * total over the most recent transactions of each account is printed.
 */
public class Window_9_SlidingCountWindow {

    /**
     * Builds the streaming job and runs it.
     *
     * @param args command-line arguments (not used)
     * @throws Exception if executing the job fails
     */
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment environment =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // Source of Transaction records, labelled "transactions".
        final DataStream<Transaction> transactions =
                environment.addSource(new TransactionSource()).name("transactions");

        // Per account: count window of 3 elements sliding by 1 element,
        // aggregated by summing the "amount" field.
        final SingleOutputStreamOperator<Transaction> slidingTotals =
                transactions
                        .keyBy(Transaction::getAccountId)
                        .countWindow(3, 1)
                        .sum("amount");

        slidingTotals.print();

        environment.execute();
    }
}
/*
11> Transaction{accountId=1, timestamp=1546272000000, amount=188.23}
16> Transaction{accountId=2, timestamp=1546272360000, amount=374.79}
15> Transaction{accountId=3, timestamp=1546272720000, amount=112.15}
1> Transaction{accountId=4, timestamp=1546273080000, amount=478.75}
16> Transaction{accountId=5, timestamp=1546273440000, amount=208.85}
11> Transaction{accountId=1, timestamp=1546272000000, amount=567.87}
16> Transaction{accountId=2, timestamp=1546272360000, amount=726.23}
15> Transaction{accountId=3, timestamp=1546272720000, amount=432.9}
1> Transaction{accountId=4, timestamp=1546273080000, amount=738.1700000000001}
16> Transaction{accountId=5, timestamp=1546273440000, amount=482.28999999999996}
11> Transaction{accountId=1, timestamp=1546272000000, amount=835.12}
16> Transaction{accountId=2, timestamp=1546272360000, amount=1123.38}
15> Transaction{accountId=3, timestamp=1546272720000, amount=433.11899999999997}
1> Transaction{accountId=4, timestamp=1546273080000, amount=970.1100000000001}
16> Transaction{accountId=5, timestamp=1546273440000, amount=867.02}
11> Transaction{accountId=1, timestamp=1546273800000, amount=1066.51}
16> Transaction{accountId=2, timestamp=1546274160000, amount=1161.5}
15> Transaction{accountId=3, timestamp=1546274520000, amount=321.739}
1> Transaction{accountId=4, timestamp=1546274880000, amount=513.46}
16> Transaction{accountId=5, timestamp=1546275240000, amount=1035.71}
11> Transaction{accountId=1, timestamp=1546275600000, amount=1062.31}
16> Transaction{accountId=2, timestamp=1546275960000, amount=1040.24}
15> Transaction{accountId=3, timestamp=1546276320000, amount=1.7890000000000001}
1> Transaction{accountId=4, timestamp=1546276680000, amount=604.93}
16> Transaction{accountId=5, timestamp=1546277040000, amount=889.8199999999999}
11> Transaction{accountId=1, timestamp=1546277400000, amount=1278.97}
16> Transaction{accountId=2, timestamp=1546277760000, amount=871.3100000000001}
15> Transaction{accountId=3, timestamp=1546278120000, amount=872.72}
1> Transaction{accountId=4, timestamp=1546278480000, amount=437.18}
16> Transaction{accountId=5, timestamp=1546278840000, amount=584.52}
*/