美文网首页Hadoop
107.HBase之Endpoint Coprocessor的调

107.HBase之Endpoint Coprocessor的调

作者: 大勇任卷舒 | 来源:发表于2022-02-28 09:42 被阅读0次

    Endpoint Coprocessor客户端调用过程:


    107.1 演示环境介绍

    • CM版本:5.14.3
    • CDH版本:5.14.3
    • 编写示例代码及运行为Java

    107.2 操作演示

    HBase中自带的Endpoint的协处理器,所以首先确认hbase-examples-1.2.0-cdh5.14.2.jar是否在

    [root@ip-168-31-8-230 lib]# pwd
    /opt/cloudera/parcels/CDH/lib/hbase/lib
    [root@ip-168-31-8-230 lib]# ll hbase-examples-1.2.0-cdh5.14.2.jar 
    

    生成TestTable测试表及数据

    [root@ip-168-31-8-230 ~]# hbase pe --rows=10000000 randomWrite 1
    
    • 登录CM进入HBase服务进行配置
      • 配置自定义的Endpoint类,因为Endpoint类型的Coprocessor运行在HBase 的RegionServer中,所以这里只需要配置HBase Coprocessor Region类
      • 在这里的配置为全局配置,协处理器有两种使用方式上图的方式是其中的一种,另外一种则是对单个表进行修改
    org.apache.hadoop.hbase.coprocessor.example.RowCountEndpoint
    
    • pom.xml文件内容如下
    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <parent>
            <artifactId>cdh-project</artifactId>
            <groupId>com.cloudera</groupId>
            <version>1.0-SNAPSHOT</version>
        </parent>
        <modelVersion>4.0.0</modelVersion>
        <artifactId>hbase-demo</artifactId>
        <packaging>jar</packaging>
        <name>hbase-demo</name>
        <url>http://maven.apache.org</url>
        <properties>
            <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        </properties>
        <dependencies>
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-client</artifactId>
                <version>2.6.0-cdh5.11.2</version>
            </dependency>
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-common</artifactId>
                <version>2.6.0-cdh5.11.2</version>
            </dependency>
            <dependency>
                <groupId>org.apache.hbase</groupId>
                <artifactId>hbase-client</artifactId>
                <version>1.2.0-cdh5.11.2</version>
            </dependency>
            <dependency>
                <groupId>org.apache.hbase</groupId>
                <artifactId>hbase-examples</artifactId>
                <version>1.2.0-cdh5.11.2</version>
            </dependency>
        </dependencies>
    </project>
    
    • CoprocessorExample.java类,编写内容如下
    package com.cloudera.hbase.coprocessor;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;
    import org.apache.hadoop.hbase.client.Table;
    import org.apache.hadoop.hbase.client.coprocessor.Batch;
    import org.apache.hadoop.hbase.coprocessor.example.generated.ExampleProtos;
    import org.apache.hadoop.hbase.ipc.BlockingRpcCallback;
    import org.apache.hadoop.hbase.util.Bytes;
    import java.io.IOException;
    import java.util.Map;
    import java.util.concurrent.atomic.AtomicLong;
    /**
     * package: com.cloudera.hbase.coprocessor
     * describe: 客户端如何调用自定义的corprocessor类,Endpoint类型,该示例代码中介绍了几种调用的方式,以及各种调用方式的效率
     * creat_user: Fayson
     * 公众号:碧茂大数据
     */
    public class CoprocessorExample {
        public static void main(String[] args) {
            //初始化HBase配置
            Configuration configuration = HBaseConfiguration.create();
            configuration.set("hbase.zookeeper.property.clientPort", "2181");
            configuration.setStrings("hbase.zookeeper.quorum", "ip-168-31-5-38.ap-southeast-1.compute.internal,ip-168-31-8-230.ap-southeast-1.compute.internal,ip-168-31-5-171.ap-southeast-1.compute.internal");
            try {
                //创建一个HBase的Connection
                Connection connection = ConnectionFactory.createConnection(configuration);
                Table testTable = connection.getTable(TableName.valueOf("TestTable"));
                execBatchEndpointCoprocessor(testTable);
                execEndpointCoprocessor(testTable);
                execFastEndpointCoprocessor(testTable);
                //关闭连接
                connection.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        /**
         * 使用batchCoprocessorService(MethodDescriptor var1, Message var2, byte[] var3, byte[] var4, R var5)的方法调用
         * 使用批量的方式,HBase会自动的将属于同一个RegionServer上的请求打包处理,可以节省网络交互的开销,效率会更高
         * @param table HBase表名
         * @return 返回表的总条数
         */
        public static long execBatchEndpointCoprocessor(Table table) {
            byte[] s= Bytes.toBytes("00000000000000000000000000");
            byte[] e= Bytes.toBytes("00000000000000000000000010");
            long start_t = System.currentTimeMillis();
            Map<byte[], ExampleProtos.CountResponse> batchMap = null;
            try {
                batchMap = table.batchCoprocessorService(
                        ExampleProtos.RowCountService.getDescriptor().findMethodByName("getKeyValueCount"),
                        ExampleProtos.CountRequest.getDefaultInstance(),
                        s,
                        e,
                        ExampleProtos.CountResponse.getDefaultInstance());
            } catch (Throwable throwable) {
                throwable.printStackTrace();
            }
            long batch_count = 0;
            System.out.println("Region Size:" + batchMap.size());
            for (ExampleProtos.CountResponse response : batchMap.values()) {
                batch_count += response.getCount();
            }
            System.out.println("方式一耗时:" + (System.currentTimeMillis() - start_t));
            System.out.println("方式一统计数量:" + batch_count);
            return batch_count;
        }
        /**
         * 通过HBase的coprocessorService(Class, byte[],byte[],Batch.Call)方法获取表的条数
         * @param table HBase 表对象
         * @return 返回表的条数
         */
        public static long execEndpointCoprocessor(Table table) {
            try {
                long start_t = System.currentTimeMillis();
                /**
                 * coprocessorService(Class, byte[],byte[],Batch.Call)方法描述:
                 * 参数一:Endpoint Coprocessor类,通过设置Endpoint Coprocessor类可以找到Region相应的协处理器
                 * 参数二和参数三:要调用哪些Region上的服务则有startkey和endkey来决定,通过rowkey范围可以确定多个Region,如果设置为null则为所有的Region
                 * 参数四:接口类Batch.Call定义如何调用协处理器,通过重写call()方法实现客户端的逻辑
                 *
                 * coprocessorService方法返回的是一个Map对象,Map的Key是Region的名字,Value是Batch.Call.call()方法的返回值
                 */
                Map<byte[] , Long> map = table.coprocessorService(ExampleProtos.RowCountService.class, null, null, new Batch.Call<ExampleProtos.RowCountService, Long>() {
                    @Override
                    public Long call(ExampleProtos.RowCountService rowCountService) throws IOException {
                        ExampleProtos.CountRequest requet = ExampleProtos.CountRequest.getDefaultInstance();
                        BlockingRpcCallback<ExampleProtos.CountResponse> rpcCallback = new BlockingRpcCallback<>();
                        rowCountService.getKeyValueCount(null, requet, rpcCallback);
                        ExampleProtos.CountResponse response = rpcCallback.get();
                        return response.getCount();
                    }
                });
                //对协处理器返回的所有Region的数量累加得出表的总条数
                long count = 0;
                System.out.println("Region Size:" + map.size());
                for(Long count_r : map.values()) {
                    count += count_r;
                }
                System.out.println("方式二耗时:" + (System.currentTimeMillis() - start_t));
                System.out.println("方式二统计数量:" + count);
            } catch (Throwable throwable) {
                throwable.printStackTrace();
            }
            return 0l;
        }
        /**
         * 效率最高的方式,在方式二的基础上优化
         * 通过HBase的coprocessorService(Class, byte[],byte[],Batch.Call,Callback<R>)方法获取表的总条数
         * @param table HBase表名
         * @return 返回表的总条数
         */
        public static long execFastEndpointCoprocessor(Table table) {
            long start_t = System.currentTimeMillis();
            //定义总的 rowCount 变量
            AtomicLong totalRowCount = new AtomicLong();
            try {
                Batch.Callback<Long> callback = new Batch.Callback<Long>() {
                    @Override
                    public void update(byte[] region, byte[] row, Long result) {
                        totalRowCount.getAndAdd(result);
                    }
                };
                table.coprocessorService(ExampleProtos.RowCountService.class, null, null, new Batch.Call<ExampleProtos.RowCountService, Long>() {
                    @Override
                    public Long call(ExampleProtos.RowCountService rowCountService) throws IOException {
                        ExampleProtos.CountRequest requet = ExampleProtos.CountRequest.getDefaultInstance();
                        BlockingRpcCallback<ExampleProtos.CountResponse> rpcCallback = new BlockingRpcCallback<>();
                        rowCountService.getKeyValueCount(null, requet, rpcCallback);
                        ExampleProtos.CountResponse response = rpcCallback.get();
                        return response.getCount();
                    }
                }, callback);
            } catch (Throwable throwable) {
                throwable.printStackTrace();
            }
            System.out.println("方式三耗时:" + (System.currentTimeMillis() - start_t));
            System.out.println("方式三统计数量:" + totalRowCount.longValue());
            return totalRowCount.longValue();
        }
    }
    
    • HBase表统计效率对比
      • 使用HBase的count来统计测试表的总条数
    [root@ip-168-31-8-230 ~]# hbase shell
    hbase(main):001:0> count 'TestTable', INTERVAL => 1000000, CACHE => 10000
    
    • 使用HBase提供的MapReduce方式统计测试表的总条数
    [root@ip-168-31-8-230 ~]# hbase org.apache.hadoop.hbase.mapreduce.RowCounter TestTable
    
    • 查看MapReduce耗时
    • 使用HBase协处理器执行测试表统计
    • 耗时统计:


    • 结论:
      • 在使用HBase的coprocessor方法是如果传入startkey和endkey是会根据rowkey的访问检索出符合条件的region并统计每个region上数据量
      • HBase的Endpoint Coprocessor协处理器可以通过CM的方式配置全局的也可以通过客户端或hbase shell的方式来指定某一个表使用比较灵活

    大数据视频推荐:
    腾讯课堂
    CSDN
    大数据语音推荐:
    企业级大数据技术应用
    大数据机器学习案例之推荐系统
    自然语言处理
    大数据基础
    人工智能:深度学习入门到精通

    相关文章

      网友评论

        本文标题:107.HBase之Endpoint Coprocessor的调

        本文链接:https://www.haomeiwen.com/subject/ohrxrrtx.html