Using ES as a Secondary Index for HBase - with an Observer Coprocessor

Author: 风一样的存在 | Published 2020-03-10 18:53

    Development environment
    sqoop 1.4.7
    hbase 1.3.1
    elasticsearch 7.1.1

    • Writing the coprocessor
      With HBase 1.3.1, a custom coprocessor extends BaseRegionObserver and overrides the postPut and postDelete methods.
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.Cell;
    import org.apache.hadoop.hbase.CellUtil;
    import org.apache.hadoop.hbase.CoprocessorEnvironment;
    import org.apache.hadoop.hbase.client.Delete;
    import org.apache.hadoop.hbase.client.Durability;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
    import org.apache.hadoop.hbase.coprocessor.ObserverContext;
    import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
    import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
    import org.apache.hadoop.hbase.util.Bytes;
    
    import java.io.IOException;
    import java.util.*;
    
    public class HbaseDataSyncEsObserver extends BaseRegionObserver {
        private String esAddress;
        private String index;
        private String family;
    
        /**
         * Read the coprocessor configuration (passed as table attribute arguments)
         * @param env coprocessor environment
         */
        private void readConfiguration(CoprocessorEnvironment env) {
            Configuration conf = env.getConfiguration();
            esAddress = conf.get("es_address");
            index = conf.get("es_index");
            family = conf.get("family");
    
        }
    
        @Override
        public void start(CoprocessorEnvironment e) throws IOException {
            readConfiguration(e);
            ElasticSearchConfig.restHighLevelClient(esAddress);
        }
    
        @Override
        public void stop(CoprocessorEnvironment e) throws IOException {
            ElasticSearchConfig.client.close();
        }
    
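        /**
         * After a Put is applied, upsert the row into ES: the row key becomes the
         * document id and the qualifiers/values of the configured column family
         * become the document fields.
         */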
        @Override
        public void postPut(ObserverContext<RegionCoprocessorEnvironment> e, Put put, WALEdit edit, Durability durability) {
            String indexId = Bytes.toString(put.getRow());
            NavigableMap<byte[], List<Cell>> familyMap = put.getFamilyCellMap();
            Map<String,Map<String,Object>> esData = new HashMap<>();
            Map<String, Object> data = new HashMap<>();
            for (Map.Entry<byte[], List<Cell>> entry : familyMap.entrySet()){
                if(Bytes.toString(entry.getKey()).equals(family)){
                    for (Cell cell : entry.getValue()) {
                        String key = Bytes.toString(CellUtil.cloneQualifier(cell));
                        String value = Bytes.toString(CellUtil.cloneValue(cell));
                        data.put(key, value);
                    }
                    break;
                }
            }
            // only write to ES when the configured family actually carried data
            if (!data.isEmpty()) {
                esData.put(indexId, data);
                ElasticSearchUtil.saveEsDataWithBulk(esData, index);
            }
        }
    
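        /**
         * After a Delete is applied, remove the corresponding ES document by row key.
         */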
        @Override
        public void postDelete(ObserverContext<RegionCoprocessorEnvironment> e, Delete delete, WALEdit edit, Durability durability) throws IOException {
            String indexId = Bytes.toString(delete.getRow());
            ElasticSearchUtil.deleteEsDataWithBulk(Arrays.asList(indexId), index);
        }
    }
    

    Creating the ES connection

    import org.apache.http.HttpHost;
    import org.apache.http.client.config.RequestConfig.Builder;
    import org.elasticsearch.client.Node;
    import org.elasticsearch.client.NodeSelector;
    import org.elasticsearch.client.RestClient;
    import org.elasticsearch.client.RestClientBuilder;
    import org.elasticsearch.client.RestHighLevelClient;
    import org.elasticsearch.client.RestClient.FailureListener;
    import org.elasticsearch.client.RestClientBuilder.RequestConfigCallback;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    public class ElasticSearchConfig {
        private static final Logger log = LoggerFactory.getLogger(ElasticSearchConfig.class);
        public static RestHighLevelClient client;
    
    
        public static RestHighLevelClient restHighLevelClient(String address) {
            String[] hosts = address.split(",");
            if (hosts.length > 0) {
                HttpHost[] httpHosts = new HttpHost[hosts.length];
                for (int i = 0; i < hosts.length; i++) {
                    // port 9200 and the http scheme are hard-coded here
                    httpHosts[i] = new HttpHost(hosts[i], 9200, "http");
                }

                RestClientBuilder restClientBuilder = RestClient.builder(httpHosts).setRequestConfigCallback(new RequestConfigCallback() {
                    @Override
                    public Builder customizeRequestConfig(Builder requestConfigBuilder) {
                        requestConfigBuilder.setConnectTimeout(0);
                        requestConfigBuilder.setSocketTimeout(300000);
                        requestConfigBuilder.setConnectionRequestTimeout(2000);
                        return requestConfigBuilder;
                    }
                });
                restClientBuilder.setFailureListener(new FailureListener() {
                    @Override
                    public void onFailure(Node node) {
                        ElasticSearchConfig.log.error("************************ ES failure listener triggered, node: {}", node.getName());
                    }
                });
                restClientBuilder.setNodeSelector(NodeSelector.SKIP_DEDICATED_MASTERS);
                client = new RestHighLevelClient(restClientBuilder);
                log.info("ElasticSearch client init success ....");
            }
    
            return client;
        }
    }
    

    Implementing ES update and delete operations

    import org.elasticsearch.action.bulk.BulkRequest;
    import org.elasticsearch.action.delete.DeleteRequest;
    import org.elasticsearch.action.update.UpdateRequest;
    import org.elasticsearch.client.RequestOptions;
    import org.elasticsearch.client.RestHighLevelClient;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.util.List;
    import java.util.Map;
    
    public class ElasticSearchUtil {
        private static final Logger log = LoggerFactory.getLogger(ElasticSearchUtil.class);
    
        private static RestHighLevelClient getClient() {
            return ElasticSearchConfig.client;
        }
    
        /**
         * Upsert documents into ES with a single bulk request
         * @param esBulkData map of document id -> document fields
         * @param index  target index
         */
        public static void saveEsDataWithBulk(Map<String, Map<String, Object>> esBulkData, String index) {
            BulkRequest bulkRequest = new BulkRequest();
            for (Map.Entry<String, Map<String, Object>> oneEsData : esBulkData.entrySet()) {
                String id = oneEsData.getKey();
                Map<String, Object> esData = oneEsData.getValue();
                UpdateRequest request = new UpdateRequest(index, id);
                request.doc(esData);
                // docAsUpsert: create the document if it does not exist yet
                request.docAsUpsert(true);
                bulkRequest.add(request);
            }

            try {
                getClient().bulk(bulkRequest, RequestOptions.DEFAULT);
            } catch (Exception e) {
                log.error("save esData error:", e);
            }
        }
    
        /**
         * Delete documents from ES by id with a single bulk request
         * @param ids document ids (HBase row keys)
         * @param index  target index
         */
        public static void deleteEsDataWithBulk(List<String> ids, String index) {
            BulkRequest bulkRequest = new BulkRequest();
            ids.forEach(id -> {
                DeleteRequest request = new DeleteRequest(index);
                request.id(id);
                bulkRequest.add(request);
            });
            try {
                getClient().bulk(bulkRequest, RequestOptions.DEFAULT);
            } catch (Exception e) {
                log.error("delete esData error:", e);
            }
        }
    
        private ElasticSearchUtil() {
        }
    }
    
    • Packaging the code
      The corresponding Maven dependencies:
        <properties>
            <hbase.version>1.3.1</hbase.version>
            <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
            <elasticsearch.version>7.1.1</elasticsearch.version>
        </properties>
    
        <dependencies>
            <dependency>
                <groupId>org.apache.hbase</groupId>
                <artifactId>hbase-server</artifactId>
                <version>${hbase.version}</version>
            </dependency>
            <!-- es -->
            <dependency>
                <groupId>org.elasticsearch.client</groupId>
                <artifactId>elasticsearch-rest-high-level-client</artifactId>
                <version>${elasticsearch.version}</version>
            </dependency>
            <dependency>
                <groupId>org.elasticsearch</groupId>
                <artifactId>elasticsearch</artifactId>
                <version>${elasticsearch.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.logging.log4j</groupId>
                <artifactId>log4j-api</artifactId>
                <version>2.11.1</version>
            </dependency>
            <dependency>
                <groupId>org.apache.httpcomponents</groupId>
                <artifactId>httpclient</artifactId>
                <version>4.5.10</version>
            </dependency>
        </dependencies>
    

    When packaging, make sure all dependencies are bundled into the jar; the maven-assembly-plugin is used here.

        <build>
            <plugins>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-compiler-plugin</artifactId>
                    <version>3.8.1</version>
                    <configuration>
                        <source>1.8</source>
                        <target>1.8</target>
                    </configuration>
                </plugin>
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-assembly-plugin</artifactId>
                    <version>3.1.1</version>
                    <configuration>
                        <descriptorRefs>
                            <descriptorRef>jar-with-dependencies</descriptorRef>
                        </descriptorRefs>
                    </configuration>
                    <executions>
                        <execution>
                            <id>make-assemble</id>
                            <phase>package</phase>
                            <goals>
                                <goal>single</goal>
                            </goals>
                        </execution>
                    </executions>
                </plugin>
            </plugins>
        </build>
    
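    A minimal packaging sketch (the exact jar name depends on your artifactId and version; by default the assembly plugin appends the jar-with-dependencies suffix):

        mvn clean package
        # produces target/<artifactId>-<version>-jar-with-dependencies.jar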
    • Upload the jar to an HDFS directory (a quick sketch follows)

      [Screenshot: jar uploaded to HDFS]
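    A minimal upload sketch, assuming the jar built above is renamed to match the path used in the alter statement below (the /coprocessor directory and jar name are this post's choices and only need to match that statement):

        hdfs dfs -mkdir -p /coprocessor
        hdfs dfs -put target/hbase-observer-elasticsearch-1.0.0-SNAPSHOT-jar-with-dependencies.jar /coprocessor/hbase-observer-elasticsearch-1.0.0-SNAPSHOT.jar
        hdfs dfs -ls /coprocessor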
    • Configure the custom coprocessor on the table (verification sketch below)

    alter "blacklist", METHOD => 'table_att', 'coprocessor'=>'hdfs:///coprocessor/hbase-observer-elasticsearch-1.0.0-SNAPSHOT.jar|com.qjdchina.hbase.observer.HbaseDataSyncEsObserver|1001|es_address=10.1.1.235,es_index=blacklist,family=info'

    [Screenshot: coprocessor attribute on the table]
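    To verify that the attribute was applied (a quick check, assuming the blacklist table from above), inspect the table in the hbase shell; if the coprocessor fails to load, the region server logs are the place to look:

        describe 'blacklist'
        # TABLE_ATTRIBUTES should list a coprocessor$1 entry pointing at the jar and the observer class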
    • Create the ES index (a sketch follows)

      [Screenshot: index data in ES]
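    A minimal index-creation sketch, assuming ES is reachable at 10.1.1.235:9200 (the address used in the table attribute above) and that dynamic mapping is acceptable; settings and any explicit mapping are up to you:

        curl -X PUT "http://10.1.1.235:9200/blacklist" -H 'Content-Type: application/json' -d '
        {
          "settings": { "number_of_shards": 1, "number_of_replicas": 1 }
        }'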
    • Insert a record into the HBase table and check that it is synced (a sketch follows)

      [Screenshot: data inserted into HBase]
      [Screenshot: data synced to ES]
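    A quick test sketch; the row key and values are made up for illustration:

        # hbase shell
        put 'blacklist', 'row-001', 'info:card_number', '123456'
        put 'blacklist', 'row-001', 'info:legal_person', 'zhangsan'

        # the document should now be visible in ES under the same id
        curl "http://10.1.1.235:9200/blacklist/_doc/row-001?pretty"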
    • Add a new column family to verify whether its data also ends up in ES (a sketch follows)

      [Screenshot: new column family and column added]
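    A sketch of that check; the family name extra is made up. Since postPut only copies cells from the configured family (info here), data written to another family should not show up in the ES document:

        # hbase shell
        alter 'blacklist', NAME => 'extra'
        put 'blacklist', 'row-001', 'extra:remark', 'not synced to es'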

    Problems encountered:

    • 1. Testing put and deleteall from the hbase shell showed the ES data being updated in sync, but strangely, importing data through sqoop triggered both postPut and postDelete.
      [Screenshot: documents in ES]
      Import command:
      ./bin/sqoop import \
      --connect jdbc:mysql://<database-address>?tinyint1isbit=false \
      --username <username> \
      --password '<password>' \
      --query 'select id,card_number,legal_person from tax_info where insert_time >= "2020-03-13 16:00:00" and insert_time <= "2020-03-13 18:00:00" and $CONDITIONS ' \
      --hbase-create-table \
      --columns 'id','card_number','legal_person' \
      --column-family info \
      --incremental append \
      --check-column 'id' \
      --hbase-row-key "id" \
      --hbase-table tax_info_tmp01 \
      -m 1
    • 2. When sqoop was used to import multiple column families in separate batches, with one column family serving as the query condition and synced to ES while the other column families were not synced, the data that had already been written to ES was deleted again by the postDelete triggered while the other column families were being imported. A possible mitigation is sketched below.
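    One possible mitigation (my own assumption, not part of the original post) is to apply the same family filter in postDelete that postPut already uses, so that deletes touching only other column families do not remove the ES document. A sketch, replacing the postDelete above:

        @Override
        public void postDelete(ObserverContext<RegionCoprocessorEnvironment> e, Delete delete, WALEdit edit, Durability durability) throws IOException {
            // assumption: only drop the ES document when the configured family is affected,
            // or when the delete carries no explicit family (a whole-row delete)
            NavigableMap<byte[], List<Cell>> familyMap = delete.getFamilyCellMap();
            boolean affectsIndexedFamily = familyMap.isEmpty();
            for (byte[] fam : familyMap.keySet()) {
                if (Bytes.toString(fam).equals(family)) {
                    affectsIndexedFamily = true;
                    break;
                }
            }
            if (affectsIndexedFamily) {
                ElasticSearchUtil.deleteEsDataWithBulk(Arrays.asList(Bytes.toString(delete.getRow())), index);
            }
        }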
