Flink从入门到实战四[DataStream API]-9-Source-从MQ中读取数据

pom文件:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-rabbitmq_2.11</artifactId>
    <version>1.14.3</version>
</dependency>

安装 RabbitMQ

RabbitMQ Source
RMQSource 负责从 RabbitMQ 中消费数据,可以配置三种不同级别的保证:
1)精确一次: 保证精确一次需要以下条件
开启 checkpointing: 开启 checkpointing 之后,消息在 checkpoints 完成之后才会被确认(然后从 RabbitMQ 队列中删除).
使用关联标识(Correlation ids): 关联标识是 RabbitMQ 的一个特性,消息写入 RabbitMQ 时在消息属性中设置。 从 checkpoint 恢复时有些消息可能会被重复处理,source 可以利用关联标识对消息进行去重。
非并发 source: 为了保证精确一次的数据投递,source 必须是非并发的(并行度设置为1)。 这主要是由于 RabbitMQ 分发数据时是从单队列向多个消费者投递消息的。
2)至少一次: 在 checkpointing 开启的条件下,如果没有使用关联标识或者 source 是并发的, 那么 source 就只能提供至少一次的保证。
3)无任何保证: 如果没有开启 checkpointing,source 就不能提供任何的数据投递保证。 使用这种设置时,source 一旦接收到并处理消息,消息就会被自动确认。

代码:

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.rabbitmq.RMQSource;
import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig;

/**
 * Stream Source From RabbitMQ
 */
public class StreamSourceFromRabbitMQ {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 每 1000ms 开始一次 checkpoint
        env.enableCheckpointing(1000);

        // 高级选项:
        // 设置模式为精确一次 (这是默认值)
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

        // 确认 checkpoints 之间的时间会进行 500 ms
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);

        // Checkpoint 必须在一分钟内完成,否则就会被抛弃
        env.getCheckpointConfig().setCheckpointTimeout(60000);

        // 允许两个连续的 checkpoint 错误
        env.getCheckpointConfig().setTolerableCheckpointFailureNumber(2);

        // 同一时间只允许一个 checkpoint 进行
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

        // 使用 externalized checkpoints,这样 checkpoint 在作业取消后仍就会被保留
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        // 开启实验性的 unaligned checkpoints
        env.getCheckpointConfig().enableUnalignedCheckpoints();

        RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
                .setHost("localhost")
                .setPort(5000)
                .build();

        DataStream<String> stream = env
                .addSource(new RMQSource<String>(
                        // config for the RabbitMQ connection
                        connectionConfig,
                        // name of the RabbitMQ queue to consume
                        "queueName",
                        // use correlation ids; can be false if only at-least-once is required
                        true,
                        // deserialization schema to turn messages into Java objects
                        new SimpleStringSchema()))
                .setParallelism(1);

        stream.print();
        env.execute();
    }
}