
06. Flink Notes - Transform Operators

1. map

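The map operator converts each input element into exactly one output element, which may be of a different type.
e.g.: the input String "sensor,123456,33.0"
becomes SensorReading("sensor", 123456L, 33.0d)
Usage: dataStream.map(new MapFunction<IN, OUT>() {...})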

import com.tan.flink.bean.SensorReading;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class Transform_Map {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Read lines such as "sensor,123456,33.0" from a socket
        DataStreamSource<String> inputDataStream = env.socketTextStream("192.168.200.102", 9999);
        // map: one input line -> one SensorReading
        SingleOutputStreamOperator<SensorReading> resultDataStream = inputDataStream.map(new MapFunction<String, SensorReading>() {
            @Override
            public SensorReading map(String input) throws Exception {
                String[] fields = input.split(",");
                String id = fields[0];
                Long timestamp = Long.parseLong(fields[1]);
                Double temperature = Double.parseDouble(fields[2]);
                return new SensorReading(id, timestamp, temperature);
            }
        });
        resultDataStream.print();
        env.execute();
    }
}
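The parsing logic inside the map call is a pure function, so one way to check it is to test it in plain Java without starting a Flink job. The sketch below assumes a hypothetical `Reading` class standing in for the author's `SensorReading` bean (whose source is not shown) and a hypothetical helper `MapSketch.parse`; it does not use any Flink API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class MapSketch {
    // Same parsing as the map() body above: "id,timestamp,temperature" -> Reading
    static Reading parse(String line) {
        String[] fields = line.split(",");
        return new Reading(fields[0], Long.parseLong(fields[1]), Double.parseDouble(fields[2]));
    }

    // map is one-to-one: every input line produces exactly one output record
    static List<Reading> mapAll(List<String> lines) {
        List<Reading> out = new ArrayList<>();
        for (String line : lines) {
            out.add(parse(line));
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(mapAll(Arrays.asList("sensor,123456,33.0", "sensor_2,123457,18.5")));
    }
}

// Hypothetical stand-in for com.tan.flink.bean.SensorReading
class Reading {
    final String id;
    final long timestamp;
    final double temperature;

    Reading(String id, long timestamp, double temperature) {
        this.id = id;
        this.timestamp = timestamp;
        this.temperature = temperature;
    }

    @Override
    public String toString() {
        return "Reading(" + id + "," + timestamp + "," + temperature + ")";
    }
}
```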

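2. flatMap

flatMap is the flattening operator: it converts each input element into output elements, but unlike map it can emit zero, one, or several outputs for a single input.
e.g.: the input String "hello,world"
produces the outputs Tuple2("hello",1) and Tuple2("world",1)
Usage: dataStream.flatMap(new FlatMapFunction<IN, OUT>() {...})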

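import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

inputDataStream.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
    @Override
    public void flatMap(String input, Collector<Tuple2<String, Integer>> collector) throws Exception {
        // One input line may yield several (word, 1) pairs
        String[] words = input.split(",");
        for (String word : words) {
            collector.collect(new Tuple2<>(word, 1));
        }
    }
});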
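To make the difference from map concrete, here is a plain-Java simulation (no Flink API) of what the flatMap above emits. `FlatMapSketch` is a hypothetical name, and unlike the Flink code it also skips empty tokens, which shows that a flatMap may emit nothing at all for an input.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

class FlatMapSketch {
    // Each input line may produce zero, one or many (word, 1) pairs
    static List<Map.Entry<String, Integer>> flatMapAll(List<String> lines) {
        List<Map.Entry<String, Integer>> out = new ArrayList<>();
        for (String line : lines) {
            for (String word : line.split(",")) {
                if (!word.isEmpty()) {
                    out.add(new SimpleEntry<>(word, 1)); // plays the role of collector.collect(...)
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // three input lines (one of them empty) -> three output pairs
        System.out.println(flatMapAll(Arrays.asList("hello,world", "", "flink")));
    }
}
```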

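3. filter

The filter operator evaluates a boolean predicate on each input element: elements for which it returns true are kept, and elements for which it returns false are dropped.
e.g.: keep only input Strings that contain "hello", such as "hello,world"
Usage: dataStream.filter(new FilterFunction<T>() { ... return boolean; })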

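import org.apache.flink.api.common.functions.FilterFunction;

inputDataStream.filter(new FilterFunction<String>() {
    @Override
    public boolean filter(String input) throws Exception {
        // true keeps the element, false drops it
        return input.contains("hello");
    }
});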
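The keep/drop rule can be illustrated without a cluster. The following plain-Java sketch (hypothetical `FilterSketch`, built on `java.util.function.Predicate` rather than Flink's `FilterFunction`) applies the same `contains("hello")` test to a small list.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

class FilterSketch {
    // Keep an element when the predicate returns true, drop it when it returns false
    static <T> List<T> filterAll(List<T> input, Predicate<T> keep) {
        List<T> out = new ArrayList<>();
        for (T element : input) {
            if (keep.test(element)) {
                out.add(element);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> kept = filterAll(Arrays.asList("hello,world", "flink,stream", "say hello"),
                s -> s.contains("hello"));
        System.out.println(kept);
    }
}
```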

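4. keyBy

DataStream → KeyedStream: logically partitions a stream into disjoint partitions, each containing all elements with the same key. Internally this is implemented by hashing the key.

e.g.: dataStream.keyBy(param)
param can be the position of a field in the input, counting from 0. For Tuple2(word, 1), param = 0 partitions the stream by word.
param can also be a field name. For a class Sensor with the fields id, timestamp and temperature, param = "id" partitions the stream by id.
A KeySelector can be passed instead, e.g. keyBy(r -> r.getId()); newer Flink versions deprecate the position and field-name variants in favor of it. Examples follow together with the rolling aggregation operators below.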
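The following plain-Java sketch illustrates the idea of hash partitioning: the same key always lands in the same partition, so all elements of one key are processed together. It is a simplification for illustration only; Flink itself first maps keys to key groups (using a murmur hash of `hashCode()`) and then assigns key groups to parallel subtasks, whereas the hypothetical `KeyBySketch.partitionFor` below just takes `hashCode()` modulo the parallelism.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class KeyBySketch {
    // Simplified routing: key -> partition index in [0, parallelism)
    static int partitionFor(Object key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    // Group the incoming keys by the partition they would be routed to
    static Map<Integer, List<String>> partition(List<String> keys, int parallelism) {
        Map<Integer, List<String>> partitions = new LinkedHashMap<>();
        for (String key : keys) {
            partitions.computeIfAbsent(partitionFor(key, parallelism), i -> new ArrayList<>()).add(key);
        }
        return partitions;
    }

    public static void main(String[] args) {
        System.out.println(partition(Arrays.asList("sensor_1", "sensor_2", "sensor_1", "sensor_3"), 2));
    }
}
```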

5. Rolling Aggregation Operators

5.1 sum

Example: word count over space-separated words

SingleOutputStreamOperator<Tuple2<String, Integer>> wordDataStream = inputDataStream.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
    @Override
    public void flatMap(String input, Collector<Tuple2<String, Integer>> collector) throws Exception {
        // Split each line on spaces and emit (word, 1)
        String[] words = input.split(" ");
        for (String word : words) {
            collector.collect(new Tuple2<>(word, 1));
        }
    }
});
// Key by the word (field 0) and keep a running sum of the count (field 1)
SingleOutputStreamOperator<Tuple2<String, Integer>> resultDataStream = wordDataStream.keyBy(0)
        .sum(1);
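A rolling aggregation emits an updated result for every incoming element instead of one final value. The plain-Java sketch below (hypothetical `RollingSumSketch`, no Flink API) mimics what `keyBy(0).sum(1)` prints for the (word, 1) pairs: it keeps one running total per key in a map.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class RollingSumSketch {
    // For each word: add 1 to that word's running total and emit (word, total)
    static List<String> rollingSum(List<String> words) {
        Map<String, Integer> totals = new HashMap<>();
        List<String> emitted = new ArrayList<>();
        for (String word : words) {
            int total = totals.merge(word, 1, Integer::sum);
            emitted.add("(" + word + "," + total + ")");
        }
        return emitted;
    }

    public static void main(String[] args) {
        // prints [(hello,1), (world,1), (hello,2)]
        System.out.println(rollingSum(Arrays.asList("hello world hello".split(" "))));
    }
}
```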

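5.2 max

Keeps, for each key, the maximum value of the given field seen so far, and emits the updated result for every incoming element.
Example: each input line has the form "word number" separated by a space (the code below splits on " "). Entering hello 9 and then hello 7 outputs (hello,9) both times, because 9 is still the maximum for the key hello. Entering world 7 outputs (world,7), since world is a different key with its own maximum.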

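import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;

inputDataStream.map(new MapFunction<String, Tuple2<String, Long>>() {
    @Override
    public Tuple2<String, Long> map(String input) throws Exception {
        // "hello 9" -> ("hello", 9L)
        String[] fields = input.split(" ");
        return new Tuple2<>(fields[0], Long.parseLong(fields[1]));
    }
}).keyBy(0)
        .max(1);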
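A plain-Java simulation of `keyBy(0).max(1)` (hypothetical `RollingMaxSketch`, no Flink API) makes the per-key behavior visible: each key keeps its own maximum, so a line for a different key never changes the result for hello.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class RollingMaxSketch {
    // Each line is "word number"; emit (word, maximum seen so far for that word)
    static List<String> rollingMax(List<String> lines) {
        Map<String, Long> maxima = new HashMap<>();
        List<String> emitted = new ArrayList<>();
        for (String line : lines) {
            String[] fields = line.split(" ");
            long max = maxima.merge(fields[0], Long.parseLong(fields[1]), Long::max);
            emitted.add("(" + fields[0] + "," + max + ")");
        }
        return emitted;
    }

    public static void main(String[] args) {
        // prints [(hello,9), (world,7), (hello,9), (hello,12)]
        System.out.println(rollingMax(Arrays.asList("hello 9", "world 7", "hello 7", "hello 12")));
    }
}
```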

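5.3 min

Keeps, for each key, the minimum value of the given field seen so far.
Example: entering hello 9 and then hello 7 outputs (hello,9) and then (hello,7); a line for another key, such as world 7, leaves the minimum of hello unchanged.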

inputDataStream.map(new MapFunction<String, Tuple2<String, Long>>() {
    @Override
    public Tuple2<String, Long> map(String input) throws Exception {
        // "hello 9" -> ("hello", 9L)
        String[] fields = input.split(" ");
        return new Tuple2<>(fields[0], Long.parseLong(fields[1]));
    }
}).keyBy(0)
        .min(1);

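5.4 minBy

On a KeyedStream, minBy returns, for each key, the whole element that has the smallest value in the given field so far. By contrast, min only guarantees the aggregated field itself.
Example: for each sensor id, output the reading with the lowest temperature seen so far.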

DataStreamSource<SensorReading> inputDataStream = env.addSource(new SourceFromCustom.CustomSource());
// Per sensor id, keep the complete reading with the lowest temperature
SingleOutputStreamOperator<SensorReading> resultDataStream = inputDataStream.keyBy("id")
        .minBy("temperature");
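The difference between min and minBy shows up in the fields that are not aggregated. The plain-Java sketch below (hypothetical `MinVsMinBySketch` with a hypothetical three-field reading, no Flink API) assumes the behavior commonly observed in Flink: min keeps the other fields of the first element received for the key and only replaces the aggregated field, while minBy emits the complete element that holds the minimum (the earlier element wins on ties).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class MinVsMinBySketch {
    // Hypothetical reading with the three fields used in the notes
    static final class R {
        final String id;
        final long ts;
        final double temp;

        R(String id, long ts, double temp) {
            this.id = id;
            this.ts = ts;
            this.temp = temp;
        }

        @Override
        public String toString() {
            return "(" + id + "," + ts + "," + temp + ")";
        }
    }

    // min("temp"): other fields stay those of the first element; only temp is updated
    static List<String> rollingMin(List<R> input) {
        Map<String, R> state = new HashMap<>();
        List<String> emitted = new ArrayList<>();
        for (R r : input) {
            R acc = state.get(r.id);
            R next = (acc == null) ? r : new R(acc.id, acc.ts, Math.min(acc.temp, r.temp));
            state.put(r.id, next);
            emitted.add(next.toString());
        }
        return emitted;
    }

    // minBy("temp"): emit the whole element holding the minimum; the earlier one wins on ties
    static List<String> rollingMinBy(List<R> input) {
        Map<String, R> state = new HashMap<>();
        List<String> emitted = new ArrayList<>();
        for (R r : input) {
            R acc = state.get(r.id);
            R next = (acc == null || r.temp < acc.temp) ? r : acc;
            state.put(r.id, next);
            emitted.add(next.toString());
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<R> input = Arrays.asList(new R("s1", 1, 35.0), new R("s1", 2, 30.0), new R("s1", 3, 32.0));
        System.out.println("min:   " + rollingMin(input));   // timestamps stay 1
        System.out.println("minBy: " + rollingMinBy(input)); // timestamp follows the minimum
    }
}
```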

5.5 maxBy

Works the same way as minBy, but returns the element with the largest value in the given field.

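5.6 reduce

KeyedStream → DataStream: a rolling aggregation on a keyed stream that combines the current element with the previous aggregation result to produce a new value. The returned stream contains the result of every aggregation step, not only the final result.
Example: for each sensor id, emit the timestamp of the latest reading together with the highest temperature seen so far.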

import com.tan.flink.bean.SensorReading;
import com.tan.flink.source.SourceFromCustom;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class Transform_keyed_Reduce {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<SensorReading> inputDataStream = env.addSource(new SourceFromCustom.CustomSource());
        SingleOutputStreamOperator<SensorReading> resultDataStream = inputDataStream.keyBy("id")
                .reduce(new CustomReduceFunction());
        resultDataStream.print();
        env.execute();
    }

    public static class CustomReduceFunction implements ReduceFunction<SensorReading> {
        // previous: the aggregated result so far for this key; current: the newly arrived reading
        @Override
        public SensorReading reduce(SensorReading previous, SensorReading current) throws Exception {
            String id = previous.getId();
            // Keep the timestamp of the latest reading...
            Long timestamp = current.getTimestamp();
            // ...and the highest temperature seen so far
            double temperature = Math.max(previous.getTemperature(), current.getTemperature());
            return new SensorReading(id, timestamp, temperature);
        }
    }
}
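The following plain-Java sketch (hypothetical `RollingReduceSketch`, no Flink API) mimics a keyed reduce: the first element of each key is emitted unchanged, and every later element is combined with the stored result, which is emitted again. The reducer passed in mirrors `CustomReduceFunction` above; the nested `Reading` class is a hypothetical stand-in for `SensorReading`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;
import java.util.function.Function;

class RollingReduceSketch {
    static final class Reading {
        final String id;
        final long ts;
        final double temp;

        Reading(String id, long ts, double temp) {
            this.id = id;
            this.ts = ts;
            this.temp = temp;
        }

        @Override
        public String toString() {
            return "(" + id + "," + ts + "," + temp + ")";
        }
    }

    // Keyed rolling reduce: emit the new aggregate after every element
    static <K, T> List<T> rollingReduce(List<T> input, Function<T, K> keyOf, BinaryOperator<T> reducer) {
        Map<K, T> state = new HashMap<>();
        List<T> emitted = new ArrayList<>();
        for (T value : input) {
            K key = keyOf.apply(value);
            T previous = state.get(key);
            T next = (previous == null) ? value : reducer.apply(previous, value);
            state.put(key, next);
            emitted.add(next);
        }
        return emitted;
    }

    // Same rule as CustomReduceFunction: keep the latest timestamp and the highest temperature
    static List<Reading> demo() {
        List<Reading> input = Arrays.asList(
                new Reading("s1", 1, 30.0), new Reading("s2", 1, 20.0),
                new Reading("s1", 2, 35.0), new Reading("s1", 3, 32.0));
        return rollingReduce(input, r -> r.id,
                (prev, cur) -> new Reading(prev.id, cur.ts, Math.max(prev.temp, cur.temp)));
    }

    public static void main(String[] args) {
        // prints [(s1,1,30.0), (s2,1,20.0), (s1,2,35.0), (s1,3,35.0)]
        System.out.println(demo());
    }
}
```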

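6. split and select

6.1 split

DataStream → SplitStream: splits one DataStream into two or more DataStreams according to some criteria.

6.2 select

SplitStream → DataStream: retrieves one or more DataStreams from a SplitStream.

6.3 Example

Split the readings by temperature with 60 degrees as the threshold: readings at or above 60 go to the "high" stream, all others go to the "low" stream.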

import com.tan.flink.bean.SensorReading;
import com.tan.flink.source.SourceFromCustom;
import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SplitStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Collections;

public class Transform_Split_Select {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<SensorReading> inputDataStream = env.addSource(new SourceFromCustom.CustomSource());
        // Tag every reading as "high" (>= 60) or "low"
        SplitStream<SensorReading> splitStream = inputDataStream.split(new OutputSelector<SensorReading>() {
            @Override
            public Iterable<String> select(SensorReading sensorReading) {
                Double temperature = sensorReading.getTemperature();
                if (temperature >= 60) {
                    return Collections.singletonList("high");
                } else {
                    return Collections.singletonList("low");
                }
            }
        });

        // Select one or more tagged streams by name
        DataStream<SensorReading> high = splitStream.select("high");
        DataStream<SensorReading> low = splitStream.select("low");
        DataStream<SensorReading> all = splitStream.select("high", "low");

        high.print("high").setParallelism(1);
        low.print("low").setParallelism(1);
        all.print("all").setParallelism(1);

        env.execute();
    }
}
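Conceptually, split attaches one or more names to each element and select keeps the elements carrying any of the requested names. The plain-Java sketch below (hypothetical `SplitSelectSketch`, no Flink API) reproduces the 60-degree rule on bare temperature values. Note that split/select was deprecated and removed in later Flink releases; newer versions express the same routing with side outputs (`OutputTag` together with a `ProcessFunction`).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class SplitSelectSketch {
    // Like OutputSelector.select: the names of the outputs this temperature belongs to
    static List<String> namesFor(double temperature) {
        return Collections.singletonList(temperature >= 60 ? "high" : "low");
    }

    // Like SplitStream.select(names...): keep elements tagged with any requested name
    static List<Double> select(List<Double> temperatures, String... names) {
        Set<String> wanted = new HashSet<>(Arrays.asList(names));
        List<Double> out = new ArrayList<>();
        for (Double t : temperatures) {
            for (String name : namesFor(t)) {
                if (wanted.contains(name)) {
                    out.add(t);
                    break; // add each element at most once
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Double> temps = Arrays.asList(72.5, 41.0, 60.0);
        System.out.println("high: " + select(temps, "high"));
        System.out.println("low:  " + select(temps, "low"));
        System.out.println("all:  " + select(temps, "high", "low"));
    }
}
```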

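7. connect and CoMap

7.1 connect

DataStream, DataStream → ConnectedStreams: connects two data streams while keeping their types. After connect, the two streams are only placed inside one ConnectedStreams; internally each keeps its own data and form unchanged, and the two remain independent of each other. The two input types may be the same or different.

7.2 CoMap, CoFlatMap

ConnectedStreams → DataStream: works on ConnectedStreams like map and flatMap, applying a separate map (or flatMap) function to each of the two connected streams. Both functions must return the same declared output type; the example below declares Object so that the two branches can return differently shaped tuples.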

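7.3 Example

Split the sensor readings into a high-temperature stream and a low-temperature stream, connect the two, apply a CoMap (a CoFlatMap would be used the same way), and print the result.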

import com.tan.flink.bean.SensorReading;
import com.tan.flink.source.SourceFromCustom;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.datastream.*;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;

import java.util.Collections;

public class Transform_Connect_CoMap {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<SensorReading> inputDataStream = env.addSource(new SourceFromCustom.CustomSource());
        SplitStream<SensorReading> splitStream = inputDataStream.split(new OutputSelector<SensorReading>() {
            @Override
            public Iterable<String> select(SensorReading sensorReading) {
                Double temperature = sensorReading.getTemperature();
                if (temperature >= 60) {
                    return Collections.singletonList("high");
                } else {
                    return Collections.singletonList("low");
                }
            }
        });

        // High-temperature stream, converted to (id, temperature) so the two inputs have different types
        DataStream<SensorReading> high = splitStream.select("high");
        SingleOutputStreamOperator<Tuple2<String, Double>> highDataStream = high.map(new MapFunction<SensorReading, Tuple2<String, Double>>() {
            @Override
            public Tuple2<String, Double> map(SensorReading sensorReading) throws Exception {
                return new Tuple2<>(sensorReading.getId(), sensorReading.getTemperature());
            }
        });
        DataStream<SensorReading> lowDataStream = splitStream.select("low");

        // connect: Tuple2 stream + SensorReading stream -> ConnectedStreams
        ConnectedStreams<Tuple2<String, Double>, SensorReading> connectDataStream = highDataStream.connect(lowDataStream);
        // CoMap: map1 handles the first input, map2 the second; both return the declared OUT type (Object here)
        SingleOutputStreamOperator<Object> resultDataStream = connectDataStream.map(new CoMapFunction<Tuple2<String, Double>, SensorReading, Object>() {
            @Override
            public Object map1(Tuple2<String, Double> input) throws Exception {
                // High-temperature data: (id, temperature, "warning")
                return new Tuple3<>(input.f0, input.f1, "warning");
            }

            @Override
            public Object map2(SensorReading input) throws Exception {
                // Normal-temperature data: (id, timestamp, temperature)
                return new Tuple3<>(input.getId(), input.getTimestamp(), input.getTemperature());
            }
        });
        resultDataStream.print();
        env.execute();
    }
}
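The contract of a CoMap can be mirrored in plain Java: one function per input, both returning the same output type. In the sketch below, the `CoMap` interface and `coMap` helper are hypothetical stand-ins for Flink's `CoMapFunction` and `ConnectedStreams.map`; for simplicity it processes the first input and then the second, whereas Flink calls map1 and map2 in whatever order elements arrive.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class CoMapSketch {
    // Plain-Java analogue of CoMapFunction<IN1, IN2, OUT>
    interface CoMap<IN1, IN2, OUT> {
        OUT map1(IN1 value);

        OUT map2(IN2 value);
    }

    // Each element goes through map1 or map2 depending on which input it came from
    static <IN1, IN2, OUT> List<OUT> coMap(List<IN1> first, List<IN2> second, CoMap<IN1, IN2, OUT> f) {
        List<OUT> out = new ArrayList<>();
        for (IN1 value : first) {
            out.add(f.map1(value));
        }
        for (IN2 value : second) {
            out.add(f.map2(value));
        }
        return out;
    }

    // First input: ids of hot sensors (String); second input: normal temperatures (Double)
    static List<String> demo() {
        return coMap(Arrays.asList("sensor_1"), Arrays.asList(36.5), new CoMap<String, Double, String>() {
            @Override
            public String map1(String id) {
                return id + " warning";
            }

            @Override
            public String map2(Double temperature) {
                return "normal " + temperature;
            }
        });
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [sensor_1 warning, normal 36.5]
    }
}
```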

8. union

DataStream → DataStream: unions two or more DataStreams into a new DataStream that contains all elements of every input. Note: if you union a DataStream with itself, every element appears twice in the resulting DataStream.
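A plain-Java sketch (hypothetical `UnionSketch`, no Flink API) of the union semantics: all inputs share one element type, every element of every input ends up in the result, and a stream unioned with itself yields each element twice. The sketch simply concatenates lists; in Flink the relative order of elements coming from different inputs is not defined.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class UnionSketch {
    // All inputs must share the element type T, just like DataStream.union
    @SafeVarargs
    static <T> List<T> union(List<T> first, List<T>... others) {
        List<T> out = new ArrayList<>(first);
        for (List<T> other : others) {
            out.addAll(other);
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> high = Arrays.asList("sensor_1");
        List<String> low = Arrays.asList("sensor_2", "sensor_3");
        System.out.println(union(high, low));  // every element of both inputs
        System.out.println(union(high, high)); // self-union: each element twice
    }
}
```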
 

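8.1 Differences between connect and union

Before a union, all streams must have the same type. connect accepts two streams of different types, and the subsequent CoMap maps both into its single output type, which may or may not match either input type.
connect works on exactly two streams; union can combine two or more.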

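8.2 Example

Split the sensor readings into a high-temperature stream and a low-temperature stream, then union them back into one stream and map each reading to a tuple.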

import com.tan.flink.bean.SensorReading;
import com.tan.flink.source.SourceFromCustom;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.SplitStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.Collections;

public class Transform_Union {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<SensorReading> inputDataStream = env.addSource(new SourceFromCustom.CustomSource());
        SplitStream<SensorReading> splitStream = inputDataStream.split(new OutputSelector<SensorReading>() {
            @Override
            public Iterable<String> select(SensorReading sensorReading) {
                Double temperature = sensorReading.getTemperature();
                if (temperature >= 60) {
                    return Collections.singletonList("high");
                } else {
                    return Collections.singletonList("low");
                }
            }
        });

        DataStream<SensorReading> high = splitStream.select("high");
        DataStream<SensorReading> low = splitStream.select("low");

        // union: both streams have the same type (SensorReading)
        DataStream<SensorReading> unionDataStream = high.union(low);
        SingleOutputStreamOperator<Tuple3<String, Long, Object>> resultDataStream = unionDataStream.map(new MapFunction<SensorReading, Tuple3<String, Long, Object>>() {
            @Override
            public Tuple3<String, Long, Object> map(SensorReading input) throws Exception {
                // High readings are marked "warning"; the others keep their temperature
                if (input.getTemperature() >= 60) {
                    return new Tuple3<String, Long, Object>(input.getId(), input.getTimestamp(), "warning");
                } else {
                    return new Tuple3<String, Long, Object>(input.getId(), input.getTimestamp(), input.getTemperature());
                }
            }
        });

        resultDataStream.print();
        env.execute();
    }
}