Flink中如何实现数据源和数据接收器?

Flink 中的数据源用于从外部系统读取数据进入 Flink,数据接收器用于将 Flink 的计算结果输出到外部系统。

Flink 提供了很多内置的 Source 和 Sink,我们也可以自定义实现。其中主要有:

  1. 基于文件(File-based):从文件读取数据或输出到文件。如 TextFileSource、CSVSource、TextFileSink 等。
  2. 基于 Socket:通过 Socket 读取数据或输出,如 SocketSource、SocketSink 等。
  3. 基于集合(Collection-based):从 Java 集合读取数据或输出到集合,如 FromElementsSource、ToReducerSink 等。
  4. 基于 Kafka:从 Kafka 读取消息或输出到 Kafka,如 FlinkKafkaConsumer、FlinkKafkaProducer 等。
  5. 自定义:实现 SourceFunction 或 SinkFunction 接口自定义 Source 或 Sink。

举例:

  1. TextFileSource:
DataStream<String> stream = env.readTextFile("file:///path/to/file");
  1. SocketSource:
DataStream<String> stream = env.socketTextStream("host", port);  
  1. FromElementsSource:
List<Integer> list = Arrays.asList(1, 2, 3);
DataStream<Integer> stream = env.fromCollection(list);
  1. FlinkKafkaConsumer:
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "host1:port1,host2:port2");
properties.setProperty("group.id", "consumer-group");

DataStream<String> stream = env.addSource(new FlinkKafkaConsumer<>("topic", new SimpleStringSchema(), properties));  
  1. 自定义 Source:
public class CustomSource implements SourceFunction<String> {
    public void run(SourceContext<String> ctx) throws Exception {
        while(true) {
            ctx.collect("1");
        }
    }
}

DataStream<String> stream = env.addSource(new CustomSource());   

Flink 通过 Source 和 Sink 可以轻松连接外部系统。