Flink从入门到实战四[DataStream API]-24-Sink数据输出到JDBC

Flink除了可以将结果写入到文本、Socket、Kafka、RabbitMQ等终端,还可以将Flink计算结果输出到JDBC。

我们以MySQL为例来看一下具体代码。

1、引入pom

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-jdbc_2.11</artifactId>
    <version>1.14.4</version>
</dependency>

<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>8.0.19</version>
</dependency>

2、启动MySQL数据库

3、创建数据库

-- Create the flink_test database with utf8 as its default character set.
CREATE DATABASE flink_test DEFAULT CHARACTER SET utf8 COLLATE utf8_general_ci;

4、创建表

-- Target table for the word-count sink: one row per word key with its count.
-- NOTE(review): `id` has no PRIMARY KEY / UNIQUE constraint, so uniqueness relies
-- entirely on the writer's update-then-insert logic -- confirm this is intended.
CREATE TABLE `word_count_table` (
  -- Word key written by the sink (for example word_hello).
  `id` varchar(32) NOT NULL,
  -- Count stored for that word.
  `cnt` int NOT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

5、代码

package com.itzhimei.sink;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.util.Collector;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.util.Arrays;

/**
 * Word-count example that writes its running counts to a MySQL table through JDBC.
 *
 * <p>The job splits a fixed collection of lines into words, prefixes each word with
 * {@code word_}, keeps a running sum per word, prints every intermediate result and
 * hands it to {@link MyJdbcSink}.
 */
public class Sink_5_Jdbc {

    /**
     * Builds and runs the streaming job.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the job fails to execute
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<String> dataStreamSource = env.fromCollection(Arrays.asList(
                "hello flink",
                "hello java",
                "你好 flink",
                "hello world",
                "test",
                "source",
                "collection"));

        SingleOutputStreamOperator<Tuple2<String, Integer>> sum = dataStreamSource.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                // Emit (word_<token>, 1) for every space-separated token of the line.
                String[] words = value.split(" ");
                for (String word : words) {
                    out.collect(new Tuple2<>("word_" + word, 1));
                }
            }
        }).keyBy(item->item.f0)
                .sum(1);

        sum.print();
        sum.addSink(new MyJdbcSink());

        env.execute();
    }


    /**
     * Sink that upserts each (word, count) pair into {@code word_count_table}:
     * it first tries an UPDATE and falls back to an INSERT when no row was updated.
     */
    private static class MyJdbcSink extends RichSinkFunction<Tuple2<String, Integer>> {

        private static final long serialVersionUID = 1L;

        private static final String JDBC_URL = "jdbc:mysql://localhost:3306/flink_test?useUnicode=true&serverTimezone=Asia/Shanghai&characterEncoding=UTF-8&useSSL=false&allowPublicKeyRetrieval=true";
        private static final String JDBC_USER = "flink";
        private static final String JDBC_PASSWORD = "flink";
        private static final String INSERT_SQL = "insert into word_count_table (id, cnt) values (?, ?)";
        private static final String UPDATE_SQL = "update word_count_table set cnt = ? where id = ?";

        // JDBC handles are created in open() and must never be serialized with the
        // function instance, hence transient.
        private transient Connection connection;
        private transient PreparedStatement insertStmt;
        private transient PreparedStatement updateStmt;

        /**
         * Opens the database connection and prepares the insert and update statements.
         *
         * @param parameters configuration passed in by the runtime (unused here)
         * @throws Exception if the connection or a statement cannot be created
         */
        @Override
        public void open(Configuration parameters) throws Exception {
            connection = DriverManager.getConnection(JDBC_URL, JDBC_USER, JDBC_PASSWORD);
            insertStmt = connection.prepareStatement(INSERT_SQL);
            updateStmt = connection.prepareStatement(UPDATE_SQL);
        }

        /**
         * Writes one (word, count) pair: updates the existing row, or inserts a new
         * row when the update touched nothing.
         *
         * @param value   the word key ({@code f0}) and its current count ({@code f1})
         * @param context sink context supplied by the runtime (unused here)
         * @throws Exception if a statement fails
         */
        @Override
        public void invoke(Tuple2<String, Integer> value, Context context) throws Exception {
            updateStmt.setInt(1, value.f1);
            updateStmt.setString(2, value.f0);
            int updatedRows = updateStmt.executeUpdate();
            if (updatedRows == 0) {
                insertStmt.setString(1, value.f0);
                // cnt is an int column and f1 is an Integer, so bind it as int
                // (the same way the update statement does), not as double.
                insertStmt.setInt(2, value.f1);
                insertStmt.executeUpdate();
            }
        }

        /**
         * Releases both statements and the connection.
         *
         * <p>Safe to call when {@link #open(Configuration)} failed partway: resources
         * that were never created are skipped, and a failure while closing one
         * resource does not prevent the others from being closed.
         *
         * @throws Exception if closing a resource fails
         */
        @Override
        public void close() throws Exception {
            // try-with-resources ignores null resources and closes the rest in
            // reverse declaration order: insert statement, update statement, connection.
            try (Connection conn = connection;
                 PreparedStatement update = updateStmt;
                 PreparedStatement insert = insertStmt) {
                // Nothing to do here; the block exists only to close the resources.
            } finally {
                insertStmt = null;
                updateStmt = null;
                connection = null;
            }
        }

    }
}

/*
print输出内容:
(word_hello,1)
(word_flink,1)
(word_hello,2)
(word_java,1)
(word_你好,1)
(word_flink,2)
(word_hello,3)
(word_world,1)
(word_test,1)
(word_source,1)
(word_collection,1)


从数据库查询结果:
mysql> select * from word_count_table;
+-----------------+-----+
| id              | cnt |
+-----------------+-----+
| word_hello      |   3 |
| word_flink      |   2 |
| word_java       |   1 |
| word_你好       |   1 |
| word_world      |   1 |
| word_test       |   1 |
| word_source     |   1 |
| word_collection |   1 |
+-----------------+-----+
8 rows in set (0.00 sec)
 */

官方代码写法:

// Build the execution environment, attach a JDBC sink to the source, then run the job.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.fromElements(...)
	.addSink(JdbcSink.sink(
			// Parameterized INSERT statement handed to the sink.
			"insert into books (id, title, author, price, qty) values (?,?,?,?,?)",
			// Binds the fields of record t to the five placeholders, in order.
			(ps, t) -> {
				ps.setInt(1, t.id);
				ps.setString(2, t.title);
				ps.setString(3, t.author);
				ps.setDouble(4, t.price);
				ps.setInt(5, t.qty);
			},
			// Connection settings: the URL and driver class name come from getDbMetadata().
			new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
					.withUrl(getDbMetadata().getUrl())
					.withDriverName(getDbMetadata().getDriverClass())
					.build()));
env.execute();