Eureka Source Code Analysis (12): Network Communication


Author: skyguard | Published: 2018-11-12 10:30

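    This installment looks at Eureka's network communication, which falls into two categories:
    Eureka-Client requests to Eureka-Server
    Requests from one Eureka-Server to the other Eureka-Server nodes inside a cluster
    EurekaJerseyClient is the interface for the underlying Jersey client, a Jersey Client built on Apache HttpClient 4. Jersey itself is not covered here.
    EurekaJerseyClientImpl is the implementation of EurekaJerseyClient. Its constructor looks like this: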

    public EurekaJerseyClientImpl(int connectionTimeout, int readTimeout, final int connectionIdleTimeout,
                                  ClientConfig clientConfig) {
        try {
            jerseyClientConfig = clientConfig;
            // Create the ApacheHttpClient
            apacheHttpClient = ApacheHttpClient4.create(jerseyClientConfig);
    
            // Set the connection parameters (connect timeout and socket read timeout)
            HttpParams params = apacheHttpClient.getClientHandler().getHttpClient().getParams();
            HttpConnectionParams.setConnectionTimeout(params, connectionTimeout);
            HttpConnectionParams.setSoTimeout(params, readTimeout);
    
            // Create the ApacheHttpClientConnectionCleaner, which cleans up idle connections
            this.apacheHttpClientConnectionCleaner = new ApacheHttpClientConnectionCleaner(apacheHttpClient, connectionIdleTimeout);
        } catch (Throwable e) {
            throw new RuntimeException("Cannot create Jersey client", e);
        }
    }
    

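    EurekaJerseyClientBuilder is a nested class of EurekaJerseyClientImpl; calling its build method creates an EurekaJerseyClientImpl.
    EurekaHttpClient is the HTTP client for accessing Eureka-Server. It declares the Eureka-Server API methods.
    EurekaHttpResponse is the response object returned by those calls.
    TransportClientFactory is the factory interface for creating EurekaHttpClient instances.
    AbstractJerseyEurekaHttpClient is an abstract class implementing EurekaHttpClient; it is where the Eureka-Server API calls are actually implemented. Its register method looks like this: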

     public EurekaHttpResponse<Void> register(InstanceInfo info) {
        // Build the request path
        String urlPath = "apps/" + info.getAppName();
        ClientResponse response = null;
        try {
            Builder resourceBuilder = jerseyClient.resource(serviceUrl).path(urlPath).getRequestBuilder();
            // Add extra request headers
            addExtraHeaders(resourceBuilder);
            // Send the request to Eureka-Server
            response = resourceBuilder
                    .header("Accept-Encoding", "gzip") // GZIP
                    .type(MediaType.APPLICATION_JSON_TYPE) // request body format: JSON
                    .accept(MediaType.APPLICATION_JSON) // response format: JSON
                    .post(ClientResponse.class, info); // request body
            // Build the EurekaHttpResponse
            return anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build();
        } finally {
            if (logger.isDebugEnabled()) {
                logger.debug("Jersey HTTP POST {}/{} with instance {}; statusCode={}", serviceUrl, urlPath, info.getId(),
                        response == null ? "N/A" : response.getStatus());
            }
            if (response != null) {
                response.close();
            }
        }
    }
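To make the shape of the register call concrete, here is a minimal sketch that builds an equivalent request with the JDK's java.net.http API instead of Jersey (Eureka itself uses Jersey, as shown above). The class name RegisterRequestSketch, the sample service URL and the JSON body are assumptions for illustration only, and the request is built but never sent.

```java
import java.net.URI;
import java.net.http.HttpRequest;

final class RegisterRequestSketch {

    // Builds (but does not send) a POST to {serviceUrl}apps/{appName},
    // with the same three headers that register() sets through Jersey.
    static HttpRequest buildRegisterRequest(String serviceUrl, String appName, String instanceJson) {
        String urlPath = "apps/" + appName;
        return HttpRequest.newBuilder(URI.create(serviceUrl + urlPath))
                .header("Accept-Encoding", "gzip")          // ask for a compressed response
                .header("Content-Type", "application/json") // request body format
                .header("Accept", "application/json")       // response format
                .POST(HttpRequest.BodyPublishers.ofString(instanceJson))
                .build();
    }
}
```

Calling buildRegisterRequest("http://localhost:8761/eureka/", "ORDER-SERVICE", "{}") yields a POST whose URI ends in apps/ORDER-SERVICE, which matches the urlPath built in register.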
    

    JerseyApplicationClient implements the Eureka-Client to Eureka-Server communication.
    JerseyEurekaHttpClientFactory is the factory class that creates JerseyApplicationClient instances.
    JerseyEurekaHttpClientFactoryBuilder is a nested class of JerseyEurekaHttpClientFactory, used to create a JerseyEurekaHttpClientFactory.
    JerseyReplicationClient implements the communication from one Eureka-Server to the other Eureka-Server nodes inside a cluster.
    It overrides the addExtraHeaders method of AbstractJerseyEurekaHttpClient to add the custom header x-netflix-discovery-replication=true:

    protected void addExtraHeaders(Builder webResource) {
        webResource.header(PeerEurekaNode.HEADER_REPLICATION, "true");
    }
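The addExtraHeaders hook is a small template-method pattern: the base class builds the common request and each subclass contributes its own headers. The sketch below shows the idea with plain maps. All class names are hypothetical, and the application client adding nothing extra is a simplification rather than a statement about the real JerseyApplicationClient.

```java
import java.util.LinkedHashMap;
import java.util.Map;

abstract class HeaderHookSketch {

    // Template method: common headers first, then whatever the subclass adds.
    final Map<String, String> buildHeaders() {
        Map<String, String> headers = new LinkedHashMap<>();
        headers.put("Accept", "application/json");
        addExtraHeaders(headers);
        return headers;
    }

    // Hook playing the same role as addExtraHeaders in the article.
    protected abstract void addExtraHeaders(Map<String, String> headers);

    // Client-to-server traffic: no extra headers in this simplified sketch.
    static final class ApplicationClient extends HeaderHookSketch {
        @Override
        protected void addExtraHeaders(Map<String, String> headers) {
        }
    }

    // Server-to-server traffic: mark the request as a replication request.
    static final class ReplicationClient extends HeaderHookSketch {
        @Override
        protected void addExtraHeaders(Map<String, String> headers) {
            headers.put("x-netflix-discovery-replication", "true");
        }
    }
}
```

With this split, the receiving side can tell replication traffic apart from ordinary client traffic by checking a single header.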
    

    JerseyReplicationClient also implements the HttpReplicationClient interface, including the submitBatchUpdates method:

    public EurekaHttpResponse<ReplicationListResponse> submitBatchUpdates(ReplicationList replicationList) {
        ClientResponse response = null;
        try {
            response = jerseyApacheClient.resource(serviceUrl)
                    .path(PeerEurekaNode.BATCH_URL_PATH)
                    .accept(MediaType.APPLICATION_JSON_TYPE)
                    .type(MediaType.APPLICATION_JSON_TYPE)
                    .post(ClientResponse.class, replicationList);
            if (!isSuccess(response.getStatus())) {
                return anEurekaHttpResponse(response.getStatus(), ReplicationListResponse.class).build();
            }
            ReplicationListResponse batchResponse = response.getEntity(ReplicationListResponse.class);
            return anEurekaHttpResponse(response.getStatus(), batchResponse).type(MediaType.APPLICATION_JSON_TYPE).build();
        } finally {
            if (response != null) {
                response.close();
            }
        }
    }
    

    MetricsCollectingEurekaHttpClient is an EurekaHttpClient that collects monitoring metrics, working together with Netflix Servo. Its execute method looks like this:

    protected <R> EurekaHttpResponse<R> execute(RequestExecutor<R> requestExecutor) {
        // Look up the metrics for this request type
        EurekaHttpClientRequestMetrics requestMetrics = metricsByRequestType.get(requestExecutor.getRequestType());
        Stopwatch stopwatch = requestMetrics.latencyTimer.start();
        try {
            // Execute the request on the delegate
            EurekaHttpResponse<R> httpResponse = requestExecutor.execute(delegate);
            // Increment the counter for the returned status
            requestMetrics.countersByStatus.get(mappedStatus(httpResponse)).increment();
            return httpResponse;
        } catch (Exception e) {
            requestMetrics.connectionErrors.increment();
            exceptionsMetric.count(e);
            throw e;
        } finally {
            stopwatch.stop();
        }
    }
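The same decorator idea can be sketched with JDK classes only. This is an illustration, not the Servo-based implementation: the class MetricsDecoratorSketch and its fields are hypothetical, a request is modelled as a Supplier returning a status code, and statuses are counted individually rather than through mappedStatus.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.Supplier;

final class MetricsDecoratorSketch {

    final Map<Integer, LongAdder> countersByStatus = new ConcurrentHashMap<>();
    final LongAdder connectionErrors = new LongAdder();
    final LongAdder totalLatencyNanos = new LongAdder();

    // Runs the request, counts its status code, counts failures,
    // and records the elapsed time whether the call succeeded or not.
    int execute(Supplier<Integer> request) {
        long start = System.nanoTime();
        try {
            int status = request.get();
            countersByStatus.computeIfAbsent(status, s -> new LongAdder()).increment();
            return status;
        } catch (RuntimeException e) {
            connectionErrors.increment();
            throw e;
        } finally {
            totalLatencyNanos.add(System.nanoTime() - start);
        }
    }
}
```

As in the original, the timing lives in the finally block, so failed requests contribute to the latency figure as well.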
    

    RedirectingEurekaHttpClient is an EurekaHttpClient that follows 302 redirects until it finds an Eureka-Server that answers without redirecting. Its execute method looks like this:

    protected <R> EurekaHttpResponse<R> execute(RequestExecutor<R> requestExecutor) {
        EurekaHttpClient currentEurekaClient = delegateRef.get();
        if (currentEurekaClient == null) { // no non-302 Eureka-Server has been found yet
            AtomicReference<EurekaHttpClient> currentEurekaClientRef = new AtomicReference<>(factory.newClient(serviceEndpoint));
            try {
                EurekaHttpResponse<R> response = executeOnNewServer(requestExecutor, currentEurekaClientRef);
                // Shut down the previous delegate EurekaHttpClient and install the one that just succeeded without a 302
                TransportUtils.shutdown(delegateRef.getAndSet(currentEurekaClientRef.get()));
                return response;
            } catch (Exception e) {
                logger.error("Request execution error", e);
                TransportUtils.shutdown(currentEurekaClientRef.get());
                throw e;
            }
        } else { // a non-302 Eureka-Server has already been found
            try {
                return requestExecutor.execute(currentEurekaClient);
            } catch (Exception e) {
                logger.error("Request execution error", e);
                delegateRef.compareAndSet(currentEurekaClient, null);
                currentEurekaClient.shutdown();
                throw e;
            }
        }
    }
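The listing above delegates the actual redirect handling to executeOnNewServer, which is not shown. A rough sketch of what such a loop could look like follows. Everything in it is an assumption for illustration: the Response type, the map standing in for real servers, and the caller-supplied redirect limit.

```java
import java.util.Map;

final class RedirectFollowSketch {

    // Hypothetical response: a status code plus an optional redirect target.
    static final class Response {
        final int status;
        final String location;

        Response(int status, String location) {
            this.status = status;
            this.location = location;
        }
    }

    // Follows 302 responses until a non-302 answer arrives or the limit is hit,
    // and returns the endpoint that finally answered.
    static String resolve(String start, Map<String, Response> servers, int maxRedirects) {
        String current = start;
        for (int i = 0; i < maxRedirects; i++) {
            Response response = servers.get(current);
            if (response == null) {
                throw new IllegalStateException("Unknown endpoint: " + current);
            }
            if (response.status != 302) {
                return current; // this endpoint would become the cached delegate
            }
            if (response.location == null) {
                throw new IllegalStateException("Redirect without a target from " + current);
            }
            current = response.location;
        }
        throw new IllegalStateException("Too many redirects starting from " + start);
    }
}
```

Bounding the number of hops matters because two servers redirecting to each other would otherwise keep the client looping forever.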
    

    RetryableEurekaHttpClient is an EurekaHttpClient that retries a request against multiple Eureka-Server nodes. Its execute method looks like this:

    protected <R> EurekaHttpResponse<R> execute(RequestExecutor<R> requestExecutor) {
        List<EurekaEndpoint> candidateHosts = null;
        int endpointIdx = 0;
        for (int retry = 0; retry < numberOfRetries; retry++) {
            EurekaHttpClient currentHttpClient = delegate.get();
            EurekaEndpoint currentEndpoint = null;
    
            // There is no delegate EurekaHttpClient at the moment
            if (currentHttpClient == null) {
                // Fetch the list of candidate Eureka-Server endpoints
                if (candidateHosts == null) {
                    candidateHosts = getHostCandidates();
                    if (candidateHosts.isEmpty()) {
                        throw new TransportException("There is no known eureka server; cluster server list is empty");
                    }
                }
    
                // Every candidate Eureka-Server endpoint has already been tried
                if (endpointIdx >= candidateHosts.size()) {
                    throw new TransportException("Cannot execute request on any known server");
                }
    
                // Create an EurekaHttpClient for the next candidate endpoint
                currentEndpoint = candidateHosts.get(endpointIdx++);
                currentHttpClient = clientFactory.newClient(currentEndpoint);
            }
    
            try {
                // Execute the request
                EurekaHttpResponse<R> response = requestExecutor.execute(currentHttpClient);
                // If the response is acceptable, remember this client as the delegate and return
                if (serverStatusEvaluator.accept(response.getStatusCode(), requestExecutor.getRequestType())) {
                    delegate.set(currentHttpClient);
                    if (retry > 0) {
                        logger.info("Request execution succeeded on retry #{}", retry);
                    }
                    return response;
                }
                logger.warn("Request execution failure with status code {}; retrying on another server if available", response.getStatusCode());
            } catch (Exception e) {
                logger.warn("Request execution failed with message: {}", e.getMessage());  // just log message as the underlying client should log the stacktrace
            }
    
            // Request failed: clear delegate if it still points at currentHttpClient
            // Connection error or 5xx from the server that must be retried on another server
            delegate.compareAndSet(currentHttpClient, null);
    
            // Request failed: add currentEndpoint to the quarantine set
            if (currentEndpoint != null) {
                quarantineSet.add(currentEndpoint);
            }
        }
        throw new TransportException("Retry limit reached; giving up on completing the request");
    }
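Stripped of the cached delegate, the retry loop above reduces to "walk the candidate list, stop at the first acceptable answer, quarantine everything that failed". The sketch below shows just that core. The class RetrySketch is hypothetical, endpoints are plain strings, and treating any status below 500 as acceptable is an assumption standing in for serverStatusEvaluator.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.ToIntFunction;

final class RetrySketch {

    final Set<String> quarantine = new HashSet<>();

    // Tries candidates in order, at most numberOfRetries times, and returns
    // the first endpoint that answers with an acceptable status.
    String execute(List<String> candidates, int numberOfRetries, ToIntFunction<String> request) {
        if (candidates.isEmpty()) {
            throw new IllegalStateException("Cluster server list is empty");
        }
        int endpointIdx = 0;
        for (int retry = 0; retry < numberOfRetries; retry++) {
            if (endpointIdx >= candidates.size()) {
                throw new IllegalStateException("Cannot execute request on any known server");
            }
            String endpoint = candidates.get(endpointIdx++);
            try {
                int status = request.applyAsInt(endpoint);
                if (status < 500) { // assumed acceptance rule
                    return endpoint;
                }
            } catch (RuntimeException e) {
                // Connection-level failure: fall through and try the next endpoint.
            }
            quarantine.add(endpoint); // remember the endpoint that just failed
        }
        throw new IllegalStateException("Retry limit reached; giving up on completing the request");
    }
}
```

Note the two distinct exit conditions, both present in the original: running out of candidates and running out of retries.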
    

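    SessionedEurekaHttpClient is a session-aware EurekaHttpClient. It periodically rebuilds its session so that an Eureka-Client does not stay connected to one particular Eureka-Server forever. This in turn keeps Eureka-Client connections balanced across Eureka-Server nodes when the cluster membership changes. Its execute method looks like this: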

    protected <R> EurekaHttpResponse<R> execute(RequestExecutor<R> requestExecutor) {
        long now = System.currentTimeMillis();
        long delay = now - lastReconnectTimeStamp;
    
        // The current session has exceeded its duration: shut down the current delegate EurekaHttpClient
        if (delay >= currentSessionDurationMs) {
            logger.debug("Ending a session and starting anew");
            lastReconnectTimeStamp = now;
            currentSessionDurationMs = randomizeSessionDuration(sessionDurationMs);
            TransportUtils.shutdown(eurekaHttpClientRef.getAndSet(null));
        }
    
        // Get the delegate EurekaHttpClient, creating a new one if none exists
        EurekaHttpClient eurekaHttpClient = eurekaHttpClientRef.get();
        if (eurekaHttpClient == null) {
            eurekaHttpClient = TransportUtils.getOrSetAnotherClient(eurekaHttpClientRef, clientFactory.newClient());
        }
        return requestExecutor.execute(eurekaHttpClient);
    }
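The listing calls randomizeSessionDuration without showing it. The point of randomizing is that many clients started at the same moment do not all reconnect at the same moment. One way to do this is sketched below. The class name and the spread of plus or minus 50% around the base duration are assumptions for illustration, not taken from the code shown above.

```java
import java.util.Random;

final class SessionDurationSketch {

    // Returns a duration in the range [0.5 * baseMs, 1.5 * baseMs).
    static long randomize(long baseMs, Random random) {
        long delta = (long) (baseMs * (random.nextDouble() - 0.5));
        return baseMs + delta;
    }

    // Same comparison as in execute(): true once the session has lasted
    // at least its randomized duration.
    static boolean sessionExpired(long nowMs, long lastReconnectMs, long currentSessionDurationMs) {
        return nowMs - lastReconnectMs >= currentSessionDurationMs;
    }
}
```

Each time a session ends, a fresh random duration is drawn, so reconnects from different clients drift apart over time.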
    

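    On the Eureka-Server side, calling the static method createReplicationClient of JerseyReplicationClient creates the client used by one Eureka-Server to call the other Eureka-Server nodes inside a cluster.
    On the Eureka-Client side, there are two separate clients: one for registering application instances ( registrationClient ) and one for querying registry information ( queryClient ). Both are created when DiscoveryClient is initialized: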

     private void scheduleServerEndpointTask(EurekaTransport eurekaTransport,
                                            AbstractDiscoveryClientOptionalArgs args) {
    
            
        Collection<?> additionalFilters = args == null
                ? Collections.emptyList()
                : args.additionalFilters;
    
        EurekaJerseyClient providedJerseyClient = args == null
                ? null
                : args.eurekaJerseyClient;
        
        TransportClientFactories argsTransportClientFactories = null;
        if (args != null && args.getTransportClientFactories() != null) {
            argsTransportClientFactories = args.getTransportClientFactories();
        }
        
        // Ignore the raw types warnings since the client filter interface changed between jersey 1/2
        @SuppressWarnings("rawtypes")
        TransportClientFactories transportClientFactories = argsTransportClientFactories == null
                ? new Jersey1TransportClientFactories()
                : argsTransportClientFactories;
    
        // If the transport factory was not supplied with args, assume they are using jersey 1 for passivity
        // noinspection unchecked
        eurekaTransport.transportClientFactory = providedJerseyClient == null
                ? transportClientFactories.newTransportClientFactory(clientConfig, additionalFilters, applicationInfoManager.getInfo())
                : transportClientFactories.newTransportClientFactory(additionalFilters, providedJerseyClient);
    
        // Initialize the application-instance data source for the applications resolver. TODO[0028]: write cluster and read cluster
        ApplicationsResolver.ApplicationsSource applicationsSource = new ApplicationsResolver.ApplicationsSource() {
            @Override
            public Applications getApplications(int stalenessThreshold, TimeUnit timeUnit) {
                long thresholdInMs = TimeUnit.MILLISECONDS.convert(stalenessThreshold, timeUnit);
                long delay = getLastSuccessfulRegistryFetchTimePeriod();
                if (delay > thresholdInMs) {
                    logger.info("Local registry is too stale for local lookup. Threshold:{}, actual:{}",
                            thresholdInMs, delay);
                    return null;
                } else {
                    return localRegionApps.get();
                }
            }
        };
    
        // Create the endpoint resolver
        eurekaTransport.bootstrapResolver = EurekaHttpClients.newBootstrapResolver(
                clientConfig,
                transportConfig,
                eurekaTransport.transportClientFactory,
                applicationInfoManager.getInfo(),
                applicationsSource
        );
    
        if (clientConfig.shouldRegisterWithEureka()) {
            EurekaHttpClientFactory newRegistrationClientFactory = null;
            EurekaHttpClient newRegistrationClient = null;
            try {
                newRegistrationClientFactory = EurekaHttpClients.registrationClientFactory(
                        eurekaTransport.bootstrapResolver,
                        eurekaTransport.transportClientFactory,
                        transportConfig
                );
                newRegistrationClient = newRegistrationClientFactory.newClient();
            } catch (Exception e) {
                logger.warn("Transport initialization failure", e);
            }
            eurekaTransport.registrationClientFactory = newRegistrationClientFactory;
            eurekaTransport.registrationClient = newRegistrationClient;
        }
    
        // new method (resolve from primary servers for read)
        // Configure new transport layer (candidate for injecting in the future)
        if (clientConfig.shouldFetchRegistry()) {
            EurekaHttpClientFactory newQueryClientFactory = null;
            EurekaHttpClient newQueryClient = null;
            try {
                newQueryClientFactory = EurekaHttpClients.queryClientFactory(
                        eurekaTransport.bootstrapResolver,
                        eurekaTransport.transportClientFactory,
                        clientConfig,
                        transportConfig,
                        applicationInfoManager.getInfo(),
                        applicationsSource
                );
                newQueryClient = newQueryClientFactory.newClient();
            } catch (Exception e) {
                logger.warn("Transport initialization failure", e);
            }
            eurekaTransport.queryClientFactory = newQueryClientFactory;
            eurekaTransport.queryClient = newQueryClient;
        }
    }
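The ApplicationsSource in the method above returns the locally cached registry only when the last successful fetch is recent enough, and null otherwise. The freshness check can be isolated as in the sketch below. The class and method names are hypothetical; only the unit conversion and the comparison mirror the original.

```java
import java.util.concurrent.TimeUnit;

final class StalenessSketch {

    // Returns the cached value if the last successful fetch is within the
    // threshold, otherwise null, mirroring getApplications() above.
    static <T> T ifFresh(T cached, long millisSinceLastFetch, int stalenessThreshold, TimeUnit unit) {
        long thresholdInMs = TimeUnit.MILLISECONDS.convert(stalenessThreshold, unit);
        return millisSinceLastFetch > thresholdInMs ? null : cached;
    }
}
```

Returning null rather than stale data leaves it to the caller to fall back to another source of server endpoints.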
    

    That concludes this overview of Eureka's network communication.
