HBase操作

作者: 木子Qing | 来源:发表于2017-03-07 20:40 被阅读302次

    http://blog.csdn.net/fengzheku/article/details/48447791

    packagecom.infobird.test1;

    importjava.io.IOException;

    importjava.util.ArrayList;

    importjava.util.HashMap;

    importjava.util.List;

    importjava.util.Map;

    importorg.apache.hadoop.conf.Configuration;

    importorg.apache.hadoop.hbase.Cell;

    importorg.apache.hadoop.hbase.HBaseConfiguration;

    importorg.apache.hadoop.hbase.HColumnDescriptor;

    importorg.apache.hadoop.hbase.HTableDescriptor;

    importorg.apache.hadoop.hbase.MasterNotRunningException;

    importorg.apache.hadoop.hbase.TableName;

    importorg.apache.hadoop.hbase.ZooKeeperConnectionException;

    importorg.apache.hadoop.hbase.client.Admin;

    importorg.apache.hadoop.hbase.client.Connection;

    importorg.apache.hadoop.hbase.client.ConnectionFactory;

    importorg.apache.hadoop.hbase.client.Delete;

    importorg.apache.hadoop.hbase.client.Put;

    importorg.apache.hadoop.hbase.client.Result;

    importorg.apache.hadoop.hbase.client.ResultScanner;

    importorg.apache.hadoop.hbase.client.Scan;

    importorg.apache.hadoop.hbase.client.Table;

    importorg.apache.hadoop.hbase.filter.Filter;

    importorg.apache.hadoop.hbase.filter.FilterList;

    importorg.apache.hadoop.hbase.filter.FilterList.Operator;

    importorg.apache.hadoop.hbase.filter.PageFilter;

    importorg.apache.hadoop.hbase.filter.SingleColumnValueFilter;

    importorg.apache.hadoop.hbase.filter.CompareFilter.CompareOp;

    importorg.apache.hadoop.hbase.util.Bytes;

    publicclassHbaseDemo1 {

    privatestaticConfiguration configuration;

    privatestaticConnection connection;

    static{

    configuration = HBaseConfiguration.create();

    configuration.set("hbase.zookeeper.property.clientPort","2181");

    configuration.set("hbase.zookeeper.quorum","hostIp");

    configuration.set("hbase.master","hostIp:8020");

    try{

    connection = ConnectionFactory.createConnection(configuration);

    }catch(IOException e) {

    e.printStackTrace();

    }

    }

    /**

    * 创建表

    *

    * @param tableName

    * @param familyColumnName

    */

    publicstaticvoidcreateTable(String name, List familyColumnName) {

    try{

    TableName tableName = TableName.valueOf(name);

    Admin hAdmin = connection.getAdmin();

    HTableDescriptor descripter =newHTableDescriptor(tableName);

    for(String familyName : familyColumnName) {

    descripter.addFamily(newHColumnDescriptor(familyName));

    }

    if(hAdmin.tableExists(tableName)) {

    hAdmin.disableTable(tableName);

    hAdmin.deleteTable(tableName);

    System.out.println(tableName +"is exists...");

    }

    hAdmin.createTable(descripter);

    hAdmin.close();

    }catch(MasterNotRunningException e) {

    // TODO Auto-generated catch block

    e.printStackTrace();

    }catch(ZooKeeperConnectionException e) {

    // TODO Auto-generated catch block

    e.printStackTrace();

    }catch(IOException e) {

    // TODO Auto-generated catch block

    e.printStackTrace();

    }

    }

    /**

    * 往表里面添加数据

    *

    * @param tableName

    * @param rowkey

    * @param columnValues

    * @return

    */

    publicstaticintaddDataForTable(String name, String rowkey,

    Map> columnValues) {

    try{

    Put put =newPut(Bytes.toBytes(rowkey));

    TableName tableName = TableName.valueOf(name);

    Table htable = connection.getTable(tableName);

    HColumnDescriptor[] columnFamilies = htable.getTableDescriptor()

    .getColumnFamilies();// 获取所有的列名

    for(HColumnDescriptor hColumnDescriptor : columnFamilies) {

    String familyName = hColumnDescriptor.getNameAsString();

    Map columnNameValueMap = columnValues

    .get(familyName);

    if(columnNameValueMap !=null) {

    for(String columnName : columnNameValueMap.keySet()) {

    put.addColumn(Bytes.toBytes(familyName), Bytes

    .toBytes(columnName), Bytes

    .toBytes(columnNameValueMap.get(columnName)));

    }

    }

    }

    htable.put(put);

    htable.close();

    returnput.size();

    }catch(IOException e) {

    e.printStackTrace();

    }

    return0;

    }

    /**

    * 批量添加数据

    *

    * @param list

    */

    publicstaticvoidinsertDataList(List list) {

    List puts =newArrayList();

    Table table =null;

    Put put;

    try{

    for(HbaseDataEntity entity : list) {

    TableName tableName = TableName.valueOf(entity.getTableName());

    table = connection.getTable(tableName);

    put =newPut(entity.getMobileKey().getBytes());// 一个PUT代表一行数据,再NEW一个PUT表示第二行数据,每行一个唯一的ROWKEY

    for(String columnfamily : entity.getColumns().keySet()) {

    for(String column : entity.getColumns().get(columnfamily)

    .keySet()) {

    put.addColumn(

    columnfamily.getBytes(),

    column.getBytes(),

    entity.getColumns().get(columnfamily)

    .get(column).getBytes());

    }

    }

    puts.add(put);

    }

    table.put(puts);

    table.close();

    }catch(Exception e) {

    e.printStackTrace();

    }finally{

    }

    }

    /**

    * 更新表中的一列

    *

    * @param tableName

    * @param rowKey

    * @param familyName

    * @param columnName

    * @param value

    */

    publicstaticvoidupdateTable(String name, String rowKey,

    String familyName, String columnName, String value) {

    try{

    TableName tableName = TableName.valueOf(name);

    Table table = connection.getTable(tableName);

    Put put =newPut(Bytes.toBytes(rowKey));

    put.addColumn(Bytes.toBytes(familyName), Bytes.toBytes(columnName),

    Bytes.toBytes(value));

    table.put(put);

    table.close();

    }catch(IOException e) {

    // TODO Auto-generated catch block

    e.printStackTrace();

    }

    }

    /**

    * 批量删除数据

    *

    * @param list

    */

    publicstaticvoiddeleteDataList(List list) {

    Table table =null;

    List deletes =newArrayList();

    try{

    for(HbaseDataEntity entity : list) {

    TableName tableName = TableName.valueOf(entity.getTableName());

    table = connection.getTable(tableName);

    Delete delete =newDelete(Bytes.toBytes(entity.getMobileKey()));

    for(String columnfamily : entity.getColumns().keySet()) {

    for(String column : entity.getColumns().get(columnfamily)

    .keySet()) {

    delete.addColumn(columnfamily.getBytes(),

    column.getBytes());

    }

    }

    deletes.add(delete);

    }

    table.delete(deletes);

    table.close();

    }catch(Exception e) {

    e.printStackTrace();

    }finally{

    }

    }

    /**

    * 删除指定的列

    *

    * @param tableName

    * @param rowKey

    * @param familyName

    * @param columnName

    */

    publicstaticvoiddeleteColumn(String name, String rowKey,

    String familyName, String columnName) {

    try{

    TableName tableName = TableName.valueOf(name);

    Table table = connection.getTable(tableName);

    Delete delete =newDelete(Bytes.toBytes(rowKey));

    delete.addColumn(Bytes.toBytes(familyName),

    Bytes.toBytes(columnName));

    table.delete(delete);

    System.out.println(familyName +":"+ columnName +"is delete");

    table.close();

    }catch(IOException e) {

    e.printStackTrace();

    }

    }

    /**

    * 删除所有列

    *

    * @param tableName

    * @param rowKey

    */

    publicstaticvoiddeleteAllColumns(String name, String rowKey) {

    try{

    TableName tableName = TableName.valueOf(name);

    Table table = connection.getTable(tableName);

    Delete delete =newDelete(Bytes.toBytes(rowKey));

    table.delete(delete);

    System.out.println("all columns are deleted!");

    table.close();

    }catch(IOException e) {

    // TODO Auto-generated catch block

    e.printStackTrace();

    }

    }

    /**

    * 获取所有的数据

    * @param name

    * @param size

    * @return

    */

    publicstaticList getResultScans(String name,intsize) {

    Scan scan =newScan();

    ResultScanner resultScanner =null;

    List list =newArrayList();

    try{

    TableName tableName = TableName.valueOf(name);

    Table table = connection.getTable(tableName);

    longbeiginTime = System.currentTimeMillis();

    resultScanner = table.getScanner(scan);

    longendTime = System.currentTimeMillis();

    doublespentTime = (endTime - beiginTime) /1000.0;

    System.out.println("cost:==="+ spentTime +"s");

    for(Result result : resultScanner) {

    // System.out.println("获得到rowkey:" + new

    // String(result.getRow()));

    HbaseDataEntity entity =newHbaseDataEntity();

    entity.setTableName(name);

    entity.setMobileKey(newString(result.getRow()));

    Map> familyMap =newHashMap>();

    for(Cell cell : result.rawCells()) {

    if(familyMap.get(newString(cell.getFamilyArray())) ==null) {

    Map columnsMap =newHashMap();

    columnsMap.put(

    newString(cell.getQualifierArray(), cell

    .getQualifierOffset(), cell

    .getQualifierLength()),

    newString(cell.getValueArray(), cell

    .getValueOffset(), cell

    .getValueLength()));

    familyMap.put(

    newString(cell.getFamilyArray(), cell

    .getFamilyOffset(), cell

    .getFamilyLength()), columnsMap);

    }else{

    familyMap.get(

    newString(cell.getFamilyArray(), cell

    .getFamilyOffset(), cell

    .getFamilyLength())).put(

    newString(cell.getQualifierArray(),

    cell.getQualifierOffset(),

    cell.getQualifierLength()),

    newString(cell.getValueArray(), cell

    .getValueOffset(), cell

    .getValueLength()));

    }

    // System.out.println("列:" + new

    // String(cell.getFamilyArray(), cell.getFamilyOffset(),

    // cell.getFamilyLength())

    // + "====值:" + new

    // String(cell.getValueArray(),cell.getValueOffset(),cell.getValueLength()));

    }

    entity.setColumns(familyMap);

    list.add(entity);

    if(size == list.size()) {

    break;

    }

    }

    table.close();

    returnlist;

    }catch(IOException e) {

    // TODO Auto-generated catch block

    e.printStackTrace();

    }finally{

    resultScanner.close();

    }

    returnnull;

    }

    /**

    * 组合条件查询 and

    *

    * @param nameSpace

    *            命名空间

    * @param tableName

    *            表名

    * @param parameters

    *            格式是:columnFamily,columnName,columnValue

    */

    publicstaticList QueryDataByConditionsAnd(

    String nameSpace, String name, List parameters) {

    ResultScanner rs =null;

    Table table =null;

    List list =newArrayList();

    try{

    TableName tableName = TableName.valueOf(name);

    table = connection.getTable(tableName);

    // 参数的格式:columnFamily,columnName,columnValue

    List filters =newArrayList();

    for(String parameter : parameters) {

    String[] columns = parameter.split(",");

    SingleColumnValueFilter filter =newSingleColumnValueFilter(

    Bytes.toBytes(columns[0]), Bytes.toBytes(columns[1]),

    CompareOp.valueOf(columns[2]),

    Bytes.toBytes(columns[3]));

    filter.setFilterIfMissing(true);

    filters.add(filter);

    }

    FilterList filterList =newFilterList(filters);

    Scan scan =newScan();

    scan.setFilter(filterList);

    rs = table.getScanner(scan);

    for(Result r : rs) {

    System.out.println("获得到rowkey:"+newString(r.getRow()));

    HbaseDataEntity entity =newHbaseDataEntity();

    entity.setNameSpace(nameSpace);

    entity.setTableName(name);

    entity.setMobileKey(newString(r.getRow()));

    Map> familyMap =newHashMap>();

    for(Cell cell : r.rawCells()) {

    if(familyMap.get(newString(cell.getFamilyArray(), cell

    .getFamilyOffset(), cell.getFamilyLength())) ==null) {

    Map columnsMap =newHashMap();

    columnsMap.put(

    newString(cell.getQualifierArray(), cell

    .getQualifierOffset(), cell

    .getQualifierLength()),

    newString(cell.getValueArray(), cell

    .getValueOffset(), cell

    .getValueLength()));

    familyMap.put(

    newString(cell.getFamilyArray(), cell

    .getFamilyOffset(), cell

    .getFamilyLength()), columnsMap);

    }else{

    familyMap.get(

    newString(cell.getFamilyArray(), cell

    .getFamilyOffset(), cell

    .getFamilyLength())).put(

    newString(cell.getQualifierArray(),

    cell.getQualifierOffset(),

    cell.getQualifierLength()),

    newString(cell.getValueArray(), cell

    .getValueOffset(), cell

    .getValueLength()));

    }

    }

    entity.setColumns(familyMap);

    list.add(entity);

    }

    rs.close();

    table.close();

    returnlist;

    }catch(Exception e) {

    e.printStackTrace();

    }

    returnnull;

    }

    /**

    * 组合条件查询 or

    *

    * @param nameSpace

    *            命名空间

    * @param tableName

    *            表名

    * @param parameters

    *            格式是:columnFamily,columnName,columnValue

    * @return

    */

    publicstaticList QueryDataByConditionsOr(

    String nameSpace, String name, List parameters) {

    ResultScanner rs =null;

    List list =newArrayList();

    try{

    TableName tableName = TableName.valueOf(name);

    Table table = connection.getTable(tableName);

    // 参数的额格式:columnFamily,columnName,columnValue

    List filters =newArrayList();

    Scan scan =newScan();

    byte[] columnFamily =null;

    byte[] columnName =null;

    for(String parameter : parameters) {

    String[] columns = parameter.split(",");

    columnFamily = Bytes.toBytes(columns[0]);

    columnName = Bytes.toBytes(columns[1]);

    SingleColumnValueFilter filter =newSingleColumnValueFilter(

    Bytes.toBytes(columns[0]), Bytes.toBytes(columns[1]),

    CompareOp.valueOf(columns[2]),

    Bytes.toBytes(columns[3]));

    filter.setFilterIfMissing(true);

    filters.add(filter);

    }

    FilterList filterList =newFilterList(

    FilterList.Operator.MUST_PASS_ONE, filters);

    scan.setFilter(filterList);

    rs = table.getScanner(scan);

    for(Result r : rs) {

    if(r.containsColumn(columnFamily, columnName)) {

    System.out.println("获得到rowkey:"+newString(r.getRow()));

    HbaseDataEntity entity =newHbaseDataEntity();

    entity.setNameSpace(nameSpace);

    entity.setTableName(name);

    entity.setMobileKey(newString(r.getRow()));

    Map> familyMap =newHashMap>();

    for(Cell cell : r.rawCells()) {

    if(familyMap

    .get(newString(cell.getFamilyArray(), cell

    .getFamilyOffset(), cell

    .getFamilyLength())) ==null) {

    Map columnsMap =newHashMap();

    columnsMap.put(

    newString(cell.getQualifierArray(), cell

    .getQualifierOffset(), cell

    .getQualifierLength()),

    newString(cell.getValueArray(), cell

    .getValueOffset(), cell

    .getValueLength()));

    familyMap.put(

    newString(cell.getFamilyArray(), cell

    .getFamilyOffset(), cell

    .getFamilyLength()), columnsMap);

    }else{

    familyMap.get(

    newString(cell.getFamilyArray(), cell

    .getFamilyOffset(), cell

    .getFamilyLength())).put(

    newString(cell.getQualifierArray(),

    cell.getQualifierOffset(),

    cell.getQualifierLength()),

    newString(cell.getValueArray(), cell

    .getValueOffset(), cell

    .getValueLength()));

    }

    }

    entity.setColumns(familyMap);

    list.add(entity);

    }

    }

    table.close();

    rs.close();

    returnlist;

    }catch(Exception e) {

    e.printStackTrace();

    }

    returnnull;

    }

    /**

    * 组合条件查询 or

    *

    * @param nameSpace

    *            命名空间

    * @param tableName

    *            表名

    * @param parameters

    *            格式是:columnFamily,columnName,columnValue

    * @return

    */

    publicstaticList QueryDataByConditions(String nameSpace,

    String name, List hbaseConditions) {

    ResultScanner rs =null;

    List list =newArrayList();

    try{

    TableName tableName = TableName.valueOf(name);

    Table table = connection.getTable(tableName);

    // 参数的额格式:columnFamily,columnName,columnValue

    // List filters = new ArrayList();

    Scan scan =newScan();

    FilterList filterList =null;

    Operator operator =null;

    for(HbaseConditionEntity hbaseCondition : hbaseConditions) {

    SingleColumnValueFilter filter =newSingleColumnValueFilter(

    hbaseCondition.getFamilyColumn(),

    hbaseCondition.getColumn(),

    hbaseCondition.getCompareOp(),

    hbaseCondition.getValue());

    filter.setFilterIfMissing(true);

    if(hbaseCondition.getOperator() !=null) {

    if(operator ==null) {

    operator = hbaseCondition.getOperator();

    filterList =newFilterList(

    hbaseCondition.getOperator());

    filterList.addFilter(filter);

    System.out.println("filterList==1"+ filterList);

    }elseif(operator.equals(hbaseCondition.getOperator())) {

    filterList.addFilter(filter);

    }else{

    filterList.addFilter(filter);

    System.out.println("filterList==2"+ filterList);

    FilterList addFilterList =newFilterList(

    hbaseCondition.getOperator());

    addFilterList.addFilter(filterList);

    System.out.println("addFilterList==1"+ addFilterList);

    filterList = addFilterList;

    System.out.println("filterList==3"+ filterList);

    }

    }else{

    if(filterList ==null) {

    filterList =newFilterList(Operator.MUST_PASS_ALL);// 默认只有一个条件的时候

    }

    filterList.addFilter(filter);

    }

    }

    System.out.println(filterList +":filterList");

    scan.setFilter(filterList);

    rs = table.getScanner(scan);

    for(Result r : rs) {

    System.out.println("获得到rowkey:"+newString(r.getRow()));

    HbaseDataEntity entity =newHbaseDataEntity();

    entity.setNameSpace(nameSpace);

    entity.setTableName(name);

    entity.setMobileKey(newString(r.getRow()));

    Map> familyMap =newHashMap>();

    for(Cell cell : r.rawCells()) {

    if(familyMap.get(newString(cell.getFamilyArray(), cell

    .getFamilyOffset(), cell.getFamilyLength())) ==null) {

    Map columnsMap =newHashMap();

    columnsMap.put(

    newString(cell.getQualifierArray(), cell

    .getQualifierOffset(), cell

    .getQualifierLength()),

    newString(cell.getValueArray(), cell

    .getValueOffset(), cell

    .getValueLength()));

    familyMap.put(

    newString(cell.getFamilyArray(), cell

    .getFamilyOffset(), cell

    .getFamilyLength()), columnsMap);

    }else{

    familyMap.get(

    newString(cell.getFamilyArray(), cell

    .getFamilyOffset(), cell

    .getFamilyLength())).put(

    newString(cell.getQualifierArray(),

    cell.getQualifierOffset(),

    cell.getQualifierLength()),

    newString(cell.getValueArray(), cell

    .getValueOffset(), cell

    .getValueLength()));

    }

    }

    entity.setColumns(familyMap);

    list.add(entity);

    }

    table.close();

    rs.close();

    returnlist;

    }catch(Exception e) {

    e.printStackTrace();

    }

    returnnull;

    }

    /**

    * 分页的复合条件查询

    * @param nameSpace

    *        命名空间

    * @param name

    *        表名

    * @param hbaseConditions

    *        复合条件

    * @param pageSize

    *        每页显示的数量

    * @param lastRow

    *        当前页的最后一行

    * @return

    */

    publicstaticList QueryDataByConditionsAndPage(

    String nameSpace, String name,

    List hbaseConditions,intpageSize,

    byte[] lastRow) {

    finalbyte[] POSTFIX =newbyte[] {0x00};

    ResultScanner rs =null;

    List list =newArrayList();

    try{

    TableName tableName = TableName.valueOf(name);

    Table table = connection.getTable(tableName);

    Scan scan =newScan();

    FilterList filterList =null;

    Operator operator =null;

    for(HbaseConditionEntity hbaseCondition : hbaseConditions) {

    SingleColumnValueFilter filter =newSingleColumnValueFilter(

    hbaseCondition.getFamilyColumn(),

    hbaseCondition.getColumn(),

    hbaseCondition.getCompareOp(),

    hbaseCondition.getValue());

    filter.setFilterIfMissing(true);

    if(hbaseCondition.getOperator() !=null) {

    if(operator ==null) {

    operator = hbaseCondition.getOperator();

    filterList =newFilterList(

    hbaseCondition.getOperator());

    filterList.addFilter(filter);

    System.out.println("filterList==1"+ filterList);

    }elseif(operator.equals(hbaseCondition.getOperator())) {

    filterList.addFilter(filter);

    }else{

    filterList.addFilter(filter);

    System.out.println("filterList==2"+ filterList);

    FilterList addFilterList =newFilterList(

    hbaseCondition.getOperator());

    addFilterList.addFilter(filterList);

    System.out.println("addFilterList==1"+ addFilterList);

    filterList = addFilterList;

    System.out.println("filterList==3"+ filterList);

    }

    }else{

    if(filterList ==null) {

    filterList =newFilterList(Operator.MUST_PASS_ALL);// 默认只有一个条件的时候

    }

    filterList.addFilter(filter);

    }

    }

    System.out.println(filterList +":filterList");

    FilterList pageFilterList =newFilterList(Operator.MUST_PASS_ALL);// 默认只有一个条件的时候

    Filter pageFilter =newPageFilter(pageSize);

    pageFilterList.addFilter(pageFilter);

    pageFilterList.addFilter(filterList);

    if(lastRow !=null) {

    // 注意这里添加了POSTFIX操作,不然死循环了

    byte[] startRow = Bytes.add(lastRow, POSTFIX);

    scan.setStartRow(startRow);

    }

    System.out.println(pageFilterList +":pageFilterList");

    scan.setFilter(pageFilterList);

    rs = table.getScanner(scan);

    for(Result r : rs) {

    System.out.println("获得到rowkey:"+newString(r.getRow()));

    HbaseDataEntity entity =newHbaseDataEntity();

    entity.setNameSpace(nameSpace);

    entity.setTableName(name);

    entity.setMobileKey(newString(r.getRow()));

    Map> familyMap =newHashMap>();

    for(Cell cell : r.rawCells()) {

    if(familyMap.get(newString(cell.getFamilyArray(), cell

    .getFamilyOffset(), cell.getFamilyLength())) ==null) {

    Map columnsMap =newHashMap();

    columnsMap.put(

    newString(cell.getQualifierArray(), cell

    .getQualifierOffset(), cell

    .getQualifierLength()),

    newString(cell.getValueArray(), cell

    .getValueOffset(), cell

    .getValueLength()));

    familyMap.put(

    newString(cell.getFamilyArray(), cell

    .getFamilyOffset(), cell

    .getFamilyLength()), columnsMap);

    }else{

    familyMap.get(

    newString(cell.getFamilyArray(), cell

    .getFamilyOffset(), cell

    .getFamilyLength())).put(

    newString(cell.getQualifierArray(),

    cell.getQualifierOffset(),

    cell.getQualifierLength()),

    newString(cell.getValueArray(), cell

    .getValueOffset(), cell

    .getValueLength()));

    }

    }

    entity.setColumns(familyMap);

    list.add(entity);

    }

    table.close();

    rs.close();

    returnlist;

    }catch(Exception e) {

    e.printStackTrace();

    }

    returnnull;

    }

    /**

    * 复合条件分页查询

    * @param name

    * @param pageSize

    * @param lastRow

    * @return

    */

    publicstaticList getHbaseDatasByPage(String name,

    intpageSize,byte[] lastRow) {

    finalbyte[] POSTFIX =newbyte[] {0x00};

    Scan scan =newScan();

    ResultScanner resultScanner =null;

    Table table =null;

    List list =newArrayList();

    try{

    TableName tableName = TableName.valueOf(name);

    table = connection.getTable(tableName);

    Filter filter =newPageFilter(pageSize);

    scan.setFilter(filter);

    if(lastRow !=null) {

    // 注意这里添加了POSTFIX操作,不然死循环了

    byte[] startRow = Bytes.add(lastRow, POSTFIX);

    scan.setStartRow(startRow);

    }

    resultScanner = table.getScanner(scan);

    for(Result result : resultScanner) {

    HbaseDataEntity entity =newHbaseDataEntity();

    entity.setTableName(name);

    entity.setMobileKey(newString(result.getRow()));

    Map> familyMap =newHashMap>();

    for(Cell cell : result.rawCells()) {

    if(familyMap.get(newString(cell.getFamilyArray(), cell

    .getFamilyOffset(), cell.getFamilyLength())) ==null) {

    Map columnsMap =newHashMap();

    columnsMap.put(

    newString(cell.getQualifierArray(), cell

    .getQualifierOffset(), cell

    .getQualifierLength()),

    newString(cell.getValueArray(), cell

    .getValueOffset(), cell

    .getValueLength()));

    familyMap.put(

    newString(cell.getFamilyArray(), cell

    .getFamilyOffset(), cell

    .getFamilyLength()), columnsMap);

    }else{

    familyMap.get(

    newString(cell.getFamilyArray(), cell

    .getFamilyOffset(), cell

    .getFamilyLength())).put(

    newString(cell.getQualifierArray(),

    cell.getQualifierOffset(),

    cell.getQualifierLength()),

    newString(cell.getValueArray(), cell

    .getValueOffset(), cell

    .getValueLength()));

    }

    }

    entity.setColumns(familyMap);

    list.add(entity);

    }

    table.close();

    returnlist;

    }catch(IOException e) {

    e.printStackTrace();

    }finally{

    resultScanner.close();

    }

    returnnull;

    }

    publicstaticintgetDataByPage(String name,intpageSize) {

    finalbyte[] POSTFIX =newbyte[] {0x00};

    TableName tableName = TableName.valueOf(name);

    Table table;

    inttotalRows =0;

    try{

    table = connection.getTable(tableName);

    Filter filter =newPageFilter(pageSize);

    byte[] lastRow =null;

    while(true) {

    Scan scan =newScan();

    scan.setFilter(filter);

    if(lastRow !=null) {

    // 注意这里添加了POSTFIX操作,不然死循环了

    byte[] startRow = Bytes.add(lastRow, POSTFIX);

    scan.setStartRow(startRow);

    }

    ResultScanner scanner = table.getScanner(scan);

    intlocalRows =0;

    Result result;

    while((result = scanner.next()) !=null) {

    System.out.println(localRows++ +":"+ result);

    totalRows++;

    lastRow = result.getRow();

    }

    scanner.close();

    if(localRows ==0)

    break;

    }

    }catch(IOException e) {

    // TODO Auto-generated catch block

    e.printStackTrace();

    }

    System.out.println("total rows:"+ totalRows);

    returntotalRows;

    }

    publicstaticvoidmain(String[] args) {

    // 1、Create table

    // /String tableName = "caoShuaiTest09";

    /*

    * List columnFamilyName = new ArrayList();

    * columnFamilyName.add("info"); columnFamilyName.add("address");

    * columnFamilyName.add("score");

    *

    * createTable(tableName, columnFamilyName);

    */

    // 2、Insert data into table

    /*

    * String roeKey01 = "LiMing"; Map>

    * familyColumnMap01 = new HashMap>();

    * Map columnMap01 = new HashMap();

    * columnMap01.put("age", "23"); columnMap01.put("phone",

    * "13854285991"); familyColumnMap01.put("info", columnMap01);

    *

    * Map columnMap02 = new HashMap();

    * columnMap02.put("province", "shandong"); columnMap02.put("city",

    * "beijing"); familyColumnMap01.put("address", columnMap02);

    *

    * Map columnMap03 = new HashMap();

    * columnMap03.put("english", "80"); columnMap03.put("chinese", "100");

    * familyColumnMap01.put("score", columnMap03); int result01 =

    * addDataForTable(tableName, roeKey01, familyColumnMap01);

    * System.out.println("==result01==:" + result01);

    */

    // 3、获取结果 getResult(tableName, roeKey01);

    /*

    * String roeKey02 = "WangNing"; Map>

    * familyColumnMap01 = new HashMap>();

    * Map columnMap01 = new HashMap();

    * columnMap01.put("age", "50"); columnMap01.put("phone",

    * "13854285991"); familyColumnMap01.put("info", columnMap01);

    *

    * Map columnMap02 = new HashMap();

    * columnMap02.put("province", "shandong");

    * columnMap02.put("city","beijing"); familyColumnMap01.put("address",

    * columnMap02);

    *

    * Map columnMap03 = new HashMap();

    * columnMap03.put("english", "40"); columnMap03.put("chinese","70");

    * familyColumnMap01.put("score", columnMap03); int result01 =

    * addDataForTable(tableName, roeKey02, familyColumnMap01);

    * System.out.println("==result01==:" + result01);

    */

    // 4

    // getResultScan(tableName);

    /*

    * List parameters = new ArrayList();

    * parameters.add("info,age,EQUAL,23");

    * parameters.add("score,english,GREATER_OR_EQUAL,40");

    * QueryDataByConditionsAnd(null, tableName, parameters);

    */

    // 5

    /*

    * String newTableName = "caoShuaiTest04";

    *

    * List hbaseDatas = getResultScans(newTableName);

    *

    * System.out.println("hbaseDatas===" + hbaseDatas); for

    * (HbaseDataEntity hbaseData : hbaseDatas) {

    *

    * String rowKey = hbaseData.getMobileKey();

    *

    * Map> maps = hbaseData.getColumns();

    *

    * for (String key : maps.keySet()) {

    *

    * System.out.println("key===" + key); Map columnsMap =

    * maps.get(key);

    *

    * for (String columnsKey : columnsMap.keySet()) {

    * System.out.println("columnsKey===" + columnsKey);

    *

    * //updateTable("caoShuaiTest01", rowKey, key, columnsKey,columnsKey);

    * //deleteColumn("caoShuaiTest04", rowKey, key, columnsKey); }

    *

    * } }

    */

    /*

    * long beginTime = System.currentTimeMillis();

    * System.out.println("begin:" + beginTime);

    *

    * for (int i = 0; i < 100000000; i++) {

    *

    * long startTime = System.currentTimeMillis();

    *

    * String tableName = "caoShuaiTest06";

    *

    * String roeKey01 = "LiMing" + i; Map>

    * familyColumnMap01 = new HashMap>();

    * Map

    *

    * columnMap01 = new HashMap(); int age = i % 100 + 1;

    * columnMap01.put("age", String.valueOf(age)); columnMap01.put("phone",

    * "13854285991"); columnMap01.put("province", "shandong");

    * columnMap01.put("city", "beijing"); columnMap01.put("chinese",

    * "100"); familyColumnMap01.put("info", columnMap01);

    *

    * int result01 = addDataForTable(tableName, roeKey01,

    * familyColumnMap01); long finishedTime = System.currentTimeMillis();

    *

    * double smallTime = (finishedTime - startTime)/1000.0;

    * System.out.println("第" + i + "个花费时间" + smallTime + "s" ); }

    *

    * long endTime = System.currentTimeMillis();

    *

    * System.out.println("end:" + endTime);

    *

    * double time = (endTime - beginTime)/1000.0;

    *

    * System.out.println("all spent time:" + time + "s");

    */

    /*

    * String tableName = "caoShuaiTest10"; List hbaseDatas

    * = new ArrayList(); long startTime =

    * System.currentTimeMillis(); int k = 0; for (int i = 1; i <=

    * 100000000; i++) { HbaseDataEntity hbaseData = new HbaseDataEntity();

    * hbaseData.setTableName(tableName);

    * hbaseData.setMobileKey(String.valueOf(i)); Map

    * String>> familyMaps = new HashMap>();

    * Map columnMaps = new HashMap<>(); int age = i % 100 +

    * 1; columnMaps.put("age", String.valueOf(age));

    * columnMaps.put("phone", "13854285991"); columnMaps.put("province",

    * "shandong"); columnMaps.put("city", "beijing");

    * columnMaps.put("chinese", "100"); familyMaps.put("info", columnMaps);

    * hbaseData.setColumns(familyMaps);

    *

    * hbaseDatas.add(hbaseData);

    *

    * if(i%10000 == 0) { k ++; long time1 = System.currentTimeMillis();

    * insertDataList(hbaseDatas); hbaseDatas.clear(); long time2 =

    * System.currentTimeMillis(); double time = (time2 - time1)/1000.0;

    * System.out.println(k + "万条数据存入hbase花费时间" + time + "s" );

    *

    * } } long finishedTime = System.currentTimeMillis(); double smallTime

    * = (finishedTime - startTime)/1000.0; System.out.println("组装数据花费时间" +

    * smallTime + "s" );

    */

    /*

    * long beiginTime = System.currentTimeMillis();

    * insertDataList(hbaseDatas); long endTime =

    * System.currentTimeMillis(); double spentTime = (endTime -

    * beiginTime)/1000.0; System.out.println("数据花费时间" + spentTime + "s" );

    */

    /*

    * long beiginTime = System.currentTimeMillis(); String tableName =

    * "caoShuaiTest04"; String hbaseTableName =

    * "customer_portrait_library"; List datas =

    * getResultScans(tableName, 10000); //System.out.println(datas); long

    * endTime = System.currentTimeMillis(); double spentTime = (endTime -

    * beiginTime)/1000.0; System.out.println("数据花费时间" + spentTime + "s" );

    *

    * //System.out.println("hbaseDatas===" + datas);

    *

    * List newDatas = new ArrayList();

    *

    * long time1 = System.currentTimeMillis(); for (HbaseDataEntity

    * hbaseData : datas) {

    *

    * String rowKey = hbaseData.getMobileKey();

    *

    *

    * HbaseDataEntity newData = new HbaseDataEntity();

    * newData.setMobileKey(rowKey); newData.setTableName(hbaseTableName);

    *

    * Map> maps = hbaseData.getColumns();

    * Map> newMaps = new HashMap

    * Map>();

    *

    * for (String key : maps.keySet()) {

    *

    * Map columnsMap = maps.get(key); Map

    * newColumnsMap = new HashMap(); for (String columnsKey

    * : columnsMap.keySet()) {

    *

    * newColumnsMap.put(columnsKey, columnsKey);

    * updateTable(hbaseTableName, rowKey, key, columnsKey, columnsKey);

    * deleteColumn(tableName, rowKey, key, columnsKey); } newMaps.put(key,

    * newColumnsMap);

    *

    * } newData.setColumns(newMaps); newDatas.add(newData);

    *

    * }

    *

    * if(newDatas != null && newDatas.size() > 0) { long insertTime1 =

    * System.currentTimeMillis(); insertDataList(newDatas); long

    * insertTime2 = System.currentTimeMillis(); double insertTime =

    * (insertTime2 - insertTime1)/1000.0; System.out.println("修改数据时间" +

    * insertTime + "s" );

    *

    * long deleteTime1 = System.currentTimeMillis(); deleteDataList(datas);

    * long deleteTime2 = System.currentTimeMillis(); double deleteTime =

    * (deleteTime2 - deleteTime1)/1000.0; System.out.println("删除数据时间" +

    * deleteTime + "s" ); } long time2 = System.currentTimeMillis(); double

    * time = (time2 - time1)/1000.0; System.out.println("组装时间" + time + "s"

    * );

    */

    /*

    * List list = new ArrayList(); long insertTime1 =

    * System.currentTimeMillis(); for(int i=0; i<100000000; i++) {

    * list.add("abcdef" + i); } long insertTime2 =

    * System.currentTimeMillis(); double insertTime = (insertTime2 -

    * insertTime1)/1000.0; System.out.println("修改数据时间" + insertTime + "s"

    * );

    */

    // 7、复合条件查询

    String tableName ="caoShuaiTest01";

    List hbaseConditions =newArrayList();

    hbaseConditions.add(newHbaseConditionEntity( Bytes.toBytes("info"),

    Bytes.toBytes("age"), Bytes.toBytes("23"),

    Operator.valueOf("MUST_PASS_ALL"), CompareOp.valueOf("EQUAL")));

    hbaseConditions.add(newHbaseConditionEntity( Bytes.toBytes("score"),

    Bytes.toBytes("english"), Bytes.toBytes("80"),

    Operator.valueOf("MUST_PASS_ALL"), CompareOp.valueOf("EQUAL")));

    hbaseConditions.add(newHbaseConditionEntity( Bytes.toBytes("score"),

    Bytes.toBytes("english"), Bytes.toBytes("80"),

    Operator.valueOf("MUST_PASS_ONE"), CompareOp.valueOf("EQUAL")));

    hbaseConditions.add(newHbaseConditionEntity(

    Bytes.toBytes("address"), Bytes.toBytes("city"),

    Bytes.toBytes("beijing"),null, CompareOp.valueOf("EQUAL")));

    hbaseConditions.add(newHbaseConditionEntity( Bytes.toBytes("score"),

    Bytes.toBytes("english"), Bytes.toBytes("70"),null,

    CompareOp.valueOf("EQUAL")));

    List datas = QueryDataByConditionsAndPage(null, tableName, hbaseConditions,2,null);

    //List datas = QueryDataByConditions(null, tableName, hbaseConditions); //联合条件查询 String tableName = "caoShuaiTest01";

    /*  List parameters = new ArrayList();

    parameters.add("info,age,EQUAL,23");

    parameters.add("score,english,EQUAL,80");

    parameters.add("address,city,EQUAL,beijing"); String pm =

    "score,english,EQUAL,78";

    List datas = QueryDataByConditions(null, tableName, parameters, pm);*/

    System.out.println(datas);

    // 分页

    /*int pageSize = 1;

    String key = null;

    int dataCount = pageSize;

    String tableName = "caoShuaiTest01";

    while (dataCount == pageSize) {

    byte[] mobileKey = null;

    if (key != null) {

    mobileKey = key.getBytes();

    }

    List hbaseDatas = getHbaseDatasByPage(tableName,

    pageSize, mobileKey);

    if (hbaseDatas != null && hbaseDatas.size() > 0) {

    System.out.println(hbaseDatas);

    dataCount = hbaseDatas.size();

    key = hbaseDatas.get(dataCount - 1).getMobileKey();

    System.out.println("Key:" + key);

    } else {

    break;

    }

    }*/

    }

    }

    其中查询的结果以及查询的条件被我封装成了两个实体类:

    1、查询结果实体类:

    [java]view plaincopy

    packagecom.infobird.test1;

    importjava.util.Map;

    publicclassHbaseDataEntity {

    privateString tableName;

    privateString nameSpace;

    privateString mobileKey;

    privateMap> columns;//map>

    publicHbaseDataEntity() {

    super();

    }

    publicHbaseDataEntity(String tableName, String nameSpace,

    String mobileKey, Map> columns) {

    super();

    this.tableName = tableName;

    this.nameSpace = nameSpace;

    this.mobileKey = mobileKey;

    this.columns = columns;

    }

    publicString getTableName() {

    returntableName;

    }

    publicvoidsetTableName(String tableName) {

    this.tableName = tableName;

    }

    publicString getNameSpace() {

    returnnameSpace;

    }

    publicvoidsetNameSpace(String nameSpace) {

    this.nameSpace = nameSpace;

    }

    publicString getMobileKey() {

    returnmobileKey;

    }

    publicvoidsetMobileKey(String mobileKey) {

    this.mobileKey = mobileKey;

    }

    publicMap> getColumns() {

    returncolumns;

    }

    publicvoidsetColumns(Map> columns) {

    this.columns = columns;

    }

    @Override

    publicString toString() {

    return"HbaseDataEntity [tableName="+ tableName +", nameSpace="

    + nameSpace +", mobileKey="+ mobileKey +", columns="

    + columns +"]";

    }

    }

    2.查询条件实体类:

    [java]view plaincopy

    packagecom.infobird.test1;

    importjava.io.UnsupportedEncodingException;

    importjava.util.ArrayList;

    importjava.util.List;

    importorg.apache.hadoop.hbase.filter.CompareFilter.CompareOp;

    importorg.apache.hadoop.hbase.filter.FilterList.Operator;

    importorg.apache.hadoop.hbase.util.Bytes;

    publicclassHbaseConditionEntity {

    privatebyte[] familyColumn;

    privatebyte[] column;

    privatebyte[] value;

    privateOperator operator;

    privateCompareOp compareOp;

    publicbyte[] getFamilyColumn() {

    returnfamilyColumn;

    }

    publicvoidsetFamilyColumn(byte[] familyColumn) {

    this.familyColumn = familyColumn;

    }

    publicbyte[] getColumn() {

    returncolumn;

    }

    publicvoidsetColumn(byte[] column) {

    this.column = column;

    }

    publicbyte[] getValue() {

    returnvalue;

    }

    publicvoidsetValue(byte[] value) {

    this.value = value;

    }

    publicOperator getOperator() {

    returnoperator;

    }

    publicvoidsetOperator(Operator operator) {

    this.operator = operator;

    }

    publicCompareOp getCompareOp() {

    returncompareOp;

    }

    publicvoidsetCompareOp(CompareOp compareOp) {

    this.compareOp = compareOp;

    }

    publicHbaseConditionEntity(byte[] familyColumn,byte[] column,

    byte[] value, Operator operator, CompareOp compareOp) {

    super();

    this.familyColumn = familyColumn;

    this.column = column;

    this.value = value;

    this.operator = operator;

    this.compareOp = compareOp;

    }

    publicHbaseConditionEntity() {

    super();

    // TODO Auto-generated constructor stub

    }

    publicstaticList toHbaseConditions(String labels) {

    List hbaseConditions =newArrayList();

    String[] labelArray = labels.split(";");

    for(String labelWithCompares : labelArray) {

    String[] labelWithComparesArray = labelWithCompares.split(" ");

    String label = labelWithComparesArray[0];

    Operator compare =null;

    if(labelWithComparesArray.length >1) {

    if("and".equals(labelWithComparesArray[1])) {

    compare = Operator.MUST_PASS_ALL;

    }else{

    compare = Operator.MUST_PASS_ONE;

    }

    }

    byte[] familyColumn = Bytes.toBytes("label");

    byte[] column = Bytes.toBytes(label);

    byte[] value = Bytes.toBytes(label);

    HbaseConditionEntity hbaseCondition =newHbaseConditionEntity(

    familyColumn, column, value, compare, CompareOp.EQUAL);

    hbaseConditions.add(hbaseCondition);

    }

    returnhbaseConditions;

    }

    publicstaticvoidmain(String[] args)throwsUnsupportedEncodingException {

    String labels ="label1 and;label2 or;label3";

    List conditions = toHbaseConditions(labels);

    System.out.println("==========begin==========");

    for(HbaseConditionEntity hbaseConditionEntity : conditions) {

    System.out.println("[familyColumn: "+newString(hbaseConditionEntity.getFamilyColumn(),"UTF-8")

    +"] [column: "+newString(hbaseConditionEntity.getColumn(),"UTF-8")

    +"] [value: "+newString(hbaseConditionEntity.getValue(),"UTF-8")

    +"] [operator: "+ hbaseConditionEntity.getOperator()

    +"] [compare: "+ hbaseConditionEntity.getCompareOp() +"]");

    }

    System.out.println("==========end==========");

    }

    }

    相关文章

      网友评论

        本文标题:HBase操作

        本文链接:https://www.haomeiwen.com/subject/mnhvgttx.html