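The Environment is the foundation of every Flink program: each Flink application needs an execution environment. The Flink 1.14.3 API provides two kinds of Environment, one for batch processing and one for stream processing.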
Batch Environment
The batch Environment lives in the org.apache.flink.api.java package. Three classes are involved in creating an execution environment:
ExecutionEnvironment, LocalEnvironment, RemoteEnvironment
LocalEnvironment and RemoteEnvironment both extend ExecutionEnvironment.
A LocalEnvironment can be created with ExecutionEnvironment.createLocalEnvironment() or ExecutionEnvironment.createLocalEnvironment(int),
and a RemoteEnvironment can be created with one of the several overloads of ExecutionEnvironment.createRemoteEnvironment(...).
Stream Environment
The stream Environment lives in the org.apache.flink.streaming.api.environment package. Three classes are involved in creating an execution environment:
StreamExecutionEnvironment, LocalStreamEnvironment, RemoteStreamEnvironment
LocalStreamEnvironment and RemoteStreamEnvironment both extend StreamExecutionEnvironment.
Both can be created through overloaded static methods on StreamExecutionEnvironment, for example StreamExecutionEnvironment.createLocalEnvironment() and StreamExecutionEnvironment.createRemoteEnvironment(...).
There are three ways to create an execution environment:
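// 1. Let Flink choose the environment based on how the program was started
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 2. Always run in a local environment inside the current JVM
StreamExecutionEnvironment localEnv = StreamExecutionEnvironment.createLocalEnvironment();
// 3. Submit to a remote cluster: host, port, and the JAR(s) to ship with the job
StreamExecutionEnvironment remoteEnv = StreamExecutionEnvironment.createRemoteEnvironment("192.168.0.1", 8081, "/wordcount.jar");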
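In day-to-day development, whether we use the batch Environment or the stream Environment, calling *Environment.getExecutionEnvironment() is all we need to obtain the context environment.
This method checks how the program is currently being executed. If the program is invoked standalone (for example, started directly from an IDE), it creates a local environment; if the program is submitted from the command line, it returns the environment of the cluster the job was submitted to.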
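The decision described above can be modeled as a lookup with a fallback. The following is a minimal sketch that uses only the Java standard library; it is not Flink's actual code. The class name EnvironmentResolutionSketch, the String results, and the Supplier-based factories are hypothetical stand-ins for Flink's environment factories. It assumes that a client such as the command-line tool registers a factory before the user's main method runs, and that no factory is registered when the program is started directly.

```java
import java.util.Optional;
import java.util.function.Supplier;

/** Simplified model of "use the context factory if present, else go local". */
final class EnvironmentResolutionSketch {

    // Hypothetical stand-ins for the factories a client may register.
    // Both stay unset when the program is started directly (e.g. from an IDE).
    static Supplier<String> contextFactory = null;
    static final ThreadLocal<Supplier<String>> threadLocalContextFactory = new ThreadLocal<>();

    // Prefer the thread-local factory, then the global one; empty if neither is set.
    static Optional<Supplier<String>> resolveFactory() {
        Supplier<String> local = threadLocalContextFactory.get();
        return Optional.ofNullable(local != null ? local : contextFactory);
    }

    // Returns a label for the chosen environment instead of a real environment object.
    static String getExecutionEnvironment() {
        return resolveFactory()
                .map(Supplier::get)
                .orElseGet(() -> "local");
    }

    public static void main(String[] args) {
        // No factory registered: falls back to the local environment.
        System.out.println(getExecutionEnvironment());

        // A client registered a factory: its environment is used instead.
        contextFactory = () -> "cluster";
        System.out.println(getExecutionEnvironment());
    }
}
```

Because the fallback lives inside the method, the same user code runs unchanged in both situations; only the presence of a registered factory differs.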
The methods and their Javadoc comments in the source code:
/**
 * Creates an execution environment that represents the context in which the program is
 * currently executed. If the program is invoked standalone, this method returns a local
 * execution environment, as returned by {@link #createLocalEnvironment()}.
 *
 * @return The execution environment of the context in which the program is executed.
 */
public static StreamExecutionEnvironment getExecutionEnvironment() {
    return getExecutionEnvironment(new Configuration());
}

/**
 * Creates an execution environment that represents the context in which the program is
 * currently executed. If the program is invoked standalone, this method returns a local
 * execution environment, as returned by {@link #createLocalEnvironment(Configuration)}.
 *
 * <p>When executed from the command line the given configuration is stacked on top of the
 * global configuration which comes from the {@code flink-conf.yaml}, potentially overriding
 * duplicated options.
 *
 * @param configuration The configuration to instantiate the environment with.
 * @return The execution environment of the context in which the program is executed.
 */
public static StreamExecutionEnvironment getExecutionEnvironment(Configuration configuration) {
    return Utils.resolveFactory(threadLocalContextEnvironmentFactory, contextEnvironmentFactory)
            .map(factory -> factory.createExecutionEnvironment(configuration))
            .orElseGet(() -> StreamExecutionEnvironment.createLocalEnvironment(configuration));
}
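The Javadoc above says that the configuration passed in is "stacked on top of" the global configuration and may override duplicated options. A minimal sketch of that override rule, using plain maps rather than Flink's Configuration class, might look as follows. The class name ConfigStackingSketch, the stack helper, and the option values are hypothetical and chosen only for illustration; how Flink merges configurations internally is not shown in the source above.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Models "user configuration stacked on top of global configuration" with plain maps. */
final class ConfigStackingSketch {

    // Copies the global options, then lets the user-supplied options overwrite duplicates.
    static Map<String, String> stack(Map<String, String> global, Map<String, String> user) {
        Map<String, String> effective = new LinkedHashMap<>(global);
        effective.putAll(user);
        return effective;
    }

    public static void main(String[] args) {
        // Stand-in for options loaded from flink-conf.yaml (values are made up).
        Map<String, String> global = new LinkedHashMap<>();
        global.put("parallelism.default", "1");
        global.put("rest.port", "8081");

        // Stand-in for the configuration passed to getExecutionEnvironment(configuration).
        Map<String, String> user = new LinkedHashMap<>();
        user.put("parallelism.default", "4");

        // The duplicated key takes the user value; the other key keeps the global value.
        System.out.println(stack(global, user));
    }
}
```

In this model an option set only in the global configuration is still visible, while an option set in both places takes the value from the configuration handed to the method.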