-
自定义JAVA class并继承BaseRegionObserver
package com.charley.example.hbase2es;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.TransportAddress;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.transport.client.PreBuiltTransportClient;
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.*;
/**
* author: Charley
*
*/
public class ESObserver extends BaseRegionObserver {
private TransportClient client = null;
private String clusterName = null;
private String nodeHost = null;
private int nodePort = 0;
private String indexName = null;
private String typeName = null;
/**
* 生命周期内,只执行一次
* 适合做一些初始化的操作
*/
@Override
public void start(CoprocessorEnvironment e) throws IOException {
super.start(e);
init(e);
Settings settings = Settings.builder().put("cluster.name", clusterName).put("client.transport.sniff", true).build();
try {
client = new PreBuiltTransportClient(settings).addTransportAddress(new TransportAddress(InetAddress.getByName(nodeHost), nodePort));
} catch (UnknownHostException ex) {
ex.printStackTrace();
}
}
/**
* 初始化参数,这些参数都是自己对hbase表添加Observer的时候传递过来的
*/
private void init(CoprocessorEnvironment e) throws IOException {
clusterName = e.getConfiguration().get("cluster");
nodeHost = e.getConfiguration().get("host");
nodePort = e.getConfiguration().getInt("port", 9300);
indexName = e.getConfiguration().get("index");
typeName = e.getConfiguration().get("type");
}
/**
* 生命周期内执行很多次,每次对hbase对应的表做一次put操作,就会执行一次
* 这是同步数据到ES的核心方法
*/
@Override
public void postPut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit, Durability durability) throws IOException {
super.postPut(e, put, edit, durability);
String indexId = new String(put.getRow());//获取rowkey
NavigableMap<byte[], List<Cell>> familyMap = put.getFamilyCellMap();
Map<String, Object> json = new HashMap<>();
for (Map.Entry<byte[], List<Cell>> entry : familyMap.entrySet()) {
for (Cell cell : entry.getValue()) {
String key = Bytes.toString(CellUtil.cloneQualifier(cell));
String value = Bytes.toString(CellUtil.cloneValue(cell));
json.put(key, value);
}
}
//将数据写入到ES
client.prepareIndex(indexName, typeName, indexId).setSource(json, XContentType.JSON).get();
//也可以将数据写入到其它地方,比如:mysql、自定义log等,可以自由发挥
}
/**
* 生命周期内,只执行一次
* 适合回收资源的操作
*/
@Override
public void stop(CoprocessorEnvironment e) throws IOException {
super.stop(e);
}
}
-
将Java代码打成jar包并拷贝到hdfs上,jar_hdfs_path=hdfs:///jar/hbase2es.jar
-
对hbase的表添加coprocessor
- 添加coprocessor
- 用你想要添加协处理器的hbase中的表替换hbase_table_name
- METHOD =>'table_att'这个是固定的,不需要修改
- 'coprocessor'=>'jar_file_path|observer_class_path|priority|params'
- 一共四个参数,用|分割,最后一个params可省略
- jar_file_path,Java代码打成的jar包存放的绝对路径,最好是HDFS路径
- observer_class_path,observer类的包名加类名
- priority,优先级,就用固定的1001即可
- params,传递给observer的参数信息,相当于map,例如:id=123,name='hahaha',age=18
create 'hbase_table_name','ow'
disable 'hbase_table_name'
alter 'hbase_table_name' , METHOD =>'table_att','coprocessor'=>'hdfs:///jar/hbase2es.jar|com.charley.example.hbase2es.ESObserver|1001|cluster=my_cluster,host=192.168.100.100,port=9300,index=index_test,type=type_test'
enable 'hbase_table_name'
disable 'hbase_table_name'
alter 'hbase_table_name' , METHOD =>'table_att_unset',NAME=>'coprocessor$1'
enable 'hbase_table_name'
-
这个时候coprocessor就已经设置完毕了,直接对hbase的这个表'hbase_table_name'添加一条数据,就会发现,es里面也产生了对应的数据,那么hbase同步数据到es来实现hbase二级索引到问题就完成了
注意:如果第一次添加的observer有问题,第二次在打包上传时,一定要给jar包重新命名,不然可能不会生效。
网友评论