Flink从入门到实战八[State]-3-Flink Keyed State ValueState旧版API使用方法

我们来看一下使用ValueStateDescriptor和ValueState,基于状态如何累计计算一个用户的交易数据,demo是以1.1之前的版本进行演示,下一节将以较新的1.14.4版本进行演示。

使用ValueState重点关注三点:
1、ValueState的方法: update(T)、T value()、clear()
2、状态描述符的创建: ValueStateDescriptor
3、状态注册:ValueState getState(ValueStateDescriptor)

代码:

/**
 * 基于Flink状态求和:按照用户id分组求每个人的消费总笔数、消费总金额
 * flink version 1.14.4
 */
public class State_1_ValueState {



    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Transaction> dataStream = env
                .addSource(new TransactionSource()) //TransactionSource可以查看前面章节,有源码分析讲解
                .name("transactions");
        dataStream.print();

        dataStream.keyBy(Transaction::getAccountId)
                .flatMap(new State_1_FlatMap())
                .print();

        env.execute();
    }

    public static class State_1_FlatMap extends RichFlatMapFunction<Transaction, Tuple3<Long, Integer, Double>> {

        private ValueState<Tuple2<Integer, Double>> sum;

        @Override
        public void open(Configuration parameters) throws Exception {
            ValueStateDescriptor<Tuple2<Integer, Double>> descriptor =
                    new ValueStateDescriptor<>(
                            "sum", // the state name
                            TypeInformation.of(new TypeHint<Tuple2<Integer, Double>>() {}), // type information
                            Tuple2.of(0, 0D)); // default value of the state, if nothing was set
            sum = getRuntimeContext().getState(descriptor);
        }

        @Override
        public void flatMap(Transaction transaction, Collector<Tuple3<Long, Integer, Double>> collector) throws Exception {
            Tuple2<Integer, Double> value = sum.value();
            value.f0 = value.f0+1;
            value.f1 = value.f1+transaction.getAmount();
            sum.update(value);
            collector.collect(new Tuple3<>(transaction.getAccountId(), value.f0, value.f1));
        }
    }
}
/* 输出结果
6> Transaction{accountId=1, timestamp=1546272000000, amount=188.23}
6> (1,1,188.23)
7> Transaction{accountId=2, timestamp=1546272360000, amount=374.79}
8> (2,1,374.79)
8> Transaction{accountId=3, timestamp=1546272720000, amount=112.15}
8> (3,1,112.15)
1> Transaction{accountId=4, timestamp=1546273080000, amount=478.75}
1> (4,1,478.75)
2> Transaction{accountId=5, timestamp=1546273440000, amount=208.85}
8> (5,1,208.85)
3> Transaction{accountId=1, timestamp=1546273800000, amount=379.64}
6> (1,2,567.87)
4> Transaction{accountId=2, timestamp=1546274160000, amount=351.44}
8> (2,2,726.23)
5> Transaction{accountId=3, timestamp=1546274520000, amount=320.75}
8> (3,2,432.9)
6> Transaction{accountId=4, timestamp=1546274880000, amount=259.42}
1> (4,2,738.1700000000001)
7> Transaction{accountId=5, timestamp=1546275240000, amount=273.44}
8> (5,2,482.28999999999996)
8> Transaction{accountId=1, timestamp=1546275600000, amount=267.25}
6> (1,3,835.12)
1> Transaction{accountId=2, timestamp=1546275960000, amount=397.15}
8> (2,3,1123.38)
2> Transaction{accountId=3, timestamp=1546276320000, amount=0.219}
8> (3,3,433.11899999999997)
3> Transaction{accountId=4, timestamp=1546276680000, amount=231.94}
1> (4,3,970.1100000000001)
4> Transaction{accountId=5, timestamp=1546277040000, amount=384.73}
8> (5,3,867.02)
5> Transaction{accountId=1, timestamp=1546277400000, amount=419.62}
6> (1,4,1254.74)
6> Transaction{accountId=2, timestamp=1546277760000, amount=412.91}
8> (2,4,1536.2900000000002)
7> Transaction{accountId=3, timestamp=1546278120000, amount=0.77}
8> (3,4,433.88899999999995)
8> Transaction{accountId=4, timestamp=1546278480000, amount=22.1}
1> (4,4,992.2100000000002)
1> Transaction{accountId=5, timestamp=1546278840000, amount=377.54}
8> (5,4,1244.56)
2> Transaction{accountId=1, timestamp=1546279200000, amount=375.44}
6> (1,5,1630.18)
3> Transaction{accountId=2, timestamp=1546279560000, amount=230.18}
8> (2,5,1766.4700000000003)
4> Transaction{accountId=3, timestamp=1546279920000, amount=0.8}
8> (3,5,434.68899999999996)
5> Transaction{accountId=4, timestamp=1546280280000, amount=350.89}
1> (4,5,1343.1000000000001)
6> Transaction{accountId=5, timestamp=1546280640000, amount=127.55}
8> (5,5,1372.11)
 */

Flink State 使用流程总结分为四步:
1、注册状态
2、读取状态
3、更新状态
4、清除状态(可选的步骤)