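The earlier posts only covered how routing rules are mapped to handlers and how a request is finally processed. They did not look at how the list of service instances is generated dynamically, so I went through the source code with that question in mind.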
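2. Classes involved
- 2.1 Server
Server class diagram
The class diagram shows two subclasses of Server: DiscoveryEnabledServer, and DomainExtractingServer, which itself extends DiscoveryEnabledServer.
- 2.1.1 DiscoveryEnabledServer
Put simply, DiscoveryEnabledServer converts an InstanceInfo into a Server. The host is the IP address or the host name depending on useIpAddr, and the port is the secure port when useSecurePort is set and the instance has its secure port enabled.
The source code is as follows: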
public class DiscoveryEnabledServer extends Server{
private final InstanceInfo instanceInfo;
private final MetaInfo serviceInfo;
public DiscoveryEnabledServer(final InstanceInfo instanceInfo, boolean useSecurePort) {
this(instanceInfo, useSecurePort, false);
}
public DiscoveryEnabledServer(final InstanceInfo instanceInfo, boolean useSecurePort, boolean useIpAddr) {
super(useIpAddr ? instanceInfo.getIPAddr() : instanceInfo.getHostName(), instanceInfo.getPort());
if(useSecurePort && instanceInfo.isPortEnabled(PortType.SECURE))
super.setPort(instanceInfo.getSecurePort());
this.instanceInfo = instanceInfo;
this.serviceInfo = new MetaInfo() {
@Override
public String getAppName() {
return instanceInfo.getAppName();
}
@Override
public String getServerGroup() {
return instanceInfo.getASGName();
}
@Override
public String getServiceIdForDiscovery() {
return instanceInfo.getVIPAddress();
}
@Override
public String getInstanceId() {
return instanceInfo.getId();
}
};
}
public InstanceInfo getInstanceInfo() {
return instanceInfo;
}
@Override
public MetaInfo getMetaInfo() {
return serviceInfo;
}
}
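The host and port selection in the constructor above can be isolated in a few lines. The following is only a minimal sketch: the Instance class is a hypothetical stand-in for the handful of InstanceInfo fields the constructor reads, and hostPort is an illustrative helper, not part of Ribbon.

```java
public class HostPortSelectionSketch {
    /** Hypothetical stand-in for the InstanceInfo fields used by the constructor. */
    public static final class Instance {
        final String hostName;
        final String ipAddr;
        final int port;
        final int securePort;
        final boolean securePortEnabled;

        public Instance(String hostName, String ipAddr, int port,
                        int securePort, boolean securePortEnabled) {
            this.hostName = hostName;
            this.ipAddr = ipAddr;
            this.port = port;
            this.securePort = securePort;
            this.securePortEnabled = securePortEnabled;
        }
    }

    /** Mirrors the decision order: pick the host first, then override the port if secure. */
    public static String hostPort(Instance i, boolean useSecurePort, boolean useIpAddr) {
        String host = useIpAddr ? i.ipAddr : i.hostName;
        int port = (useSecurePort && i.securePortEnabled) ? i.securePort : i.port;
        return host + ":" + port;
    }

    public static void main(String[] args) {
        Instance i = new Instance("node1.example", "10.0.0.5", 8080, 8443, true);
        System.out.println(hostPort(i, false, false));
        System.out.println(hostPort(i, true, true));
    }
}
```

Note that asking for the secure port has no effect when the instance has not enabled it; the plain port is kept in that case.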
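- 2.1.2 DomainExtractingServer
DomainExtractingServer assigns each instance a zone. It uses the "zone" entry of the instance metadata if present; otherwise, when approximateZoneFromHostname is enabled, it approximates the zone from the host name; otherwise it keeps the zone of the wrapped server.
The source code is as follows: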
class DomainExtractingServer extends DiscoveryEnabledServer {
private String id;
@Override
public String getId() {
return id;
}
@Override
public void setId(String id) {
this.id = id;
}
DomainExtractingServer(DiscoveryEnabledServer server, boolean useSecurePort,
boolean useIpAddr, boolean approximateZoneFromHostname) {
// host and port are set in super()
super(server.getInstanceInfo(), useSecurePort, useIpAddr);
if (server.getInstanceInfo().getMetadata().containsKey("zone")) {
setZone(server.getInstanceInfo().getMetadata().get("zone"));
}
else if (approximateZoneFromHostname) {
setZone(ZoneUtils.extractApproximateZone(server.getHost()));
}
else {
setZone(server.getZone());
}
setId(extractId(server));
setAlive(server.isAlive());
setReadyToServe(server.isReadyToServe());
}
private String extractId(Server server) {
if (server instanceof DiscoveryEnabledServer) {
DiscoveryEnabledServer enabled = (DiscoveryEnabledServer) server;
InstanceInfo instance = enabled.getInstanceInfo();
if (instance.getMetadata().containsKey("instanceId")) {
return instance.getHostName() + ":"
+ instance.getMetadata().get("instanceId");
}
}
return super.getId();
}
}
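The three-way zone decision in the constructor above can be sketched on its own. This is a simplified illustration under stated assumptions: resolveZone is a hypothetical helper, and approximateZone is only a stand-in for ZoneUtils.extractApproximateZone that assumes the zone is whatever follows the first dot of the host name.

```java
import java.util.Collections;
import java.util.Map;

public class ZoneResolutionSketch {
    /** Assumed stand-in for ZoneUtils.extractApproximateZone: text after the first dot. */
    public static String approximateZone(String host) {
        int dot = host.indexOf('.');
        return dot < 0 ? host : host.substring(dot + 1);
    }

    /** Same precedence as the constructor: metadata, then host name, then fallback. */
    public static String resolveZone(Map<String, String> metadata, String host,
                                     boolean approximateZoneFromHostname,
                                     String fallbackZone) {
        if (metadata.containsKey("zone")) {
            return metadata.get("zone");
        }
        if (approximateZoneFromHostname) {
            return approximateZone(host);
        }
        return fallbackZone;
    }

    public static void main(String[] args) {
        Map<String, String> withZone = Collections.singletonMap("zone", "zone-a");
        Map<String, String> noZone = Collections.emptyMap();
        System.out.println(resolveZone(withZone, "node1.zone-b.example", true, "unknown"));
        System.out.println(resolveZone(noZone, "node1.zone-b.example", true, "unknown"));
        System.out.println(resolveZone(noZone, "node1.zone-b.example", false, "unknown"));
    }
}
```

The explicit metadata entry always wins, so the host name heuristic only matters for instances that did not declare a zone.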
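- 2.2 ServerList
The ServerList interface is simple: its job is to return the list of servers.
ServerList class diagram
- 2.2.1 AbstractServerList
This class mainly adds support for filtering. getFilterImpl reads the filter class name from the client configuration (defaulting to ZoneAffinityServerListFilter), instantiates it, and returns the filter that is later applied to the server list.
The source code is as follows: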
public abstract class AbstractServerList<T extends Server> implements ServerList<T>, IClientConfigAware {
public AbstractServerListFilter<T> getFilterImpl(IClientConfig niwsClientConfig) throws ClientException{
try {
String niwsServerListFilterClassName = niwsClientConfig
.getProperty(
CommonClientConfigKey.NIWSServerListFilterClassName,
ZoneAffinityServerListFilter.class.getName())
.toString();
AbstractServerListFilter<T> abstractNIWSServerListFilter =
(AbstractServerListFilter<T>) ClientFactory.instantiateInstanceWithClientConfig(niwsServerListFilterClassName, niwsClientConfig);
return abstractNIWSServerListFilter;
} catch (Throwable e) {
throw new ClientException(
ClientException.ErrorType.CONFIGURATION,
"Unable to get an instance of CommonClientConfigKey.NIWSServerListFilterClassName. Configured class:"
+ niwsClientConfig
.getProperty(CommonClientConfigKey.NIWSServerListFilterClassName), e);
}
}
}
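The pattern in getFilterImpl, reading a class name from configuration with a default and instantiating it reflectively, can be sketched without Ribbon. Everything here is hypothetical: the Filter interface, the two filter classes, and the "filterClassName" key are illustrative names, and a plain Map stands in for IClientConfig.

```java
import java.util.Collections;
import java.util.Map;

public class FilterLoadingSketch {
    /** Hypothetical filter contract, standing in for ServerListFilter. */
    public interface Filter { }

    public static class DefaultFilter implements Filter { }

    public static class CustomFilter implements Filter { }

    /** Looks up the class name (with a default) and instantiates it via reflection. */
    public static Filter loadFilter(Map<String, String> config) {
        String className = config.getOrDefault("filterClassName",
                DefaultFilter.class.getName());
        try {
            return (Filter) Class.forName(className)
                    .getDeclaredConstructor()
                    .newInstance();
        } catch (ReflectiveOperationException | ClassCastException e) {
            // Wrap every failure in one configuration-style error, as the original does.
            throw new IllegalStateException(
                    "Unable to instantiate filter class: " + className, e);
        }
    }

    public static void main(String[] args) {
        Map<String, String> empty = Collections.emptyMap();
        System.out.println(loadFilter(empty).getClass().getSimpleName());
    }
}
```

Wrapping the failure and naming the configured class in the message keeps a misconfiguration easy to diagnose, which is also what the ClientException in the original code does.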
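- 2.2.2 ConfigurationBasedServerList
Loads the server list from configuration.
- 2.2.3 DiscoveryEnabledNIWSServerList
Fetches the server list from the registry through the registry client (EurekaClient). Only instances whose status is UP are kept.
The source code is as follows: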
public class DiscoveryEnabledNIWSServerList extends AbstractServerList<DiscoveryEnabledServer>{
@Override
public List<DiscoveryEnabledServer> getUpdatedListOfServers(){
return obtainServersViaDiscovery();
}
private List<DiscoveryEnabledServer> obtainServersViaDiscovery() {
List<DiscoveryEnabledServer> serverList = new ArrayList<DiscoveryEnabledServer>();
if (eurekaClientProvider == null || eurekaClientProvider.get() == null) {
logger.warn("EurekaClient has not been initialized yet, returning an empty list");
return new ArrayList<DiscoveryEnabledServer>();
}
EurekaClient eurekaClient = eurekaClientProvider.get();
if (vipAddresses!=null){
for (String vipAddress : vipAddresses.split(",")) {
List<InstanceInfo> listOfInstanceInfo = eurekaClient.getInstancesByVipAddress(vipAddress, isSecure, targetRegion);
for (InstanceInfo ii : listOfInstanceInfo) {
if (ii.getStatus().equals(InstanceStatus.UP)) {
if(shouldUseOverridePort){
if(logger.isDebugEnabled()){
logger.debug("Overriding port on client name: " + clientName + " to " + overridePort);
}
InstanceInfo copy = new InstanceInfo(ii);
if(isSecure){
ii = new InstanceInfo.Builder(copy).setSecurePort(overridePort).build();
}else{
ii = new InstanceInfo.Builder(copy).setPort(overridePort).build();
}
}
DiscoveryEnabledServer des = createServer(ii, isSecure, shouldUseIpAddr);
serverList.add(des);
}
}
if (serverList.size()>0 && prioritizeVipAddressBasedServers){
break; // if the current vipAddress has servers, we dont use subsequent vipAddress based servers
}
}
}
return serverList;
}
}
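The loop above has two behaviours worth isolating: only UP instances are kept, and with prioritizeVipAddressBasedServers the first VIP address that yields servers ends the search. The sketch below illustrates this with hypothetical types: a Map plays the role of the Eureka registry and Instance is a reduced stand-in for InstanceInfo.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class VipAddressLookupSketch {
    /** Hypothetical reduced instance: a host and whether its status is UP. */
    public static final class Instance {
        final String host;
        final boolean up;

        public Instance(String host, boolean up) {
            this.host = host;
            this.up = up;
        }
    }

    public static List<String> obtainServers(Map<String, List<Instance>> registry,
                                             String vipAddresses, boolean prioritize) {
        List<String> servers = new ArrayList<>();
        if (vipAddresses == null) {
            return servers;
        }
        for (String vip : vipAddresses.split(",")) {
            List<Instance> found = registry.getOrDefault(vip, Collections.<Instance>emptyList());
            for (Instance i : found) {
                if (i.up) {
                    servers.add(i.host); // instances that are not UP are skipped
                }
            }
            if (!servers.isEmpty() && prioritize) {
                break; // earlier VIP addresses take priority over later ones
            }
        }
        return servers;
    }

    public static void main(String[] args) {
        Map<String, List<Instance>> registry = new HashMap<>();
        registry.put("a", Arrays.asList(new Instance("a1", true), new Instance("a2", false)));
        registry.put("b", Arrays.asList(new Instance("b1", true)));
        System.out.println(obtainServers(registry, "a,b", true));
        System.out.println(obtainServers(registry, "a,b", false));
    }
}
```

With prioritization on, later VIP addresses act as fallbacks; with it off, the results of all VIP addresses are merged.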
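- 2.2.4 DomainExtractingServerList
Takes the servers discovered through the registry, assigns each one a zone, and converts it into a DomainExtractingServer.
The source code is as follows: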
public class DomainExtractingServerList implements ServerList<DiscoveryEnabledServer> {
private ServerList<DiscoveryEnabledServer> list;
private final RibbonProperties ribbon;
private boolean approximateZoneFromHostname;
public DomainExtractingServerList(ServerList<DiscoveryEnabledServer> list,
IClientConfig clientConfig, boolean approximateZoneFromHostname) {
this.list = list;
this.ribbon = RibbonProperties.from(clientConfig);
this.approximateZoneFromHostname = approximateZoneFromHostname;
}
@Override
public List<DiscoveryEnabledServer> getInitialListOfServers() {
List<DiscoveryEnabledServer> servers = setZones(
this.list.getInitialListOfServers());
return servers;
}
@Override
public List<DiscoveryEnabledServer> getUpdatedListOfServers() {
List<DiscoveryEnabledServer> servers = setZones(
this.list.getUpdatedListOfServers());
return servers;
}
private List<DiscoveryEnabledServer> setZones(List<DiscoveryEnabledServer> servers) {
List<DiscoveryEnabledServer> result = new ArrayList<>();
boolean isSecure = this.ribbon.isSecure(true);
boolean shouldUseIpAddr = this.ribbon.isUseIPAddrForServer();
for (DiscoveryEnabledServer server : servers) {
result.add(new DomainExtractingServer(server, isSecure, shouldUseIpAddr,
this.approximateZoneFromHostname));
}
return result;
}
}
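- 2.2.5 StaticServerList
A static server list: the initial list and the updated list are the same fixed set of servers. The source code is as follows: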
public class StaticServerList<T extends Server> implements ServerList<T> {
private final List<T> servers;
public StaticServerList(T... servers) {
this.servers = Arrays.asList(servers);
}
@Override
public List<T> getInitialListOfServers() {
return servers;
}
@Override
public List<T> getUpdatedListOfServers() {
return servers;
}
}
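- 2.3 ServerListUpdater
This interface controls the scheduling of server list updates, for example how often the list is refreshed. The update itself is already implemented by ServerList, so ServerListUpdater only decides when it runs.
ServerListUpdater class diagram
- 2.3.1 PollingServerListUpdater
The default server list updater.
- 2.3.2 EurekaNotificationServerListUpdater
A server list updater triggered by EurekaEvent notifications.
3. How the dynamic server list works
Zuul relies on Ribbon to pull the server list dynamically, which is easy to confirm from the source code.
- 3.1 Injecting the ServerListUpdater
Start with the configuration class: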
@Configuration
@EnableConfigurationProperties
@Import({ HttpClientConfiguration.class, OkHttpRibbonConfiguration.class,
RestClientRibbonConfiguration.class, HttpClientRibbonConfiguration.class })
public class RibbonClientConfiguration {
@Bean
@ConditionalOnMissingBean
public ServerListUpdater ribbonServerListUpdater(IClientConfig config) {
return new PollingServerListUpdater(config);
}
}
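ServerListUpdater is registered as a bean in the container and initialized there.
- 3.2 How PollingServerListUpdater works
As 3.1 shows, the bean is a PollingServerListUpdater (because of @ConditionalOnMissingBean, only when no other ServerListUpdater bean is defined), so the rest of this section looks at how it works.
First, its member variables: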
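public class PollingServerListUpdater implements ServerListUpdater {
    // Delay before the first fetch of the server list: 1 second (default)
    private static long LISTOFSERVERS_CACHE_UPDATE_DELAY = 1000; // msecs;
    // Interval between server list refreshes: 30 seconds (default)
    private static int LISTOFSERVERS_CACHE_REPEAT_INTERVAL = 30 * 1000; // msecs;
    // Whether the task has been started
    private final AtomicBoolean isActive = new AtomicBoolean(false);
    // Time of the last update
    private volatile long lastUpdated = System.currentTimeMillis();
    // Initial delay before the task first runs
    private final long initialDelayMs;
    // Interval between task runs
    private final long refreshIntervalMs;
    // Handle to the scheduled task. Why volatile? Presumably because it is assigned in start()
    // but read by the task on a pool thread without holding a lock, so the write must be visible there.
    private volatile ScheduledFuture<?> scheduledFuture;
}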
As the fields show, most of this state belongs to the scheduled task.
- 3.2.1 Thread pool initialization
public class PollingServerListUpdater implements ServerListUpdater {
private static class LazyHolder {
private final static String CORE_THREAD = "DynamicServerListLoadBalancer.ThreadPoolSize";
private final static DynamicIntProperty poolSizeProp = new DynamicIntProperty(CORE_THREAD, 2);
private static Thread _shutdownThread;
static ScheduledThreadPoolExecutor _serverListRefreshExecutor = null;
static {
int coreSize = poolSizeProp.get();
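// Thread factory: every thread in this pool is a daemon thread, so the pool never keeps the JVM alive once no user threads remain.
// In a web service the user threads are typically those of the request-handling pool; using daemon threads here avoids holding resources after they are gone.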
ThreadFactory factory = (new ThreadFactoryBuilder())
.setNameFormat("PollingServerListUpdater-%d")
.setDaemon(true)
.build();
// Create the thread pool
_serverListRefreshExecutor = new ScheduledThreadPoolExecutor(coreSize, factory);
poolSizeProp.addCallback(new Runnable() {
@Override
public void run() {
_serverListRefreshExecutor.setCorePoolSize(poolSizeProp.get());
}
});
_shutdownThread = new Thread(new Runnable() {
public void run() {
logger.info("Shutting down the Executor Pool for PollingServerListUpdater");
shutdownExecutorPool();
}
});
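// On JVM exit, shut the pool down in an orderly way through a shutdown hook (shutdown() rather than a forced stop), so a task that is still running is not cut off.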
Runtime.getRuntime().addShutdownHook(_shutdownThread);
}
private static void shutdownExecutorPool() {
if (_serverListRefreshExecutor != null) {
_serverListRefreshExecutor.shutdown();
if (_shutdownThread != null) {
try {
Runtime.getRuntime().removeShutdownHook(_shutdownThread);
} catch (IllegalStateException ise) {
}
}
}
}
}
}
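The two ideas in this initializer, daemon worker threads and an orderly shutdown hook, can be shown with the JDK alone. This is a sketch under assumptions: it replaces Guava's ThreadFactoryBuilder with a hand-written ThreadFactory, and the method names (newDaemonPool, installShutdownHook, runsOnDaemonThread) are illustrative.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

public class DaemonPoolSketch {
    /** Builds a scheduled pool whose workers are named daemon threads. */
    public static ScheduledThreadPoolExecutor newDaemonPool(int coreSize, final String prefix) {
        final AtomicInteger counter = new AtomicInteger();
        ThreadFactory factory = new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread t = new Thread(r, prefix + "-" + counter.getAndIncrement());
                t.setDaemon(true); // daemon threads do not block JVM exit
                return t;
            }
        };
        return new ScheduledThreadPoolExecutor(coreSize, factory);
    }

    /** Registers a hook that calls shutdown() (not shutdownNow()) when the JVM exits. */
    public static Thread installShutdownHook(final ScheduledThreadPoolExecutor pool) {
        Thread hook = new Thread(new Runnable() {
            @Override
            public void run() {
                pool.shutdown();
            }
        });
        Runtime.getRuntime().addShutdownHook(hook);
        return hook;
    }

    /** Runs a probe on the pool and reports whether the worker is a daemon thread. */
    public static boolean runsOnDaemonThread(ScheduledThreadPoolExecutor pool) {
        Callable<Boolean> probe = () -> Thread.currentThread().isDaemon();
        try {
            return pool.submit(probe).get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        ScheduledThreadPoolExecutor pool = newDaemonPool(2, "PollingSketch");
        installShutdownHook(pool);
        System.out.println(runsOnDaemonThread(pool));
    }
}
```

Keeping a reference to the hook thread, as the original does with _shutdownThread, is what makes it possible to remove the hook again if the pool is shut down early.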
- 3.2.2 Starting PollingServerListUpdater
- 3.2.2.1 When it is started
@SuppressWarnings("deprecation")
@Configuration
@EnableConfigurationProperties
@Import({ HttpClientConfiguration.class, OkHttpRibbonConfiguration.class,
RestClientRibbonConfiguration.class, HttpClientRibbonConfiguration.class })
public class RibbonClientConfiguration {
@Bean
@ConditionalOnMissingBean
public ILoadBalancer ribbonLoadBalancer(IClientConfig config,
ServerList<Server> serverList, ServerListFilter<Server> serverListFilter,
IRule rule, IPing ping, ServerListUpdater serverListUpdater) {
if (this.propertiesFactory.isSet(ILoadBalancer.class, name)) {
return this.propertiesFactory.get(ILoadBalancer.class, config, name);
}
// ZoneAwareLoadBalancer is a subclass of DynamicServerListLoadBalancer, so the parent constructor runs
return new ZoneAwareLoadBalancer<>(config, rule, ping, serverList,
serverListFilter, serverListUpdater);
}
}
public class DynamicServerListLoadBalancer<T extends Server> extends BaseLoadBalancer {
public DynamicServerListLoadBalancer(IClientConfig clientConfig, IRule rule, IPing ping,
ServerList<T> serverList, ServerListFilter<T> filter,
ServerListUpdater serverListUpdater) {
// ... (part of the code omitted)
restOfInit(clientConfig);
}
void restOfInit(IClientConfig clientConfig) {
// ... (code omitted)
enableAndInitLearnNewServersFeature();
// ... (code omitted)
}
public void enableAndInitLearnNewServersFeature() {
// serverListUpdater comes from the container; as shown earlier, the injected bean is a PollingServerListUpdater
serverListUpdater.start(updateAction);
}
}
- 3.2.2.2 The start method
public class PollingServerListUpdater implements ServerListUpdater {
@Override
public synchronized void start(final UpdateAction updateAction) {
if (isActive.compareAndSet(false, true)) {
final Runnable wrapperRunnable = new Runnable() {
@Override
public void run() {
if (!isActive.get()) {
if (scheduledFuture != null) {
scheduledFuture.cancel(true);
}
return;
}
try {
updateAction.doUpdate();
lastUpdated = System.currentTimeMillis();
} catch (Exception e) {
logger.warn("Failed one update cycle", e);
}
}
};
scheduledFuture = getRefreshExecutor().scheduleWithFixedDelay(
wrapperRunnable,
initialDelayMs,
refreshIntervalMs,
TimeUnit.MILLISECONDS
);
} else {
logger.info("Already active, no-op");
}
}
}
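The start method combines a compare-and-set guard (so the task is scheduled only once) with scheduleWithFixedDelay and a try/catch around the update. The following is a reduced sketch of that pattern, not the Ribbon class: it takes a plain Runnable instead of UpdateAction, owns its own single-thread executor, and its stop() is terminal because it also shuts that executor down.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class PollingUpdaterSketch {
    private final AtomicBoolean isActive = new AtomicBoolean(false);
    private final ScheduledExecutorService executor =
            Executors.newSingleThreadScheduledExecutor();
    private volatile ScheduledFuture<?> scheduledFuture;

    /** Returns true if this call scheduled the task, false if it was already active. */
    public synchronized boolean start(final Runnable updateAction,
                                      long initialDelayMs, long refreshIntervalMs) {
        if (!isActive.compareAndSet(false, true)) {
            return false; // already active, no-op
        }
        Runnable wrapper = () -> {
            if (!isActive.get()) {
                return;
            }
            try {
                updateAction.run();
            } catch (Exception e) {
                // Swallow the failure: an exception escaping a periodic task
                // would suppress all of its later executions.
                System.err.println("Failed one update cycle: " + e);
            }
        };
        scheduledFuture = executor.scheduleWithFixedDelay(
                wrapper, initialDelayMs, refreshIntervalMs, TimeUnit.MILLISECONDS);
        return true;
    }

    /** Cancels the task and shuts the executor down; the sketch cannot be restarted. */
    public synchronized void stop() {
        if (isActive.compareAndSet(true, false) && scheduledFuture != null) {
            scheduledFuture.cancel(true);
        }
        executor.shutdownNow();
    }

    public boolean isActive() {
        return isActive.get();
    }

    public static void main(String[] args) throws InterruptedException {
        PollingUpdaterSketch updater = new PollingUpdaterSketch();
        updater.start(() -> System.out.println("refreshing server list"), 0, 100);
        Thread.sleep(350);
        updater.stop();
    }
}
```

scheduleWithFixedDelay measures the interval from the end of one run to the start of the next, so a slow refresh never overlaps with the following one.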
- 3.2.3 What PollingServerListUpdater does on each run
public class DynamicServerListLoadBalancer<T extends Server> extends BaseLoadBalancer {
volatile ServerListFilter<T> filter;
volatile ServerList<T> serverListImpl;
protected final ServerListUpdater.UpdateAction updateAction = new ServerListUpdater.UpdateAction() {
@Override
public void doUpdate() {
// Refresh the server list
updateListOfServers();
}
};
public void updateListOfServers() {
List<T> servers = new ArrayList<T>();
if (serverListImpl != null) {
// Fetch the server list through ServerList; the implementation here is DomainExtractingServerList
servers = serverListImpl.getUpdatedListOfServers();
// Filter the servers through ServerListFilter
if (filter != null) {
servers = filter.getFilteredListOfServers(servers);
}
}
updateAllServerList(servers);
}
}
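Each refresh is a small pipeline: fetch from the ServerList, optionally filter, then publish the result. The sketch below reproduces only that shape with JDK functional interfaces; servers are plain strings, and the Supplier and UnaryOperator are hypothetical stand-ins for ServerList and ServerListFilter.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.Supplier;
import java.util.function.UnaryOperator;

public class UpdatePipelineSketch {
    private final Supplier<List<String>> serverList;  // stand-in for ServerList, may be null
    private final UnaryOperator<List<String>> filter; // stand-in for ServerListFilter, may be null
    private volatile List<String> allServers = Collections.emptyList();

    public UpdatePipelineSketch(Supplier<List<String>> serverList,
                                UnaryOperator<List<String>> filter) {
        this.serverList = serverList;
        this.filter = filter;
    }

    /** Fetch, then filter, then publish; mirrors the order in updateListOfServers. */
    public void updateListOfServers() {
        List<String> servers = new ArrayList<>();
        if (serverList != null) {
            servers = serverList.get();
            if (filter != null) {
                servers = filter.apply(servers);
            }
        }
        // Publish an immutable snapshot so readers never see a half-built list.
        allServers = Collections.unmodifiableList(new ArrayList<>(servers));
    }

    public List<String> getAllServers() {
        return allServers;
    }

    public static void main(String[] args) {
        UpdatePipelineSketch lb = new UpdatePipelineSketch(
                () -> Arrays.asList("a1:80", "b1:80"),
                null);
        lb.updateListOfServers();
        System.out.println(lb.getAllServers());
    }
}
```

Publishing an immutable snapshot through a volatile field is a choice made for this sketch; the original delegates publication to updateAllServerList.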
As shown above, the server list is ultimately fetched through DomainExtractingServerList.
So how does DomainExtractingServerList obtain it?
@Configuration
public class EurekaRibbonClientConfiguration {
@Bean
@ConditionalOnMissingBean
public ServerList<?> ribbonServerList(IClientConfig config,
Provider<EurekaClient> eurekaClientProvider) {
if (this.propertiesFactory.isSet(ServerList.class, serviceId)) {
return this.propertiesFactory.get(ServerList.class, config, serviceId);
}
DiscoveryEnabledNIWSServerList discoveryServerList = new DiscoveryEnabledNIWSServerList(
config, eurekaClientProvider);
DomainExtractingServerList serverList = new DomainExtractingServerList(
discoveryServerList, config, this.approximateZoneFromHostname);
return serverList;
}
}
public class DomainExtractingServerList implements ServerList<DiscoveryEnabledServer> {
// From the constructor call above, list is a DiscoveryEnabledNIWSServerList, so the server list is fetched through the EurekaClient.
private ServerList<DiscoveryEnabledServer> list;
@Override
public List<DiscoveryEnabledServer> getUpdatedListOfServers() {
List<DiscoveryEnabledServer> servers = setZones(
this.list.getUpdatedListOfServers());
return servers;
}
}
Flow chart