Flink从入门到实战五[Window]-13-Window Interval Join

Window Join按照窗口和相同的key进行两个流的合并,如果没有窗口,可以使用Interval Join来基于相同的key join两个流。

Interval Join的关联逻辑是:以一个流中每条数据的时间戳为基准,在另一个流上划定一个时间范围,然后与另一个流中落在该范围内且key相同的数据进行关联。

例如现在有两个流A和B,A作为基础流来关联B,B中满足 A.ts + lowerBound <= B.ts <= A.ts + upperBound 的元素会与A中的元素关联。lowerBound和upperBound表示关联范围的下界和上界,取值可以是正数,也可以是负数。

Interval join目前只支持INNER JOIN。连接后的一对元素会被传递给ProcessJoinFunction,其时间戳取两个元素中较大的那个时间戳(可以通过ProcessJoinFunction.Context访问)。

API:

DataStream<Integer> orangeStream = ...
DataStream<Integer> greenStream = ...

// Join each orange element with the green elements of the same key whose
// timestamp lies in [orange.ts - 2ms, orange.ts + 1ms].
orangeStream
    .keyBy(<KeySelector>)
    .intervalJoin(greenStream.keyBy(<KeySelector>))
    .between(Time.milliseconds(-2), Time.milliseconds(1))
    .process(new ProcessJoinFunction<Integer, Integer, String>() {

        @Override
        public void processElement(Integer left, Integer right, Context ctx, Collector<String> out) {
            // Emit the matched pair as "left,right".
            out.collect(left + "," + right);
        }
    });

代码:

package com.itzhimei.window;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
import org.apache.flink.walkthrough.common.entity.Transaction;
import org.apache.flink.walkthrough.common.source.TransactionSource;
import scala.Tuple2;

import java.time.Duration;

/**
 * Interval Join example.
 *
 * <p>Builds two keyed {@link Transaction} streams and joins them on account id
 * with an interval of one hour before to one hour after each left-hand element,
 * printing the account id together with the sum of both amounts.
 */
public class Window_13_Interval_Join {

    /**
     * Assembles and runs the interval-join job.
     *
     * @param args command-line arguments (not used)
     * @throws Exception if job execution fails
     */
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Timestamps come from the transaction itself; events may arrive up to
        // 15 minutes out of order.
        final WatermarkStrategy<Transaction> watermarks = WatermarkStrategy
                .<Transaction>forBoundedOutOfOrderness(Duration.ofMinutes(15))
                .withTimestampAssigner((event, timestamp) -> event.getTimestamp());

        final KeyedStream<Transaction, Long> leftStream =
                keyedTransactions(env, "transactionsA", watermarks);
        final KeyedStream<Transaction, Long> rightStream =
                keyedTransactions(env, "transactionsB", watermarks);

        final SingleOutputStreamOperator<Tuple2<Long, Double>> joined = leftStream
                .intervalJoin(rightStream)
                .between(Time.hours(-1), Time.hours(1))
                .process(new AmountSummingJoin());

        joined.print();

        env.execute();
    }

    /**
     * Creates a transaction source with the given operator name, assigns
     * timestamps and watermarks, and keys the stream by account id.
     */
    private static KeyedStream<Transaction, Long> keyedTransactions(
            StreamExecutionEnvironment env,
            String sourceName,
            WatermarkStrategy<Transaction> watermarks) {
        return env
                .addSource(new TransactionSource())
                .name(sourceName)
                .assignTimestampsAndWatermarks(watermarks)
                .keyBy(Transaction::getAccountId);
    }

    /**
     * Emits, for every joined pair, the left element's account id and the sum
     * of the two transaction amounts.
     */
    private static final class AmountSummingJoin
            extends ProcessJoinFunction<Transaction, Transaction, Tuple2<Long, Double>> {

        @Override
        public void processElement(
                Transaction left,
                Transaction right,
                Context ctx,
                Collector<Tuple2<Long, Double>> out) throws Exception {
            final double total = left.getAmount() + right.getAmount();
            out.collect(new Tuple2<>(left.getAccountId(), total));
        }
    }
}

/*
11> (1,376.46)
16> (2,749.58)
1> (4,957.5)
15> (3,224.3)
16> (5,417.7)
11> (1,567.87)
11> (1,759.28)
11> (1,567.87)
16> (2,726.23)
16> (2,702.88)
16> (2,726.23)
15> (3,432.9)
15> (3,432.9)
15> (3,641.5)
1> (4,738.1700000000001)
1> (4,518.84)
1> (4,738.1700000000001)
16> (5,482.28999999999996)
16> (5,546.88)
16> (5,482.28999999999996)
11> (1,646.89)
11> (1,455.48)
11> (1,646.89)
11> (1,455.48)
11> (1,534.5)
16> (2,748.5899999999999)
16> (2,771.94)
16> (2,794.3)
16> (2,748.5899999999999)
16> (2,771.94)
15> (3,112.369)
15> (3,320.969)
15> (3,112.369)
15> (3,0.438)
15> (3,320.969)
 */