在阅读seata源码时,将知识点细粒度话,并且能运用到自己的项目中。
本文不依赖Spring体系,完成eureka的服务注册、服务下线、服务ip列表查询。
流程:
- 技术组件的服务端需要收集处理客户端发送的信息;
- 业务代码引入技术组件的pom后,业务代码执行某些操作时向技术组件的服务端发送信息。
注意点:
技术组件服务端可以注册eureka、zk、redis、consul、nacos等注册中心。
本文用法:
- 服务端注册在eureka上,客户端去eureka上拉取服务端注册的ip列表信息。然后通过轮询的方式获取ip地址,进行通信。
1. 引入依赖
<properties>
<eureka-clients.version>1.9.5</eureka-clients.version>
<archaius-core.version>0.7.6</archaius-core.version>
<javax-inject.version>1</javax-inject.version>
</properties>
<dependencies>
<dependency>
<groupId>com.netflix.eureka</groupId>
<artifactId>eureka-client</artifactId>
<version>${eureka-clients.version}</version>
</dependency>
<dependency>
<groupId>com.netflix.archaius</groupId>
<artifactId>archaius-core</artifactId>
<version>${archaius-core.version}</version>
</dependency>
<dependency>
<groupId>javax.inject</groupId>
<artifactId>javax.inject</artifactId>
<version>${javax-inject.version}</version>
</dependency>
</dependencies>
2. 实现代码
2.1 eureka实例配置类
package com.tellme.registry;
import com.netflix.appinfo.MyDataCenterInstanceConfig;
import org.apache.commons.lang3.StringUtils;
/**
* @author: rui_849217@163.com
* override MyDataCenterInstanceConfig for set value,
* eg: instanceId \ipAddress \ applicationName...
*/
public class CustomEurekaInstanceConfig extends MyDataCenterInstanceConfig {
private String applicationName;
private String instanceId;
private String ipAddress;
private int port = -1;
@Override
public String getInstanceId() {
if (StringUtils.isBlank(instanceId)) {
return super.getInstanceId();
}
return instanceId;
}
@Override
public String getIpAddress() {
if (StringUtils.isBlank(ipAddress)) {
return super.getIpAddress();
}
return ipAddress;
}
@Override
public int getNonSecurePort() {
if (port == -1) {
return super.getNonSecurePort();
}
return port;
}
@Override
public String getAppname() {
if (StringUtils.isBlank(applicationName)) {
return super.getAppname();
}
return applicationName;
}
@Override
public String getHostName(boolean refresh) {
return this.getIpAddress();
}
public void setInstanceId(String instanceId) {
this.instanceId = instanceId;
}
public void setIpAddress(String ipAddress) {
this.ipAddress = ipAddress;
}
public void setPort(int port) {
this.port = port;
}
public void setApplicationName(String applicationName) {
this.applicationName = applicationName;
}
}
2.2 注册中心接口类
public interface RegistryService<T> {
void register(InetSocketAddress address) throws Exception;
void unregister(InetSocketAddress address) throws Exception;
void subscribe(String cluster, T listener) throws Exception;
void unsubscribe(String cluster, T listener) throws Exception;
/**
* 获取注册的服务器列表
*
* @param key 注册的服务名
* @return ip的列表信息
* @throws Exception
*/
List<InetSocketAddress> lookup(String key) throws Exception;
void close() throws Exception;
}
2.3 eureka的实现类
该方法可以完成服务注册、服务下线、服务列表拉取。
通过EurekaEventListener
监听,实现了当服务列表变化后,可动态修改ip列表的功能。
package com.tellme.registry;
import com.netflix.appinfo.ApplicationInfoManager;
import com.netflix.appinfo.InstanceInfo;
import com.netflix.appinfo.providers.EurekaConfigBasedInstanceInfoProvider;
import com.netflix.config.ConfigurationManager;
import com.netflix.discovery.DefaultEurekaClientConfig;
import com.netflix.discovery.DiscoveryClient;
import com.netflix.discovery.EurekaClient;
import com.netflix.discovery.EurekaEventListener;
import com.netflix.discovery.shared.Application;
import com.tellme.util.NetUtil;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.CollectionUtils;
import java.net.InetSocketAddress;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;
/**
* The type Eureka registry service.
*
* @author: rui_849217@163.com
*/
public class EurekaRegistryServiceImpl2 implements RegistryService<EurekaEventListener> {
private static final Logger LOGGER = LoggerFactory.getLogger(EurekaRegistryServiceImpl2.class);
private static final String DEFAULT_APPLICATION = "default";
private static final String PRO_SERVICE_URL_KEY = "serviceUrl";
private static final String FILE_ROOT_REGISTRY = "registry";
private static final String FILE_CONFIG_SPLIT_CHAR = ".";
private static final String REGISTRY_TYPE = "eureka";
private static final String CLUSTER = "application";
private static final String REGISTRY_WEIGHT = "weight";
private static final String EUREKA_CONFIG_SERVER_URL_KEY = "eureka.serviceUrl.default";
private static final String EUREKA_CONFIG_REFRESH_KEY = "eureka.client.refresh.interval";
private static final String EUREKA_CONFIG_SHOULD_REGISTER = "eureka.registration.enabled";
private static final String EUREKA_CONFIG_METADATA_WEIGHT = "eureka.metadata.weight";
private static final int EUREKA_REFRESH_INTERVAL = 5;
private static final int MAP_INITIAL_CAPACITY = 8;
private static final String DEFAULT_WEIGHT = "1";
private static ConcurrentMap<String, Set<InetSocketAddress>> clusterAddressMap;
private static volatile boolean subscribeListener = false;
private static volatile ApplicationInfoManager applicationInfoManager;
private static volatile CustomEurekaInstanceConfig instanceConfig;
private static volatile EurekaRegistryServiceImpl2 instance;
private static volatile EurekaClient eurekaClient;
private EurekaRegistryServiceImpl2() {
}
public static EurekaRegistryServiceImpl2 getInstance() {
if (instance == null) {
synchronized (EurekaRegistryServiceImpl2.class) {
if (instance == null) {
clusterAddressMap = new ConcurrentHashMap<>(MAP_INITIAL_CAPACITY);
instanceConfig = new CustomEurekaInstanceConfig();
instance = new EurekaRegistryServiceImpl2();
}
}
}
return instance;
}
@Override
public void register(InetSocketAddress address) throws Exception {
NetUtil.validAddress(address);
instanceConfig.setIpAddress(address.getAddress().getHostAddress());
instanceConfig.setPort(address.getPort());
instanceConfig.setApplicationName(getApplicationName());
instanceConfig.setInstanceId(getInstanceId());
getEurekaClient(true);
applicationInfoManager.setInstanceStatus(InstanceInfo.InstanceStatus.UP);
}
@Override
public void unregister(InetSocketAddress address) throws Exception {
if (eurekaClient == null) {
return;
}
applicationInfoManager.setInstanceStatus(InstanceInfo.InstanceStatus.DOWN);
}
@Override
public void subscribe(String cluster, EurekaEventListener listener) throws Exception {
subscribeListener = true;
getEurekaClient(false).registerEventListener(listener);
}
@Override
public void unsubscribe(String cluster, EurekaEventListener listener) throws Exception {
subscribeListener = false;
getEurekaClient(false).unregisterEventListener(listener);
}
@Override
public List<InetSocketAddress> lookup(String key) throws Exception {
String clusterName =key;// getServiceGroup(key);
if (clusterName == null) {
return null;
}
if (!subscribeListener) {
refreshCluster();
subscribe(null, event -> {
try {
refreshCluster();
} catch (Exception e) {
LOGGER.error("Eureka event listener refreshCluster error:{}", e.getMessage(), e);
}
});
}
return new ArrayList<>(clusterAddressMap.getOrDefault(clusterName.toUpperCase(), Collections.emptySet()));
}
@Override
public void close() throws Exception {
if (eurekaClient != null) {
eurekaClient.shutdown();
}
clean();
}
private void refreshCluster() {
List<Application> applications = getEurekaClient(false).getApplications().getRegisteredApplications();
if (CollectionUtils.isEmpty(applications)) {
clusterAddressMap.clear();
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("refreshCluster success, cluster empty!");
}
return;
}
ConcurrentMap<String, Set<InetSocketAddress>> collect = new ConcurrentHashMap<>(MAP_INITIAL_CAPACITY);
for (Application application : applications) {
List<InstanceInfo> instances = application.getInstances();
if (!CollectionUtils.isEmpty(instances)) {
Set<InetSocketAddress> addressSet = instances.stream()
.map(instance -> new InetSocketAddress(instance.getIPAddr(), instance.getPort()))
.collect(Collectors.toSet());
collect.put(application.getName(), addressSet);
}
}
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("refreshCluster success, cluster: " + collect);
}
clusterAddressMap = collect;
}
private Properties getEurekaProperties(boolean needRegister) {
Properties eurekaProperties = new Properties();
eurekaProperties.setProperty(EUREKA_CONFIG_REFRESH_KEY, String.valueOf(EUREKA_REFRESH_INTERVAL));
String url = "http://localhost:7001/eureka";
eurekaProperties.setProperty(EUREKA_CONFIG_SERVER_URL_KEY, url);
String weight = "1";
if (StringUtils.isNotBlank(weight)) {
eurekaProperties.setProperty(EUREKA_CONFIG_METADATA_WEIGHT, weight);
} else {
eurekaProperties.setProperty(EUREKA_CONFIG_METADATA_WEIGHT, DEFAULT_WEIGHT);
}
if (!needRegister) {
eurekaProperties.setProperty(EUREKA_CONFIG_SHOULD_REGISTER, "false");
}
return eurekaProperties;
}
private String getApplicationName() {
String application = null;
if (application == null) {
application = DEFAULT_APPLICATION;
}
return application;
}
private EurekaClient getEurekaClient(boolean needRegister) {
if (eurekaClient == null) {
synchronized (EurekaRegistryServiceImpl2.class) {
try {
if (eurekaClient == null) {
if (!needRegister) {
instanceConfig = new CustomEurekaInstanceConfig();
}
ConfigurationManager.loadProperties(getEurekaProperties(needRegister));
InstanceInfo instanceInfo = new EurekaConfigBasedInstanceInfoProvider(instanceConfig).get();
applicationInfoManager = new ApplicationInfoManager(instanceConfig, instanceInfo);
eurekaClient = new DiscoveryClient(applicationInfoManager, new DefaultEurekaClientConfig());
}
} catch (Exception e) {
clean();
throw new RuntimeException("register eureka is error!", e);
}
}
}
return eurekaClient;
}
private void clean() {
eurekaClient = null;
applicationInfoManager = null;
instanceConfig = null;
}
private String getInstanceId() {
return String.format("%s:%s:%d", instanceConfig.getIpAddress(), instanceConfig.getAppname(),
instanceConfig.getNonSecurePort());
}
private String getEurekaServerUrlFileKey() {
return String.join(FILE_CONFIG_SPLIT_CHAR, FILE_ROOT_REGISTRY, REGISTRY_TYPE, PRO_SERVICE_URL_KEY);
}
private String getEurekaApplicationFileKey() {
return String.join(FILE_CONFIG_SPLIT_CHAR, FILE_ROOT_REGISTRY, REGISTRY_TYPE, CLUSTER);
}
private String getEurekaInstanceWeightFileKey() {
return String.join(FILE_CONFIG_SPLIT_CHAR, FILE_ROOT_REGISTRY, REGISTRY_TYPE, REGISTRY_WEIGHT);
}
}
网友评论