一、查询带有restful api的信息,我们发现都是以“Resource”结尾的类
带有api的踪迹首先我们先从NodeResource.java中的"/v1/node"下手,他有一个成员变量:HeartbeatFailureDetector,类中提供了所有节点的执行状态的封装信息:
start
uri
recentRequests
recentFailures
recentSuccesses
lastRequestTime
lastResponseTime
if (coordinator) {
binder.install(new FailureDetectorModule());
jaxrsBinder(binder).bind(NodeResource.class);
}
其中绑定了:
HeartbeatFailureDetector
HeartbeatFailureDetector这个类在PostConstruct阶段(也就是类注入完成之后的阶段),开启了定时器,每隔5s去监听worker的服务状况:
@PostConstruct
public void start()
{
if (isEnabled && started.compareAndSet(false, true)) {
executor.scheduleWithFixedDelay(new Runnable()
{
@Override
public void run()
{
try {
updateMonitoredServices();
}
catch (Throwable e) {
// ignore to avoid getting unscheduled
log.warn(e, "Error updating services");
}
}
}, 0, 5, TimeUnit.SECONDS);
}
}
其中updateMonitoredServices使用httpclient的method为header方法去获取信息,并把异常信息封装起来。
private void ping()
{
try {
stats.recordStart();
//prepareHead为head http请求方法
//public static Builder prepareHead() {
// return new Builder().setMethod("HEAD");
// }
httpClient.executeAsync(prepareHead().setUri(uri).build(), new ResponseHandler<Object, Exception>()
{
@Override
public Exception handleException(Request request, Exception exception)
{
// ignore error
stats.recordFailure(exception);
// TODO: this will technically cause an NPE in httpClient, but it's not triggered because
// we never call get() on the response future. This behavior needs to be fixed in airlift
return null;
}
@Override
public Object handle(Request request, Response response)
throws Exception
{
stats.recordSuccess();
return null;
}
});
}
catch (RuntimeException e) {
log.warn(e, "Error scheduling request for %s", uri);
}
}
上面state实例记录着状态信息,与ServiceDescriptor(各个服务器的描述符:uuid(随机生成)、nodeId、location等信息,从配置文件node.properties中读取)
那,还有一个问题是,协调节点怎么获取其他worker节点的信息的呢?
在updateMonitoredServices方法中:
// selector.selectAllServices()获取所有节点信息
Set<ServiceDescriptor> online = selector.selectAllServices().stream()
.filter(descriptor -> !nodeInfo.getNodeId().equals(descriptor.getNodeId()))
.collect(toImmutableSet());
上面的selector是CachingServiceSelector类,此类在PostConstruct有这样的动作:
@PostConstruct
public void start()
{
if (started.compareAndSet(false, true)) {
Preconditions.checkState(!executor.isShutdown(), "CachingServiceSelector has been destroyed");
// if discovery is available, get the initial set of servers before starting
try {
refresh().get(30, TimeUnit.SECONDS);
}
catch (Exception ignored) {
}
}
}
其中的refresh是:
ServiceDescriptors oldDescriptors = this.serviceDescriptors.get();
ListenableFuture<ServiceDescriptors> future;
if (oldDescriptors == null) {
//DiscoveryLookupClient lookupClient
future = lookupClient.getServices(type, pool);
}
else {
future = lookupClient.refreshServices(oldDescriptors);
}
上面的DiscoveryLookupClient是HttpDiscoveryLookupClient,看其中方法:
private ListenableFuture<ServiceDescriptors> lookup(final String type, final String pool, final ServiceDescriptors serviceDescriptors)
{
Preconditions.checkNotNull(type, "type is null");
URI uri = discoveryServiceURI.get();
if (uri == null) {
return Futures.immediateFailedCheckedFuture(new DiscoveryException("No discovery servers are available"));
}
uri = URI.create(uri + "/v1/service/" + type + "/");
if (pool != null) {
uri = uri.resolve(pool);
}
Builder requestBuilder = prepareGet()
.setUri(uri)
.setHeader("User-Agent", nodeInfo.getNodeId());
从上面可以看出,他会去调用service api,通过get请求将nodeid做为header信息。
网友评论