flume-kafka-spark streaming (pyspark)


Author: 玄月府的小妖在debug | Published 2016-11-15 17:30

    After roughly a week of study, I finally got the whole flume-kafka-spark streaming pipeline working end to end. The flow is: Flume → Kafka → Spark Streaming.


    As for why it is set up this way: convenience, mainly.
    Based on a blog post (see the references at the end).

    I. Environment Setup

    Hadoop cluster: 2.7.1
    ZooKeeper cluster
    Kafka cluster: kafka_2.11-0.10.0.0
    Spark cluster: spark-2.0.1-bin-hadoop2.7.tgz
    Flume 1.7.0
    See my earlier posts for how to set these up; I won't repeat it here.
    Three machines: master, slave1, slave2

    II. Configure Flume

    The sink is Kafka; the source can be whatever you like (a syslogtcp source is used here).

    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    # Describe/configure the source
    a1.sources.r1.type = syslogtcp
    a1.sources.r1.port = 5140
    a1.sources.r1.host = 192.168.31.131
    a1.sources.r1.channels = c1
    # Describe the sink
    a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
    a1.sinks.k1.topic = test5
    a1.sinks.k1.brokerList = 192.168.31.131:9092
    a1.sinks.k1.requiredAcks = 1
    a1.sinks.k1.batchSize = 20
    a1.sinks.k1.channel = c1
    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    # the channel commits transactions in batches of 100 events
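
    With this in place, Flume pushes each syslog event into the test5 topic. Before wiring up Spark, it can be worth confirming that events really arrive in Kafka; the sketch below does that with the third-party kafka-python package, which is an extra assumption and not part of this post's setup:

    from kafka import KafkaConsumer

    # Read back whatever the Flume sink has written to the test5 topic.
    consumer = KafkaConsumer(
        "test5",                                   # topic from a1.sinks.k1.topic
        bootstrap_servers="192.168.31.131:9092",   # broker from a1.sinks.k1.brokerList
        auto_offset_reset="earliest",              # start from the oldest messages
        consumer_timeout_ms=10000)                 # stop iterating after 10s of silence

    for msg in consumer:
        print(msg.partition, msg.offset, msg.value)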
    

    III. The Code: KafkaWordCount.py

    Write the Spark Streaming code that reads the Kafka stream and counts word frequencies.

    
    # -*- coding: UTF-8 -*-
    # Spark Streaming + Kafka word count (receiver-based approach)
    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext
    from pyspark.streaming.kafka import KafkaUtils

    sc = SparkContext("local[2]", "KafkaWordCount")
    # Batch interval: 2 seconds
    ssc = StreamingContext(sc, 2)
    zookeeper = "192.168.31.131:2181,192.168.31.132:2181,192.168.31.133:2181"
    # Map of topic name -> number of partitions to consume (each in its own thread);
    # the topic is created with 3 partitions, so ask for 3 here
    topics = {"test5": 3}
    groupid = "test-consumer-group"
    lines = KafkaUtils.createStream(ssc, zookeeper, groupid, topics)
    # Each record is a (key, message) pair; keep the message only
    lines1 = lines.map(lambda x: x[1])

    # Split the strings received in each batch into words
    words = lines1.flatMap(lambda line: line.split(" "))

    # Map each word to a (word, 1) tuple
    pairs = words.map(lambda word: (word, 1))

    wordcounts = pairs.reduceByKey(lambda x, y: x + y)

    # Write output files; the prefix is automatically suffixed with the batch time
    wordcounts.saveAsTextFiles("/tmp/fkafka")

    wordcounts.pprint()

    # Start the Spark Streaming application
    ssc.start()
    # Wait for the computation to terminate
    ssc.awaitTermination()
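
    The createStream call above is the receiver-based approach and goes through ZooKeeper. The same spark-streaming-kafka-0-8 jar also provides a receiver-less direct approach; below is a minimal sketch of the equivalent word count with createDirectStream (the broker address is reused from the Flume sink config, the rest is my own illustration rather than the original code):

    # -*- coding: UTF-8 -*-
    # Direct (receiver-less) variant: read straight from the Kafka brokers
    # instead of going through ZooKeeper.
    from pyspark import SparkContext
    from pyspark.streaming import StreamingContext
    from pyspark.streaming.kafka import KafkaUtils

    sc = SparkContext("local[2]", "KafkaWordCountDirect")
    ssc = StreamingContext(sc, 2)

    brokers = "192.168.31.131:9092"   # same broker list as the Flume Kafka sink
    lines = KafkaUtils.createDirectStream(ssc, ["test5"],
                                          {"metadata.broker.list": brokers})

    counts = (lines.map(lambda x: x[1])                    # keep the message value
                   .flatMap(lambda line: line.split(" "))  # split into words
                   .map(lambda word: (word, 1))
                   .reduceByKey(lambda a, b: a + b))
    counts.pprint()

    ssc.start()
    ssc.awaitTermination()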
    

    IV. Start the Services

    1. Start the Hadoop cluster
    start-all.sh
    
    2. Start the Spark cluster
    start-master.sh
    start-slaves.sh
    
    3. Start the ZooKeeper cluster

    Run the following on all three machines:

    zkServer.sh start
    
    4. Start the Kafka cluster

    Run the following on all three machines:

    kafka-server-start.sh -daemon ../config/server.properties
    
    5. Check the processes with jps

    master:


    slave1 and slave2 (identical):


    Make sure all of the processes are up.

    6. Create the Kafka topic

    kafka-topics.sh --create --zookeeper 192.168.31.131:2181,192.168.31.132:2181,192.168.31.133:2181 --replication-factor 3 --partitions 3 --topic test5
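
    To double-check that test5 really has the three partitions just requested, one option (again using the third-party kafka-python package, an assumption on my part) is:

    from kafka import KafkaConsumer

    # Ask the brokers which partitions they report for the test5 topic.
    consumer = KafkaConsumer(bootstrap_servers="192.168.31.131:9092")
    print(consumer.partitions_for_topic("test5"))   # expect {0, 1, 2}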

    7. Start the Flume agent
    flume-ng agent --conf flume/conf/  -f /home/cms/flume/conf/flume-conf.properties -n a1 -Dflume.root.logger=INFO,console
    

    V. Test

    1. Run KafkaWordCount.py

    On master, run:

    spark-submit --jars kafka/libs/spark-streaming-kafka-0-8-assembly_2.11-2.0.1.jar KafkaWordCount.py 2> error.txt
    
    2. Send data
    echo "hello'\t'word" | nc 192.168.31.131 5140
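
    If nc is not installed, the same test event can be sent to the Flume syslogtcp source from Python using only the standard library (host and port come from the Flume source config):

    import socket

    # Push a few space-separated words at the Flume syslogtcp source,
    # just like the nc command above.
    s = socket.create_connection(("192.168.31.131", 5140))
    s.sendall(b"hello world hello spark\n")   # the newline ends the syslog event
    s.close()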
    
    
    3. Watch the terminal output for the word counts.

    VI. Check the Output on HDFS

    hadoop fs -ls /tmp/fkafka*
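
    saveAsTextFiles writes one directory per batch, named with the /tmp/fkafka prefix plus the batch timestamp. To pull the results back into Spark for a quick look, something like the sketch below works (the glob pattern is my assumption about the generated directory names):

    from pyspark import SparkContext

    # Read every word-count batch that the streaming job has written to HDFS.
    sc = SparkContext("local[2]", "ReadKafkaWordCounts")
    counts = sc.textFile("/tmp/fkafka-*/part-*")   # one directory per batch
    for line in counts.take(20):                   # each line is a (word, count) pair
        print(line)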

    References
    [flume-kafka-spark streaming (pyspark) - redis real-time log collection and computation](http://blog.csdn.net/zhong_han_jun/article/details/50721736)

    http://spark.apache.org/docs/latest/streaming-kafka-0-8-integration.html
