Flink从入门到实战八[State]-8-Flink Operator State的CheckpointedFunction使用方法

通过实现 CheckpointedFunction 接口来使用 operator state。
CheckpointedFunction 接口提供了访问 non-keyed state 的方法,需要实现如下两个方法:

void snapshotState(FunctionSnapshotContext context) throws Exception;
void initializeState(FunctionInitializationContext context) throws Exception;

operator state 以 list 的形式存在。这些状态是一个 可序列化 对象的集合 List,彼此独立,方便在改变并发后进行状态的重新分派。 换句话说,这些对象是重新分配 non-keyed state 的最细粒度。根据状态的不同访问方式,有如下几种重新分配的模式:
1)Even-split redistribution: 每个算子都保存一个列表形式的状态集合,整个状态由所有的列表拼接而成。当作业恢复或重新分配的时候,整个状态会按照算子的并发度进行均匀分配。 比如说,算子 A 的并发度为 1,包含两个元素 element1 和 element2,当并发读增加为 2 时,element1 会被分到并发 0 上,element2 则会被分到并发 1 上。
2)Union redistribution: 每个算子保存一个列表形式的状态集合。整个状态由所有的列表拼接而成。当作业恢复或重新分配时,每个算子都将获得所有的状态数据。 Do not use this feature if your list may have high cardinality. Checkpoint metadata will store an offset to each list entry, which could lead to RPC framesize or out-of-memory errors.

demo代码:

public class State_4_OperatorState {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStream<Transaction> dataStream = env
                .addSource(new TransactionSource()) //TransactionSource可以查看前面章节,有源码分析讲解
                .name("transactions");
        dataStream.print();

        dataStream.map(new State_4_Map())
                .print();

        env.execute();
    }

    public static class State_4_Map implements MapFunction<Transaction, Long>, CheckpointedFunction {

        private ListState<Long> countPerPartition;
        private long localCount;

        /**
         * 计算交易总比数
         * @param transaction
         * @return
         * @throws Exception
         */
        @Override
        public Long map(Transaction transaction) throws Exception {
            // update the states
            localCount++;

            return localCount;
        }

        /**
         * 备份
         * @param context
         * @throws Exception
         */
        @Override
        public void snapshotState(FunctionSnapshotContext context) throws Exception {
            // the keyed state is always up to date anyways
            // just bring the per-partition state in shape
            countPerPartition.clear();
            countPerPartition.add(localCount);
        }

        /**
         * 恢复
         * @param context
         * @throws Exception
         */
        @Override
        public void initializeState(FunctionInitializationContext context) throws Exception {

            // get the state data structure for the per-partition state
            countPerPartition = context.getOperatorStateStore().getListState(
                    new ListStateDescriptor<>("perPartitionCount", Long.class));

            // initialize the "local count variable" based on the operator state
            for (Long l : countPerPartition.get()) {
                localCount += l;
            }
        }
    }

}

/* 输出
Transaction{accountId=1, timestamp=1546272000000, amount=188.23}
1
Transaction{accountId=2, timestamp=1546272360000, amount=374.79}
2
Transaction{accountId=3, timestamp=1546272720000, amount=112.15}
3
Transaction{accountId=4, timestamp=1546273080000, amount=478.75}
4
Transaction{accountId=5, timestamp=1546273440000, amount=208.85}
5
Transaction{accountId=1, timestamp=1546273800000, amount=379.64}
6
Transaction{accountId=2, timestamp=1546274160000, amount=351.44}
7
Transaction{accountId=3, timestamp=1546274520000, amount=320.75}
8
Transaction{accountId=4, timestamp=1546274880000, amount=259.42}
9
Transaction{accountId=5, timestamp=1546275240000, amount=273.44}
10
Transaction{accountId=1, timestamp=1546275600000, amount=267.25}
11

*/

调用不同的获取状态对象的接口,会使用不同的状态分配算法。比如 getUnionListState(descriptor) 会使用 union redistribution 算法, 而 getListState(descriptor) 则简单的使用 even-split redistribution 算法。

当初始化好状态对象后,我们通过 isRestored() 方法判断是否从之前的故障中恢复回来,如果该方法返回 true 则表示从故障中进行恢复,会执行接下来的恢复逻辑。