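Window Join joins two streams on the same key within the same window. When no window is involved, Interval Join can be used instead to join two streams on the same key.
Interval Join works like this: for each element of one stream, a time range is defined on the other stream, and the element is joined with the elements of the other stream that have the same key and fall inside that range.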
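For example, given two streams A and B, with A as the base stream joined against B, an element of B matches an element of A with the same key when A.ts + lowerBound <= B.ts <= A.ts + upperBound. lowerBound and upperBound are the lower and upper bounds of the join range, and each of them can be positive or negative.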
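The match condition can be checked in isolation. The following is a minimal plain-Java sketch, not Flink code; the class `IntervalBounds` and method `matches` are hypothetical names chosen for illustration, and timestamps are assumed to be longs in milliseconds. Both bounds are treated as inclusive, which is what Flink's `between()` does by default (`lowerBoundExclusive()` / `upperBoundExclusive()` change that).

```java
/**
 * Hypothetical helper illustrating the interval-join match condition:
 * A.ts + lowerBound <= B.ts <= A.ts + upperBound (both bounds inclusive).
 */
public class IntervalBounds {

    // True if bTs lies within [aTs + lowerBound, aTs + upperBound].
    static boolean matches(long aTs, long bTs, long lowerBound, long upperBound) {
        return aTs + lowerBound <= bTs && bTs <= aTs + upperBound;
    }

    public static void main(String[] args) {
        long aTs = 10;
        // between(-2, 1): B may be up to 2 ms earlier or 1 ms later than A,
        // so only B.ts = 8, 9, 10, 11 print true.
        for (long bTs = 6; bTs <= 13; bTs++) {
            System.out.println("B.ts=" + bTs + " -> " + matches(aTs, bTs, -2, 1));
        }
        // Both bounds negative: B must come before A, in [A.ts - 5, A.ts - 3] = [5, 7].
        System.out.println(matches(aTs, 7, -5, -3)); // true
        System.out.println(matches(aTs, 8, -5, -3)); // false
    }
}
```

Negative bounds on both sides simply shift the whole window into the past relative to A, which is how "B must precede A by 3 to 5 ms" would be expressed.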
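Interval Join currently supports only inner joins. Each joined pair of elements is passed to a ProcessJoinFunction and is assigned the larger of the two elements' timestamps, which can be read through ProcessJoinFunction.Context.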
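To make the inner-join and result-timestamp rules concrete, here is a brute-force, in-memory model of the semantics described above. It is only a sketch for intuition: Flink's actual operator buffers elements in keyed state and evicts them as the watermark advances, whereas this nested loop compares every pair. The names `IntervalJoinModel`, `Event`, `Joined` and `intervalJoin` are hypothetical, and Java 16+ is assumed for records.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical in-memory model of interval-join semantics (not Flink's implementation).
 * Inner join only; each result carries the larger of the two input timestamps.
 */
public class IntervalJoinModel {

    record Event(String key, long ts, String value) {}

    record Joined(String key, long ts, String left, String right) {}

    static List<Joined> intervalJoin(List<Event> a, List<Event> b,
                                     long lowerBound, long upperBound) {
        List<Joined> out = new ArrayList<>();
        for (Event l : a) {
            for (Event r : b) {
                boolean sameKey = l.key().equals(r.key());
                boolean inRange = l.ts() + lowerBound <= r.ts() && r.ts() <= l.ts() + upperBound;
                if (sameKey && inRange) {
                    // Inner join: only matched pairs are emitted; unmatched elements produce nothing.
                    out.add(new Joined(l.key(), Math.max(l.ts(), r.ts()), l.value(), r.value()));
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Event> orange = List.of(
                new Event("k", 10, "o1"), new Event("k", 20, "o2"), new Event("x", 10, "o3"));
        List<Event> green = List.of(
                new Event("k", 9, "g1"), new Event("k", 11, "g2"), new Event("k", 15, "g3"));
        // Prints:
        // Joined[key=k, ts=10, left=o1, right=g1]
        // Joined[key=k, ts=11, left=o1, right=g2]
        for (Joined j : intervalJoin(orange, green, -2, 1)) {
            System.out.println(j);
        }
    }
}
```

Here o2 finds no green element in [18, 21] and o3 has no green element with key "x", so neither appears in the output, which is exactly the inner-join behavior.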
API:
DataStream<Integer> orangeStream = ...
DataStream<Integer> greenStream = ...
orangeStream
    .keyBy(<KeySelector>)
    .intervalJoin(greenStream.keyBy(<KeySelector>))
    // an orange element at time t joins green elements with the same key in [t - 2 ms, t + 1 ms]
    .between(Time.milliseconds(-2), Time.milliseconds(1))
    .process(new ProcessJoinFunction<Integer, Integer, String>() {
        @Override
        public void processElement(Integer left, Integer right, Context ctx, Collector<String> out) {
            out.collect(left + "," + right);
        }
    });
Code:
package com.itzhimei.window;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
import org.apache.flink.walkthrough.common.entity.Transaction;
import org.apache.flink.walkthrough.common.source.TransactionSource;
import org.apache.flink.api.java.tuple.Tuple2;
import java.time.Duration;
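/**
 * Interval Join
 * Joins two streams on a time interval: each element of A is joined with the
 * elements of B that have the same accountId and a timestamp within +/- 1 hour.
 */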
public class Window_13_Interval_Join {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Event time comes from Transaction#getTimestamp; tolerate up to 15 minutes of out-of-orderness.
        WatermarkStrategy<Transaction> strategy = WatermarkStrategy
                .<Transaction>forBoundedOutOfOrderness(Duration.ofMinutes(15))
                .withTimestampAssigner((event, timestamp) -> event.getTimestamp());

        // Stream A, keyed by account ID
        KeyedStream<Transaction, Long> dataStreamA = env
                .addSource(new TransactionSource())
                .name("transactionsA")
                .assignTimestampsAndWatermarks(strategy)
                .keyBy(Transaction::getAccountId);

        // Stream B, keyed by account ID
        KeyedStream<Transaction, Long> dataStreamB = env
                .addSource(new TransactionSource())
                .name("transactionsB")
                .assignTimestampsAndWatermarks(strategy)
                .keyBy(Transaction::getAccountId);

        // For each A element, join B elements with the same account ID and B.ts in [A.ts - 1h, A.ts + 1h],
        // then emit (accountId, A.amount + B.amount).
        SingleOutputStreamOperator<Tuple2<Long, Double>> process = dataStreamA
                .intervalJoin(dataStreamB)
                .between(Time.hours(-1), Time.hours(1))
                .process(new ProcessJoinFunction<Transaction, Transaction, Tuple2<Long, Double>>() {
                    @Override
                    public void processElement(Transaction left, Transaction right, Context ctx,
                                               Collector<Tuple2<Long, Double>> out) throws Exception {
                        out.collect(new Tuple2<>(left.getAccountId(), left.getAmount() + right.getAmount()));
                    }
                });

        process.print();
        env.execute();
    }
}
/*
11> (1,376.46)
16> (2,749.58)
1> (4,957.5)
15> (3,224.3)
16> (5,417.7)
11> (1,567.87)
11> (1,759.28)
11> (1,567.87)
16> (2,726.23)
16> (2,702.88)
16> (2,726.23)
15> (3,432.9)
15> (3,432.9)
15> (3,641.5)
1> (4,738.1700000000001)
1> (4,518.84)
1> (4,738.1700000000001)
16> (5,482.28999999999996)
16> (5,546.88)
16> (5,482.28999999999996)
11> (1,646.89)
11> (1,455.48)
11> (1,646.89)
11> (1,455.48)
11> (1,534.5)
16> (2,748.5899999999999)
16> (2,771.94)
16> (2,794.3)
16> (2,748.5899999999999)
16> (2,771.94)
15> (3,112.369)
15> (3,320.969)
15> (3,112.369)
15> (3,0.438)
15> (3,320.969)
*/