Flink从入门到实战五[Window]-2-Window API初探

上一篇文章从整体上了解了窗口的概念、分类,接下来我们用两个简单的demo来看一下如何使用窗口来进行计算。

demo还是使用之前的Transaction实体类,数据由TransactionSource在本地不断生成(无界流),模拟5位用户的交易数据。

/**
 * A single account transaction used by the window demos in this article.
 *
 * <p>The demos read {@code accountId} and {@code timestamp} through getters and
 * aggregate on the field named {@code "amount"}, so the class exposes a public
 * no-argument constructor plus a getter and setter for every field.
 */
public final class Transaction {

    /** Identifier of the account the transaction belongs to. */
    private long accountId;

    /** Timestamp of the transaction; the printed sample values look like epoch milliseconds. */
    private long timestamp;

    /** Transaction amount. */
    private double amount;

    /** Creates an empty transaction whose fields are filled in through the setters. */
    public Transaction() {
    }

    /**
     * Creates a fully populated transaction.
     *
     * @param accountId identifier of the owning account
     * @param timestamp timestamp of the transaction
     * @param amount    transaction amount
     */
    public Transaction(long accountId, long timestamp, double amount) {
        this.accountId = accountId;
        this.timestamp = timestamp;
        this.amount = amount;
    }

    /** @return identifier of the owning account */
    public long getAccountId() {
        return accountId;
    }

    /** @param accountId identifier of the owning account */
    public void setAccountId(long accountId) {
        this.accountId = accountId;
    }

    /** @return timestamp of the transaction */
    public long getTimestamp() {
        return timestamp;
    }

    /** @param timestamp timestamp of the transaction */
    public void setTimestamp(long timestamp) {
        this.timestamp = timestamp;
    }

    /** @return transaction amount */
    public double getAmount() {
        return amount;
    }

    /** @param amount transaction amount */
    public void setAmount(double amount) {
        this.amount = amount;
    }

    /** Two transactions are equal when all three fields match. */
    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof Transaction)) {
            return false;
        }
        Transaction other = (Transaction) o;
        return accountId == other.accountId
                && timestamp == other.timestamp
                // Double.compare keeps equals consistent with hashCode for NaN and -0.0.
                && Double.compare(amount, other.amount) == 0;
    }

    @Override
    public int hashCode() {
        int result = Long.hashCode(accountId);
        result = 31 * result + Long.hashCode(timestamp);
        result = 31 * result + Double.hashCode(amount);
        return result;
    }

    /** Renders the transaction in the same shape as the sample output shown in this article. */
    @Override
    public String toString() {
        return "Transaction{"
                + "accountId=" + accountId
                + ", timestamp=" + timestamp
                + ", amount=" + amount
                + '}';
    }
}

例1,每5秒统计每个用户消费总金额

package org.itzhimei.window;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.walkthrough.common.entity.Transaction;
import org.apache.flink.walkthrough.common.source.TransactionSource;

/**
 * Sums each account's transaction amounts over tumbling 5-second windows.
 * Windows are evaluated in event time, taken from the transaction's own timestamp.
 */
public class Window_1 {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment environment =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // Event time comes from the record itself; the extractor is configured
        // with a 1000 ms out-of-orderness bound.
        final BoundedOutOfOrdernessTimestampExtractor<Transaction> timestampExtractor =
                new BoundedOutOfOrdernessTimestampExtractor<Transaction>(Time.milliseconds(1000)) {
                    @Override
                    public long extractTimestamp(Transaction transaction) {
                        return transaction.getTimestamp();
                    }
                };

        final SingleOutputStreamOperator<Transaction> timestamped =
                openTransactionStream(environment).assignTimestampsAndWatermarks(timestampExtractor);

        // Group by account, cut into 5-second event-time windows, then sum the "amount" field.
        final SingleOutputStreamOperator<Transaction> totalsPerAccount = timestamped
                .keyBy(Transaction::getAccountId)
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .sum("amount");

        totalsPerAccount.print();

        environment.execute();
    }

    /** Attaches the demo transaction source to the environment under the name "transactions". */
    private static DataStream<Transaction> openTransactionStream(StreamExecutionEnvironment environment) {
        return environment
                .addSource(new TransactionSource())
                .name("transactions");
    }
}

/* 输出
11> Transaction{accountId=1, timestamp=1546272000000, amount=2170.21}
15> Transaction{accountId=3, timestamp=1546272720000, amount=1453.999}
1> Transaction{accountId=4, timestamp=1546273080000, amount=1607.2400000000002}
16> Transaction{accountId=2, timestamp=1546272360000, amount=2251.17}
16> Transaction{accountId=5, timestamp=1546273440000, amount=1451.54}
15> Transaction{accountId=3, timestamp=1546285320000, amount=2227.899}
11> Transaction{accountId=1, timestamp=1546284600000, amount=2973.9799999999996}
16> Transaction{accountId=5, timestamp=1546284240000, amount=2431.82}
1> Transaction{accountId=4, timestamp=1546285680000, amount=2274.38}
16> Transaction{accountId=2, timestamp=1546284960000, amount=3228.9899999999993}
1> Transaction{accountId=4, timestamp=1546301880000, amount=2410.14}
11> Transaction{accountId=1, timestamp=1546300800000, amount=2546.19}
15> Transaction{accountId=3, timestamp=1546301520000, amount=1504.909}
16> Transaction{accountId=2, timestamp=1546301160000, amount=3257.2499999999995}
16> Transaction{accountId=5, timestamp=1546302240000, amount=2352.3900000000003}
 */

例2,每5秒统计每个用户消费最小金额

package org.itzhimei.window;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.walkthrough.common.entity.Transaction;
import org.apache.flink.walkthrough.common.source.TransactionSource;

/**
 * Picks, per account, the minimum by the "amount" field over tumbling 5-second windows.
 * Windows are evaluated in processing time.
 */
public class Window_2 {

    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment environment =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // Older API form of the window below: .timeWindow(Time.seconds(5))
        final SingleOutputStreamOperator<Transaction> minimumPerAccount =
                openTransactionStream(environment)
                        .keyBy(Transaction::getAccountId)
                        .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                        .minBy("amount");

        minimumPerAccount.print();

        environment.execute();
    }

    /** Attaches the demo transaction source to the environment under the name "transactions". */
    private static DataStream<Transaction> openTransactionStream(StreamExecutionEnvironment environment) {
        return environment
                .addSource(new TransactionSource())
                .name("transactions");
    }
}

/* 输出
15> Transaction{accountId=3, timestamp=1546276320000, amount=0.219}
1> Transaction{accountId=4, timestamp=1546278480000, amount=22.1}
11> Transaction{accountId=1, timestamp=1546282800000, amount=56.12}
16> Transaction{accountId=2, timestamp=1546281360000, amount=228.22}
16> Transaction{accountId=5, timestamp=1546282440000, amount=79.43}

1> Transaction{accountId=4, timestamp=1546296480000, amount=22.1}
16> Transaction{accountId=2, timestamp=1546299360000, amount=228.22}
15> Transaction{accountId=3, timestamp=1546294320000, amount=0.219}
11> Transaction{accountId=1, timestamp=1546300800000, amount=56.12}
16> Transaction{accountId=5, timestamp=1546300440000, amount=79.43}
 */

上面的两个例子,都是先基于Transaction类的accountId进行keyBy分组,然后使用window()划分窗口,再在每个窗口内进行聚合计算的。