Flink从入门到实战七[ProcessFunction]-4-CoProcessFunction使用

当需要同时处理两条输入流的数据时,DataStream API提供了CoProcessFunction。CoProcessFunction为两条输入流分别提供了一个处理方法:processElement1()处理第一条流的元素,processElement2()处理第二条流的元素。

我们来演示一个简单的demo:程序先模拟两个输入流,一个是交易金额大于等于10元的流,一个是交易金额小于10元的流;然后在两个流各自对应的processElement1()和processElement2()方法中,都过滤掉用户1和用户5的交易。
代码:

package com.itzhimei.process;

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.walkthrough.common.entity.Transaction;
import org.apache.flink.walkthrough.common.source.TransactionSource;

/**
 * Demonstrates {@link CoProcessFunction} on two connected transaction streams.
 *
 * <p>Two separate {@link TransactionSource} instances are each filtered by amount (one keeps
 * amounts of at least 10, the other keeps amounts below 10). The two filtered streams are then
 * connected and handed to a co-process function that drops every transaction belonging to
 * account 1 or account 5. Whatever remains is printed.
 */
public class ProcessFunction_3_CoProcessFunction {

    /**
     * Builds the job graph and executes it.
     *
     * @param args command-line arguments; not used
     * @throws Exception if building or executing the job fails
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Amount predicates for the two inputs; together they split on the threshold 10.
        FilterFunction<Transaction> atLeastTen = tx -> tx.getAmount() >= 10;
        FilterFunction<Transaction> belowTen = tx -> tx.getAmount() < 10;

        // First input: transactions with amount >= 10.
        DataStream<Transaction> largeTransactions = env
                .addSource(new TransactionSource())
                .filter(atLeastTen)
                .name("transactions");

        // Second input: transactions with amount < 10, read from its own source instance.
        DataStream<Transaction> smallTransactions = env
                .addSource(new TransactionSource())
                .filter(belowTen)
                .name("transactions");

        // Connect both inputs and apply the same account exclusion to each side.
        SingleOutputStreamOperator<Transaction> retained = largeTransactions
                .connect(smallTransactions)
                .process(new ExcludeAccountsFunction());

        retained.print();

        env.execute();
    }

    /**
     * Co-process function that forwards a transaction from either input unless its account id
     * is 1 or 5.
     */
    private static class ExcludeAccountsFunction
            extends CoProcessFunction<Transaction, Transaction, Transaction> {

        /** Handles an element of the first connected stream. */
        @Override
        public void processElement1(Transaction value, Context ctx, Collector<Transaction> out) throws Exception {
            forwardUnlessExcluded(value, out);
        }

        /** Handles an element of the second connected stream. */
        @Override
        public void processElement2(Transaction value, Context ctx, Collector<Transaction> out) throws Exception {
            forwardUnlessExcluded(value, out);
        }

        /** Emits {@code value} to {@code out} unless it belongs to an excluded account. */
        private static void forwardUnlessExcluded(Transaction value, Collector<Transaction> out) {
            if (isExcluded(value.getAccountId())) {
                return;
            }
            out.collect(value);
        }

        /** Returns {@code true} for the account ids that are dropped: 1 and 5. */
        private static boolean isExcluded(long accountId) {
            return accountId == 1 || accountId == 5;
        }
    }
}

/* 输出结果
1> Transaction{accountId=2, timestamp=1546272360000, amount=374.79}
2> Transaction{accountId=3, timestamp=1546272720000, amount=112.15}
3> Transaction{accountId=4, timestamp=1546273080000, amount=478.75}
6> Transaction{accountId=2, timestamp=1546274160000, amount=351.44}
7> Transaction{accountId=3, timestamp=1546274520000, amount=320.75}
8> Transaction{accountId=4, timestamp=1546274880000, amount=259.42}
3> Transaction{accountId=2, timestamp=1546275960000, amount=397.15}
2> Transaction{accountId=3, timestamp=1546276320000, amount=0.219}
5> Transaction{accountId=4, timestamp=1546276680000, amount=231.94}
8> Transaction{accountId=2, timestamp=1546277760000, amount=412.91}
7> Transaction{accountId=3, timestamp=1546278120000, amount=0.77}
2> Transaction{accountId=4, timestamp=1546278480000, amount=22.1}
5> Transaction{accountId=2, timestamp=1546279560000, amount=230.18}
4> Transaction{accountId=3, timestamp=1546279920000, amount=0.8}
7> Transaction{accountId=4, timestamp=1546280280000, amount=350.89}
2> Transaction{accountId=2, timestamp=1546281360000, amount=228.22}
 */