美文网首页
尚硅谷大数据技术之电信客服

尚硅谷大数据技术之电信客服

作者: 尚硅谷教育 | 来源:发表于2018-12-25 09:52 被阅读4次

3.3.4 需求实现

1) 创建类:CountDurationMapper

|

package com.atguigu.analysis.mapper;

import com.atguigu.analysis.kv.impl.ComDimension;

import com.atguigu.analysis.kv.impl.ContactDimension;

import com.atguigu.analysis.kv.impl.DateDimension;

import org.apache.commons.lang.StringUtils;

import org.apache.hadoop.hbase.client.Result;

import org.apache.hadoop.hbase.io.ImmutableBytesWritable;

import org.apache.hadoop.hbase.mapreduce.TableMapper;

import org.apache.hadoop.hbase.util.Bytes;

import org.apache.hadoop.io.Text;

import java.io.IOException;

import java.util.HashMap;

import java.util.Map;

/**
 * Table mapper that reads call-log row keys and emits the call duration once per
 * (contact, date granularity) combination.
 *
 * <p>The row key is expected to contain six underscore-separated fields, e.g.
 * {@code 01_15837312345_20170810141024_13738909097_1_0180}: index 1 and 3 are the two
 * phone numbers, index 2 is a timestamp starting with {@code yyyyMMdd}, index 4 is a
 * flag and index 5 is the duration. For every accepted row the mapper writes six
 * records: year, month and day level for the first phone number, then the same three
 * for the second phone number.
 *
 * <p>Row keys that do not have this structure are skipped and counted under the
 * {@code CountDurationMapper / MALFORMED_ROW_KEY} job counter instead of failing the
 * task with an index-out-of-bounds exception.
 */
public class CountDurationMapper extends TableMapper<ComDimension, Text> {

    /** Number of underscore-separated fields a usable row key must contain. */
    private static final int ROW_KEY_FIELD_COUNT = 6;

    /** Number of leading timestamp characters consumed as yyyyMMdd. */
    private static final int DATE_PREFIX_LENGTH = 8;

    /** Counter group / name used to report skipped rows. */
    private static final String COUNTER_GROUP = "CountDurationMapper";
    private static final String MALFORMED_ROW_KEY_COUNTER = "MALFORMED_ROW_KEY";

    // Mapping from phone number to contact name; populated in setup().
    private Map<String, String> contacts;

    // Output key instance reused for every write of this mapper.
    private final ComDimension comDimension = new ComDimension();

    /** Fills the phone-number-to-name lookup table with the known contacts. */
    private void initContact() {
        contacts = new HashMap<String, String>();
        contacts.put("15369468720", "李雁");
        contacts.put("19920860202", "卫艺");
        contacts.put("18411925860", "仰莉");
        contacts.put("14473548449", "陶欣悦");
        contacts.put("18749966182", "施梅梅");
        contacts.put("19379884788", "金虹霖");
        contacts.put("19335715448", "魏明艳");
        contacts.put("18503558939", "华贞");
        contacts.put("13407209608", "华啟倩");
        contacts.put("15596505995", "仲采绿");
        contacts.put("17519874292", "卫丹");
        contacts.put("15178485516", "戚丽红");
        contacts.put("19877232369", "何翠柔");
        contacts.put("18706287692", "钱溶艳");
        contacts.put("18944239644", "钱琳");
        contacts.put("17325302007", "缪静欣");
        contacts.put("18839074540", "焦秋菊");
        contacts.put("19879419704", "吕访琴");
        contacts.put("16480981069", "沈丹");
        contacts.put("18674257265", "褚美丽");
    }

    /**
     * Prepares the contact lookup table before any record is mapped.
     *
     * @param context the task context (unused)
     */
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        initContact();
    }

    /**
     * Parses the row key of one HBase result and emits the duration for both phone
     * numbers at year, month and day granularity.
     *
     * @param key     the HBase row key wrapper supplied by the framework (unused; the
     *                row key is read from {@code value})
     * @param value   the HBase result whose row key encodes the call record
     * @param context the task context used to write output and report counters
     * @throws NumberFormatException if the first eight timestamp characters are not digits
     */
    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context)
            throws IOException, InterruptedException {
        // Example row key: 01_15837312345_20170810141024_13738909097_1_0180
        String rowKey = Bytes.toString(value.getRow());
        String[] fields = (rowKey == null) ? new String[0] : rowKey.split("_");

        // Guard against keys that would otherwise fail on array or substring access.
        if (fields.length < ROW_KEY_FIELD_COUNT || fields[2].length() < DATE_PREFIX_LENGTH) {
            context.getCounter(COUNTER_GROUP, MALFORMED_ROW_KEY_COUNTER).increment(1L);
            return;
        }

        // Rows flagged "0" are skipped; per the original author's note only the
        // caller-side records are needed.
        String flag = fields[4];
        if (StringUtils.equals(flag, "0")) {
            return;
        }

        String dateTime = fields[2];
        String call1 = fields[1];
        String call2 = fields[3];

        int year = Integer.parseInt(dateTime.substring(0, 4));
        int month = Integer.parseInt(dateTime.substring(4, 6));
        int day = Integer.parseInt(dateTime.substring(6, 8));

        // -1 is passed for the components that do not apply at a coarser granularity.
        DateDimension[] dateDimensions = {
            new DateDimension(year, -1, -1),
            new DateDimension(year, month, -1),
            new DateDimension(year, month, day)
        };

        // One value object per record instead of one per write.
        Text duration = new Text(fields[5]);

        // First phone number
        emitForContact(context, call1, dateDimensions, duration);
        // Second phone number
        emitForContact(context, call2, dateDimensions, duration);
    }

    /** Writes the duration for one phone number once per supplied date dimension, in order. */
    private void emitForContact(Context context, String phone, DateDimension[] dateDimensions,
            Text duration) throws IOException, InterruptedException {
        comDimension.setContactDimension(new ContactDimension(phone, contacts.get(phone)));
        for (DateDimension dateDimension : dateDimensions) {
            comDimension.setDateDimension(dateDimension);
            context.write(comDimension, duration);
        }
    }
}

|

2) 创建类:CountDurationReducer

|

package com.atguigu.analysis.reducer;

import com.atguigu.analysis.kv.impl.ComDimension;

import com.atguigu.analysis.kv.impl.CountDurationValue;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * Reducer that aggregates all duration values received for one {@link ComDimension}
 * key into a single {@link CountDurationValue} holding the number of values and the
 * sum of their integer contents.
 */
public class CountDurationReducer
        extends Reducer<ComDimension, Text, ComDimension, CountDurationValue> {

    /**
     * Counts the values for {@code key}, sums them as integers and writes one result.
     *
     * @param key     the combined dimension the values belong to
     * @param values  the durations for this key, each as decimal text
     * @param context the task context the aggregated record is written to
     * @throws NumberFormatException if a value is not a parsable integer
     */
    @Override
    protected void reduce(ComDimension key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        int callCount = 0;
        int totalDuration = 0;

        java.util.Iterator<Text> durations = values.iterator();
        while (durations.hasNext()) {
            String rawDuration = durations.next().toString();
            totalDuration += Integer.parseInt(rawDuration);
            callCount++;
        }

        context.write(key, new CountDurationValue(callCount, totalDuration));
    }
}

|

3) 创建类:CountDurationRunner

|

package com.atguigu.analysis.runner;

import com.atguigu.analysis.format.MySQLOutputFormat;

import com.atguigu.analysis.kv.impl.ComDimension;

import com.atguigu.analysis.kv.impl.CountDurationValue;

import com.atguigu.analysis.mapper.CountDurationMapper;

import com.atguigu.analysis.reducer.CountDurationReducer;

import org.apache.hadoop.conf.Configuration;

import org.apache.hadoop.conf.Configured;

import org.apache.hadoop.hbase.HBaseConfiguration;

import org.apache.hadoop.hbase.client.HBaseAdmin;

import org.apache.hadoop.hbase.client.Scan;

import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;

import org.apache.hadoop.hbase.util.Bytes;

import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;

import org.apache.hadoop.util.Tool;

import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;

/**
 * Driver that configures and submits the call-log analysis job: input is scanned from
 * the HBase table {@code ns_telecom:calllog} by {@link CountDurationMapper}, reduced by
 * {@link CountDurationReducer} and handed to {@link MySQLOutputFormat}.
 */
public class CountDurationRunner implements Tool {

    /** Name of the HBase table that is scanned as job input. */
    private static final String CALL_LOG_TABLE = "ns_telecom:calllog";

    private Configuration conf = null;

    /**
     * Stores an HBase configuration created from the supplied configuration.
     *
     * @param conf the base configuration passed in by the tool runner
     */
    @Override
    public void setConf(Configuration conf) {
        this.conf = HBaseConfiguration.create(conf);
    }

    /**
     * @return the configuration stored by {@link #setConf(Configuration)}
     */
    @Override
    public Configuration getConf() {
        return this.conf;
    }

    /**
     * Builds the job, waits for it to finish and reports the outcome.
     *
     * @param args command-line arguments (unused)
     * @return 0 if the job completed successfully, 1 otherwise
     * @throws Exception if the input cannot be configured or job submission fails
     */
    @Override
    public int run(String[] args) throws Exception {
        // Obtain the configuration
        Configuration conf = this.getConf();

        // Create the job
        Job job = Job.getInstance(conf, "CALL_LOG_ANALYSIS");
        job.setJarByClass(CountDurationRunner.class);

        // Configure the mapper (HBase table input)
        this.setHBaseInputConfig(job);

        // Configure the reducer
        job.setReducerClass(CountDurationReducer.class);
        job.setOutputKeyClass(ComDimension.class);
        job.setOutputValueClass(CountDurationValue.class);

        // Configure the output format
        job.setOutputFormatClass(MySQLOutputFormat.class);

        return job.waitForCompletion(true) ? 0 : 1;
    }

    /**
     * Verifies that the input table exists and registers the table mapper on the job.
     *
     * <p>I/O failures are propagated to the caller rather than only being printed, so
     * that a job whose input could not be configured is never submitted.
     *
     * @param job the job to configure
     * @throws IOException      if HBase cannot be reached or the mapper cannot be initialised
     * @throws RuntimeException if the input table does not exist
     */
    private void setHBaseInputConfig(Job job) throws IOException {
        Configuration conf = job.getConfiguration();
        HBaseAdmin admin = new HBaseAdmin(conf);
        try {
            // A missing input table is fatal: fail fast instead of returning quietly.
            if (!admin.tableExists(CALL_LOG_TABLE)) {
                throw new RuntimeException("Unable to find the specified table.");
            }

            Scan scan = new Scan();
            scan.setAttribute(Scan.SCAN_ATTRIBUTES_TABLE_NAME, Bytes.toBytes(CALL_LOG_TABLE));

            TableMapReduceUtil.initTableMapperJob(CALL_LOG_TABLE, scan,
                    CountDurationMapper.class, ComDimension.class, Text.class,
                    job, true);
        } finally {
            // Closing the admin handle stays best-effort; a failure here must not
            // mask the outcome of the configuration above.
            try {
                admin.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * Command-line entry point. Prints the outcome and then terminates the JVM with
     * the job status (0 on success, non-zero on failure or exception).
     *
     * @param args command-line arguments forwarded to the tool runner
     */
    public static void main(String[] args) {
        int status = 1;
        try {
            status = ToolRunner.run(new CountDurationRunner(), args);
            // Report before exiting; System.exit does not return.
            if (status == 0) {
                System.out.println("运行成功");
            } else {
                System.out.println("运行失败");
            }
        } catch (Exception e) {
            System.out.println("运行失败");
            e.printStackTrace();
            status = 1;
        }
        System.exit(status);
    }
}

|

本教程由尚硅谷教育大数据研究院出品,如需转载请注明来源,欢迎大家关注尚硅谷公众号(atguigu)了解更多。

相关文章

网友评论

      本文标题:尚硅谷大数据技术之电信客服

      本文链接:https://www.haomeiwen.com/subject/ddpxlqtx.html