Hbase协处理器

作者: ahzhaojj | 来源:发表于2016-11-13 22:07 被阅读0次

    一、协处理器的介绍

    Hbase可以让用户的部分逻辑在数据存放端及Hbase服务端进行计算的机制(框架)。协处理器允许用户在Hbase服务端上运行自己的代码。

    二、协处理器的分类

    加载角度分类—— 系统协处理器、 表协处理器(用户可以指定某一张表使用协处理器 )
    功能角度分类—— Observer协处理器(相当于关系型数据库中的触发器 )、Endpoint协处理器(动态终端,类似于一个存储过程 )

    2.1Observer分类

    RegionObserver协处理器——允许处理region上的事件
    RegionServerObserver协处理器——处理RegionServer上的事件
    MasterObserver协处理器——专门处理HMaster上的一些事件,比如创建删除表等操作。
    WalObserver协处理器——允许处理日志上的事件

    2.2 Observer的执行流程

    Observer的执行流程图.png

    2.3Endpoint协处理器

    实现代码被部署在HBase服务器服务端,需要自己写一个客户端,去调用服务端上的实现。


    Endpoint.png

    三、演示endpoint服务端编写

    1.创建endpoint.proto文件,生成java文件

    option java_pakage = “com.jkb.coprocessor.endpoint”;
    option java_outer_classname = “Sum”;  
    option java_generic_service = true;
    option java_generate_equals_and_hash = true;
    option optimize_for = SPEED;
    message SumRequence{
    required string family = 1;
    required string column = 2;
    }
    message SumResponse{
    required int64 sum = 1 [default = 0]; 
    }
    //定义rpc服务器类的定义
    service SumService{
    rpc getSum(SumRequest)
    returns (SumResponse);
    }
    

    2.执行命令,将.proto文件生成java代码
    protoc endpoint.proto --java_out=./
    3.将java文件放入eclipse中相应的工程中相应的目录下,然后加载Hbase中的jar包
    4.编写SumEndPoint.java

    Package com.jkb.coprocessor.endpoint;
    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;
    
    import org.apache.hadoop.hbase.Cell;
    import org.apache.hadoop.hbase.CellUtil;
    import org.apache.hadoop.hbase.Coprocessor;
    import org.apache.hadoop.hbase.CoprocessorEnvironment;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.coprocessor.CoprocessorException;
    import org.apache.hadoop.hbase.coprocessor.CoprocessorService;
    import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
    import org.apache.hadoop.hbase.protobuf.ResponseConverter;
    import org.apache.hadoop.hbase.regionserver.InternalScanner;
    import org.apache.hadoop.hbase.util.Bytes;
    
    import com.google.protobuf.RpcCallback;
    import com.google.protobuf.RpcController;
    import com.google.protobuf.Service;
    
    import edu.endpoint.Sum.SumRequest;
    import edu.endpoint.Sum.SumResponse;
    import edu.endpoint.Sum.SumService;
    
    public class SumEndPoint extends SumService implements Coprocessor,CoprocessorService{
           private RegionCoprocessorEnvironment env;
    
        @Override
        public void getSum(RpcController controller, SumRequest request,
                RpcCallback<SumResponse> done) {
           Scan scan = new Scan(); 
           scan.addFamily(Bytes.toBytes(request.getFamily()));  
         scan.addColumn(Bytes.toBytes(request.getFamily()), Bytes.toBytes(request.getColumn()));
           SumResponse response = null;
           InternalScanner scanner = null;
           try{
               scanner = env.getRegion().getScanner(scan);
               List<Cell> results = new ArrayList<Cell>(); 
               boolean hasMore = false;
               Long sum = 0L;
               do{
                   hasMore = scanner.next(results);
                   for(Cell cell: results){
                       sum += Long.parseLong(new String(CellUtil.cloneValue(cell)));
                   }
                   results.clear();
               }while(hasMore);
               response = SumResponse.newBuilder().setSum(sum).build();
           }catch(IOException e){
               ResponseConverter.setControllerException(controller, e);
           }finally{
               if(scanner!=null){
                   try {
                    scanner.close();
                } catch (IOException e) {
                }
               }
           }
            done.run(response);
        }
    
        @Override
        public Service getService() {
            return this;
        }
    
        @Override
        public void start(CoprocessorEnvironment env) throws IOException {
            if(env instanceof RegionCoprocessorEnvironment){
                this.env = (RegionCoprocessorEnvironment)env;
            }else{
                throw new CoprocessorException("no load region");
            }
    
        }
    
        @Override
        public void stop(CoprocessorEnvironment arg0) throws IOException {
    
        }
    
    }
    

    相关文章

      网友评论

        本文标题:Hbase协处理器

        本文链接:https://www.haomeiwen.com/subject/dywcpttx.html