Flume + Kafka + Spark Streaming Integration

Author: sparkle123 | Published 2018-05-09 16:46

    Reference:
    http://archive.cloudera.com/cdh5/cdh/5/flume-ng-1.6.0-cdh5.5.0/FlumeUserGuide.html

    • Logger-->Flume

    1/ Create the Flume configuration file streaming.conf:

    agent1.sources=avro-source
    agent1.channels=logger-channel
    agent1.sinks=log-sink
    
    #define source
    agent1.sources.avro-source.type=avro
    agent1.sources.avro-source.bind=0.0.0.0
    agent1.sources.avro-source.port=41414
    
    #define channel
    agent1.channels.logger-channel.type=memory
    
    #define sink
    agent1.sinks.log-sink.type=logger
    
    agent1.sources.avro-source.channels=logger-channel
    agent1.sinks.log-sink.channel=logger-channel
    

    2/ Log4j configuration file for the Java program that produces the logs:

    log4j.rootLogger=INFO,stdout,flume
    
    log4j.appender.stdout = org.apache.log4j.ConsoleAppender
    log4j.appender.stdout.target = System.out
    log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
    log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} [%t] [%c] [%p] - %m%n
    
    log4j.appender.flume = org.apache.flume.clients.log4jappender.Log4jAppender
    log4j.appender.flume.Hostname = hadoop
    log4j.appender.flume.Port = 41414
    log4j.appender.flume.UnsafeMode = true
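
    The LoggerGenerator class referenced in the steps below is not shown in this post. A minimal sketch of what it could look like, assuming log4j 1.x and the flume-ng-log4jappender dependency on the classpath (the class name matches the one referenced later, but the message format is illustrative; the post treats it as a Java program, and the Java version is a direct transliteration):

    import org.apache.log4j.Logger

    // Minimal log producer: emits one INFO line per second so that events
    // flow continuously through the log4j appender into Flume.
    object LoggerGenerator {
      private val logger = Logger.getLogger(getClass.getName)

      def main(args: Array[String]): Unit = {
        var index = 0
        while (true) {
          Thread.sleep(1000)
          logger.info(s"value is: $index")
          index += 1
        }
      }
    }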
    

    3/ Start flume-ng:

    flume-ng agent \
    --conf $FLUME_HOME/conf \
    --conf-file $FLUME_HOME/conf/streaming.conf \
    --name agent1 \
    -Dflume.root.logger=INFO,console
    

    4/ The logs generated by the program can now be seen in real time in the flume-ng console window.
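
    If you want to smoke-test the avro source before wiring up log4j, Flume ships an avro-client that sends each line of a file as an event (a hedged example; the host, port and the file path /tmp/test.log are placeholders for your setup):

    flume-ng avro-client \
    --host hadoop \
    --port 41414 \
    --filename /tmp/test.log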

    • Logger-->Flume-->Kafka

    1/ Start Kafka and create the topic:
    ./kafka-topics.sh --create --zookeeper hadoop:2181 --replication-factor 1 --partitions 1 --topic flume-kafka-streaming-topic
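
    To confirm the topic was created before starting Flume, the same CLI can describe it:

    ./kafka-topics.sh --describe --zookeeper hadoop:2181 --topic flume-kafka-streaming-topic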

    2/ Create the Flume configuration file streaming2.conf:

    agent1.sources=avro-source
    agent1.channels=logger-channel
    agent1.sinks=kafka-sink
    
    #define source
    agent1.sources.avro-source.type=avro
    agent1.sources.avro-source.bind=0.0.0.0
    agent1.sources.avro-source.port=41414
    
    #define channel
    agent1.channels.logger-channel.type=memory
    
    #define sink
    agent1.sinks.kafka-sink.type=org.apache.flume.sink.kafka.KafkaSink
    agent1.sinks.kafka-sink.topic = flume-kafka-streaming-topic
    agent1.sinks.kafka-sink.brokerList = hadoop:9092
    agent1.sinks.kafka-sink.requiredAcks = 1
    agent1.sinks.kafka-sink.batchSize = 20
    
    agent1.sources.avro-source.channels=logger-channel
    agent1.sinks.kafka-sink.channel=logger-channel
    

    3/ Start the log-producing program; the logs it generates show up in real time in the kafka-console-consumer window:
    kafka-console-consumer.sh --zookeeper hadoop:2181 --topic flume-kafka-streaming-topic
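
    To rule out Flume when debugging, you can also publish test messages straight to the topic with the console producer (broker address as in the sink configuration above):

    kafka-console-producer.sh --broker-list hadoop:9092 --topic flume-kafka-streaming-topic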

    • Logger-->Flume-->Kafka-->Spark Streaming

    1/ Scala code:

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.kafka.KafkaUtils
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    object FlumeKafkaReceiverWordCount {
      def main(args: Array[String]): Unit = {
        if (args.length < 4) {
          // Edit Configuration: hadoop:2181 test flume-kafka-streaming-topic 1
          System.err.println("Usage: FlumeKafkaReceiverWordCount <zkQuorum> <group> <topics> <numThreads>")
          System.exit(1)
        }

        val Array(zkQuorum, group, topics, numThreads) = args

        // local[2]: one thread for the Kafka receiver, one for processing
        val sparkConf = new SparkConf().setAppName("FlumeKafkaReceiverWordCount").setMaster("local[2]")
        //val sparkConf = new SparkConf()  // use this variant when submitting via spark-submit

        val ssc = new StreamingContext(sparkConf, Seconds(5))

        // map each topic to the number of receiver threads consuming it
        val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap

        // receiver-based Kafka stream; each element is a (key, message) pair
        val messages = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap)

        // word count over the message bodies of each 5-second batch
        messages.map(_._2).flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()

        ssc.start()
        ssc.awaitTermination()
      }
    }
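
    The code above uses the receiver-based API from the old spark-streaming-kafka connector. A sketch of the corresponding build.sbt dependencies, assuming Spark 2.x with Scala 2.11 (adjust artifact and versions to your actual Spark build):

    // build.sbt (assumed coordinates)
    libraryDependencies ++= Seq(
      "org.apache.spark" %% "spark-streaming" % "2.2.0" % "provided",
      "org.apache.spark" %% "spark-streaming-kafka-0-8" % "2.2.0"
    )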
    

    2/ Run the program above; the word counts now appear in real time in the Console window.
    3/ Notes:
    For local testing:
    run LoggerGenerator in IDEA,
    then use Flume, Kafka and Spark Streaming to process the logs.

    In production:
    1. Package LoggerGenerator into a jar and run that class.
    2. The Flume and Kafka steps are the same as in local testing.
    3. The Spark Streaming code also needs to be packaged into a jar and submitted to the cluster via spark-submit (a hedged example follows below).
    4. Choose the run mode that fits your situation: local/yarn/standalone/mesos.
    5. In production the overall streaming pipeline is the same; the difference lies in the complexity of the business logic.
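
    A sketch of such a spark-submit invocation, assuming the class sits in a hypothetical package com.example, the jar is named app.jar, and the connector version matches your Spark build:

    spark-submit \
    --class com.example.FlumeKafkaReceiverWordCount \
    --master yarn \
    --name FlumeKafkaReceiverWordCount \
    --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.0 \
    /path/to/app.jar hadoop:2181 test flume-kafka-streaming-topic 1

    When submitting this way, switch to the plain new SparkConf() line in the code above so the master is not hard-coded into the application.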
