美文网首页
Flink中的wiki-edits例子实践

Flink中的wiki-edits例子实践

作者: alvin_wang | 来源:发表于2018-08-14 09:05 被阅读349次
    image

    概览

    wiki-edits教程是一个监控wikipedia编辑的flink监控程序,实时计算编辑者的编辑的byte数。它通过wikipedia connector来获取数据源,最终把数据sink到kafka中。

    建立Maven工程

    我们使用Flink的Maven原型来创建工程。Flink的版本号为1.5.0,脚本命令如下:

    $ mvn archetype:generate \
        -DarchetypeGroupId=org.apache.flink \
        -DarchetypeArtifactId=flink-quickstart-java \
        -DarchetypeVersion=1.5.0 \
        -DgroupId=wiki-edits \
        -DartifactId=wiki-edits \
        -Dversion=0.1 \
        -Dpackage=wikiedits \
        -DinteractiveMode=false
    

    然后我们可以通过tree命令来查看目录结构。

    $ wiki-edits/
    ├── pom.xml
    ├── src
    │   └── main
    │       ├── java
    │       │   └── wikiedits
    │       │       ├── BatchJob.java
    │       │       ├── StreamingJob.java
    │       └── resources
    │           └── log4j.properties
    

    最后我们用IDEA打开工程,并在pom.xml中添加如下依赖,分别为对flink-connector-wikiedits和flink-connector-kafka的依赖。

            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-connector-wikiedits_2.11</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-connector-kafka-0.8_2.11</artifactId>
                <version>${flink.version}</version>
            </dependency>
    

    编写Flink程序

    首先我们创建一个WikipediaAnalysis.java文件,并在main方法中添加如下代码。其大致步骤分为如下:

    1. 获取环境信息
    2. 为环境信息添加WikipediaEditsSource源
    3. 根据事件中的用户名为key来区分数据流
    4. 设置窗口时间为5s
    5. 聚合当前窗口中相同用户名的事件,最终返回一个tuple2<user,累加的ByteDiff>
    6. 把tuple2映射为string
    7. sink数据到kafka,topic为wiki-result
    8. 执行操作

    keyBy(...)函数是用来分片数据源的,可以把相同key的放在一个task任务中执行。

    timeWindow(...)函数默认使用tumbling windows。

    这边聚合函数使用了Aggregation函数,替换了原先的fold函数(提示为deprecated)。

    package wikiedits;
    
    import org.apache.flink.api.common.functions.AggregateFunction;
    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.api.java.functions.KeySelector;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.KeyedStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer08;
    import org.apache.flink.streaming.connectors.wikiedits.WikipediaEditEvent;
    import org.apache.flink.streaming.connectors.wikiedits.WikipediaEditsSource;
    
    
    public class WikipediaAnalysis {
        public static void main(String[] args) throws Exception{
            //1.获取环境信息
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
            //2.为环境信息添加WikipediaEditsSource源
            DataStream<WikipediaEditEvent> edits = env.addSource(new WikipediaEditsSource());
    
            //3.根据事件中的用户名为key来区分数据流
            KeyedStream<WikipediaEditEvent, String> keyedEdits = edits
                    .keyBy(new KeySelector<WikipediaEditEvent, String>() {
                        @Override
                        public String getKey(WikipediaEditEvent wikipediaEditEvent) throws Exception {
                            return wikipediaEditEvent.getUser();
                        }
                    });
    
            
            DataStream<Tuple2<String, Integer>> result = keyedEdits
                    //4.设置窗口时间为5s
                    .timeWindow(Time.seconds(5))
                    //5.聚合当前窗口中相同用户名的事件,最终返回一个tuple2<user,累加的ByteDiff>
                    .aggregate(new AggregateFunction<WikipediaEditEvent, Tuple2<String, Integer>, Tuple2<String,Integer>>() {
                        @Override
                        public Tuple2<String, Integer> createAccumulator() {
                            return new Tuple2<>("",0);
                        }
    
                        @Override
                        public Tuple2<String, Integer> add(WikipediaEditEvent value, Tuple2<String, Integer> accumulator) {
                            return new Tuple2<>(value.getUser(), value.getByteDiff()+accumulator.f1);
                        }
    
                        @Override
                        public Tuple2<String, Integer> getResult(Tuple2<String, Integer> accumulator) {
                            return accumulator;
                        }
    
                        @Override
                        public Tuple2<String, Integer> merge(Tuple2<String, Integer> a, Tuple2<String, Integer> b) {
                            return new Tuple2<>(a.f0+b.f0, a.f1+b.f1);
                        }
                    });
    
            //6.把tuple2映射为string
            result.map(new MapFunction<Tuple2<String,Integer>, String>() {
    
                @Override
                public String map(Tuple2<String, Integer> stringLongTuple2) throws Exception {
                    return stringLongTuple2.toString();
                }
                //7.sink数据到kafka,topic为wiki-result
            }).addSink(new FlinkKafkaProducer08<String>("localhost:9092", "wiki-result", new SimpleStringSchema()));
    
            //8.执行操作
            env.execute();
    
        }
    }
    

    最后我们添加一下IDEA的运行配置信息。

    image.png

    安装运行zookeeper&kafka

    Mac可以通过brew来安装zookeeperkafka

    $ brew install zookeeper
    $ brew install kafka
    

    然后运行上述组件。在zookeeper目录下执行以下命令来zookeeper开启服务。

    $ ./bin/zkServer start
    

    在kafka目录下执行以下命令来开启kafka服务。

    $ ./bin/kafka-server-start /usr/local/etc/kafka/server.properties
    

    接着创建一个topic。

    $ ./bin/kafka-console-producer --topic wiki-result  --broker-list localhost:9092
    

    运行程序并消费kafka中的数据

    在IDEA中run刚才的程序,然后在kafka目录中执行开启消费者的命令,可以查看实时消费的数据。

    $ ./bin/kafka-console-consumer --zookeeper localhost:2181 --topic wiki-result
    (Tony1,17)
    (2.177.40.137,9)
    (Waelabdelhamid,279)
    (Falconatic,182)
    (JackintheBox,1934)
    (Zzbrandon123,26)
    (0.86.42.171,56)
    (.37.168.68,-44)
    (Aditya debnath wiki,3)
    

    总结

    本文实践了Flink的wiki-edit例子。其通过从wiki-connector中获取source,并sink数据到kafka中。

    参考

    Monitoring the Wikipedia Edit Stream
    kafka
    zookeeper
    Flink: How to convert the deprecated fold to aggregrate?

    相关文章

      网友评论

          本文标题:Flink中的wiki-edits例子实践

          本文链接:https://www.haomeiwen.com/subject/cxlmbftx.html