Spring Cloud Feign
FeignClientsRegistrar:实现包扫描逻辑,找出带有 @FeignClient 注解的接口,并为其注册对应的 BeanDefinition
/**
 * Registers the Feign-related bean definitions in two ordered steps: the default
 * configuration first, then the Feign clients themselves.
 *
 * @param metadata annotation metadata, passed through unchanged to both steps
 * @param registry registry that both steps register their bean definitions into
 */
@Override
public void registerBeanDefinitions(AnnotationMetadata metadata, BeanDefinitionRegistry registry) {
    // Step 1: the default configuration.
    registerDefaultConfiguration(metadata, registry);

    // Step 2: locate the annotated classes and register a bean definition for each.
    registerFeignClients(metadata, registry);
}
/**
 * Scans every base package for candidate components and, for each annotated candidate,
 * registers its client-specific configuration followed by the Feign client itself.
 *
 * NOTE: this excerpt is abridged. The code that prepares {@code scanner} and
 * {@code basePackages} sits in the elided line at the top of the body.
 *
 * @param metadata annotation metadata; not used in the visible part of this excerpt
 * @param registry registry that receives the configuration and client bean definitions
 */
public void registerFeignClients(AnnotationMetadata metadata,
BeanDefinitionRegistry registry) {
···省略···
// Walk every candidate bean definition found under each base package.
for (String basePackage : basePackages) {
Set<BeanDefinition> candidateComponents = scanner
.findCandidateComponents(basePackage);
for (BeanDefinition candidateComponent : candidateComponents) {
// Only annotation-backed definitions are processed; anything else is skipped silently.
if (candidateComponent instanceof AnnotatedBeanDefinition) {
// verify annotated class is an interface
AnnotatedBeanDefinition beanDefinition = (AnnotatedBeanDefinition) candidateComponent;
AnnotationMetadata annotationMetadata = beanDefinition.getMetadata();
Assert.isTrue(annotationMetadata.isInterface(),
"@FeignClient can only be specified on an interface");
// Read the attribute values declared on the @FeignClient annotation.
Map<String, Object> attributes = annotationMetadata
.getAnnotationAttributes(
FeignClient.class.getCanonicalName());
String name = getClientName(attributes);
// Register the per-client "configuration" attribute under the client name first,
// then register the client itself.
// NOTE(review): per the original author's note, the client definition is built with
// BeanDefinitionBuilder and then registered through the BeanDefinitionRegistry; the
// builder step is not visible in this excerpt.
registerClientConfiguration(registry, name,
attributes.get("configuration"));
registerFeignClient(registry, annotationMetadata, attributes);
}
}
}
}
/**
 * Assembles the bean definition for a single Feign client and registers it.
 *
 * NOTE: the assembly code is elided in this excerpt (see the marker line in the body);
 * only the final registration call is visible.
 *
 * @param registry           registry the definition is registered into
 * @param annotationMetadata metadata of the annotated interface (used in the elided part)
 * @param attributes         attribute values of the annotation (used in the elided part)
 */
private void registerFeignClient(BeanDefinitionRegistry registry,
AnnotationMetadata annotationMetadata, Map<String, Object> attributes) {
····省略, 装配BeanDefinition···
// Register the assembled definition (wrapped in holder) with the registry.
// NOTE(review): the original comment here read "instantiate the bean", but this call only
// registers a bean definition; no instance is created at this point in the visible code.
BeanDefinitionReaderUtils.registerBeanDefinition(holder, registry);
}
装配好后就需要去使用这些BeanDefinition
以 JDK 动态代理的方式,拦截对 Feign Client 接口的方法调用并进行处理 -- 处理类:ReflectiveFeign
这里使用的是 io.github.openfeign 的 feign-core 包,Maven 坐标是:
<dependency>
<groupId>io.github.openfeign</groupId>
<artifactId>feign-core</artifactId>
</dependency>
// Feign
/**
 * Builds a {@code Feign} from the current settings and returns the instance it creates
 * for the given target.
 *
 * @param target the target to create an instance for
 * @param <T>    type of the returned instance
 * @return whatever {@code newInstance(target)} produces on the freshly built {@code Feign}
 */
public <T> T target(Target<T> target) {
    // build() returns a ReflectiveFeign, which is what creates the instance.
    Feign feign = build();
    return feign.newInstance(target);
}
/**
 * Wires the configured components together into a {@code ReflectiveFeign}.
 *
 * <p>A {@code SynchronousMethodHandler.Factory} is created first and then handed to
 * {@code ParseHandlersByName}; the resulting parser, the invocation handler factory and
 * the query map encoder are passed to the {@code ReflectiveFeign} constructor.
 *
 * @return a new {@code ReflectiveFeign}
 */
public Feign build() {
    // Factory for the per-method handlers that perform the actual calls.
    SynchronousMethodHandler.Factory methodHandlerFactory = new SynchronousMethodHandler.Factory(
            client,
            retryer,
            requestInterceptors,
            logger,
            logLevel,
            decode404,
            closeAfterDecode,
            propagationPolicy);

    // Parser that receives the contract, codecs and the handler factory above.
    ParseHandlersByName handlerParser = new ParseHandlersByName(
            contract,
            options,
            encoder,
            decoder,
            queryMapEncoder,
            errorDecoder,
            methodHandlerFactory);

    return new ReflectiveFeign(handlerParser, invocationHandlerFactory, queryMapEncoder);
}
// ReflectiveFeign,生成feign客户端调用的代理对象
/**
 * Creates a JDK dynamic proxy implementing the target's type, backed by one
 * {@code MethodHandler} per interface method.
 *
 * <p>Methods declared by {@code Object} are skipped. Default methods get a
 * {@code DefaultMethodHandler}, which is bound to the proxy once the proxy exists. Every
 * other method is mapped to the handler looked up by its config key.
 *
 * @param target the target whose type is proxied
 * @param <T>    the proxied interface type
 * @return the proxy instance
 */
@Override
public <T> T newInstance(Target<T> target) {
    Map<String, MethodHandler> handlersByConfigKey = targetToHandlersByName.apply(target);
    Map<Method, MethodHandler> dispatch = new LinkedHashMap<>();
    List<DefaultMethodHandler> defaults = new LinkedList<>();

    for (Method candidate : target.type().getMethods()) {
        // Methods inherited from Object are never routed through a handler.
        if (candidate.getDeclaringClass() == Object.class) {
            continue;
        }
        // Regular interface method: use the handler registered under its config key.
        if (!Util.isDefault(candidate)) {
            dispatch.put(candidate,
                    handlersByConfigKey.get(Feign.configKey(target.type(), candidate)));
            continue;
        }
        // Default method: remember the handler so it can be bound to the proxy later.
        DefaultMethodHandler defaultHandler = new DefaultMethodHandler(candidate);
        defaults.add(defaultHandler);
        dispatch.put(candidate, defaultHandler);
    }

    InvocationHandler invocationHandler = factory.create(target, dispatch);
    @SuppressWarnings("unchecked")
    T proxy = (T) Proxy.newProxyInstance(
            target.type().getClassLoader(),
            new Class<?>[] {target.type()},
            invocationHandler);

    // Default-method handlers need the proxy itself, so bind them only now.
    for (DefaultMethodHandler pending : defaults) {
        pending.bindTo(proxy);
    }
    return proxy;
}
// SynchronousMethodHandler实现了MethodHandler,通过ReflectiveFeign的invoke传入,执行的调用逻辑
/**
 * Builds a request template from the call arguments and executes it, retrying on
 * {@code RetryableException} for as long as the retryer allows.
 *
 * @param argv arguments of the invoked method
 * @return the result of {@code executeAndDecode}
 * @throws Throwable when the retryer gives up: the cause of the {@code RetryableException}
 *                   if the propagation policy is {@code UNWRAP} and a cause is present,
 *                   otherwise the {@code RetryableException} itself
 */
@Override
public Object invoke(Object[] argv) throws Throwable {
    final RequestTemplate requestTemplate = buildTemplateFromArgs.create(argv);
    // Work on a clone so the retry state belongs to this invocation only.
    final Retryer attemptRetryer = this.retryer.clone();

    for (;;) {
        try {
            // The request is carried out through the Client component
            // (response = client.execute(request, options)).
            // Per the original note: Feign sends the request and receives the response through
            // the Client; the default Client.Default is backed by HttpURLConnection, and
            // HttpClient or OkHttp can be used instead.
            return executeAndDecode(requestTemplate);
        } catch (RetryableException failure) {
            try {
                // Either returns (another attempt is allowed) or rethrows (give up).
                attemptRetryer.continueOrPropagate(failure);
            } catch (RetryableException giveUp) {
                Throwable rootCause = giveUp.getCause();
                boolean unwrap = propagationPolicy == UNWRAP && rootCause != null;
                throw unwrap ? rootCause : giveUp;
            }
            if (logLevel != Logger.Level.NONE) {
                logger.logRetry(metadata.configKey(), logLevel);
            }
            // Fall through to the next loop iteration for the retry.
        }
    }
}
Feign 如何实现负载均衡 -- 通过 Client 组件的实现类 LoadBalancerFeignClient
// LoadBalancerFeignClient implements Client
/**
 * Executes the request through the load-balancing client registered for the request's host.
 *
 * <p>The host part of the request URL is used as the client name. The request is wrapped
 * in a {@code FeignLoadBalancer.RibbonRequest} around the delegate client and handed to
 * {@code executeWithLoadBalancer}.
 *
 * @param request the request to execute
 * @param options request options, used to obtain the client config
 * @return the response converted via {@code toResponse()}
 * @throws IOException      when the {@code ClientException} yields one via {@code findIOException}
 * @throws RuntimeException wrapping the {@code ClientException} when no IOException is found
 */
@Override
public Response execute(Request request, Request.Options options) throws IOException {
    try {
        URI requestUri = URI.create(request.url());
        String clientName = requestUri.getHost();
        URI uriWithoutHost = cleanUrl(request.url(), clientName);

        // Hand the call over to Ribbon, with `delegate` doing the actual execution.
        FeignLoadBalancer.RibbonRequest ribbonRequest =
                new FeignLoadBalancer.RibbonRequest(this.delegate, request, uriWithoutHost);
        IClientConfig requestConfig = getClientConfig(options, clientName);

        // Run the call through Ribbon's load-balanced execution.
        FeignLoadBalancer loadBalancer = lbClient(clientName);
        return loadBalancer.executeWithLoadBalancer(ribbonRequest, requestConfig).toResponse();
    } catch (ClientException e) {
        IOException io = findIOException(e);
        if (io == null) {
            throw new RuntimeException(e);
        }
        throw io;
    }
}
// AbstractLoadBalancerAwareClient
/**
 * Executes the request against a server supplied by a {@code LoadBalancerCommand}.
 *
 * <p>For the server passed to the operation, the request URI is rebuilt with that server,
 * the request is re-targeted at the new URI, and {@code execute} is called. The call
 * blocks until the command's observable yields its single result.
 *
 * @param request       the request to execute
 * @param requestConfig per-request client configuration
 * @return the single response emitted by the command
 * @throws ClientException the cause of the failure when that cause is a
 *                         {@code ClientException}; otherwise a new one wrapping the failure
 */
public T executeWithLoadBalancer(final S request, final IClientConfig requestConfig) throws ClientException {
    final LoadBalancerCommand<T> lbCommand = buildLoadBalancerCommand(request, requestConfig);

    // What to do once a concrete server is known.
    final ServerOperation<T> perServerCall = new ServerOperation<T>() {
        @Override
        public Observable<T> call(Server server) {
            URI serverUri = reconstructURIWithServer(server, request.getUri());
            S serverRequest = (S) request.replaceUri(serverUri);
            try {
                T response = AbstractLoadBalancerAwareClient.this.execute(serverRequest, requestConfig);
                return Observable.just(response);
            } catch (Exception e) {
                // Surface the failure through the observable instead of throwing.
                return Observable.error(e);
            }
        }
    };

    try {
        // Submit to the LoadBalancerCommand and wait for the one result.
        return lbCommand.submit(perServerCall).toBlocking().single();
    } catch (Exception e) {
        Throwable cause = e.getCause();
        if (cause instanceof ClientException) {
            throw (ClientException) cause;
        }
        throw new ClientException(e);
    }
}
// LoadBalancerCommand
/**
 * Builds the observable that runs the given operation against a server.
 *
 * NOTE: this excerpt is abridged. The body of the concatMap step and everything after it
 * are elided, so only the server-selection expression is visible.
 *
 * @param operation the per-server operation (its use falls in the elided part)
 */
public Observable<T> submit(final ServerOperation<T> operation) {
// Use the load balancer
// When no server is preset (server == null), selectServer() supplies one;
// otherwise the preset server is emitted as-is.
Observable<T> o =
(server == null ? selectServer() : Observable.just(server))
.concatMap(···省略···);
···省略···
}
/**
 * Returns an observable that, when subscribed to, asks the load balancer context for a
 * server and emits it.
 *
 * <p>On success the subscriber receives the server followed by completion. Any exception
 * raised inside the try block is delivered through {@code onError}.
 *
 * @return an observable emitting the selected server
 */
private Observable<Server> selectServer() {
    OnSubscribe<Server> pickServer = new OnSubscribe<Server>() {
        @Override
        public void call(Subscriber<? super Server> subscriber) {
            try {
                Server chosen = loadBalancerContext.getServerFromLoadBalancer(loadBalancerURI, loadBalancerKey);
                subscriber.onNext(chosen);
                subscriber.onCompleted();
            } catch (Exception e) {
                subscriber.onError(e);
            }
        }
    };
    return Observable.create(pickServer);
}
网友评论