Flink from Beginner to Practice, Part 4 [DataStream API] - 2 - Environment (Runtime Environment)

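Environment is the foundation of every Flink program: each Flink application needs an execution environment. The Flink 1.14.3 API provides two kinds of Environment, one for batch processing and one for stream processing.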

Batch Environment
The batch Environment lives in the org.apache.flink.api.java package. Three classes are involved in creating an execution environment:
ExecutionEnvironment, LocalEnvironment, RemoteEnvironment

LocalEnvironment and RemoteEnvironment both extend ExecutionEnvironment.
A LocalEnvironment can be created with ExecutionEnvironment.createLocalEnvironment() or ExecutionEnvironment.createLocalEnvironment(int),
and a RemoteEnvironment can be created with one of the overloads of ExecutionEnvironment.createRemoteEnvironment(host, port, jarFiles...).

Streaming Environment
The streaming Environment lives in the org.apache.flink.streaming.api.environment package. Three classes are involved in creating an execution environment:
StreamExecutionEnvironment, LocalStreamEnvironment, RemoteStreamEnvironment

LocalStreamEnvironment and RemoteStreamEnvironment both extend StreamExecutionEnvironment.
Both can be created through overloaded static methods of StreamExecutionEnvironment, for example StreamExecutionEnvironment.createLocalEnvironment() and StreamExecutionEnvironment.createRemoteEnvironment(host, port, jarFiles...).

Three ways to create an execution environment (the three lines are alternatives; use one of them):
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("192.168.0.1", 8081, "/wordcount.jar");

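In real-world development, whether we use the batch Environment or the streaming Environment, calling getExecutionEnvironment() on the corresponding class (ExecutionEnvironment.getExecutionEnvironment() or StreamExecutionEnvironment.getExecutionEnvironment()) is all we need.
This method detects the context in which the program is currently running. If the program is invoked standalone (that is, started on its own, for example directly from an IDE), it creates a local environment; if the program was submitted through the command line, it returns the environment of the cluster it was submitted to.
The methods and their comments in the source code: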

/**
 * Creates an execution environment that represents the context in which the program is
 * currently executed. If the program is invoked standalone, this method returns a local
 * execution environment, as returned by {@link #createLocalEnvironment()}.
 *
 * @return The execution environment of the context in which the program is executed.
 */
public static StreamExecutionEnvironment getExecutionEnvironment() {
	return getExecutionEnvironment(new Configuration());
}
/**
 * Creates an execution environment that represents the context in which the program is
 * currently executed. If the program is invoked standalone, this method returns a local
 * execution environment, as returned by {@link #createLocalEnvironment(Configuration)}.
 *
 * <p>When executed from the command line the given configuration is stacked on top of the
 * global configuration which comes from the {@code flink-conf.yaml}, potentially overriding
 * duplicated options.
 *
 * @param configuration The configuration to instantiate the environment with.
 * @return The execution environment of the context in which the program is executed.
 */
public static StreamExecutionEnvironment getExecutionEnvironment(Configuration configuration) {
	return Utils.resolveFactory(threadLocalContextEnvironmentFactory, contextEnvironmentFactory)
			.map(factory -> factory.createExecutionEnvironment(configuration))
			.orElseGet(() -> StreamExecutionEnvironment.createLocalEnvironment(configuration));
}
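
The fallback logic in the method above can be illustrated without Flink on the classpath. The following is only a minimal sketch in plain Java. EnvFactory, EnvironmentResolutionSketch, and the string return values are hypothetical stand-ins, not Flink types. It also assumes that Utils.resolveFactory prefers the thread-local factory and otherwise uses the static one, which is what the parameter names suggest; the source quoted above does not show its implementation.

```java
import java.util.Optional;

// Hypothetical stand-in for an environment factory; not a Flink type.
interface EnvFactory {
    String create();
}

class EnvironmentResolutionSketch {
    // Factory registered for the current thread only (assumed to take priority).
    static final ThreadLocal<EnvFactory> threadLocalFactory = new ThreadLocal<>();
    // Factory registered globally, e.g. by a client that submits the job.
    static EnvFactory globalFactory = null;

    // Assumed behavior: thread-local factory first, then the global one, else empty.
    static Optional<EnvFactory> resolveFactory() {
        EnvFactory local = threadLocalFactory.get();
        return Optional.ofNullable(local != null ? local : globalFactory);
    }

    // Mirrors the shape of getExecutionEnvironment(Configuration):
    // use a registered factory if there is one, otherwise fall back to "local".
    static String getExecutionEnvironment() {
        return resolveFactory()
                .map(EnvFactory::create)
                .orElseGet(() -> "local");
    }

    public static void main(String[] args) {
        // No factory registered: the program is "invoked standalone".
        System.out.println(getExecutionEnvironment()); // prints "local"

        // A global factory is registered, as a submitting client might do.
        globalFactory = () -> "cluster";
        System.out.println(getExecutionEnvironment()); // prints "cluster"

        // A thread-local factory overrides the global one for this thread.
        threadLocalFactory.set(() -> "thread-cluster");
        System.out.println(getExecutionEnvironment()); // prints "thread-cluster"
    }
}
```

The point of the sketch is that user code never has to decide between a local and a cluster environment: the same call returns whichever environment the surrounding context has registered, and a local one when nothing is registered.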