The exception itself already makes the problem fairly clear:
Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: Specifying keys via field positions is only valid for tuple data types. Type: GenericType<scala.Tuple2>
    at org.apache.flink.api.common.operators.Keys$ExpressionKeys.<init>(Keys.java:239)
    at org.apache.flink.api.common.operators.Keys$ExpressionKeys.<init>(Keys.java:232)
    at org.apache.flink.streaming.api.datastream.DataStream.keyBy(DataStream.java:323)
    at org.itzhimei.transform.Transform_4_KeyBy.main(Transform_4_KeyBy.java:34)
Code:
SingleOutputStreamOperator<Tuple2<String, Integer>> keyBy = dataStreamSource.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
    @Override
    public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
        // Split each line on spaces and emit a (word, 1) pair per word
        String[] s1 = s.split(" ");
        for (String s2 : s1) {
            collector.collect(new Tuple2<>(s2, 1));
        }
    }
}).keyBy(0)   // positional key: only valid when the element type is a Flink tuple
  .sum(1);
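keyBy usage: keyBy(0), or keyBy with a key selector such as keyBy(item -> item._1). Note that _1 is the accessor on Scala's Tuple2; on Flink's Tuple2 the first field is f0, so the selector becomes keyBy(item -> item.f0).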
Cause:
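The Tuple2 import is wrong: it comes from the wrong package. A Flink project has both Flink's own Java Tuple2 (org.apache.flink.api.java.tuple.Tuple2) and Scala's Tuple2 (scala.Tuple2) on the classpath. The code above imported Scala's Tuple2, which Flink's Java API does not recognize as a tuple type and treats as a generic type instead. That is why the exception reports Type: GenericType<scala.Tuple2> and rejects the field-position key. The fix is to import Flink's Tuple2.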
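
For reference, here is a minimal sketch of the corrected program. It assumes a Flink 1.x DataStream job with the Flink streaming dependencies on the classpath. The class name KeyByTupleSketch, the sample input passed to fromElements, and the job name are made up for illustration and are not from the original project. The only change that matters is the Tuple2 import. The key selector item -> item.f0 is used here instead of keyBy(0), which should work as well once the import is fixed.

```java
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2; // Flink's Tuple2, NOT scala.Tuple2
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

// Hypothetical class name, for illustration only
public class KeyByTupleSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Sample input (an assumption); the original source of dataStreamSource is not shown
        DataStreamSource<String> dataStreamSource = env.fromElements("hello flink", "hello world");

        SingleOutputStreamOperator<Tuple2<String, Integer>> counts = dataStreamSource
            .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                @Override
                public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) {
                    // Emit a (word, 1) pair for every space-separated word
                    for (String word : s.split(" ")) {
                        collector.collect(new Tuple2<>(word, 1));
                    }
                }
            })
            .keyBy(item -> item.f0) // key by the word; f0 is the first field of Flink's Tuple2
            .sum(1);                // running sum of the count field at position 1

        counts.print();
        env.execute("keyBy sketch");
    }
}
```

If the IDE auto-import offers several Tuple2 candidates, picking the one under org.apache.flink.api.java.tuple is what avoids the InvalidProgramException.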