Apache HBase

作者: 麦田里的守望者_e392 | Published 2019-04-17 16:10

    Author: jiangzz  Phone: 15652034180  WeChat: jiangzz_wx  WeChat Official Account: jiangzz_wy

    HBase

    Overview

    HBase is a database service built on top of Hadoop: a distributed, scalable big data store. Use Apache HBase™ when you need random, real-time read/write access to big data. (HDFS can hold massive amounts of data, but it manages that data at a coarse granularity: it only supports uploading and downloading whole files, not record-level modification of file contents.) Apache HBase is an open-source, distributed, versioned, non-relational database modeled after Google's Bigtable. Just as Bigtable provides distributed data storage on top of the Google File System, Apache HBase provides Bigtable-like capabilities on top of Hadoop and HDFS.

    How are HBase and HDFS related, and how do they differ?

    [Figure: relationship between HBase and HDFS]

    HBase is a database service built on top of HDFS. It lets users operate on HDFS indirectly through the HBase database service, so that data stored on HDFS can be created, read, updated, and deleted at a fine-grained (record/cell) level.

    HBase features (from the official documentation)

    • Linear and modular scalability.
    • Strictly consistent reads and writes.
    • Automatic and configurable sharding of tables (automatic partitioning into regions).
    • Automatic failover support between RegionServers.
    • Convenient base classes for backing Hadoop MapReduce jobs with Apache HBase tables.
    • Easy-to-use Java API for client access.
    • Block cache and Bloom filters for real-time queries.

    Column-oriented storage

    NoSQL is a broad term for non-relational data stores, which usually fall into the following categories: key-value stores, document stores (JSON), column-oriented stores, and graph stores. The various NoSQL products are unrelated to one another, differ greatly, and generally cannot be substituted for each other.

    Typical use cases for column-oriented stores:

    [Figure: use cases for column-oriented storage]

    HBase can store data at the scale of billions of rows, but it does not support complex queries or transactions. So although HBase can hold massive data sets, the kinds of queries it can run over them are quite limited.

    How does column-oriented storage differ from row-oriented storage?

    [Figure: row-oriented vs. column-oriented storage layout]

    HBase Installation

    • Install HDFS and make sure it is running normally.
    • HADOOP_HOME must be configured, because HBase uses this variable to locate the Hadoop services.
    • Install ZooKeeper (it stores the cluster metadata for HMaster and HRegionServer).
    [root@CentOS ~]# tar -zxf zookeeper-3.4.6.tar.gz  -C /usr/
    [root@CentOS ~]# mkdir zkdata
    [root@CentOS ~]# touch /usr/zookeeper-3.4.6/conf/zoo.cfg
    [root@CentOS ~]# vi /usr/zookeeper-3.4.6/conf/zoo.cfg
    tickTime=2000
    dataDir=/root/zkdata
    clientPort=2181
    [root@CentOS ~]# /usr/zookeeper-3.4.6/bin/zkServer.sh start zoo.cfg
    JMX enabled by default
    Using config: /usr/zookeeper-3.4.6/bin/../conf/zoo.cfg
    Starting zookeeper ... STARTED
    [root@CentOS ~]# /usr/zookeeper-3.4.6/bin/zkServer.sh status zoo.cfg
    JMX enabled by default
    Using config: /usr/zookeeper-3.4.6/bin/../conf/zoo.cfg
    Mode: standalone
    
    • Install and configure HBase
    [root@CentOS ~]# tar -zxf hbase-1.2.4-bin.tar.gz -C /usr/
    [root@CentOS ~]# vi /usr/hbase-1.2.4/conf/hbase-site.xml
    
    <?xml version="1.0"?>
    <?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
    <configuration>
        <property>
                    <name>hbase.rootdir</name>
                    <value>hdfs://CentOS:9000/hbase</value>
        </property>
        <property>
                    <name>hbase.cluster.distributed</name>
                    <value>true</value>
        </property>
        <property>
                    <name>hbase.zookeeper.quorum</name>
                    <value>CentOS</value>
        </property>
        <property>
                    <name>hbase.zookeeper.property.clientPort</name>
                    <value>2181</value>
        </property>
    </configuration>
    
    

    Edit the regionservers configuration file

    [root@CentOS ~]# vi /usr/hbase-1.2.4/conf/regionservers
    CentOS
    

    Configure the .bashrc file

    [root@CentOS ~]# vi .bashrc
    
    HADOOP_CLASSPATH=/root/mysql-connector-java-5.1.46.jar
    HADOOP_HOME=/usr/hadoop-2.6.0
    JAVA_HOME=/usr/java/latest
    PATH=$PATH:$JAVA_HOME/bin:$HADOOP_HOME/bin:$HADOOP_HOME/sbin
    CLASSPATH=.
    HBASE_MANAGES_ZK=false
    export JAVA_HOME
    export PATH
    export CLASSPATH
    export HADOOP_HOME
    export HADOOP_CLASSPATH
    export HBASE_MANAGES_ZK
    [root@CentOS ~]# source .bashrc
    
    

    Start the HBase service

    [root@CentOS ~]# cd /usr/hbase-1.2.4/
    [root@CentOS hbase-1.2.4]# ./bin/start-hbase.sh
    [root@CentOS hbase-1.2.4]# jps
    1667 DataNode
    1844 SecondaryNameNode
    1429 QuorumPeerMain
    2533 Jps
    2245 HRegionServer
    2118 HMaster
    1578 NameNode
    
    

    Open http://ip:16010 in a browser to view the HBase web UI.

    [Figure: HBase web UI]

    HBase Shell

    • Connect to the HBase shell
    [root@CentOS hbase-1.2.4]# ./bin/hbase shell
    SLF4J: Class path contains multiple SLF4J bindings.
    SLF4J: Found binding in [jar:file:/usr/hbase-1.2.4/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
    SLF4J: Found binding in [jar:file:/usr/hadoop-2.6.0/share/hadoop/common/lib/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
    SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
    SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
    HBase Shell; enter 'help<RETURN>' for list of supported commands.
    Type "exit<RETURN>" to leave the HBase Shell
    Version 1.2.4, rUnknown, Wed Feb 15 18:58:00 CST 2017
    
    hbase(main):001:0>
    
    

    You can run help to list the supported shell commands: hbase(main):001:0> help

    General commands

    status, table_help, version, whoami

    hbase(main):003:0> status
    1 active master, 0 backup masters, 1 servers, 0 dead, 2.0000 average load
    hbase(main):004:0> whoami
    root (auth:SIMPLE)
        groups: root
    
    hbase(main):005:0> version
    1.2.4, rUnknown, Wed Feb 15 18:58:00 CST 2017
    

    namespace: a namespace is the equivalent of a database in a traditional relational database

    alter_namespace, create_namespace, describe_namespace, drop_namespace, list_namespace, list_namespace_tables

    hbase(main):007:0> create_namespace 'baizhi',{'user'=>'zs'}
    0 row(s) in 0.3920 seconds
    
    hbase(main):009:0> alter_namespace 'baizhi', {METHOD => 'set', 'sex' => 'true'}
    0 row(s) in 0.1430 seconds
    
    hbase(main):010:0> describe_namespace 'baizhi'
    DESCRIPTION
    {NAME => 'baizhi', sex => 'true', user => 'zs'}
    1 row(s) in 0.0050 seconds
    
    hbase(main):011:0>  alter_namespace 'baizhi',{METHOD => 'unset', NAME=>'sex'}
    0 row(s) in 0.1140 seconds
    
    hbase(main):013:0> list_namespace
    NAMESPACE
    baizhi
    default
    hbase
    3 row(s) in 0.1790 seconds
    
    hbase(main):015:0> list_namespace '^b.*'
    NAMESPACE
    baizhi
    1 row(s) in 0.0160 seconds
    
    hbase(main):016:0> list_namespace_tables 'hbase'
    TABLE
    meta
    namespace
    2 row(s) in 0.1510 seconds
    
    

    DDL: data definition language commands, covering things like creating tables and namespaces

    alter, alter_async, alter_status, create, describe, disable, disable_all, drop, drop_all, enable, enable_all, exists, get_table, is_disabled, is_enabled, list, locate_region, show_filters

    hbase(main):019:0> create 'baizhi:t_user',{NAME=>'cf1',VERSIONS=>3},{NAME=>'cf2',TTL=>300}
    0 row(s) in 2.9600 seconds
    
    => Hbase::Table - baizhi:t_user
    
    hbase(main):024:0> list
    TABLE
    baizhi:t_user
    1 row(s) in 0.0560 seconds
    
    => ["baizhi:t_user"]
    
    hbase(main):028:0> disable_all 'baizhi:t_u.*'
    baizhi:t_user
    
    Disable the above 1 tables (y/n)?
    y
    1 tables successfully disabled
    
    hbase(main):029:0> drop
    drop             drop_all         drop_namespace
    hbase(main):029:0> drop_all 'baizhi:t_u.*'
    baizhi:t_user
    
    Drop the above 1 tables (y/n)?
    y
    1 tables successfully dropped
    
    hbase(main):030:0> list
    TABLE
    0 row(s) in 0.0070 seconds
    
    => []
    
    hbase(main):032:0> exists 'baizhi:t_user'
    Table baizhi:t_user does not exist
    0 row(s) in 0.0210 seconds
    
    

    DML: data manipulation language commands, typically the CRUD operations on data

    append, count, delete, deleteall, get, get_counter, get_splits, incr, put, scan, truncate, truncate_preserve

    
    hbase(main):001:0> count 'baizhi:t_user'
    0 row(s) in 1.8630 seconds
    
    => 0
    
    hbase(main):002:0> t = get_table 'baizhi:t_user'
    0 row(s) in 0.0000 seconds
    
    => Hbase::Table - baizhi:t_user
    hbase(main):003:0> t.count
    0 row(s) in 0.1140 seconds
    => 0
    
    

    put

    hbase(main):004:0> put 'baizhi:t_user','001','cf1:name','zhangsan'
    0 row(s) in 0.7430 seconds
    
    hbase(main):005:0> put 'baizhi:t_user','001','cf1:age',18
    0 row(s) in 0.1120 seconds
    # update the value (writes a new version)
    hbase(main):006:0> put 'baizhi:t_user','001','cf1:age',20 
    0 row(s) in 0.0720 seconds
    
    

    get

    hbase(main):008:0> get 'baizhi:t_user','001'
    COLUMN                              CELL
     cf1:age                            timestamp=1553961219305, value=20
     cf1:name                           timestamp=1553961181804, value=zhangsan
    
    hbase(main):009:0> get 'baizhi:t_user','001',{COLUMN=>'cf1',VERSIONS=>3}
    COLUMN                              CELL
     cf1:age                            timestamp=1553961219305, value=20
     cf1:age                            timestamp=1553961198084, value=18
     cf1:name                           timestamp=1553961181804, value=zhangsan
    3 row(s) in 0.1540 seconds
    
    hbase(main):010:0> get 'baizhi:t_user','001',{COLUMN=>'cf1',TIMESTAMP=>1553961198084}
    COLUMN                              CELL
     cf1:age                            timestamp=1553961198084, value=18
    1 row(s) in 0.0900 seconds
    
    hbase(main):015:0> get 'baizhi:t_user','001',{COLUMN=>'cf1',TIMERANGE=>[1553961198084,1553961219306],VERSIONS=>3}
    COLUMN                              CELL
     cf1:age                            timestamp=1553961219305, value=20
     cf1:age                            timestamp=1553961198084, value=18
    2 row(s) in 0.0180 seconds
    
    hbase(main):018:0> get 'baizhi:t_user','001',{COLUMN=>'cf1',FILTER => "ValueFilter(=, 'binary:zhangsan')"}
    COLUMN                              CELL
     cf1:name                           timestamp=1553961181804, value=zhangsan
    1 row(s) in 0.0550 seconds
    
    hbase(main):019:0> get 'baizhi:t_user','001',{COLUMN=>'cf1',FILTER => "ValueFilter(=, 'substring:zhang')"}
    COLUMN                              CELL
     cf1:name                           timestamp=1553961181804, value=zhangsan
    1 row(s) in 0.0780 seconds
    

    delete/deleteall

    # Delete the cells of cf1:age before the specified version
    hbase(main):027:0> delete 'baizhi:t_user','001','cf1:age',1553961899630
    0 row(s) in 0.1020 seconds
    # Delete all cells of cf1:age
    hbase(main):031:0> delete 'baizhi:t_user','001','cf1:age'
    0 row(s) in 0.0180 seconds
    
    hbase(main):034:0> deleteall 'baizhi:t_user','001'
    0 row(s) in 0.0360 seconds
    
    hbase(main):035:0> t.count
    0 row(s) in 0.0450 seconds
    => 0
    hbase(main):036:0> get 'baizhi:t_user','001',{COLUMN=>'cf1',VERSIONS=>3}
    COLUMN                              CELL
    0 row(s) in 0.0130 seconds
    
    

    scan

    hbase(main):045:0> scan 'baizhi:t_user'
    ROW                                 COLUMN+CELL
     001                                column=cf1:age, timestamp=1553962118964, value=21
     001                                column=cf1:name, timestamp=1553962147916, value=zs
     002                                column=cf1:age, timestamp=1553962166894, value=19
     002                                column=cf1:name, timestamp=1553962157743, value=ls
     003                                column=cf1:name, timestamp=1553962203754, value=zl
     005                                column=cf1:age, timestamp=1553962179379, value=19
     005                                column=cf1:name, timestamp=1553962192054, value=ww
    
    hbase(main):054:0> scan 'baizhi:t_user',{ LIMIT => 2,STARTROW=>"003",REVERSED=>true}
    ROW                                 COLUMN+CELL
     003                                column=cf1:name, timestamp=1553962203754, value=zl
     002                                column=cf1:age, timestamp=1553962166894, value=19
     002                                column=cf1:name, timestamp=1553962157743, value=ls
    
    hbase(main):058:0> scan 'baizhi:t_user',{ LIMIT => 2,STARTROW=>"003",REVERSED=>true,VERSIONS=>3,TIMERANGE=>[1553962157743,1553962203790]}
    ROW                                 COLUMN+CELL
     003                                column=cf1:name, timestamp=1553962203754, value=zl
     002                                column=cf1:age, timestamp=1553962166894, value=19
     002                                column=cf1:name, timestamp=1553962157743, value=ls
    2 row(s) in 0.0810 seconds
    
    

    truncate - truncate a table

    hbase(main):072:0> truncate 'baizhi:t_user'
    Truncating 'baizhi:t_user' table (it may take a while):
     - Disabling table...
    
    
    

    Operating HBase through the Java API

    Maven

    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>2.6.0</version>
    </dependency>
    
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-client</artifactId>
        <version>1.2.4</version>
    </dependency>
    

    Creating the HBase connection

    private static Admin admin;//handles DDL operations
    private static Connection conn;//handles DML operations
    static {
        try {
            Configuration conf = new Configuration();
            conf.set("hbase.zookeeper.quorum","CentOS");
            conn= ConnectionFactory.createConnection(conf);
            admin=conn.getAdmin();
    
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    public static void close() throws IOException {
        admin.close();
        conn.close();
    }
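    
    The static block above builds the connection from a plain Configuration. A common alternative (a minimal sketch, not part of the original) is to start from HBaseConfiguration.create(), which pre-loads hbase-default.xml and any hbase-site.xml found on the classpath before the ZooKeeper quorum is set:
    
    static {
        try {
            //assumed variant: seed the configuration with the HBase defaults first
            Configuration conf = HBaseConfiguration.create();
            conf.set("hbase.zookeeper.quorum","CentOS");
            conn = ConnectionFactory.createConnection(conf);
            admin = conn.getAdmin();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }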
    

    Namespace operations

    //create a namespace
    NamespaceDescriptor nd = NamespaceDescriptor.create("zpark")
        .addConfiguration("user","zhansgan")
        .build();
    admin.createNamespace(nd);
    //list namespaces
    NamespaceDescriptor[] nds = admin.listNamespaceDescriptors();
    for (NamespaceDescriptor d : nds) {
        System.out.println(d.getName());
    }
    //delete a namespace
    admin.deleteNamespace("zpark");
    

    Table operations (key point)

    TableName tname=TableName.valueOf("zpark:t_user");
    HTableDescriptor td = new HTableDescriptor(tname);
    
    //define the column families cf1 and cf2
    HColumnDescriptor cf1 = new HColumnDescriptor("cf1");
    cf1.setMaxVersions(3);
    //use ROW+COL Bloom filter indexing, which uses more memory than the default ROW type
    cf1.setBloomFilterType(BloomType.ROWCOL);
    
    HColumnDescriptor cf2 = new HColumnDescriptor("cf2");
    //set the time-to-live to 5 minutes (300 seconds)
    cf2.setTimeToLive(300);
    cf2.setInMemory(true);
    
    //add the column families to the table descriptor
    td.addFamily(cf1);
    td.addFamily(cf2);
    
    
    admin.createTable(td);
    

    Data DML operations (key point)

    //about 2.447 s for 1000 individual put calls
    TableName tname = TableName.valueOf("zpark:t_user");
    Table table = conn.getTable(tname);
    //build and send one Put per row
    for(int i=0;i<1000;i++){
        DecimalFormat df = new DecimalFormat("0000");
        String rowKey = df.format(i);
    
        Put put=new Put(rowKey.getBytes());
        put.addColumn("cf1".getBytes(),"name".getBytes(), Bytes.toBytes("USER"+rowKey));
        put.addColumn("cf1".getBytes(),"age".getBytes(), Bytes.toBytes(i+""));
        put.addColumn("cf1".getBytes(),"sex".getBytes(), Bytes.toBytes((i%4==0)+""));
        put.addColumn("cf1".getBytes(),"salary".getBytes(), Bytes.toBytes(1000+(i/100.0)*100+""));
    
        table.put(put);
    }
    table.close();
    

    Batch insert

    TableName tname = TableName.valueOf("zpark:t_user");
    BufferedMutator bufferedMutator=conn.getBufferedMutator(tname);
    //build the Put operations; about 0.549 s with buffered mutations
    long begin=System.currentTimeMillis();
    for(int i=0;i<1000;i++){
        DecimalFormat df = new DecimalFormat("0000");
        String rowKey = df.format(i);
    
        Put put=new Put(rowKey.getBytes());
        put.addColumn("cf1".getBytes(),"name".getBytes(), Bytes.toBytes("USER"+rowKey));
        put.addColumn("cf1".getBytes(),"age".getBytes(), Bytes.toBytes(i+""));
        put.addColumn("cf1".getBytes(),"sex".getBytes(), Bytes.toBytes((i%4==0)+""));
        put.addColumn("cf1".getBytes(),"salary".getBytes(), Bytes.toBytes(1000+(i/100.0)*100+""));
    
        bufferedMutator.mutate(put);
        if(i%500==0){
            bufferedMutator.flush();
        }
    }
    long end=System.currentTimeMillis();
    bufferedMutator.close();
    System.out.println(((end-begin)/1000.0)+" seconds");
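    
    Instead of calling flush() every 500 mutations by hand, the flush threshold can also be configured when the BufferedMutator is created. A minimal sketch (the 4 MB buffer size is only an illustrative value, not from the original):
    
    //assumed variant: let the client flush automatically once ~4 MB of mutations are buffered
    BufferedMutatorParams params = new BufferedMutatorParams(TableName.valueOf("zpark:t_user"))
            .writeBufferSize(4 * 1024 * 1024);
    BufferedMutator bufferedMutator = conn.getBufferedMutator(params);
    // ... call bufferedMutator.mutate(put) in a loop, as above ...
    bufferedMutator.close(); //close() flushes any remaining buffered mutations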
    

    GET

    TableName tname = TableName.valueOf("zpark:t_user");
    Table table = conn.getTable(tname);
    
    Get get=new Get("0010".getBytes());
    
    Result result = table.get(get);
    while (result.advance()){
        Cell cell = result.current();
        String row = Bytes.toString(CellUtil.cloneRow(cell));
        String cf = Bytes.toString(CellUtil.cloneFamily(cell));
        String col = Bytes.toString(CellUtil.cloneQualifier(cell));
        String v = Bytes.toString(CellUtil.cloneValue(cell));
        long ts=cell.getTimestamp();
        System.out.println(row+"=>"+cf+":"+col+"\t"+v+" ts:"+ts);
    }
    table.close();
    
    TableName tname = TableName.valueOf("zpark:t_user");
    Table table = conn.getTable(tname);
    
    Get get=new Get("0010".getBytes());
    
    Result result = table.get(get);
    String row=Bytes.toString(result.getRow());
    String name = Bytes.toString(result.getValue("cf1".getBytes(), "name".getBytes()));
    String age = Bytes.toString(result.getValue("cf1".getBytes(), "age".getBytes()));
    String sex = Bytes.toString(result.getValue("cf1".getBytes(), "sex".getBytes()));
    String salary = Bytes.toString(result.getValue("cf1".getBytes(), "salary".getBytes()));
    System.out.println(row+"\t"+name+" "+age+" "+sex+" "+salary);
    table.close();
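    
    The shell section above demonstrated delete and deleteall; for reference, a minimal sketch of the corresponding Java client call (reusing the example row key "0010"):
    
    TableName tname = TableName.valueOf("zpark:t_user");
    Table table = conn.getTable(tname);
    
    Delete delete = new Delete("0010".getBytes());
    //delete.addColumns("cf1".getBytes(), "age".getBytes()); //optionally limit the delete to all versions of cf1:age
    table.delete(delete); //without addColumns the whole row is deleted, like the shell's deleteall
    
    table.close();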
    

    Scan

    TableName tname = TableName.valueOf("zpark:t_user");
    Table table = conn.getTable(tname);
    Scan scan = new Scan();
    
    scan.setStartRow("0000".getBytes());
    scan.setStopRow("0200".getBytes());
    scan.addFamily("cf1".getBytes());
    Filter filter1=new RowFilter(CompareFilter.CompareOp.EQUAL,new RegexStringComparator("09$"));
    Filter filter2=new RowFilter(CompareFilter.CompareOp.EQUAL,new SubstringComparator("80"));
    FilterList filter=new FilterList(FilterList.Operator.MUST_PASS_ONE,filter1,filter2);
    scan.setFilter(filter);
    
    ResultScanner rs = table.getScanner(scan);
    
    for (Result result : rs) {
        String row=Bytes.toString(result.getRow());
        String name = Bytes.toString(result.getValue("cf1".getBytes(), "name".getBytes()));
        String age = Bytes.toString(result.getValue("cf1".getBytes(), "age".getBytes()));
        String sex = Bytes.toString(result.getValue("cf1".getBytes(), "sex".getBytes()));
        String salary = Bytes.toString(result.getValue("cf1".getBytes(), "salary".getBytes()));
        System.out.println(row+"\t"+name+" "+age+" "+sex+" "+salary);
    }
    
    rs.close();
    table.close();
    

    MapReduce integration with HBase (key point)

    Jar dependencies

    Since HBase 0.90.x, a job can resolve its runtime dependencies on its own: under the hood HBase calls conf.set("tmpjars", '....'), so you no longer need the -libjars option. You still have to take care of the dependencies needed on the submitting side, because when a job reads data from HBase it must compute the input splits during job setup, and for that HADOOP_CLASSPATH has to be configured.

    [root@CentOS ~]# vi .bashrc
    
    HADOOP_HOME=/usr/hadoop-2.6.0
    JAVA_HOME=/usr/java/latest
    PATH=$PATH:$JAVA_HOME/bin:$HADOOP_HOME/bin:$HADOOP_HOME/sbin
    CLASSPATH=.
    HBASE_MANAGES_ZK=false
    export JAVA_HOME
    export PATH
    export CLASSPATH
    export HADOOP_HOME
    export HBASE_MANAGES_ZK
    HADOOP_CLASSPATH=/root/mysql-connector-java-5.1.46.jar:`/usr/hbase-1.2.4/bin/hbase classpath`
    export HADOOP_CLASSPATH
    [root@CentOS ~]# source .bashrc
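    
    On the client side this dependency shipping is handled by TableMapReduceUtil; the initTableMapperJob/initTableReducerJob calls used below already do it by default, but it can also be triggered explicitly. A minimal sketch (assuming job is the Job instance configured in the submission code that follows):
    
    //populates the "tmpjars" property mentioned above with the HBase jars the job needs at runtime
    TableMapReduceUtil.addDependencyJars(job);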
    

    Maven

    
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>2.6.0</version>
    </dependency>
    
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-hdfs</artifactId>
        <version>2.6.0</version>
    </dependency>
    
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-mapreduce-client-jobclient</artifactId>
        <version>2.6.0</version>
    </dependency>
    
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-mapreduce-client-core</artifactId>
        <version>2.6.0</version>
    </dependency>
    <!--Hbase依赖-->
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-client</artifactId>
        <version>1.2.4</version>
    </dependency>
    
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-server</artifactId>
        <version>1.2.4</version>
    </dependency>
    

    Job submission

    public class CustomJobsubmitter extends Configured implements Tool {
        public int run(String[] args) throws Exception {
            //1. Create the Job instance
            Configuration conf = getConf();
            //enable map-side output compression
            conf.setBoolean("mapreduce.map.output.compress",true);
            conf.setClass("mapreduce.map.output.compress.codec", GzipCodec.class, CompressionCodec.class);
            //set the HBase connection parameters
            conf.set("hbase.zookeeper.quorum","CentOS");
    
            Job job=Job.getInstance(conf);
    
            job.setJarByClass(CustomJobsubmitter.class);
    
            //2. Configure the input and output formats
            job.setInputFormatClass(TableInputFormat.class);
            job.setOutputFormatClass(TableOutputFormat.class);
            Scan scan = new Scan();
            scan.addFamily("cf1".getBytes());
            TableMapReduceUtil.initTableMapperJob(
                "zpark:t_user",
                scan,
                UserMapper.class,
                Text.class,
                DoubleWritable.class,
                job);
    
            TableMapReduceUtil.initTableReducerJob(
                "zpark:t_result",
                UserReducer.class,
                job
            );
            job.setNumReduceTasks(1);
            job.setCombinerClass(UserCombiner.class);
    
            job.waitForCompletion(true);
            return 0;
        }
    
        public static void main(String[] args) throws Exception {
            ToolRunner.run(new CustomJobsubmitter(),args);
        }
    }
    
    

    UserMapper

    public class UserMapper extends TableMapper<Text, DoubleWritable> {
        @Override
        protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
            String sex = Bytes.toString(value.getValue("cf1".getBytes(), "sex".getBytes()));
            Double salary = Double.parseDouble(Bytes.toString(value.getValue("cf1".getBytes(), "salary".getBytes())));
            context.write(new Text(sex),new DoubleWritable(salary));
        }
    }
    
    

    UserReducer

    public class UserReducer extends TableReducer<Text, DoubleWritable,NullWritable> {
        @Override
        protected void reduce(Text key, Iterable<DoubleWritable> values, Context context) throws IOException, InterruptedException {
            double totalSalary=0.0;
            for (DoubleWritable value : values) {
                totalSalary+=value.get();
            }
            Put put =new Put(key.getBytes());
            put.addColumn("cf1".getBytes(),"totalSalary".getBytes(), Bytes.toBytes(totalSalary+""));
            context.write(null,put);
        }
    }
    

    UserCombiner

    public class UserCombiner extends Reducer<Text, DoubleWritable,Text, DoubleWritable> {
        @Override
        protected void reduce(Text key, Iterable<DoubleWritable> values, Context context) throws IOException, InterruptedException {
            double totalSalary=0.0;
            for (DoubleWritable value : values) {
                totalSalary+=value.get();
            }
            context.write(key,new DoubleWritable(totalSalary));
        }
    }
    

    Building an HBase Cluster

    • Make sure the clocks on all physical hosts are synchronized, otherwise the cluster will fail to come up.
    [root@CentOSX ~]# date -s '2019-04-01 16:24:00'
    Mon Apr  1 16:24:00 CST 2019
    [root@CentOSX ~]# clock -w
    
    • Make sure HDFS is up and running (see the HDFS cluster setup guide).
    • Set up the HBase cluster
    [root@CentOSX ~]# tar -zxf hbase-1.2.4-bin.tar.gz -C /usr/
    [root@CentOSX ~]# vi /usr/hbase-1.2.4/conf/hbase-site.xml
    
    <?xml version="1.0"?>
    <?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
    <configuration>
        <property>
                    <name>hbase.rootdir</name>
                    <value>hdfs://mycluster/hbase</value>
        </property>
        <property>
                    <name>hbase.cluster.distributed</name>
                    <value>true</value>
        </property>
        <property>
                    <name>hbase.zookeeper.quorum</name>
                    <value>CentOSA,CentOSB,CentOSC</value>
        </property>
        <property>
                    <name>hbase.zookeeper.property.clientPort</name>
                    <value>2181</value>
        </property>
    </configuration>
    
    
    • Edit regionservers
    [root@CentOSX ~]# vi /usr/hbase-1.2.4/conf/regionservers
    CentOSA
    CentOSB
    CentOSC
    
    • Edit the environment variables
    [root@CentOS ~]# vi .bashrc
    
    HADOOP_HOME=/usr/hadoop-2.6.0
    JAVA_HOME=/usr/java/latest
    PATH=$PATH:$JAVA_HOME/bin:$HADOOP_HOME/bin:$HADOOP_HOME/sbin
    CLASSPATH=.
    export JAVA_HOME
    export PATH
    export CLASSPATH
    export HADOOP_HOME
    
    HBASE_MANAGES_ZK=false
    HADOOP_CLASSPATH=`/usr/hbase-1.2.4/bin/hbase classpath`
    export HBASE_MANAGES_ZK
    export HADOOP_CLASSPATH
    
    [root@CentOS ~]# source .bashrc
    
    • Start the HBase services
    [root@CentOSX hbase-1.2.4]# ./bin/hbase-daemon.sh start master
    [root@CentOSX hbase-1.2.4]# ./bin/hbase-daemon.sh start regionserver
    

    For more content, follow the WeChat official account.
