Using HBase

Author: 若与 | Published: 2019-06-24 22:51

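    I. Introduction to HBase

    1. Basic concepts

    HBase is a database built on Hadoop. It is often described as a sparse, distributed, persistent, multidimensional sorted map. It indexes data by row key, column key, and timestamp, and serves as a platform for storing and retrieving data with random access. HBase does not restrict the kinds of data it stores, allows a dynamic and flexible data model, does not use SQL, and does not emphasize relationships between data. It is designed to run on a cluster of servers and scales out horizontally with that cluster.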

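    2. HBase use cases and success stories

    • Internet search: crawlers collect web pages and store them in BigTable; MapReduce jobs scan the whole table to build the search index; search results are then looked up in BigTable and shown to the user.
    • Capturing incremental data: for example monitoring metrics, user interaction data, telemetry, and targeted advertising.
    • Content serving
    • Information exchange

    The above is only a brief introduction to HBase. I will write up its internals and architecture in a later post once I have organized my notes. For now I only know how to use HBase, so let's start from usage.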

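    II. Using HBase

    HBase is a database, and a database stores data, so everything comes down to CRUD.
    Like MySQL, it offers both a shell client and language SDKs.

    2.1 HBase shell

    hbase shell is similar to the MySQL command-line client.

    help shows help for all commands.

    The commands are grouped as follows: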

    COMMAND GROUPS:
      Group name: general
      Commands: processlist, status, table_help, version, whoami
    
      Group name: ddl
      Commands: alter, alter_async, alter_status, create, create_layered, describe, disable, disable_all, drop, drop_all, enable, enable_all, exists, get_table, is_disabled, is_enabled, list, list_regions, locate_region, show_filters
    
      Group name: namespace
      Commands: alter_namespace, create_namespace, describe_namespace, drop_namespace, list_namespace, list_namespace_tables
    
      Group name: dml
      Commands: append, count, delete, deleteall, get, get_counter, get_splits, incr, put, scan, truncate, truncate_preserve
    
      Group name: tools
      Commands: assign, balance_switch, balancer, balancer_enabled, catalogjanitor_enabled, catalogjanitor_run, catalogjanitor_switch, cleaner_chore_enabled, cleaner_chore_run, cleaner_chore_switch, clear_block_cache, clear_compaction_queues, clear_deadservers, close_region, compact, compact_rs, compaction_state, flush, is_in_maintenance_mode, list_deadservers, major_compact, merge_region, move, normalize, normalizer_enabled, normalizer_switch, split, splitormerge_enabled, splitormerge_switch, trace, unassign, wal_roll, zk_dump
    
      Group name: replication
      Commands: add_peer, append_peer_namespaces, append_peer_tableCFs, disable_peer, disable_table_replication, enable_peer, enable_table_replication, get_peer_config, list_peer_configs, list_peers, list_replicated_tables, remove_peer, remove_peer_namespaces, remove_peer_tableCFs, set_peer_bandwidth, set_peer_exclude_namespaces, set_peer_exclude_tableCFs, set_peer_namespaces, set_peer_replicate_all, set_peer_tableCFs, show_peer_tableCFs, update_peer_config
    
      Group name: snapshots
      Commands: clone_snapshot, delete_all_snapshot, delete_snapshot, delete_table_snapshots, list_snapshots, list_table_snapshots, restore_snapshot, snapshot
    
      Group name: configuration
      Commands: update_all_config, update_config
    
      Group name: quotas
      Commands: list_quota_snapshots, list_quota_table_sizes, list_quotas, list_snapshot_sizes, set_quota
    
      Group name: security
      Commands: grant, list_security_capabilities, revoke, user_permission
    
      Group name: procedures
      Commands: abort_procedure, list_locks, list_procedures
    
      Group name: visibility labels
      Commands: add_labels, clear_auths, get_auths, list_labels, set_auths, set_visibility
    
      Group name: rsgroup
      Commands: add_rsgroup, balance_rsgroup, get_rsgroup, get_server_rsgroup, get_table_rsgroup, list_rsgroups, move_namespaces_rsgroup, move_servers_namespaces_rsgroup, move_servers_rsgroup, move_servers_tables_rsgroup, move_tables_rsgroup, remove_rsgroup, remove_servers_rsgroup
    
    

    1. General commands:

    1. Cluster status: status
    hbase(main):005:0> status
    1 active master, 0 backup masters, 1 servers, 0 dead, 793.0000 average load
    Took 0.9453 seconds
    
    1. Version: version
    hbase(main):006:0> version
    2.0.2, rc6f16dff66b5d7c4fb66d3bf7eda4f56515c63f3, Fri Jan 25 19:23:41 CST 2019
    Took 0.0004 seconds
    

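    2. DDL commands

    | Command | Description | Example |
    | --- | --- | --- |
    | alter | Modifies the attributes of a table's column families | alter 't1', NAME => 'f1', VERSIONS => 5 |
    | alter_async | Same as alter, but returns without waiting for every region to finish; the usage is identical | alter_async 't1', NAME => 'f1', VERSIONS => 5 |
    | alter_status | Reports the progress of an alter command, i.e. how many regions have received the new schema; takes the table name | alter_status 't1' |
    | create | Creates a table | create 't1', {NAME => 'f1', VERSIONS => 5} or create 't1', 'f1', 'f2', 'f3' |
    | describe | Shows the table's metadata and whether the table is enabled | describe 't1' |
    | disable | Disables a table | disable 't1' |
    | disable_all | Disables all tables matching a regex | disable_all 't1.*' |
    | drop | Drops a table (the table must be disabled first) | drop 't1' |
    | enable | Enables a table | enable 't1' |
    | enable_all | Enables all tables matching a regex | enable_all 't1.*' |
    | exists | Checks whether a table exists | exists 't1' |
    | is_disabled | Checks whether a table is disabled | is_disabled 't1' |
    | is_enabled | Checks whether a table is enabled | is_enabled 't1' |
    | show_filters | Lists the names of all supported filters | show_filters |
    | list | Lists the names of all tables | list |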

    3. DML commands

    1. count
      Counts the total number of rows in a table
    count 't1'
    count 't1', INTERVAL => 1000
    count 't1', CACHE => 1000
    count 't1', INTERVAL => 10, CACHE => 1000
    
    
    1. delete
      Deletes a single cell
    delete 't1', 'r1', 'c1', ts1
    
    1. deleteall
      Deletes a whole row, or one column of a row
    deleteall 't1','r1'
    deleteall 't1','r1','c1'
    deleteall 't1', 'r1','c1', ts1
    
    1. get
      Reads a single row
      hbase> get 'ns1:t1', 'r1'
      hbase> get 't1', 'r1'
      hbase> get 't1', 'r1', {TIMERANGE => [ts1, ts2]}
      hbase> get 't1', 'r1', {COLUMN => 'c1'}
      hbase> get 't1', 'r1', {COLUMN => ['c1', 'c2', 'c3']}
      hbase> get 't1', 'r1', {COLUMN => 'c1', TIMESTAMP => ts1}
      hbase> get 't1', 'r1', {COLUMN => 'c1', TIMERANGE => [ts1, ts2], VERSIONS => 4}
      hbase> get 't1', 'r1', {COLUMN => 'c1', TIMESTAMP => ts1, VERSIONS => 4}
      hbase> get 't1', 'r1', {FILTER => "ValueFilter(=, 'binary:abc')"}
      hbase> get 't1', 'r1', 'c1'
      hbase> get 't1', 'r1', 'c1', 'c2'
      hbase> get 't1', 'r1', ['c1', 'c2']
      hbase> get 't1', 'r1', {COLUMN => 'c1', ATTRIBUTES => {'mykey'=>'myvalue'}}
      hbase> get 't1', 'r1', {COLUMN => 'c1', AUTHORIZATIONS => ['PRIVATE','SECRET']}
      hbase> get 't1', 'r1', {CONSISTENCY => 'TIMELINE'}
      hbase> get 't1', 'r1', {CONSISTENCY => 'TIMELINE', REGION_REPLICA_ID => 1}
    
    1. get_counter
      Reads a counter value
      hbase> get_counter 'ns1:t1', 'r1', 'c1'
      hbase> get_counter 't1', 'r1', 'c1'
    
    1. incr
      Increments a counter cell
      hbase> incr 'ns1:t1', 'r1', 'c1'
      hbase> incr 't1', 'r1', 'c1'
      hbase> incr 't1', 'r1', 'c1', 1
      hbase> incr 't1', 'r1', 'c1', 10
      hbase> incr 't1', 'r1', 'c1', 10, {ATTRIBUTES=>{'mykey'=>'myvalue'}}
      hbase> incr 't1', 'r1', 'c1', {ATTRIBUTES=>{'mykey'=>'myvalue'}}
      hbase> incr 't1', 'r1', 'c1', 10, {VISIBILITY=>'PRIVATE|SECRET'}
    
    
    1. put
      Writes data
      hbase> put 'ns1:t1', 'r1', 'c1', 'value'
      hbase> put 't1', 'r1', 'c1', 'value'
      hbase> put 't1', 'r1', 'c1', 'value', ts1
      hbase> put 't1', 'r1', 'c1', 'value', {ATTRIBUTES=>{'mykey'=>'myvalue'}}
      hbase> put 't1', 'r1', 'c1', 'value', ts1, {ATTRIBUTES=>{'mykey'=>'myvalue'}}
      hbase> put 't1', 'r1', 'c1', 'value', ts1, {VISIBILITY=>'PRIVATE|SECRET'}
    
    1. scan
      Scans a table
      hbase> scan 'hbase:meta'
      # show only the specified columns
      hbase> scan 'hbase:meta', {COLUMNS => 'info:regioninfo'}
      
      # limit the number of rows and set the start row
      hbase> scan 'ns1:t1', {COLUMNS => ['c1', 'c2'], LIMIT => 10, STARTROW => 'xyz'}
      hbase> scan 't1', {COLUMNS => ['c1', 'c2'], LIMIT => 10, STARTROW => 'xyz'}
      
      # time range
      hbase> scan 't1', {COLUMNS => 'c1', TIMERANGE => [1303668804000, 1303668904000]}
      hbase> scan 't1', {REVERSED => true}
      hbase> scan 't1', {ALL_METRICS => true}
      hbase> scan 't1', {METRICS => ['RPC_RETRIES', 'ROWS_FILTERED']}
      
      # use filters; run show_filters to list the available filters
      hbase> scan 't1', {ROWPREFIXFILTER => 'row2', FILTER => "
        (QualifierFilter (>=, 'binary:xyz')) AND (TimestampsFilter ( 123, 456))"}
      hbase> scan 't1', {FILTER =>
        org.apache.hadoop.hbase.filter.ColumnPaginationFilter.new(1, 0)}
      hbase> scan 't1', {CONSISTENCY => 'TIMELINE'}
      # set operation attributes and authorizations
      hbase> scan 't1', { COLUMNS => ['c1', 'c2'], ATTRIBUTES => {'mykey' => 'myvalue'}}
      hbase> scan 't1', { COLUMNS => ['c1', 'c2'], AUTHORIZATIONS => ['PRIVATE','SECRET']}
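
To make the STARTROW and LIMIT options in the scan examples more concrete, here is a minimal sketch of how a range scan behaves over row keys kept in sorted order. It only models the ordering logic with a plain slice; `scanRows` is a hypothetical helper and not part of HBase or any client library.

```go
package main

import (
	"fmt"
	"sort"
)

// scanRows mimics STARTROW and LIMIT over sorted row keys: it starts at the
// first key >= startRow and returns at most limit keys.
// sortedKeys must already be sorted lexicographically.
func scanRows(sortedKeys []string, startRow string, limit int) []string {
	i := sort.SearchStrings(sortedKeys, startRow)
	end := i + limit
	if end > len(sortedKeys) {
		end = len(sortedKeys)
	}
	return sortedKeys[i:end]
}

func main() {
	keys := []string{"abc", "row1", "row2", "xyz", "xyz1", "zzz"}
	fmt.Println(scanRows(keys, "xyz", 2)) // [xyz xyz1]
}
```

Row keys are compared as bytes, so "row10" sorts before "row2"; this is worth keeping in mind when designing row keys for range scans.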
    
    1. truncate
      Removes all data from a table
    truncate 't1'
    

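    There are more commands that I won't cover here; use help to look them up.

    2.2 Accessing HBase from Go

    This section shows how to work with HBase from Go, using the gohbase client.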

    Install

    go get github.com/tsuna/gohbase
    

    Create a client

    client := gohbase.NewClient("localhost")
    

    Insert a cell

    // Values maps a ColumnFamily -> Qualifiers -> Values.
    values := map[string]map[string][]byte{"cf": map[string][]byte{"a": []byte{0}}}
    putRequest, err := hrpc.NewPutStr(context.Background(), "table", "key", values)
    rsp, err := client.Put(putRequest)
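
The nested map literal in the put example can be hard to read once there are several cells. As a small convenience sketch, the hypothetical helper `addCell` below builds the same family -> qualifier -> value shape step by step; it uses only the standard library and is not part of gohbase.

```go
package main

import "fmt"

// addCell sets values[family][qualifier] = value, creating the inner map on
// first use. The outer map has the family -> qualifier -> value shape that
// the put example passes to hrpc.NewPutStr.
func addCell(values map[string]map[string][]byte, family, qualifier string, value []byte) {
	if values[family] == nil {
		values[family] = make(map[string][]byte)
	}
	values[family][qualifier] = value
}

func main() {
	values := make(map[string]map[string][]byte)
	addCell(values, "cf", "a", []byte{0})
	addCell(values, "cf", "b", []byte("hello"))
	fmt.Println(len(values["cf"])) // 2
}
```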
    

    Get an entire row

    getRequest, err := hrpc.NewGetStr(context.Background(), "table", "row")
    getRsp, err := client.Get(getRequest)
    

    Get a specific cell

    // Perform a get for the cell with key "15", column family "cf" and qualifier "a"
    family := map[string][]string{"cf": []string{"a"}}
    getRequest, err := hrpc.NewGetStr(context.Background(), "table", "15",
        hrpc.Families(family))
    getRsp, err := client.Get(getRequest)
    

    Get a specific cell with a filter

    pFilter := filter.NewKeyOnlyFilter(true)
    family := map[string][]string{"cf": []string{"a"}}
    getRequest, err := hrpc.NewGetStr(context.Background(), "table", "15",
        hrpc.Families(family), hrpc.Filters(pFilter))
    getRsp, err := client.Get(getRequest)
    

    Scan with a filter

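    pFilter := filter.NewPrefixFilter([]byte("7"))
    scanRequest, err := hrpc.NewScanStr(context.Background(), "table",
            hrpc.Filters(pFilter))
    // Scan returns an hrpc.Scanner rather than (result, error);
    // call Next repeatedly until it returns io.EOF.
    scanner := client.Scan(scanRequest)
    row, err := scanner.Next()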
    

    Let's look at how the code is organized:

    ├── AUTHORS
    ├── COPYING
    ├── Makefile
    ├── README.md
    ├── admin_client.go
    ├── caches.go
    ├── check_line_len.awk
    ├── client.go
    ├── discovery_test.go
    ├── filter
    ├── hrpc
    ├── install_ci.sh
    ├── integration_test.go
    ├── metacache_test.go
    ├── pb
    ├── region
    ├── rpc.go
    ├── rpc_test.go
    ├── scanner.go
    ├── scanner_test.go
    ├── table_test.go
    ├── test
    └── zk
    
    

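    The code is well organized:
    hrpc mainly holds the RPC request types and their constructors.
    filter holds the filters used by get and scan.
    region holds region-related interfaces.
    caches.go implements caching; HBase relies on caches in many places to improve performance.
    zk handles everything related to ZooKeeper.

    Next, let's read some of the source code.
    The main entry points of gohbase are client and admin_client.

    We will focus on these two, starting with admin_client: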

    // AdminClient to perform administrative operations with HMaster
    type AdminClient interface {
        CreateTable(t *hrpc.CreateTable) error
        DeleteTable(t *hrpc.DeleteTable) error
        EnableTable(t *hrpc.EnableTable) error
        DisableTable(t *hrpc.DisableTable) error
        ClusterStatus() (*pb.ClusterStatus, error)
    }
    
    
    
    // CreateTable represents a CreateTable HBase call
    type CreateTable struct {
        base
    
        families  map[string]map[string]string
        splitKeys [][]byte
    }
    
    // NewCreateTable creates a new CreateTable request that will create the given
    // table in HBase. 'families' is a map of column family name to its attributes.
    // For use by the admin client.
    func NewCreateTable(ctx context.Context, table []byte,
        families map[string]map[string]string,
        options ...func(*CreateTable)) *CreateTable {
        ct := &CreateTable{
            base: base{
                table:    table,
                ctx:      ctx,
                resultch: make(chan RPCResult, 1),
            },
            families: make(map[string]map[string]string, len(families)),
        }
        for _, option := range options {
            option(ct)
        }
        for family, attrs := range families {
            ct.families[family] = make(map[string]string, len(defaultAttributes))
            for k, dv := range defaultAttributes {
                if v, ok := attrs[k]; ok {
                    ct.families[family][k] = v
                } else {
                    ct.families[family][k] = dv
                }
            }
        }
        return ct
    }
    

    These are mainly DDL operations.

    Now let's look at client:
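
The attribute loop inside NewCreateTable is easy to misread, so here is a standalone sketch of the same merge logic. Going by the code shown above, it iterates over the default keys, so a caller-supplied key that has no default is not copied. The contents of `defaultAttributes` below are hypothetical placeholders; the real defaults in gohbase may differ, and `mergeAttrs` is a name invented for this example.

```go
package main

import "fmt"

// defaultAttributes is a hypothetical stand-in for the library's unexported
// defaults; the real keys and values may differ.
var defaultAttributes = map[string]string{
	"VERSIONS":    "1",
	"COMPRESSION": "NONE",
}

// mergeAttrs mirrors the inner loop of NewCreateTable: for every default key,
// take the caller's value if present, otherwise the default.
// Keys that are not in defaultAttributes are not copied.
func mergeAttrs(attrs map[string]string) map[string]string {
	merged := make(map[string]string, len(defaultAttributes))
	for k, dv := range defaultAttributes {
		if v, ok := attrs[k]; ok {
			merged[k] = v
		} else {
			merged[k] = dv
		}
	}
	return merged
}

func main() {
	m := mergeAttrs(map[string]string{"VERSIONS": "5", "UNKNOWN": "x"})
	fmt.Println(m["VERSIONS"], m["COMPRESSION"], len(m)) // 5 NONE 2
}
```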

    // Client a regular HBase client
    type Client interface {
        Scan(s *hrpc.Scan) hrpc.Scanner
        Get(g *hrpc.Get) (*hrpc.Result, error)
        Put(p *hrpc.Mutate) (*hrpc.Result, error)
        Delete(d *hrpc.Mutate) (*hrpc.Result, error)
        Append(a *hrpc.Mutate) (*hrpc.Result, error)
        Increment(i *hrpc.Mutate) (int64, error)
        CheckAndPut(p *hrpc.Mutate, family string, qualifier string,
            expectedValue []byte) (bool, error)
        Close()
    }
    
    

    These are mainly DML operations.
    Let's look at put:

    // NewPut creates a new Mutation request to insert the given
    // family-column-values in the given row key of the given table.
    func NewPut(ctx context.Context, table, key []byte,
        values map[string]map[string][]byte, options ...func(Call) error) (*Mutate, error) {
        m, err := baseMutate(ctx, table, key, values, options...)
        if err != nil {
            return nil, err
        }
        m.mutationType = pb.MutationProto_PUT
        return m, nil
    }
    
    // NewPutStr is just like NewPut but takes table and key as strings.
    func NewPutStr(ctx context.Context, table, key string,
        values map[string]map[string][]byte, options ...func(Call) error) (*Mutate, error) {
        return NewPut(ctx, []byte(table), []byte(key), values, options...)
    }
    
    
    

    where baseMutate is:

    // baseMutate returns a Mutate struct without the mutationType filled in.
    func baseMutate(ctx context.Context, table, key []byte, values map[string]map[string][]byte,
        options ...func(Call) error) (*Mutate, error) {
        m := &Mutate{
            base: base{
                table:    table,
                key:      key,
                ctx:      ctx,
                resultch: make(chan RPCResult, 1),
            },
            values:    values,
            timestamp: MaxTimestamp,
        }
        err := applyOptions(m, options...)
        if err != nil {
            return nil, err
        }
        return m, nil
    }
    
    
    // Note: applyOptions records the options on the call, then applies each one in order.
    func applyOptions(call Call, options ...func(Call) error) error {
        call.(withOptions).setOptions(options)
        for _, option := range options {
            err := option(call)
            if err != nil {
                return err
            }
        }
        return nil
    }
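
applyOptions is an instance of what is commonly called the functional options pattern in Go: each option is a function that mutates the request and may reject it. The following is a simplified, self-contained sketch of that pattern. `Call`, `Get`, `MaxVersions`, and `NewGet` here are stripped-down stand-ins written for this example; they are not the real hrpc types, and the real setOptions bookkeeping is left out.

```go
package main

import (
	"errors"
	"fmt"
)

// Call and Get are simplified, hypothetical stand-ins for hrpc.Call and hrpc.Get.
type Call interface{}

type Get struct {
	maxVersions uint32
}

// MaxVersions returns an option that only applies to *Get calls and
// reports an error for any other call type.
func MaxVersions(n uint32) func(Call) error {
	return func(c Call) error {
		g, ok := c.(*Get)
		if !ok {
			return errors.New("MaxVersions can only be used with Get")
		}
		g.maxVersions = n
		return nil
	}
}

// applyOptions runs each option in order and stops at the first error.
func applyOptions(call Call, options ...func(Call) error) error {
	for _, option := range options {
		if err := option(call); err != nil {
			return err
		}
	}
	return nil
}

// NewGet builds a Get with a default and then applies the caller's options.
func NewGet(options ...func(Call) error) (*Get, error) {
	g := &Get{maxVersions: 1}
	if err := applyOptions(g, options...); err != nil {
		return nil, err
	}
	return g, nil
}

func main() {
	g, err := NewGet(MaxVersions(2))
	fmt.Println(g.maxVersions, err) // 2 <nil>
}
```

The benefit of this design is that constructors keep a short, stable signature while new optional settings can be added as new option functions.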
    

    Options are used like this:

    
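    func main() {
        // Assumes imports of context, log, time, and the gohbase, filter, and hrpc packages.
        client := gohbase.NewClient("localhost")

        // Get: restrict to cf:a, return keys only, and ask for up to 2 versions.
        pFilter := filter.NewKeyOnlyFilter(true)
        family := map[string][]string{"cf": []string{"a"}}
        getRequest, err := hrpc.NewGetStr(context.Background(), "table", "15",
            hrpc.Families(family), hrpc.Filters(pFilter), hrpc.MaxVersions(2))
        if err != nil {
            log.Fatal(err)
        }
        if _, err = client.Get(getRequest); err != nil {
            log.Fatal(err)
        }

        // Put: set an explicit timestamp through an option.
        // MaxVersions appears to be a query option (Get/Scan), so it is not passed here.
        values := map[string]map[string][]byte{"cf": map[string][]byte{"a": []byte{0}}}
        putRequest, err := hrpc.NewPutStr(context.Background(), "table", "key", values,
            hrpc.Timestamp(time.Now()))
        if err != nil {
            log.Fatal(err)
        }
        if _, err = client.Put(putRequest); err != nil {
            log.Fatal(err)
        }
    }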
    

