美文网首页
44.基于storm的电信移动项目

44.基于storm的电信移动项目

作者: 文茶君 | 来源:发表于2020-03-01 16:40 被阅读0次

项目需求

中国移动,联通,电信无线网络保障项目
3 大运营商:gsm(2g)
3g:移动 td-scdma(2.5g)、联通 wcdma(欧美)、电信 cdma2000
4g: 移动 td-lte; 联通 td-lte(时分复用)、fdd-lte(码分多址)

广州基站 5万个,北京市 8万左右

截止2016年5月,中国移动4G用户总数超过4.09亿户,中国联通4G用户数达到6367.9万,而中国电信4G用户达到7952万户。
2016年中国移动将大力发展4G网络,争取2016年4G用户量达到5亿。

项目背景:
提供更好的服务
1、 实时监控基站,提供快速网络调优预警
2、 哪些地点是用户的常态地点(经常驻留的地点)?
3、 网元、区域下有哪些常态用户(常驻在该网元下的用户)?
网元就是网络中被管理单元。就是这个用户经常在这个基站
4、 哪些终端限制了用户对业务的需求,哪些终端值得推广?
5、 哪些地点是用户使用业务的热点,需要结构优化?
例如商业区



功能:
1、 每个小区掉话实时分析
5 秒级别监控(实时监控 总共掉多少话)
每小时掉话(这个小时里面的掉话) 柱状图
2、 3g 上网上行流量监控
下行流量监控
3、 终端流量实时分析(每一个手机号实时流量分析)
上行 下行
4、 语音通话实时分析(通话汇总)
5、 掉话最多的小区 前 50 个
6、 流量最多小区 前 50
7、 最多终端前 50
8、 掉话最多终端 前 500
9、 用户最多终端(本市区使用某种型号手机最多,前 50)

名词解释
基站,就是我们通常讲的一个站点,由 BTS 主要设备构成。
小区:一个基站覆盖的范围就叫小区,每个小区都有独立的小区号,逻辑概念
扇区:通常一个基站分为三个扇区,每个扇区打向一个方向,每个扇区覆盖 120 度的范围

BSS:是基站子系统
BTS:是基站收发信台
BSC:是基站控制器
MSC:移动业务交换中心

GSM 系统的基本结构:
GSM 可分为三个子系统,基站子系统 BSS,网络交换子系统 NSS,网络管理子系统。
基站子系统 BSS 包括:
BTS:基站收发信台
BSC:基站控制器
TC: 码变换器
网络交换子系统 NSS:
MSC:移动业务交换中心
AUC:鉴权中心
HLR:本地用户位置寄存器
VLR:外来用户位置寄存器
EIR:设备身份寄存器
网络管理子系统 NMS:
OMC:操作维护中心
NMC:网络管理中心
空中接口:指的是移动终端(手机)和基站之间的接口,一般是指的协议。

掉话原因:
网络的掉话按照信道的方式可分为两种形式:
一类是在 SDCCH 信道上的掉话
另一类是在 TCH 信道上的掉话
SDCCH 的掉话是指在 BSC 给 MS 分配了 SDCCH 信道,而 TCH 信道还没有分配成功期间发生的掉话。TCH 的掉话是指在 BSC 给 MS 成功分配了 TCH 信道后,发生的不正常 TCH释放


架构


中国移动项目架构图.jpg

在这里我们并不能得到电线的官方数据,所以我们只能使用模拟数据
流程是cellproducer到mylog——cmcc----kafkaspout---cellfilterbolt--celldaobolt---hbase
--tomcat---前端

package kafka.productor;

import java.util.Properties;
import java.util.Random;

import backtype.storm.utils.Utils;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import tools.DateFmt;

/***
 * 模拟发送数据到kafka中
 * 
 * @author hadoop
 *
 */
public class CellProducer extends Thread {

    // bin/kafka-topics.sh --create --zookeeper localhost:2181
    // --replication-factor 3 --partitions 5 --topic cmcccdr
    private final kafka.javaapi.producer.Producer<Integer, String> producer;
    private final String topic;
    private final Properties props = new Properties();

    public CellProducer(String topic) {
        props.put("serializer.class", "kafka.serializer.StringEncoder");// 字符串消息
        props.put("metadata.broker.list", KafkaProperties.broker_list);
        producer = new kafka.javaapi.producer.Producer<Integer, String>(new ProducerConfig(props));
        this.topic = topic;
    }

    /*
     * public void run() { // order_id,order_amt,create_time,province_id Random
     * random = new Random(); String[] cell_num = { "29448-37062",
     * "29448-51331", "29448-51331","29448-51333", "29448-51343" }; String[]
     * drop_num = { "0","1","2"};//掉话1(信号断断续续) 断话2(完全断开)
     * 
     * // Producer.java // record_time, imei, cell,
     * ph_num,call_num,drop_num,duration,drop_rate,net_type,erl // 2011-06-28
     * 14:24:59.867,356966,29448-37062,0,0,0,0,0,G,0 // 2011-06-28
     * 14:24:59.867,352024,29448-51331,0,0,0,0,0,G,0 // 2011-06-28
     * 14:24:59.867,353736,29448-51331,0,0,0,0,0,G,0 // 2011-06-28
     * 14:24:59.867,353736,29448-51333,0,0,0,0,0,G,0 // 2011-06-28
     * 14:24:59.867,351545,29448-51333,0,0,0,0,0,G,0 // 2011-06-28
     * 14:24:59.867,353736,29448-51343,1,0,0,8,0,G,0 int i =0 ; NumberFormat nf
     * = new DecimalFormat("000000"); while(true) { i ++ ; // String messageStr
     * = i+"\t"+cell_num[random.nextInt(cell_num.length)]+"\t"+DateFmt.
     * getCountDate(null,
     * DateFmt.date_long)+"\t"+drop_num[random.nextInt(drop_num.length)] ;
     * String testStr = nf.format(random.nextInt(10)+1);
     * 
     * String messageStr =
     * i+"\t"+("29448-"+testStr)+"\t"+DateFmt.getCountDate(null,
     * DateFmt.date_long)+"\t"+drop_num[random.nextInt(drop_num.length)] ;
     * 
     * System.out.println("product:"+messageStr); producer.send(new
     * KeyedMessage<Integer, String>(topic, messageStr)); Utils.sleep(1000) ; //
     * if (i==500) { // break; // } }
     * 
     * }
     */
    public void run() {
        Random random = new Random();
        String[] cell_num = { "29448-37062", "29448-51331", "29448-51331", "29448-51333", "29448-51343" };
        // 正常0; 掉话1(信号断断续续); 断话2(完全断开)
        String[] drop_num = { "0", "1", "2" };
        int i = 0;
        while (true) {
            i++;
            String testStr = String.format("%06d", random.nextInt(10) + 1);

            // messageStr: 2494 29448-000003 2016-01-05 10:25:17 1
            //
            String messageStr = i + "\t" + ("29448-" + testStr) + "\t" + DateFmt.getCountDate(null, DateFmt.date_long)
                    + "\t" + drop_num[random.nextInt(drop_num.length)];
            System.out.println("product:" + messageStr);
            producer.send(new KeyedMessage<Integer, String>(topic, messageStr));
            Utils.sleep(1000);
            // if(i == 500) {
            // break;
            // }
        }
    }

    public static void main(String[] args) {
        // topic设置
        CellProducer producerThread = new CellProducer(KafkaProperties.Cell_Topic);

        // 启动线程生成数据
        producerThread.start();

    }
}

1.png

运行打开kafka

2.png

运行java客户端上面的代码,产生上图数据
bolt过滤数据
流程是cellproducer到mylog——cmcc----kafkaspout---cellfilterbolt--celldaobolt---hbase
--tomcat---前端

package bolt;

import java.util.Map;

import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.IBasicBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import tools.DateFmt;

public class CellFilterBolt implements IBasicBolt {

    /**
     * 
     */
    private static final long serialVersionUID = 1L;

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        String logString = input.getString(0);
        try {
            if (input != null) {
                String arr[] = logString.split("\\t");
//切割
                // messageStr格式:消息编号\t小区编号\t时间\t状态
                // 例:   2494  29448-000003  2016-01-05 10:25:17  1
                // DateFmt.date_short是yyyy-MM-dd,把2016-01-05 10:25:17格式化2016-01-05
                // 发出的数据格式: 时间, 小区编号, 掉话状态 
                collector.emit(new Values(DateFmt.getCountDate(arr[2], DateFmt.date_short), arr[1], arr[3]));
//DateFmt.getCountDate()自定义函数,精确到日

            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("date", "cell_num", "drop_num"));
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }

    @Override
    public void cleanup() {
        // TODO Auto-generated method stub
    }

    @Override
    public void prepare(Map map, TopologyContext arg1) {
        // TODO Auto-generated method stub
    }

}

流程是cellproducer到mylog——cmcc----kafkaspout---cellfilterbolt--celldaobolt---hbase
--tomcat---前端
消费数据

package topo;

import java.util.ArrayList;
import java.util.List;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
import bolt.CellDaoltBolt;
import bolt.CellFilterBolt;
import cmcc.constant.Constants;
import kafka.productor.KafkaProperties;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;

public class KafkaOneCellMonintorTopology {

    /**
     * @param args
     */
    public static void main(String[] args) {

        TopologyBuilder builder = new TopologyBuilder();

        ZkHosts zkHosts = new ZkHosts(Constants.KAFKA_ZOOKEEPER_LIST);
        SpoutConfig spoutConfig = new SpoutConfig(zkHosts, 
                "mylog_cmcc", 
                "/MyKafka", // 偏移量offset的根目录
                "MyTrack"); // 对应一个应用
        List<String> zkServers = new ArrayList<String>();
        System.out.println(zkHosts.brokerZkStr);
        for (String host : zkHosts.brokerZkStr.split(",")) {
            zkServers.add(host.split(":")[0]);
        }

        spoutConfig.zkServers = zkServers;
        spoutConfig.zkPort = 2181;
        // 是否从头开始消费
        spoutConfig.forceFromStart = false; 
        spoutConfig.socketTimeoutMs = 60 * 1000;
        // String
        spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme()); 

        builder.setSpout("spout", new KafkaSpout(spoutConfig), 3);
        builder.setBolt("cellBolt", new CellFilterBolt(), 3).shuffleGrouping("spout");
        builder.setBolt("CellDaoltBolt", new CellDaoltBolt(), 5)
                .fieldsGrouping("cellBolt", new Fields("cell_num"));//按字段分发(小区编号)

        
        Config conf = new Config();
        conf.setDebug(false);
        conf.setNumWorkers(5);
        if (args.length > 0) {
            try {
                StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
            } catch (AlreadyAliveException e) {
                e.printStackTrace();
            } catch (InvalidTopologyException e) {
                e.printStackTrace();
            }
        } else {
            System.out.println("Local running");
            LocalCluster localCluster = new LocalCluster();
            localCluster.submitTopology("mytopology", conf, builder.createTopology());
        }

    }

}

流程是cellproducer到mylog——cmcc----kafkaspout---cellfilterbolt--celldaobolt---hbase
--tomcat---前端
下面写入HBase数据

ackage bolt;

import java.util.Calendar;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;

import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.IBasicBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;
import cmcc.hbase.dao.HBaseDAO;
import cmcc.hbase.dao.impl.HBaseDAOImp;
import tools.DateFmt;

public class CellDaoltBolt implements IBasicBolt {

    private static final long serialVersionUID = 1L;

    HBaseDAO dao = null;

    long beginTime = System.currentTimeMillis();
    long endTime = 0;

    // 通话总数
    Map<String, Long> cellCountMap = new HashMap<String, Long>();
    // 掉话数
    Map<String, Long> cellDropCountMap = new HashMap<String, Long>();

    String todayStr = null;

    @Override
    public void execute(Tuple input, BasicOutputCollector collector) {
        // input为2016-01-05,29448-000003,1
        if (input != null) {
            String dateStr = input.getString(0);
            String cellNum = input.getString(1);
            String dropNum = input.getString(2);

            // 判断是否是当天,不是当天 就清除map 避免内存过大
            // 基站数目 大概5-10万(北京市)
            // http://bbs.c114.net/thread-793707-1-1.html
            todayStr = DateFmt.getCountDate(null, DateFmt.date_short);

            // 跨天的处理,大于当天的数据来了,就清空两个map
            // 思考: 如果程序崩溃了,map清零了,如果不出问题,一直做同一个cellid的累加
            // 这个逻辑不好,应该换成一个线程定期的清除map数据,而不是这里判断
            if (todayStr != dateStr && todayStr.compareTo(dateStr) < 0) {
                cellCountMap.clear();
                cellDropCountMap.clear();
            }

            // 当前cellid的通话数统计
            Long cellAll = cellCountMap.get(cellNum);
            if (cellAll == null) {
                cellAll = 0L;
            }
            cellCountMap.put(cellNum, ++cellAll);

            // 掉话数统计,大于0就是掉话
            Long cellDropAll = cellDropCountMap.get(cellNum);
            int t = Integer.parseInt(dropNum);
            if (t > 0) {
                if (cellDropAll == null) {
                    cellDropAll = 0L;
                }
                cellDropCountMap.put(cellNum, ++cellDropAll);
            }

            // 1.定时写库.为了防止写库过于频繁 这里间隔一段时间写一次
            // 2.也可以检测map里面数据size 写数据到 hbase
            // 3.自己可以设计一些思路 ,当然 采用redis 也不错
            // 4.采用tick定时存储也是一个思路
            endTime = System.currentTimeMillis();

            // flume+kafka 集成
            // 当前掉话数
            // 1.每小时掉话数目
            // 2.每小时 通话数据
            // 3.每小时 掉话率
            // 4.昨天的历史轨迹
            // 5.同比去年今天的轨迹(如果有数据)

            // hbase 按列存储的数据()
            // 10万
            // rowkey cellnum+ day
            if (endTime - beginTime >= 5000) {
                // 5s 写一次库
                if (cellCountMap.size() > 0 && cellDropCountMap.size() > 0) {
                    // x轴,相对于小时的偏移量,格式为 时:分,数值 数值是时间的偏移
                    String arr[] = this.getAxsi();

                    // 当前日期
                    String today = DateFmt.getCountDate(null, DateFmt.date_short);
                    // 当前分钟
                    String today_minute = DateFmt.getCountDate(null, DateFmt.date_minute);

                    // cellCountMap为通话数据的map
                    Set<String> keys = cellCountMap.keySet();
                    for (Iterator iterator = keys.iterator(); iterator.hasNext();) {
                        
                        String key_cellnum = (String) iterator.next();
                        
                        System.out.println("key_cellnum: " + key_cellnum + "***" 
                                + arr[0] + "---" 
                                + arr[1] + "---"
                                + cellCountMap.get(key_cellnum) + "----" 
                                + cellDropCountMap.get(key_cellnum));
                        
                        //写入HBase数据,样例: {time_title:"10:45",xAxis:10.759722222222223,call_num:140,call_drop_num:91}
                        
                        dao.insert("cell_monitor_table", 
                                key_cellnum + "_" + today, 
                                "cf", 
                                new String[] { today_minute },
                                new String[] { "{" + "time_title:\"" + arr[0] + "\",xAxis:" + arr[1] + ",call_num:"
                                        + cellCountMap.get(key_cellnum) + ",call_drop_num:" + cellDropCountMap.get(key_cellnum) + "}" }
                                );
                    }
                }
                // 需要重置初始时间
                beginTime = System.currentTimeMillis();
            }
        }
    }

    @Override
    public void prepare(Map stormConf, TopologyContext context) {
        // TODO Auto-generated method stub
        dao = new HBaseDAOImp();
        Calendar calendar = Calendar.getInstance();
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // TODO Auto-generated method stub
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        // TODO Auto-generated method stub
        return null;
    }

    // 获取X坐标,就是当前时间的坐标,小时是单位
    public String[] getAxsi() {
        // 取当前时间
        Calendar c = Calendar.getInstance();
        int hour = c.get(Calendar.HOUR_OF_DAY);
        int minute = c.get(Calendar.MINUTE);
        int sec = c.get(Calendar.SECOND);
        // 总秒数
        int curSecNum = hour * 3600 + minute * 60 + sec;

        // (12*3600+30*60+0)/3600=12.5
        Double xValue = (double) curSecNum / 3600;
        // 时:分,数值 数值是时间的偏移
        String[] end = { hour + ":" + minute, xValue.toString() };
        return end;
    }

    @Override
    public void cleanup() {
    }
}

hbase配置(这里采用的是伪分布式)



1.png 2.png 3.png

相关文章

网友评论

      本文标题:44.基于storm的电信移动项目

      本文链接:https://www.haomeiwen.com/subject/nyikhhtx.html