Flink从入门到实战八[State]-14-Flink状态 Checkpoint使用

上一节了解了Checkpoint的执行流程和内部原理,那么Checkpoint是如何使用的呢,我们先来看一下API。
在此之前,还是需要重点说明一下,Flink 的 checkpoint 机制会和持久化存储进行交互,读写流与状态。一般需要:
一个能够回放一段时间内数据的持久化数据源,例如持久化消息队列(例如 Apache Kafka、RabbitMQ、 Amazon Kinesis、 Google PubSub 等)或文件系统(例如 HDFS、 S3、 GFS、 NFS、 Ceph 等)。
存放状态的持久化存储,通常为分布式文件系统(比如 HDFS、 S3、 GFS、 NFS、 Ceph 等)。

Checkpoint API:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// 每 1000ms 开始一次 checkpoint,这个就是触发JM的checkpoint-Coordinator的一个事件间隔设置
env.enableCheckpointing(1000);

// 高级选项:

// 设置模式为精确一次 (这是默认值)
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

// 确保两次 checkpoint 之间至少间隔 500 ms(上一次结束到下一次开始)
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);

// Checkpoint 必须在一分钟内完成,否则就会被抛弃
env.getCheckpointConfig().setCheckpointTimeout(60000);

// 允许两个连续的 checkpoint 错误
env.getCheckpointConfig().setTolerableCheckpointFailureNumber(2);
        
// 同一时间只允许一个 checkpoint 进行
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

// 使用 externalized checkpoints,这样 checkpoint 在作业取消后仍旧会被保留
env.getCheckpointConfig().setExternalizedCheckpointCleanup(
        ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

// 开启实验性的 unaligned checkpoints
env.getCheckpointConfig().enableUnalignedCheckpoints();

代码:

package com.itzhimei.state;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.walkthrough.common.entity.Transaction;
import org.apache.flink.walkthrough.common.source.TransactionSource;

public class State_8_Checkpoint {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Trigger a checkpoint every 1000 ms — this is the interval at which the
        // JobManager's CheckpointCoordinator fires a new checkpoint.
        env.enableCheckpointing(1000);

        CheckpointConfig checkpointConfig = env.getCheckpointConfig();

        // Exactly-once processing guarantees (this is the default mode).
        checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

        // At least 500 ms must elapse between the end of one checkpoint and the
        // start of the next.
        checkpointConfig.setMinPauseBetweenCheckpoints(500);

        // A checkpoint that takes longer than one minute is discarded.
        checkpointConfig.setCheckpointTimeout(60000);

        // Tolerate up to two consecutive checkpoint failures before the job fails.
        checkpointConfig.setTolerableCheckpointFailureNumber(2);

        // Only one checkpoint may be in flight at any time.
        checkpointConfig.setMaxConcurrentCheckpoints(1);

        // Externalized checkpoints: retain checkpoint data after the job is cancelled.
        checkpointConfig.setExternalizedCheckpointCleanup(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        // Persist checkpoint state to the local file system.
        env.setStateBackend(new FsStateBackend("file:///D:/flink/checkpoints/"));

        // TransactionSource is a bounded demo source; see the earlier chapters
        // for its source-code walkthrough.
        DataStream<Transaction> transactions = env
                .addSource(new TransactionSource())
                .name("transactions");
        transactions.print();

        transactions
                .map(new State_8_cp_Map())
                .print();

        env.execute();
    }

    /**
     * Counts the total number of transactions seen so far, checkpointing the
     * running count as operator (per-partition) list state so it survives
     * failure and restart.
     */
    public static class State_8_cp_Map implements MapFunction<Transaction, Long>, CheckpointedFunction {

        // Checkpointed copy of the count, written on each snapshot.
        private ListState<Long> countPerPartition;
        // Live count, updated on every record.
        private long localCount;

        /**
         * Increments and returns the running transaction count.
         *
         * @param transaction the incoming record (only its arrival is counted)
         * @return the total number of transactions processed so far
         * @throws Exception per the MapFunction contract
         */
        @Override
        public Long map(Transaction transaction) throws Exception {
            return ++localCount;
        }

        /**
         * Snapshot hook: replaces the stored state with the current local count.
         *
         * @param context snapshot metadata (checkpoint id, timestamp)
         * @throws Exception if the state update fails
         */
        @Override
        public void snapshotState(FunctionSnapshotContext context) throws Exception {
            countPerPartition.clear();
            countPerPartition.add(localCount);
        }

        /**
         * Restore hook: re-acquires the operator list state and rebuilds the
         * local count by summing all restored partial counts (there may be
         * several entries after a rescaling restore).
         *
         * @param context gives access to the operator state store
         * @throws Exception if state access fails
         */
        @Override
        public void initializeState(FunctionInitializationContext context) throws Exception {
            countPerPartition = context.getOperatorStateStore().getListState(
                    new ListStateDescriptor<>("perPartitionCount", Long.class));

            for (Long restored : countPerPartition.get()) {
                localCount += restored;
            }
        }
    }

}

/* 输出
Transaction{accountId=1, timestamp=1546272000000, amount=188.23}
1
Transaction{accountId=2, timestamp=1546272360000, amount=374.79}
2
Transaction{accountId=3, timestamp=1546272720000, amount=112.15}
3
Transaction{accountId=4, timestamp=1546273080000, amount=478.75}
4
Transaction{accountId=5, timestamp=1546273440000, amount=208.85}
5
Transaction{accountId=1, timestamp=1546273800000, amount=379.64}
6
Transaction{accountId=2, timestamp=1546274160000, amount=351.44}
7
Transaction{accountId=3, timestamp=1546274520000, amount=320.75}
8
Transaction{accountId=4, timestamp=1546274880000, amount=259.42}
9
Transaction{accountId=5, timestamp=1546275240000, amount=273.44}
10
Transaction{accountId=1, timestamp=1546275600000, amount=267.25}
11

*/

在你配置的目录下,会生成一个以 uuid 命名的目录,目录中有 3 个子文件夹:
chk-18
shared
taskowned