美文网首页
尚硅谷大数据技术之电信客服

尚硅谷大数据技术之电信客服

作者: 尚硅谷教育 | 来源:发表于2018-12-25 09:45 被阅读24次

    2) 新建测试单元类 :HBaseScanTest2

    |

    package com.atguigu;

    import com.atguigu.utils.DateTimeUtil;

    import org.apache.hadoop.conf.Configuration;

    import org.apache.hadoop.hbase.Cell;

    import org.apache.hadoop.hbase.CellUtil;

    import org.apache.hadoop.hbase.HBaseConfiguration;

    import org.apache.hadoop.hbase.client.HTable;

    import org.apache.hadoop.hbase.client.Result;

    import org.apache.hadoop.hbase.client.ResultScanner;

    import org.apache.hadoop.hbase.client.Scan;

    import org.apache.hadoop.hbase.util.Bytes;

    import org.junit.Test;

    import java.io.IOException;

    import java.text.ParseException;

    public class HBaseScanTest2 {

    private static Configuration conf = null;

    static {

    conf = HBaseConfiguration.create();

    }

    @Test

    public void scanTest() throws IOException, ParseException {

    String call = "14473548449";

    String startPoint = "2017-01-01";

    String stopPoint = "2017-09-01";

    HTable hTable = new HTable(conf, "ns_telecom:calllog");

    Scan scan = new Scan();

    ScanRowkeyUtil scanRowkeyUtil = new ScanRowkeyUtil (call, startPoint, stopPoint);

    while (scanRowkeyUtil.hasNext()) {

    String[] rowKeys = scanRowkeyUtil.next();

    scan.setStartRow(Bytes.toBytes(rowKeys[0]));

    scan.setStopRow(Bytes.toBytes(rowKeys[1]));

    System.out.println("时间范围" + rowKeys[0].substring(15, 21) + "---" + rowKeys[1].substring(15, 21));

    ResultScanner resultScanner = hTable.getScanner(scan);

    //每一个rowkey对应一个result

    for (Result result : resultScanner) {

    //每一个rowkey里面包含多个cell

    Cell[] cells = result.rawCells();

    StringBuilder sb = new StringBuilder();

    sb.append(Bytes.toString(result.getRow())).append(",");

    for (Cell c : cells) {

    sb.append(Bytes.toString(CellUtil.cloneValue(c))).append(",");

    }

    System.out.println(sb.toString());

    }

    }

    }

    }

    |

    3) 运行测试

    观察是否已经按照时间范围查询出对应的数据。

    3.2.5 数据消费方案优化

    现在我们要使用

    使用HBase查找数据时,尽可能的使用rowKey去精准的定位数据位置,而非使用ColumnValueFilter或者SingleColumnValueFilter,按照单元格Cell中的Value过滤数据,这样做在数据量巨大的情况下,效率是极低的——如果要涉及到全表扫描。所以尽量不要做这样可怕的事情。注意,这并非ColumnValueFilter就无用武之地。现在,我们将使用协处理器,将数据一分为二。

    思路:

    a) 编写协处理器类,用于协助处理HBase的相关操作(增删改查)

    b) 在协处理器中,一条主叫日志成功插入后,将该日志切换为被叫视角再次插入一次,放入到与主叫日志不同的列族中。

    c) 重新创建hbase表,并设置为该表设置协处理器。

    d) 编译项目,发布协处理器的jar包到hbase的lib目录下,并群发该jar包

    e) 修改hbase-site.xml文件,设置协处理器,并群发该hbase-site.xml文件

    编码:

    1) 新建协处理器类:CalleeWriteObserver,并覆写postPut方法,该方法会在数据成功插入之后被回调。

    |

    package com.atguigu.coprocessor;

    import com.atguigu.utils.HBaseUtil;

    import com.atguigu.utils.PropertiesUtil;

    import org.apache.commons.lang.StringUtils;

    import org.apache.hadoop.hbase.TableName;

    import org.apache.hadoop.hbase.client.*;

    import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;

    import org.apache.hadoop.hbase.coprocessor.ObserverContext;

    import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;

    import org.apache.hadoop.hbase.regionserver.wal.WALEdit;

    import org.apache.hadoop.hbase.util.Bytes;

    import java.io.IOException;

    import java.text.ParseException;

    import java.text.SimpleDateFormat;

    /**

    • 用于实现主叫日志插入成功之后,同时插入一条被叫日志

    */

    public class CalleeWriteObserver extends BaseRegionObserver{

    @Override

    public void postPut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit, Durability durability) throws IOException {

    super.postPut(e, put, edit, durability);

    //1、获取需要操作的表

    String targetTableName = PropertiesUtil.getProperty("hbase.table.name");

    //2、获取当前操作的表

    String currentTableName = e.getEnvironment().getRegion().getRegionInfo().getTable().getNameAsString();

    //3、判断需要操作的表是否就是当前表,如果不是,则return

    if (!StringUtils.equals(targetTableName, currentTableName)) return;

    //4、得到当前插入数据的值并封装新的数据,oriRowkey举例:01_15369468720_20170727081033_13720860202_1_0180

    String oriRowKey = Bytes.toString(put.getRow());

    String[] splits = oriRowKey.split("_");

    String flag = splits[4];

    //如果当前插入的是被叫数据,则直接返回(因为默认提供的数据全部为主叫数据)

    if(StringUtils.equals(flag, "0")) return;

    //当前插入的数据描述

    String caller = splits[1];

    String callee = splits[3];

    String dateTime = splits[2];

    String duration = splits[5];

    String timestamp = null;

    try {

    SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmmss");

    timestamp = String.valueOf(sdf.parse(dateTime).getTime());

    } catch (ParseException e1) {

    e1.printStackTrace();

    }

    //组装新的数据所在分区号

    int regions = Integer.valueOf(PropertiesUtil.getProperty("hbase.regions.count"));

    String regionHash = HBaseUtil.genPartitionCode(callee, dateTime, regions);

    String newFlag = "0";

    String rowKey = HBaseUtil.genRowKey(regionHash, callee, dateTime, caller, newFlag, duration);

    //开始存放被叫数据

    Put newPut = new Put(Bytes.toBytes(rowKey));

    newPut.add(Bytes.toBytes("f2"), Bytes.toBytes("call1"), Bytes.toBytes(callee));

    newPut.add(Bytes.toBytes("f2"), Bytes.toBytes("call2"), Bytes.toBytes(caller));

    newPut.add(Bytes.toBytes("f2"), Bytes.toBytes("date_time"), Bytes.toBytes(dateTime));

    newPut.add(Bytes.toBytes("f2"), Bytes.toBytes("date_time_ts"), Bytes.toBytes(timestamp));

    newPut.add(Bytes.toBytes("f2"), Bytes.toBytes("duration"), Bytes.toBytes(duration));

    newPut.add(Bytes.toBytes("f2"), Bytes.toBytes("flag"), Bytes.toBytes(newFlag));

    HTableInterface hTable = e.getEnvironment().getTable(TableName.valueOf(targetTableName));

    hTable.put(newPut);

    hTable.close();

    }

    }

    |

    2) 重新创建hbase****表,并设置为该表设置协处理器。在“表描述器”中调用addCoprocessor****方法进行协处理器的设置,大概是这样的:(你需要找到你的建表的那部分代码,添加如下逻辑)

    |

    tableDescriptor.addCoprocessor("com.atguigu.coprocessor.CalleeWriteObserver");

    |

    本教程由尚硅谷教育大数据研究院出品,如需转载请注明来源,欢迎大家关注尚硅谷公众号(atguigu)了解更多。

    相关文章

      网友评论

          本文标题:尚硅谷大数据技术之电信客服

          本文链接:https://www.haomeiwen.com/subject/lwuxlqtx.html