美文网首页
Apache Flink 学习笔记(一)

Apache Flink 学习笔记(一)

作者: 憨人Zoe | 来源:发表于2018-09-19 17:30 被阅读0次

最近在项目中需要用到Flink,关于Flink的基本介绍就不啰嗦了,官方文档传送门

由于是第一次接触,我花了一些时间整理了几个小demo(java)当作笔记。对Flink很多地方的理解有些片面甚至错误的,路过的朋友权当参考,不能保证说得都对。

之前接触过Spark的都知道,数据处理是在RDD中进行的(无论是批处理还是流处理)。Flink则不同,批处理用DataSet,流处理用DataStream,而且批处理和流处理的api也是不一样的。

先来看一下第一个demo 经典的 word count

我笔记中的例子都是基于 JDK1.8 ,Flink 1.6 编写的

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.aggregation.Aggregations;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

import java.util.List;

/**
 * 从本地文件读取字符串,按空格分割单词,统计每个分词出现的次数并输出
 */
public class Demo1 {
    public static void main(String[] args) {
        //获取执行环境 ExecutionEnvironment (批处理用这个对象)
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        
        //加载数据源到 DataSet
        DataSet<String> text = env.readTextFile("test.txt");
        DataSet<Tuple2<String, Integer>> counts =
                text.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public void flatMap(String s, Collector<Tuple2<String, Integer>> collector) throws Exception {
                        //s 即从文本中读取到一行字符串,按空格分割后得到数组tokens
                        String[] tokens = s.toLowerCase().split("\\s+");
                        for (String token : tokens) {
                            if (token.length() > 0) {
                                //初始化每一个单词,保存为元祖对象
                                collector.collect(new Tuple2<String, Integer>(token, 1));
                            }
                        }
                    }
                })
                        .groupBy(0) //0表示Tuple2<String, Integer> 中的第一个元素,即分割后的单词
                        .aggregate(Aggregations.SUM, 1); //同理,1表示Tuple2<String, Integer> 中的第二个元素,即出现次数

        try {
            //从DataSet 中获得集合,并遍历
            List<Tuple2<String,Integer>> list = counts.collect();
            for (Tuple2<String,Integer> tuple2:list){
                System.out.println(tuple2.f0 + ":" + tuple2.f1);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

其中,groupBy(0) 表示按照DataSet中保存的元祖的第一个字段分组,aggregate 是聚合函数,Aggregations.SUM 指定了求和,1 表示对元祖的第二个字段进行求和计算。

//test.txt 
hello world
flink demo
this is a flink demo file
//控制台输出
demo:2
is:1
this:1
a:1
file:1
world:1
hello:1
flink:2

可以看到,Flink程序已经成功工作了。但是有一个问题,DataSet中的对象使用元祖Tuple来保存的,如果字段比较多,肯定不如pojo 更加方便,所以第二个demo 我用pojo来改造一下。

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.util.Collector;

import java.util.List;

/**
 * 用pojo 改造 demo1
 */
public class Demo2 {
    public static void main(String[] args) {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<String> text = env.readTextFile("test.txt");
        //用 WordWithCount 保存单词和次数信息
        DataSet<WordWithCount> counts =
                text.flatMap(new FlatMapFunction<String, WordWithCount>() {
                    @Override
                    public void flatMap(String s, Collector<WordWithCount> collector) throws Exception {
                        String[] tokens = s.toLowerCase().split("\\s+");
                        for (String token : tokens) {
                            if (token.length() > 0) {
                                collector.collect(new WordWithCount(token, 1));
                            }
                        }
                    }
                })
                        .groupBy("word")//直接指定字段名称
                        .reduce(new ReduceFunction<WordWithCount>() {
                            @Override
                            public WordWithCount reduce(WordWithCount wc, WordWithCount t1) throws Exception {
                                  return new WordWithCount(wc.word, wc.count + t1.count);
                            }
                        });
        try {
            List<WordWithCount> list = counts.collect();
            for (WordWithCount wc: list) {
                System.out.println(wc.toString());
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    // pojo
    public static class WordWithCount {
        public String word;
        public long count;

        public WordWithCount() {
        }

        public WordWithCount(String word, long count) {
            this.word = word;
            this.count = count;
        }

        @Override
        public String toString() {
            return word + " : " + count;
        }
    }
}

运行结果和demo1完全一致。但是你可能会注意到,demo1中的aggregate聚合函数被替换成了reduce,这是因为aggregate函数只接受int来表示filed。同时,.groupBy(0) 也相应改成用.groupBy("word")直接指定字段。

请注意,如果你的pojo demo 运行失败,你可能需要做以下检查工作:
1、pojo 有没有声明为public,如果是内部类必须是static
2、有没有为pojo创建一个无参的构造函数
3、有没有声明pojo的字段为public,或者生成publicgetset方法
4、必须使用Flink 支持的数据类型

如果你有提供publicget,set 方法,比如:

public String getWord() {
    return word;
}

public void setWord(String word) {
    this.word = word;
}

那么,.groupBy("word") 还可以用.groupBy(WordWithCount::getWord)替换

相关文章

  • Apache Flink 学习笔记(一)

    最近在项目中需要用到Flink,关于Flink的基本介绍就不啰嗦了,官方文档传送门 。 由于是第一次接触,我花了一...

  • 2019-02-26

    《从 1 到 100 深入学习 Flink》--- Apache Flink 介绍 目录: 1,flink 流介绍...

  • Flink

    本文主要参考自: Apache Flink 漫谈Apache Flink 漫谈系列 - 序Apache Flink...

  • Apache Flink笔记

    Apache Flink笔记原文链接 :http://timeyang.com/articles/28/2018/...

  • Introduction to Apache Flink - C

    初识Apache Flink Apache Flink项目首页的标语写着“Apache Flink是一个可在流数据...

  • BI系统套装

    flink 文档https://flink.apache.org/[https://flink.apache.or...

  • Flink —— 基本组件与 WordCount

    小白的新手学习笔记,请大佬轻喷本文归档于GitHub,欢迎大家批评指正 Apache Flink is a fra...

  • Apache Flink 学习笔记(三)

    本篇将演示如何用Table API 实现上一篇demo3的功能。上一篇传送门 Apache Flink 学习笔记(...

  • Apache Flink 学习笔记(四)

    本篇将演示如何使用 Flink SQL 实现上一篇demo5的功能,上一篇传送门 Apache Flink 学习笔...

  • Apache Flink 学习笔记(二)

    上一篇 Apache Flink 学习笔记(一) 简单示范了批处理的使用,本篇展示流式处理的使用方法。 流处理也叫...

网友评论

      本文标题:Apache Flink 学习笔记(一)

      本文链接:https://www.haomeiwen.com/subject/fbpinftx.html