Flink从入门到实战九[Table API&SQL]-3-使用EnvironmentSettings创建 TableEnvironment

上一节知道了TableEnvironment的作用,并且也知道了创建TableEnvironment有两种方法,我们来代码演示一下如何使用的。

第一种方法用到了EnvironmentSettings,我们可以手动指定是使用Flink planner,还是Blink planner,还可以指定其他设置。
例如通过EnvironmentSettings.newInstance()方法获取一个实例,然后基于建造者模式,完善EnvironmentSettings实例。
代码如下:

package com.itzhimei.tablesql;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.walkthrough.common.entity.Transaction;
import org.apache.flink.walkthrough.common.source.TransactionSource;

/**
 * 使用EnvironmentSettings创建TableEnvironment
 */
public class TableSQL_2_TableEnvironment {

    public static void main(String[] args) throws Exception {

        EnvironmentSettings envSettings = EnvironmentSettings.newInstance()
                .inStreamingMode()
                //.inBatchMode()
                .build();

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Transaction> dataStream = env
                .addSource(new TransactionSource()) //TransactionSource可以查看前面章节,有源码分析讲解
                .name("transactions");

        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env,envSettings);

        Table table = tableEnv.fromDataStream(dataStream);

        //使用table api查询数据
        Table result1 = table.select("*").where("amount >= 500");

        //使用sql查询数据
        tableEnv.createTemporaryView("transactions", table);
        Table result2 = tableEnv.sqlQuery("select * from transactions where amount >= 500");

        DataStream<Row> rowDataStream1 = tableEnv.toDataStream(result1);
        DataStream<Row> rowDataStream2 = tableEnv.toDataStream(result2);

        rowDataStream1.print("rowDataStream1");
        rowDataStream2.print("rowDataStream2");
        env.execute();

    }
}

/* 输出结果
rowDataStream2:8> +I[3, 1546281720000, 871.15]
rowDataStream1:1> +I[3, 1546281720000, 871.15]
rowDataStream2:1> +I[3, 1546299720000, 871.15]
rowDataStream1:2> +I[3, 1546299720000, 871.15]
rowDataStream1:3> +I[3, 1546317720000, 871.15]
rowDataStream2:2> +I[3, 1546317720000, 871.15]
rowDataStream1:4> +I[3, 1546335720000, 871.15]
rowDataStream2:3> +I[3, 1546335720000, 871.15]
rowDataStream1:5> +I[3, 1546353720000, 871.15]
rowDataStream2:4> +I[3, 1546353720000, 871.15]
 */