Simulating Real-Time Website Data Stream Statistics

Author: _Kantin | Published 2017-12-31 17:43

Platform: IDEA 2016
Languages: Scala, Java, Python
Code: https://pan.baidu.com/s/1o7JMCvo
Overview: Python simulates the generation of real-time data, crontab runs a shell script that produces the streaming log data, Flume ships the log data to Kafka, Spark Streaming (run from IDEA) consumes the Kafka data, and the results are stored in HBase.

1. Generating simulated data with Python
import random
import time

url_paths = [
    "class/112.html",
    "class/128.html",
    "class/145.html",
    "class/146.html",
    "class/131.html",
    "class/130.html",
    "learn/821",
    "course/list"
]

ip_slices = [132,156,124,10,29,167,143,187,30,46,55,63,72,87,98,168]

http_referers = [
    "http://www.baidu.com/s?wd={query}",
    "https://www.sogou.com/web?query={query}",
    "http://cn.bing.com/search?q={query}",
    "https://search.yahoo.com/search?p={query}",
]

search_keyword = [
    "Spark SQL实战",
    "Hadoop基础",
    "Storm实战",
    "Spark Streaming实战",
    "大数据面试"
]

status_codes = ["200","404","500"]

def sample_url():
    return random.sample(url_paths, 1)[0]

def sample_id():
    slice = random.sample(ip_slices, 4)
    return ".".join(str(item) for item in slice)

def sample_referer():
    # Some requests carry no referrer; represent them with "-".
    # random.uniform draws a random float within the given range.
    if random.uniform(0, 1) > 0.2:
        return "-"
    # random.sample returns a list, so index into it to drop the surrounding [].
    refer_url = random.sample(http_referers, 1)
    query_url = random.sample(search_keyword, 1)
    return refer_url[0].format(query=query_url[0])

def sample_status():
    return random.sample(status_codes, 1)[0]

def generate_log(count=10):
    local_time = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime())
    while count >= 1:
        query_log = "{ip}\t{local_time}\t\"GET /{url} HTTP/1.1\"\t{status_code}\t{referer}".format(
            url=sample_url(), ip=sample_id(), referer=sample_referer(), status_code=sample_status(),
            local_time=local_time
        )
        print(query_log)
        count = count - 1

if __name__ == "__main__":
    generate_log(100)
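
Each generated line is tab-separated (IP, time, request, status code, referrer); a sample line looks like:

132.156.124.10	2017-12-31 17:43:00	"GET /class/128.html HTTP/1.1"	404	-
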
2. Configuring the crontab scheduled task

(1) Create log_generator.sh containing the command: python /home/hadoop/data/project/generate_log.py (its output should be appended to the access.log file that Flume tails below, e.g. by redirecting to /home/hadoop/data/project/logs/access.log).
(2) Run crontab -e and add the entry: */1 * * * * /home/hadoop/data/project/log_generator.sh

3. Flume configuration file and the command to start Flume

(1) The Flume-to-Kafka configuration file streaming_project.conf (see the official Flume documentation for the configuration options):

exec-memory-kafka.sources = exec-source
exec-memory-kafka.sinks = kafka-sink
exec-memory-kafka.channels = memory-channel

exec-memory-kafka.sources.exec-source.type = exec
exec-memory-kafka.sources.exec-source.command = tail -F /home/hadoop/data/project/logs/access.log
exec-memory-kafka.sources.exec-source.shell = /bin/sh -c

exec-memory-kafka.channels.memory-channel.type = memory

exec-memory-kafka.sinks.kafka-sink.type = org.apache.flume.sink.kafka.KafkaSink
exec-memory-kafka.sinks.kafka-sink.brokerList = hadoop000:9092
exec-memory-kafka.sinks.kafka-sink.topic = streamingtopic
exec-memory-kafka.sinks.kafka-sink.batchSize = 5
exec-memory-kafka.sinks.kafka-sink.requiredAcks = 1

exec-memory-kafka.sources.exec-source.channels = memory-channel
exec-memory-kafka.sinks.kafka-sink.channel = memory-channel

(2) Start ZooKeeper and Kafka before starting Flume:
./zkServer.sh start and ./kafka-server-start.sh -daemon /home/hadoop/app/kafka_2.11-0.9.0.0/config/server.properties

(3) Command to start Flume:
flume-ng agent --name exec-memory-kafka --conf /home/hadoop/app/apache-flume-1.6.0-cdh5.7.0-bin/conf --conf-file /home/hadoop/data/project/streaming_project2.conf

(4) You can start a Kafka console consumer to verify the configuration:
kafka-console-consumer.sh --zookeeper hadoop000:2181 --topic streamingtopic

4. Consuming the Kafka messages with Spark Streaming

(1) Run in local mode first to verify that the pipeline works end to end. The program arguments (args) are: hadoop000:2181 test streamingtopic 1

// Required imports (at the top of ImoocStatStreamingApp):
// import org.apache.spark.SparkConf
// import org.apache.spark.streaming.{Seconds, StreamingContext}
// import org.apache.spark.streaming.kafka.KafkaUtils

    if (args.length != 4) {
      System.err.println("Usage: ImoocStatStreamingApp <zkQuorum> <group> <topics> <numThreads>")
      System.exit(1)
    }
    val Array(zkQuorum, groupId, topics, numThreads) = args
    val sparkConf = new SparkConf().setAppName("ImoocStatStreamingApp").setMaster("local[5]")
    val ssc = new StreamingContext(sparkConf, Seconds(60))

    val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap

    // Receiver-based Kafka stream; each message is a (key, value) pair
    val messages = KafkaUtils.createStream(ssc, zkQuorum, groupId, topicMap)

    messages.map(_._2).count().print()
    ssc.start()
    ssc.awaitTermination()

(2) Step two: clean the log data. ClickLog is a case class and DataUtils is a date-handling utility (see the linked code for details).
The resulting cleanData stream holds ClickLog objects with the fields ip: String, time: String, courseId: Int, statusCode: Int, referer: String.
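A minimal sketch of what these two helpers might look like (an assumption based on how they are used below, not the exact code from the linked project; it assumes commons-lang3's FastDateFormat is on the classpath, and the yyyyMMddHHmm target format is inferred from the time.substring(0,8) calls in the later steps):

case class ClickLog(ip: String, time: String, courseId: Int, statusCode: Int, referer: String)

object DataUtils {
  import org.apache.commons.lang3.time.FastDateFormat

  // format produced by the Python generator, e.g. 2017-12-31 17:43:00
  val SOURCE_FORMAT = FastDateFormat.getInstance("yyyy-MM-dd HH:mm:ss")
  // target format yyyyMMddHHmm, so time.substring(0, 8) later yields the day
  val TARGET_FORMAT = FastDateFormat.getInstance("yyyyMMddHHmm")

  def parseToMinute(time: String): String =
    TARGET_FORMAT.format(SOURCE_FORMAT.parse(time))
}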

 // Step two: clean the raw log lines
    val log = messages.map(_._2)
    val cleanData = log.map(line => {
      val infos = line.split("\t")
      // infos(2) is the request, e.g. "GET /class/128.html HTTP/1.1"; take the URL part
      val url = infos(2).split(" ")(1)
      var courseId = 0

      if (url.startsWith("/class")) {
        val courseIdHTML = url.split("/")(2)
        // extract the course id from URLs of the form /class/<id>.html
        courseId = courseIdHTML.substring(0, courseIdHTML.lastIndexOf(".")).toInt
      }
      ClickLog(infos(0), DataUtils.parseToMinute(infos(1)), courseId, infos(3).toInt, infos(4))
    }).filter(clicklog => clicklog.courseId != 0) // drop records whose course id is 0 (non-course pages)

(3) Step three: count the cumulative number of visits per course so far today. The input is still cleanData; the case class here is CourseClickCount, and CourseClickCountDAO handles persistence to HBase (a sketch follows the snippet below).

// Step three: count today's cumulative visits to the hands-on courses
// (requires: import scala.collection.mutable.ListBuffer)
    cleanData.map(x =>{
      (x.time.substring(0,8)+"_"+x.courseId,1)
    }).reduceByKey(_+_).foreachRDD(rdd =>{
      rdd.foreachPartition(partitionRecords =>{
        val list = new ListBuffer[CourseClickCount]
        partitionRecords.foreach(pair =>{
          list.append(CourseClickCount(pair._1,pair._2))
        })
        CourseClickCountDAO.save(list)
      })
    })
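
The CourseClickCount case class and CourseClickCountDAO.save used above are part of the linked project. A minimal sketch of what they might look like, assuming an increment-based save (so counts from successive batches accumulate in HBase rather than overwrite each other) built on the HBaseUtils helper shown in step (4):

import scala.collection.mutable.ListBuffer
import org.apache.hadoop.hbase.util.Bytes

// rowkey "yyyyMMdd_courseId" plus the click count contributed by one batch
case class CourseClickCount(dayCourse: String, clickCount: Long)

object CourseClickCountDAO {
  val tableName = "course_clickcount"   // table created in step (4)
  val cf = "info"
  val qualifier = "click_count"

  def save(list: ListBuffer[CourseClickCount]): Unit = {
    val table = HBaseUtils.getInstance().getTable(tableName)
    for (ele <- list) {
      table.incrementColumnValue(Bytes.toBytes(ele.dayCourse),
        Bytes.toBytes(cf), Bytes.toBytes(qualifier), ele.clickCount)
    }
  }
}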

(4) Creating the HBase tables and the Java helper. Two tables are used, course_clickcount and course_search_clickcount, both with the column family info (in the HBase shell: create 'course_clickcount', 'info' and create 'course_search_clickcount', 'info'). The Java connection helper is as follows:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class HBaseUtils {
    HBaseAdmin admin = null;
    Configuration configuration = null;

    private HBaseUtils() {
        configuration = new Configuration();
        configuration.set("hbase.zookeeper.quorum", "hadoop000:2181");
        configuration.set("hbase.rootdir", "hdfs://hadoop000:8020/hbase");

        try {
            admin = new HBaseAdmin(configuration);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    private static HBaseUtils instance = null;

    public static synchronized HBaseUtils getInstance() {
        if (instance == null) {
            instance = new HBaseUtils();
        }
        return instance;
    }

    public HTable getTable(String tableName) {
        HTable table = null;
        try {
            table = new HTable(configuration, tableName);
        } catch (IOException e) {
            e.printStackTrace();
        }
        return table;
    }

    /**
     * Put a single cell into an HBase table.
     * @param tableName HBase table name
     * @param rowkey    rowkey of the record
     * @param cf        column family
     * @param column    column qualifier
     * @param value     value to write
     */
    public void put(String tableName, String rowkey, String cf, String column, String value) {
        HTable table = getTable(tableName);
        Put put = new Put(Bytes.toBytes(rowkey));
        put.add(Bytes.toBytes(cf), Bytes.toBytes(column), Bytes.toBytes(value));

        try {
            table.put(put);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        String tableName = "imooc_course_clickcount";
        String rowkey = "20171111_88";
        String cf = "info";
        String column = "click_count";
        String value = "30";

        HBaseUtils.getInstance().put(tableName, rowkey, cf, column, value);
    }
}

(5) Step four: count today's visits that came from search engines. CourseSearchClickCount is the case class and CourseSearchClickCountDAO performs the save (a sketch follows the snippet below).

 // Step four: count today's cumulative visits to the hands-on courses that came from search engines
    cleanData.map(x=>{
      val referer = x.referer.replaceAll("//","/")
      val splits = referer.split("/")
      var host = ""
      if(splits.length>2){
        host=splits(1)
      }
      (host,x.courseId,x.time)
    }).filter(_._1 !="").map(x =>{
      (x._3.substring(0,8)+"_"+x._1+"_"+x._2,1)
    }).reduceByKey(_+_).foreachRDD(rdd =>{
      rdd.foreachPartition(partitionRecords =>{
        val list = new ListBuffer[CourseSearchClickCount]
        partitionRecords.foreach(pair =>{
          list.append(CourseSearchClickCount(pair._1, pair._2))
        })
        CourseSearchClickCountDAO.save(list)
      })
    })
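
Again, CourseSearchClickCount and its DAO live in the linked project; a minimal sketch under the same assumptions as the previous one, with the rowkey day_searchEngineHost_courseId and the course_search_clickcount table from step (4):

import scala.collection.mutable.ListBuffer
import org.apache.hadoop.hbase.util.Bytes

case class CourseSearchClickCount(daySearchCourse: String, clickCount: Long)

object CourseSearchClickCountDAO {
  val tableName = "course_search_clickcount"
  val cf = "info"
  val qualifier = "click_count"

  def save(list: ListBuffer[CourseSearchClickCount]): Unit = {
    val table = HBaseUtils.getInstance().getTable(tableName)
    for (ele <- list) {
      table.incrementColumnValue(Bytes.toBytes(ele.daySearchCourse),
        Bytes.toBytes(cf), Bytes.toBytes(qualifier), ele.clickCount)
    }
  }
}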
5. The two result sets as stored in HBase
(Screenshots of the course_clickcount and course_search_clickcount tables omitted.)
6. Deploying the project to the server and the problems encountered

(1) First comment out setMaster("local[5]") in the code and specify the master via spark-submit instead (see the sketch below).
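
A sketch of the idea (an assumption about how the conf is built, not the exact project code):

// Leave the master out of the code so that spark-submit's --master flag decides
// where the job runs; re-enable setMaster only for local debugging in the IDE.
val sparkConf = new SparkConf().setAppName("ImoocStatStreamingApp")
// .setMaster("local[5]")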

(2) Package with mvn clean package -DskipTests. The build may report that the src/main/scala package cannot be found; this happens because the Scala and Java sources are not both wired into the build, and commenting out (or adjusting) the source-directory settings in the pom file fixes it.

(3) After the build, the jar can be found under the project's target directory.

(4) Submit the job to the cluster with spark-submit. First check which processes are currently running (jps output):
14914 NameNode
19347 Jps
15491 NodeManager
15049 DataNode
15385 ResourceManager
15225 SecondaryNameNode
10473 Kafka
16137 HRegionServer
15980 HMaster
4798 QuorumPeerMain
16607 Main

The submit command is: spark-submit --master local[5] --class com.lzk.spark.project.spark.ImoocStatStreamingApp
/home/hadoop/lib/sparktrain-1.0.jar hadoop000:2181 test streamingtopic 1

(5) Problems encountered


The first error was caused by a missing package; fix it by adding: --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.0

The second fix was to add all of HBase's jars: --jars $(echo /home/hadoop/app/hbase-1.2.0-cdh5.7.0/lib/*.jar | tr ' ' ',')

Add these parameters to the spark-submit command.
