
Implementing Statistics with HDFS, RabbitMQ, and MongoDB

Author: 陈兄 | Published 2019-01-25 11:44

Business Analysis

This project analyzes desktop user-behavior data in the IPTV industry, with up to 30 million records per day. The statistics to produce include page views (PV), unique visitors (UV), video views (VV), daily/weekly/monthly active users (DAU/WAU/MAU), the monthly boot rate, click counts, ranking lists, and so on.
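
As a concrete example of one of these metrics: DAU can be computed as a MongoDB aggregation that restricts the data to one day, groups by user, and counts the groups. Below is a minimal sketch using Spring Data MongoDB (the same API used in the aggregation code further down); the field names userId and reportTime and the collection name are illustrative assumptions, not taken from the original project.

    // Minimal sketch (assumed schema): DAU for one day via a MongoDB aggregation.
    // "userId", "reportTime" and the collection name are illustrative assumptions.
    public long countDau(MongoTemplate mongoTemplate, String collection, Date dayStart, Date dayEnd) {
        Aggregation agg = Aggregation.newAggregation(
                Aggregation.match(Criteria.where("reportTime").gte(dayStart).lt(dayEnd)), // that day's records
                Aggregation.group("userId"),                                              // one bucket per distinct user
                Aggregation.count().as("dau"));                                           // number of buckets = DAU
        AggregationResults<org.bson.Document> res =
                mongoTemplate.aggregate(agg, collection, org.bson.Document.class);
        org.bson.Document first = res.getUniqueMappedResult();
        return first == null ? 0L : ((Number) first.get("dau")).longValue();
    }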

Architecture Design

[Architecture diagram: jiagoutu.jpg]
• The client reports data, which is stored in an in-memory cache
• A scheduled task flushes the cached character streams to files and uploads the files to HDFS
• The MQ client sends a notification to the MQ server
• The MQ consumer picks it up and processes the data on HDFS (deserializing the byte array back into an entity bean)
• The entity bean is written into MongoDB
• Statistics are queried with MongoDB's aggregate() function (over split databases and tables)

Environment Setup

Hadoop environment setup: [Hadoop environment setup guide]
RabbitMQ environment setup: [RabbitMQ environment setup guide]

Selected Code

Collecting data on the client side

    @RequestMapping(value = "/singleData/{policyId}")
    public ResponseData postSingleData(@PathVariable("policyId") String policyId, HttpServletRequest request) {
        ResponseData response = getResponseData();
        try {
            // Read the raw request body line by line
            ServletInputStream inputStream = request.getInputStream();

            StringBuffer sb = new StringBuffer();
            BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(inputStream, "utf-8"));
            String data = null;
            while ((data = bufferedReader.readLine()) != null) {
                sb.append(data);
                sb.append("\n");
            }
            if (sb.length() <= 0) {
                throw new Exception("Request body is empty; please submit data");
            }
            // Wrap the report together with its policy id and hand it to the caching service
            DataStr s = new DataStr(policyId, sb);
            service.receiveNewData(s);
        } catch (Exception e) {
            response.setCode(0);
            response.setMsg(e.getMessage());
        }

        return response;
    }
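
The controller hands each report to service.receiveNewData(...), which, per the architecture above, presumably just enqueues it into an in-memory cache. The original post does not show DataStr or DataPond; the sketch below is one plausible shape for them, inferred from how the later code uses DataPond.getDataCache() and DataPond.getFileCache(). It is an assumption, not the original source.

    import java.util.Map;
    import java.util.Queue;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentLinkedQueue;

    // Hypothetical sketch of the in-memory cache between the controller and the flush job
    // (each class would live in its own file).
    public class DataStr {
        private final String policyId;
        private final StringBuffer data;

        public DataStr(String policyId, StringBuffer data) {
            this.policyId = policyId;
            this.data = data;
        }
        public String getPolicyId() { return policyId; }
        public StringBuffer getData() { return data; }
    }

    public class DataPond {
        // Reported records waiting to be flushed into per-policy files
        private static final Queue<DataStr> DATA_CACHE = new ConcurrentLinkedQueue<DataStr>();
        // One open file buffer per policy; a concurrent map so entries can be removed while iterating
        private static final Map<String, FileStruct> FILE_CACHE = new ConcurrentHashMap<String, FileStruct>();

        public static Queue<DataStr> getDataCache() { return DATA_CACHE; }
        public static Map<String, FileStruct> getFileCache() { return FILE_CACHE; }
    }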
    

Periodically flushing the cached character streams to files

    private void flushData() throws Exception {
        // Drain the in-memory queue and append each record to its policy's file buffer
        Queue<DataStr> dataCache = DataPond.getDataCache();
        DataStr dataIte = null;
        int size = dataCache.size();
        logger.info("There are [" + size + "] records in the queue");
        while (size > 0 && (dataIte = dataCache.poll()) != null) {
            String policyId = dataIte.getPolicyId();
            Map<String, FileStruct> fileCache = DataPond.getFileCache();
            FileStruct fileStruct = fileCache.get(policyId);
            if (fileStruct == null) {
                fileStruct = new FileStruct(policyId);
                fileCache.put(policyId, fileStruct);
            }
            fileStruct.write(dataIte.getData().toString());
            fileStruct.write("\n");
            size--;
        }
    }
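
FileStruct itself is not shown in the post either. Given the calls made on it (write, flush, getFielSize, getLastUpdateTime, destroy, plus the fact that it is serialized and sent over MQ below), a minimal sketch could look like the following; the local spool directory, the buffering strategy, and the hdfsPath field are assumptions.

    import java.io.*;
    import java.nio.charset.StandardCharsets;

    // Hypothetical sketch of FileStruct, inferred from how the surrounding code calls it.
    public class FileStruct implements Serializable {
        private final String policyId;
        private final StringBuilder buffer = new StringBuilder();
        private final File localFile;                       // assumed local spool file
        private long lastUpdateTime = System.currentTimeMillis();
        private String hdfsPath;                            // set after the file is uploaded to HDFS

        public FileStruct(String policyId) {
            this.policyId = policyId;
            this.localFile = new File("/tmp/datapond/" + policyId + "-" + System.currentTimeMillis() + ".log");
        }

        public synchronized void write(String text) {
            buffer.append(text);
            lastUpdateTime = System.currentTimeMillis();
        }

        // (sic) method name kept as in the calling code; here it reports the buffered size
        public long getFielSize() { return buffer.length(); }
        public long getLastUpdateTime() { return lastUpdateTime; }
        public String getPolicyId() { return policyId; }
        public File getLocalFile() { return localFile; }
        public String getHdfsPath() { return hdfsPath; }
        public void setHdfsPath(String hdfsPath) { this.hdfsPath = hdfsPath; }

        /* Write the buffered text to the local file so it can be uploaded to HDFS */
        public synchronized void flush() throws IOException {
            localFile.getParentFile().mkdirs();
            try (Writer w = new OutputStreamWriter(new FileOutputStream(localFile, true), StandardCharsets.UTF_8)) {
                w.write(buffer.toString());
            }
            buffer.setLength(0);
        }

        public void destroy() { localFile.delete(); }
    }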
    

Uploading the file to HDFS and notifying via MQ

    private void uploadFilesAndSendToMQ() throws Exception {
        /* Iterate over the per-policy file buffers (the cache is assumed to be a ConcurrentHashMap,
           so removing entries while iterating over keySet() is safe) */
        Map<String, FileStruct> fileCache = DataPond.getFileCache();

        Set<String> keySet = fileCache.keySet();
        for (String key : keySet) {
            FileStruct fs = fileCache.get(key);
            /* Decide whether this buffer should be flushed and uploaded to HDFS */
            boolean shallBeFlush = false;
            // Flush when the file has grown large enough ...
            if (fs.getFielSize() >= SystemChangeableConstant.MAX_FILESIZE) {
                shallBeFlush = true;
            }
            // ... or when it has been idle for too long
            if (System.currentTimeMillis() - fs.getLastUpdateTime() >= SystemChangeableConstant.MAX_FILE_NOACTION_TIMESPAN) {
                shallBeFlush = true;
            }
            if (shallBeFlush) {
                if (!hdfsUtil.isOpened()) {
                    // TODO temporary: point hadoop.home.dir at a local Hadoop installation
                    System.setProperty("hadoop.home.dir", "D:\\hadoop-2.6.5");
                    hdfsUtil.open();
                }
                logger.info("File of policy [" + key + "] is full and will be sent out!");
                fs.flush();
                try {
                    transferFileToDfs(fs);
                    logger.info("File of policy [" + key + "] was sent to HDFS successfully");
                } catch (Exception e) {
                    logger.error("File of policy [" + key + "] failed to send to HDFS: " + e.getMessage());
                }
                try {
                    sendToMq(fs);
                    logger.info("File of policy [" + key + "] was sent to MQ successfully");
                } catch (Exception e) {
                    logger.error("File of policy [" + key + "] failed to send to MQ: " + e.getMessage());
                }
                fileCache.remove(key);
                fs.destroy();
            }
        }
        hdfsUtil.close();
    }
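
transferFileToDfs(...) and sendToMq(...) are not shown in the post. The sketch below outlines one plausible shape for them: copying the flushed local file into HDFS with the Hadoop FileSystem API, and publishing the serialized FileStruct to RabbitMQ. The hdfsUtil.getFileSystem() accessor, the HDFS target directory, the broker host, and SerializingUtil.serialize(...) (the mirror of the deserialize call shown later) are all assumptions.

    // Hypothetical sketches of the two helpers used above; paths, host, and accessors are assumptions.
    private void transferFileToDfs(FileStruct fs) throws IOException {
        org.apache.hadoop.fs.FileSystem dfs = hdfsUtil.getFileSystem();   // assumed accessor on hdfsUtil
        org.apache.hadoop.fs.Path src = new org.apache.hadoop.fs.Path(fs.getLocalFile().getAbsolutePath());
        org.apache.hadoop.fs.Path dst = new org.apache.hadoop.fs.Path("/datapond/" + fs.getPolicyId() + "/");
        dfs.copyFromLocalFile(src, dst);                                  // upload into the policy's HDFS directory
        fs.setHdfsPath(dst + "/" + fs.getLocalFile().getName());          // let the consumer know where to read
    }

    private void sendToMq(FileStruct fs) throws Exception {
        com.rabbitmq.client.ConnectionFactory factory = new com.rabbitmq.client.ConnectionFactory();
        factory.setHost("mq-host");                                       // assumed broker address
        com.rabbitmq.client.Connection conn = factory.newConnection();
        try {
            com.rabbitmq.client.Channel channel = conn.createChannel();
            channel.queueDeclare(SystemConstant.NEW_FILE_QUEUE, true, false, false, null);
            // The consumer below deserializes this byte array back into a FileStruct
            channel.basicPublish("", SystemConstant.NEW_FILE_QUEUE, null, SerializingUtil.serialize(fs));
            channel.close();
        } finally {
            conn.close();
        }
    }

In a real producer the RabbitMQ connection and channel would typically be created once and reused rather than opened per file; the per-call setup here just keeps the sketch self-contained.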
    

The MQ consumer receives the notification and processes the data on HDFS

    /* Hand each received file descriptor to a worker task */
    Consumer consumer = new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
            logger.info("Router [" + SystemConstant.NEW_FILE_QUEUE + "] received new file");
            // Deserialize the byte array back into the FileStruct entity bean
            FileStruct fileStruct = (FileStruct) SerializingUtil.deserialize(body);
            factory.createNewTask(fileStruct);
        }
    };
    // autoAck = true: messages are acknowledged as soon as they are delivered
    channel.basicConsume(SystemConstant.NEW_FILE_QUEUE, true, consumer);
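
SerializingUtil is not shown; assuming it is based on plain Java serialization (which matches casting the result to FileStruct, provided FileStruct implements Serializable), a minimal version could be:

    import java.io.*;

    // Minimal sketch of SerializingUtil using standard Java serialization (an assumption, not the original class).
    public final class SerializingUtil {

        private SerializingUtil() { }

        public static byte[] serialize(Serializable obj) throws IOException {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
                oos.writeObject(obj);
            }
            return bos.toByteArray();
        }

        public static Object deserialize(byte[] bytes) throws IOException {
            try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
                return ois.readObject();
            } catch (ClassNotFoundException e) {
                throw new IOException("Unknown class in serialized payload", e);
            }
        }
    }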
    

Writing the data into MongoDB

    for (HashMap aMap : parsedDatas) {
        // Preset default time column
        aMap.put(DataTable.DEFAUL_TTIMECOLUMN, now);

        dataTable.genId(aMap);
        // Route the record to its split (sharded) table
        String tableName = dataTable.getSplitTableName(aMap);
        Document doc = new Document(aMap);

        if (dataTable.hasIdentify()) {
            // Records that carry an identity are inserted one by one
            mongoConnector.insertSingle(tableName, doc);
        } else {
            // Otherwise, group the documents by table for a batch insert
            List<Document> list = tableToDatasMap.get(tableName);
            if (list == null) {
                list = new ArrayList<Document>(parsedDatas.size() / 2);
                tableToDatasMap.put(tableName, list);
            }
            list.add(doc);
        }
    }
    // Batch-insert the grouped documents, one call per split table
    for (String key : tableToDatasMap.keySet()) {
        mongoConnector.insert(key, tableToDatasMap.get(key));
    }
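
mongoConnector.insertSingle(...) and mongoConnector.insert(...) presumably wrap the MongoDB Java driver. A hedged sketch of what such a wrapper could look like with the 3.x driver follows; the database name is an assumption.

    import java.util.List;
    import org.bson.Document;
    import com.mongodb.MongoClient;
    import com.mongodb.client.MongoDatabase;

    // Hypothetical sketch of the connector used above; the database name is an assumption.
    public class MongoConnector {
        private final MongoDatabase db;

        public MongoConnector(MongoClient client) {
            this.db = client.getDatabase("iptv_stats");
        }

        /* Insert one document into the given split table */
        public void insertSingle(String tableName, Document doc) {
            db.getCollection(tableName).insertOne(doc);
        }

        /* Batch-insert all documents collected for one split table */
        public void insert(String tableName, List<Document> docs) {
            if (docs != null && !docs.isEmpty()) {
                db.getCollection(tableName).insertMany(docs);
            }
        }
    }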
    

Aggregation analysis

    public List<Map<String, Object>> getExhibitionCount(Date start, Date end) throws ParseException {
        List<Map<String, Object>> map = new ArrayList<Map<String, Object>>();
        // Number of days in the date range
        int days = differentDaysByMillisecond(start, end);
        for (int i = 0; i < days + 1; i++) {
            // Group by template / column / position and sum the durations
            Aggregation aggregation = Aggregation.newAggregation(
                    Aggregation.group("templateCode", "columnId", "positionId").sum("duration").as("duration"),
                    Aggregation.project("templateCode", "columnId", "positionId", "duration")
            );
            // One split table per day
            String tableName = this.getTableName(start);
            AggregationResults<HashMap> aggregate = mongoTemplate.aggregate(aggregation, tableName, HashMap.class);
            // key: the date, value: that day's aggregated data
            List<HashMap> mappedResults = aggregate.getMappedResults();
            System.out.print("Aggregated data: " + mappedResults);
            for (Map adCountDtoMap : mappedResults) {
                String json = JsonUtils.obj2Str(adCountDtoMap);
                AdCountDto adCountDto = JsonUtils.str2Obj(json, AdCountDto.class);
                // Tag the result with the day that was aggregated
                adCountDto.setDate(dateFormatCheck.format(start));
                createIndex(COL_NAME);
                mongoTemplate.insert(adCountDto, COL_NAME);
            }
            // Move on to the next day
            start = dataPlus(start, 1);
        }
        // The per-day results are persisted into COL_NAME; the returned list stays empty here
        return map;
    }
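
The date helpers used above (differentDaysByMillisecond, getTableName, dataPlus) are not included in the post. Assuming one collection per day named by its date, they could be sketched as follows; the "exhibition_yyyyMMdd" naming scheme is an assumption.

    // Hypothetical sketches of the date helpers; the "exhibition_yyyyMMdd" naming scheme is an assumption.
    private static final SimpleDateFormat TABLE_SUFFIX = new SimpleDateFormat("yyyyMMdd"); // not thread-safe; fine for a single-threaded job

    /* Whole days between two dates, based on their millisecond difference */
    private int differentDaysByMillisecond(Date start, Date end) {
        return (int) ((end.getTime() - start.getTime()) / (24L * 60 * 60 * 1000));
    }

    /* One split collection per day, e.g. exhibition_20190125 */
    private String getTableName(Date day) {
        return "exhibition_" + TABLE_SUFFIX.format(day);
    }

    /* Add n days to a date */
    private Date dataPlus(Date day, int n) {
        Calendar c = Calendar.getInstance();
        c.setTime(day);
        c.add(Calendar.DAY_OF_MONTH, n);
        return c.getTime();
    }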
    
