Flink from Beginner to Practice, Part 4 [DataStream API]-7-Source-Reading Data from Kafka

Add the pom dependency:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.11</artifactId>
    <version>1.14.3</version>
</dependency>

Install Kafka
Prerequisite: JDK 1.8 or later must be installed on the local machine.
1. Download Kafka
https://dlcdn.apache.org/kafka/3.1.0/kafka_2.13-3.1.0.tgz

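2. Extract the archive and change into the Kafka directory (the commands below are run from there)
$ tar -xzf kafka_2.13-3.1.0.tgz
$ cd kafka_2.13-3.1.0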

3. Start ZooKeeper
$ bin/zookeeper-server-start.sh config/zookeeper.properties

4. Start the Kafka broker (in a new terminal)
$ bin/kafka-server-start.sh config/server.properties

5. Start the Kafka console producer (in a new terminal)
$ bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic topic1

Kafka source configuration:

Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "test");
DataStream<String> stream = env.addSource(new FlinkKafkaConsumer<>("topic1", new SimpleStringSchema(), properties));
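
The two properties above are ordinary Kafka consumer settings: bootstrap.servers tells the client where to find the broker, and group.id names the consumer group. As a minimal sketch using only java.util.Properties (no Flink or Kafka classes), they could be built in a small helper. The class name KafkaConsumerPropsSketch and the method consumerProperties are hypothetical names introduced here for illustration; they are not part of Flink or of the original code.

```java
import java.util.Properties;

// Hypothetical helper for illustration only; not part of Flink or Kafka.
class KafkaConsumerPropsSketch {

    // Builds the same two consumer settings used in this article.
    static Properties consumerProperties(String bootstrapServers, String groupId) {
        Properties properties = new Properties();
        // Address of the Kafka broker(s) the consumer connects to.
        properties.setProperty("bootstrap.servers", bootstrapServers);
        // Consumer group under which the job reads the topic.
        properties.setProperty("group.id", groupId);
        return properties;
    }

    public static void main(String[] args) {
        Properties properties = consumerProperties("localhost:9092", "test");
        // Print each setting so the configuration can be checked before wiring it into a job.
        for (String key : properties.stringPropertyNames()) {
            System.out.println(key + "=" + properties.getProperty(key));
        }
    }
}
```

The resulting Properties object can be passed to the FlinkKafkaConsumer constructor exactly as in the snippet above.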

Code:

package org.itzhimei.source;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;

import java.util.Arrays;
import java.util.Properties;

/**
 * Stream Source From Kafka
 */
public class StreamSourceFromKafka {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        // Read data from Kafka
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("group.id", "test");
        DataStream<String> dataStreamSource = env.addSource(new FlinkKafkaConsumer<String>("topic1", new SimpleStringSchema(), properties));

        SingleOutputStreamOperator<Tuple2<String, Integer>> sum = dataStreamSource.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
                String[] words = s.split(" ");
                Arrays.stream(words).forEach((String sp) -> collector.collect(new Tuple2<String, Integer>(sp, 1)));
            }
        }).keyBy(item -> item.f0)
                .sum(1);

        sum.print();
        env.execute();
    }
}

Kafka producer input:

>hello java
>hello flink
>hello itzhimei

Output (one updated count per incoming word):

(hello,1)
(java,1)
(hello,2)
(flink,1)
(hello,3)
(itzhimei,1)
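
The output shows a running total rather than one final count per word, because the keyed sum emits an updated result for every record it receives. The following plain-Java sketch imitates that behavior without Flink or Kafka, assuming a parallelism of 1 so that records are processed in arrival order. The class RunningWordCountSketch and the method runningCounts are hypothetical names used only for this illustration, and the HashMap merely stands in for Flink's keyed state.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java illustration (no Flink): imitates flatMap -> keyBy -> sum(1).
class RunningWordCountSketch {

    // Returns one "(word,count)" entry per incoming word, in arrival order.
    static List<String> runningCounts(List<String> lines) {
        // Stand-in for the per-key state that Flink keeps for the sum.
        Map<String, Integer> countsByWord = new HashMap<>();
        List<String> emitted = new ArrayList<>();
        for (String line : lines) {
            // Same splitting rule as the flatMap in the job above.
            for (String word : line.split(" ")) {
                // Increment the count for this word and emit the updated total.
                int updated = countsByWord.merge(word, 1, Integer::sum);
                emitted.add("(" + word + "," + updated + ")");
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList("hello java", "hello flink", "hello itzhimei");
        runningCounts(lines).forEach(System.out::println);
    }
}
```

Run with the three producer lines from this article, the sketch prints the same six tuples in the same order as the Flink job.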