美文网首页
flink WordCount初体验

flink WordCount初体验

作者: Alien_Swordsman | 来源:发表于2018-12-18 10:03 被阅读0次

    这篇文章以flink官方local cluster教程为主线,引导大家体验一下flink的初次开发。文章中所提到的代码我已经放到github上,欢迎指正。

    下载和启动Flink

    Flink可以运行在Linux、Mac和Windows上,唯一的要求就是必须安装Java 8或以上版本。可以使用一下命令查看:

    (py2.7) bogon:flink-examples yss$ java -version
    java version "1.8.0_181"
    Java(TM) SE Runtime Environment (build 1.8.0_181-b13)
    Java HotSpot(TM) 64-Bit Server VM (build 25.181-b13, mixed mode)
    

    java安装后去官网下载Flink,然后解压即可运行。这里以flink-1.7.0为例。

    cd ~/Downloads/
    tar xzf flink-1.7.0-bin-scala_2.12.tgz
    cd flink-1.7.0
    

    以本地模式启动Flink

    在flink目录下运行以下命令即可启动

     ./bin/start-cluster.sh  # Start Flink
    

    这时候可以打开浏览器访问http://localhost:8081
    ]flink的监控页面。

    flink.png

    我们通过jps可以看到多出来两个JVM进程,运行的主类StandaloneSessionClusterEntrypoint和TaskManagerRunner。

    [root@henghe-121 bin]# jps -l
    16016 org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint
    16513 org.apache.flink.runtime.taskexecutor.TaskManagerRunner
    16619 sun.tools.jps.Jps
    

    集群启动后就可以开发我们的flink程序了~

    写第一个Flink WorkCount程序

    官网给出的Scala例子如下:

    object SocketWindowWordCount {
    
        def main(args: Array[String]) : Unit = {
    
            // the port to connect to
            val port: Int = try {
                ParameterTool.fromArgs(args).getInt("port")
            } catch {
                case e: Exception => {
                    System.err.println("No port specified. Please run 'SocketWindowWordCount --port <port>'")
                    return
                }
            }
    
            // get the execution environment
            val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    
            // get input data by connecting to the socket
            val text = env.socketTextStream("localhost", port, '\n')
    
            // parse the data, group it, window it, and aggregate the counts
            val windowCounts = text
                .flatMap { w => w.split("\\s") }
                .map { w => WordWithCount(w, 1) }
                .keyBy("word")
                .timeWindow(Time.seconds(5), Time.seconds(1))
                .sum("count")
    
            // print the results with a single thread, rather than in parallel
            windowCounts.print().setParallelism(1)
    
            env.execute("Socket Window WordCount")
        }
    
        // Data type for words with count
        case class WordWithCount(word: String, count: Long)
    }
    

    代码本身不难,不过建立scala的Maven工程,打成可执行jar还需要很多设置。在这里我将官方的例子配置做了很多简化,对于初学者更友好,并将代码上传到了github上。大家clone之后,直接运行mvn clean package就可以生成jar包。

    运行WordCount

    对于这个例子首先要用netcat创建服务,监听某个端口。

    nc -l 9000
    

    之后就可以提交job任务了

    $ ./bin/flink run examples/streaming/SocketWindowWordCount.jar --port 9000
    Starting execution of program
    

    然后再nc端输入要处理的字符串,flink就可以拿到数据进行处理

    $ nc -l 9000
    lorem ipsum
    ipsum ipsum ipsum
    bye
    

    该任务的输出在flink的log下以.out结尾的文件下。

    $ tail -f flink-yss-taskexecutor-0-bogon.out
    WordWithCount(hello,1)
    WordWithCount(sdsd,1)
    WordWithCount(sdsd,1)
    WordWithCount(sss,1)
    

    最后测试完可以关闭集群:

    ./bin/stop-cluster.sh
    

    相关文章

      网友评论

          本文标题:flink WordCount初体验

          本文链接:https://www.haomeiwen.com/subject/ylmckqtx.html