Flink从入门到实战四[DataStream API]-13-Transform基础算子Map、FlatMap和Filter

Transform的基础算子主要用来做数据转换和过滤,包括:map、flatMap和filter。

Map对DataStream中的每个元素做一对一的转换(一个输入元素恰好产生一个输出元素),处理后依旧返回DataStream流。
FlatMap同样是将DataStream类型的流经过处理后返回DataStream流;不同的是,FlatMap还可以对数据做扁平化处理:一个输入元素可以输出0个、1个或多个元素,作用类似Java Stream API中的flatMap。
Filter对DataStream进行过滤,输出符合条件的数据。

下面分别演示Map、FlatMap和Filter的代码:

Map:

package org.itzhimei.transform;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Arrays;

/**
 * Demonstrates the {@code map} transformation (DataStream → DataStream).
 *
 * <p>Each input element yields exactly one output element. The same doubling transformation is
 * written twice: once as an anonymous {@link MapFunction} class and once as a lambda.
 */
public class Transform_1_Map {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<Integer> numbers = env.fromCollection(Arrays.asList(10, 20, 30, 11, 12, 13));

        // Anonymous-class form: implement MapFunction<IN, OUT>#map, returning one value per input.
        SingleOutputStreamOperator<Integer> doubledByClass = numbers.map(new MapFunction<Integer, Integer>() {
            @Override
            public Integer map(Integer value) throws Exception {
                return value * 2;
            }
        });
        // Printed lines from this sink carry the "map" prefix.
        doubledByClass.print("map");

        // Lambda form: the same doubling logic; this sink is printed without a prefix.
        SingleOutputStreamOperator<Integer> doubledByLambda = numbers.map((Integer value) -> value * 2);
        doubledByLambda.print();

        // Nothing runs until the job is submitted.
        env.execute();
    }
}

/* 输出(行首的 "n>" 是打印该元素的并行子任务编号,"map:" 是 print("map") 指定的前缀;并行执行时输出顺序不固定)
7> 40
11> 26
8> 60
9> 22
map:12> 40
6> 20
10> 24
map:15> 24
map:14> 22
map:16> 26
map:13> 60
map:11> 20
 */

FlatMap:

package org.itzhimei.transform;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

import java.util.Arrays;

/**
 * Demonstrates the {@code flatMap} transformation (DataStream → DataStream).
 *
 * <p>Unlike {@code map}, each input element may yield zero, one or many output elements. Here
 * every input line is tokenized and each word is emitted as its own element.
 */
public class Transform_2_FlatMap {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<String> dataStreamSource = env.fromCollection(Arrays.asList("hello flink", "hello java", "hi program"));
        // Tokenize: one line in, one output element per word.
        SingleOutputStreamOperator<String> flatMap = dataStreamSource.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public void flatMap(String line, Collector<String> collector) throws Exception {
                // Split on runs of whitespace and skip empty tokens: split(" ") would emit ""
                // for repeated or leading spaces (e.g. "a  b" -> ["a", "", "b"]).
                for (String word : line.split("\\s+")) {
                    if (!word.isEmpty()) {
                        collector.collect(word);
                    }
                }
            }
        });
        // Printed lines from this sink carry the "flatMap" prefix.
        flatMap.print("flatMap");

        env.execute();
    }
}


/*输出
flatMap:10> hello
flatMap:12> hi
flatMap:11> hello
flatMap:12> program
flatMap:10> flink
flatMap:11> java
*/
	
	

Filter:

package org.itzhimei.transform;

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Arrays;

/**
 * Demonstrates the {@code filter} transformation (DataStream → DataStream).
 *
 * <p>Elements for which the predicate returns {@code true} are forwarded unchanged; all others
 * are dropped.
 */
public class Transform_3_Filter {

    /** Only elements strictly greater than this value pass the filter. */
    private static final int THRESHOLD = 15;

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<Integer> numbers = env.fromCollection(Arrays.asList(10, 20, 30, 11, 12, 13));

        // Keep values above THRESHOLD; the element type is unchanged by filtering.
        SingleOutputStreamOperator<Integer> aboveThreshold = numbers.filter(new FilterFunction<Integer>() {
            @Override
            public boolean filter(Integer value) throws Exception {
                return value > THRESHOLD;
            }
        });

        aboveThreshold.print();

        env.execute();
    }
}

//输出:
/*
2> 20
3> 30
*/