Spark Streaming实战

作者: 董二弯 | 来源:发表于2019-05-30 22:05 被阅读3次

    在之前Spark Streaming&Flume&Kafka打造通用流处理平台中已经一起学习了环境的搭建,接下来在此基础上做Spark Streaming处理日志实战。

    模拟日志生成

    通过python代码模拟实时生成日志,代码如下
    generate_log.py:

    #coding=UTF-8
    import random
    import time
    
    url_paths=[
        "class/112.html",
        "class/128.html",
        "class/145.html",
        "class/130.html",
        "class/146.html",
        "class/131.html",
        "learn/821",
        "course/list"
    ]
    
    ip_slices=[132,156,124,10,29,167,143,187,30,46,55,63,72,87,98,168]
    
    http_referers=[
        "https://www.baidu.com/s?wd={query}",
        "https://www.sogou.com/web?query={query}",
        "https://cn.bing.com/search?q={query}",
        "https://www.so.com/s?q={query}"
    ]
    
    search_keyword=[
        "spark sql实战",
        "hadoop 基础",
        "storm实战",
        "spark streaming实战"
    ]
    
    status_code=["200","404","500"]
    
    def sample_status_code():
        return random.sample(status_code,1)[0]
    
    def sample_referer():
        if random.uniform(0,1)>0.2:
            return "-"
        refer_str=random.sample(http_referers,1)
        query_str=random.sample(search_keyword,1)
        return refer_str[0].format(query=query_str[0])
    
    def sample_url():
        return random.sample(url_paths,1)[0]
    
    def sample_ip():
        slice=random.sample(ip_slices,4)
        return ".".join([str(item) for item in slice])
    
    def generate_log(count=10):
        time_str=time.strftime("%Y-%m-%d %H:%M:%S",time.localtime())
    
        f=open("/root/data/streaming_access.log","w+")
    
        while count >=1:
            query_log="{ip}\t{local_time}\t\"GET /{url} HTTP/1.1\"\t{status_code}\t{refer}".format(url=sample_url(),ip=sample_ip(),refer=sample_referer(),status_code=sample_status_code(),local_time=time_str)
            print(query_log)
            f.write(query_log+"\n")
            count=count-1
    
    if __name__ == '__main__':
        # 每一分钟生成一次日志信息
        while True:
            generate_log()
            time.sleep(60)
    

    这段代码的含义是每一分钟随机生成10条(可修改)日志并写入到本地文件中,这里的文件路径是我虚拟机中的路径,可随意变动,要和后面flume source的路径相同。生成的日志格式如下。

    IP地址 时间 请求方式、url、引擎版本 状态码 搜索引擎提供商
    187.98.156.46 2019-05-29 GET /class/112.html HTTP/1.1 500 https://www.sogou.com/web?query=xx

    在服务器上通过以下命令可运行python程序

    //前提是虚拟机已经安装了python的环境,要是没有安装可百度自行安装
    python generate_log.py
    

    环境测试

    使用Spark Streaming&Flume&Kafka打造通用流处理平台中搭建的环境,唯一的区别是改造flume agent,在日志文件中读取生成的日志,在输出到Kafka,Spark Streaming在Kafka中消费并处理日志。

    agent 修改

    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    
    # Describe/configure the source
    a1.sources.r1.type = exec
    a1.sources.r1.command= tail -F /root/data/streaming_access.log
    a1.sources.r1.shell = /bin/sh -c
    
    # Describe the sink
    #设置Kafka接收器
    a1.sinks.k1.type= org.apache.flume.sink.kafka.KafkaSink
    #设置Kafka的broker地址和端口号
    a1.sinks.k1.brokerList=192.168.30.131:9092
    #设置Kafka的Topic
    a1.sinks.k1.topic=kafka_spark
    #设置序列化方式
    a1.sinks.k1.serializer.class=kafka.serializer.StringEncoder
    a1.sinks.k1.batchSize = 3
    a1.sinks.k1.requiredAcks = 1
    
    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    
    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
    

    sources的路径要和日志生成程序文件写入路径一致。

    测试

    启动Kafka环境、flume agent、日志生成程序、本地Spark Streaming程序。
    观察本地客户端日志,输出了产生的日志,说明环境可用。若没有输出,建议先阅读以前的课程。


    image.png

    日志处理

    需求:

    统计今天到目前为止从搜索引擎过来的课程的访问量

    产品推销是很重要的一环,于是很多公司都会给各种各样的平台流水来为自己的产品做推销。但是怎么判断那个平台的推销效果比较适合自己呢?根据大数据实时处理推销平台过来的数据,根据推销平台和产品ID为key做一个访问量的Top排序。这样可增加效果好的平台的投资,结束和效果不好平台的合作。这样不仅大大增加了推销的效果还节约了成本。在这个例子中搜索引擎类似推销平台,课程类似产品。

    处理结果存储选型

    处理结果可存储在关系型数据库中,如myql oracle等
    也可以存储在NoSql中,如hbase,redis等。
    由于需求中有访问量累计的操作
    若使用mysql,需要每次存储时,先根据主键把数据查询出来,做了相加之后在做更新,比较麻烦。
    使用hbase,其中客户端在保存时有incrementColumnValue这个方法,自动与数据库中的记录相加,所以这里使用hbase。

    安装配置 HBase

    下载、解压、配置环境变量
    
    conf/hbase-env.sh
    修改JAVA_HOME
    export HBASE_MANAGES_ZK=false
    
    conf/hbase-site.xml:
    <configuration>
        <property>
            <name>hbase.rootdir</name>
             <value>hdfs://192.168.30.130:8092/hbase</value>
        </property>
        <property>
            <name>hbase.cluster.distributed</name>
            <value>true</value>
        </property>
        <property>
            <name>hbase.zookeeper.quorum</name>
            <value>192.168.30.131:2181</value>
        </property> 
    </configuration>
    
    conf/regionservers:
    localhost
    

    HBase 建表

    // 1 启动hbase,在此之前启动zookeeper,hadoop环境
    start-hbase.sh
    // 2 启动shell
    hbase shell
    // 3 建表
    create 'course_search_clickcount','info'
    // 4 查看数据表
    list
    // 5 查看数据表信息
    describe 'course_search_clickcount'
    // 6 查看表数据
    scan 'course_search_clickcount'
    

    业务开发

    消费kafka数据、数据清洗与统计
    1)实体类

    CourseSearchClickCount.scala:
    
    /**
      * 从搜索引擎过来的课程点击数实体类
      * @param day_search_course
      * @param click_count
      */
    case class CourseSearchClickCount(day_search_course: String, click_count: Long)
    

    3)添加依赖

    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-client</artifactId>
        <version>${hbase.version}</version>
    </dependency>
    
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-server</artifactId>
        <version>${hbase.version}</version>
    </dependency>
    
    <!-- hadoop -->
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>${hadoop.version}</version>
    </dependency>
    

    3)工具类
    DateUtils.scala

    mport java.util.Date
    
    import org.apache.commons.lang3.time.FastDateFormat
    
    /**
      * 日期时间工具类
      */
    object DateUtils {
    
      val OLD_FORMAT = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss")
    
      val TARGET_FORMAT = FastDateFormat.getInstance("yyyyMMddHHmmss")
    
      def getTime(time: String) = {
        OLD_FORMAT.parse(time).getTime
      }
    
      def parseToMinute(time: String) = {
        TARGET_FORMAT.format(new Date(getTime(time)))
      }
    
      def main(args: Array[String]): Unit = {
        println(parseToMinute("2018-9-6 13:58:01"))
      }
    }
    

    HBaseUtils.java

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.client.HBaseAdmin;
    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.util.Bytes;
    
    import java.io.IOException;
    
    /**
     * HBase操作工具类,Java工具类建议采用单例模式封装
     */
    public class HBaseUtils {
    
        HBaseAdmin admin = null;
        Configuration configuration = null;
    
        /**
         * 私有构造方法
         */
        private HBaseUtils() {
    
            configuration = new Configuration();
            configuration.set("hbase.zookeeper.quorum", "localhost:2181");
            configuration.set("hbase.rootdir", "hdfs://localhost:8020/hbase");
    
            try {
                admin = new HBaseAdmin(configuration);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    
        private static HBaseUtils instance = null;
    
        public static synchronized HBaseUtils getInstance() {
            if (null == instance) {
                instance = new HBaseUtils();
            }
            return instance;
        }
    
        /**
         * 根据表名获取到HTable实例
         *
         * @param tableName
         * @return
         */
        public HTable getTable(String tableName) {
            HTable table = null;
            try {
                table = new HTable(configuration, tableName);
            } catch (IOException e) {
                e.printStackTrace();
            }
            return table;
        }
    
        /**
         * 添加一条记录到表中
         *
         * @param tableName
         * @param rowkey
         * @param cf
         * @param column
         * @param value
         */
        public void put(String tableName, String rowkey, String cf, String column, String value) {
            HTable table = getTable(tableName);
    
            Put put = new Put(Bytes.toBytes(rowkey));
            put.add(Bytes.toBytes(cf), Bytes.toBytes(column), Bytes.toBytes(value));
            try {
                table.put(put);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
    

    4)数据库操作
    CourseSearchClickCountDAO.scala

    import com.lihaogn.spark.project.utils.HBaseUtils
    import com.lihaogn.sparkProject.domain.{CourseClickCount, CourseSearchClickCount}
    import org.apache.hadoop.hbase.client.Get
    import org.apache.hadoop.hbase.util.Bytes
    
    import scala.collection.mutable.ListBuffer
    
    /**
      * 数据访问层,从搜索引擎过来的课程点击数
      */
    object CourseSearchClickCountDAO {
    
      val tableName = "course_search_clickcount"
      val cf = "info"
      val qualifer = "click_count"
    
      /**
        * 保存数据到HBase
        *
        * @param list
        */
      def save(list: ListBuffer[CourseSearchClickCount]): Unit = {
    
        val table = HBaseUtils.getInstance().getTable(tableName)
    
        for (ele <- list) {
          table.incrementColumnValue(Bytes.toBytes(ele.day_search_course),
            Bytes.toBytes(cf),
            Bytes.toBytes(qualifer),
            ele.click_count)
        }
      }
    
      /**
        * 根据rowkey查询值
        *
        * @param day_search_course
        * @return
        */
      def count(day_search_course: String): Long = {
        val table = HBaseUtils.getInstance().getTable(tableName)
    
        val get = new Get(Bytes.toBytes(day_search_course))
        val value = table.get(get).getValue(cf.getBytes, qualifer.getBytes)
    
        if (value == null) {
          0L
        } else
          Bytes.toLong(value)
      }
    }
    

    5)主类

    import com.imooc.spark.demain.{ClickLog, CourseSearchClickCount}
    import com.imooc.spark.dto.CourseSearchClickCountDAO
    import com.imooc.spark.util.DateUtils
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import scala.collection.mutable
    import scala.collection.mutable.ListBuffer
    /**
      * description
      *
      * @author zhiying.dong@hand-china.com 2019/05/24 16:54
      */
    object SparkStreamingApp {
      def main(args: Array[String]): Unit = {
    
        val conf = new SparkConf()
          .setAppName("DirectKafka")
          .setMaster("local[2]")
          .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    
        val ssc = new StreamingContext(conf, Seconds(2))
    
        val topicsSet = Array("kafka_spark")
        val kafkaParams = mutable.HashMap[String, String]()
        //必须添加以下参数,否则会报错
        kafkaParams.put("bootstrap.servers", "192.168.30.131:9092")
        kafkaParams.put("group.id", "group1")
        kafkaParams.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
        kafkaParams.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
        val messages = KafkaUtils.createDirectStream[String, String](
          ssc,
          LocationStrategies.PreferConsistent,
          ConsumerStrategies.Subscribe[String, String](topicsSet, kafkaParams
          )
        )
    
        // 步骤一:测试数据接收
        val logs = messages.map(_.value)
    
        // 步骤二:数据清洗
        val cleanData = logs.map(line => {
          val infos = line.split("\t")
          val url = infos(2).split(" ")(1)
          var courseId = 0
          // 获取课程编号
          if (url.startsWith("/class")) {
            val courseHtml = url.split("/")(2)
            courseId = courseHtml.substring(0,courseHtml.lastIndexOf(".")).toInt
          }
          ClickLog(infos(0), DateUtils.parseToMinute(infos(1)), courseId, infos(3).toInt, infos(4))
        }).filter(clicklog => clicklog.courseId != 0)
    
        // 步骤三:统计从搜索引擎过来的从今天开始到现在的课程的访问量
        cleanData.map(x=>{
          val referer=x.referer.replaceAll("//","/")
          val splits=referer.split("/")
          var host=""
          if(splits.length>2) {
            host=splits(1)
          }
    
          (host,x.courseId,x.time)
        }).filter(_._1!="").map(x=>{
          (x._3.substring(0,8)+"_"+x._1+"_"+x._2,1)
        }).reduceByKey(_+_).foreachRDD(rdd=>{
          rdd.foreachPartition(partitionRecords=>{
            val list =new ListBuffer[CourseSearchClickCount]
    
            partitionRecords.foreach(pair=>{
              list.append(CourseSearchClickCount(pair._1,pair._2))
            })
            // 写入数据库
            CourseSearchClickCountDAO.save(list)
    
          })
        })
        // 开始计算
        ssc.start()
        ssc.awaitTermination()
      }
    }
    

    6)测试
    启动Kafka环境、flume agent、日志生成程序、Hadoop环境、hbase环境、本地Spark Streaming程序。
    查看hbase中course_search_clickcount表,出现以下内容说明处理结果成功存储到了hbase


    image.png

    7)后续操作
    通过以上几步已经把日志通过搜索引擎+课程编号为主键,和总点击量存储到了数据库。此时可通过Javaweb结合echars等开源展示框架为企业进行展示,实现可视化效果。

    总结

    这次实战是学习的慕课网上spark steaming的课程,只是找找Spark Streaming实战的感觉,从搭建环境到日志处理实战,理解了Spark Streaming整套处理的流程。其中很多代码都是照着视频一行一行敲出来的,下来还需要更多的扩展才行。接下来准备学习Scala的知识,在业务逻辑开发这一部分做更多的深入。

    相关文章

      网友评论

        本文标题:Spark Streaming实战

        本文链接:https://www.haomeiwen.com/subject/bwsutctx.html