一、简介
本文将介绍HBase协处理器中的observer,observer协处理器类似于数据库中的触发器,也类似于程序设计中的AOP。其作用实在数据操作函数中预留钩子函数,使你可以编写pre-hooks或者post-hooks进行拦截操作。
二、请求生命周期
(1)HBase启动的时候会根据元信息加载observer对象,并将他部署在对应的region上的CoprocessorHost(注意部署的对象会被复用,而不是一个请求生成一次对象处理)
(2)observer对象初始化的时候,会调用start(CoprocessorEnvironment e)函数,我们可以在该函数中做资源的初始化。
(3)客户端发送请求后,请求会定位到对应的region,CoprocessorHost会拦截符合条件的请求,进行pre-hook或者post-hook操作(hook函数的执行都是同步操作)
(4)完成数据操作和对应hook操作后,返回响应给客户端。
三、例子:使用observer实现辅助索引
在Hbase中只有单行的事务,同时索引也只有rowkey,如果我们需要建立辅助索引,我们只能使用两个单独的表去维护。例如在一次操作中同时添加两条数据。
public void fun(){
put(a); // (1)
put(b); // (2)
}
因为HBase只有单行事务,所以(1)成功并不能保证(2)能同时成功,假如不能同时成功HBase并不能提供跨行事务回滚的操作。
一个经典的例子是在《HBase实战》中关注与被关注者的实现。情景如下:
当a关注b时,在follower表添加一行数据,rowkey为 hash(a)+hash(b)
同时在followedBy表添加一行数据,rowkey为hash(b)+hash(a)
这样就能实现我关注了谁 和 谁关注了我这样的需求。
明显hbase不提供跨行事务去保证这两张表数据的一致性,但是observer的 hook函数提供了一致性的保证。当postPut函数失败的时候,HBase会自动重试postPut函数,直到postPut函数执行成功,通过同步重试来保证多条数据是同时插入成功的。
以下是代码例子,注HBase的版本是1.15,你需要在Maven中引入hbase-client和hbase-server这两个依赖,同时Java的编译版本需要是1.7
public class FollowerObserver extends BaseRegionObserver {
Logger logger = LoggerFactory.getLogger(FollowerObserver.class);
private FollowedByDAO followedByDAO;
private Connection connection;
@Override
public void postPut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit, Durability durability) throws IOException {
logger.info("observer start");
try{
String tableNames = e.getEnvironment().getRegionInfo().getTable().getNameAsString();
if (!tableNames.equals("follower")){
return;
}
Cell fromCell = put.get(Bytes.toBytes("f"),Bytes.toBytes("from")).get(0);
byte[] fromb = CellUtil.cloneValue(fromCell);
String from = Bytes.toString(fromb);
Cell toCell = put.get(Bytes.toBytes("f"),Bytes.toBytes("to")).get(0);
byte[] tob = CellUtil.cloneValue(toCell);
String to = Bytes.toString(tob);
logger.info("{} follow {}",from,to);
followedByDAO.addFollower(from,to);
logger.info("create followedby relation successfully!");
}catch (Throwable t){
logger.error(t.getMessage(),t);
throw new IOException(t.getMessage());
}finally {
logger.info("observer end!");
}
}
@Override
public void start(CoprocessorEnvironment e) throws IOException {
logger.info("---------------init---------------");
Configuration conf = e.getConfiguration();
connection = ConnectionFactory.createConnection(conf);
followedByDAO = new FollowedByDAO(connection);
}
@Override
public void stop(CoprocessorEnvironment e) throws IOException {
logger.info("------------destroy------------------");
connection.close();
followedByDAO.close();
}
}
四、安装observer
(1)将目标项目下执行 mvn clean package
(2)将Jar包上传到服务器或者HDFS
(3)启动hbase shell,执行以下命令
#禁用目标表
disable 'follower'
#将之前载入的协处理器卸载(如果有)
alter 'follower',METHOD => 'table_att_unset',NAME => 'coprocessor$1'
#加载新的协处理器,指定Jar文件的路径 | 目标类的路径 | 优先级
alter 'follower',METHOD => 'table_att' , 'coprocessor'=>'file:///path/to/your/jar|com.sample.hbase.co.FollowerObserver|1001'
#启动目标表
enable 'follower'
五、注意事项
(1) observer的hook函数是同步的,这意味着如果你在hook函数中遭遇死锁(HBase会上行锁,因此要检查你的代码在并发环境下是否可能会发生死锁),或者失败的时候,它会一直阻塞直到hook函数执行成功。这意味着使用hook函数越多,在hook函数中做的数据操作越多,越影响写入的吞吐率。
(2) observer的安装,更新,卸载都需要将对应的表disable后才能进行,这意味着他并不适合编写易变的业务逻辑,例如本文例子中的关注者关系的索引的维护,这种业务逻辑应该尽量避免实现在这一层。(至于如何保证一致性可以在另一文章中阐述)
(3) observer很难调试,只能打log,log输出在执行该对象的服务器上,可以在hbase/log中查看。引入Log的方法是LoggerFactory.getLogger(ClassName)
网友评论