Flink从入门到实战五[Window]-12-Window Join

Window 还可以将两个流中的元素进行关联合并。官方解释:窗口连接将共享一个公共Key且位于同一窗口中的两个流的元素连接在一起。这些窗口可以通过使用窗口分配器来定义,并对来自两个流的元素进行处理。

Window Join按照窗口进行两个流的合并,关联方式类似两个数据库表的Inner Join,如果在一个窗口内,只有一个流的数据,那么将没有关联结果。在窗口内两个流数据的关联,则是基于相同key,进行笛卡尔积方式的关联。

主要有三种合并方式:Tumbling Window Join、Sliding Window Join、Session Window Join。

API:
stream.join(otherStream)
.where()
.equalTo()
.window()
.apply()

.where() 是指定第一个流的关联的key
.equalTo() 是指定第二个流的关联的key
.window() 分配关联的窗口
.apply() 自定义关联后的数据如何处理,可以定义JoinFunction和FlatJoinFunction两种函数

我们以Tumbling Window Join来演示代码:

package com.itzhimei.window;

import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.walkthrough.common.entity.Transaction;
import org.apache.flink.walkthrough.common.source.TransactionSource;

/**
 * Window Join
 * 两个流基于窗口进行关联
 *
 */
public class Window_12_Join {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Transaction> dataStreamA = env
                .addSource(new TransactionSource())
                .name("transactionsA");

        DataStream<Transaction> dataStreamB = env
                .addSource(new TransactionSource())
                .name("transactionsB");

        DataStream<Tuple2<Long, Double>> apply = dataStreamA.join(dataStreamB)
                //指定第一个流的关联的key
                .where(Transaction::getAccountId)
                //指定第二个流的关联的key
                .equalTo(Transaction::getAccountId)
                //分配关联的窗口
                .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
                //自定义关联后的数据如何处理,可以定义JoinFunction和FlatJoinFunction两种函数
                .apply(new JoinFunction<Transaction, Transaction, Tuple2<Long, Double>>() {
                    @Override
                    public Tuple2<Long, Double> join(Transaction first, Transaction second) throws Exception {
                        return new Tuple2<>(first.getAccountId(), first.getAmount() + second.getAmount());
                    }
                });

        apply.print();

        env.execute();

    }
}

/*
15> (3,224.3)
1> (4,957.5)
15> (3,432.9)
1> (4,738.1700000000001)
11> (1,376.46)
11> (1,567.87)
16> (2,749.58)
11> (1,455.48)
16> (2,726.23)
16> (2,771.94)
1> (4,710.69)
15> (3,112.369)
1> (4,500.85)
1> (4,829.64)
16> (2,787.7)
11> (1,607.85)
15> (3,112.92)
1> (4,542.94)
16> (2,604.97)
1> (4,738.1700000000001)
11> (1,563.67)
1> (4,518.84)
11> (1,672.14)
1> (4,491.36)
11> (1,244.35)
 */