Flink中的任务并发度是怎样控制的?

Flink通过并行度来控制任务的并发执行数量。调整任务并发度可以实现资源利用率的优化和吞吐量的提高。

Flink中任务并发度主要有以下几种控制方式:

  1. setParallelism:直接设置任务的并行度,适用于Source、Map、FlatMap等算子。
  2. keys:在KeyedStream上设置并行度,影响后续的所有运算符。
  3. rebalance:在DataStream上重新设置并行度,产生新流。
  4. parallelism:在Environment中设置全局并行度,作用于所有任务。
  5. maxParallelism:在Environment中设置全局最大并行度,单个任务并行度不能超过该值。

下面通过例子来说明几种任务并发度控制方式:

setParallelism:

DataStream<String> stream = env.readTextFile("input");
DataStream<Integer> result = stream.setParallelism(5)  // 设置Source并行度为5
                                 .map(String::length);  

keys:

DataStream<Tuple2<String, Integer>> stream = ...
KeyedStream<Tuple2<String,Integer>, String> result = stream.keyBy(0)  
                                              .setParallelism(10); // 设置KeyedStream并行度为10

rebalance:

DataStream<Tuple2<String, Integer>> stream = ... 
DataStream<Tuple2<String, Integer>> result = stream.rebalance().setParallelism(20);  
                                              // 重新设置DataStream并行度为20

parallelism:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(32);  // 设置全局并行度为32

DataStream<String> stream = env.readTextFile("input"); 
// Source的并行度默认为全局并行度32

maxParallelism:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setMaxParallelism(16);   // 设置全局最大并行度为16

DataStream<String> stream1 = env.readTextFile("input1");
DataStream<String> stream2 = env.readTextFile("input2");
// stream1和stream2的并行度不能超过16

Flink的任务并发度控制机制可以实现资源和负载的动态调整。
任务并发度控制是我们利用Flink实现最大资源利用率和最高性能的基础。
setParallelism、keys、rebalance、parallelism和maxParallelism是Flink中控制任务并发度的五种主要方式。作用范围和效果不同,我们需要根据实际情况选择最优或组合使用多种策略,来实现资源利用率的最大化和任务吞吐量的最优化。