Flink从入门到实战八[State]-17-Flink状态 RocksDBStateBackend使用

EmbeddedRocksDBStateBackend 将正在运行中的状态数据保存在 RocksDB 数据库中,RocksDB 数据库默认将数据存储在 TaskManager 的数据目录。
RocksDB 是一个本地(native)库,它直接从进程中分配内存,而不是从 JVM 中分配内存。分配给 RocksDB 的任何内存都必须计入整体内存预算,通常的做法是将任务管理器(TaskManager)的 JVM 堆大小相应地减去同样的量。

使用 EmbeddedRocksDBStateBackend,需要添加pom依赖

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-statebackend-rocksdb_2.11</artifactId>
  <version>1.14.3</version>
</dependency>

API示例:

// Obtain the execution environment and select the embedded RocksDB state backend.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStateBackend(new EmbeddedRocksDBStateBackend());
// Configure the checkpoint storage location from a URI string.
env.getCheckpointConfig().setCheckpointStorage("file:///checkpoint-dir");

// If you previously passed an FsStateBackend into the RocksDBStateBackend constructor
// to specify advanced checkpointing configurations such as the write buffer size,
// you can achieve the same result by manually instantiating a FileSystemCheckpointStorage object.
env.getCheckpointConfig().setCheckpointStorage(new FileSystemCheckpointStorage("file:///checkpoint-dir"));

代码:

package com.itzhimei.state;

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import org.apache.flink.walkthrough.common.entity.Transaction;
import org.apache.flink.walkthrough.common.source.TransactionSource;

import java.util.concurrent.TimeUnit;

/**
 * Example job that keeps keyed state in {@link EmbeddedRocksDBStateBackend} and writes
 * checkpoints to a file system directory.
 *
 * <p>The job reads {@link Transaction} records, prints them, and for every account emits a
 * running transaction count and amount total.
 */
public class State_9_StateBackends {

    /**
     * Configures the state backend, checkpointing and restart strategy, then builds and
     * runs the pipeline.
     *
     * @param args command line arguments (unused)
     * @throws Exception if the job fails to execute
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Keep the working state in RocksDB. This must remain the only setStateBackend call:
        // a later call would replace it and the job would no longer use RocksDB.
        env.setStateBackend(new EmbeddedRocksDBStateBackend());

        // Start a checkpoint every 1000 ms; this is the interval at which the
        // JobManager's checkpoint coordinator triggers a checkpoint.
        env.enableCheckpointing(1000);

        CheckpointConfig checkpointConfig = env.getCheckpointConfig();

        // Use exactly-once mode (this is the default).
        checkpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

        // Leave at least 500 ms between the end of one checkpoint and the start of the next.
        checkpointConfig.setMinPauseBetweenCheckpoints(500);

        // A checkpoint must complete within one minute, otherwise it is discarded.
        checkpointConfig.setCheckpointTimeout(60000);

        // Tolerate two consecutive checkpoint failures.
        checkpointConfig.setTolerableCheckpointFailureNumber(2);

        // Allow only one checkpoint to be in progress at a time.
        checkpointConfig.setMaxConcurrentCheckpoints(1);

        // Use externalized checkpoints so they are retained after the job is cancelled.
        checkpointConfig.setExternalizedCheckpointCleanup(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        // Where checkpoints are written. Configured as checkpoint storage rather than as a
        // second state backend, so the RocksDB backend selected above stays in effect.
        checkpointConfig.setCheckpointStorage("file:///D:/flink/checkpoints/");

        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
                3, // number of restart attempts
                Time.of(10, TimeUnit.SECONDS) // delay between attempts
        ));

        DataStream<Transaction> dataStream = env
                .addSource(new TransactionSource())
                .name("transactions");
        dataStream.print();

        dataStream.keyBy(Transaction::getAccountId)
                .flatMap(new State_9_FlatMap())
                .print();

        env.execute();
    }

    /**
     * Maintains, per key, the number of transactions seen and the sum of their amounts, and
     * emits {@code (accountId, count, total)} for every incoming transaction.
     */
    public static class State_9_FlatMap extends RichFlatMapFunction<Transaction, Tuple3<Long, Integer, Double>> {

        /** Per-key (count, total) accumulator; assigned in {@link #open(Configuration)}. */
        private transient ValueState<Tuple2<Integer, Double>> sum;

        /**
         * Registers the {@code "sum"} value state with the runtime context.
         *
         * @param parameters configuration passed in by the runtime (unused)
         * @throws Exception if the state cannot be obtained
         */
        @Override
        public void open(Configuration parameters) throws Exception {
            // No default value is registered (that constructor is deprecated); the first
            // access for a key yields null and is handled in flatMap.
            ValueStateDescriptor<Tuple2<Integer, Double>> descriptor =
                    new ValueStateDescriptor<>(
                            "sum", // the state name
                            TypeInformation.of(new TypeHint<Tuple2<Integer, Double>>() {})); // type information
            sum = getRuntimeContext().getState(descriptor);
        }

        /**
         * Adds the transaction to the accumulator of its key and emits the updated values.
         *
         * @param transaction the incoming transaction
         * @param collector receives {@code (accountId, count, total)}
         * @throws Exception if state access fails
         */
        @Override
        public void flatMap(Transaction transaction, Collector<Tuple3<Long, Integer, Double>> collector) throws Exception {
            Tuple2<Integer, Double> current = sum.value();
            if (current == null) {
                // First transaction for this key: start from an empty accumulator.
                current = Tuple2.of(0, 0.0);
            }

            current.f0 = current.f0 + 1;
            current.f1 = current.f1 + transaction.getAmount();
            sum.update(current);

            collector.collect(new Tuple3<>(transaction.getAccountId(), current.f0, current.f1));
        }
    }
}
/* 输出结果
8> Transaction{accountId=1, timestamp=1546272000000, amount=188.23}
6> (1,1,188.23)
1> Transaction{accountId=2, timestamp=1546272360000, amount=374.79}
8> (2,1,374.79)
2> Transaction{accountId=3, timestamp=1546272720000, amount=112.15}
8> (3,1,112.15)
3> Transaction{accountId=4, timestamp=1546273080000, amount=478.75}
1> (4,1,478.75)
4> Transaction{accountId=5, timestamp=1546273440000, amount=208.85}
8> (5,1,208.85)
5> Transaction{accountId=1, timestamp=1546273800000, amount=379.64}
6> (1,2,567.87)
6> Transaction{accountId=2, timestamp=1546274160000, amount=351.44}
8> (2,2,726.23)
7> Transaction{accountId=3, timestamp=1546274520000, amount=320.75}
8> (3,2,432.9)
8> Transaction{accountId=4, timestamp=1546274880000, amount=259.42}
1> (4,2,738.1700000000001)
8> (5,2,482.28999999999996)
1> Transaction{accountId=5, timestamp=1546275240000, amount=273.44}
2> Transaction{accountId=1, timestamp=1546275600000, amount=267.25}
6> (1,3,835.12)
3> Transaction{accountId=2, timestamp=1546275960000, amount=397.15}
8> (2,3,1123.38)
8> (3,3,433.11899999999997)
4> Transaction{accountId=3, timestamp=1546276320000, amount=0.219}
5> Transaction{accountId=4, timestamp=1546276680000, amount=231.94}
1> (4,3,970.1100000000001)
6> Transaction{accountId=5, timestamp=1546277040000, amount=384.73}
8> (5,3,867.02)

*/