1、过滤器
get() scan()不能对行键、列名或列值进行过滤,可通过过滤器实现
所有过滤器都在服务端生效,叫谓词下推

层次结构
Filter接口、FilterBase抽象类是基础
定义好的过滤器实例传递给Get或Scan实例setFilter(filter)
比较运算符
CompareFilter比基类多compare()方法,传入参数定义比较操作过程。

比较器
comparator提供了多种方法来比较不同的键值

比较过滤器
CompareFilter(CompareOp valueCompareOp, WriteableByteArrayComparable valueComparator)
行过滤器
Filter filter1 = new RowFilter(CompareFilter.CompareOp.LESS_OR_EQUAL, new BinaryComparator(Bytes.toBytes("row-22")));
Filter filter1 = new RowFilter(CompareFilter.CompareOp.EQUAL, new RegexStringComparator(Bytes.toBytes(".*-.5")));
列族过滤器
Filter filter1 = new FamilyFilter(CompareFilter.CompareOp.LESS, new BinaryComparator(Bytes.toBytes("colfam3")));
值过滤器
Filter filter = new ValueFilter(CompareFilter.CompareOp.EQUAL, new SubstringComparator(".4"));
专用过滤器
直接继承自FilterBase
单列值过滤器
SingleColumnValueFilter(byte[] family, byte[] qualifier, CompareOp compareOp, byte[] value)
SingleColumnValueFilter(byte[] family, byte[] qualifier, CompareOp compareOp, WritableByteArrayComparable comparator)
前缀过滤器
public PrefixFilter(byte[] prefix)
分页过滤器
客户端会记录本次扫描的最后一行,并在下一次获取数据时把记录的上次扫描的最后一行设为这次扫描的起始行。
PageFilter(int pageSize)
行键过滤器
KeyOnlyFilter提供了可以修改扫描出的列和单元格的功能,这个过滤器通过KeyValue.convertToKeyOnly(boolean)方法帮助调用只返回键不返回值。
首次行键过滤器
访问第一行中的第一列,则这种过滤器可以满足需求,这种过滤器通常在行数统计的应用场景中使用
时间戳过滤器
TimeStampFilter(List<Long> timestamps)
列计数过滤器
指定每行最多取回多少列
ColumnCountGetFilter(int n)
附加过滤器
跳转过滤器
当过滤器发现某一行中的一列需要过滤时,整行数据都被过滤
Filter filter1 = new ValueFilter(CompareFilter.CompareOp.NOT_EQUAL, new BinaryCOmparator(Bytes.toBytes("val-0")));
Filter filter2 = new SkipFilter(filter1);
全匹配过滤器
当一跳数据被过滤时,就会直接放弃这次扫描操作
FilterList
FilterList(List<Filter> rowFilters)
FilterList(Operator operator)
FilterList(Opearator operator, List<Filter> rowFilters)

自定义过滤器
public interface Filter extends Writables {
public enum ReturnCode {
INCLUDE, SKIP, NEXT_ROW, SEEK_NEXT_USING_HINT
}
public void reset()
public boolean filterRowKey(byte[] buffer, int offset, int length)
public boolean filterAllRemaining()
public ReturnCode filterKeyValue(KeyValue v)
public void filterRow(List<KeyValue> kvs)
public boolean hasFilter()
public boolean filterRow()
public KeyValue getNextHint(KeyValue currentKV)
}

- filterRowKey 检查行键
- filterKeyValue(KeyValue v) 检查这一行中的每个实例
- filterRow(List<KeyValue> kvs) 所有行列经过前两个方法过滤后,调用这个方法可访问之前两个方法筛选出的实例
- filterRow() 以上方法调用后,filterRow执行,PageFilter使用这个方法检查行数
- reset() 在迭代扫描中为每个新行重置过滤器
-
filterAllRemaining() 当这个方法返回true,可以结束整个扫描过程
过滤器处理一行数据的逻辑过程
public class CustomFilter extends FilterBase {
private byte[] value = null;
private boolean filterRow = true;
...
@Override
public void reset() {
this.filterRow = true;
}
@Override
public ReturnCode filterKeyValue(KeyValue kv) {
if (Bytes,compareTo(value, kv.getValue()) == 0) filterRow = false; // 默认false是需要过滤这个
return ReturnCode.INCLUDE;
}
}
...

2、计数器
HBase有一种机制可以将列当做计数器
客户端API提供了专门方法来完成读取并修改操作,同时在单独一次客户端的调用过程中保证原子性。
incrr '<table>', '<row>', '<column>', [<increment-value>]
单计数器
long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, long amount) throws IOException
long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, long amount, boolean writeToWAL) throws IOException
多计数器
Result increment(Increment increment) throws IOException
Increment(byte[] row)
Increment addColumn(byte[] family, byte[] qulifier, long amount)
3、协处理器
简介
协处理器允许用户在region服务器上运行自己的代码,允许用户执行region级的操作,并且可以使用类似触发器的功能。
observer
与触发器类似,回调函数在一些特定事件发生时被执行。
- RegionObserver,可以用这种处理器处理数据修改事件
- MasterObserver,可以被用作管理或DDL类型的操作,集群级事件
- WALObserver
endpoint
将用户自定义操作添加到服务端,类似存储过程
Coprocessor
所有协处理器的类都必须实现这个接口
void start(CoprocessorEnviroment env) throws IOException;
void stop(CoprocessorEnvironment env) throws IOException;
Coprocessor、CoprocessorEnvironment和CoprocessorHost这3个类形成了协处理器类的基础。

RegionObserver
处理region声明周期事件
当一个特定的region级别的操作发生时,它们的钩子函数会被触发

void preOpen()
void postOpen()
处理客户端API事件
所有的客户端API调用都显式地从客户端应用传输到region服务器,用户可以在这些调用前或刚刚执行后拦截。
void preGet()
void postGet
RegionCoprocessorEnviroment
RegionObserver类的协处理器环境的实例基于RegionCoprocessor Environment类
RegionCoprocessor Environment实现了Coprocessor Environment接口
observerContext类
RegionObserver类提供的所有回调函数都需要一个特殊的上下文作为共同参数,就是ObserverContext类
BaseRegionObserver
实现了RegionObserver接口的空方法
public void preGet(final ObserverContext<RegionCoprocessorEnvironment> e, final Get get, final List<KeyValue> results) throws IOException {
if (Bytes.equals(get.getRow(), FIXED_ROW)) {
xxxx
}
}
MasterObserver类
处理master服务器所有回调函数

MasterCoprosessorEnvironment
封装了一个MasterObserver实例,实现了CoprocessorEnvironment接口
BaseMasterObserver
为MasterObserver提供了空实现
public void postCreateTable(ObserverContext<MasterCoprocessorEnvironment> env, HRegionInfo[] regions, boolean sync) throws IOException {
MasterService services = env.getEnvironment().getMasterServices();
...
}
endpoint
可以运行在所有region上的存储过程,比如计算某个列的聚合结果就可以用到
CoprocessorProtocol接口
系统提供了一个协处理器实现来定义扩展CoprocessorProtocol接口。通过这个接口可以定义协处理器希望暴露给用户的任意方法。
CoprocessorProtocol实例和表中单个region联系在一起,所以客户端的RPC调用必须定义region,这个region会在CoprocessorProtocol方法的调用中被使用到。
客户端实现Batch.Call方法来调用CoprocessorProtocol实例方法。
BaseEndpointCoprocessor
实现一个endpoint需要扩展CoprocessorProtocol接口,然后扩展BaseEndpointCoprocessor类
https://github.com/larsgeorge/hbase-book/blob/master/ch04/src/main/java/coprocessor/RowCountEndpoint.java
try {
final RowCounterProtos.CountRequest request =
RowCounterProtos.CountRequest.getDefaultInstance();
Map<byte[], Long> results = table.coprocessorService(
RowCounterProtos.RowCountService.class, // co EndpointExample-1-ClassName Define the protocol interface being invoked.
null, null, // co EndpointExample-2-Rows Set start and end row key to "null" to count all rows.
new Batch.Call<RowCounterProtos.RowCountService, Long>() { // co EndpointExample-3-Batch Create an anonymous class to be sent to all region servers.
public Long call(RowCounterProtos.RowCountService counter)
throws IOException {
BlockingRpcCallback<RowCounterProtos.CountResponse> rpcCallback =
new BlockingRpcCallback<RowCounterProtos.CountResponse>();
counter.getRowCount(null, request, rpcCallback); // co EndpointExample-4-Call The call() method is executing the endpoint functions.
RowCounterProtos.CountResponse response = rpcCallback.get();
return response.hasCount() ? response.getCount() : 0;
}
}
);

4、HTablePool
减少HTable实例的创建
HTable不是线程安全的,本地的写缓冲区并不能保证一致性,需要为每个线程创建一个HTable实例
客户端可以通过HTablePool类来解决这个问题
HTablePool(Configuration config, int maxSize)
HTablePool(Configuration config, int maxSize, HTableInterfaceFactory tableFactory)
HTableInterface getTable(String tableName)
HTableInterface getTable(byte[]tableName)
Configuration conf = HBaseConfiguration.create();
HTablePool pool = new HTablePool(conf, 5);
HTableInterface[] tables = new HTableInterface[10];
for () {
tables[n] = pool.getTable("");
}
5、连接管理
HTable与远程主机连接,在内部使用HConnection类标识,被HConnectionManager类管理并共享。
HBase内部使用键值映射来存储连接,使用Configuration实例作为键值映射的键。如果configuration相同,那么使用的HConnection实例时一样的。
共享ZooKeeper连接、缓存通用资源,共享连接缺点在于如果不显示关闭连接,会一直存在直到客户端退出。
网友评论