Development environment
- sqoop 1.4.7
- hbase 1.3.1
- elasticsearch 7.1.1
- Writing the coprocessor
With HBase 1.3.1, a custom coprocessor extends BaseRegionObserver and overrides the postPut and postDelete methods:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.util.Bytes;
import java.io.IOException;
import java.util.*;
public class HbaseDataSyncEsObserver extends BaseRegionObserver {

    private String esAddress;
    private String index;
    private String family;

    /**
     * Read the arguments passed in the coprocessor table attribute.
     * @param env coprocessor environment
     */
    private void readConfiguration(CoprocessorEnvironment env) {
        Configuration conf = env.getConfiguration();
        esAddress = conf.get("es_address");
        index = conf.get("es_index");
        family = conf.get("family");
    }

    @Override
    public void start(CoprocessorEnvironment e) throws IOException {
        // Initialize the ES client once when the coprocessor is loaded.
        readConfiguration(e);
        ElasticSearchConfig.restHighLevelClient(esAddress);
    }

    @Override
    public void stop(CoprocessorEnvironment e) throws IOException {
        ElasticSearchConfig.client.close();
    }

    @Override
    public void postPut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit, Durability durability) {
        // Use the HBase row key as the ES document id.
        String indexId = Bytes.toString(put.getRow());
        NavigableMap<byte[], List<Cell>> familyMap = put.getFamilyCellMap();
        Map<String, Map<String, Object>> esData = new HashMap<>();
        Map<String, Object> data = new HashMap<>();
        for (Map.Entry<byte[], List<Cell>> entry : familyMap.entrySet()) {
            // Only the configured column family is synchronized to ES.
            if (Bytes.toString(entry.getKey()).equals(family)) {
                for (Cell cell : entry.getValue()) {
                    String key = Bytes.toString(CellUtil.cloneQualifier(cell));
                    String value = Bytes.toString(CellUtil.cloneValue(cell));
                    data.put(key, value);
                }
                break;
            }
        }
        esData.put(indexId, data);
        ElasticSearchUtil.saveEsDataWithBulk(esData, index);
    }

    @Override
    public void postDelete(ObserverContext<RegionCoprocessorEnvironment> e, Delete delete, WALEdit edit, Durability durability) throws IOException {
        String indexId = Bytes.toString(delete.getRow());
        ElasticSearchUtil.deleteEsDataWithBulk(Arrays.asList(indexId), index);
    }
}
Creating the ES connection:
import org.apache.http.HttpHost;
import org.apache.http.client.config.RequestConfig.Builder;
import org.elasticsearch.client.Node;
import org.elasticsearch.client.NodeSelector;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.RestClient.FailureListener;
import org.elasticsearch.client.RestClientBuilder.RequestConfigCallback;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ElasticSearchConfig {

    private static final Logger log = LoggerFactory.getLogger(ElasticSearchConfig.class);

    public static RestHighLevelClient client;

    public static RestHighLevelClient restHighLevelClient(String address) {
        // The address is a comma-separated list of hosts; port 9200 is assumed.
        String[] hosts = address.split(",");
        if (hosts != null && hosts.length > 0) {
            HttpHost[] httpHosts = new HttpHost[hosts.length];
            for (int i = 0; i < hosts.length; i++) {
                httpHosts[i] = new HttpHost(hosts[i], 9200, "http");
            }
            RestClientBuilder restClientBuilder = RestClient.builder(httpHosts)
                    .setRequestConfigCallback(new RequestConfigCallback() {
                        public Builder customizeRequestConfig(Builder requestConfigBuilder) {
                            requestConfigBuilder.setConnectTimeout(0);
                            requestConfigBuilder.setSocketTimeout(300000);
                            requestConfigBuilder.setConnectionRequestTimeout(2000);
                            return requestConfigBuilder;
                        }
                    });
            restClientBuilder.setFailureListener(new FailureListener() {
                public void onFailure(Node node) {
                    log.error("************************ ES listener failure: {}", node.getName());
                }
            });
            restClientBuilder.setNodeSelector(NodeSelector.SKIP_DEDICATED_MASTERS);
            client = new RestHighLevelClient(restClientBuilder);
            log.info("ElasticSearch client init success ....");
        }
        return client;
    }
}
ES update and delete operations:
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.Map;
public class ElasticSearchUtil {

    private static final Logger log = LoggerFactory.getLogger(ElasticSearchUtil.class);

    private ElasticSearchUtil() {
    }

    private static RestHighLevelClient getClient() {
        return ElasticSearchConfig.client;
    }

    /**
     * Upsert documents into ES in a single bulk request.
     * @param esBulkData document id -> field map
     * @param index      target index
     */
    public static void saveEsDataWithBulk(Map<String, Map<String, Object>> esBulkData, String index) {
        BulkRequest bulkRequest = new BulkRequest();
        for (Map.Entry<String, Map<String, Object>> oneEsData : esBulkData.entrySet()) {
            String id = oneEsData.getKey();
            Map<String, Object> esData = oneEsData.getValue();
            UpdateRequest request = new UpdateRequest(index, id);
            request.doc(esData);
            // Insert the document if it does not exist yet.
            request.docAsUpsert(true);
            bulkRequest.add(request);
        }
        try {
            getClient().bulk(bulkRequest, RequestOptions.DEFAULT);
        } catch (Exception e) {
            log.error("save esData error:", e);
        }
    }

    /**
     * Delete documents from ES in a single bulk request.
     * @param ids   document ids
     * @param index target index
     */
    public static void deleteEsDataWithBulk(List<String> ids, String index) {
        BulkRequest bulkRequest = new BulkRequest();
        ids.forEach(id -> {
            DeleteRequest request = new DeleteRequest(index);
            request.id(id);
            bulkRequest.add(request);
        });
        try {
            getClient().bulk(bulkRequest, RequestOptions.DEFAULT);
        } catch (Exception e) {
            log.error("delete esData error:", e);
        }
    }
}
- Packaging the code
The corresponding Maven dependencies:
<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <hbase.version>1.3.1</hbase.version>
    <elasticsearch.version>7.1.1</elasticsearch.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.hbase</groupId>
        <artifactId>hbase-server</artifactId>
        <version>${hbase.version}</version>
    </dependency>
    <!-- es -->
    <dependency>
        <groupId>org.elasticsearch.client</groupId>
        <artifactId>elasticsearch-rest-high-level-client</artifactId>
        <version>${elasticsearch.version}</version>
    </dependency>
    <dependency>
        <groupId>org.elasticsearch</groupId>
        <artifactId>elasticsearch</artifactId>
        <version>${elasticsearch.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-api</artifactId>
        <version>2.11.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.httpcomponents</groupId>
        <artifactId>httpclient</artifactId>
        <version>4.5.10</version>
    </dependency>
</dependencies>
When packaging, all dependencies must be bundled into the jar; the maven-assembly-plugin is used here:
<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.8.1</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-assembly-plugin</artifactId>
            <version>3.1.1</version>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <id>make-assemble</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>
- Upload the jar to a directory on HDFS.
- Attach the custom coprocessor to the table. The coprocessor attribute is pipe-separated: the jar path on HDFS, the observer class, the load priority, and the key=value arguments read by readConfiguration:
alter "blacklist", METHOD => 'table_att', 'coprocessor'=>'hdfs:///coprocessor/hbase-observer-elasticsearch-1.0.0-SNAPSHOT.jar|com.qjdchina.hbase.observer.HbaseDataSyncEsObserver|1001|es_address=10.1.1.235,es_index=blacklist,family=info'
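For reference, the same attachment can also be done from the Java Admin API. The following is only a minimal sketch: the table name, jar path, class name, priority and arguments are copied from the shell command above, and the disable/modify/enable sequence is the conservative way to apply the change.
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

public class AttachObserver {
    public static void main(String[] args) throws Exception {
        try (Connection conn = ConnectionFactory.createConnection(HBaseConfiguration.create());
             Admin admin = conn.getAdmin()) {
            TableName table = TableName.valueOf("blacklist");
            HTableDescriptor desc = admin.getTableDescriptor(table);
            // Arguments handed to readConfiguration() in the observer.
            Map<String, String> observerArgs = new HashMap<>();
            observerArgs.put("es_address", "10.1.1.235");
            observerArgs.put("es_index", "blacklist");
            observerArgs.put("family", "info");
            desc.addCoprocessor(
                    "com.qjdchina.hbase.observer.HbaseDataSyncEsObserver",
                    new Path("hdfs:///coprocessor/hbase-observer-elasticsearch-1.0.0-SNAPSHOT.jar"),
                    1001, // same priority as in the shell command
                    observerArgs);
            admin.disableTable(table);
            admin.modifyTable(table, desc);
            admin.enableTable(table);
        }
    }
}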
- Create the ES index and check the data in ES (a client-side sketch follows).
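If you prefer to create the index from code rather than through Kibana or curl, a minimal sketch with the 7.1.1 high-level REST client might look like the following. The index name "blacklist" and the single-shard demo settings are assumptions; since the observer indexes every cell value as a string, the default dynamic mapping is sufficient.
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.common.settings.Settings;

public class CreateBlacklistIndex {
    public static void main(String[] args) throws Exception {
        // Reuse the client helper from ElasticSearchConfig; the address is an example.
        RestHighLevelClient client = ElasticSearchConfig.restHighLevelClient("10.1.1.235");
        CreateIndexRequest request = new CreateIndexRequest("blacklist");
        // Single shard / no replica is only a demo setting.
        request.settings(Settings.builder()
                .put("index.number_of_shards", 1)
                .put("index.number_of_replicas", 0));
        client.indices().create(request, RequestOptions.DEFAULT);
        client.close();
    }
}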
- Insert records into the HBase table and verify that they are synchronized to ES (a sketch of a triggering write follows).
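To trigger the sync from code instead of the hbase shell, a Put on the configured info family is enough. This is only a sketch; the row key and values are made up.
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class PutDemo {
    public static void main(String[] args) throws Exception {
        try (Connection conn = ConnectionFactory.createConnection(HBaseConfiguration.create());
             Table table = conn.getTable(TableName.valueOf("blacklist"))) {
            Put put = new Put(Bytes.toBytes("1001"));
            // Only cells in the configured family ("info") are copied to ES by postPut.
            put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("card_number"), Bytes.toBytes("123456"));
            put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("legal_person"), Bytes.toBytes("Zhang San"));
            table.put(put);
        }
    }
}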
- Add a new column family and columns to verify whether they end up in ES. Since postPut only copies cells whose family matches the configured family argument, cells written to other column families are not indexed.
Problems encountered:
- 1. Testing put and deleteall through the hbase shell, the ES data is updated as expected, but strangely, importing data with sqoop triggers both postPut and postDelete (visible in the documents stored in ES).
Import command:
./bin/sqoop import \
--connect jdbc:mysql://<database-address>?tinyInt1isBit=false \
--username <username> \
--password '<password>' \
--query 'select id,card_number,legal_person from tax_info where insert_time >= "2020-03-13 16:00:00" and insert_time <= "2020-03-13 18:00:00" and $CONDITIONS ' \
--hbase-create-table \
--columns 'id','card_number','legal_person' \
--column-family info \
--incremental append \
--check-column 'id' \
--hbase-row-key "id" \
--hbase-table tax_info_tmp01 \
-m 1
- 2. When sqoop imports several column families in separate batches, with one column family used as the query condition and synchronized to ES while the other column families are not, the data that reached ES first is deleted again: importing the remaining column families triggers postDelete.