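Requirement 1
Write a large volume of data (100,000 points, each holding a double value) into HBase in real time. HBase runs in standalone (single-node) mode.
Core implementation code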
- put
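public static long put(String tablename, List<Put> puts) throws Exception {
    long currentTime = System.currentTimeMillis();
    // getConnection()/closeConnect() are helpers defined elsewhere in HBaseUtil.
    // A Connection is heavyweight and thread-safe; sharing one is usually cheaper than one per call.
    Connection conn = getConnection();
    // Invoked for every mutation that still fails after the client has exhausted its retries.
    final BufferedMutator.ExceptionListener listener = new BufferedMutator.ExceptionListener() {
        @Override
        public void onException(RetriesExhaustedWithDetailsException e, BufferedMutator mutator) {
            for (int i = 0; i < e.getNumExceptions(); i++) {
                logger.error("Failed to send put " + Bytes.toStringBinary(e.getRow(i).getRow()) + ".",
                        e.getCause(i));
            }
        }
    };
    BufferedMutatorParams params = new BufferedMutatorParams(TableName.valueOf(tablename))
            .listener(listener)
            .writeBufferSize(5 * 1024 * 1024); // send to the server once ~5 MB is buffered
    // try-with-resources closes (and therefore flushes) the mutator before the connection is closed
    try (BufferedMutator mutator = conn.getBufferedMutator(params)) {
        mutator.mutate(puts); // queued client-side, sent as the buffer fills
        mutator.flush();      // send whatever is still buffered
    } finally {
        closeConnect(conn);
    }
    return System.currentTimeMillis() - currentTime;
}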
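To illustrate what the 5 MB `writeBufferSize` above controls, here is a deliberately simplified, JDK-only model of client-side write buffering. It is not HBase code: `WriteBufferModel` is a hypothetical name, and the fixed per-item sizes stand in for the heap size HBase estimates for each `Put`. As with `BufferedMutator`, `mutate()` only queues items; a batch is sent once the buffered size exceeds the limit, and `flush()` sends whatever remains.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Simplified model of client-side write buffering (hypothetical, not the HBase API).
 * Items accumulate until their total size exceeds the limit, then the whole
 * buffer is "sent" as one batch, similar in spirit to BufferedMutator.
 */
final class WriteBufferModel {
    private final long limitBytes;
    private final List<Long> buffer = new ArrayList<>();
    private long bufferedBytes = 0;
    // Number of items in each batch that was sent; stands in for one RPC to the server.
    private final List<Integer> sentBatches = new ArrayList<>();

    WriteBufferModel(long limitBytes) {
        this.limitBytes = limitBytes;
    }

    /** Queue one item; send the whole buffer once its size exceeds the limit. */
    void mutate(long itemBytes) {
        buffer.add(itemBytes);
        bufferedBytes += itemBytes;
        if (bufferedBytes > limitBytes) {
            flush();
        }
    }

    /** Send whatever is buffered, even if the limit has not been reached. */
    void flush() {
        if (!buffer.isEmpty()) {
            sentBatches.add(buffer.size());
            buffer.clear();
            bufferedBytes = 0;
        }
    }

    List<Integer> sentBatches() {
        return sentBatches;
    }

    public static void main(String[] args) {
        WriteBufferModel m = new WriteBufferModel(100);
        for (int i = 0; i < 10; i++) {
            m.mutate(30); // each item "weighs" 30 bytes
        }
        m.flush(); // like mutator.flush() at the end of put()
        System.out.println(m.sentBatches()); // [4, 4, 2]
    }
}
```

Running `main` prints `[4, 4, 2]`: two batches go out automatically when the buffer passes 100 bytes, and the final `flush()` sends the last two items. This is why `put()` flushes (or relies on `close()`, which also flushes) before returning.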
- Multithreaded writes
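// threadPool is an ExecutorService; tableName, puts and waiting come from the caller.
threadPool.execute(new Runnable() {
    @Override
    public void run() {
        try {
            HBaseUtil.put(tableName, puts);
        } catch (Exception e) {
            logger.error("batchPut failed.", e);
        }
    }
});
if (waiting) {
    // awaitTermination(timeout, unit) waits for all tasks only after a shutdown request;
    // without shutdown() it simply blocks until the timeout. The pool cannot be reused afterwards.
    threadPool.shutdown();
    try {
        // The 10-minute timeout is an illustrative value (needs java.util.concurrent.TimeUnit).
        if (!threadPool.awaitTermination(10, TimeUnit.MINUTES)) {
            logger.error("HBase put job thread pool await termination timed out.");
        }
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // preserve the interrupt status
        logger.error("Interrupted while waiting for the HBase put job thread pool to terminate.", e);
    }
}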
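One way to push 100,000 points through the pool is to split the list into fixed-size chunks, submit one task per chunk, and then call `shutdown()` followed by a timed `awaitTermination()`. The JDK-only sketch below assumes this chunking approach; `ChunkedParallelWrite` and the `Writer` interface are hypothetical (`Writer` stands in for `HBaseUtil.put(tableName, puts)`), and the chunk size, thread count and 10-minute timeout are illustrative values, not tuned ones.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class ChunkedParallelWrite {
    /** Hypothetical stand-in for HBaseUtil.put(tableName, puts). */
    interface Writer<T> {
        void write(List<T> batch) throws Exception;
    }

    /** Split items into consecutive sublists of at most chunkSize elements. */
    static <T> List<List<T>> chunk(List<T> items, int chunkSize) {
        List<List<T>> chunks = new ArrayList<>();
        for (int from = 0; from < items.size(); from += chunkSize) {
            int to = Math.min(from + chunkSize, items.size());
            chunks.add(new ArrayList<>(items.subList(from, to))); // private copy per task
        }
        return chunks;
    }

    /** One task per chunk; returns true if every task finished before the timeout. */
    static <T> boolean writeAll(List<T> items, int chunkSize, int threads, Writer<T> writer) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (List<T> batch : chunk(items, chunkSize)) {
            pool.execute(() -> {
                try {
                    writer.write(batch);
                } catch (Exception e) {
                    // the article logs this with logger.error("batchPut failed . ", e)
                    System.err.println("batch write failed: " + e);
                }
            });
        }
        pool.shutdown(); // no new tasks; already-submitted ones keep running
        try {
            return pool.awaitTermination(10, TimeUnit.MINUTES); // illustrative timeout
        } catch (InterruptedException e) {
            pool.shutdownNow();
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return false;
        }
    }

    public static void main(String[] args) {
        List<Integer> points = new ArrayList<>();
        for (int i = 0; i < 100_000; i++) {
            points.add(i);
        }
        // Fake writer that only collects items; a real one would call HBaseUtil.put
        List<Integer> written = Collections.synchronizedList(new ArrayList<>());
        boolean finished = writeAll(points, 5_000, 4, written::addAll);
        System.out.println(finished + " " + written.size());
    }
}
```

Each task receives its own copy of its chunk, so no two threads share a mutable list, and the synchronized collector in `main` only exists because the fake writer is called from several threads at once.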
Requirement 2
Batch data reads and optimization
- Read code
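public List<TimeValue> getPeroidDate(String point, DateTime startTime, DateTime endTime) throws IOException {
    List<TimeValue> series = new ArrayList<>();
    // Row key = point id; column qualifier = timestamp encoded with Bytes.toBytes(long)
    byte[] start = Bytes.toBytes(startTime.getMillis());
    byte[] end = Bytes.toBytes(endTime.getMillis());
    byte[] id = Bytes.toBytes(point);
    Scan scan = new Scan(id, id); // start row == stop row: reads only this point's row
    scan.addFamily(Bytes.toBytes("history"));
    // Keep only qualifiers (timestamps) in [start, end)
    Filter f = new ColumnRangeFilter(start, true, end, false);
    scan.setFilter(f);
    scan.setCaching(5000);      // Results fetched per RPC
    scan.setBatch(5000);        // max cells per Result, so one wide row arrives in chunks
    scan.setCacheBlocks(false); // a large one-off scan should not churn the block cache
    //TODO Add parameters to support the pagination
    // try-with-resources closes the scanner; IOExceptions propagate instead of causing an NPE on r
    try (ResultScanner r = table.getScanner(scan)) {
        for (Result result : r) {
            for (Cell cell : result.rawCells()) {
                // Assumes values were written as strings; if written with Bytes.toBytes(double), use Bytes.toDouble
                series.add(new TimeValue(new Timestamp(Bytes.toLong(CellUtil.cloneQualifier(cell))),
                        Bytes.toString(CellUtil.cloneValue(cell))));
            }
        }
    }
    return series;
}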
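The `ColumnRangeFilter` above works because qualifiers are compared as unsigned byte arrays and `Bytes.toBytes(long)` writes the 8 bytes big-endian, so for non-negative longs such as epoch milliseconds, byte order matches numeric order. The JDK-only check below assumes that encoding and reproduces it with `ByteBuffer.putLong` (big-endian by default); `QualifierOrder` is a hypothetical helper, and `inRange` mirrors the filter's inclusive start and exclusive end.

```java
import java.nio.ByteBuffer;

final class QualifierOrder {
    /** 8-byte big-endian encoding, the same layout as HBase Bytes.toBytes(long). */
    static byte[] encode(long value) {
        return ByteBuffer.allocate(Long.BYTES).putLong(value).array();
    }

    /** Unsigned lexicographic comparison, the order HBase uses for qualifiers. */
    static int compareUnsigned(byte[] a, byte[] b) {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int diff = (a[i] & 0xff) - (b[i] & 0xff);
            if (diff != 0) {
                return diff;
            }
        }
        return a.length - b.length;
    }

    /** True if ts is in [start, end), like ColumnRangeFilter(start, true, end, false). */
    static boolean inRange(long ts, long start, long end) {
        byte[] q = encode(ts);
        return compareUnsigned(q, encode(start)) >= 0 && compareUnsigned(q, encode(end)) < 0;
    }

    public static void main(String[] args) {
        long start = 1_500_000_000_000L;
        long end = start + 60_000L; // one-minute window
        System.out.println(inRange(start, start, end));   // true  (start is inclusive)
        System.out.println(inRange(end - 1, start, end)); // true
        System.out.println(inRange(end, start, end));     // false (end is exclusive)
        // Negative longs start with 0xFF and sort after all non-negative values
        System.out.println(compareUnsigned(encode(-1L), encode(0L)) > 0); // true
    }
}
```

The last line shows the caveat: a negative long sorts after every non-negative value, so this qualifier scheme only orders correctly for timestamps at or after the epoch.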
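- Optimization
Use multiple threads: split the time range into segments, read each segment separately, and merge the results.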
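A minimal sketch of splitting the time range, reading the segments concurrently and merging the results, using only the JDK. It assumes each segment read has the same half-open `[start, end)` semantics as the `ColumnRangeFilter` in `getPeroidDate`; `ParallelRangeRead` and `RangeReader` are hypothetical names, with `RangeReader` standing in for a per-segment call to `getPeroidDate`. Because segments do not overlap and futures are collected in segment order, merging is a plain concatenation that keeps the series sorted by time.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class ParallelRangeRead {
    /** Hypothetical per-segment reader: values with timestamps in [start, end), in time order. */
    interface RangeReader<T> {
        List<T> read(long start, long end) throws Exception;
    }

    /** Split [start, end) into at most n contiguous, non-overlapping half-open segments. */
    static List<long[]> split(long start, long end, int n) {
        List<long[]> segments = new ArrayList<>();
        long step = Math.max(1, (end - start + n - 1) / n); // ceiling of (end - start) / n
        for (long s = start; s < end; s += step) {
            segments.add(new long[] {s, Math.min(s + step, end)});
        }
        return segments;
    }

    /** Read all segments on n threads and concatenate them in segment (time) order. */
    static <T> List<T> readAll(long start, long end, int n, RangeReader<T> reader) {
        ExecutorService pool = Executors.newFixedThreadPool(n);
        try {
            List<Future<List<T>>> futures = new ArrayList<>();
            for (long[] seg : split(start, end, n)) {
                futures.add(pool.submit(() -> reader.read(seg[0], seg[1])));
            }
            List<T> merged = new ArrayList<>();
            for (Future<List<T>> f : futures) {
                merged.addAll(f.get()); // blocks until that segment is done
            }
            return merged;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while reading segments", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("segment read failed", e.getCause());
        } finally {
            pool.shutdownNow(); // no-op for finished tasks, cancels the rest on failure
        }
    }

    public static void main(String[] args) {
        // Fake reader with one value per millisecond, standing in for getPeroidDate
        RangeReader<Long> fake = (s, e) -> {
            List<Long> out = new ArrayList<>();
            for (long t = s; t < e; t++) {
                out.add(t);
            }
            return out;
        };
        List<Long> series = readAll(0, 1_000, 4, fake);
        System.out.println(series.size() + " " + series.get(0) + " " + series.get(999));
    }
}
```

`main` prints `1000 0 999`. With the row design above, all segments of one point still hit the same row and therefore the same region server, so the speedup on a standalone HBase is worth measuring rather than assuming.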