Flink from Beginner to Practice, Part 8 [State] - 5: Using the Flink Keyed State MapState API

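Let's look at how to use MapStateDescriptor and MapState to accumulate each user's transaction data in keyed state. The demo uses Flink 1.14.4, a recent release at the time of writing.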

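When using MapState, focus on three things:
1. The MapState methods: put(UK, UV), putAll(Map<UK, UV>), get(UK), remove(UK), contains(UK), entries(), keys(), values(), isEmpty(), clear()
2. Creating the state descriptor: MapStateDescriptor<UK, UV>
3. Registering the state: getRuntimeContext().getMapState(MapStateDescriptor<UK, UV>), which returns a MapState<UK, UV>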
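Keyed MapState is scoped to the current key: inside a function applied after keyBy, the MapState only sees the entries belonging to the key of the record being processed. The sketch below is a plain-Java model of that behavior, not Flink code. The classes `KeyedMapStateModel` and `KeyedMapStateDemo` and the `setCurrentKey` method are hypothetical stand-ins for what Flink's runtime does internally, and one `HashMap` per key replaces the real state backend. Method names mirror the list above, including the fact that `get` returns `null` for a user key that was never written.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory model of keyed MapState; NOT Flink's API or implementation.
// One HashMap per key stands in for the state backend.
final class KeyedMapStateModel<K, UK, UV> {
    private final Map<K, Map<UK, UV>> perKey = new HashMap<>();
    private K currentKey;

    // Flink's runtime sets the current key before each map() call; here it is done by hand.
    void setCurrentKey(K key) { this.currentKey = key; }

    // Read-only view of the current key's map (empty if nothing was written yet).
    private Map<UK, UV> view() {
        return perKey.getOrDefault(currentKey, Collections.emptyMap());
    }

    // Writable map for the current key, created on first write.
    private Map<UK, UV> writable() {
        return perKey.computeIfAbsent(currentKey, k -> new HashMap<>());
    }

    UV get(UK userKey) { return view().get(userKey); }   // null if the user key is absent
    void put(UK userKey, UV value) { writable().put(userKey, value); }
    void putAll(Map<UK, UV> entries) { writable().putAll(entries); }
    void remove(UK userKey) { writable().remove(userKey); }
    boolean contains(UK userKey) { return view().containsKey(userKey); }
    Iterable<Map.Entry<UK, UV>> entries() { return view().entrySet(); }
    Iterable<UK> keys() { return view().keySet(); }
    Iterable<UV> values() { return view().values(); }
    boolean isEmpty() { return view().isEmpty(); }
    void clear() { perKey.remove(currentKey); }          // drops only the current key's entries
}

final class KeyedMapStateDemo {
    public static void main(String[] args) {
        KeyedMapStateModel<Long, String, Double> state = new KeyedMapStateModel<>();
        state.setCurrentKey(1L);                    // a record for account 1 arrives
        state.put("amount", 188.23);
        state.setCurrentKey(2L);                    // a record for account 2 arrives
        System.out.println(state.get("amount"));   // prints null: account 2 has its own map
        state.setCurrentKey(1L);
        System.out.println(state.get("amount"));   // prints 188.23
    }
}
```

This per-key isolation is why the map() function in the listing can use fixed user keys such as "amount" and "count" for every account without the accounts overwriting each other.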

Code:

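import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
// Assumption: Transaction and TransactionSource come from flink-walkthrough-common
import org.apache.flink.walkthrough.common.entity.Transaction;
import org.apache.flink.walkthrough.common.source.TransactionSource;

import java.util.Objects;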

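/**
 * Summing with Flink state: group by user id and compute each user's
 * total number of transactions and total transaction amount.
 * flink version 1.14.4
 * Demonstrates how to define and use a MapStateDescriptor in newer versions.
 */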
public class State_3_MapState {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // TransactionSource is explained, with a walkthrough of its source code, in an earlier chapter
        DataStream<Transaction> dataStream = env.addSource(new TransactionSource())
                .name("transactions");
        dataStream.print();

        dataStream.keyBy(Transaction::getAccountId)
                .map(new State_3_FlatMap())
                .print();

        env.execute();
    }

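    // Despite its name, this is a RichMapFunction: it emits exactly one
    // (accountId, count, totalAmount) tuple per incoming Transaction
    public static class State_3_FlatMap extends RichMapFunction<Transaction, Tuple3<Long, Integer, Double>> {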

        private MapState<String, Double> mapState;
        private static final String KEY = "amount";
        private static final String KEY2 = "count";

        @Override
        public void open(Configuration parameters) throws Exception {
            // Declare the key and value types. A MapState can replace a Tuple here
            // and can hold any number of key-value entries per key.
            MapStateDescriptor<String, Double> mapStateDescriptor = new MapStateDescriptor<>(
                    "amount", TypeInformation.of(String.class), TypeInformation.of(Double.class));
            mapState = getRuntimeContext().getMapState(mapStateDescriptor);
        }

        @Override
        public Tuple3<Long, Integer, Double> map(Transaction transaction) throws Exception {
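            // Read: MapState.get returns null if the user key was never written for this account
            Double value1 = mapState.get(KEY);   // running total amount
            Double value2 = mapState.get(KEY2);  // running transaction count
            if (Objects.isNull(value1)) {
                // First transaction for this account: both entries are initialized together
                value1 = 0d;
                value2 = 0d;
            }
            // Update: write the new totals back to the current key's state
            mapState.put(KEY, value1 + transaction.getAmount());
            mapState.put(KEY2, value2 + 1);
            return new Tuple3<>(transaction.getAccountId(), value2.intValue() + 1, value1 + transaction.getAmount());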
        }
    }
}

/* Output (the "N>" prefix is the index of the parallel subtask that printed the line)
1> Transaction{accountId=1, timestamp=1546272000000, amount=188.23}
6> (1,1,188.23)
2> Transaction{accountId=2, timestamp=1546272360000, amount=374.79}
8> (2,1,374.79)
3> Transaction{accountId=3, timestamp=1546272720000, amount=112.15}
8> (3,1,112.15)
4> Transaction{accountId=4, timestamp=1546273080000, amount=478.75}
1> (4,1,478.75)
5> Transaction{accountId=5, timestamp=1546273440000, amount=208.85}
8> (5,1,208.85)
6> Transaction{accountId=1, timestamp=1546273800000, amount=379.64}
6> (1,2,567.87)
7> Transaction{accountId=2, timestamp=1546274160000, amount=351.44}
8> (2,2,726.23)
8> Transaction{accountId=3, timestamp=1546274520000, amount=320.75}
8> (3,2,432.9)
1> Transaction{accountId=4, timestamp=1546274880000, amount=259.42}
1> (4,2,738.1700000000001)
2> Transaction{accountId=5, timestamp=1546275240000, amount=273.44}
8> (5,2,482.28999999999996)
3> Transaction{accountId=1, timestamp=1546275600000, amount=267.25}
6> (1,3,835.12)
4> Transaction{accountId=2, timestamp=1546275960000, amount=397.15}
8> (2,3,1123.38)
5> Transaction{accountId=3, timestamp=1546276320000, amount=0.219}
8> (3,3,433.11899999999997)
 */
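To see where the printed tuples come from, the accumulation in map() can be replayed in plain Java (no Flink) on the transactions shown in the log above. The `Replay` class and its `Tx` holder are hypothetical helpers. A `HashMap` keyed by account id stands in for the per-key MapState, and the "amount" and "count" entries are updated the same way as in State_3_FlatMap. Because both use ordinary `double` addition, the replay is expected to show the same rounding artifacts, such as 738.1700000000001 for account 4.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java replay of State_3_FlatMap's logic (hypothetical helper, not Flink code).
final class Replay {
    // Minimal stand-in for the walkthrough Transaction: account id and amount only.
    static final class Tx {
        final long accountId;
        final double amount;
        Tx(long accountId, double amount) { this.accountId = accountId; this.amount = amount; }
    }

    // Returns one "(accountId,count,totalAmount)" string per input, like the Tuple3 printout.
    static List<String> run(List<Tx> txs) {
        // accountId -> {"amount" -> total, "count" -> n}: the per-key MapState contents
        Map<Long, Map<String, Double>> state = new HashMap<>();
        List<String> out = new ArrayList<>();
        for (Tx t : txs) {
            Map<String, Double> mapState = state.computeIfAbsent(t.accountId, k -> new HashMap<>());
            Double amount = mapState.get("amount");
            Double count = mapState.get("count");
            if (amount == null) {   // first record for this account
                amount = 0d;
                count = 0d;
            }
            mapState.put("amount", amount + t.amount);
            mapState.put("count", count + 1);
            out.add("(" + t.accountId + "," + (count.intValue() + 1) + "," + (amount + t.amount) + ")");
        }
        return out;
    }

    public static void main(String[] args) {
        // {accountId, amount} pairs copied from the printed Transactions, in order
        double[][] log = {
            {1, 188.23}, {2, 374.79}, {3, 112.15}, {4, 478.75}, {5, 208.85},
            {1, 379.64}, {2, 351.44}, {3, 320.75}, {4, 259.42}, {5, 273.44},
            {1, 267.25}, {2, 397.15}, {3, 0.219}
        };
        List<Tx> txs = new ArrayList<>();
        for (double[] row : log) {
            txs.add(new Tx((long) row[0], row[1]));
        }
        run(txs).forEach(System.out::println);
    }
}
```

The replay prints results in input order, while the Flink job interleaves lines from several subtasks. Within a single account, however, the order and the values are the same, because keyBy sends all records of one account to the same subtask.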

In summary, using Flink state comes down to four steps:
1. Register the state
2. Read the state
3. Update the state
4. Clear the state (optional)
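The four steps can be traced in one place with another plain-Java sketch that has no Flink dependency. `StateLifecycle`, `process`, `hasState`, and the `RESET_AFTER` rule are hypothetical. They exist only to make step 4 visible: this example clears an account's totals after every third transaction, which the job above does not do. In a real Flink job, step 1 happens in open() via getRuntimeContext().getMapState(...), and step 4 would be a call to mapState.clear() (or remove(UK) for a single entry), which affects only the current key's state.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java walk-through of the four state steps (hypothetical, not Flink code).
// A HashMap per account stands in for keyed MapState.
final class StateLifecycle {
    // Hypothetical rule used only to demonstrate step 4: clear totals every 3 transactions.
    static final int RESET_AFTER = 3;

    // Step 1 (register): in Flink, getRuntimeContext().getMapState(descriptor) in open().
    private final Map<Long, Map<String, Double>> registered = new HashMap<>();

    /** Processes one transaction and returns the account's running count after it. */
    int process(long accountId, double amount) {
        Map<String, Double> mapState = registered.computeIfAbsent(accountId, k -> new HashMap<>());
        // Step 2 (read): getOrDefault folds in the null check that map() does with Objects.isNull.
        double total = mapState.getOrDefault("amount", 0d);
        double count = mapState.getOrDefault("count", 0d);
        // Step 3 (update): write the new values back for this account only.
        mapState.put("amount", total + amount);
        mapState.put("count", count + 1);
        int newCount = (int) (count + 1);
        // Step 4 (clear, optional): drop this account's entries; other accounts are untouched.
        if (newCount >= RESET_AFTER) {
            mapState.clear();
        }
        return newCount;
    }

    /** True if the account currently has any stored entries. */
    boolean hasState(long accountId) {
        Map<String, Double> m = registered.get(accountId);
        return m != null && !m.isEmpty();
    }

    public static void main(String[] args) {
        StateLifecycle job = new StateLifecycle();
        double[] amounts = {188.23, 379.64, 267.25, 100.0};
        for (double a : amounts) {
            System.out.println("count=" + job.process(1L, a) + ", hasState=" + job.hasState(1L));
        }
    }
}
```

After the third transaction the account's state is empty again, so the fourth transaction starts a fresh count at 1. That is the same effect an explicit mapState.clear() would have for one key in the Flink job.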