美文网首页
Es封装RestHighLevelClient和BulkProc

Es封装RestHighLevelClient和BulkProcessor

作者: 小胖学编程 | 来源:发表于2021-05-17 10:44 被阅读0次

    Es的基础工具类,可以获取单例的RestHighLevelClient类和BulkProcessor类。

    1. 引入依赖

            <!--
              Works around java.lang.NoClassDefFoundError:
              org/elasticsearch/common/xcontent/DeprecationHandler.
              The core and low-level client artifacts are declared explicitly at the same
              version as the high-level client, and the high-level client's own transitive
              copies of them are excluded further down.
            -->
            <!-- Elasticsearch core library -->
            <dependency>
                <groupId>org.elasticsearch</groupId>
                <artifactId>elasticsearch</artifactId>
                <version>7.5.1</version>
            </dependency>
    
            <!-- Elasticsearch low-level REST client -->
            <dependency>
                <groupId>org.elasticsearch.client</groupId>
                <artifactId>elasticsearch-rest-client</artifactId>
                <version>7.5.1</version>
            </dependency>
    
            <!-- Elasticsearch high-level REST client; the two artifacts declared above are
                 excluded here so that only the explicitly declared 7.5.1 copies are pulled in -->
            <dependency>
                <groupId>org.elasticsearch.client</groupId>
                <artifactId>elasticsearch-rest-high-level-client</artifactId>
                <version>7.5.1</version>
                <exclusions>
                    <exclusion>
                        <groupId>org.elasticsearch.client</groupId>
                        <artifactId>elasticsearch-rest-client</artifactId>
                    </exclusion>
                    <exclusion>
                        <groupId>org.elasticsearch</groupId>
                        <artifactId>elasticsearch</artifactId>
                    </exclusion>
                </exclusions>
            </dependency>
    

    2. 工具类

    @Slf4j
    public class EsUtil {
    
        private static RestHighLevelClient restHighLevelClient;
    
        private static BulkProcessor bulkProcessor;
    
        static {
            List<HttpHost> httpHosts = new ArrayList<>();
            //填充数据
            httpHosts.add(new HttpHost("120.0.0.1", 9200));
            httpHosts.add(new HttpHost("120.0.0.1", 9201));
            httpHosts.add(new HttpHost("120.0.0.1", 9202));
            //填充host节点
            RestClientBuilder builder = RestClient.builder(httpHosts.toArray(new HttpHost[0]));
    
            builder.setRequestConfigCallback(requestConfigBuilder -> {
                requestConfigBuilder.setConnectTimeout(1000);
                requestConfigBuilder.setSocketTimeout(1000);
                requestConfigBuilder.setConnectionRequestTimeout(1000);
                return requestConfigBuilder;
            });
    
            //填充用户名密码
            final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
            credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials("username", "password"));
    
            builder.setHttpClientConfigCallback(httpClientBuilder -> {
                httpClientBuilder.setMaxConnTotal(30);
                httpClientBuilder.setMaxConnPerRoute(30);
                httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
                return httpClientBuilder;
            });
    
            restHighLevelClient = new RestHighLevelClient(builder);
        }
    
        static {
            bulkProcessor=createBulkProcessor();
        }
    
        private static BulkProcessor createBulkProcessor() {
    
            BulkProcessor.Listener listener = new BulkProcessor.Listener() {
                @Override
                public void beforeBulk(long executionId, BulkRequest request) {
                    log.info("1. 【beforeBulk】批次[{}] 携带 {} 请求数量", executionId, request.numberOfActions());
                }
    
                @Override
                public void afterBulk(long executionId, BulkRequest request,
                                      BulkResponse response) {
                    if (!response.hasFailures()) {
                        log.info("2. 【afterBulk-成功】批量 [{}] 完成在 {} ms", executionId, response.getTook().getMillis());
                    } else {
                        BulkItemResponse[] items = response.getItems();
                        for (BulkItemResponse item : items) {
                            if (item.isFailed()) {
                                log.info("2. 【afterBulk-失败】批量 [{}] 出现异常的原因 : {}", executionId, item.getFailureMessage());
                                break;
                            }
                        }
                    }
                }
    
                @Override
                public void afterBulk(long executionId, BulkRequest request,
                                      Throwable failure) {
    
                    List<DocWriteRequest<?>> requests = request.requests();
                    List<String> esIds = requests.stream().map(DocWriteRequest::id).collect(Collectors.toList());
                    log.error("3. 【afterBulk-failure失败】es执行bluk失败,失败的esId为:{}", esIds, failure);
                }
            };
    
            BulkProcessor.Builder builder = BulkProcessor.builder(((bulkRequest, bulkResponseActionListener) -> {
                restHighLevelClient.bulkAsync(bulkRequest, RequestOptions.DEFAULT, bulkResponseActionListener);
            }), listener);
    
            builder.setBulkActions(10000);
            builder.setBulkSize(new ByteSizeValue(8L, ByteSizeUnit.MB));
            //设置允许执行的并发请求数。
            builder.setConcurrentRequests(8);
            builder.setFlushInterval(TimeValue.timeValueSeconds(1));
            //设置重试策略
            builder.setBackoffPolicy(BackoffPolicy.constantBackoff(TimeValue.timeValueSeconds(1), 3));
            return builder.build();
        }
    
    
        public static RestHighLevelClient getRestHighLevelClient() {
            return restHighLevelClient;
        }
    
        public static BulkProcessor getBulkProcessor() {
            return bulkProcessor;
        }
    
        //远程调用
        public static List<SearchHit> remoteSearch(SearchRequest searchRequest, SearchSourceBuilder searchSourceBuilder) throws IOException {
            List<SearchHit> results = new ArrayList<>();
            searchRequest.indices("test_demo");
            searchRequest.source(searchSourceBuilder);
            log.info("dsl:" + searchSourceBuilder.toString());
            SearchResponse response = EsUtil.getRestHighLevelClient().search(searchRequest, RequestOptions.DEFAULT);
    
            SearchHits hits = response.getHits();
            Iterator<SearchHit> iterator = hits.iterator();
            while (iterator.hasNext()) {
                SearchHit next = iterator.next();
                log.info("输出分数:" + next.getScore());
                log.info("输出数据:" + next.getSourceAsString());
                results.add(next);
            }
            return results;
        }
    }
    

    相关文章

      网友评论

          本文标题:Es封装RestHighLevelClient和BulkProcessor

          本文链接:https://www.haomeiwen.com/subject/bngqjltx.html