
Flink Streaming Operator Study - 01

Author: 李小李的路 | Published 2019-07-14 18:06

Overview

    • A systematic walkthrough of Flink's streaming operators (Java/Scala APIs), to learn something new and broaden the personal tech stack;
    • Official documentation for the streaming operator API: DataStream Transformations
    • Reading the official docs is recommended; driven by Alibaba, Flink is evolving fast, and a Chinese version of the docs is now available, so read and summarize as you go;
    • The operator definitions quoted below are from the Flink 1.9.0 documentation

Data preparation

    • First prepare some test data; Flink consumes it from Kafka 0.10 as a stream, not as a batch;

    • The Scala API exercises are on GitHub;

    • GitHub: flink_way (https://github.com/yahuili1128/flink_way)

    • The Java code consumes a single Kafka topic, while the Scala code consumes two topics;

    • In the Java code, first run the method under the mockup package to generate test data, then run the operator demos;

Streaming operators

    Map

    • Applies a function to every element of the DataStream, producing exactly one output element per input;
    • Official definition:
    DataStream → DataStream 
    Takes one element and produces one element. A map function that doubles the values of the input stream:
    
    DataStream<Integer> dataStream = //...
    dataStream.map(new MapFunction<Integer, Integer>() {
        @Override
        public Integer map(Integer value) throws Exception {
            return 2 * value;
        }
    });
    
    • demo
    /**
         * Returns the whole POJO, e.g.
         *  MockUpModel(name=yahui, gender=female, timestamp=1563090399305, age=34)
         * @param kafkaData
         * @return
         */
        private static SingleOutputStreamOperator<MockUpModel> getAddAgePojo(SingleOutputStreamOperator<MockUpModel> kafkaData) {
            //      returns MockUpModel POJOs with the age field increased by 5
            return kafkaData.map(new MapFunction<MockUpModel, MockUpModel>() {
                @Override
                public MockUpModel map(MockUpModel value) throws Exception {
                    MockUpModel mockUpModel = new MockUpModel();
                    mockUpModel.name = value.name;
                    mockUpModel.gender = value.gender;
                    mockUpModel.age = value.age + 5;
                    mockUpModel.timestamp = value.timestamp;
                    return mockUpModel;
    
                }
            });
        }
    

    FlatMap

    • Takes each element of the DataStream and produces zero, one, or more output elements, emitted through a Collector;
    • Official definition:
    DataStream → DataStream 
    Takes one element and produces zero, one, or more elements. A flatmap function that splits sentences to words:
    
    dataStream.flatMap(new FlatMapFunction<String, String>() {
        @Override
        public void flatMap(String value, Collector<String> out)
            throws Exception {
            for(String word: value.split(" ")){
                out.collect(word);
            }
        }
    });
    
    • demo
    public static void main(String[] args) throws Exception {
            final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            SingleOutputStreamOperator<MockUpModel> kafkaData = getKafka010Data(env);
    
            //      This operation does not really showcase flatMap; the logic below behaves like a filter. Example output: MockUpModel(name=liyahui-0, gender=male, timestamp=1561516105296, age=0)
            kafkaData.flatMap(new FlatMapFunction<MockUpModel, MockUpModel>() {
                @Override
                public void flatMap(MockUpModel value, Collector<MockUpModel> out) throws Exception {
                    if (value.age % 2 == 0) {
                        out.collect(value);
                    }
                }
            }).print().setParallelism(1);
            //   flatMap flattens a nested collection into a flat one; the official sentence-splitting example quoted above is the clearest illustration
    
    
            env.execute("flink kafka010 demo");
        }
    

Filter

    • Evaluates a boolean predicate for every element of the DataStream and keeps only the elements for which it returns true;
    DataStream → DataStream 
    Evaluates a boolean function for each element and retains those for which the function returns true. A filter that filters out zero values:
    
    dataStream.filter(new FilterFunction<Integer>() {
        @Override
        public boolean filter(Integer value) throws Exception {
            return value != 0;
        }
    });
    
    • demo
    // method 1: anonymous FilterFunction
    /**
         * Keeps only the people whose age is even
         *
         * @param kafkaData
         * @return
         */
        private static SingleOutputStreamOperator<MockUpModel> getFilterDS2(SingleOutputStreamOperator<MockUpModel> kafkaData) {
            return kafkaData.filter(new FilterFunction<MockUpModel>() {
                @Override
                public boolean filter(MockUpModel value) throws Exception {
                    return value.age % 2 == 0;
                }
            });
        }
    // method 2: lambda
    /**
         * Lambda version
         *
         * @param kafkaData
         * @return
         */
        private static SingleOutputStreamOperator<MockUpModel> getFilterDS(SingleOutputStreamOperator<MockUpModel> kafkaData) {
            return kafkaData.filter(line -> line.age % 2 == 0);
        }
    

    KeyBy

    • Partitions the DataStream into disjoint partitions by the given key;
    • Note: a POJO can be used as a key only if it overrides hashCode(); relying on Object.hashCode() is not allowed (and arrays cannot be keys at all);
    DataStream → KeyedStream 
    
    Logically partitions a stream into disjoint partitions. All records with the same key are assigned to the same partition. Internally, *keyBy()* is implemented with hash partitioning. There are different ways to [specify keys](https://ci.apache.org/projects/flink/flink-docs-master/dev/api_concepts.html#specifying-keys).
    
    This transformation returns a *KeyedStream*, which is, among other things, required to use [keyed state](https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/state/state.html#keyed-state).
    
    
    dataStream.keyBy("someKey") // Key by field "someKey"
    dataStream.keyBy(0) // Key by the first element of a Tuple
    
    
    Attention A type **cannot be a key** if:
    
    1.  it is a POJO type but does not override the *hashCode()* method and relies on the *Object.hashCode()* implementation.
    2.  it is an array of any type.
    
    • demo
    /**
         * keyBy with age as the grouping key
         *
         * @param kafkaData
         * @return
         */
        private static KeyedStream<MockUpModel, Integer> getKeyedDS(SingleOutputStreamOperator<MockUpModel> kafkaData) {
            //  lambda version:
            //kafkaData.keyBy(line -> line.age).print().setParallelism(1);
            return kafkaData.keyBy(new KeySelector<MockUpModel, Integer>() {
                @Override
                public Integer getKey(MockUpModel value) throws Exception {
                    return value.age;
                }
            });
        }
    

    Reduce

    • A rolling reduce: combines the current element with the last reduced value and emits the result;
    KeyedStream → DataStream    
    A "rolling" reduce on a keyed data stream. Combines the current element with the last reduced value and emits the new value. 
    
    A reduce function that creates a stream of partial sums:
    
    keyedStream.reduce(new ReduceFunction<Integer>() {
        @Override
        public Integer reduce(Integer value1, Integer value2)
        throws Exception {
            return value1 + value2;
        }
    });
    
    • demo
    public static void main(String[] args) throws Exception {
            final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            SingleOutputStreamOperator<MockUpModel> kafka010Data = getKafka010Data(env);
            //      lambda keyBy + anonymous ReduceFunction
            kafka010Data.keyBy(line -> line.gender).reduce(new ReduceFunction<MockUpModel>() {
                @Override
                public MockUpModel reduce(MockUpModel value1, MockUpModel value2) throws Exception {
                    MockUpModel mockUpModel = new MockUpModel();
                    mockUpModel.name = value1.name + "--" + value2.name;
                    mockUpModel.gender = value1.gender;
                    mockUpModel.age = (value1.age + value2.age) / 2;
                    return mockUpModel;
    
                }
            }).print().setParallelism(1);
    
            env.execute("flink kafka010 demo");
        }
    

    Fold

    • Turns a KeyedStream back into a DataStream via a rolling fold from an initial value; deprecated as of Flink 1.9;
    • Official definition:
    KeyedStream → DataStream    
    A "rolling" fold on a keyed data stream with an initial value. Combines the current element with the last folded value and emits the new value. 
    
    A fold function that, when applied on the sequence (1,2,3,4,5), emits the sequence "start-1", "start-1-2", "start-1-2-3", ...
    
    DataStream<String> result =
      keyedStream.fold("start", new FoldFunction<Integer, String>() {
        @Override
        public String fold(String current, Integer value) {
            return current + "-" + value;
        }
      });
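    • This section has no demo, so here is a plain-Java sketch (no Flink dependency; the class name FoldSketch and its helper method are made up for illustration) of the rolling semantics: each element is combined with the last folded value and every intermediate accumulator is emitted, reproducing the sequence from the official example.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java sketch of fold's rolling semantics (no Flink dependency).
public class FoldSketch {

    static List<String> rollingFold(String initial, int[] elements) {
        List<String> emitted = new ArrayList<>();
        String acc = initial;
        for (int value : elements) {
            acc = acc + "-" + value;   // same logic as the FoldFunction above
            emitted.add(acc);          // fold emits every intermediate value
        }
        return emitted;
    }

    public static void main(String[] args) {
        // reproduces "start-1", "start-1-2", "start-1-2-3", ...
        System.out.println(rollingFold("start", new int[]{1, 2, 3, 4, 5}));
    }
}
```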
    

    Aggregations

    • Aggregation functions compute rolling max/min/sum values on a KeyedStream. The difference between max and maxBy is that max returns only the maximum value of the field, whereas maxBy returns the whole element that has the maximum value; min and minBy behave analogously.
    KeyedStream → DataStream    
    Rolling aggregations on a keyed data stream. The difference between min and minBy is that min returns the minimum value, whereas minBy returns the element that has the minimum value in this field (same for max and maxBy).
    
    keyedStream.sum(0);
    keyedStream.sum("key");
    keyedStream.min(0);
    keyedStream.min("key");
    keyedStream.max(0);
    keyedStream.max("key");
    keyedStream.minBy(0);
    keyedStream.minBy("key");
    keyedStream.maxBy(0);
    keyedStream.maxBy("key");
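    • The min/minBy distinction can be seen without a Flink cluster; the plain-Java sketch below (class AggSketch and the Person stand-in type are made up for illustration) contrasts min, which yields only the minimum value of the field, with minBy, which yields the whole element holding that value.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Plain-Java sketch of min vs. minBy (no Flink dependency).
public class AggSketch {
    static class Person {
        final String name;
        final int age;
        Person(String name, int age) { this.name = name; this.age = age; }
    }

    // min("age"): only the minimum of the aggregated field is meaningful
    static int minAge(List<Person> stream) {
        return stream.stream().mapToInt(p -> p.age).min().getAsInt();
    }

    // minBy("age"): the entire element holding the minimum value is returned
    static Person minByAge(List<Person> stream) {
        return stream.stream().min(Comparator.comparingInt(p -> p.age)).get();
    }

    public static void main(String[] args) {
        List<Person> stream = Arrays.asList(
                new Person("a", 30), new Person("b", 20), new Person("c", 40));
        System.out.println(minAge(stream));         // the value only
        System.out.println(minByAge(stream).name);  // the element carrying it
    }
}
```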
    

    union

    • Merges two or more DataStreams of the same type into one stream containing all their elements; unioning a stream with itself yields every element twice;
    DataStream* → DataStream    
    Union of two or more data streams creating a new stream containing all the elements from all the streams. Note: If you union a data stream with itself you will get each element twice in the resulting stream.
    
    dataStream.union(otherStream1, otherStream2, ...);
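    • A minimal plain-Java sketch of union's semantics (class UnionSketch is made up for illustration): the result simply contains every element of every input, with no deduplication, which is why self-union duplicates elements.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java sketch of union semantics (no Flink dependency):
// concatenate all inputs, keeping duplicates.
public class UnionSketch {
    @SafeVarargs
    static <T> List<T> union(List<T>... streams) {
        List<T> out = new ArrayList<>();
        for (List<T> s : streams) {
            out.addAll(s);  // no dedup: every element of every stream survives
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> a = Arrays.asList(1, 2);
        List<Integer> b = Arrays.asList(3);
        System.out.println(union(a, b));  // all elements of both streams
        System.out.println(union(a, a));  // self-union: each element twice
    }
}
```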
    

    connect

    • Combines two DataStreams into a ConnectedStreams in which each stream keeps its original element type; this lets streams of different types be processed together (for example with CoMap/CoFlatMap) and share state;
    DataStream,DataStream → ConnectedStreams    
    "Connects" two data streams retaining their types. Connect allowing for shared state between the two streams.
    
    DataStream<Integer> someStream = //...
    DataStream<String> otherStream = //...
    
    ConnectedStreams<Integer, String> connectedStreams = someStream.connect(otherStream);
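    • To make the "retaining their types" point concrete, here is a plain-Java sketch of the CoMap pattern on a connected stream (class ConnectSketch and its coMap helper are made up for illustration): each input keeps its own type, and a pair of map functions turns both into one common output type.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Plain-Java sketch of connect + CoMap semantics (no Flink dependency):
// map1 handles elements of the first stream, map2 the second, producing one
// output type from two differently typed inputs.
public class ConnectSketch {
    static <A, B, O> List<O> coMap(List<A> first, List<B> second,
                                   Function<A, O> map1, Function<B, O> map2) {
        List<O> out = new ArrayList<>();
        for (A a : first) out.add(map1.apply(a));
        for (B b : second) out.add(map2.apply(b));
        return out;
    }

    public static void main(String[] args) {
        List<Integer> someStream = Arrays.asList(1, 2);
        List<String> otherStream = Arrays.asList("x");
        // an Integer stream and a String stream become one String stream
        System.out.println(coMap(someStream, otherStream,
                i -> "int:" + i, s -> "str:" + s));
    }
}
```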
    

    split

    • Split divides one DataStream into two or more named streams;
    DataStream → SplitStream    
    Split the stream into two or more streams according to some criterion.
    
    SplitStream<Integer> split = someDataStream.split(new OutputSelector<Integer>() {
        @Override
        public Iterable<String> select(Integer value) {
            List<String> output = new ArrayList<String>();
            if (value % 2 == 0) {
                output.add("even");
            }
            else {
                output.add("odd");
            }
            return output;
        }
    });
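    • The routing logic of the OutputSelector above can be sketched in plain Java (class SplitSketch is made up for illustration; a Map of named lists stands in for the SplitStream): each element is routed to the output names the selector returns for it.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java sketch of split semantics (no Flink dependency): route each
// element to the named output chosen by the selector, here "even"/"odd".
public class SplitSketch {
    static Map<String, List<Integer>> split(List<Integer> stream) {
        Map<String, List<Integer>> named = new HashMap<>();
        for (int value : stream) {
            String name = (value % 2 == 0) ? "even" : "odd";  // the selector
            named.computeIfAbsent(name, k -> new ArrayList<>()).add(value);
        }
        return named;
    }

    public static void main(String[] args) {
        System.out.println(split(Arrays.asList(1, 2, 3, 4)));
    }
}
```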
    

    select

    • Select retrieves one or more of the named streams produced by a split;
    SplitStream → DataStream    
    Select one or more streams from a split stream.
    
    SplitStream<Integer> split;
    DataStream<Integer> even = split.select("even");
    DataStream<Integer> odd = split.select("odd");
    DataStream<Integer> all = split.select("even","odd");
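    • Continuing the sketch style, select can be modelled in plain Java (class SelectSketch is made up for illustration; the Map of named lists stands in for the SplitStream): passing one name yields that stream, passing several yields their combined elements.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java sketch of select on a split stream (no Flink dependency).
public class SelectSketch {
    static List<Integer> select(Map<String, List<Integer>> split, String... names) {
        List<Integer> out = new ArrayList<>();
        for (String name : names) {
            // elements routed to any of the requested names are selected
            out.addAll(split.getOrDefault(name, Collections.emptyList()));
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, List<Integer>> split = new HashMap<>();
        split.put("even", Arrays.asList(2, 4));
        split.put("odd", Arrays.asList(1, 3));
        System.out.println(select(split, "even"));         // one named stream
        System.out.println(select(split, "even", "odd"));  // both combined
    }
}
```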
    

    iterate

    DataStream → IterativeStream → DataStream 
    
    Creates a "feedback" loop in the flow, by redirecting the output of one operator to some previous operator. This is especially useful for defining algorithms that continuously update a model. The following code starts with a stream and applies the iteration body continuously. Elements that are greater than 0 are sent back to the feedback channel, and the rest of the elements are forwarded downstream. See [iterations](https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/index.html#iterations) for a complete description.
    
    IterativeStream<Long> iteration = initialStream.iterate();
    DataStream<Long> iterationBody = iteration.map (/*do something*/);
    DataStream<Long> feedback = iterationBody.filter(new FilterFunction<Long>(){
        @Override
        public boolean filter(Long value) throws Exception {
            return value > 0;
        }
    });
    iteration.closeWith(feedback);
    DataStream<Long> output = iterationBody.filter(new FilterFunction<Long>(){
        @Override
        public boolean filter(Long value) throws Exception {
            return value <= 0;
        }
    });
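    • The feedback loop above can be simulated in plain Java (class IterateSketch is made up for illustration, and "subtract 1" is an invented iteration body standing in for the /*do something*/ map): results greater than 0 re-enter the loop, the rest are forwarded downstream.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

// Plain-Java sketch of the iterate feedback loop (no Flink dependency).
public class IterateSketch {
    static List<Long> iterate(List<Long> initialStream) {
        Deque<Long> feedback = new ArrayDeque<>(initialStream);
        List<Long> output = new ArrayList<>();
        while (!feedback.isEmpty()) {
            long mapped = feedback.poll() - 1;  // invented iteration body
            if (mapped > 0) {
                feedback.add(mapped);           // sent back: closeWith(feedback)
            } else {
                output.add(mapped);             // forwarded downstream
            }
        }
        return output;
    }

    public static void main(String[] args) {
        // each element loops until the body drives it to <= 0
        System.out.println(iterate(Arrays.asList(3L, 1L)));
    }
}
```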
    

Summary

    • This wraps up part one of the Flink streaming operator notes; it is kept simple and will be refined later;

    李小李 can't afford to fall behind!

    Original article: https://www.haomeiwen.com/subject/eetikctx.html