美文网首页
Apache Flink 学习笔记(一)

Apache Flink 学习笔记(一)

作者: 憨人Zoe | 来源:发表于2018-09-19 17:30 被阅读0次

    最近在项目中需要用到Flink,关于Flink的基本介绍就不啰嗦了,官方文档传送门

    由于是第一次接触,我花了一些时间整理了几个小demo(java)当作笔记。对Flink很多地方的理解有些片面甚至错误的,路过的朋友权当参考,不能保证说得都对。

    之前接触过Spark的都知道,数据处理是在RDD中进行的(无论是批处理还是流处理)。Flink则不同,批处理用DataSet,流处理用DataStream,而且批处理和流处理的api也是不一样的。

    先来看一下第一个demo 经典的 word count

    我笔记中的例子都是基于 JDK1.8 ,Flink 1.6 编写的

    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.aggregation.Aggregations;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.util.Collector;
    
    import java.util.List;
    
    /**
     * 从本地文件读取字符串,按空格分割单词,统计每个分词出现的次数并输出
     */
    public class Demo1 {
        public static void main(String[] args) {
            //获取执行环境 ExecutionEnvironment (批处理用这个对象)
            final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            
            //加载数据源到 DataSet
            DataSet<String> text = env.readTextFile("test.txt");
            DataSet<Tuple2<String, Integer>> counts =
                    text.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                        @Override
                        public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
                            //s 即从文本中读取到一行字符串,按空格分割后得到数组tokens
                            String[] tokens = s.toLowerCase().split("\\s+");
                            for (String token : tokens) {
                                if (token.length() > 0) {
                                    //初始化每一个单词,保存为元祖对象
                                    collector.collect(new Tuple2<String, Integer>(token, 1));
                                }
                            }
                        }
                    })
                            .groupBy(0) //0表示Tuple2<String, Integer> 中的第一个元素,即分割后的单词
                            .aggregate(Aggregations.SUM, 1); //同理,1表示Tuple2<String, Integer> 中的第二个元素,即出现次数
    
            try {
                //从DataSet 中获得集合,并遍历
                List<Tuple2<String,Integer>> list = counts.collect();
                for (Tuple2<String,Integer> tuple2:list){
                    System.out.println(tuple2.f0 + ":" + tuple2.f1);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
    

    其中,groupBy(0) 表示按照DataSet中保存的元祖的第一个字段分组,aggregate 是聚合函数,Aggregations.SUM 指定了求和,1 表示对元祖的第二个字段进行求和计算。

    //test.txt 
    hello world
    flink demo
    this is a flink demo file
    
    //控制台输出
    demo:2
    is:1
    this:1
    a:1
    file:1
    world:1
    hello:1
    flink:2
    

    可以看到,Flink程序已经成功工作了。但是有一个问题,DataSet中的对象使用元祖Tuple来保存的,如果字段比较多,肯定不如pojo 更加方便,所以第二个demo 我用pojo来改造一下。

    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.api.common.functions.ReduceFunction;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.util.Collector;
    
    import java.util.List;
    
    /**
     * 用pojo 改造 demo1
     */
    public class Demo2 {
        public static void main(String[] args) {
            final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            DataSet<String> text = env.readTextFile("test.txt");
            //用 WordWithCount 保存单词和次数信息
            DataSet<WordWithCount> counts =
                    text.flatMap(new FlatMapFunction<String, WordWithCount>() {
                        @Override
                        public void flatMap(String s, Collector<WordWithCount> collector) throws Exception {
                            String[] tokens = s.toLowerCase().split("\\s+");
                            for (String token : tokens) {
                                if (token.length() > 0) {
                                    collector.collect(new WordWithCount(token, 1));
                                }
                            }
                        }
                    })
                            .groupBy("word")//直接指定字段名称
                            .reduce(new ReduceFunction<WordWithCount>() {
                                @Override
                                public WordWithCount reduce(WordWithCount wc, WordWithCount t1) throws Exception {
                                      return new WordWithCount(wc.word, wc.count + t1.count);
                                }
                            });
            try {
                List<WordWithCount> list = counts.collect();
                for (WordWithCount wc: list) {
                    System.out.println(wc.toString());
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    
        // pojo
        public static class WordWithCount {
            public String word;
            public long count;
    
            public WordWithCount() {
            }
    
            public WordWithCount(String word, long count) {
                this.word = word;
                this.count = count;
            }
    
            @Override
            public String toString() {
                return word + " : " + count;
            }
        }
    }
    

    运行结果和demo1完全一致。但是你可能会注意到,demo1中的aggregate聚合函数被替换成了reduce,这是因为aggregate函数只接受int来表示filed。同时,.groupBy(0) 也相应改成用.groupBy("word")直接指定字段。

    请注意,如果你的pojo demo 运行失败,你可能需要做以下检查工作:
    1、pojo 有没有声明为public,如果是内部类必须是static
    2、有没有为pojo创建一个无参的构造函数
    3、有没有声明pojo的字段为public,或者生成publicgetset方法
    4、必须使用Flink 支持的数据类型

    如果你有提供publicget,set 方法,比如:

    public String getWord() {
        return word;
    }
    
    public void setWord(String word) {
        this.word = word;
    }
    

    那么,.groupBy("word") 还可以用.groupBy(WordWithCount::getWord)替换

    相关文章

      网友评论

          本文标题:Apache Flink 学习笔记(一)

          本文链接:https://www.haomeiwen.com/subject/fbpinftx.html