美文网首页
java B2B2C Springcloud多租户电子商城系统-

java B2B2C Springcloud多租户电子商城系统-

作者: IT小跑兵 | 来源:发表于2019-01-24 09:12 被阅读6次

    关于服务与实例列表获取
    EurekaClient端
    我们从Ribbon说起:EurekaClient也存在缓存,应用服务实例列表信息在每个EurekaClient服务消费端都有缓存。一般的,Ribbon的LoadBalancer会读取这个缓存,来知道当前有哪些实例可以调用,从而进行负载均衡。这个loadbalancer同样也有缓存。
    需要JAVA Spring Cloud大型企业分布式微服务云构建的B2B2C电子商务平台源码 一零三八七七四六二六
    首先看这个LoadBalancer的缓存更新机制,相关类是PollingServerListUpdater:

    final Runnable wrapperRunnable = new Runnable() {
        @Override
        public void run() {
            if (!isActive.get()) {
                if (scheduledFuture != null) {
                    scheduledFuture.cancel(true);
                }
                return;
            }
            try {
                //从EurekaClient缓存中获取服务实例列表,保存在本地缓存
                updateAction.doUpdate();
                lastUpdated = System.currentTimeMillis();
            } catch (Exception e) {
                logger.warn("Failed one update cycle", e);
            }
        }
    };
    
    //定时调度
    scheduledFuture = getRefreshExecutor().scheduleWithFixedDelay(
            wrapperRunnable,
            initialDelayMs,
            refreshIntervalMs,
            TimeUnit.MILLISECONDS
    );
    

    这个updateAction.doUpdate();就是从EurekaClient缓存中获取服务实例列表,保存在BaseLoadBalancer的本地缓存:

    protected volatile List<Server> allServerList = Collections.synchronizedList(new ArrayList<Server>());
    
    public void setServersList(List lsrv) {
        //写入allServerList的代码,这里略
    }
    
    @Override
    public List<Server> getAllServers() {
        return Collections.unmodifiableList(allServerList);
    }
    

    这里的getAllServers会在每个负载均衡规则中被调用,例如RoundRobinRule:

    public Server choose(ILoadBalancer lb, Object key) {
        if (lb == null) {
            log.warn("no load balancer");
            return null;
        }
    
        Server server = null;
        int count = 0;
        while (server == null && count++ < 10) {
            List<Server> reachableServers = lb.getReachableServers();
            //获取服务实例列表,调用的就是刚刚提到的getAllServers
            List<Server> allServers = lb.getAllServers();
            int upCount = reachableServers.size();
            int serverCount = allServers.size();
    
            if ((upCount == 0) || (serverCount == 0)) {
                log.warn("No up servers available from load balancer: " + lb);
                return null;
            }
    
            int nextServerIndex = incrementAndGetModulo(serverCount);
            server = allServers.get(nextServerIndex);
    
            if (server == null) {
                /* Transient. */
                Thread.yield();
                continue;
            }
    
            if (server.isAlive() && (server.isReadyToServe())) {
                return (server);
            }
    
            // Next.
            server = null;
        }
    
        if (count >= 10) {
            log.warn("No available alive servers after 10 tries from load balancer: "
                    + lb);
        }
        return server;
    }
    

    这个缓存需要注意下,有时候我们只修改了EurekaClient缓存的更新时间,但是没有修改这个LoadBalancer的刷新本地缓存时间,就是ribbon.ServerListRefreshInterval,这个参数可以设置的很小,因为没有从网络读取,就是从一个本地缓存刷到另一个本地缓存 。
    然后我们来看一下EurekaClient本身的缓存,直接看关键类DiscoveryClient的相关源码,我们这里只关心本地Region的,多Region配置我们先忽略:

    //本地缓存,可以理解为是一个软链接
    private final AtomicReference<Applications> localRegionApps = new AtomicReference<Applications>();
    
    private void initScheduledTasks() {
        //如果配置为需要拉取服务列表,则设置定时拉取任务,这个配置默认是需要拉取服务列表
        if (clientConfig.shouldFetchRegistry()) {
            // registry cache refresh timer
            int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
            int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
            scheduler.schedule(
                    new TimedSupervisorTask(
                            "cacheRefresh",
                            scheduler,
                            cacheRefreshExecutor,
                            registryFetchIntervalSeconds,
                            TimeUnit.SECONDS,
                            expBackOffBound,
                            new CacheRefreshThread()
                    ),
                    registryFetchIntervalSeconds, TimeUnit.SECONDS);
        }
        //其他定时任务初始化的代码,忽略
    }
    
    //定时从EurekaServer拉取服务列表的任务
    class CacheRefreshThread implements Runnable {
            public void run() {
                refreshRegistry();
            }
    }
    
    void refreshRegistry() {
        try {
            //多Region配置处理代码,忽略
    
            boolean success = fetchRegistry(remoteRegionsModified);
            if (success) {
                registrySize = localRegionApps.get().size();
                lastSuccessfulRegistryFetchTimestamp = System.currentTimeMillis();
            }
    
            //日志代码,忽略
        } catch (Throwable e) {
            logger.error("Cannot fetch registry from server", e);
        }        
    }
    
    //定时从EurekaServer拉取服务列表的核心方法
    private boolean fetchRegistry(boolean forceFullRegistryFetch) {
        Stopwatch tracer = FETCH_REGISTRY_TIMER.start();
    
        try {
            Applications applications = getApplications();
    
            //判断,如果是第一次拉取,或者app列表为空,就进行全量拉取,否则就会进行增量拉取
            if (clientConfig.shouldDisableDelta()
                    || (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))
                    || forceFullRegistryFetch
                    || (applications == null)
                    || (applications.getRegisteredApplications().size() == 0)
                    || (applications.getVersion() == -1)) //Client application does not have latest library supporting delta
            {
                getAndStoreFullRegistry();
            } else {
                getAndUpdateDelta(applications);
            }
            applications.setAppsHashCode(applications.getReconcileHashCode());
            logTotalInstances();
        } catch (Throwable e) {
            logger.error(PREFIX + appPathIdentifier + " - was unable to refresh its cache! status = " + e.getMessage(), e);
            return false;
        } finally {
            if (tracer != null) {
                tracer.stop();
            }
        }
    
        //缓存更新完成,发送个event给观察者,目前没啥用 
        onCacheRefreshed();
    
        // 检查下远端的服务实例列表里面包括自己,并且状态是否对,这里我们不关心
        updateInstanceRemoteStatus();
    
        // registry was fetched successfully, so return true
        return true;
    }
    
    //全量拉取代码
    private void getAndStoreFullRegistry() throws Throwable {
        long currentUpdateGeneration = fetchRegistryGeneration.get();
    
        Applications apps = null;
        //访问/eureka/apps接口,拉取所有服务实例信息
        EurekaHttpResponse<Applications> httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null
                ? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get())
                : eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get());
        if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
            apps = httpResponse.getEntity();
        }
        logger.info("The response status is {}", httpResponse.getStatusCode());
    
        if (apps == null) {
            logger.error("The application is null for some reason. Not storing this information");
        } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
            localRegionApps.set(this.filterAndShuffle(apps));
            logger.debug("Got full registry with apps hashcode {}", apps.getAppsHashCode());
        } else {
            logger.warn("Not updating applications as another thread is updating it already");
        }
    }
    
    //增量拉取代码
    
    private void getAndUpdateDelta(Applications applications) throws Throwable {
        long currentUpdateGeneration = fetchRegistryGeneration.get();
    
        Applications delta = null;
        //访问/eureka/delta接口,拉取所有服务实例增量信息
        EurekaHttpResponse<Applications> httpResponse = eurekaTransport.queryClient.getDelta(remoteRegionsRef.get());
        if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
            delta = httpResponse.getEntity();
        }
    
        if (delta == null) {
            //如果delta为空,拉取增量失败,就全量拉取
            logger.warn("The server does not allow the delta revision to be applied because it is not safe. "
                    + "Hence got the full registry.");
            getAndStoreFullRegistry();
        } else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
            //这里设置原子锁的原因是怕某次调度网络请求时间过长,导致同一时间有多线程拉取到增量信息并发修改
            //拉取增量成功,检查hashcode是否一样,不一样的话也会全量拉取
            logger.debug("Got delta update with apps hashcode {}", delta.getAppsHashCode());
            String reconcileHashCode = "";
            if (fetchRegistryUpdateLock.tryLock()) {
                try {
                    updateDelta(delta);
                    reconcileHashCode = getReconcileHashCode(applications);
                } finally {
                    fetchRegistryUpdateLock.unlock();
                }
            } else {
                logger.warn("Cannot acquire update lock, aborting getAndUpdateDelta");
            }
            // There is a diff in number of instances for some reason
            if (!reconcileHashCode.equals(delta.getAppsHashCode()) || clientConfig.shouldLogDeltaDiff()) {
                reconcileAndLogDifference(delta, reconcileHashCode);  // this makes a remoteCall
            }
        } else {
            logger.warn("Not updating application delta as another thread is updating it already");
            logger.debug("Ignoring delta update with apps hashcode {}, as another thread is updating it already", delta.getAppsHashCode());
        }
    }
    

    以上就是对于EurekaClient拉取服务实例信息的源代码分析,总结EurekaClient 重要缓存如下:

    EurekaClient第一次全量拉取,定时增量拉取应用服务实例信息,保存在缓存中。
    EurekaClient增量拉取失败,或者增量拉取之后对比hashcode发现不一致,就会执行全量拉取,这样避免了网络某时段分片带来的问题。
    同时对于服务调用,如果涉及到ribbon负载均衡,那么ribbon对于这个实例列表也有自己的缓存,这个缓存定时从EurekaClient的缓存更新
    EurekaServer端
    在EurekaServer端,所有的读取请求都是读的ReadOnlyMap(这个可以配置)
    有定时任务会定时从ReadWriteMap同步到ReadOnlyMap这个时间配置是:

    #eureka server刷新readCacheMap的时间,注意,client读取的是readCacheMap,这个时间决定了多久会把readWriteCacheMap的缓存更新到readCacheMap上
    #默认30s
    eureka.server.responseCacheUpdateInvervalMs=3000
    

    相关代码:

    if (shouldUseReadOnlyResponseCache) {
                timer.schedule(getCacheUpdateTask(),
                        new Date(((System.currentTimeMillis() / responseCacheUpdateIntervalMs) * responseCacheUpdateIntervalMs)
                                + responseCacheUpdateIntervalMs),
                        responseCacheUpdateIntervalMs);
            }
    private TimerTask getCacheUpdateTask() {
        return new TimerTask() {
            @Override
            public void run() {
                logger.debug("Updating the client cache from response cache");
                for (Key key : readOnlyCacheMap.keySet()) {
                    if (logger.isDebugEnabled()) {
                        Object[] args = {key.getEntityType(), key.getName(), key.getVersion(), key.getType()};
                        logger.debug("Updating the client cache from response cache for key : {} {} {} {}", args);
                    }
                    try {
                        CurrentRequestVersion.set(key.getVersion());
                        Value cacheValue = readWriteCacheMap.get(key);
                        Value currentCacheValue = readOnlyCacheMap.get(key);
                        if (cacheValue != currentCacheValue) {
                            readOnlyCacheMap.put(key, cacheValue);
                        }
                    } catch (Throwable th) {
                        logger.error("Error while updating the client cache from response cache", th);
                    }
                }
            }
        };
    }
    

    ReadWriteMap是一个LoadingCache,将Registry中的服务实例信息封装成要返回的http响应(分别是经过gzip压缩和非压缩的),同时还有两个特殊key,ALL_APPS和ALL_APPS_DELTA
    ALL_APPS就是所有服务实例信息
    ALL_APPS_DELTA就是之前讲注册说的RecentlyChangedQueue里面的实例列表封装的http响应信息 。 java B2B2C源码电子商务平台

    相关文章

      网友评论

          本文标题:java B2B2C Springcloud多租户电子商城系统-

          本文链接:https://www.haomeiwen.com/subject/zeaejqtx.html