Flink from Beginner to Practice, Part 8 [State]-11: Flink Broadcast State Usage Example

The previous section introduced Broadcast State and its API. In this section we write the code for a small demo.

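A broadcast stream is expected to be a low-throughput stream, so it is best suited to distributing rule information that every parallel task needs. For example, when we compute over user transaction data, the computation rules can be sent through a broadcast stream to every parallel instance of the operator, so that each instance receives the same rules.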
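To make the idea of "every parallel instance receives the same rules" concrete, here is a small plain-Java toy model. It is not Flink code: `BroadcastModel`, `Subtask` and the hash-based routing are illustrative assumptions, and Flink's real key-group routing is more involved. The sketch only shows that a broadcast element is copied to all instances while a business element reaches a single one.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy model (not Flink code): every parallel instance keeps its own copy of the broadcast rules. */
class BroadcastModel {

    /** One parallel instance of the operator, holding a local copy of the rule map. */
    static final class Subtask {
        final Map<String, String> rules = new LinkedHashMap<>();
    }

    final List<Subtask> subtasks = new ArrayList<>();

    BroadcastModel(int parallelism) {
        for (int i = 0; i < parallelism; i++) {
            subtasks.add(new Subtask());
        }
    }

    /** A broadcast element is delivered to every instance. */
    void broadcast(String ruleName, String rule) {
        for (Subtask s : subtasks) {
            s.rules.put(ruleName, rule);
        }
    }

    /** A business element is routed to exactly one instance (simplified: hash of the key). */
    Map<String, String> rulesSeenBy(long accountId) {
        int index = Math.floorMod(Long.hashCode(accountId), subtasks.size());
        return subtasks.get(index).rules;
    }

    public static void main(String[] args) {
        BroadcastModel model = new BroadcastModel(3);
        model.broadcast("rule1", "rule1");
        // Whichever instance an account lands on, it sees the same rule set.
        for (long accountId = 1; accountId <= 5; accountId++) {
            System.out.println("account " + accountId + " sees " + model.rulesSeenBy(accountId));
        }
    }
}
```

Because each instance holds a full copy, the cost of a broadcast element grows with the parallelism, which is one reason the broadcast side should stay low-throughput.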

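Overall flow when using the broadcast stream API:
1. Define the business stream.
2. Define the broadcast stream and the descriptor for its broadcast state.
3. Connect the two streams. The order matters: businessStream.connect(broadcastStream).
4. Extend BroadcastProcessFunction (when the business stream is not keyed) or KeyedBroadcastProcessFunction (when it is keyed), and implement its processElement() and processBroadcastElement() methods.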
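The division of labour between the two callbacks in step 4 can be sketched in plain Java. This is a toy model rather than Flink code, and the class name `TwoInputModel` and the helper `readOnlyView()` are made up for illustration. It mirrors one property of the real API: the broadcast side may modify the broadcast state, while the business side only gets a read-only view of it.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

/** Toy model (not Flink code) of the two callbacks and their different access to the state. */
class TwoInputModel {

    private final Map<String, String> broadcastState = new LinkedHashMap<>();

    /** Broadcast side: allowed to modify the state. */
    void processBroadcastElement(String rule) {
        broadcastState.put(rule, rule);
    }

    /** What the business side is handed: a view that rejects writes. */
    Map<String, String> readOnlyView() {
        return Collections.unmodifiableMap(broadcastState);
    }

    /** Business side: reads the current rules and builds a result from them. */
    String processElement(long accountId) {
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<String, String> e : readOnlyView().entrySet()) {
            sb.append(e.getKey()).append(":").append(e.getValue()).append(";");
        }
        return "account " + accountId + " matched rules: " + sb;
    }

    public static void main(String[] args) {
        TwoInputModel fn = new TwoInputModel();
        System.out.println(fn.processElement(1)); // no rules have arrived yet
        fn.processBroadcastElement("rule1");
        System.out.println(fn.processElement(2)); // now sees rule1
    }
}
```

Keeping writes on the broadcast side only is what lets every parallel instance end up with the same state, since all instances see the same broadcast elements.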

Code:

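import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.util.Collector;
// Assumption: Transaction and TransactionSource are the flink-walkthrough-common classes used in earlier chapters.
import org.apache.flink.walkthrough.common.entity.Transaction;
import org.apache.flink.walkthrough.common.source.TransactionSource;

import java.util.Map;

/**
 * The rules for computing over user transactions are read from a broadcast stream.
 */
public class State_7_BroadcastState {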

    // Descriptor for the broadcast state (rule name -> rule)
    private static final MapStateDescriptor<String, String> ruleStateDescriptor = new MapStateDescriptor<>(
            "RulesBroadcastState",
            BasicTypeInfo.STRING_TYPE_INFO,
            TypeInformation.of(new TypeHint<String>() {}));

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // The business stream reads its data from the transaction simulator
        DataStream<Transaction> dataStream = env
                .addSource(new TransactionSource()) // see the earlier chapters for a source-code walkthrough of TransactionSource
                .name("transactions");
        KeyedStream<Transaction, Long> keyStream = dataStream.keyBy(item -> item.getAccountId());

        // The rules for the broadcast stream are read from a socket
        DataStream<String> ruleStream = env.socketTextStream("localhost", 7777);
        BroadcastStream<String> ruleBroadcastStream = ruleStream.broadcast(ruleStateDescriptor);

        // connect() must be called on the non-broadcast stream, with the BroadcastStream passed as the argument
        keyStream.connect(ruleBroadcastStream).process(new State_7_BroadcastState_Inner());

        env.execute();

    }

    public static class State_7_BroadcastState_Inner extends KeyedBroadcastProcessFunction<Long, Transaction, String, Void> {

        @Override
        public void processElement(Transaction value, ReadOnlyContext ctx, Collector<Void> out) throws Exception {
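            // Business logic for the non-broadcast (transaction) stream.
            // Simulate matching the user's transaction against the broadcast rules before computing.
            StringBuilder sb = new StringBuilder();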
            for (Map.Entry<String, String> entry :
                    ctx.getBroadcastState(ruleStateDescriptor).immutableEntries()) {
                final String ruleName = entry.getKey();
                final String rule = entry.getValue();
                sb.append(ruleName).append(":").append(rule).append(";");
            }
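            System.out.println("Current user: " + value.getAccountId() + ", matched rules: " + sb);
        }

        @Override
        public void processBroadcastElement(String value, Context ctx, Collector<Void> out) throws Exception {
            // Business logic for the broadcast stream.
            // Whenever a rule arrives on the broadcast stream, store it in the broadcast state.
            ctx.getBroadcastState(ruleStateDescriptor).put(value, value);
        }
    }
}

/*
Enter the following into the socket, one line at a time
(something must already be listening on port 7777 when the job starts, e.g. `nc -lk 7777`):
rule1
rule2
rule3
and watch the log output.

Console output while no rule has been entered on the socket:
Current user: 1, matched rules:
Current user: 2, matched rules:
Current user: 3, matched rules:
Current user: 4, matched rules:
Current user: 5, matched rules:
Current user: 1, matched rules:
Current user: 2, matched rules:
Current user: 3, matched rules:
Current user: 4, matched rules:
===========================================================
Console output after rule1 is entered on the socket:
Current user: 2, matched rules: rule1:rule1;
Current user: 3, matched rules: rule1:rule1;
Current user: 4, matched rules: rule1:rule1;
Current user: 5, matched rules: rule1:rule1;
Current user: 1, matched rules: rule1:rule1;
Current user: 2, matched rules: rule1:rule1;
Current user: 3, matched rules: rule1:rule1;
Current user: 4, matched rules: rule1:rule1;
Current user: 5, matched rules: rule1:rule1;
Current user: 1, matched rules: rule1:rule1;
Current user: 2, matched rules: rule1:rule1;
Current user: 3, matched rules: rule1:rule1;
===========================================================
Console output after rule2 is entered on the socket:
Current user: 5, matched rules: rule1:rule1;rule2:rule2;
Current user: 1, matched rules: rule1:rule1;rule2:rule2;
Current user: 2, matched rules: rule1:rule1;rule2:rule2;
Current user: 3, matched rules: rule1:rule1;rule2:rule2;
Current user: 4, matched rules: rule1:rule1;rule2:rule2;
Current user: 5, matched rules: rule1:rule1;rule2:rule2;
Current user: 1, matched rules: rule1:rule1;rule2:rule2;
Current user: 2, matched rules: rule1:rule1;rule2:rule2;
Current user: 3, matched rules: rule1:rule1;rule2:rule2;
Current user: 4, matched rules: rule1:rule1;rule2:rule2;
Current user: 5, matched rules: rule1:rule1;rule2:rule2;
Current user: 1, matched rules: rule1:rule1;rule2:rule2;

*/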
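
The demo stores each socket line as both key and value, which is why every rule is printed twice. One possible refinement, sketched below under the assumption of a hypothetical `name=expression` line format (this format and the `RuleLineParser` helper are not part of the original demo), is to split each line into a rule name and a rule body before putting it into the broadcast state.

```java
import java.util.AbstractMap;
import java.util.Map;

/** Hypothetical helper: splits a socket line of the assumed form "name=expression". */
class RuleLineParser {

    static Map.Entry<String, String> parse(String line) {
        String trimmed = line.trim();
        int sep = trimmed.indexOf('=');
        if (sep <= 0) {
            // No usable separator: fall back to the demo's behaviour (line is both key and value).
            return new AbstractMap.SimpleImmutableEntry<>(trimmed, trimmed);
        }
        String name = trimmed.substring(0, sep).trim();
        String body = trimmed.substring(sep + 1).trim();
        return new AbstractMap.SimpleImmutableEntry<>(name, body);
    }

    public static void main(String[] args) {
        Map.Entry<String, String> plain = parse("rule1");
        Map.Entry<String, String> named = parse("rule2 = amount > 500");
        System.out.println(plain.getKey() + " -> " + plain.getValue());
        System.out.println(named.getKey() + " -> " + named.getValue());
    }
}
```

Inside processBroadcastElement() the parsed entry's key and value would then be passed to put() in place of put(value, value).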