Transform的基础算子主要用来做数据转换和过滤,包括:map、flatMap和filter。
Map将DataStream类型的流经过处理后,依旧返回DataStream流。
FlatMap同样是将DataStream类型的流经过处理后,返回DataStream流;不同的是,FlatMap可以把一个输入元素转换成零个、一个或多个输出元素,即对数据做扁平化处理,类似Java Stream API中的flatMap。
Filter对DataStream进行过滤,输出符合条件的数据。
下面分别演示Map、FlatMap和Filter的代码:
Map:
package org.itzhimei.transform;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.Arrays;
/**
* Map DataStream → DataStream
*
*/
public class Transform_1_Map {

    /**
     * Creates a stream from a fixed collection of integers, multiplies every element by two
     * in two separate map operators (anonymous class and lambda), prints both results and
     * then executes the job.
     *
     * @param args command-line arguments (unused)
     * @throws Exception propagated from building or executing the job
     */
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment environment =
                StreamExecutionEnvironment.getExecutionEnvironment();
        final DataStreamSource<Integer> numbers =
                environment.fromCollection(Arrays.asList(10, 20, 30, 11, 12, 13));

        // map: anonymous-class form
        final MapFunction<Integer, Integer> doubler = new MapFunction<Integer, Integer>() {
            @Override
            public Integer map(Integer value) throws Exception {
                return value * 2;
            }
        };
        final SingleOutputStreamOperator<Integer> doubledByClass = numbers.map(doubler);
        // The "map" argument is passed to print(String) so this sink's lines are labelled.
        doubledByClass.print("map");

        // map: lambda form, printed without a label
        numbers.map((Integer value) -> value * 2).print();

        environment.execute();
    }
}
/* 输出("N>"前缀为并行子任务编号,编号和各行顺序每次运行可能不同)
7> 40
11> 26
8> 60
9> 22
map:12> 40
6> 20
10> 24
map:15> 24
map:14> 22
map:16> 26
map:13> 60
map:11> 20
*/
FlatMap:
package org.itzhimei.transform;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
import java.util.Arrays;
/**
* FlatMap DataStream → DataStream
*
*/
public class Transform_2_FlatMap {

    /** Splits each input string on single spaces and emits every resulting piece. */
    private static final class SpaceTokenizer implements FlatMapFunction<String, String> {
        @Override
        public void flatMap(String line, Collector<String> out) throws Exception {
            for (String token : line.split(" ")) {
                out.collect(token);
            }
        }
    }

    /**
     * Creates a stream from three fixed sentences, tokenizes them with a flatMap operator,
     * prints the tokens with the "flatMap" label and then executes the job.
     *
     * @param args command-line arguments (unused)
     * @throws Exception propagated from building or executing the job
     */
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment environment =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // flatMap input
        final DataStreamSource<String> sentences = environment.fromCollection(
                Arrays.asList("hello flink", "hello java", "hi program"));

        // tokenize: one sentence in, one element out per space-separated piece
        final SingleOutputStreamOperator<String> tokens = sentences.flatMap(new SpaceTokenizer());
        tokens.print("flatMap");

        environment.execute();
    }
}
/*输出("N>"前缀为并行子任务编号,编号和各行顺序每次运行可能不同)
flatMap:10> hello
flatMap:12> hi
flatMap:11> hello
flatMap:12> program
flatMap:10> flink
flatMap:11> java
*/
Filter:
package org.itzhimei.transform;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.Arrays;
/**
 * Filter DataStream → DataStream
 *
 * Keeps only the elements for which the predicate returns true.
 */
public class Transform_3_Filter {

    /**
     * Creates a stream from a fixed collection of integers, keeps the values greater than 15,
     * prints them and then executes the job.
     *
     * @param args command-line arguments (unused)
     * @throws Exception propagated from building or executing the job
     */
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment environment =
                StreamExecutionEnvironment.getExecutionEnvironment();
        final DataStreamSource<Integer> numbers =
                environment.fromCollection(Arrays.asList(10, 20, 30, 11, 12, 13));

        // filter: only values strictly greater than 15 pass through
        final SingleOutputStreamOperator<Integer> largeValues =
                numbers.filter((Integer value) -> value > 15);
        largeValues.print();

        environment.execute();
    }
}
//输出:
/*
2> 20
3> 30
*/