Flink从入门到实战九[Table API&SQL]-1-什么是 Table API&SQL?

Flink本身是批流统一的处理框架,所以Table API和SQL,就是批流统一的上层处理API。目前功能尚未完善,处于活跃的开发阶段。


Table API是一套内嵌在Java和Scala语言中的查询API,它允许我们以非常直观的方式,组合来自一些关系运算符的查询(比如select、filter和join)。而对于Flink SQL,就是直接可以在代码中写SQL,来实现一些查询(Query)操作。Flink的SQL支持,基于实现了SQL标准的Apache Calcite(Apache开源SQL解析工具)。

无论输入是批输入还是流式输入,在这两套API中,指定的查询都具有相同的语义,得到相同的结果。

其中还要说明的是Flink planner和Blink planner,这两个的作用是相同的,都是将我们的SQL语法解析成Flink当前可以识别的语法语义,然后执行。
区别在于Flink planner是早期官方开发的,功能相对不完整,而Blink planner是阿里主导开发的,功能在不断增强和完善,目前已经是默认的planner。

我们接下来看看如何基于Table API 和 SQL进行开发。
Table API和 SQL 程序的标准结构:
1、创建一个表环境
2、注册一个源表
3、注册一个输出表
4、基于Table API或SQL API处理数据
5、执行程序输出结果

代码示例:

import org.apache.flink.table.api.*;
import org.apache.flink.connector.datagen.table.DataGenOptions;

// Create a TableEnvironment for batch or streaming execution.
// See the "Create a TableEnvironment" section for details.
TableEnvironment tableEnv = TableEnvironment.create(/*…*/);

// Create a source table
tableEnv.createTemporaryTable("SourceTable", TableDescriptor.forConnector("datagen")
    .schema(Schema.newBuilder()
      .column("f0", DataTypes.STRING())
      .build())
    .option(DataGenOptions.ROWS_PER_SECOND, 100)
    .build());

// Create a sink table (using SQL DDL)
tableEnv.executeSql("CREATE TEMPORARY TABLE SinkTable WITH ('connector' = 'blackhole') LIKE SourceTable");

// Create a Table object from a Table API query
Table table2 = tableEnv.from("SourceTable");

// Create a Table object from a SQL query
Table table3 = tableEnv.sqlQuery("SELECT * FROM SourceTable");

// Emit a Table API result Table to a TableSink, same for SQL result
TableResult tableResult = table2.insertInto("SinkTable").execute();