Flink Kafka Producer 被称为 FlinkKafkaProducer。它允许将消息流写入一个或多个 Kafka topic。
使用FlinkKafkaProducer要先构造一个FlinkKafkaProducer对象,其构造器为:
// Quoted from Flink's source: this convenience constructor delegates to the
// overload that additionally takes an Optional<FlinkKafkaPartitioner>,
// defaulting the partitioner to FlinkFixedPartitioner.
public FlinkKafkaProducer(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig) {
this(topicId, serializationSchema, producerConfig, Optional.of(new FlinkFixedPartitioner()));
}
上面这个重载实际上有三个参数:
1、写入Kafka的topic
2、序列化数据写入 Kafka 的 SerializationSchema / KafkaSerializationSchema
3、Kafka client 的 Properties,其中 "bootstrap.servers"(逗号分隔的 Kafka broker 列表)为必填项
此外还有可以传入第四个参数的重载:容错语义(FlinkKafkaProducer.Semantic,如 AT_LEAST_ONCE、EXACTLY_ONCE)
接下来我们来实践一下。
1、引入jar包
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka_2.11</artifactId>
<version>1.14.3</version>
</dependency>
2、启动zookeeper:
bin/zookeeper-server-start.sh config/zookeeper.properties
3、启动kafka:
bin/kafka-server-start.sh config/server.properties
4、新建kafka消费者:
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my-topic
5、代码:
package com.itzhimei.sink;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.flink.streaming.connectors.kafka.internals.KeyedSerializationSchemaWrapper;
import org.apache.flink.util.Collector;
import java.util.Arrays;
import java.util.Properties;
/**
 * Demo: write a bounded stream of strings to the Kafka topic "my-topic"
 * using {@link org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer}.
 *
 * <p>Requires a Kafka broker listening on localhost:9092 (see the setup
 * steps above). Each element of the source collection becomes one Kafka
 * record, serialized as a UTF-8 string by {@code SimpleStringSchema}.
 */
public class Sink_3_Kafka {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Single parallel task keeps the demo output ordered and simple.
        env.setParallelism(1);

        // Bounded in-memory source; each element is sent to Kafka as one record.
        DataStreamSource<String> dataStreamSource = env.fromCollection(Arrays.asList(
                "hello flink",
                "hello java",
                "你好 flink",
                "hello world",
                "test",
                "source",
                "collection"));

        // Kafka producer configuration. Only "bootstrap.servers" is required here.
        // NOTE: "group.id" is a consumer-side setting with no effect on a
        // producer, so it is intentionally not set.
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");

        // Sink: topic name, per-record serializer, and producer Properties.
        dataStreamSource.addSink(new FlinkKafkaProducer<String>(
                "my-topic",
                new SimpleStringSchema(),
                properties
        ));

        env.execute();
    }
}
/*
输出内容:
my-topic
hello flink
hello java
你好 flink
hello world
test
source
collection
*/