美文网首页
HBase协处理器observer

HBase协处理器observer

作者: SolidHeart | 来源:发表于2019-03-28 10:24 被阅读0次

    一、简介

    本文将介绍HBase协处理器中的observer,observer协处理器类似于数据库中的触发器,也类似于程序设计中的AOP。其作用实在数据操作函数中预留钩子函数,使你可以编写pre-hooks或者post-hooks进行拦截操作。

    二、请求生命周期

    (1)HBase启动的时候会根据元信息加载observer对象,并将他部署在对应的region上的CoprocessorHost(注意部署的对象会被复用,而不是一个请求生成一次对象处理)
    (2)observer对象初始化的时候,会调用start(CoprocessorEnvironment e)函数,我们可以在该函数中做资源的初始化。
    (3)客户端发送请求后,请求会定位到对应的region,CoprocessorHost会拦截符合条件的请求,进行pre-hook或者post-hook操作(hook函数的执行都是同步操作)
    (4)完成数据操作和对应hook操作后,返回响应给客户端。

    三、例子:使用observer实现辅助索引

    在Hbase中只有单行的事务,同时索引也只有rowkey,如果我们需要建立辅助索引,我们只能使用两个单独的表去维护。例如在一次操作中同时添加两条数据。

    public void fun(){
      put(a);               // (1)
      put(b);              //  (2)
    }
    

    因为HBase只有单行事务,所以(1)成功并不能保证(2)能同时成功,假如不能同时成功HBase并不能提供跨行事务回滚的操作。

    一个经典的例子是在《HBase实战》中关注与被关注者的实现。情景如下:
    当a关注b时,在follower表添加一行数据,rowkey为 hash(a)+hash(b)
    同时在followedBy表添加一行数据,rowkey为hash(b)+hash(a)
    这样就能实现我关注了谁 和 谁关注了我这样的需求。

    明显hbase不提供跨行事务去保证这两张表数据的一致性,但是observer的 hook函数提供了一致性的保证。当postPut函数失败的时候,HBase会自动重试postPut函数,直到postPut函数执行成功,通过同步重试来保证多条数据是同时插入成功的。

    以下是代码例子,注HBase的版本是1.15,你需要在Maven中引入hbase-client和hbase-server这两个依赖,同时Java的编译版本需要是1.7

    public class FollowerObserver extends BaseRegionObserver {
    
        Logger logger = LoggerFactory.getLogger(FollowerObserver.class);
    
        private FollowedByDAO followedByDAO;
        private Connection connection;
    
        @Override
        public void postPut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit, Durability durability) throws IOException {
            logger.info("observer start");
            try{
                String tableNames = e.getEnvironment().getRegionInfo().getTable().getNameAsString();
                if (!tableNames.equals("follower")){
                    return;
                }
                Cell fromCell = put.get(Bytes.toBytes("f"),Bytes.toBytes("from")).get(0);
                byte[] fromb = CellUtil.cloneValue(fromCell);
                String from = Bytes.toString(fromb);
                Cell toCell = put.get(Bytes.toBytes("f"),Bytes.toBytes("to")).get(0);
                byte[] tob = CellUtil.cloneValue(toCell);
                String to = Bytes.toString(tob);
                logger.info("{} follow {}",from,to);
                followedByDAO.addFollower(from,to);
                logger.info("create followedby relation successfully!");
            }catch (Throwable t){
                logger.error(t.getMessage(),t);
                throw new IOException(t.getMessage());
            }finally {
                logger.info("observer end!");
            }
    
        }
    
    
        @Override
        public void start(CoprocessorEnvironment e) throws IOException {
            logger.info("---------------init---------------");
            Configuration conf = e.getConfiguration();
            connection = ConnectionFactory.createConnection(conf);
            followedByDAO = new FollowedByDAO(connection);
        }
    
    
        @Override
        public void stop(CoprocessorEnvironment e) throws IOException {
            logger.info("------------destroy------------------");
            connection.close();
            followedByDAO.close();
        }
    }
    
    

    四、安装observer

    (1)将目标项目下执行 mvn clean package
    (2)将Jar包上传到服务器或者HDFS
    (3)启动hbase shell,执行以下命令

    #禁用目标表
    disable 'follower'
    #将之前载入的协处理器卸载(如果有)
    alter 'follower',METHOD => 'table_att_unset',NAME => 'coprocessor$1'
    #加载新的协处理器,指定Jar文件的路径 | 目标类的路径 | 优先级
    alter 'follower',METHOD => 'table_att' , 'coprocessor'=>'file:///path/to/your/jar|com.sample.hbase.co.FollowerObserver|1001'
    #启动目标表
    enable 'follower'
    

    五、注意事项

    (1) observer的hook函数是同步的,这意味着如果你在hook函数中遭遇死锁(HBase会上行锁,因此要检查你的代码在并发环境下是否可能会发生死锁),或者失败的时候,它会一直阻塞直到hook函数执行成功。这意味着使用hook函数越多,在hook函数中做的数据操作越多,越影响写入的吞吐率。
    (2) observer的安装,更新,卸载都需要将对应的表disable后才能进行,这意味着他并不适合编写易变的业务逻辑,例如本文例子中的关注者关系的索引的维护,这种业务逻辑应该尽量避免实现在这一层。(至于如何保证一致性可以在另一文章中阐述)
    (3) observer很难调试,只能打log,log输出在执行该对象的服务器上,可以在hbase/log中查看。引入Log的方法是LoggerFactory.getLogger(ClassName)

    相关文章

      网友评论

          本文标题:HBase协处理器observer

          本文链接:https://www.haomeiwen.com/subject/zkyfbqtx.html