常用的服务注册中心:euraka、Nacos、Zookeeper、Etcd、Consul
比较注册中心差异从这些点出发:
1、服务动态感知实现方式(push/pull),2、存储(是否支持持久化),3、高可用机制(集群特性:是否支持选主、一致性问题),4cap特性(cp、ap)
,5api的提供形式(http协议、netty通信)
延伸:cap解释
c(强一致性)
a(可用性)
p(分区容器性)(要满足这一点必须是集群节点/跨区域高可用)
为什么只能满足cp或者ap
在满足p的情况下,例如集群节点,比如一个请求,为了实现强一致性,则必须让请求等待,等待情况下,则不满足可用性,反之同理说明ap。
Euraka:非持久化存储基于内存,(ap)最终一致性、集群节点角色是一样的
Nacos:一致性raft算法(redis-sentinael/nacos选举)long-poll
Zookeeper:zab协议 push
Consul:一致性raft算法(redis-sentinael/nacos选举)、long-poll
Etcd:一致性raft算法(redis-sentinael/nacos选举)long-poll
服务注册中心的作用:1、服务上下线动态感知(根据心跳的结果剔除服务) 2、服务调用维护方便
消费者服务获取euraka服务列表的方式:euraka注册中心push主动推送,pull定时轮询拉取(存在延迟)
延伸:long-pull(长轮询):客户端发送请求到服务端,服务端hold住请求,等待,直到需要响应时才返回响应
Eureka Server
Eureka Client
Eureka自我保护机制
心跳失败的比例15分钟之内,高于15%的节点,Eureka任务这个服务出现了故障,直接剔除这个问题服务;
这样做的好处:避免网络抖动或者网络不稳定的情况下,误删了服务
Eureka自我保护机制的实现原理
AbstractInstanceRegistry
protected volatile int numberOfRenewsPerMinThreshold; //每一分钟最小的续约数量
protected volatile int expectedNumberOfClientsSendingRenews; //预期每分钟收到的续约的客户端数量
配置eureka心跳失败服务不可用评判标准(默认为0.85)
eureka.server.renewal-percent-threshold: 0.5
源码分析的核心功能
1、Eureka Server如何接收请求?
server端必须实现监听,可以是http+容器(tomcat,jobss)的实现方式,也可以是socket(NIO,Netty)的方式
Eureka Server服务访问的类ApplicationsResource/ApplicationResource(类似于controller角色),使用的jersey框架
Jersey是一个RESTFUL请求服务JAVA框架
2、Eureka Client如何注册?
核心方法EurekaServiceRegistry->register
spring的SmartLifecycle接口,实现这个接口重写start方法(可以在spring容器启动后执行start方法内容)
DisCoveryClient.register -> AbstractJerseyEurekaHttpClient.register
@Override
public EurekaHttpResponse<Void> register(InstanceInfo info) {
String urlPath = "apps/" + info.getAppName();
ClientResponse response = null;
try {
Builder resourceBuilder = jerseyClient.resource(serviceUrl).path(urlPath).getRequestBuilder();
addExtraHeaders(resourceBuilder);
response = resourceBuilder
.header("Accept-Encoding", "gzip")
.type(MediaType.APPLICATION_JSON_TYPE)
.accept(MediaType.APPLICATION_JSON)
.post(ClientResponse.class, info);
return anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build();
} finally {
if (logger.isDebugEnabled()) {
logger.debug("Jersey HTTP POST {}/{} with instance {}; statusCode={}", serviceUrl, urlPath, info.getId(),
response == null ? "N/A" : response.getStatus());
}
if (response != null) {
response.close();
}
}
}
客户端发送的请求最终会到eureka服务端 ApplicationResource.addInstance内,新增注册实例
3、Eureka Server如何存储服务地址?
请求到达服务端ApplicationResource.addInstance内,会调用
@Override
public void register(final InstanceInfo info, final boolean isReplication) {
int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;
if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
leaseDuration = info.getLeaseInfo().getDurationInSecs();
}
//先注册到一个服务节点
super.register(info, leaseDuration, isReplication);
//再复制到其他节点完成完成注册
replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
}
调用父类的register方法中可以看到服务实例信息InstanceInfo会缓存在ConcurrentHashMap中
Eureka三级缓存(实现读写分离,解决竞争提高性能)
image.png
服务端节点间地址复制(通过调用node节点的register完成复制)
private void replicateInstanceActionsToPeers(Action action, String appName,
String id, InstanceInfo info, InstanceStatus newStatus,
PeerEurekaNode node) {
try {
InstanceInfo infoFromRegistry;
CurrentRequestVersion.set(Version.V2);
switch (action) {
case Cancel:
node.cancel(appName, id);
break;
case Heartbeat:
InstanceStatus overriddenStatus = overriddenInstanceStatusMap.get(id);
infoFromRegistry = getInstanceByAppAndId(appName, id, false);
node.heartbeat(appName, id, infoFromRegistry, overriddenStatus, false);
break;
case Register:
node.register(info);
break;
case StatusUpdate:
infoFromRegistry = getInstanceByAppAndId(appName, id, false);
node.statusUpdate(appName, id, newStatus, infoFromRegistry);
break;
case DeleteStatusOverride:
infoFromRegistry = getInstanceByAppAndId(appName, id, false);
node.deleteStatusOverride(appName, id, infoFromRegistry);
break;
}
} catch (Throwable t) {
logger.error("Cannot replicate information to {} for action {}", node.getServiceUrl(), action.name(), t);
} finally {
CurrentRequestVersion.remove();
}
}
4、Eureka Client如何查询地址?
(1)DiscoveryClient构造方法中会触发
(2)每三十秒执行一次的定时任务DiscoveryClient中cacheRefreshTask
cacheRefreshTask中的衰减任务实现:第一次是30s执行,若失败,第二次60s重试,若再失败180秒重试
if (clientConfig.shouldFetchRegistry()) {
// registry cache refresh timer
int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
cacheRefreshTask = new TimedSupervisorTask(
"cacheRefresh",
scheduler,
cacheRefreshExecutor,
registryFetchIntervalSeconds,
TimeUnit.SECONDS,
expBackOffBound,
new CacheRefreshThread()
);
scheduler.schedule(
cacheRefreshTask,
registryFetchIntervalSeconds, TimeUnit.SECONDS);
}
TimedSupervisorTask
@Override
public void run() {
Future<?> future = null;
try {
future = executor.submit(task);
threadPoolLevelGauge.set((long) executor.getActiveCount());
future.get(timeoutMillis, TimeUnit.MILLISECONDS); // block until done or timeout
delay.set(timeoutMillis);
threadPoolLevelGauge.set((long) executor.getActiveCount());
successCounter.increment();
} catch (TimeoutException e) {
logger.warn("task supervisor timed out", e);
timeoutCounter.increment();
long currentDelay = delay.get();
//延时时间重计算
long newDelay = Math.min(maxDelay, currentDelay * 2);
delay.compareAndSet(currentDelay, newDelay);
} catch (RejectedExecutionException e) {
if (executor.isShutdown() || scheduler.isShutdown()) {
logger.warn("task supervisor shutting down, reject the task", e);
} else {
logger.warn("task supervisor rejected the task", e);
}
rejectedCounter.increment();
} catch (Throwable e) {
if (executor.isShutdown() || scheduler.isShutdown()) {
logger.warn("task supervisor shutting down, can't accept the task");
} else {
logger.warn("task supervisor threw an exception", e);
}
throwableCounter.increment();
} finally {
if (future != null) {
future.cancel(true);
}
if (!scheduler.isShutdown()) {
//执行定时任务
scheduler.schedule(this, delay.get(), TimeUnit.MILLISECONDS);
}
}
}
网友评论