使用Redis作为Sink时,我们需要创建一个RedisSink对象并传给addSink方法;构造RedisSink需要两个参数:一个Config(Redis连接配置)和一个Redis命令类。
Redis命令类定义了以哪种数据类型将数据存储到Redis,以及如何从数据中获取key和value。Redis命令类需要实现接口RedisMapper,并重写三个对应的方法:getCommandDescription、getKeyFromData和getValueFromData。
RedisCommandDescription是具体的命令描述类,其初始化使用了枚举类RedisCommand来定义对应的Redis数据类型和操作,如下:
/**
 * Commands that can be used to write data to Redis.
 *
 * <p>Each constant is bound to the {@link RedisDataType} passed to its constructor, which can be
 * read back through {@link #getRedisDataType()}.
 */
public enum RedisCommand {
    // Commands bound to RedisDataType.LIST
    LPUSH(RedisDataType.LIST),
    RPUSH(RedisDataType.LIST),

    // Command bound to RedisDataType.SET
    SADD(RedisDataType.SET),

    // Command bound to RedisDataType.STRING
    SET(RedisDataType.STRING),

    // Command bound to RedisDataType.HYPER_LOG_LOG
    PFADD(RedisDataType.HYPER_LOG_LOG),

    // Command bound to RedisDataType.PUBSUB
    PUBLISH(RedisDataType.PUBSUB),

    // Commands bound to RedisDataType.SORTED_SET
    ZADD(RedisDataType.SORTED_SET),
    ZREM(RedisDataType.SORTED_SET),

    // Command bound to RedisDataType.HASH
    HSET(RedisDataType.HASH);

    /** Data type this command is bound to; assigned once in the constructor and never changed. */
    private final RedisDataType redisDataType;

    /**
     * Binds the command to its data type.
     *
     * @param redisDataType the data type associated with this command
     */
    RedisCommand(RedisDataType redisDataType) {
        this.redisDataType = redisDataType;
    }

    /**
     * Returns the data type this command is bound to.
     *
     * @return the {@link RedisDataType} given at construction
     */
    public RedisDataType getRedisDataType() {
        return this.redisDataType;
    }
}
在pom.xml中引入依赖:
<dependency>
<groupId>org.apache.bahir</groupId>
<artifactId>flink-connector-redis_2.11</artifactId>
<version>1.0</version>
</dependency>
代码:
package com.itzhimei.sink;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.redis.RedisSink;
import org.apache.flink.streaming.connectors.redis.common.config.FlinkJedisPoolConfig;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommand;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisCommandDescription;
import org.apache.flink.streaming.connectors.redis.common.mapper.RedisMapper;
import org.apache.flink.util.Collector;
import java.util.Arrays;
/**
 * Word-count example whose running totals are written to Redis.
 *
 * <p>The job reads a fixed collection of lines, splits each line on a single space, prefixes every
 * word with {@code word_}, keeps a running sum per word, prints each update and also sends it to a
 * {@link RedisSink} that stores it in the hash {@code word_count}.
 */
public class Sink_4_Redis {

    /** Prefix put in front of every word before it is used as the grouping key. */
    private static final String WORD_PREFIX = "word_";

    /**
     * Builds and runs the job.
     *
     * @param args command-line arguments; not used
     * @throws Exception if {@code env.execute()} fails
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        DataStreamSource<String> dataStreamSource = env.fromCollection(Arrays.asList(
                "hello flink",
                "hello java",
                "你好 flink",
                "hello world",
                "test",
                "source",
                "collection"));

        // Redis connection settings.
        // NOTE(review): host, port and password are hard-coded demo values; load them from
        // configuration instead of source code before using this outside a local test.
        FlinkJedisPoolConfig config = new FlinkJedisPoolConfig.Builder()
                .setHost("localhost")
                .setPort(6379)
                .setPassword("123456")
                .setDatabase(0)
                .build();

        // Emit (prefixed word, 1) for every token, group by the word and keep a running sum.
        SingleOutputStreamOperator<Tuple2<String, Integer>> sum = dataStreamSource
                .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public void flatMap(String value, Collector<Tuple2<String, Integer>> out)
                            throws Exception {
                        String[] words = value.split(" ");
                        for (String word : words) {
                            out.collect(new Tuple2<>(WORD_PREFIX + word, 1));
                        }
                    }
                })
                .keyBy(item -> item.f0)
                .sum(1);

        sum.print();

        // Diamond instead of the raw RedisSink type, so the sink's element type is checked
        // against the stream's Tuple2<String, Integer> at compile time.
        sum.addSink(new RedisSink<>(config, new MyRedisMapper()));

        env.execute();
    }

    /**
     * Maps each {@code (word, count)} tuple to a field/value pair of the Redis hash
     * {@code word_count}.
     */
    public static class MyRedisMapper implements RedisMapper<Tuple2<String, Integer>> {

        /** Name handed to the HSET command description; the hash that receives the counts. */
        private static final String HASH_NAME = "word_count";

        /**
         * Describes the command used for every record: HSET on the hash {@code word_count}.
         *
         * @return the command description
         */
        @Override
        public RedisCommandDescription getCommandDescription() {
            return new RedisCommandDescription(RedisCommand.HSET, HASH_NAME);
        }

        /**
         * Returns the key for a record.
         *
         * @param stringIntegerTuple2 the {@code (word, count)} tuple
         * @return the tuple's first field (the prefixed word)
         */
        @Override
        public String getKeyFromData(Tuple2<String, Integer> stringIntegerTuple2) {
            return stringIntegerTuple2.f0;
        }

        /**
         * Returns the value for a record.
         *
         * @param stringIntegerTuple2 the {@code (word, count)} tuple
         * @return the tuple's second field (the count) rendered as a string
         */
        @Override
        public String getValueFromData(Tuple2<String, Integer> stringIntegerTuple2) {
            return String.valueOf(stringIntegerTuple2.f1);
        }
    }
}
/*
print输出内容:
(word_hello,1)
(word_flink,1)
(word_hello,2)
(word_java,1)
(word_你好,1)
(word_flink,2)
(word_hello,3)
(word_world,1)
(word_test,1)
(word_source,1)
(word_collection,1)
查看redis保存的数据,命令:hgetall word_count
1) "word_hello"
2) "3"
3) "word_flink"
4) "2"
5) "word_java"
6) "1"
7) "word_你好"
8) "1"
9) "word_world"
10) "1"
11) "word_test"
12) "1"
13) "word_source"
14) "1"
15) "word_collection"
16) "1"
*/