Running a Flink DataSet job fails with the following error:
Exception in thread "main" java.lang.RuntimeException: No new data sinks have been defined since the last execution. The last execution refers to the latest call to 'execute()', 'count()', 'collect()', or 'print()'.
at org.apache.flink.api.java.ExecutionEnvironment.createProgramPlan(ExecutionEnvironment.java:1165)
at org.apache.flink.api.java.ExecutionEnvironment.createProgramPlan(ExecutionEnvironment.java:1145)
at org.apache.flink.api.java.ExecutionEnvironment.executeAsync(ExecutionEnvironment.java:1041)
at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:958)
at org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:942)
at com.itzhimei.WordCountBatch.main(WordCountBatch.java:31)
Code:
public static void main(String[] args) throws Exception {
    // Create the execution environment
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    // Read the data source
    String input = "D:\\wordcount.txt";
    DataSet<String> inputDataSet = env.readTextFile(input);
    DataSet<Tuple2<String, Integer>> result = inputDataSet.flatMap(new MyBatchFlatMapper())
            .groupBy(0)
            .sum(1);
    // print() executes the job immediately and prints the result
    result.print();
    // Throws the RuntimeException above: no new sink was defined after print()
    env.execute();
}
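Analyzing the cause based on the error message:
When the last line, "env.execute()", runs, no new data sink has been defined since the previous execution. In a Flink batch (DataSet) program, the preceding line "result.print()" has already triggered the execution of the job and printed its output. The following "env.execute()" therefore has nothing left to run, so Flink throws the exception above.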
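For contrast, a lazy sink such as writeAsText() only declares where the output goes; nothing runs until env.execute() is called, so in that case the call is required. The sketch below is a minimal illustration under stated assumptions: the sample tuples and the output path (taken from args[0], with a hypothetical default) are made up for the example and are not part of the original program.

```java
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.FileSystem;

public class WordCountToFile {
    public static void main(String[] args) throws Exception {
        // Hypothetical output path; pass a real one as the first argument.
        String outputPath = args.length > 0 ? args[0] : "D:\\wordcount-out";

        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Hypothetical sample data standing in for the flatMap output.
        DataSet<Tuple2<String, Integer>> result = env
                .fromElements(Tuple2.of("flink", 1), Tuple2.of("flink", 1), Tuple2.of("batch", 1))
                .groupBy(0)   // group by the word (field 0)
                .sum(1);      // sum the counts (field 1)

        // writeAsText() only declares a sink; it does not run the job.
        result.writeAsText(outputPath, FileSystem.WriteMode.OVERWRITE);

        // A new sink was defined since the last execution, so execute() has work to do.
        env.execute("WordCountToFile");
    }
}
```

Mixing the two styles is what causes the error: after an eager call such as print(), collect() or count(), env.execute() is only valid if another lazy sink has been declared in between.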
Solution:
Remove the last line, "env.execute();", and the program runs without the exception.
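The corrected program can be sketched as follows. Since the source of MyBatchFlatMapper is not shown, a hypothetical Tokenizer class (splitting each line on whitespace and emitting (word, 1)) stands in for it, and the pipeline is factored into a hypothetical helper wordCounts() so it can be reused; the input path is the one from the original code.

```java
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class WordCountBatch {

    // Hypothetical stand-in for MyBatchFlatMapper: splits a line on
    // whitespace and emits (word, 1) for every non-empty token.
    public static class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
            for (String word : line.split("\\s+")) {
                if (!word.isEmpty()) {
                    out.collect(Tuple2.of(word, 1));
                }
            }
        }
    }

    // Builds the word-count pipeline; nothing runs until a sink is triggered.
    public static DataSet<Tuple2<String, Integer>> wordCounts(DataSet<String> lines) {
        return lines.flatMap(new Tokenizer())
                .groupBy(0)   // group by the word (field 0)
                .sum(1);      // sum the counts (field 1)
    }

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<String> inputDataSet = env.readTextFile("D:\\wordcount.txt");
        // print() is eager: it executes the job and prints the result itself,
        // so no env.execute() call follows it.
        wordCounts(inputDataSet).print();
    }
}
```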