1.maven包引入。此处使用的是spring-data-elasticsearch
<!-- Spring Boot starter for Spring Data Elasticsearch. The plain transport client it pulls in
     is excluded; the x-pack-transport artifact is declared separately below. -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-elasticsearch</artifactId>
<exclusions>
<exclusion>
<groupId>org.elasticsearch.client</groupId>
<artifactId>transport</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- X-Pack transport client, version pinned explicitly.
     NOTE(review): presumably the artifact that provides PreBuiltXPackTransportClient - confirm. -->
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>x-pack-transport</artifactId>
<version>6.3.1</version>
</dependency>
<!-- Netty 4 transport plugin; pinned to the same version as x-pack-transport above. -->
<dependency>
<!-- required by elasticsearch -->
<groupId>org.elasticsearch.plugin</groupId>
<artifactId>transport-netty4-client</artifactId>
<version>6.3.1</version>
<!--<scope>test</scope>-->
</dependency>
<!-- UnboundID LDAP SDK.
     NOTE(review): presumably a runtime requirement of the X-Pack client - confirm before removing. -->
<dependency>
<groupId>com.unboundid</groupId>
<artifactId>unboundid-ldapsdk</artifactId>
<version>4.0.9</version>
</dependency>
2.spring-data-elasticsearch改造
spring-data-elasticsearch原有的客户端配置不支持xpack权限认证,因此需要进行改造:自己创建TransportClient
配置项参数需要新增:
在这里需要使用PreBuiltXPackTransportClient来替代TransportClient
.put("xpack.security.transport.ssl.enabled", true)
.put("xpack.security.transport.ssl.verification_mode", "certificate")
.put("xpack.security.transport.ssl.keystore.path", newCertPath)
.put("xpack.security.transport.ssl.truststore.path", newCertPath)
关键代码:需要配置ssl,包括证书和地址
.put("cluster.name", cluster_name)
.put("xpack.security.user", user + ":" + password)
以及用户名密码
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.io.FileUtils;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.xpack.client.PreBuiltXPackTransportClient;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.annotation.Order;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.net.UnknownHostException;
import java.util.List;
/**
 * Spring configuration that builds an X-Pack aware Elasticsearch {@link TransportClient}
 * ({@link PreBuiltXPackTransportClient}) authenticated with user/password credentials and
 * using SSL on the transport layer.
 *
 * <p>All connection parameters are injected from {@code spring.data.elasticsearch.*} properties.
 * Field names intentionally mirror the property keys.
 */
@Configuration
@Slf4j
@Order(-1)
public class EsConfiguration {

    /** Value for the {@code cluster.name} client setting. */
    @Value("${spring.data.elasticsearch.cluster-name}")
    private String cluster_name;

    /** Comma separated {@code host:port} list, parsed by {@link ClusterNodes#of(String)}. */
    @Value("${spring.data.elasticsearch.cluster-nodes}")
    private String cluster_nodes;

    /** Value for the {@code client.transport.sniff} client setting. */
    @Value("${spring.data.elasticsearch.properties.client.transport.sniff}")
    private boolean sniff;

    /** Value for the {@code client.transport.ignore_cluster_name} client setting. */
    @Value("${spring.data.elasticsearch.properties.client.transport.ignore_cluster_name}")
    private boolean ignore_cluster_name;

    /** User name part of the {@code xpack.security.user} client setting. */
    @Value("${spring.data.elasticsearch.properties.xpack.security.user}")
    private String user;

    /** Password part of the {@code xpack.security.user} client setting. Never logged. */
    @Value("${spring.data.elasticsearch.properties.xpack.security.password}")
    private String password;

    /**
     * Classpath resource name of the certificate store. The same value is used as the
     * (working-directory relative) file path the resource is copied to.
     */
    @Value("${spring.data.elasticsearch.properties.xpack.security.certificate}")
    private String certificate;

    /**
     * Creates the X-Pack transport client and registers every configured cluster node with it.
     *
     * <p>The certificate store is first copied from the classpath to the file system because the
     * SSL settings are given as file paths; the same file is used as keystore and truststore.
     *
     * @return the client with all configured transport addresses added
     * @throws UnknownHostException declared for backward compatibility of the signature
     * @throws IllegalStateException if the certificate resource is missing or cannot be copied
     */
    @Bean
    public TransportClient transportClient() throws UnknownHostException {
        log.info("cluster_nodes----{}", cluster_nodes);
        log.info("cluster_name-----{}", cluster_name);
        log.info("sniff----{}", sniff);
        log.info("ignore_cluster_name---{}", ignore_cluster_name);
        log.info("certificate---{}", certificate);

        ClusterNodes clusterNodes = ClusterNodes.of(cluster_nodes);

        String newCertPath = extractCertificate();
        log.info("newCertPath: {}.", newCertPath);
        log.info("user----{}", user);
        // The password is deliberately not written to the log.

        // Note: a non-X-Pack PreBuiltTransportClient variant (toggled by a configuration switch)
        // used to be sketched here as commented-out code; only the secured client is built.
        Settings settings = Settings.builder()
                .put("cluster.name", cluster_name)
                .put("xpack.security.user", user + ":" + password)
                .put("client.transport.ignore_cluster_name", ignore_cluster_name)
                .put("client.transport.sniff", sniff)
                .put("client.transport.ping_timeout", "5s")
                .put("client.transport.nodes_sampler_interval", "5s")
                .put("xpack.security.transport.ssl.enabled", true)
                .put("xpack.security.transport.ssl.verification_mode", "certificate")
                .put("xpack.security.transport.ssl.keystore.path", newCertPath)
                .put("xpack.security.transport.ssl.truststore.path", newCertPath)
                .build();
        TransportClient client = new PreBuiltXPackTransportClient(settings);

        clusterNodes.stream().forEach(address -> {
            log.info("Adding transport node : {}", address);
            client.addTransportAddress(address);
        });

        List<DiscoveryNode> nodes = client.connectedNodes();
        log.info("---初始化完成--{}", nodes);
        return client;
    }

    /**
     * Copies the certificate store from the classpath to {@code new File(certificate)}.
     *
     * @return absolute path of the copied file
     * @throws IllegalStateException if the resource does not exist or the copy fails; continuing
     *         would only configure the client with a keystore path that may not exist
     */
    private String extractCertificate() {
        File targetFile = new File(certificate);
        try (InputStream stream =
                CommonDataSourceConfiguration.class.getClassLoader().getResourceAsStream(certificate)) {
            if (stream == null) {
                throw new IllegalStateException("Certificate not found on classpath: " + certificate);
            }
            // Check the parent via the File API instead of splitting on File.separator, which is
            // an invalid regular expression on Windows (a lone backslash).
            if (targetFile.getParentFile() != null) {
                FileUtils.forceMkdirParent(targetFile);
            }
            FileUtils.copyInputStreamToFile(stream, targetFile);
        } catch (IOException e) {
            throw new IllegalStateException(
                    "Failed to copy certificate " + certificate + " to " + targetFile.getAbsolutePath(), e);
        }
        return targetFile.getAbsolutePath();
    }
}
clusterNodes相关代码:
/*
* Copyright 2018-2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.shuidihuzhu.ad.admin;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.transport.TransportAddress;
import org.springframework.data.util.Streamable;
import org.springframework.util.Assert;
import org.springframework.util.StringUtils;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Collectors;
/**
 * Value object to represent a list of cluster nodes.
 *
 * <p>Instances are created from a comma separated list of {@code host:port} pairs via
 * {@link #of(String)}; every host is resolved to an {@link InetAddress} at construction time.
 *
 * @author Oliver Gierke
 * @since 3.1
 */
class ClusterNodes implements Streamable<TransportAddress> {

    /** Single-node default pointing at {@code 127.0.0.1:9300}. */
    public static final ClusterNodes DEFAULT = ClusterNodes.of("127.0.0.1:9300");

    private static final String COLON = ":";
    private static final String COMMA = ",";

    private final List<TransportAddress> clusterNodes;

    /**
     * Creates a new {@link ClusterNodes} by parsing the given source.
     *
     * @param source must not be {@literal null} or empty.
     * @throws IllegalArgumentException if the source is empty, an entry is not of the form
     *         {@code host:port}, a host cannot be resolved, or a port is not a number
     *         (the latter as {@link NumberFormatException})
     */
    private ClusterNodes(String source) {

        Assert.hasText(source, "Cluster nodes source must not be null or empty!");

        String[] nodes = StringUtils.delimitedListToStringArray(source, COMMA);

        this.clusterNodes = Arrays.stream(nodes).map(node -> {

            String[] segments = StringUtils.delimitedListToStringArray(node, COLON);

            Assert.isTrue(segments.length == 2,
                    () -> String.format("Invalid cluster node %s in %s! Must be in the format host:port!", node, source));

            String host = segments[0].trim();
            String port = segments[1].trim();

            Assert.hasText(host, () -> String.format("No host name given cluster node %s!", node));
            Assert.hasText(port, () -> String.format("No port given in cluster node %s!", node));

            // NOTE(review): InetSocketTransportAddress was removed in Elasticsearch 6.0 in favour of
            // constructing TransportAddress directly - confirm against the Elasticsearch version
            // that is actually on the classpath.
            return new InetSocketTransportAddress(toInetAddress(host), toPort(port, node));

        }).collect(Collectors.toList());
    }

    /**
     * Creates a new {@link ClusterNodes} by parsing the given source. The expected format is a comma separated list of
     * host-port-combinations separated by a colon: {@code host:port,host:port,…}.
     *
     * @param source must not be {@literal null} or empty.
     * @return the parsed cluster nodes, never {@literal null}.
     */
    public static ClusterNodes of(String source) {
        return new ClusterNodes(source);
    }

    /*
     * (non-Javadoc)
     * @see java.lang.Iterable#iterator()
     */
    @Override
    public Iterator<TransportAddress> iterator() {
        return clusterNodes.iterator();
    }

    /**
     * Resolves the given host, translating the checked {@link UnknownHostException} into an
     * {@link IllegalArgumentException} so it can be used inside the parsing lambda.
     */
    private static InetAddress toInetAddress(String host) {

        try {
            return InetAddress.getByName(host);
        } catch (UnknownHostException o_O) {
            throw new IllegalArgumentException(o_O);
        }
    }

    /**
     * Parses the port of a node entry. The exception type for non-numeric input is still
     * {@link NumberFormatException}, but the message now names the offending node entry.
     */
    private static int toPort(String port, String node) {

        try {
            return Integer.parseInt(port);
        } catch (NumberFormatException o_O) {
            NumberFormatException detailed = new NumberFormatException(
                    String.format("Invalid port %s in cluster node %s!", port, node));
            detailed.initCause(o_O);
            throw detailed;
        }
    }
}
网友评论