HBase Batch Writes and Batch Reads

Author: 愤怒的小猥琐 | Published: 2018-12-03 16:58

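    Requirement 1

    Write a large volume of data (100,000 points, each a double) to HBase in real time. HBase runs in standalone (single-node) mode.

    Core implementation

    1. put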
    // Writes a batch of Puts through a BufferedMutator and returns the elapsed time in milliseconds.
    public static long put(String tablename, List<Put> puts) throws Exception {
        long currentTime = System.currentTimeMillis();
        Connection conn = getConnection();
        // Called for rows that still fail after the client's retries are exhausted.
        final BufferedMutator.ExceptionListener listener = new BufferedMutator.ExceptionListener() {
            @Override
            public void onException(RetriesExhaustedWithDetailsException e, BufferedMutator mutator) {
                for (int i = 0; i < e.getNumExceptions(); i++) {
                    // getRow(i) is the failed Row operation; log its row key in readable form.
                    logger.error("Failed to send put " + Bytes.toStringBinary(e.getRow(i).getRow()) + ".");
                }
            }
        };
        BufferedMutatorParams params = new BufferedMutatorParams(TableName.valueOf(tablename))
                .listener(listener);
        // 5 MB client-side write buffer.
        params.writeBufferSize(5 * 1024 * 1024);

        final BufferedMutator mutator = conn.getBufferedMutator(params);
        try {
            mutator.mutate(puts);
            // Send whatever is still buffered before measuring the elapsed time.
            mutator.flush();
        } finally {
            mutator.close();
            closeConnect(conn);
        }
        return System.currentTimeMillis() - currentTime;
    }
    
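    2. Multithreaded operation
    // Run the batch put on a worker thread so the producer is not blocked.
    threadPool.execute(new Runnable() {
        @Override
        public void run() {
            try {
                HBaseUtil.put(tableName, puts);
            } catch (Exception e) {
                logger.error("batchPut failed.", e);
            }
        }
    });

    // Optionally block until the submitted put jobs have finished.
    // awaitTermination() takes no arguments here, so threadPool is presumably a
    // project-specific wrapper; a plain java.util.concurrent.ExecutorService needs
    // shutdown() followed by awaitTermination(timeout, unit).
    if (waiting) {
        try {
            threadPool.awaitTermination();
        } catch (InterruptedException e) {
            logger.error("HBase put job thread pool was interrupted while awaiting termination.", e);
        }
    }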
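The multithreaded write above can be sketched with only the JDK. This is a minimal illustration, not the author's implementation: `BatchSubmitSketch`, `partition`, `submitAll`, the batch size and the thread count are all hypothetical, and the `writer` callback stands in for a call such as `HBaseUtil.put(tableName, batch)`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Hypothetical helper: splits a large list into batches and writes them on a fixed pool.
class BatchSubmitSketch {

    /** Splits items into consecutive chunks of at most batchSize elements. */
    static <T> List<List<T>> partition(List<T> items, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int from = 0; from < items.size(); from += batchSize) {
            int to = Math.min(from + batchSize, items.size());
            batches.add(new ArrayList<>(items.subList(from, to)));
        }
        return batches;
    }

    /**
     * Runs writer on every batch using a fixed thread pool.
     * Returns true if all tasks finished within the timeout.
     */
    static <T> boolean submitAll(List<T> items, int batchSize, int threads,
                                 Consumer<List<T>> writer, long timeoutSeconds) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (List<T> batch : partition(items, batchSize)) {
            pool.execute(() -> {
                try {
                    writer.accept(batch); // stand-in for the real HBase write
                } catch (RuntimeException e) {
                    System.err.println("batch write failed: " + e);
                }
            });
        }
        pool.shutdown(); // accept no new tasks; already queued batches still run
        try {
            return pool.awaitTermination(timeoutSeconds, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            return false;
        }
    }

    public static void main(String[] args) {
        // 100,000 double points, as in Requirement 1.
        List<Double> points = new ArrayList<>();
        for (int i = 0; i < 100_000; i++) {
            points.add(i * 0.5);
        }
        AtomicInteger written = new AtomicInteger();
        boolean done = submitAll(points, 10_000, 4,
                batch -> written.addAndGet(batch.size()), 60);
        System.out.println("finished=" + done + ", written=" + written.get());
    }
}
```

Calling `shutdown()` before `awaitTermination` matters: without it the pool never terminates, and the wait always runs to the timeout.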
    

    Requirement 2

    Batch data reads and optimization

    1. Read code
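    // Reads all values of one point whose timestamp lies in [startTime, endTime).
    public List<TimeValue> getPeroidDate(String point, DateTime startTime, DateTime endTime) throws IOException {
        List<TimeValue> series = new ArrayList<>();
        byte[] start = Bytes.toBytes(startTime.getMillis());
        byte[] end = Bytes.toBytes(endTime.getMillis());
        byte[] id = Bytes.toBytes(point);
        // Start row equals stop row, so the scan covers only the row of this point.
        Scan scan = new Scan(id, id);
        scan.addFamily(Bytes.toBytes("history"));
        // Column qualifiers are millisecond timestamps; keep those in [start, end).
        Filter f = new ColumnRangeFilter(start, true, end, false);
        scan.setFilter(f);
        scan.setCaching(5000);       // rows fetched per RPC
        scan.setBatch(5000);         // max cells per Result, so a wide row arrives in chunks
        scan.setCacheBlocks(false);  // keep one-off scan data out of the block cache
        //TODO Add parameters to support the pagination
        // try-with-resources closes the scanner; an IOException now propagates to the
        // caller instead of leaving the scanner null and failing in the loop.
        try (ResultScanner r = table.getScanner(scan)) {
            for (Result result : r) {
                for (Cell cell : result.rawCells()) {
                    series.add(new TimeValue(new Timestamp(Bytes.toLong(CellUtil.cloneQualifier(cell))), Bytes.toString(CellUtil.cloneValue(cell))));
                }
            }
        }
        return series;
    }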
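The read code relies on `ColumnRangeFilter` comparing qualifiers as raw bytes. `Bytes.toBytes(long)` writes 8 big-endian bytes, so for non-negative millisecond timestamps the byte order matches the numeric order. The JDK-only sketch below illustrates that property; `QualifierOrderSketch` and its methods are hypothetical names, and `ByteBuffer` is used here only to reproduce the same byte layout without the HBase client on the classpath.

```java
import java.nio.ByteBuffer;

// Hypothetical illustration of why byte-wise range checks work for timestamp qualifiers.
class QualifierOrderSketch {

    /** Encodes a long as 8 big-endian bytes (ByteBuffer's default byte order). */
    static byte[] encode(long value) {
        return ByteBuffer.allocate(Long.BYTES).putLong(value).array();
    }

    /** Unsigned lexicographic comparison of two byte arrays. */
    static int compareBytes(byte[] a, byte[] b) {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int diff = (a[i] & 0xff) - (b[i] & 0xff);
            if (diff != 0) {
                return diff;
            }
        }
        return a.length - b.length;
    }

    /** True if ts falls in [start, end) when compared as encoded bytes. */
    static boolean inRange(long ts, long start, long end) {
        byte[] q = encode(ts);
        return compareBytes(q, encode(start)) >= 0 && compareBytes(q, encode(end)) < 0;
    }

    public static void main(String[] args) {
        long start = 1_543_827_480_000L;   // arbitrary example timestamps in milliseconds
        long end = start + 60_000L;
        System.out.println(inRange(start, start, end));            // lower bound is inclusive
        System.out.println(inRange(end, start, end));              // upper bound is exclusive
        // Negative values sort after positive ones byte-wise, so the trick
        // only holds for non-negative timestamps.
        System.out.println(compareBytes(encode(-1L), encode(1L)) > 0);
    }
}
```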
    
    2. Optimization
      Split the time range into consecutive segments, read each segment on its own thread, then merge the results.
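The original gives no code for this optimization, so the following is only a sketch of one way to do it, with assumptions throughout. `SegmentedReadSketch`, `split` and `readParallel` are hypothetical names, and the `reader` callback stands in for a per-segment call such as `getPeroidDate(point, from, to)`. Segments are half-open, matching the `[start, end)` filter in the read code, so no timestamp is read twice.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.BiFunction;

// Hypothetical helper: reads a time range in parallel segments and merges in time order.
class SegmentedReadSketch {

    /** Splits [start, end) into at most `parts` contiguous half-open segments {from, to}. */
    static List<long[]> split(long start, long end, int parts) {
        List<long[]> segments = new ArrayList<>();
        if (end <= start || parts <= 0) {
            return segments;
        }
        long step = Math.max(1, (end - start + parts - 1) / parts); // ceiling division
        for (long from = start; from < end; from += step) {
            segments.add(new long[] {from, Math.min(from + step, end)});
        }
        return segments;
    }

    /** Reads every segment on its own thread and concatenates results in segment order. */
    static <T> List<T> readParallel(long start, long end, int parts,
                                    BiFunction<Long, Long, List<T>> reader) {
        List<long[]> segments = split(start, end, parts);
        List<T> merged = new ArrayList<>();
        if (segments.isEmpty()) {
            return merged;
        }
        ExecutorService pool = Executors.newFixedThreadPool(segments.size());
        try {
            List<Future<List<T>>> futures = new ArrayList<>();
            for (long[] seg : segments) {
                Callable<List<T>> task = () -> reader.apply(seg[0], seg[1]);
                futures.add(pool.submit(task));
            }
            // Futures are collected in submission order, so the merged list stays
            // ordered by time if each segment's result is ordered.
            for (Future<List<T>> f : futures) {
                merged.addAll(f.get());
            }
            return merged;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while reading", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("segment read failed", e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        // In-memory stand-in for the HBase read: one sample per second in [from, to).
        BiFunction<Long, Long, List<Long>> fakeReader = (from, to) -> {
            List<Long> out = new ArrayList<>();
            for (long t = ((from + 999) / 1000) * 1000; t < to; t += 1000) {
                out.add(t);
            }
            return out;
        };
        List<Long> all = readParallel(0L, 60_000L, 4, fakeReader);
        System.out.println("samples read: " + all.size());
    }
}
```

Whether this helps in practice depends on the data layout: all segments of one point hit the same row, and with a standalone HBase they hit the same server, so the gain comes mainly from overlapping network and deserialization time.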


          Article link: https://www.haomeiwen.com/subject/vwaycqtx.html