Flink从入门到实战四[DataStream API]-22-Sink数据输出到Kafka

Flink Kafka Producer 被称为 FlinkKafkaProducer。它允许将消息流写入一个或多个 Kafka topic。
使用FlinkKafkaProducer要先构造一个FlinkKafkaProducer对象,其构造器为:

public FlinkKafkaProducer(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig) {
	this(topicId, serializationSchema, producerConfig, Optional.of(new FlinkFixedPartitioner()));
}

该构造器的三个参数分别是:
1、写入Kafka的topic
2、序列化数据写入 Kafka 的 SerializationSchema / KafkaSerializationSchema
3、Kafka client 的 Properties。“bootstrap.servers” (逗号分隔 Kafka broker 列表)
此外还有一个四参数的重载构造器,第四个参数用于指定容错语义(Semantic,如 AT_LEAST_ONCE、EXACTLY_ONCE)。

接下来我们来实践一下。

1、引入jar包

<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kafka_2.11</artifactId>
  <version>1.14.3</version>
</dependency>

2、启动zookeeper:
bin/zookeeper-server-start.sh config/zookeeper.properties

3、启动kafka:
bin/kafka-server-start.sh config/server.properties

4、新建kafka消费者:
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic my-topic

5、代码:

package com.itzhimei.sink;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.flink.streaming.connectors.kafka.internals.KeyedSerializationSchemaWrapper;
import org.apache.flink.util.Collector;

import java.util.Arrays;
import java.util.Properties;

/**
 * 计算结果输出到Kafka
 */
/**
 * Demo: writes a small in-memory stream of strings to the Kafka topic "my-topic"
 * using the legacy {@code FlinkKafkaProducer} sink (Flink 1.14; superseded by KafkaSink).
 */
public class Sink_3_Kafka {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Parallelism 1 keeps the demo's output ordering deterministic.
        env.setParallelism(1);

        // Bounded source: each element becomes one Kafka record.
        DataStreamSource<String> dataStreamSource = env.fromCollection(Arrays.asList(
                "hello flink",
                "hello java",
                "你好 flink",
                "hello world",
                "test",
                "source",
                "collection"));

        // Kafka producer configuration. Only "bootstrap.servers" is needed here;
        // "group.id" is a consumer-side property and is ignored by producers, so it is omitted.
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");

        // Serialize each String with SimpleStringSchema and publish it to "my-topic".
        // This 3-arg constructor defaults to AT_LEAST_ONCE semantics with a fixed partitioner.
        dataStreamSource.addSink(new FlinkKafkaProducer<String>(
                "my-topic",
                new SimpleStringSchema(),
                properties
        ));
        env.execute();
    }
}


/*
输出内容:
my-topic
hello flink
hello java
你好 flink
hello world
test
source
collection
 */