美文网首页
(seata源码研究)JAVA代码实现eureka的服务注册、服

(seata源码研究)JAVA代码实现eureka的服务注册、服

作者: 小胖学编程 | 来源:发表于2021-07-01 17:29 被阅读0次

    在阅读seata源码时,将知识点细粒度话,并且能运用到自己的项目中。

    本文不依赖Spring体系,完成eureka的服务注册、服务下线、服务ip列表查询。

    流程:

    1. 技术组件的服务端需要收集处理客户端发送的信息;
    2. 业务代码引入技术组件的pom后,业务代码执行某些操作时向技术组件的服务端发送信息。

    注意点:

    技术组件服务端可以注册eureka、zk、redis、consul、nacos等注册中心。


    本文用法:

    1. 服务端注册在eureka上,客户端去eureka上拉取服务端注册的ip列表信息。然后通过轮询的方式获取ip地址,进行通信。

    1. 引入依赖

        <properties>
            <eureka-clients.version>1.9.5</eureka-clients.version>
            <archaius-core.version>0.7.6</archaius-core.version>
            <javax-inject.version>1</javax-inject.version>
        </properties>
        <dependencies>
    
            <dependency>
                <groupId>com.netflix.eureka</groupId>
                <artifactId>eureka-client</artifactId>
                <version>${eureka-clients.version}</version>
            </dependency>
            <dependency>
                <groupId>com.netflix.archaius</groupId>
                <artifactId>archaius-core</artifactId>
                <version>${archaius-core.version}</version>
            </dependency>
    
            <dependency>
                <groupId>javax.inject</groupId>
                <artifactId>javax.inject</artifactId>
                <version>${javax-inject.version}</version>
            </dependency>
        </dependencies>
    

    2. 实现代码

    2.1 eureka实例配置类

    package com.tellme.registry;
    
    import com.netflix.appinfo.MyDataCenterInstanceConfig;
    import org.apache.commons.lang3.StringUtils;
    
    /**
     * @author: rui_849217@163.com
     * override MyDataCenterInstanceConfig for set value,
     * eg: instanceId \ipAddress \ applicationName...
     */
    public class CustomEurekaInstanceConfig extends MyDataCenterInstanceConfig  {
        private String applicationName;
        private String instanceId;
        private String ipAddress;
        private int port = -1;
    
        @Override
        public String getInstanceId() {
            if (StringUtils.isBlank(instanceId)) {
                return super.getInstanceId();
            }
            return instanceId;
        }
    
        @Override
        public String getIpAddress() {
            if (StringUtils.isBlank(ipAddress)) {
                return super.getIpAddress();
            }
            return ipAddress;
        }
    
        @Override
        public int getNonSecurePort() {
            if (port == -1) {
                return super.getNonSecurePort();
            }
            return port;
        }
    
        @Override
        public String getAppname() {
            if (StringUtils.isBlank(applicationName)) {
                return super.getAppname();
            }
            return applicationName;
        }
    
        @Override
        public String getHostName(boolean refresh) {
            return this.getIpAddress();
        }
    
        public void setInstanceId(String instanceId) {
            this.instanceId = instanceId;
        }
    
        public void setIpAddress(String ipAddress) {
            this.ipAddress = ipAddress;
        }
    
        public void setPort(int port) {
            this.port = port;
        }
    
        public void setApplicationName(String applicationName) {
            this.applicationName = applicationName;
        }
    }
    

    2.2 注册中心接口类

    public interface RegistryService<T> {
    
        void register(InetSocketAddress address) throws Exception;
    
        void unregister(InetSocketAddress address) throws Exception;
    
        void subscribe(String cluster, T listener) throws Exception;
    
        void unsubscribe(String cluster, T listener) throws Exception;
    
        /**
         * 获取注册的服务器列表
         *
         * @param key 注册的服务名
         * @return ip的列表信息
         * @throws Exception
         */
        List<InetSocketAddress> lookup(String key) throws Exception;
    
        void close() throws Exception;
    
    }
    

    2.3 eureka的实现类

    该方法可以完成服务注册、服务下线、服务列表拉取。

    通过EurekaEventListener监听,实现了当服务列表变化后,可动态修改ip列表的功能。

    package com.tellme.registry;
    
    import com.netflix.appinfo.ApplicationInfoManager;
    import com.netflix.appinfo.InstanceInfo;
    import com.netflix.appinfo.providers.EurekaConfigBasedInstanceInfoProvider;
    import com.netflix.config.ConfigurationManager;
    import com.netflix.discovery.DefaultEurekaClientConfig;
    import com.netflix.discovery.DiscoveryClient;
    import com.netflix.discovery.EurekaClient;
    import com.netflix.discovery.EurekaEventListener;
    import com.netflix.discovery.shared.Application;
    import com.tellme.util.NetUtil;
    import org.apache.commons.lang3.StringUtils;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.util.CollectionUtils;
    
    import java.net.InetSocketAddress;
    import java.util.*;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;
    import java.util.stream.Collectors;
    
    /**
     * The type Eureka registry service.
     *
     * @author: rui_849217@163.com
     */
    public class EurekaRegistryServiceImpl2 implements RegistryService<EurekaEventListener> {
        private static final Logger LOGGER = LoggerFactory.getLogger(EurekaRegistryServiceImpl2.class);
    
        private static final String DEFAULT_APPLICATION = "default";
        private static final String PRO_SERVICE_URL_KEY = "serviceUrl";
        private static final String FILE_ROOT_REGISTRY = "registry";
        private static final String FILE_CONFIG_SPLIT_CHAR = ".";
        private static final String REGISTRY_TYPE = "eureka";
        private static final String CLUSTER = "application";
        private static final String REGISTRY_WEIGHT = "weight";
        private static final String EUREKA_CONFIG_SERVER_URL_KEY = "eureka.serviceUrl.default";
        private static final String EUREKA_CONFIG_REFRESH_KEY = "eureka.client.refresh.interval";
        private static final String EUREKA_CONFIG_SHOULD_REGISTER = "eureka.registration.enabled";
        private static final String EUREKA_CONFIG_METADATA_WEIGHT = "eureka.metadata.weight";
        private static final int EUREKA_REFRESH_INTERVAL = 5;
        private static final int MAP_INITIAL_CAPACITY = 8;
        private static final String DEFAULT_WEIGHT = "1";
        private static ConcurrentMap<String, Set<InetSocketAddress>> clusterAddressMap;
    
        private static volatile boolean subscribeListener = false;
        private static volatile ApplicationInfoManager applicationInfoManager;
        private static volatile CustomEurekaInstanceConfig instanceConfig;
        private static volatile EurekaRegistryServiceImpl2 instance;
        private static volatile EurekaClient eurekaClient;
    
        private EurekaRegistryServiceImpl2() {
        }
    
        public static EurekaRegistryServiceImpl2 getInstance() {
            if (instance == null) {
                synchronized (EurekaRegistryServiceImpl2.class) {
                    if (instance == null) {
                        clusterAddressMap = new ConcurrentHashMap<>(MAP_INITIAL_CAPACITY);
                        instanceConfig = new CustomEurekaInstanceConfig();
                        instance = new EurekaRegistryServiceImpl2();
                    }
                }
            }
            return instance;
        }
    
        @Override
        public void register(InetSocketAddress address) throws Exception {
            NetUtil.validAddress(address);
            instanceConfig.setIpAddress(address.getAddress().getHostAddress());
            instanceConfig.setPort(address.getPort());
            instanceConfig.setApplicationName(getApplicationName());
            instanceConfig.setInstanceId(getInstanceId());
            getEurekaClient(true);
            applicationInfoManager.setInstanceStatus(InstanceInfo.InstanceStatus.UP);
        }
    
        @Override
        public void unregister(InetSocketAddress address) throws Exception {
            if (eurekaClient == null) {
                return;
            }
            applicationInfoManager.setInstanceStatus(InstanceInfo.InstanceStatus.DOWN);
        }
    
        @Override
        public void subscribe(String cluster, EurekaEventListener listener) throws Exception {
            subscribeListener = true;
            getEurekaClient(false).registerEventListener(listener);
        }
    
        @Override
        public void unsubscribe(String cluster, EurekaEventListener listener) throws Exception {
            subscribeListener = false;
            getEurekaClient(false).unregisterEventListener(listener);
        }
    
        @Override
        public List<InetSocketAddress> lookup(String key) throws Exception {
            String clusterName =key;// getServiceGroup(key);
            if (clusterName == null) {
                return null;
            }
            if (!subscribeListener) {
                refreshCluster();
                subscribe(null, event -> {
                    try {
                        refreshCluster();
                    } catch (Exception e) {
                        LOGGER.error("Eureka event listener refreshCluster error:{}", e.getMessage(), e);
                    }
                });
            }
            return new ArrayList<>(clusterAddressMap.getOrDefault(clusterName.toUpperCase(), Collections.emptySet()));
        }
    
        @Override
        public void close() throws Exception {
            if (eurekaClient != null) {
                eurekaClient.shutdown();
            }
            clean();
        }
    
        private void refreshCluster() {
            List<Application> applications = getEurekaClient(false).getApplications().getRegisteredApplications();
    
            if (CollectionUtils.isEmpty(applications)) {
                clusterAddressMap.clear();
    
                if (LOGGER.isDebugEnabled()) {
                    LOGGER.debug("refreshCluster success, cluster empty!");
                }
                return;
            }
    
            ConcurrentMap<String, Set<InetSocketAddress>> collect = new ConcurrentHashMap<>(MAP_INITIAL_CAPACITY);
    
            for (Application application : applications) {
                List<InstanceInfo> instances = application.getInstances();
    
                if (!CollectionUtils.isEmpty(instances)) {
                    Set<InetSocketAddress> addressSet = instances.stream()
                            .map(instance -> new InetSocketAddress(instance.getIPAddr(), instance.getPort()))
                            .collect(Collectors.toSet());
                    collect.put(application.getName(), addressSet);
                }
            }
    
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("refreshCluster success, cluster: " + collect);
            }
    
            clusterAddressMap = collect;
        }
    
        private Properties getEurekaProperties(boolean needRegister) {
            Properties eurekaProperties = new Properties();
            eurekaProperties.setProperty(EUREKA_CONFIG_REFRESH_KEY, String.valueOf(EUREKA_REFRESH_INTERVAL));
    
            String url = "http://localhost:7001/eureka";
            eurekaProperties.setProperty(EUREKA_CONFIG_SERVER_URL_KEY, url);
            String weight = "1";
            if (StringUtils.isNotBlank(weight)) {
                eurekaProperties.setProperty(EUREKA_CONFIG_METADATA_WEIGHT, weight);
            } else {
                eurekaProperties.setProperty(EUREKA_CONFIG_METADATA_WEIGHT, DEFAULT_WEIGHT);
            }
            if (!needRegister) {
                eurekaProperties.setProperty(EUREKA_CONFIG_SHOULD_REGISTER, "false");
            }
    
            return eurekaProperties;
        }
    
        private String getApplicationName() {
            String application = null;
            if (application == null) {
                application = DEFAULT_APPLICATION;
            }
            return application;
        }
    
        private EurekaClient getEurekaClient(boolean needRegister) {
            if (eurekaClient == null) {
                synchronized (EurekaRegistryServiceImpl2.class) {
                    try {
                        if (eurekaClient == null) {
                            if (!needRegister) {
                                instanceConfig = new CustomEurekaInstanceConfig();
                            }
                            ConfigurationManager.loadProperties(getEurekaProperties(needRegister));
                            InstanceInfo instanceInfo = new EurekaConfigBasedInstanceInfoProvider(instanceConfig).get();
                            applicationInfoManager = new ApplicationInfoManager(instanceConfig, instanceInfo);
                            eurekaClient = new DiscoveryClient(applicationInfoManager, new DefaultEurekaClientConfig());
                        }
                    } catch (Exception e) {
                        clean();
                        throw new RuntimeException("register eureka is error!", e);
                    }
                }
            }
            return eurekaClient;
        }
    
        private void clean() {
            eurekaClient = null;
            applicationInfoManager = null;
            instanceConfig = null;
        }
    
        private String getInstanceId() {
            return String.format("%s:%s:%d", instanceConfig.getIpAddress(), instanceConfig.getAppname(),
                instanceConfig.getNonSecurePort());
        }
    
        private String getEurekaServerUrlFileKey() {
            return String.join(FILE_CONFIG_SPLIT_CHAR, FILE_ROOT_REGISTRY, REGISTRY_TYPE, PRO_SERVICE_URL_KEY);
        }
    
        private String getEurekaApplicationFileKey() {
            return String.join(FILE_CONFIG_SPLIT_CHAR, FILE_ROOT_REGISTRY, REGISTRY_TYPE, CLUSTER);
        }
    
        private String getEurekaInstanceWeightFileKey() {
            return String.join(FILE_CONFIG_SPLIT_CHAR, FILE_ROOT_REGISTRY, REGISTRY_TYPE, REGISTRY_WEIGHT);
        }
    }
    
    

    相关文章

      网友评论

          本文标题:(seata源码研究)JAVA代码实现eureka的服务注册、服

          本文链接:https://www.haomeiwen.com/subject/xyzjultx.html