Storm+HBase Real-Time Ad Statistics

Author: 阿坤的博客 | Published 2018-08-10 17:00

    This article walks through a real-time ad analytics system built with Kafka + Storm + HBase. The dashboard that displays the results uses Spring Boot + Vue + ElementUI + ECharts.

    Contents:

    • 1. Requirements
    • 2. Log Format
    • 3. HBase Table Design
    • 4. Writing the Storm Topology
    • 5. Kafka
    • 6. Querying HBase
    • 7. References

    1. Requirements

    • 1. Current impressions of a given ad in a given province
    • 2. Current impressions of a given ad in a given city
    • 3. Current impressions of a given ad for a given user client
    • 4. Historical impression trend of a given ad in a given province over a period of time
    • 5. Historical impression trend of a given ad in a given city over a period of time
    • 6. Historical impression trend of a given ad for a given client over a period of time
    • 7. Current clicks of a given ad
    • 8. Click trend of a given ad over a period of time
    (Screenshots: dashboard previews 1 and 2.)

    2. Log Format

    2014-01-13\t19:11:55\t{"adid":"31789","uid":"9871","action":"view"}\t63.237.239.3\t北京\t北京
    
    Date: 2014-01-13
    Time: 19:11:55
    JSON (easy to extend):
      adid: ad ID
      uid: user ID
      action: user behavior, either click or view
    IP: 63.237.239.3
    Province: 北京
    City: 北京
    

    3. HBase Table Design

    Table name:     realtime_ad_stat
    Row keys:       ADID_Province_20181212, ADID_City_20181212, ADID_UID_20181212
    Column family:  stat
    Columns:        view_cnt, click_cnt
    # Create the table
    create 'realtime_ad_stat',{NAME => 'stat',VERSIONS => 2147483647}
    
    # List tables
    list
    
    # Truncate the table
    truncate 'realtime_ad_stat'
    
    # Drop the table
    disable 'realtime_ad_stat'
    drop 'realtime_ad_stat'
    
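    With this design, one log record fans out to three rows, one per dimension; only the middle component of the row key changes. As a minimal illustration (the RowKeys helper below is ours, not part of the original project):

    public final class RowKeys {

        /** Builds a row key of the form ADID_DIMENSION_YYYYMMDD, e.g. "1_北京_20180810". */
        public static String buildRowKey(String adid, String dimension, String yyyymmdd) {
            return adid + "_" + dimension + "_" + yyyymmdd;
        }

        public static void main(String[] args) {
            // The sample log line from section 2 produces these three keys:
            System.out.println(buildRowKey("31789", "北京", "20140113"));  // province
            System.out.println(buildRowKey("31789", "北京", "20140113"));  // city
            System.out.println(buildRowKey("31789", "9871", "20140113")); // uid
        }
    }
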

    4. Writing the Storm Topology

    4.1.AdTopology

    public class AdTopology {
        public static void main(String[] args) throws Exception {
            TopologyBuilder topologyBuilder = new TopologyBuilder();
    
            KafkaSpoutConfig<String, String> kafkaSpoutConfig =
                    KafkaSpoutConfig.builder("hadoop1:9092,hadoop2:9092,hadoop3:9092", "AD")
                            .setProp(ConsumerConfig.GROUP_ID_CONFIG, "STORM_AD_GROUP")
                            .setFirstPollOffsetStrategy(KafkaSpoutConfig.FirstPollOffsetStrategy.LATEST)
                            .build();
            topologyBuilder.setSpout("KafkaSpout", new KafkaSpout<>(kafkaSpoutConfig), 2);
            topologyBuilder.setBolt("me.jinkun.ad.storm.LogToModelBolt", new LogToModelBolt(), 2).localOrShuffleGrouping("KafkaSpout");
            topologyBuilder.setBolt("me.jinkun.ad.storm.ToHbaseBolt", new ToHbaseBolt(), 4).localOrShuffleGrouping("me.jinkun.ad.storm.LogToModelBolt");
    
            StormTopology topology = topologyBuilder.createTopology();
            Config config = new Config();
            config.setDebug(false);
    
            if (args != null && args.length > 0) {
                // cluster mode
                config.setNumWorkers(4);
                StormSubmitter.submitTopology(args[0], config, topology);
            } else {
                // local mode for development
                LocalCluster localCluster = new LocalCluster();
                localCluster.submitTopology("AdTopology", config, topology);
            }
        }
    }
    

    Reads the latest log messages from the Kafka topic AD and emits them to LogToModelBolt.

    4.2.LogToModelBolt

    public class LogToModelBolt extends BaseBasicBolt {
    
        private static final Logger LOG = LoggerFactory.getLogger(LogToModelBolt.class);
    
        public void execute(Tuple input, BasicOutputCollector collector) {
            // 2014-01-13   19:11:55    {"adid":"31789","uid":"9871","action":"view"}    63.237.239.3    北京 北京
            String line = input.getStringByField("value");
            if (LOG.isInfoEnabled()) {
                LOG.info("line:[{}]", line);
            }
            String[] arr = line.split("\t", -1);
            if (arr.length == 6) {
                String date = arr[0].trim().replace("-", "");
                String time = arr[1].trim();
                String json = arr[2].trim();
                String ip = arr[3].trim();
                String province = arr[4].trim();
                String city = arr[5].trim();
    
                if (StringUtils.isNotEmpty(json)) {
                    Ad ad = new Gson().fromJson(json, Ad.class);
                    if (null != ad && StringUtils.isNotEmpty(ad.getAdid())) {
                        // Province
                        if (StringUtils.isNotEmpty(province)) {
                            String rowkey = ad.getAdid() + "_" + province + "_" + date;
                            collector.emit(new Values(ad.getAction(), rowkey, 1L));
                        }
    
                        // City
                        if (StringUtils.isNotEmpty(city)) {
                            String rowkey = ad.getAdid() + "_" + city + "_" + date;
                            collector.emit(new Values(ad.getAction(), rowkey, 1L));
                        }
    
                        // Client (user)
                        if (StringUtils.isNotEmpty(ad.getUid())) {
                            String rowkey = ad.getAdid() + "_" + ad.getUid() + "_" + date;
                            collector.emit(new Values(ad.getAction(), rowkey, 1L));
                        }
                    }
                }
            }
        }
    
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("action", "rowkey", "cnt"));
        }
    }
    

    Parses each log line into a model and emits it to ToHbaseBolt.
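
    The Ad model used as the Gson target is not shown in the original post; a minimal sketch matching the JSON fields would be:

    public class Ad {
        private String adid;   // ad ID
        private String uid;    // user ID
        private String action; // "view" or "click"

        public String getAdid() { return adid; }
        public String getUid() { return uid; }
        public String getAction() { return action; }
    }

    Gson populates the private fields by reflection, so no setters are required.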

    4.3.ToHbaseBolt

    public class ToHbaseBolt extends BaseBasicBolt {
    
        private static final Logger LOG = LoggerFactory.getLogger(ToHbaseBolt.class);
    
        private Connection conn;
        private Table table;

        @Override
        public void prepare(Map stormConf, TopologyContext context) {
            try {
                Configuration conf = HBaseConfiguration.create();
                conf.set("hbase.zookeeper.quorum", "hadoop1:2181,hadoop2:2181,hadoop3:2181");
                conn = ConnectionFactory.createConnection(conf);
                table = conn.getTable(TableName.valueOf("realtime_ad_stat"));
            } catch (IOException e) {
                // Fail fast: without a table handle every execute() call would NPE.
                throw new RuntimeException("Failed to connect to HBase", e);
            }
        }
    
        public void execute(Tuple input, BasicOutputCollector collector) {
            String action = input.getStringByField("action");
            String rowkey = input.getStringByField("rowkey");
            Long pv = input.getLongByField("cnt");
    
            try {
                if ("view".equals(action)) {
                    table.incrementColumnValue(Bytes.toBytes(rowkey), Bytes.toBytes("stat"), Bytes.toBytes("view_cnt"), pv);
                }
                if ("click".equals(action)) {
                    table.incrementColumnValue(Bytes.toBytes(rowkey), Bytes.toBytes("stat"), Bytes.toBytes("click_cnt"), pv);
                }
            } catch (IOException e) {
                LOG.error("Failed to increment counters for rowkey [{}]", rowkey, e);
            }
        }
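
        // Cleanup hook (not in the original post): close the HBase connection on shutdown.
        @Override
        public void cleanup() {
            try {
                if (table != null) table.close();
                if (conn != null) conn.close();
            } catch (IOException e) {
                LOG.error("Failed to close HBase connection", e);
            }
        }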
    
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
    
        }
    }
    

    ToHbaseBolt writes the processed tuples into the HBase table; incrementColumnValue updates the counters atomically on the server side.

    5.Kafka

    5.1. Creating the AD Topic

    # Describe topics
    kafka-topics.sh --describe \
    --zookeeper hadoop1:2181,hadoop2:2181,hadoop3:2181/kafka
    
    # Create the AD topic
    kafka-topics.sh --create \
    --zookeeper hadoop1:2181,hadoop2:2181,hadoop3:2181/kafka \
    --topic AD \
    --partitions 3 \
    --replication-factor 3
    
    # Console consumer for AD
    kafka-console-consumer.sh \
    --zookeeper hadoop1:2181,hadoop2:2181,hadoop3:2181/kafka \
    --topic AD \
    --from-beginning
    
    # Delete the AD topic
    kafka-topics.sh --delete \
    --zookeeper hadoop1:2181,hadoop2:2181,hadoop3:2181/kafka \
    --topic AD
    
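    These commands use the older ZooKeeper-based tooling (note the /kafka chroot); newer Kafka releases use --bootstrap-server instead. The topic can also be created from Java with the AdminClient API (available since Kafka 0.11); a minimal sketch, assuming the same brokers:

    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.AdminClientConfig;
    import org.apache.kafka.clients.admin.NewTopic;

    public class CreateAdTopic {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop1:9092,hadoop2:9092,hadoop3:9092");
            try (AdminClient admin = AdminClient.create(props)) {
                // 3 partitions, replication factor 3 -- same as the shell command above.
                admin.createTopics(Collections.singleton(new NewTopic("AD", 3, (short) 3))).all().get();
            }
        }
    }
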

    5.2. Sending Simulated Messages

    public class ProducerClient {
    
        private static final Logger LOG = LoggerFactory.getLogger(ProducerClient.class);
        private static final String[] PROVINCES_CITIES = new String[]{
                "山东\t济南",
                "河北\t石家庄",
                "吉林\t长春",
                "黑龙江\t哈尔滨",
                "辽宁\t沈阳",
                "内蒙古\t呼和浩特",
                "新疆\t乌鲁木齐",
                "甘肃\t兰州",
                "宁夏\t银川",
                "山西\t太原",
                "陕西\t西安",
                "河南\t郑州",
                "安徽\t合肥",
                "江苏\t南京",
                "浙江\t杭州",
                "福建\t福州",
                "广东\t广州",
                "江西\t南昌",
                "海南\t海口",
                "广西\t南宁",
                "贵州\t贵阳",
                "湖南\t长沙",
                "湖北\t武汉",
                "四川\t成都",
                "云南\t昆明",
                "西藏\t拉萨",
                "青海\t西宁",
                "天津\t天津",
                "上海\t上海",
                "重庆\t重庆",
                "北京\t北京",
                "台湾\t台北",
                "香港\t香港",
                "澳门\t澳门"
        };
        private static final String[] ACTIONS = new String[]{
                "view", "click"
        };
        private static final String[] ADIDS = new String[]{
                "1", "2", "3", "4", "5"
        };
    
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put("bootstrap.servers", "hadoop1:9092,hadoop2:9092,hadoop3:9092");
            props.put("acks", "all");
            props.put("retries", 0);
            props.put("batch.size", 16384);
            props.put("linger.ms", 1);
            props.put("buffer.memory", 33554432);
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(props);
            Random random = new Random();
            for (int i = 0; i < 2000; i++) {
                // Build one tab-separated log line, e.g.:
                // 2014-01-13   19:11:55    {"adid":"31789","uid":"9871","action":"view"}    63.237.239.3    北京 北京
                StringBuilder sb = new StringBuilder();
                //sb.append(new SimpleDateFormat("yyyy-MM-dd").format(date));
                sb.append("2018-08-10");
                sb.append("\t");
                sb.append("12:00:00");
                sb.append("\t");
                sb.append("{\"adid\":\"" + ADIDS[random.nextInt(ADIDS.length)] + "\",\"uid\":\"" + random.nextInt(200) + "\",\"action\":\"" + ACTIONS[random.nextInt(ACTIONS.length)] + "\"}");
                sb.append("\t");
                // Random IPv4 address; nextInt(256) covers the full 0-255 octet range.
                sb.append(random.nextInt(256) + "." + random.nextInt(256) + "." + random.nextInt(256) + "." + random.nextInt(256));
                sb.append("\t");
                sb.append(PROVINCES_CITIES[random.nextInt(PROVINCES_CITIES.length)]);
                kafkaProducer.send(new ProducerRecord<>("AD", sb.toString()));
            }
            Thread.sleep(1000);
            kafkaProducer.flush();

            if (LOG.isInfoEnabled()) {
                LOG.info("Finished sending messages");
            }
    
            kafkaProducer.close();
        }
    }
    
    (Screenshot: excerpt of the producer logs.)

    6. Querying HBase

    public Map<String, Object> get(Table table, String adid, String date, String province) {
      try {
        if (StringUtils.isNotEmpty(date)) {
          date = date.replace("-", "");
        }
    
        Map<String, Object> map = Maps.newHashMapWithExpectedSize(5);
        map.put("adid", adid);
        map.put("date", date);
        map.put("province", province);
    
        // adid_province_date or adid_city_date
        String rowKey = adid + "_" + province + "_" + date;
    
        Get get = new Get(Bytes.toBytes(rowKey));
        Result result = table.get(get);
    
        // read stat:view_cnt
        long viewCnt = 0L;
        byte[] viewBytes = result.getValue(Bytes.toBytes("stat"), Bytes.toBytes("view_cnt"));
        if (viewBytes != null) {
          viewCnt = Bytes.toLong(viewBytes);
        }
        map.put("view", viewCnt);
    
        // read stat:click_cnt
        long clickCnt = 0L;
        byte[] clickBytes = result.getValue(Bytes.toBytes("stat"), Bytes.toBytes("click_cnt"));
        if (clickBytes != null) {
          clickCnt = Bytes.toLong(clickBytes);
        }
        map.put("click", clickCnt);
        return map;
      } catch (IOException e) {
        e.printStackTrace();
        throw new ServiceException("Failed to query ad stats");
      }
    }
    

    The HBase client reads the realtime_ad_stat table, wraps each row in a Map, and returns the result as JSON for the frontend to render.
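
    For example, the per-province list shown below could be assembled by calling get() once per province; the date-range trends from the requirements can be built the same way by looping over days instead. A hypothetical sketch (listStats and the provinces parameter are illustrative names, not from the original code):

    public List<Map<String, Object>> listStats(Table table, String adid, String date, List<String> provinces) {
        List<Map<String, Object>> data = new ArrayList<>();
        for (String province : provinces) {
            // One HBase Get per row key: adid_province_date
            data.add(get(table, adid, date, province));
        }
        return data;
    }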

    {
        "data":[
            {
                "date":"20180810",
                "view":6,
                "adid":"1",
                "province":"山东",
                "click":4
            },
            {
                "date":"20180810",
                "view":4,
                "adid":"1",
                "province":"河北",
                "click":8
            },
            {
                "date":"20180810",
                "view":2,
                "adid":"1",
                "province":"吉林",
                "click":4
            },
            {
                "date":"20180810",
                "view":4,
                "adid":"1",
                "province":"黑龙江",
                "click":2
            },
            {
                "date":"20180810",
                "view":4,
                "adid":"1",
                "province":"辽宁",
                "click":7
            },
            {
                "date":"20180810",
                "view":6,
                "adid":"1",
                "province":"内蒙古",
                "click":5
            },
            {
                "date":"20180810",
                "view":10,
                "adid":"1",
                "province":"新疆",
                "click":6
            },
            {
                "date":"20180810",
                "view":12,
                "adid":"1",
                "province":"甘肃",
                "click":5
            },
            {
                "date":"20180810",
                "view":11,
                "adid":"1",
                "province":"宁夏",
                "click":5
            },
            {
                "date":"20180810",
                "view":5,
                "adid":"1",
                "province":"山西",
                "click":5
            },
            {
                "date":"20180810",
                "view":7,
                "adid":"1",
                "province":"陕西",
                "click":5
            },
            {
                "date":"20180810",
                "view":3,
                "adid":"1",
                "province":"河南",
                "click":6
            },
            {
                "date":"20180810",
                "view":1,
                "adid":"1",
                "province":"安徽",
                "click":8
            },
            {
                "date":"20180810",
                "view":6,
                "adid":"1",
                "province":"江苏",
                "click":10
            },
            {
                "date":"20180810",
                "view":12,
                "adid":"1",
                "province":"浙江",
                "click":5
            },
            {
                "date":"20180810",
                "view":4,
                "adid":"1",
                "province":"福建",
                "click":2
            },
            {
                "date":"20180810",
                "view":5,
                "adid":"1",
                "province":"广东",
                "click":13
            },
            {
                "date":"20180810",
                "view":8,
                "adid":"1",
                "province":"江西",
                "click":6
            },
            {
                "date":"20180810",
                "view":5,
                "adid":"1",
                "province":"海南",
                "click":1
            },
            {
                "date":"20180810",
                "view":6,
                "adid":"1",
                "province":"广西",
                "click":7
            },
            {
                "date":"20180810",
                "view":5,
                "adid":"1",
                "province":"贵州",
                "click":11
            },
            {
                "date":"20180810",
                "view":8,
                "adid":"1",
                "province":"湖南",
                "click":8
            },
            {
                "date":"20180810",
                "view":9,
                "adid":"1",
                "province":"湖北",
                "click":4
            },
            {
                "date":"20180810",
                "view":6,
                "adid":"1",
                "province":"四川",
                "click":8
            },
            {
                "date":"20180810",
                "view":2,
                "adid":"1",
                "province":"云南",
                "click":7
            },
            {
                "date":"20180810",
                "view":4,
                "adid":"1",
                "province":"西藏",
                "click":4
            },
            {
                "date":"20180810",
                "view":4,
                "adid":"1",
                "province":"青海",
                "click":3
            },
            {
                "date":"20180810",
                "view":16,
                "adid":"1",
                "province":"天津",
                "click":4
            },
            {
                "date":"20180810",
                "view":12,
                "adid":"1",
                "province":"上海",
                "click":12
            },
            {
                "date":"20180810",
                "view":10,
                "adid":"1",
                "province":"重庆",
                "click":16
            },
            {
                "date":"20180810",
                "view":10,
                "adid":"1",
                "province":"北京",
                "click":14
            },
            {
                "date":"20180810",
                "view":5,
                "adid":"1",
                "province":"台湾",
                "click":4
            },
            {
                "date":"20180810",
                "view":18,
                "adid":"1",
                "province":"香港",
                "click":10
            },
            {
                "date":"20180810",
                "view":8,
                "adid":"1",
                "province":"澳门",
                "click":12
            }
        ],
        "message":"操作成功!",
        "resultCode":"00000"
    }
    

    7. References

    ECharts
    HBase企业应用开发实战 (HBase Enterprise Application Development in Practice), Chapter 8
    Hadoop cluster setup (three nodes)
    ZooKeeper cluster installation
    Storm WordCount
    HBase environment setup
    Kafka cluster installation
