Add the pom dependency:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka_2.11</artifactId>
    <version>1.14.3</version>
</dependency>
Install Kafka
Prerequisite: the local machine needs JDK 1.8 or later.
1. Download Kafka
https://dlcdn.apache.org/kafka/3.1.0/kafka_2.13-3.1.0.tgz
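2. Extract the archive and change into the extracted directory (the commands in the following steps use paths relative to it)
$ tar -xzf kafka_2.13-3.1.0.tgz
$ cd kafka_2.13-3.1.0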
3. Start ZooKeeper
$ bin/zookeeper-server-start.sh config/zookeeper.properties
4. Start the Kafka broker (in a second terminal, since ZooKeeper keeps running in the foreground)
$ bin/kafka-server-start.sh config/server.properties
5. Start the Kafka console producer (in another terminal)
$ bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic topic1
Kafka source configuration:
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
properties.setProperty("group.id", "test");
DataStream<String> stream = env.addSource(new FlinkKafkaConsumer<>("topic1", new SimpleStringSchema(), properties));
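The two properties above are hard-coded. As a minimal sketch, the same configuration can be built by a small helper so that the broker address and consumer group come from command-line arguments. The class name KafkaConsumerProps and the method build are hypothetical, and the auto.offset.reset entry is an assumption that is not part of the original configuration; it only makes explicit where to start reading when the group has no committed offset.

```java
import java.util.Properties;

public class KafkaConsumerProps {

    // Hypothetical helper: builds the consumer properties used by the Kafka source.
    static Properties build(String bootstrapServers, String groupId) {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", bootstrapServers);
        properties.setProperty("group.id", groupId);
        // Assumption (not in the original): start from the latest offset
        // when the consumer group has no committed offset yet.
        properties.setProperty("auto.offset.reset", "latest");
        return properties;
    }

    public static void main(String[] args) {
        // Fall back to the values used in this article when no arguments are given.
        String servers = args.length > 0 ? args[0] : "localhost:9092";
        String group = args.length > 1 ? args[1] : "test";
        System.out.println(build(servers, group));
    }
}
```

The returned Properties object can be passed to the FlinkKafkaConsumer constructor in place of the inline one.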
Code:
package org.itzhimei.source;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;

import java.util.Arrays;
import java.util.Properties;

/**
 * Stream Source From Kafka
 */
public class StreamSourceFromKafka {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Read data from Kafka
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("group.id", "test");
        DataStream<String> dataStreamSource = env.addSource(
                new FlinkKafkaConsumer<String>("topic1", new SimpleStringSchema(), properties));

        // Split each line into words, emit (word, 1), then keep a running sum per word
        SingleOutputStreamOperator<Tuple2<String, Integer>> sum = dataStreamSource
                .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
                        String[] words = s.split(" ");
                        Arrays.stream(words).forEach((String sp) -> collector.collect(new Tuple2<String, Integer>(sp, 1)));
                    }
                })
                .keyBy(item -> item.f0)
                .sum(1);

        sum.print();
        env.execute();
    }
}
Kafka producer input:
>hello java
>hello flink
>hello itzhimei
Output:
(hello,1)
(java,1)
(hello,2)
(flink,1)
(hello,3)
(itzhimei,1)
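The output shows a running count for each word rather than one final total, because sum(1) on a keyed stream emits an updated result for every incoming record. The following plain-Java sketch, which does not use Flink, imitates that behavior to show where the numbers come from. The class name RunningWordCount and the method process are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RunningWordCount {

    // Imitates flatMap -> keyBy -> sum(1): every word updates its own running
    // total, and the updated (word,count) pair is emitted immediately.
    static List<String> process(List<String> lines) {
        Map<String, Integer> counts = new HashMap<>();
        List<String> emitted = new ArrayList<>();
        for (String line : lines) {
            for (String word : line.split(" ")) {
                int updated = counts.merge(word, 1, Integer::sum);
                emitted.add("(" + word + "," + updated + ")");
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        // Same three lines that were typed into the console producer above.
        List<String> input = Arrays.asList("hello java", "hello flink", "hello itzhimei");
        process(input).forEach(System.out::println);
    }
}
```

Run with the three producer lines, this sketch prints the same six tuples in the same order. In the Flink job that ordering is kept because the parallelism is set to 1.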