美文网首页
SparkStreaming实时流处理

SparkStreaming实时流处理

作者: 朱Sir_小猿 | 来源:发表于2018-08-10 11:46 被阅读0次

    Spark Streaming之前项目中用过一段时间,最近正好闲下来做一下梳理。
    一. 简介:
    Spark Streaming 是Spark核心API的一个扩展,可以实现高吞吐量的、具备容错机制的实时流数据的处理。支持多种数据源获取数据:


    Spark Streaming处理的数据流图.png

    Spark Streaming接收Kafka、Flume、HDFS等各种来源的实时输入数据,进行处理后,处理结构保存在HDFS、DataBase等各种地方。
    二 流程梳理:

    1. 1 架构:
      python脚本模拟生成日志 + flume + kafka + Spark Streaming

    2.2 实现流程:
    (1)使用Python脚本生成日志
    (2)使用linux crontab定时任务运行脚本
    (3)使用flume采集生成的日志
    (4)采集到日志放入kafka中
    (5)Spark Streaming从kafka中pull数据,进行微批处理
    (6)将处理结果存入hbase中

    注意:要分步测试,逐层排查问题。

    三. 流程详解:
    3.1 使用Python脚本编写日志生成器,模拟日志系统产生日志generate_log.py

    #coding=UTF-8
    
    import random
    import time
    
    url_paths = [
        "class/112.html",
        "class/128.html",
        "class/145.html",
        "class/146.html",
        "class/131.html",
        "class/130.html",
        "learn/821",
        "course/list"
    ]
    
    ip_slices = [132,168,175,10,23,179,187,224,73,29,90,169,48,89,120,67,138,168,220,221,98]
    
    http_referers = [
        "http://www.baidu.com/s?wd={query}",
        "https://www.sogou.com/web?query={query}",
        "http://cn.bing.com/search?q={query}",
        "https://search.yahoo.com/search?p={query}" 
    ]
    
    search_keyword = [
        "Spark实战",
        "Storm实战",
        "Flink实战",
        "Bean实战",
        "Spark Streaming实战",
        "Spark SQL实战"
    ]
    
    status_codes = ["200","404","500"]
    
    def sample_url():
        return random.sample(url_paths,1)[0]
        
    def sample_ip():
        slice = random.sample(ip_slices,4)
        return ".".join([str(item) for item in slice])
        
    def sample_referer():
        if random.uniform(0,1) > 0.2:
            return "-"
            
        refer_str = random.sample(http_referers,1)
        query_str = random.sample(search_keyword,1)
        return refer_str[0].format(query=query_str[0])
        
    def sample_status_code():
        return random.sample(status_codes,1)[0]
        
    def generate_log(count = 10):
        time_str = time.strftime("%Y-%m-%d %H:%M:%S",time.localtime())
        
        f = open("/usr/local/hadoop/data/logs/access.log","w+")
        while count >= 1:
            query_log = "{ip}\t{local_time}\t\"GET /{url} HTTP/1.1\"\t{status_code}\t{referer}".format(local_time=time_str,url= sample_url(),ip=sample_ip(),referer=sample_referer(),status_code=sample_status_code())
            #print(query_log)
            f.write(query_log + "\n")
            count = count - 1
            
    if __name__ == '__main__':
        generate_log(100)
    

    3.2 使用linux crontab定时任务运行脚本:
    参考网站: http://tool.lu/crontab
    每一分钟执行一次的crontab表达式: */1 * * * *

    crontab相关linux命令:
    service crond start ---查看crontab服务是否启动
    crontab -u root -l ---查看root用户当前是否有自动执行计划

    3.2.1. 编写shell脚本,调用执行generate_log.py
    vi log_generator.sh

    ! /bin/sh

    python /usr/local/hadoop/data/generate_log.py

    3.2.2. 增加操作权限
    chmod u+x log_generator.sh

    3.2.3. 设置crontab定时任务
    crontab -e
    */1 * * * * /usr/local/hadoop/data/project/log_generator.sh

    3.2.4. 检查确认日志是否成功

    3.3 使用flume采集日志生成器产生的日志
    就像前面所说的,每一步都要分步进行测试,测试没有问题后,再进行对接使用。

    3.3.1. 初期测试flume选型
    access.log ==> 控制台输出
    source.type --> exec
    channel.type --> memory
    sink.type -->logger

    (1)配置文件 streaming_project.conf

    agent.sources=r1
    agent.channels=c1
    agent.sinks=k1
    
    agent.sources.r1.type=exec
    agent.sources.r1.command=tail -F /usr/local/hadoop/data/logs/access.log
    agent.sources.r1.shell=/bin/bash -c
    
    agent.sources.r1.channels=c1
    
    agent.sinks.k1.type=logger
    
    agent.sinks.k1.channel = c1
    
    agent.channels.c1.type = memory
    agent.channels.c1.capacity = 1000
    agent.channels.c1.transactionCapacity = 100
    

    (2)启动flume:

    flume-ng agent --conf $FLUME_HOME/conf --conf-file $FLUME_HOME/conf/streaming_project.conf --name agent -Dflume.root.logger=INFO,console    
    

    (3)观察flume日志,检查是否能够采集日志。
    2.3.2. flume对接kafka
    初步调通flume之后,就可以开始使用flume对接kafka了。
    (1)更新flume配置文件:streaming_project2.conf

    agent.sources=r1
    agent.channels=c1
    agent.sinks=k1
    
    agent.sources.r1.type=exec
    agent.sources.r1.command=tail -F /usr/local/hadoop/data/logs/access.log
    agent.sources.r1.shell=/bin/bash -c
    
    agent.sources.r1.channels=c1
    
    agent.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
    agent.sinks.k1.topic = streamingtopic
    agent.sinks.k1.brokerList = slave2:9092
    agent.sinks.k1.requiredAcks = 1
    agent.sinks.k1.batchSize = 20
    
    agent.sinks.k1.channel = c1
    
    agent.channels.c1.type = memory
    agent.channels.c1.capacity = 1000
    agent.channels.c1.transactionCapacity = 100
    

    (2)启动zk(kafka需要)
    zkServer.sh start $ZK_HOME/conf/zoo.cfg

    (3)启动kafka server
    kafka-server-start.sh $KAFKA_HOME/config/server.properties

    (4)查看topic是否创建成功
    kafka-topics.sh --list --zookeeper slave2:2181

    (5)若未创建,则创建topic
    kafka-topics.sh --create --zookeeper slave2:2181 --replication-factor 1 --partitions 1 --topic streamingtopic

    (6)查看topic是否创建成功或已存在
    kafka-topics.sh --list --zookeeper slave2:2181

    (7)启动flume

    flume-ng agent --conf $FLUME_HOME/conf --conf-file $FLUME_HOME/conf/streaming_project2.conf --name agent -Dflume.root.logger=INFO,console
    

    (8)启动kafka consumer 测试是否接收成功
    kafka-console-consumer.sh --zookeeper slave2:2181 --topic streamingtopic --from-beginning

    如果能成功消费消息,则说明flume对接kafka成功,那下面就可以开始使用spark Streaming对接kafka进行实时处理了。

    3.4 kafka对接spark Streaming
    Spark Streaming对接kafka使用生产中最常用的方式:Direct Approach(直接方法)(No Receivers)

    初步接收消息创建input DStream
    代码如下:

    package com.crn.spark.project
    
    import kafka.serializer.StringDecoder
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.kafka.KafkaUtils
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    
    object ProjectStreaming {
      def main(args: Array[String]): Unit = {
        if(args.length != 2){
          System.err.println("Usage ProjectStreaming: <brokers> <topics>")
          System.exit(1)
        }
        val sparkConf = new SparkConf().setAppName("ProjectStreaming").setMaster("local[2]")
        val ssc = new StreamingContext(sparkConf,Seconds(60))
    
        val Array(brokers,topics) = args
        val kafkaParams = Map[String,String]("metadata.broker.list" -> brokers)
        val topicSet = topics.split(",").toSet
        val message = KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder](ssc,kafkaParams,topicSet)
        message.map(_._2).count().print()
    
        ssc.start()
        ssc.awaitTermination()
    
      }
    }
    

    代码中需要的参数,在启动时以参数的形式进入传入。

    测试可以成功接收之后,就要正式进入项目核心部分的开发工作了。

    3.5 项目开发:
    3.5.1. 需求分析:
    实战开发首先要关注就是需求,只有明确了需求,才能真正开始开发。
    功能需求1: 统计今天到现在为止实战课程的访问量
    这个可以做到每天实时的查看当天网站的访问量.
    功能需求2: 今天到现在为止从搜索引擎引流过来的实现课程的访问量
    这个可以为不同渠道做广告投资,提供一定的决策分析资料.

    3.5.2. 技术实现分析:
    【流程】: 数据清洗 > 统计分析 > 统计结果入库

    1)数据清洗:
    就是按照需求对实时产生的点击日志数据进行筛选过滤,只保留我们需要的内容字段。

    下面观察日志信息,分析如何取数据


    日志信息.png

    保留IP,访问时间,课程编号,状态码,搜索引擎地址。

    2)统计分析:
    为了更好的进行统计分析,我们需要定义一个实体类来封装我们需要保存的日志信息:


    定义实体类.png

    下面看一下需求功能1如何实现。功能1: 今天到现在为止,实战课程的访问量

    (1) rowkey设计

    因为是统计一天的实战课程访问量:故rowkey可以设计为 day_coureseid,比如20180808_118.

    注意:rowkey有设计原则,但设计方法并不是一成不变的。RowKey的设计要强烈依靠业务,要考虑到如何设计,更要考虑到如何更好的查询。

    因为是要统计每天的课程访问量,使用日期+课程id的方式,后期就可以直接以日期为前缀条件进行筛选查询。

    依次类推,可以用来统计***电商每天不同电商品类的销售额情况,而在电商交易信息中,有包含商品品类的信息。同样设计rowkey时,就可以使用日期+商品品类的方式进行设计。
    而查询时,就可以依靠日期前缀进行查询。

    按照rowkey的设计规则,现在日志中的时期格式,还不符合我们的规范,所以需要我们进行一下转换:编写日期时间工具类DateUtils,将日志中的时间转换为自己想要的格式.

    数据清洗结果类似如下: ClickLog(102.10.55.22,2017110812121201,128,-)

    功能1的rowkey设计为:yyyyMMdd_courseid,示例:20180808_118.

    rowkey设计好后,我们就可以使用数据库来进行存储我们的统计结果了。spark streaming把统计结果写入到数据库中,可视化前端根据:yyyyMMdd把数据库里面的统计结果展示出来。

    (2)储存介质选择

    选择什么数据库作为统计结果的储存呢?

    a. 关系型数据库RDBMS: mysql,oracle

    day course_id click_count
    20171111 1 10

    存储在关系型数据库,当一个批次的数据过来之后,需要先将之前的数据查出来进行加和,然后再执行更新操作。

    b. 非常关系型数据库Nosql: Hbase,redis

    使用hbase只需要调用一个API即可搞定更新操作.

    通过对比,明显就可以看出来使用hbase存储更方便。那咱就确定选用hbase作为存储介质。

    (3)实现流程

    接下来我们需要做的就是:

    1) 进行Hbase表设计
    create 'course_clickcount', 'info'

    2)编写操作hbase的数据访问层DAO代码
    操作Hbase的工具类(HBaseUtils.java):

    package com.crn.spark.project.utils;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.client.HBaseAdmin;
    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.client.Put;
    
    import java.io.IOException;
    
    /**
     * HBase操作工具类:Java工具类建议采用单例模式封装
     */
    
    public class HBaseUtils {
        HBaseAdmin admin = null;
        Configuration configuration = null;
    
        /**
         * 私有构造方法
         */
        private HBaseUtils(){
            configuration = new Configuration();
            configuration.set("hbase.zookeeper.quorum","master,slave2");
            configuration.set("hbase.rootdir","hdfs://master:9000/hbase");
    
            //ctrl+alt+T
            try {
                admin = new HBaseAdmin(configuration);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    
        private static HBaseUtils instance = null;
    
        public static synchronized HBaseUtils getInstance(){
            if(null == instance){
                instance = new HBaseUtils();
            }
            return instance;
        }
    
        /**
         * 根据表名获取到HTable实例
         * @param tableName
         * @return
         */
        public HTable getTable(String tableName){
            HTable table = null;
    
            try {
                table = new HTable(configuration,tableName);
            } catch (IOException e) {
                e.printStackTrace();
            }
            return table;
        }
    
        /**
         * 添加一条记录到HBase表
         * @param tableName 表名
         * @param rowkey rowkey
         * @param cf columnFamily
         * @param column 列
         * @param value 写入的值
         */
        public void put(String tableName,String rowkey,String cf,String column,String value){
            HTable table = getTable(tableName);
    
            Put put = new Put(rowkey.getBytes());
            put.add(cf.getBytes(),column.getBytes(),value.getBytes());
    
            try {
                table.put(put);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    
        public static void main(String[] args) {
            String tableName = "course_clickcount";
            String rowkey = "20171111_188";
            String cf = "info";
            String column = "click_count";
            String value ="2";
            HBaseUtils.getInstance().put(tableName,rowkey,cf,column,value);
    
    
        }
    }
    

    封装实战课程点击量的实体类(ClickCourseCount.scala):

    package com.crn.spark.project.domain
    
    case class ClickCourseCount(dayCourse:String,clickCount:Long)
    
    

    保存操作hbase的DAO(ClickCourseCountDao.scala):

    package com.crn.spark.project.dao
    
    import com.crn.spark.project.domain.ClickCourseCount
    import com.crn.spark.project.utils.HBaseUtils
    import org.apache.hadoop.hbase.client.{Get, HTable}
    import org.apache.hadoop.hbase.util.Bytes
    
    import scala.collection.mutable.ListBuffer
    
    object ClickCourseCountDao {
      val tableName = "course_clickcount"
      val cf = "info"
      val column = "clickcount"
    
      def save(list:ListBuffer[ClickCourseCount]): Unit ={
        val htable = HBaseUtils.getInstance().getTable(tableName)
        for(clk <- list){
          htable.incrementColumnValue(clk.dayCourse.getBytes,
            cf.getBytes,
            column.getBytes,
            clk.clickCount)
        }
      }
    
      def count(dayCourse:String):Long={
        val htable = HBaseUtils.getInstance().getTable(tableName)
        val get = new Get(dayCourse.getBytes)
        val value = htable.get(get).getValue(cf.getBytes,column.getBytes())
        if(null == value){
          0L
        }else{
          Bytes.toLong(value)
        }
      }
    
      def main(args: Array[String]): Unit = {
        val listBuffer = new ListBuffer[ClickCourseCount]
    
        listBuffer.append(ClickCourseCount("20171024_88",1L))
        listBuffer.append(ClickCourseCount("20171024_88",2L))
        listBuffer.append(ClickCourseCount("20171024_88",3L))
        save(listBuffer)
        println(count("20171024_88")+"---"+count("20171024_88"))
      }
    }
    
    
    1. 完善核心代码(ProjectStreaming.scala):
    package com.crn.spark.project.spark
    
    import com.crn.spark.project.dao.{ClickCourseCountDao, ClickCourseSearchCountDao}
    import com.crn.spark.project.domain.{ClickCourceSearchCount, ClickCourseCount, ClickLog}
    import com.crn.spark.project.utils.DateUtils
    import com.crn.spark.project.utils.DateUtils.{formatDateToYYYYMMDDStr, getDateFromTimeStr}
    import kafka.serializer.StringDecoder
    import org.apache.spark.SparkConf
    import org.apache.spark.status.api.v1.RDDPartitionInfo
    import org.apache.spark.streaming.kafka.KafkaUtils
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    
    import scala.collection.mutable.ListBuffer
    
    object ProjectStreaming {
      def main(args: Array[String]): Unit = {
        if(args.length != 2){
          System.err.println("Usage ProjectStreaming: <brokers> <topics>")
          System.exit(1)
        }
        val sparkConf = new SparkConf().setAppName("ProjectStreaming").setMaster("local[2]")
        val ssc = new StreamingContext(sparkConf,Seconds(60))
    
        val Array(brokers,topics) = args
        val kafkaParams = Map[String,String]("metadata.broker.list" -> brokers)
        val topicSet = topics.split(",").toSet
        val message = KafkaUtils.createDirectStream[String,String,StringDecoder,StringDecoder](ssc,kafkaParams,topicSet)
        //message.map(_._2).count().print()
    
        //132.168.89.224    2018-07-13 05:53:02 "GET /class/145.html HTTP/1.1"  200 https://search.yahoo.com/search?p=Flink实战
    
        val cleanData = message.map(_._2).map{x =>
          val strArr = x.split("\t")
          strArr(1)
          val ip = strArr(0)
          val time = DateUtils.formatDateToYYYYMMDDStr(DateUtils.getDateFromTimeStr(strArr(1)))
          val refer = strArr(2).split(" ")(1)
          val status = strArr(3).toInt
          val searchArr = strArr(4).replaceAll("//","/").split("/")
          var searchUrl = ""
          if(searchArr.length > 2){
            searchUrl =searchArr(1)
          }else{
            searchUrl = searchArr(0)
          }
          (ip,time,refer,status,searchUrl)
        }.filter(_._3.startsWith("/class")).map{x =>
          //145.html
          val referStr= x._3.split("/")(2)
          val refer = referStr.substring(0,referStr.lastIndexOf("."))
          ClickLog(x._1,x._2,refer,x._4,x._5)
        }
    
        //功能1: 统计今天到现在为止,实战课程的访问量
        cleanData.map(x=>(x.time+"_"+x.course,1)).reduceByKey(_+_).foreachRDD{RDD =>
          RDD.foreachPartition{rddPartition =>
            val list = new ListBuffer[ClickCourseCount]
            rddPartition.foreach{pair =>
              list.append(ClickCourseCount(pair._1,pair._2))
            }
            ClickCourseCountDao.save(list)
          }
        }
    
        //功能2: 统计今天到现在为止从搜索引擎引流过来的,实战课程的访问量
    
        cleanData.filter{x =>x.search != "-"}.map(x=>(x.time+"_"+x.search+"_"+x.course,1)).reduceByKey(_+_).foreachRDD { rdd =>
          rdd.foreachPartition { partition =>
            val list = new ListBuffer[ClickCourceSearchCount]
            partition.foreach { pair =>
              list.append(ClickCourceSearchCount(pair._1, pair._2))
            }
            ClickCourseSearchCountDao.save(list)
          }
        }
    
    
        ssc.start()
        ssc.awaitTermination()
    
      }
    }
    
    

    需求功能2的功能流程,同样流程实现。这里交代一下rowkey的设计规则:yyyyMM_searchUrl_courseId.

    相关文章

      网友评论

          本文标题:SparkStreaming实时流处理

          本文链接:https://www.haomeiwen.com/subject/pehkbftx.html