    • 1.生产者生成模拟数据并发送到 Kafka,每条记录的字段以制表符(\t)分隔,数据格式为:
      100 29448-000005 2021-01-17 21:05:21 2
      序号 小区编号 时间 掉话代码(0正常,1掉话,2断话)
    • 2.storm作为消费者从kafka拉取数据作为spout的数据源
    • 3.经过storm处理后存放到hbase中




    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
            <!-- 跳过测试 -->
            <!-- https://mvnrepository.com/artifact/org.apache.hbase/hbase-client -->


    <?xml version="1.0"?>
    <?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
    <?xml version="1.0" encoding="UTF-8"?>
    <?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
    <!-- Put site-specific property overrides in this file. -->



    [root@node02 ~]# zkServer.sh start
    ZooKeeper JMX enabled by default
    Using config: /opt/zookeeper/apache-zookeeper-3.6.2/bin/../conf/zoo.cfg
    Starting zookeeper ... STARTED
    [root@node02 ~]# jps
    15724 Jps
    15677 QuorumPeerMain
    [root@node03 ~]# zkServer.sh start
    ZooKeeper JMX enabled by default
    Using config: /opt/zookeeper/apache-zookeeper-3.6.2/bin/../conf/zoo.cfg
    Starting zookeeper ... STARTED
    [root@node03 ~]# jps
    11398 Jps
    11351 QuorumPeerMain
    [root@node04 ~]# zkServer.sh start
    ZooKeeper JMX enabled by default
    Using config: /opt/zookeeper/apache-zookeeper-3.6.2/bin/../conf/zoo.cfg
    Starting zookeeper ... STARTED
    [root@node04 ~]# jps
    6138 QuorumPeerMain
    6191 Jps
    [root@node02 ~]# kafka-server-start.sh /opt/kafka/kafka_2.13-2.6.0/config/server.properties
    [root@node03 ~]# kafka-server-start.sh /opt/kafka/kafka_2.13-2.6.0/config/server.properties
    [root@node04 ~]# kafka-server-start.sh /opt/kafka/kafka_2.13-2.6.0/config/server.properties
    [root@node02 ~]# jps
    16164 Jps
    15739 Kafka
    15677 QuorumPeerMain
    [root@node03 ~]# jps
    11414 Kafka
    11351 QuorumPeerMain
    11835 Jps
    [root@node04 ~]# jps
    6629 Jps
    6138 QuorumPeerMain
    6207 Kafka


    package cn.itbin.kafka;
    import cn.itbin.constants.Constant;
    import cn.itbin.tools.DateFmt;
    import org.apache.kafka.clients.producer.*;
    import org.apache.kafka.common.serialization.StringSerializer;
    import org.apache.kafka.common.utils.Utils;
    import java.util.Properties;
    import java.util.Random;
     * @author chenxiaogao
     * @className CellProducer
     * @description TODO
     * @date 2021/1/15
    /**
     * Thread that generates synthetic call records and publishes them to a Kafka topic.
     *
     * NOTE(review): this listing omits lines (closing braces and possibly statements), so it
     * does not compile as shown.
     */
    public class CellProducer extends Thread {
        private final Producer<String, String> producer;
        private final String topic;
        private final Properties props = new Properties();
        /**
         * Creates a producer with String key/value serializers that connects to the brokers
         * listed in Constant.BROKER_LIST.
         *
         * @param topic name of the Kafka topic the records are sent to
         */
        public CellProducer(String topic) {
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Constant.BROKER_LIST);
            producer = new KafkaProducer<>(props);
            this.topic = topic;
        /**
         * Endlessly builds one tab-separated record per iteration
         * (sequence number, cell number, current timestamp, drop code), prints it and sends it.
         *
         * NOTE(review): no increment of i and no pause between sends is visible here, while the
         * sample consumer output in this article shows increasing sequence numbers roughly five
         * seconds apart - presumably those statements were dropped from the listing; confirm.
         */
        public void run() {
            Random random = new Random();
            // NOTE(review): cell_num is never read in the visible code (and lists "29448-51331" twice).
            String[] cell_num = { "29448-37062", "29448-51331", "29448-51331", "29448-51333", "29448-51343" };
            // Drop codes: 0 = normal; 1 = dropped call (intermittent signal); 2 = call cut off completely
            String[] drop_num = { "0", "1", "2" };
            int i = 0;
            while (true) {
                // Random value 1..10, zero-padded to six digits; used as the cell number suffix.
                String testStr = String.format("%06d", random.nextInt(10) + 1);
                // messageStr example: 2494 29448-000003 2016-01-05 10:25:17 1 (fields are tab-separated)
                String messageStr = i + "\t" + ("29448-" + testStr) + "\t" + DateFmt.getCountDate(null, DateFmt.date_long)
                        + "\t" + drop_num[random.nextInt(drop_num.length)];
                System.out.println("product:" + messageStr);
                // Record is sent without a key.
                producer.send(new ProducerRecord<String, String>(topic, messageStr));
        /**
         * Entry point: builds a producer for Constant.TOPIC_NAME.
         *
         * NOTE(review): the call that starts the thread is not visible in this listing.
         */
        public static void main(String[] args) {
            CellProducer test = new CellProducer(Constant.TOPIC_NAME);


    package cn.itbin.constants;
     * @author chenxiaogao
     * @className Constant
     * @description TODO
     * @date 2021/1/17
    /**
     * Connection constants shared by the Kafka producer and the Storm topology.
     *
     * <p>The class only holds constants, so it is final and cannot be instantiated.
     */
    public final class Constant {
        /** Comma-separated {@code host:port} list used as the Kafka bootstrap servers. */
        public static final String BROKER_LIST = "node02:9092,node03:9092,node04:9092";

        /** Name of the Kafka topic the producer writes to and the spout reads from. */
        public static final String TOPIC_NAME = "test";

        private Constant() {
            // Constants holder; not meant to be instantiated.
        }
    }


    [root@node02 ~]# kafka-console-consumer.sh --bootstrap-server node02:9092,node03:9092,node04:9092 --from-beginning --topic test  
    1       29448-000010    2021-01-17 21:45:48     2
    2       29448-000008    2021-01-17 21:45:54     0
    3       29448-000008    2021-01-17 21:45:59     1
    4       29448-000010    2021-01-17 21:46:04     2
    5       29448-000009    2021-01-17 21:46:09     0
    6       29448-000007    2021-01-17 21:46:14     1
    7       29448-000003    2021-01-17 21:46:19     1
    8       29448-000001    2021-01-17 21:46:24     2



    package cn.itbin.skc;
    import cn.itbin.blot.CellDaoltBolt;
    import cn.itbin.blot.CellFilterBolt;
    import cn.itbin.constants.Constant;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.storm.Config;
    import org.apache.storm.LocalCluster;
    import org.apache.storm.StormSubmitter;
    import org.apache.storm.kafka.spout.KafkaSpout;
    import org.apache.storm.kafka.spout.KafkaSpoutConfig;
    import org.apache.storm.topology.TopologyBuilder;
    import org.apache.storm.tuple.Fields;
     * @author chenxiaogao
     * @className SkcTopo
     * @description TODO
     * @date 2021/1/16
    /**
     * Builds the Storm topology kafka_spout -> cell_filter -> cell_dao and submits it either
     * to a cluster or to an in-process LocalCluster.
     *
     * NOTE(review): this listing omits lines (closing braces, the catch body, the head of the
     * local submit call and the end of the spout-config builder chain), so it does not compile
     * as shown.
     */
    public class SkcTopo {
        /**
         * Wires the spout and the two bolts, then submits the topology.
         *
         * @param args pass "cluster" as the first argument to submit to a Storm cluster;
         *             anything else (or nothing) takes the LocalCluster branch
         */
        public static void main(String[] args) throws Exception {
            final TopologyBuilder builder = new TopologyBuilder();
            // Parallelism hints: 3 for the spout, 3 for the filter bolt, 5 for the DAO bolt.
            builder.setSpout("kafka_spout", new KafkaSpout<>(getKafkaSpoutConfig(Constant.BROKER_LIST, Constant.TOPIC_NAME)), 3);
            builder.setBolt("cell_filter", new CellFilterBolt(),3).shuffleGrouping("kafka_spout");
            // Grouped on the "cell_num" field emitted by cell_filter.
            builder.setBolt("cell_dao", new CellDaoltBolt(),5).fieldsGrouping("cell_filter", new Fields("cell_num"));
            // If "cluster" is passed as an argument, start in the production (cluster) environment; otherwise start locally
            if (args.length > 0 && args[0].equals("cluster")) {
                try {
                    StormSubmitter.submitTopology("ClusterReadingFromKafkaApp", new Config(), builder.createTopology());
                } catch (Exception e) {
            } else {
                LocalCluster cluster = new LocalCluster();
                // NOTE(review): the line that begins the local submit call is missing from this listing.
                        new Config(), builder.createTopology());
        /**
         * Creates the KafkaSpout configuration for the given brokers and topic.
         *
         * @param bootstrapServers Kafka bootstrap servers, passed to KafkaSpoutConfig.builder
         * @param topic            topic to consume
         */
        private static KafkaSpoutConfig<String, String> getKafkaSpoutConfig(String bootstrapServers, String topic) {
            return KafkaSpoutConfig.builder(bootstrapServers, topic)
                    // Apart from the group ID, the settings below are optional. The group ID must be set, otherwise an InvalidGroupIdException is thrown
                    .setProp(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer-group")
                    // Interval between periodic offset commits; the default is 15s
                    // NOTE(review): the builder call this comment refers to, and the final build(), are not shown here.


    package cn.itbin.blot;
    import cn.itbin.tools.DateFmt;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.BasicOutputCollector;
    import org.apache.storm.topology.IBasicBolt;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Tuple;
    import org.apache.storm.tuple.Values;
    import java.util.Arrays;
    import java.util.List;
    import java.util.Map;
    /**
     * Storm bolt that splits a tab-separated call record and emits
     * (date, cell_num, drop_num) for downstream counting.
     *
     * NOTE(review): this listing omits lines (closing braces and the catch body), so it does
     * not compile as shown.
     */
    public class CellFilterBolt implements IBasicBolt {
        private static final long serialVersionUID = 1L;
        /**
         * Reads the record text from the incoming tuple, splits it on tabs and emits the
         * reformatted date, the cell number and the drop code. Tuples with four or fewer
         * values produce no output.
         *
         * @param input     incoming tuple; the record text is read from value index 4
         * @param collector collector the three-field tuple is emitted to
         */
        public void execute(Tuple input, BasicOutputCollector collector) {
            List<Object> values = input.getValues();
            String value = null;
            // NOTE(review): presumably index 4 is the Kafka record value in the spout's default
            // tuple layout (topic, partition, offset, key, value) - confirm against the spout config.
            if (values.size() > 4) {
                value = (String) values.get(4);
            try {
                if (value != null) {
                    String arr[] = value.split("\\t");
                    System.out.println("arr = " + arr.length);
                    String s = Arrays.toString(arr);
                    System.out.println("s = " + s);
                    // messageStr format: message number \t cell number \t time \t status
                    // Example:   2494  29448-000003  2016-01-05 10:25:17  1
                    // DateFmt.date_short is yyyy-MM-dd, so 2016-01-05 10:25:17 is formatted as 2016-01-05
                    // Emitted tuple: time, cell number, drop status
                    // NOTE(review): arr.length is not checked before arr[1..3] are read; a short
                    // record throws here and is handled only by the broad catch below.
                    collector.emit(new Values(DateFmt.getCountDate(arr[2], DateFmt.date_short), arr[1], arr[3]));
            } catch (Exception e) {
        /** Declares the output fields "date", "cell_num" and "drop_num". */
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("date", "cell_num", "drop_num"));
        /** Returns null, i.e. no component-specific configuration is supplied. */
        public Map<String, Object> getComponentConfiguration() {
            return null;
        /** No cleanup logic is visible for this bolt. */
        public void cleanup() {
            // TODO Auto-generated method stub
        /** No initialization logic is visible for this bolt. */
        public void prepare(Map map, TopologyContext arg1) {
            // TODO Auto-generated method stub


    package cn.itbin.blot;
    import cn.itbin.hbase.HBaseDAO;
    import cn.itbin.hbase.impl.HBaseDAOImpl;
    import cn.itbin.tools.DateFmt;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.BasicOutputCollector;
    import org.apache.storm.topology.IBasicBolt;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.tuple.Tuple;
    import java.util.Calendar;
    import java.util.HashMap;
    import java.util.Iterator;
    import java.util.Map;
    import java.util.Set;
    /**
     * Storm bolt that keeps per-cell call and dropped-call counters in memory and, at most
     * once every five seconds, formats them as one JSON-like value per cell for storage.
     *
     * NOTE(review): this listing omits lines (closing braces, the body of the day-rollover
     * branch, and the head of the DAO call that receives the row key / qualifier / value
     * arguments), so it does not compile as shown.
     */
    public class CellDaoltBolt implements IBasicBolt {
        private static final long serialVersionUID = 1L;
        // DAO used for the HBase write; created in prepare().
        HBaseDAO dao = null;
        // Start of the current flush window (epoch millis); reset after a flush window elapses.
        long beginTime = System.currentTimeMillis();
        // Timestamp taken on every execute() call and compared against beginTime.
        long endTime = 0;
        // Total number of calls per cell number
        Map<String, Long> cellCountMap = new HashMap<String, Long>();
        // Number of dropped calls per cell number
        Map<String, Long> cellDropCountMap = new HashMap<String, Long>();
        // Current date (yyyy-MM-dd), refreshed on every execute() call.
        String todayStr = null;
        /**
         * Updates the counters for the tuple's cell and, when at least 5000 ms have passed
         * since beginTime, walks all counted cells to print and store their totals.
         *
         * @param input     tuple read positionally as (date, cell number, drop code)
         * @param collector unused in the visible code; this bolt emits nothing
         */
        public void execute(Tuple input, BasicOutputCollector collector) {
            // input looks like 2016-01-05,29448-000003,1
            if (input != null) {
                String dateStr = input.getString(0);
                String cellNum = input.getString(1);
                String dropNum = input.getString(2);
                // Check whether the record is from today; if not, clear the maps to keep memory bounded
                // Number of base stations: roughly 50-100 thousand (Beijing)
                // http://bbs.c114.net/thread-793707-1-1.html
                todayStr = DateFmt.getCountDate(null, DateFmt.date_short);
                // Day-rollover handling: when data later than today arrives, clear both maps
                // Thought: if the program crashes the maps are reset; if nothing goes wrong, the same cell id keeps accumulating
                // This logic is not good; a thread that clears the maps periodically would be better than checking here
                // NOTE(review): `todayStr != dateStr` compares String references, not contents; the
                // outcome is decided by compareTo alone, so the first operand is redundant.
                // NOTE(review): the body of this branch is missing from the listing.
                if (todayStr != dateStr && todayStr.compareTo(dateStr) < 0) {
                // Call count for the current cell id
                Long cellAll = cellCountMap.get(cellNum);
                if (cellAll == null) {
                    cellAll = 0L;
                cellCountMap.put(cellNum, ++cellAll);
                // Dropped-call count; any code greater than 0 counts as a drop
                Long cellDropAll = cellDropCountMap.get(cellNum);
                int t = Integer.parseInt(dropNum);
                if (t > 0) {
                    if (cellDropAll == null) {
                        cellDropAll = 0L;
                    cellDropCountMap.put(cellNum, ++cellDropAll);
                // 1. Write to the store on a timer; to avoid writing too often, write once per interval
                // 2. Alternatively, check the size of the maps and write to hbase
                // 3. Other designs are possible; using redis would also work well
                // 4. Using tick tuples for periodic storage is another option
                endTime = System.currentTimeMillis();
                // flume+kafka integration
                // Current dropped-call count
                // 1. Dropped calls per hour
                // 2. Calls per hour
                // 3. Drop rate per hour
                // 4. Yesterday's historical curve
                // 5. Year-over-year curve for the same day last year (if data exists)
                // hbase column-oriented data ()
                // 100 thousand
                // rowkey cellnum+ day
                if (endTime - beginTime >= 5000) {
                    // Write to the store once every 5s
                    // NOTE(review): nothing is written until at least one drop has been recorded,
                    // because both maps must be non-empty.
                    if (cellCountMap.size() > 0 && cellDropCountMap.size() > 0) {
                        // x axis: offset relative to the hour; arr[0] is hour:minute, arr[1] is the numeric time offset
                        String arr[] = this.getAxsi();
                        // Current date
                        String today = DateFmt.getCountDate(null, DateFmt.date_short);
                        // Current minute
                        String today_minute = DateFmt.getCountDate(null, DateFmt.date_minute);
                        // cellCountMap is the map of call counts
                        Set<String> keys = cellCountMap.keySet();
                        for (Iterator iterator = keys.iterator(); iterator.hasNext();) {
                            String key_cellnum = (String) iterator.next();
                            System.out.println("key_cellnum: " + key_cellnum + "***" 
                                    + arr[0] + "---" 
                                    + arr[1] + "---"
                                    + cellCountMap.get(key_cellnum) + "----" 
                                    + cellDropCountMap.get(key_cellnum));
                            // Data written to HBase, sample: {time_title:"10:45",xAxis:10.759722222222223,call_num:140,call_drop_num:91}
                            // NOTE(review): the head of the call these arguments belong to (presumably
                            // dao.insert with the table name, and the column family argument) is missing here.
                            // NOTE(review): a cell with no recorded drop has no entry in cellDropCountMap,
                            // so call_drop_num is rendered as the text null for that cell.
                                    key_cellnum + "_" + today, 
                                    new String[] { today_minute },
                                    new String[] { "{" + "time_title:\"" + arr[0] + "\",xAxis:" + arr[1] + ",call_num:"
                                            + cellCountMap.get(key_cellnum) + ",call_drop_num:" + cellDropCountMap.get(key_cellnum) + "}" }
                    // The window start time must be reset
                    beginTime = System.currentTimeMillis();
        /** Creates the HBase DAO used by execute(). */
        public void prepare(Map stormConf, TopologyContext context) {
            // TODO Auto-generated method stub
            dao = new HBaseDAOImpl();
            // NOTE(review): calendar is not used in the visible code.
            Calendar calendar = Calendar.getInstance();
        /** Declares no output fields in the visible code. */
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            // TODO Auto-generated method stub
        /** Returns null, i.e. no component-specific configuration is supplied. */
        public Map<String, Object> getComponentConfiguration() {
            // TODO Auto-generated method stub
            return null;
        // Returns the x coordinate for the current time, measured in hours
        /**
         * @return a two-element array: "hour:minute" (not zero-padded) and the current time of
         *         day expressed as fractional hours, rendered with Double.toString
         */
        public String[] getAxsi() {
            // Take the current time
            Calendar c = Calendar.getInstance();
            int hour = c.get(Calendar.HOUR_OF_DAY);
            int minute = c.get(Calendar.MINUTE);
            int sec = c.get(Calendar.SECOND);
            // Total seconds since midnight
            int curSecNum = hour * 3600 + minute * 60 + sec;
            // (12*3600+30*60+0)/3600=12.5
            Double xValue = (double) curSecNum / 3600;
            // hour:minute, numeric value; the value is the time offset
            String[] end = { hour + ":" + minute, xValue.toString() };
            return end;
        /** No cleanup logic is visible for this bolt. */
        public void cleanup() {


    [root@node01 ~]# start-all.sh
    Starting namenodes on [node01 node02 node03]
    Starting datanodes
    Starting journal nodes [node01 node02 node03]
    Starting ZK Failover Controllers on NN hosts [node01 node02 node03]
    Starting resourcemanagers on [ node01 node02 node03]
    Starting nodemanagers
    [root@node01 ~]# jps
    26368 NameNode
    26854 DFSZKFailoverController
    27639 Jps
    27272 ResourceManager
    26652 JournalNode
    [root@node01 ~]# start-hbase.sh 
    node04: running regionserver, logging to /opt/hbase/hbase-2.3.3/bin/../logs/hbase-root-regionserver-node04.out
    node03: running regionserver, logging to /opt/hbase/hbase-2.3.3/bin/../logs/hbase-root-regionserver-node03.out
    node02: running regionserver, logging to /opt/hbase/hbase-2.3.3/bin/../logs/hbase-root-regionserver-node02.out
    node02: running master, logging to /opt/hbase/hbase-2.3.3/bin/../logs/hbase-root-master-node02.out
    [root@node01 ~]# hbase shell
    Version 2.3.3, r3e4bf4bee3a08b25591b9c22fea0518686a7e834, Wed Oct 28 06:36:25 UTC 2020
    Took 0.0005 seconds                                                                                                              
    hbase(main):001:0> status
    1 active master, 1 backup masters, 1 servers, 0 dead, 0.0000 average load
    Took 1.3308 seconds   
    package cn.itbin.hbase;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.client.Result;
    import java.util.List;
    public interface HBaseDAO {
        public void save(Put put, String tableName);
        public void insert(String tableName, String rowKey, String family, String quailifer, String value);
        public void insert(String tableName, String rowKey, String family, String quailifer[], String value[]);
        public void save(List<Put> Put, String tableName);
        public Result getOneRow(String tableName, String rowKey);
        public List<Result> getRows(String tableName, String rowKey_like);
        public List<Result> getRows(String tableName, String rowKeyLike, String cols[]);
        public List<Result> getRows(String tableName, String startRow, String stopRow);
        public void deleteRecords(String tableName, String rowKeyLike);
        public void deleteTable(String tableName);
        public void createTable(String tableName,  String columnFamilies);
    package cn.itbin.hbase.impl;
    import cn.itbin.hbase.HBaseDAO;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hbase.*;
    import org.apache.hadoop.hbase.client.*;
    import org.apache.hadoop.hbase.util.Bytes;
    import java.io.IOException;
    import java.util.List;
     * @author chenxiaogao
     * @className HBaseDAOImpl
     * @description TODO
     * @date 2021/1/17
    /**
     * HBase-backed implementation of HBaseDAO. Only the constructor, the two insert overloads
     * and createTable have visible logic; the remaining methods are empty or return null.
     *
     * NOTE(review): this listing omits lines (closing braces, catch/finally bodies, the calls
     * that send the Put to the table and close it, and most of createTable), so it does not
     * compile as shown.
     */
    public class HBaseDAOImpl implements HBaseDAO {
        // NOTE(review): TABLE_NAME and CF_DEFAULT are not referenced in the visible code.
        private static final String TABLE_NAME = "PHONE";
        private static final String CF_DEFAULT = "DEFAULT_COLUMN_PHONE";
        Admin admin = null;
        Connection connection = null;
        /**
         * Loads the HBase configuration, opens a connection and obtains an Admin handle.
         */
        public HBaseDAOImpl()  {
            try {
                Configuration conf = HBaseConfiguration.create();
                // NOTE(review): the same hbase-site.xml is added twice, and the path is an absolute
                // Windows development path - presumably a second resource was intended; confirm.
                conf.addResource(new Path("D:\\ideawork\\storm-kafka\\src\\main\\resources\\hbase-site.xml"));
                conf.addResource(new Path("D:\\ideawork\\storm-kafka\\src\\main\\resources\\hbase-site.xml"));
                connection = ConnectionFactory.createConnection(conf);
                admin = connection.getAdmin();
            } catch (IOException e) {
        /** No implementation is visible for this method. */
        public void save(Put put, String tableName) {
        /**
         * Builds a Put for rowKey carrying one cell (family:quailifer = value).
         *
         * NOTE(review): the statement that sends the Put to the table and the close call in the
         * finally block are not visible in this listing.
         */
        public void insert(String tableName, String rowKey, String family, String quailifer, String value) {
            Table table  = null;
            try {
                 table = connection.getTable(TableName.valueOf(tableName));
                // NOTE(review): getBytes() without a charset uses the platform default encoding.
                Put put = new Put(rowKey.getBytes());
                put.addColumn(family.getBytes(), quailifer.getBytes(), value.getBytes());
            } catch (IOException e) {
            }finally {
                try {
                } catch (IOException e) {
        /**
         * Builds a Put for rowKey carrying one cell per index i (family:quailifer[i] = value[i]).
         *
         * NOTE(review): the statement that sends the Put to the table and the close call in the
         * finally block are not visible in this listing; value is indexed with quailifer's
         * length, so a shorter value array would throw inside the try block.
         */
        public void insert(String tableName, String rowKey, String family, String[] quailifer, String[] value) {
            Table table = null;
            try {
                table = connection.getTable(TableName.valueOf(tableName));
                Put put = new Put(rowKey.getBytes());
                // Add all columns to the same Put
                for (int i = 0; i < quailifer.length; i++) {
                    String col = quailifer[i];
                    String val = value[i];
                    put.addColumn(family.getBytes(), col.getBytes(), val.getBytes());
            } catch (Exception e) {
            } finally {
                try {
                } catch (IOException e) {
        /** No implementation is visible for this method. */
        public void save(List<Put> Put, String tableName) {
        /** Not implemented in the visible code: returns null. */
        public Result getOneRow(String tableName, String rowKey) {
            return null;
        /** Not implemented in the visible code: returns null. */
        public List<Result> getRows(String tableName, String rowKey_like) {
            return null;
        /** Not implemented in the visible code: returns null. */
        public List<Result> getRows(String tableName, String rowKeyLike, String[] cols) {
            return null;
        /** Not implemented in the visible code: returns null. */
        public List<Result> getRows(String tableName, String startRow, String stopRow) {
            return null;
        /** No implementation is visible for this method. */
        public void deleteRecords(String tableName, String rowKeyLike) {
        /** No implementation is visible for this method. */
        public void deleteTable(String tableName) {
        /**
         * Starts building a table descriptor for tableName.
         *
         * NOTE(review): the rest of the descriptor builder chain and the admin call inside the
         * try block are not visible in this listing.
         */
        public void createTable(String tableName, String columnFamilies) {
            TableDescriptor td = TableDescriptorBuilder.newBuilder(TableName.valueOf(tableName))
            try {
            } catch (IOException e) {
            System.out.print("Creating table. ");
    package cn.itbin.tools;
    import cn.itbin.hbase.impl.HBaseDAOImpl;
    import java.text.ParseException;
    import java.text.SimpleDateFormat;
    import java.util.Calendar;
    import java.util.Date;
    /**
     * Date formatting helpers shared by the Kafka producer and the Storm bolts.
     *
     * <p>All methods use the JVM default time zone. {@link SimpleDateFormat} is kept on
     * purpose: callers pass a full timestamp such as {@code 2016-01-05 10:25:17} together
     * with the shorter {@link #date_short} pattern and rely on
     * {@code SimpleDateFormat.parse} ignoring the trailing text. Every call creates its own
     * formatter because {@code SimpleDateFormat} instances are not safe to share between
     * threads.
     */
    public class DateFmt {
        /** Pattern for a full timestamp, e.g. {@code 2016-01-05 10:25:17}. */
        public static final String date_long = "yyyy-MM-dd HH:mm:ss";
        /** Pattern for a calendar day, e.g. {@code 2016-01-05}. */
        public static final String date_short = "yyyy-MM-dd";
        /** Pattern for a minute-resolution stamp, e.g. {@code 201601051025}. */
        public static final String date_minute = "yyyyMMddHHmm";

        /**
         * Shared {@link #date_short} formatter, kept only so existing references still
         * compile. It is mutable and not thread-safe; this class no longer uses it.
         */
        public static SimpleDateFormat sdf = new SimpleDateFormat(date_short);

        /**
         * Formats a date with the given pattern.
         *
         * @param date    text to parse with {@code pattern}, or {@code null} for the current time
         * @param pattern {@link SimpleDateFormat} pattern used for both parsing and formatting
         * @return the formatted date; the current time is used when {@code date} is
         *         {@code null} or cannot be parsed
         */
        public static String getCountDate(String date, String pattern) {
            // Shifting by zero days leaves the date unchanged.
            return getCountDate(date, pattern, 0);
        }

        /**
         * Formats a date with the given pattern after shifting it by a number of days.
         *
         * @param date    text to parse with {@code pattern}, or {@code null} for the current time
         * @param pattern {@link SimpleDateFormat} pattern used for both parsing and formatting
         * @param step    days to add (negative values go back in time)
         * @return the shifted, formatted date; the current time is the base when {@code date}
         *         is {@code null} or cannot be parsed
         */
        public static String getCountDate(String date, String pattern, int step) {
            SimpleDateFormat format = new SimpleDateFormat(pattern);
            Calendar cal = Calendar.getInstance();
            if (date != null) {
                try {
                    // e.g. 2016-01-05 10:25:17
                    cal.setTime(format.parse(date));
                } catch (ParseException e) {
                    // Keep the fall-back to the current time, but make the bad input visible.
                    System.err.println("DateFmt: cannot parse '" + date + "' with pattern '" + pattern
                            + "', using current time: " + e.getMessage());
                }
            }
            cal.add(Calendar.DAY_OF_MONTH, step);
            return format.format(cal.getTime());
        }

        /**
         * Parses a {@link #date_short} ({@code yyyy-MM-dd}) string.
         *
         * @param dateStr text to parse
         * @return the parsed date at midnight in the default time zone
         * @throws Exception if {@code dateStr} does not start with a valid {@code yyyy-MM-dd} date
         */
        public static Date parseDate(String dateStr) throws Exception {
            // A fresh formatter per call avoids sharing the non-thread-safe static instance.
            return new SimpleDateFormat(date_short).parse(dateStr);
        }

        /** Entry point kept from the original listing; no body is shown for it in the article. */
        public static void main(String[] args) throws Exception {
        }
    }


    # 创建 HBase 表:表名 cell_monitor_table,列族 cf
    hbase(main):002:0> create 'cell_monitor_table','cf'
    hbase(main):004:0> scan 'cell_monitor_table'
    ROW                               COLUMN+CELL                                                                                    
     01                               column=cf:cf01, timestamp=2021-01-18T00:34:46.235, value=test_value                            
     29448-000001_2021-01-17          column=cf:202101171737, timestamp=2021-01-18T01:37:35.925, value={time_title:"17:37",xAxis:17.6



