Flink从入门到实战八[State]-9-Flink Operator State的ListCheckpointed使用方法

通过实现 ListCheckpointed 接口来使用 operator state。
ListCheckpointed 接口提供了访问 non-keyed state 的方法,需要实现如下两个方法:

List<T> snapshotState(long checkpointId, long timestamp) throws Exception;
void restoreState(List<T> state) throws Exception;

operator state 以 list 的形式存在。这些状态是一个 可序列化 对象的集合 List,彼此独立,方便在改变并发后进行状态的重新分派。 换句话说,这些对象是重新分配 non-keyed state 的最细粒度。根据状态的不同访问方式,有如下几种重新分配的模式:
1)Even-split redistribution: 每个算子都保存一个列表形式的状态集合,整个状态由所有的列表拼接而成。当作业恢复或重新分配的时候,整个状态会按照算子的并发度进行均匀分配。 比如说,算子 A 的并发度为 1,包含两个元素 element1 和 element2,当并发读增加为 2 时,element1 会被分到并发 0 上,element2 则会被分到并发 1 上。
2)Union redistribution: 每个算子保存一个列表形式的状态集合。整个状态由所有的列表拼接而成。当作业恢复或重新分配时,每个算子都将获得所有的状态数据。 Do not use this feature if your list may have high cardinality. Checkpoint metadata will store an offset to each list entry, which could lead to RPC framesize or out-of-memory errors.

demo代码:

import java.util.Collections;
import java.util.List;
import java.util.Objects;

/**
 * 计基于Flink算子状态:计算交易总比数
 * 基于ListCheckpointed实现
 */
public class State_5_OperatorState {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStream<Transaction> dataStream = env
                .addSource(new TransactionSource()) //TransactionSource可以查看前面章节,有源码分析讲解
                .name("transactions");
        dataStream.print();

        dataStream.map(new State_5_Map())
                .print();

        env.execute();
    }

    public static class State_5_Map implements MapFunction<Transaction, Long>, ListCheckpointed<Long> {

        private Long txCount = 0L;

        /**
         * 计算交易总比数
         * @param transaction
         * @return
         * @throws Exception
         */
        @Override
        public Long map(Transaction transaction) throws Exception {
            txCount++;
            return txCount;
        }

        /**
         * 备份
         * @param checkpointId
         * @param timestamp
         * @return
         * @throws Exception
         */
        @Override
        public List<Long> snapshotState(long checkpointId, long timestamp) throws Exception {
            return Collections.singletonList(txCount);
        }

        /**
         * 恢复
         * @param state
         * @throws Exception
         */
        @Override
        public void restoreState(List<Long> state) throws Exception {
            for(Long s:state) {
                txCount+=s;
            }
        }

    }
}

/*
1> 1
1> Transaction{accountId=1, timestamp=1546272000000, amount=188.23}
2> 1
2> Transaction{accountId=2, timestamp=1546272360000, amount=374.79}
1> 2
1> Transaction{accountId=3, timestamp=1546272720000, amount=112.15}
2> 2
2> Transaction{accountId=4, timestamp=1546273080000, amount=478.75}
1> 3
1> Transaction{accountId=5, timestamp=1546273440000, amount=208.85}
2> 3
2> Transaction{accountId=1, timestamp=1546273800000, amount=379.64}
1> 4
1> Transaction{accountId=2, timestamp=1546274160000, amount=351.44}
2> 4
2> Transaction{accountId=3, timestamp=1546274520000, amount=320.75}
1> 5
1> Transaction{accountId=4, timestamp=1546274880000, amount=259.42}
2> Transaction{accountId=5, timestamp=1546275240000, amount=273.44}
2> 5
1> 6
1> Transaction{accountId=1, timestamp=1546275600000, amount=267.25}

*/