大数据项目之仿天猫数据分析

作者: z七夜 | 来源:发表于2019-09-26 17:36 被阅读0次

    简介

    项目简介

    此项目是实现仿大数据项目流程,包括,日志收集传输,日志格式化,数据实时分析,数据持久化到HDFS,数据离线报表统计,离线任务调度,日志记录搜索几大部分

    首先说一下这个项目的大致流程,这个项目是仿天猫数据分析,是自己模拟用户购买浏览商品,生成日志,对日志进行收集,然后分两部分,一部分对数据实时分析,生成日用户活跃地理位置,第二部分是数据持久化之后,T+1对数据进行分析,统计各省销量以及各省活跃用户数

    项目数据流

    1. 日志数据生成

      日志格式: requestid, ts, userid, 城市,经度,纬度,操作(浏览,购买)

    2. flume采集日志数据进入kafka log topic

    3. kafkastream消费log topic日志,写入process topic

      对数据进行格式化处理,以及过滤数据。格式化后的数据格式: requestid, ts, userid, 城市,经度,纬度,操作

    4. 实时模块

      sparkstreaming处理process topic日志,扔进realtime topic
      格式化数据,写到process topic中,得到城市和用户id

    5. flume采集process topic数据,写入hdfs

      持久化日志到hdfs中

    6. report模块(离线处理模块)

      T+1离线处理模块,spark计算hdfs中的数据,统计前一天的各省销售记录,写入mysql

    7. azkaban调度任务

    8. ElasticSearch查询历史记录

    9. web页面实时展示活跃用户,和报表页面

    然而。。电脑垃圾,扛不住,只做了一部分

    azkaban,elasticsearch,web页面没有做

    项目构成

    1. logbuilder

    模拟日志生成(后面为了方便,写了一个shell用于日志生成)

    1. kafkastream

      kafkastream清洗日志

    2. sparkstream

      sparkstream实时处理日志显示操作用户的地理位置

    3. report

      T+1报表项目,批处理日志,分析各省销售量对比,写入mysql

    实现

    我们按照数据流的方式来写实现

    1. 日志生成

    使用SpringBoot做了一个日志生成器。模拟生成日志,日志格式

    logger:>>>> requestid,userid,ts,城市,经度,维度,操作(0浏览 1购买)
    
    public static void main(String[] args) {
    
            SpringApplication.run(LogBuilderApplication.class);
    
            HashMap map = new HashMap<Integer, String[]>();
    
            // 存储城市,经纬度
            map.put(0,new String[]{"海门","121.15","31.89"});
            map.put(1,new String[]{"盐城","120.13","33.38"});
            map.put(2,new String[]{"上海","121.48","31.22"});
            map.put(3,new String[]{"厦门","118.1","24.46"});
            Random random = new Random();
    
            while (true){
                int i1 = random.nextInt(map.size());
                String[] o = (String[]) map.get(i1);
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                long ts = System.currentTimeMillis();
                String requestid = UUID.randomUUID().toString();
    
                int userid = random.nextInt(100000);
                // requestid,userid, 城市,经度,纬度,操作(浏览,购买)0 浏览 1 购买
                logger.info("logger:>>>>{},{},{},{},{},{},{}",requestid, ts, userid, o[0],o[1],o[2],random.nextInt(2));
            }
        }
    

    由于不想打包,重启,就自己用shell写了一个日志生成器,更好使用

    #!/bin/bash
    #二维数组
    city=('海门 121.15 31.89' '盐城 120.13 33.38' '上海 121.48 31.22' '厦门 118.1 24.46')
    #city=('海门 121.15" "31.89"' '"盐城" "120.13" "33.38"' '"上海" "121.48" "31.22"' '"厦门" "118.1" "24.46"')
    #获取数组长度
    echo ${#city[@]}
    len=$((${#city[@]}-1))
    echo $len
    #死循环,随机数0-数组长度
    while :
    do
        rand=`shuf -i0-${len} -n1`
        echo $rand
        echo ${city[${rand}]}
     c=(${city[${rand}]})
    ci=${c[0]}
    cj=${c[1]}
    cw=${c[2]}
      req=$(cat /proc/sys/kernel/random/uuid)
    date=`date -d 'day' +%Y%m%d`
    ts=`date -d $date +%s`
    userid=`shuf -i1-100000 -n1`
    action=`shuf -i0-1 -n1`
        echo "logger:>>>>$req,$ts,$userid,${ci},${cj},${cw},${action}"
        sleep 3
    done
    
    

    2. 日志采集

    日志采集使用Flume,将日志文件数据采集到Kafka中,

    # 监控本地文件写到kafka
    a1.sources = r1
    a1.channels = c1
    a1.sinks = k1
    
    #设置source
    a1.sources.r1.type = exec
    a1.sources.r1.command = tail -F /root/test_flume_file/test_flume_file
    
    #设置Sink
    a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
    a1.sinks.k1.kafka.topic = test
    a1.sinks.k1.kafka.bootstrap.servers = 192.168.33.4:9092
    a1.sinks.k1.kafka.flumeBatchSize = 20
    a1.sinks.k1.kafka.producer.acks = 1
    a1.sinks.k1.kafka.producer.linger.ms = 1
    a1.sinks.k1.kafka.producer.compression.type = snappy
    
    #设置channels
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    
    #连接
    a1.sources.r1.channels = c1
    #注意,是channel
    a1.sinks.k1.channel = c1
    
    

    3. KafkaStream进行数据清洗

    这一步主要是对日志数据进行清洗,过滤掉不符合规范的日志,

    过滤后的日志

    requestid,userid,ts,城市,经度,维度,操作(0浏览 1购买)
    
     public static void main(String[] args) {
    
            String from = "log";
            String to = "process";
            Properties properties = new Properties();
    
            // 设置application id
            properties.setProperty(StreamsConfig.APPLICATION_ID_CONFIG,"stream-tmall");
            // 设置kafka地址
            properties.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.33.3:9092,192.168.33.4:9092,192.168.33.5:9092");
    
            // 创建拓扑结构
            Topology topology = new Topology();
            // 构建数据来源,数据处理逻辑,数据去向
            topology.addSource("SOURCE",from)
                    .addProcessor("PROCESS", ()-> new LogProcesser(), "SOURCE")
                    .addSink("SINK", to, "PROCESS");
            //过时
    //        TopologyBuilder builder = new TopologyBuilder();
    //        builder.addSource("SOURCE",from)
    //                .addProcessor("PROCESS", ()->new LogProcesser(), "SOURCE")
    //                .addSink("SINK",to);
    //
    //
    //        // 创建kafkastream
            KafkaStreams streams = new KafkaStreams(topology,properties);
            // 开启流处理
            streams.start();
            System.out.println("kafkaStream is start!!!");
    
    
        }
        
        
    
    public class LogProcesser implements Processor<byte[],byte[]> {
    
        private ProcessorContext context;
    
        @Override
        public void init(ProcessorContext context) {
    
            this.context = context;
        }
    
        @Override
        public void process(byte[] key, byte[] value) {
    
            // 核心流程,处理日志
            String line = new String(value);
            if (line.contains("logger:>>>>")){
                System.out.println("LogProcess process data:" + line);
                String[] split = line.split("logger:>>>>");
                // 转发
                context.forward("LogProcess".getBytes(), split[1].trim().getBytes());
            }
            context.commit();
        }
    
        @Override
        public void punctuate(long timestamp) {
    
        }
    
        @Override
        public void close() {
    
        }
    }
    
    

    将Kafka log topic中的数据消费,将日志数据进行格式化,写到Process topic中

    4. SparkStreaming数据实时处理

    我们实时部分,是消费Kafka中process topic中的数据,每一条数据都是今天用户的操作,所以我们将每一条日志的城市取出来,放到realtime topic中,等待后序消费,把数据推送到前端进行实时展示(这部分没做,页面不好写)

    public class SparkStreamingProcesser {
    
        private static final String brokers = "192.168.33.3:9092,192.168.33.4:9092,192.168.33.5:9092";
        private static final String group_id = "tmall_online";
        private static final List<String> topic = new ArrayList<String>(Arrays.asList("process"));
    
        private static final String toTopic = "realtime";
    
    
        public static void main(String[] args) {
            //1. 得到spark上下文
            SparkConf conf = new SparkConf().setAppName("tmall_online").setMaster("local[*]");
    
            //2. 创建sparkstreamingcontext。每隔2钟会处理一次收集到的数据
            JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(10));
    
            //3. 创建kafka的参数
            HashMap<String, Object> kafkaParams = new HashMap<String, Object>();
    
            // 设置kafka集群地址
            kafkaParams.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
            // 设置消费者组
            kafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, group_id);
            // 设置key反序列化类
            kafkaParams.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            kafkaParams.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    
    
            // 4. 通过参数创建一个kafka stream
            JavaInputDStream<ConsumerRecord<Object, Object>> stream = KafkaUtils.createDirectStream(
                    jssc,
                    LocationStrategies.PreferConsistent(),
                    ConsumerStrategies.Subscribe(topic, kafkaParams));
    
            // 5.获取数据并处理
            JavaDStream<String> map = stream.map(msg-> msg.value().toString());
    
    
            JavaDStream<Object> map1 = map.map(x -> {
                KafkaProducerUtils producerUtils = new KafkaProducerUtils();
                String[] split = x.split(",");
                System.out.println("接收到:" + split[1]+","+split[2]);
                producerUtils.sendMessage(toTopic, split[3]+","+split[4]+","+split[5]);
                return "a";
            });
            map1.print();
    
    
            jssc.start();
    
            try {
                jssc.awaitTermination();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }finally {
                jssc.close();
            }
    
        }
    
    
    }
    
    

    5. 日志持久化

    我们需要将用户操作日志持久化到HDFS中,我们将格式化之后的数据采集到HDFS,所以我们使用Flume将Kafka中process topic的数据采集到HDFS

    # 监控kafka写到hdfs
    a1.sources = r1
    a1.channels = c1
    a1.sinks = k1
    
    #设置source
    
    a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
    a1.sources.r1.batchSize = 500
    a1.sources.r1.batchDurationMillis = 2000
    a1.sources.r1.kafka.bootstrap.servers = 192.168.33.3:9092,192.168.33.4:9092,192.168.33.5:9092
    a1.sources.r1.kafka.topics = process
    a1.sources.r1.kafka.consumer.group.id = c_flume
    
    #设置Sink
    a1.sinks.k1.type = hdfs
    a1.sinks.k1.hdfs.path = /user/customfile/tmall/dt=%Y%m%d
    a1.sinks.k1.hdfs.filePrefix = hadoop1_%H_events_
    a1.sinks.k1.hdfs.fileSuffix=.log
    a1.sinks.k1.hdfs.rollSize = 102400000
    a1.sinks.k1.hdfs.rollCount = 0
    a1.sinks.k1.hdfs.rollInterval = 3600
    a1.sinks.k1.hdfs.idleTimeout = 5400
    #是否使用本地时间戳
    a1.sinks.k1.hdfs.useLocalTimeStamp = true
    #设置文件相关 文件类型为纯文本
    a1.sinks.k1.hdfs.fileType = DataStream
    #设置channels
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 1000
    
    #连接
    a1.sources.r1.channels = c1
    #注意,是channel
    a1.sinks.k1.channel = c1
    
    

    6. 报表项目

    当每天数据采集到HDFS之后,我们需要T+1处理这些日志,产生报表,我们这里主要产生两个指标

    1. 各省活跃用户
    2. 各省销售量

    这里我们使用Scala编写Spark程序,做报表,并没有写到mysql,将csv数据写到hdfs中,后序可以导入mysql,电脑带不起了

    object ReportStatistics {
    
      def main(args: Array[String]): Unit = {
    
        val spark = SparkSession.builder()
          .appName("report_statistics")
          .enableHiveSupport()
          .getOrCreate()
    
    
        val date = args(0)
        //1. 读取前一天数据
    
        import spark.implicits._
    
        val df = spark.sparkContext.textFile(s"/user/custom/tmall/dt=$date")
          .filter(x=>{
            val strings: Array[String] = x.split(",")
            strings.length == 7
          })
          .map(x=>{
            val strings: Array[String] = x.split(",")
            (strings(0),strings(1),strings(2),strings(3),strings(6))
          }).toDF("requestid", "ts", "userid", "provice", "action")
    
    
        df.cache().createOrReplaceTempView("t1")
        // 统计各省用户日活
        spark.sql(
          s"""
             |select count(distinct(userid)) as users,provice from t1 group by provice
             |union all
             |select count(distinct(userid)) as users,"all" as provice
           """.stripMargin).repartition(1).write
          .mode(SaveMode.Overwrite).csv(s"/user/data/reporting/tmall/dt=$date/users")
    
        // 统计各省销售量
    
        spark.sql(
          s"""
             |select count(requestid) as cnt,provice from t1 where action=0 group by provice
             |union all
             |select count(requestid) as cnt from t1 where action=0
           """.stripMargin).repartition(1).write
          .mode(SaveMode.Overwrite).csv(s"/user/data/reporting/tmall/dt=$date/sales")
      }
    
    }
    
    

    总结

    项目现在就是做到现在这样,后面还有Azkaban任务调度,Flume将数据采集到ElasticSearch中,用于试试查询,还有web显示页面。

    显示的话,报表基本做完,一个web项目读取mysql展示就行,如果有想完善的,可以完善一下,

    项目中遇到的问题

    1. Flume将数据采集到HDFS的时候产生了大量的小文件,后面调整了配置参数

    源码看这里: https://gitee.com/zhangqiye/tmall

    如有问题,可以加群: 552113611

    相关文章

      网友评论

        本文标题:大数据项目之仿天猫数据分析

        本文链接:https://www.haomeiwen.com/subject/uvjnuctx.html