美文网首页
hbase批量提交与批量读取

hbase批量提交与批量读取

作者: 愤怒的小猥琐 | 来源:发表于2018-12-03 16:58 被阅读0次

需求1

大量数据(10W个点,每个点为double类型)实时存入hbase,hbase为单机版

实现核心代码

  1. put
public static long put(String tablename, List<Put> puts) throws Exception {
        long currentTime = System.currentTimeMillis();
        Connection conn = getConnection();
        final BufferedMutator.ExceptionListener listener = new BufferedMutator.ExceptionListener() {
            @Override
            public void onException(RetriesExhaustedWithDetailsException e, BufferedMutator mutator) {
                for (int i = 0; i < e.getNumExceptions(); i++) {
                    System.out.println("Failed to sent put " + e.getRow(i) + ".");
                    logger.error("Failed to sent put " + e.getRow(i) + ".");
                }
            }
        };
        BufferedMutatorParams params = new BufferedMutatorParams(TableName.valueOf(tablename))
                .listener(listener);
        params.writeBufferSize(5 * 1024 * 1024);

        final BufferedMutator mutator = conn.getBufferedMutator(params);
        try {
            mutator.mutate(puts);
            mutator.flush();
        } finally {
            mutator.close();
            closeConnect(conn);
        }
        return System.currentTimeMillis() - currentTime;
    }
  1. 多线程操作
 threadPool.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    HBaseUtil.put(tableName, puts);
                } catch (Exception e) {
                    logger.error("batchPut failed . ", e);
                }
            }
        });

        if(waiting){
            try {
                threadPool.awaitTermination();
            } catch (InterruptedException e) {
                logger.error("HBase put job thread pool await termination time out.", e);
            }
        }

需求2

批量数据读取及优化

  1. 读取代码
 public List<TimeValue> getPeroidDate(String point, DateTime startTime, DateTime endTime) throws IOException {
        List<TimeValue> series = new ArrayList<>();
        byte[] start = Bytes.toBytes(startTime.getMillis());
        byte[] end = Bytes.toBytes(endTime.getMillis());
        byte[] id = Bytes.toBytes(point);
        Scan scan = new Scan(id, id);
        scan.addFamily(Bytes.toBytes("history"));
        Filter f = new ColumnRangeFilter(start, true, end, false);
        scan.setFilter(f);
        scan.setCaching(5000);
        scan.setBatch(5000);
        scan.setCacheBlocks(false);
        //TODO Add parameters to support the pagination
        ResultScanner r = null;
        try {
            r = table.getScanner(scan);
        } catch (IOException e) {
            e.printStackTrace();
        }
        for (Result result : r) {
            for (Cell cell : result.rawCells()) {
                series.add(new TimeValue(new Timestamp(Bytes.toLong(CellUtil.cloneQualifier(cell))), Bytes.toString(CellUtil.cloneValue(cell))));
            }
        }
        return series;
    }
  1. 优化
    多线程将时间切割为一段一段分别读取并合并

相关文章

  • hbase批量提交与批量读取

    需求1 大量数据(10W个点,每个点为double类型)实时存入hbase,hbase为单机版 实现核心代码 pu...

  • 2022-06-29

    批量读取文本

  • HBase BulkLoad批量写入数据实战

    1.概述 在进行数据传输中,批量加载数据到HBase集群有多种方式,比如通过HBase API进行批量写入数据、使...

  • 2020-01-20

    properties读取文件 properties文档 批量下载 下载

  • hbase-spark bulk load(二)

    概述 之前写过spark批量导入Hbase的案例:Spark、BulkLoad Hbase、单列、多列,实现了多列...

  • 使用BulkLoad 向 HBase 中批量导入数据

    1 使用 BulkLoad 向 HBase 中批量导入数据 2 背景介绍 2.1 概述 我们经常面临向 HBase...

  • Postman读取JSON文件和CSV文件

    在使用postman读取文件前我们先看下使用postman批量执行请求集合 Postman批量执行测试集合 选择要...

  • redis批量操作

    redis multi pipeline平时使用redis的时候,经常有批量操作的需求;比如批量读取一批数据,或者...

  • 005.关于标签类需求设计

    1.手工 手工单个或批量文件上传发起 2.系统 定期跑批读取外部来源数据文件,批量打标识 系统内部自定义逻辑,批量...

  • R语言的文件读取小技能

    使用R软件,解锁数据读取新姿势。 1.批量读取文件夹里面某类文件 2.批量将文件夹某类文件读取进来并合并成一个数据...

网友评论

      本文标题:hbase批量提交与批量读取

      本文链接:https://www.haomeiwen.com/subject/vwaycqtx.html