美文网首页
09. HBase数据存取API简介

09. HBase数据存取API简介

作者: 牦牛sheriff | 来源:发表于2018-09-06 12:18 被阅读0次

    HBase API
    HBase 2.0.1 API 常用类:

    使用org.apache.hadoop.hbase.client.ConnectionFactory来创建HBase数据库连接org.apache.hadoop.hbase.client.Connection;
    表名使用特定的类org.apache.hadoop.hbase.TableName 而不是字符串;
    使用Admin 新建、删除表,可通过org.apache.hadoop.hbase.client.Connection的getAdmin()方法获取;
    使用Table 定义表,可通过org.apache.hadoop.hbase.client.Connection的getTable()方法获取;
    使用Put、Get、Scan实现对表的put、get、scan操作。

    1.maven依赖

    <dependency>
    <groupId>org.apache.hbase</groupId>
    <artifactId>hbase-client</artifactId>
    <version>2.0.1</version>
    </dependency>
    2.create 操作
    Admin类的createTable方法,如下:

        public void createTable(Connection connection, TableName tableName, String... columnFamilies) throws IOException {
            Admin admin = null;
            try {
                admin = connection.getAdmin();
                if(admin.tableExists(tableName)){
                    logger.warn("table:{} exists!", tableName.getName());
                }else{
                    HTableDescriptor tableDescriptor = new HTableDescriptor(tableName);
                    for(String columnFamily : columnFamilies) {
                        tableDescriptor.addFamily(new HColumnDescriptor(columnFamily));
                    }
                    admin.createTable(tableDescriptor);
                    logger.info("create table:{} success!", tableName.getName());
                }
            } finally {
                if(admin!=null) {
                    admin.close();
                }
            }
        }
    
    1. put操作
      api
    void put(List<Put> puts) throws IOException
    void put(Put put) throws IOException
    

    构造方法

    Put(byte[] row, boolean rowIsImmutable)
    Put(byte[] rowArray, int rowOffset, int rowLength)
    Put(byte[] rowArray, int rowOffset, int rowLength, long ts)
    Put(byte[] row, long ts)
    Put(byte[] row, long ts, boolean rowIsImmutable)
    Put(byte[] row, long ts, NavigableMap<byte[],List<Cell>> familyMap)
    Put(ByteBuffer row) 
    Put(ByteBuffer row, long ts) 
    Put(Put putToCopy)
    

    填充值

    public Put addColumn(byte[] family, byte[] qualifier, byte[] value)
    public Put addColumn(byte[] family, byte[] qualifier,  long ts, byte[] value)
    public Put addColumn(byte[] family, ByteBuffer qualifier, long ts, ByteBuffer value)
    public Put add(Cell cell) throws IOException
    

    原子检查写

    
    

    通过 org.apache.hadoop.hbase.client.Put 来操作,如下:

        /**批量插入可以使用 Table.put(List<Put> list)**/
        public void put(Connection connection, TableName tableName,
                        String rowKey, String columnFamily, String column, String data) throws IOException {
    
            Table table = null;
            try {
                table = connection.getTable(tableName);
                Put put = new Put(Bytes.toBytes(rowKey));
                put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(column), Bytes.toBytes(data));
                table.put(put);
            } finally {
                if(table!=null) {
                    table.close();
                }
            }
        }
    
    1. get操作
      api
    Result get(Get get) throws IOException
    Result[] get(List<Get> gets) throws IOException
    

    构造方法

    Get(byte[] row)
    Get(byte[] row, int rowOffset, int rowLength)
    Get(ByteBuffer row)
    Get(Get get)
    

    填充值

    public Get addFamily(byte[] family)
    public Get addColumn(byte[] family, byte[] qualifier)
    public Get setTimeRange(long minStamp, long maxStamp)  throws IOException
    public Get setTimestamp(long timestamp)
    public Get readAllVersions()
    public Get readVersions(int versions)  throws IOException
    

    原子检查写

    
    
        //根据row key获取表中的该行数据
        public void get(Connection connection,TableName tableName,String rowKey) throws IOException {
            Table table = null;
            try {
                table = connection.getTable(tableName);
                Get get = new Get(Bytes.toBytes(rowKey));
                Result result = table.get(get);
                NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> navigableMap = result.getMap();
                for(Map.Entry<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> entry : navigableMap.entrySet()){
    
                    logger.info("columnFamily:{}", Bytes.toString(entry.getKey()));
                    NavigableMap<byte[], NavigableMap<Long, byte[]>> map =entry.getValue();
                    for(Map.Entry<byte[], NavigableMap<Long, byte[]>> en:map.entrySet()){
                        System.out.print(Bytes.toString(en.getKey())+"##");
                        NavigableMap<Long, byte[]> nm = en.getValue();
                        for(Map.Entry<Long, byte[]> me : nm.entrySet()){
                            logger.info("column key:{}, value:{}", me.getKey(), me.getValue());
                        }
                    }
                }
            } finally {
                if(table!=null) {
                    table.close();
                }
            }
        }
    
    1. scan操作
      api
    ResultScanner getScanner(Scan scan) throws IOException
    ResultScanner getScanner(byte[] family) throws IOException
    ResultScanner getScanner(byte[] family, byte[] qualifier) throws IOException
    

    构造方法

    public Scan()
    public Scan(Scan scan) throws IOException
    public Scan(Get get)
    

    填充值

    public boolean isGetScan()
    public Scan addFamily(byte[] family)
    public Scan addColumn(byte[] family, byte[] qualifier)
    public Scan setTimeRange(long minStamp, long maxStamp) throws IOException
    public Scan setTimestamp(long timestamp)
    public Scan setColumnFamilyTimeRange(byte[] cf, long minStamp, long maxStamp)
    public Scan withStartRow(byte[] startRow)
    public Scan withStartRow(byte[] startRow, boolean inclusive)
    public Scan withStopRow(byte[] stopRow)
    public Scan withStopRow(byte[] stopRow, boolean inclusive)
    

    原子检查写

    
    
        public void scan(Connection connection, TableName tableName) throws IOException {
            Table table = null;
            try {
                table = connection.getTable(tableName);
                ResultScanner rs = null;
                try {
                    //Scan scan = new Scan(Bytes.toBytes("u120000"), Bytes.toBytes("u200000"));
                    rs = table.getScanner(new Scan());
                    for(Result r:rs){
                        NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> navigableMap = r.getMap();
                        for(Map.Entry<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> entry : navigableMap.entrySet()){
                            logger.info("row:{} key:{}", Bytes.toString(r.getRow()), Bytes.toString(entry.getKey()));
                            NavigableMap<byte[], NavigableMap<Long, byte[]>> map =entry.getValue();
                            for(Map.Entry<byte[], NavigableMap<Long, byte[]>> en:map.entrySet()){
                                System.out.print(Bytes.toString(en.getKey())+"##");
                                NavigableMap<Long, byte[]> ma = en.getValue();
                                for(Map.Entry<Long, byte[]>e: ma.entrySet()){
                                    System.out.print(e.getKey()+"###");
                                    System.out.println(Bytes.toString(e.getValue()));
                                }
                            }
                        }
                    }
                } finally {
                    if(rs!=null) {
                        rs.close();
                    }
                }
            } finally {
                if(table!=null) {
                    table.close();
                }
            }
        }
    
    1. delete操作
      api
    void delete(Delete delete) throws IOException
    void delete(List<Delete> deletes) throws IOException
    

    构造方法

    public Delete(byte[] row)
    public Delete(byte[] row, int rowOffset, int rowLength)
    public Delete(byte[] row, int rowOffset, int rowLength, long timestamp)
    public Delete(byte[] row, long timestamp)
    public Delete(byte[] row, long ts, NavigableMap<byte[],List<Cell>> familyMap)
    public Delete(Delete deleteToCopy) 
    

    填充值

    public Delete add(Cell cell) throws IOException
    public Delete addFamily(byte[] family)
    public Delete addFamily(byte[] family, long timestamp)
    public Delete addFamilyVersion(byte[] family, ong timestamp)
    public Delete addColumns(byte[] family, byte[] qualifier)
    public Delete addColumns(byte[] family, byte[] qualifier, long timestamp)
    public Delete addColumn(byte[] family, byte[] qualifier)
    public Delete addColumn(byte[] family, byte[] qualifier, long timestamp)
    public Delete setTimestamp(long timestamp)
    public Delete setAttribute(String name, byte[] value)
    

    原子检查写

    
    
        //删除表中的数据
        public void deleteTable(Connection connection, TableName tableName) throws IOException {
            Admin admin = null;
            try {
                admin = connection.getAdmin();
                if (admin.tableExists(tableName)) {
                    //必须先disable, 再delete
                    admin.disableTable(tableName);
                    admin.deleteTable(tableName);
                }
            } finally {
                if(admin!=null) {
                    admin.close();
                }
            }
        }
    

    测试

        public void testCrud() {
            Connection connection = null;
            try {
                connection = HBaseConnectionUtils.getConnection();
                TableName tableName = TableName.valueOf("demo");
    
                //创建HBase表
                createTable(connection, tableName, "cf1", "cf2");
    
                //put
                String rowKey = "u12000";
                put(connection, tableName, rowKey, "cf1", "name", "ricky");
                put(connection, tableName, rowKey, "cf1", "password", "root");
                put(connection, tableName, rowKey, "cf1", "age", "28");
    
                //get
                get(connection, tableName, rowKey);
    
                //scan
                scan(connection, tableName);
    
                //delete
                deleteTable(connection, tableName);
    
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                try {
                    connection.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    

    完整示例代码

    package com.mindflow.hbase.tutorials.crud;
    
    import org.apache.hadoop.hbase.HColumnDescriptor;
    import org.apache.hadoop.hbase.HTableDescriptor;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.*;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.io.IOException;
    import java.util.Map;
    import java.util.NavigableMap;
    
    /**
     * http://hbase.apache.org/1.2/apidocs/index.html
     *
     * @author Ricky Fung
     */
    public class HBaseCrudDemo {
        private final Logger logger = LoggerFactory.getLogger(this.getClass());
    
        public static void main(String[] args) {
    
            new HBaseCrudDemo().testCrud();
        }
    
        public void testCrud() {
            Connection connection = null;
            try {
                connection = HBaseConnectionUtils.getConnection();
                TableName tableName = TableName.valueOf("demo");
    
                //创建HBase表
                createTable(connection, tableName, "cf1", "cf2");
    
                //put
                String rowKey = "u12000";
                put(connection, tableName, rowKey, "cf1", "name", "ricky");
                put(connection, tableName, rowKey, "cf1", "password", "root");
                put(connection, tableName, rowKey, "cf1", "age", "28");
    
                //get
                get(connection, tableName, rowKey);
    
                //scan
                scan(connection, tableName);
    
                //delete
                deleteTable(connection, tableName);
    
            } catch (IOException e) {
                e.printStackTrace();
            } finally {
                try {
                    connection.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    
        public void scan(Connection connection, TableName tableName) throws IOException {
            Table table = null;
            try {
                table = connection.getTable(tableName);
                ResultScanner rs = null;
                try {
                    //Scan scan = new Scan(Bytes.toBytes("u120000"), Bytes.toBytes("u200000"));
                    rs = table.getScanner(new Scan());
                    for(Result r:rs){
                        NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> navigableMap = r.getMap();
                        for(Map.Entry<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> entry : navigableMap.entrySet()){
                            logger.info("row:{} key:{}", Bytes.toString(r.getRow()), Bytes.toString(entry.getKey()));
                            NavigableMap<byte[], NavigableMap<Long, byte[]>> map =entry.getValue();
                            for(Map.Entry<byte[], NavigableMap<Long, byte[]>> en:map.entrySet()){
                                System.out.print(Bytes.toString(en.getKey())+"##");
                                NavigableMap<Long, byte[]> ma = en.getValue();
                                for(Map.Entry<Long, byte[]>e: ma.entrySet()){
                                    System.out.print(e.getKey()+"###");
                                    System.out.println(Bytes.toString(e.getValue()));
                                }
                            }
                        }
                    }
                } finally {
                    if(rs!=null) {
                        rs.close();
                    }
                }
            } finally {
                if(table!=null) {
                    table.close();
                }
            }
        }
    
        //根据row key获取表中的该行数据
        public void get(Connection connection,TableName tableName,String rowKey) throws IOException {
            Table table = null;
            try {
                table = connection.getTable(tableName);
                Get get = new Get(Bytes.toBytes(rowKey));
                Result result = table.get(get);
                NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> navigableMap = result.getMap();
                for(Map.Entry<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> entry : navigableMap.entrySet()){
    
                    logger.info("columnFamily:{}", Bytes.toString(entry.getKey()));
                    NavigableMap<byte[], NavigableMap<Long, byte[]>> map =entry.getValue();
                    for(Map.Entry<byte[], NavigableMap<Long, byte[]>> en:map.entrySet()){
                        System.out.print(Bytes.toString(en.getKey())+"##");
                        NavigableMap<Long, byte[]> nm = en.getValue();
                        for(Map.Entry<Long, byte[]> me : nm.entrySet()){
                            logger.info("column key:{}, value:{}", me.getKey(), me.getValue());
                        }
                    }
                }
            } finally {
                if(table!=null) {
                    table.close();
                }
            }
        }
    
        /**批量插入可以使用 Table.put(List<Put> list)**/
        public void put(Connection connection, TableName tableName,
                        String rowKey, String columnFamily, String column, String data) throws IOException {
    
            Table table = null;
            try {
                table = connection.getTable(tableName);
                Put put = new Put(Bytes.toBytes(rowKey));
                put.addColumn(Bytes.toBytes(columnFamily), Bytes.toBytes(column), Bytes.toBytes(data));
                table.put(put);
            } finally {
                if(table!=null) {
                    table.close();
                }
            }
        }
    
        public void createTable(Connection connection, TableName tableName, String... columnFamilies) throws IOException {
            Admin admin = null;
            try {
                admin = connection.getAdmin();
                if(admin.tableExists(tableName)){
                    logger.warn("table:{} exists!", tableName.getName());
                }else{
                    HTableDescriptor tableDescriptor = new HTableDescriptor(tableName);
                    for(String columnFamily : columnFamilies) {
                        tableDescriptor.addFamily(new HColumnDescriptor(columnFamily));
                    }
                    admin.createTable(tableDescriptor);
                    logger.info("create table:{} success!", tableName.getName());
                }
            } finally {
                if(admin!=null) {
                    admin.close();
                }
            }
        }
    
        //删除表中的数据
        public void deleteTable(Connection connection, TableName tableName) throws IOException {
            Admin admin = null;
            try {
                admin = connection.getAdmin();
                if (admin.tableExists(tableName)) {
                    //必须先disable, 再delete
                    admin.disableTable(tableName);
                    admin.deleteTable(tableName);
                }
            } finally {
                if(admin!=null) {
                    admin.close();
                }
            }
        }
    
        public void disableTable(Connection connection, TableName tableName) throws IOException {
            Admin admin = null;
            try {
                admin = connection.getAdmin();
                if(admin.tableExists(tableName)){
                    admin.disableTable(tableName);
                }
            } finally {
                if(admin!=null) {
                    admin.close();
                }
            }
        }
    }
    

    相关文章

      网友评论

          本文标题:09. HBase数据存取API简介

          本文链接:https://www.haomeiwen.com/subject/yksowftx.html