Flink从入门到实战五[Window]-8-Window 滚动计数窗口TumblingCountWindow

前面演示的Window都是基于时间的,我们再来看一个基于滚动计数的Window demo。
demo实现的目标很简单:基于滚动计数窗口计算每个用户 每3笔交易的交易总金额。

代码:

package com.itzhimei.window;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.walkthrough.common.entity.Transaction;
import org.apache.flink.walkthrough.common.source.TransactionSource;

/**
 * 滚动计数窗口demo
 * 计数窗口计算每个用户 每3笔交易的交易总金额
 */
public class Window_8_TumblingCountWindow {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Transaction> dataSource = env.addSource(new TransactionSource()).name("transactions");

        SingleOutputStreamOperator<Transaction> amount = dataSource.keyBy(Transaction::getAccountId)
                .countWindow(3)
                .sum("amount");

        amount.print();

        env.execute();
    }
}

/*
11> Transaction{accountId=1, timestamp=1546272000000, amount=835.12}
16> Transaction{accountId=2, timestamp=1546272360000, amount=1123.38}
15> Transaction{accountId=3, timestamp=1546272720000, amount=433.11899999999997}
1> Transaction{accountId=4, timestamp=1546273080000, amount=970.1100000000001}
16> Transaction{accountId=5, timestamp=1546273440000, amount=867.02}

11> Transaction{accountId=1, timestamp=1546277400000, amount=1278.97}
16> Transaction{accountId=2, timestamp=1546277760000, amount=871.3100000000001}
15> Transaction{accountId=3, timestamp=1546278120000, amount=872.72}
1> Transaction{accountId=4, timestamp=1546278480000, amount=437.18}
16> Transaction{accountId=5, timestamp=1546278840000, amount=584.52}

16> Transaction{accountId=2, timestamp=1546283160000, amount=1010.95}
11> Transaction{accountId=1, timestamp=1546282800000, amount=542.75}
15> Transaction{accountId=3, timestamp=1546283520000, amount=615.97}
1> Transaction{accountId=4, timestamp=1546283880000, amount=983.4}
16> Transaction{accountId=5, timestamp=1546284240000, amount=687.8399999999999}

11> Transaction{accountId=1, timestamp=1546288200000, amount=941.13}
16> Transaction{accountId=2, timestamp=1546288560000, amount=1206.06}
15> Transaction{accountId=3, timestamp=1546288920000, amount=887.15}
1> Transaction{accountId=4, timestamp=1546289280000, amount=821.81}
16> Transaction{accountId=5, timestamp=1546289640000, amount=774.73}
 */