项目需求
中国移动,联通,电信无线网络保障项目
3 大运营商:gsm(2g)
3g:移动 td-scdma(2.5g)、联通 wcdma(欧美)、电信 cdma2000
4g: 移动 td-lte; 联通 td-lte(时分复用)、fdd-lte(码分多址)
广州基站 5万个,北京市 8万左右
截止2016年5月,中国移动4G用户总数超过4.09亿户,中国联通4G用户数达到6367.9万,而中国电信4G用户达到7952万户。
2016年中国移动将大力发展4G网络,争取2016年4G用户量达到5亿。
项目背景:
提供更好的服务
1、 实时监控基站,提供快速网络调优预警
2、 哪些地点是用户的常态地点(经常驻留的地点)?
3、 网元、区域下有哪些常态用户(常驻在该网元下的用户)?
网元就是网络中被管理单元。就是这个用户经常在这个基站
4、 哪些终端限制了用户对业务的需求,哪些终端值得推广?
5、 哪些地点是用户使用业务的热点,需要结构优化?
例如商业区
功能:
1、 每个小区掉话实时分析
5 秒级别监控(实时监控 总共掉多少话)
每小时掉话(这个小时里面的掉话) 柱状图
2、 3g 上网上行流量监控
下行流量监控
3、 终端流量实时分析(每一个手机号实时流量分析)
上行 下行
4、 语音通话实时分析(通话汇总)
5、 掉话最多的小区 前 50 个
6、 流量最多小区 前 50
7、 最多终端前 50
8、 掉话最多终端 前 500
9、 用户最多终端(本市区使用某种型号手机最多,前 50)
名词解释
基站,就是我们通常讲的一个站点,由 BTS 主要设备构成。
小区:一个基站覆盖的范围就叫小区,每个小区都有独立的小区号,逻辑概念
扇区:通常一个基站分为三个扇区,每个扇区打向一个方向,每个扇区覆盖 120 度的范围
BSS:是基站子系统
BTS:是基站收发信台
BSC:是基站控制器
MSC:移动业务交换中心
GSM 系统的基本结构:
GSM 可分为三个子系统,基站子系统 BSS,网络交换子系统 NSS,网络管理子系统。
基站子系统 BSS 包括:
BTS:基站收发信台
BSC:基站控制器
TC: 码变换器
网络交换子系统 NSS:
MSC:移动业务交换中心
AUC:鉴权中心
HLR:本地用户位置寄存器
VLR:外来用户位置寄存器
EIR:设备身份寄存器
网络管理子系统 NMS:
OMC:操作维护中心
NMC:网络管理中心
空中接口:指的是移动终端(手机)和基站之间的接口,一般是指的协议。
掉话原因:
网络的掉话按照信道的方式可分为两种形式:
一类是在 SDCCH 信道上的掉话
另一类是在 TCH 信道上的掉话
SDCCH 的掉话是指在 BSC 给 MS 分配了 SDCCH 信道,而 TCH 信道还没有分配成功期间发生的掉话。TCH 的掉话是指在 BSC 给 MS 成功分配了 TCH 信道后,发生的不正常 TCH释放
架构
中国移动项目架构图.jpg
在这里我们并不能得到电线的官方数据,所以我们只能使用模拟数据
流程是cellproducer到mylog——cmcc----kafkaspout---cellfilterbolt--celldaobolt---hbase
--tomcat---前端
package kafka.productor;
import java.util.Properties;
import java.util.Random;
import backtype.storm.utils.Utils;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import tools.DateFmt;
/***
* 模拟发送数据到kafka中
*
* @author hadoop
*
*/
public class CellProducer extends Thread {
// bin/kafka-topics.sh --create --zookeeper localhost:2181
// --replication-factor 3 --partitions 5 --topic cmcccdr
private final kafka.javaapi.producer.Producer<Integer, String> producer;
private final String topic;
private final Properties props = new Properties();
public CellProducer(String topic) {
props.put("serializer.class", "kafka.serializer.StringEncoder");// 字符串消息
props.put("metadata.broker.list", KafkaProperties.broker_list);
producer = new kafka.javaapi.producer.Producer<Integer, String>(new ProducerConfig(props));
this.topic = topic;
}
/*
* public void run() { // order_id,order_amt,create_time,province_id Random
* random = new Random(); String[] cell_num = { "29448-37062",
* "29448-51331", "29448-51331","29448-51333", "29448-51343" }; String[]
* drop_num = { "0","1","2"};//掉话1(信号断断续续) 断话2(完全断开)
*
* // Producer.java // record_time, imei, cell,
* ph_num,call_num,drop_num,duration,drop_rate,net_type,erl // 2011-06-28
* 14:24:59.867,356966,29448-37062,0,0,0,0,0,G,0 // 2011-06-28
* 14:24:59.867,352024,29448-51331,0,0,0,0,0,G,0 // 2011-06-28
* 14:24:59.867,353736,29448-51331,0,0,0,0,0,G,0 // 2011-06-28
* 14:24:59.867,353736,29448-51333,0,0,0,0,0,G,0 // 2011-06-28
* 14:24:59.867,351545,29448-51333,0,0,0,0,0,G,0 // 2011-06-28
* 14:24:59.867,353736,29448-51343,1,0,0,8,0,G,0 int i =0 ; NumberFormat nf
* = new DecimalFormat("000000"); while(true) { i ++ ; // String messageStr
* = i+"\t"+cell_num[random.nextInt(cell_num.length)]+"\t"+DateFmt.
* getCountDate(null,
* DateFmt.date_long)+"\t"+drop_num[random.nextInt(drop_num.length)] ;
* String testStr = nf.format(random.nextInt(10)+1);
*
* String messageStr =
* i+"\t"+("29448-"+testStr)+"\t"+DateFmt.getCountDate(null,
* DateFmt.date_long)+"\t"+drop_num[random.nextInt(drop_num.length)] ;
*
* System.out.println("product:"+messageStr); producer.send(new
* KeyedMessage<Integer, String>(topic, messageStr)); Utils.sleep(1000) ; //
* if (i==500) { // break; // } }
*
* }
*/
public void run() {
Random random = new Random();
String[] cell_num = { "29448-37062", "29448-51331", "29448-51331", "29448-51333", "29448-51343" };
// 正常0; 掉话1(信号断断续续); 断话2(完全断开)
String[] drop_num = { "0", "1", "2" };
int i = 0;
while (true) {
i++;
String testStr = String.format("%06d", random.nextInt(10) + 1);
// messageStr: 2494 29448-000003 2016-01-05 10:25:17 1
//
String messageStr = i + "\t" + ("29448-" + testStr) + "\t" + DateFmt.getCountDate(null, DateFmt.date_long)
+ "\t" + drop_num[random.nextInt(drop_num.length)];
System.out.println("product:" + messageStr);
producer.send(new KeyedMessage<Integer, String>(topic, messageStr));
Utils.sleep(1000);
// if(i == 500) {
// break;
// }
}
}
public static void main(String[] args) {
// topic设置
CellProducer producerThread = new CellProducer(KafkaProperties.Cell_Topic);
// 启动线程生成数据
producerThread.start();
}
}
1.png
运行打开kafka
2.png运行java客户端上面的代码,产生上图数据
bolt过滤数据
流程是cellproducer到mylog——cmcc----kafkaspout---cellfilterbolt--celldaobolt---hbase
--tomcat---前端
package bolt;
import java.util.Map;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.IBasicBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import tools.DateFmt;
public class CellFilterBolt implements IBasicBolt {
/**
*
*/
private static final long serialVersionUID = 1L;
@Override
public void execute(Tuple input, BasicOutputCollector collector) {
String logString = input.getString(0);
try {
if (input != null) {
String arr[] = logString.split("\\t");
//切割
// messageStr格式:消息编号\t小区编号\t时间\t状态
// 例: 2494 29448-000003 2016-01-05 10:25:17 1
// DateFmt.date_short是yyyy-MM-dd,把2016-01-05 10:25:17格式化2016-01-05
// 发出的数据格式: 时间, 小区编号, 掉话状态
collector.emit(new Values(DateFmt.getCountDate(arr[2], DateFmt.date_short), arr[1], arr[3]));
//DateFmt.getCountDate()自定义函数,精确到日
}
} catch (Exception e) {
e.printStackTrace();
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("date", "cell_num", "drop_num"));
}
@Override
public Map<String, Object> getComponentConfiguration() {
return null;
}
@Override
public void cleanup() {
// TODO Auto-generated method stub
}
@Override
public void prepare(Map map, TopologyContext arg1) {
// TODO Auto-generated method stub
}
}
流程是cellproducer到mylog——cmcc----kafkaspout---cellfilterbolt--celldaobolt---hbase
--tomcat---前端
消费数据
package topo;
import java.util.ArrayList;
import java.util.List;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.generated.AlreadyAliveException;
import backtype.storm.generated.InvalidTopologyException;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.tuple.Fields;
import bolt.CellDaoltBolt;
import bolt.CellFilterBolt;
import cmcc.constant.Constants;
import kafka.productor.KafkaProperties;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;
public class KafkaOneCellMonintorTopology {
/**
* @param args
*/
public static void main(String[] args) {
TopologyBuilder builder = new TopologyBuilder();
ZkHosts zkHosts = new ZkHosts(Constants.KAFKA_ZOOKEEPER_LIST);
SpoutConfig spoutConfig = new SpoutConfig(zkHosts,
"mylog_cmcc",
"/MyKafka", // 偏移量offset的根目录
"MyTrack"); // 对应一个应用
List<String> zkServers = new ArrayList<String>();
System.out.println(zkHosts.brokerZkStr);
for (String host : zkHosts.brokerZkStr.split(",")) {
zkServers.add(host.split(":")[0]);
}
spoutConfig.zkServers = zkServers;
spoutConfig.zkPort = 2181;
// 是否从头开始消费
spoutConfig.forceFromStart = false;
spoutConfig.socketTimeoutMs = 60 * 1000;
// String
spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
builder.setSpout("spout", new KafkaSpout(spoutConfig), 3);
builder.setBolt("cellBolt", new CellFilterBolt(), 3).shuffleGrouping("spout");
builder.setBolt("CellDaoltBolt", new CellDaoltBolt(), 5)
.fieldsGrouping("cellBolt", new Fields("cell_num"));//按字段分发(小区编号)
Config conf = new Config();
conf.setDebug(false);
conf.setNumWorkers(5);
if (args.length > 0) {
try {
StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
} catch (AlreadyAliveException e) {
e.printStackTrace();
} catch (InvalidTopologyException e) {
e.printStackTrace();
}
} else {
System.out.println("Local running");
LocalCluster localCluster = new LocalCluster();
localCluster.submitTopology("mytopology", conf, builder.createTopology());
}
}
}
流程是cellproducer到mylog——cmcc----kafkaspout---cellfilterbolt--celldaobolt---hbase
--tomcat---前端
下面写入HBase数据
ackage bolt;
import java.util.Calendar;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.IBasicBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;
import cmcc.hbase.dao.HBaseDAO;
import cmcc.hbase.dao.impl.HBaseDAOImp;
import tools.DateFmt;
public class CellDaoltBolt implements IBasicBolt {
private static final long serialVersionUID = 1L;
HBaseDAO dao = null;
long beginTime = System.currentTimeMillis();
long endTime = 0;
// 通话总数
Map<String, Long> cellCountMap = new HashMap<String, Long>();
// 掉话数
Map<String, Long> cellDropCountMap = new HashMap<String, Long>();
String todayStr = null;
@Override
public void execute(Tuple input, BasicOutputCollector collector) {
// input为2016-01-05,29448-000003,1
if (input != null) {
String dateStr = input.getString(0);
String cellNum = input.getString(1);
String dropNum = input.getString(2);
// 判断是否是当天,不是当天 就清除map 避免内存过大
// 基站数目 大概5-10万(北京市)
// http://bbs.c114.net/thread-793707-1-1.html
todayStr = DateFmt.getCountDate(null, DateFmt.date_short);
// 跨天的处理,大于当天的数据来了,就清空两个map
// 思考: 如果程序崩溃了,map清零了,如果不出问题,一直做同一个cellid的累加
// 这个逻辑不好,应该换成一个线程定期的清除map数据,而不是这里判断
if (todayStr != dateStr && todayStr.compareTo(dateStr) < 0) {
cellCountMap.clear();
cellDropCountMap.clear();
}
// 当前cellid的通话数统计
Long cellAll = cellCountMap.get(cellNum);
if (cellAll == null) {
cellAll = 0L;
}
cellCountMap.put(cellNum, ++cellAll);
// 掉话数统计,大于0就是掉话
Long cellDropAll = cellDropCountMap.get(cellNum);
int t = Integer.parseInt(dropNum);
if (t > 0) {
if (cellDropAll == null) {
cellDropAll = 0L;
}
cellDropCountMap.put(cellNum, ++cellDropAll);
}
// 1.定时写库.为了防止写库过于频繁 这里间隔一段时间写一次
// 2.也可以检测map里面数据size 写数据到 hbase
// 3.自己可以设计一些思路 ,当然 采用redis 也不错
// 4.采用tick定时存储也是一个思路
endTime = System.currentTimeMillis();
// flume+kafka 集成
// 当前掉话数
// 1.每小时掉话数目
// 2.每小时 通话数据
// 3.每小时 掉话率
// 4.昨天的历史轨迹
// 5.同比去年今天的轨迹(如果有数据)
// hbase 按列存储的数据()
// 10万
// rowkey cellnum+ day
if (endTime - beginTime >= 5000) {
// 5s 写一次库
if (cellCountMap.size() > 0 && cellDropCountMap.size() > 0) {
// x轴,相对于小时的偏移量,格式为 时:分,数值 数值是时间的偏移
String arr[] = this.getAxsi();
// 当前日期
String today = DateFmt.getCountDate(null, DateFmt.date_short);
// 当前分钟
String today_minute = DateFmt.getCountDate(null, DateFmt.date_minute);
// cellCountMap为通话数据的map
Set<String> keys = cellCountMap.keySet();
for (Iterator iterator = keys.iterator(); iterator.hasNext();) {
String key_cellnum = (String) iterator.next();
System.out.println("key_cellnum: " + key_cellnum + "***"
+ arr[0] + "---"
+ arr[1] + "---"
+ cellCountMap.get(key_cellnum) + "----"
+ cellDropCountMap.get(key_cellnum));
//写入HBase数据,样例: {time_title:"10:45",xAxis:10.759722222222223,call_num:140,call_drop_num:91}
dao.insert("cell_monitor_table",
key_cellnum + "_" + today,
"cf",
new String[] { today_minute },
new String[] { "{" + "time_title:\"" + arr[0] + "\",xAxis:" + arr[1] + ",call_num:"
+ cellCountMap.get(key_cellnum) + ",call_drop_num:" + cellDropCountMap.get(key_cellnum) + "}" }
);
}
}
// 需要重置初始时间
beginTime = System.currentTimeMillis();
}
}
}
@Override
public void prepare(Map stormConf, TopologyContext context) {
// TODO Auto-generated method stub
dao = new HBaseDAOImp();
Calendar calendar = Calendar.getInstance();
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
// TODO Auto-generated method stub
}
@Override
public Map<String, Object> getComponentConfiguration() {
// TODO Auto-generated method stub
return null;
}
// 获取X坐标,就是当前时间的坐标,小时是单位
public String[] getAxsi() {
// 取当前时间
Calendar c = Calendar.getInstance();
int hour = c.get(Calendar.HOUR_OF_DAY);
int minute = c.get(Calendar.MINUTE);
int sec = c.get(Calendar.SECOND);
// 总秒数
int curSecNum = hour * 3600 + minute * 60 + sec;
// (12*3600+30*60+0)/3600=12.5
Double xValue = (double) curSecNum / 3600;
// 时:分,数值 数值是时间的偏移
String[] end = { hour + ":" + minute, xValue.toString() };
return end;
}
@Override
public void cleanup() {
}
}
hbase配置(这里采用的是伪分布式)
1.png 2.png 3.png
网友评论