Flink从入门到实战四[DataStream API]-23-Sink数据输出到Redis

使用Redis作为Sink时,需要创建一个RedisSink对象并传给addSink方法;RedisSink的构造方法需要两个参数:一个连接配置(Config,例如FlinkJedisPoolConfig)和一个Redis命令类。
Redis命令类定义了以哪种数据类型把数据存储到Redis,以及如何从每条数据中获取key和value。Redis命令类需要实现接口RedisMapper,并重写对应的三个方法:getCommandDescription、getKeyFromData和getValueFromData。

RedisCommandDescription是具体的命令描述类,其初始化使用了枚举类RedisCommand来定义对应的Redis数据类型和操作,如下:

/**
 * Redis commands available to a sink, each paired with the
 * {@link RedisDataType} it operates on.
 */
public enum RedisCommand {
    LPUSH(RedisDataType.LIST),
    RPUSH(RedisDataType.LIST),
    SADD(RedisDataType.SET),
    SET(RedisDataType.STRING),
    PFADD(RedisDataType.HYPER_LOG_LOG),
    PUBLISH(RedisDataType.PUBSUB),
    ZADD(RedisDataType.SORTED_SET),
    ZREM(RedisDataType.SORTED_SET),
    HSET(RedisDataType.HASH);

    /** Data type this command works on; fixed per constant, hence final. */
    private final RedisDataType redisDataType;

    /**
     * Binds a command to its data type. Enum constructors are implicitly private.
     *
     * @param redisDataType the data type the command operates on
     */
    RedisCommand(RedisDataType redisDataType) {
        this.redisDataType = redisDataType;
    }

    /**
     * Returns the data type associated with this command.
     *
     * @return the {@link RedisDataType} passed when the constant was declared
     */
    public RedisDataType getRedisDataType() {
        return this.redisDataType;
    }
}

引入pom:

<dependency>
    <groupId>org.apache.bahir</groupId>
    <artifactId>flink-connector-redis_2.11</artifactId>
    <version>1.0</version>
</dependency>

代码:

package com.itzhimei.sink;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
import org.apache.flink.util.Collector;

import java.util.Arrays;

/**
 * Word-count example that prints its running totals and also writes them to Redis.
 *
 * <p>The job reads a fixed in-memory collection of lines, splits each line on single
 * spaces, prefixes every word with {@code "word_"}, keeps a running count per word and
 * hands every updated count to a {@link RedisSink} configured with {@link MyRedisMapper}.
 */
public class Sink_4_Redis {

    /**
     * Builds and runs the streaming job.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the job fails to execute
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<String> dataStreamSource = env.fromCollection(Arrays.asList(
                "hello flink",
                "hello java",
                "你好 flink",
                "hello world",
                "test",
                "source",
                "collection"));

        // Redis connection settings.
        // NOTE(review): host and password are hard-coded for this demo; externalize
        // them before using this code outside a local test setup.
        FlinkJedisPoolConfig config = new FlinkJedisPoolConfig.Builder()
                .setHost("localhost")
                .setPort(6379)
                .setPassword("123456")
                .setDatabase(0)
                .build();

        SingleOutputStreamOperator<Tuple2<String, Integer>> sum = dataStreamSource
                .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                        for (String word : value.split(" ")) {
                            // Leading or consecutive spaces produce empty tokens; skip them
                            // so they are not counted under the bare key "word_".
                            if (word.isEmpty()) {
                                continue;
                            }
                            out.collect(new Tuple2<>("word_" + word, 1));
                        }
                    }
                })
                .keyBy(item -> item.f0)
                .sum(1);

        sum.print();
        // Diamond instead of the raw type, so the sink's element type is checked
        // against the stream's Tuple2<String, Integer>.
        sum.addSink(new RedisSink<>(config, new MyRedisMapper()));

        env.execute();
    }

    /**
     * Maps each {@code (word, count)} tuple to an HSET on the {@code word_count} key:
     * the word is used as the key from the data and the count as the value.
     */
    public static class MyRedisMapper implements RedisMapper<Tuple2<String, Integer>> {

        /**
         * Selects the command used for every record.
         *
         * @return an HSET description with {@code "word_count"} as the additional key
         */
        @Override
        public RedisCommandDescription getCommandDescription() {
            return new RedisCommandDescription(RedisCommand.HSET, "word_count");
        }

        /**
         * Extracts the key for a record.
         *
         * @param stringIntegerTuple2 the {@code (word, count)} record
         * @return the word (field {@code f0})
         */
        @Override
        public String getKeyFromData(Tuple2<String, Integer> stringIntegerTuple2) {
            return stringIntegerTuple2.f0;
        }

        /**
         * Extracts the value for a record.
         *
         * @param stringIntegerTuple2 the {@code (word, count)} record
         * @return the count (field {@code f1}) rendered as a string
         */
        @Override
        public String getValueFromData(Tuple2<String, Integer> stringIntegerTuple2) {
            return String.valueOf(stringIntegerTuple2.f1);
        }
    }

}

/*
print输出内容:
(word_hello,1)
(word_flink,1)
(word_hello,2)
(word_java,1)
(word_你好,1)
(word_flink,2)
(word_hello,3)
(word_world,1)
(word_test,1)
(word_source,1)
(word_collection,1)

查看redis保存的数据,命令:hgetall word_count
 1)  "word_hello"
 2)  "3"
 3)  "word_flink"
 4)  "2"
 5)  "word_java"
 6)  "1"
 7)  "word_你好"
 8)  "1"
 9)  "word_world"
 10)  "1"
 11)  "word_test"
 12)  "1"
 13)  "word_source"
 14)  "1"
 15)  "word_collection"
 16)  "1"
 */