前面我们从配置的修改是如何更新SoulAdmin本地缓存的,再到网关和SoulAdmin是如何同步数据等,讲解了数据同步的机制,是为了保证我们网关能够正确的处理请求,并针对配置的插件进行正确的处理,接下来我们从一个真正的用户请求http到网关以及如何最后到我们真正请求的整个链路做一下分析
SoulWebHandler,是网关请求的入口。
//org.dromara.soul.web.handler.SoulWebHandler
//实现了WebHandler
public final class SoulWebHandler implements WebHandler {
private final List<SoulPlugin> plugins;
private final Scheduler scheduler;
/**
* 初始化的时候注入所有的SoulPlugin插件
*/
public SoulWebHandler(final List<SoulPlugin> plugins) {
this.plugins = plugins;
String schedulerType = System.getProperty("soul.scheduler.type", "fixed");
if (Objects.equals(schedulerType, "fixed")) {
int threads = Integer.parseInt(System.getProperty(
"soul.work.threads", "" + Math.max((Runtime.getRuntime().availableProcessors() << 1) + 1, 16)));
scheduler = Schedulers.newParallel("soul-work-threads", threads);
} else {
scheduler = Schedulers.elastic();
}
}
}
在我们请求过来的时候,会走到handle
//org.dromara.soul.web.handler.SoulWebHandler#handle
public Mono<Void> handle(@NonNull final ServerWebExchange exchange) {
//监控相关
MetricsTrackerFacade.getInstance().counterInc(MetricsLabelEnum.REQUEST_TOTAL.getName());
Optional<HistogramMetricsTrackerDelegate> startTimer = MetricsTrackerFacade.getInstance().histogramStartTimer(MetricsLabelEnum.REQUEST_LATENCY.getName());
//构造DefaultSoulPluginChain,默认的插件链进行处理
return new DefaultSoulPluginChain(plugins).execute(exchange).subscribeOn(scheduler)
.doOnSuccess(t -> startTimer.ifPresent(time -> MetricsTrackerFacade.getInstance().histogramObserveDuration(time)));
}
DefaultSoulPluginChain 使用了责任链的设计模式,针对一个请求,对所有的插件进行过滤
//org.dromara.soul.web.handler.SoulWebHandler.DefaultSoulPluginChain
private static class DefaultSoulPluginChain implements SoulPluginChain {
private int index;
private final List<SoulPlugin> plugins;
DefaultSoulPluginChain(final List<SoulPlugin> plugins) {
this.plugins = plugins;
}
/**
* Delegate to the next {@code WebFilter} in the chain.
*
* @param exchange the current server exchange
* @return {@code Mono<Void>} to indicate when request handling is complete
*/
@Override
public Mono<Void> execute(final ServerWebExchange exchange) {
return Mono.defer(() -> {
if (this.index < plugins.size()) {
SoulPlugin plugin = plugins.get(this.index++);
Boolean skip = plugin.skip(exchange);
if (skip) {
return this.execute(exchange);
}
return plugin.execute(exchange, this);
}
return Mono.empty();
});
}
}
我们看到plugins是按照顺序循环处理的,而且每次的顺序是一致的,GlobalPlugin肯定在第一位,这是怎么实现的,我们看下GlobalPlugin插件
//org.dromara.soul.plugin.global.GlobalPlugin
public class GlobalPlugin implements SoulPlugin {
private final SoulContextBuilder builder;
public GlobalPlugin(final SoulContextBuilder builder) {
this.builder = builder;
}
//通过getOrder保证初始化的顺序
@Override
public int getOrder() {
return 0;
}
}
通过看getOrder的调用方我们发现
//org.dromara.soul.web.configuration.SoulConfiguration
@Configuration
@ComponentScan("org.dromara.soul")
@Import(value = {ErrorHandlerConfiguration.class, SoulExtConfiguration.class, SpringExtConfiguration.class})
@Slf4j
public class SoulConfiguration {
@Bean("webHandler")
public SoulWebHandler soulWebHandler(final ObjectProvider<List<SoulPlugin>> plugins) {
List<SoulPlugin> pluginList = plugins.getIfAvailable(Collections::emptyList);
//在这里进行重排序
final List<SoulPlugin> soulPlugins = pluginList.stream()
.sorted(Comparator.comparingInt(SoulPlugin::getOrder)).collect(Collectors.toList());
soulPlugins.forEach(soulPlugin -> log.info("load plugin:[{}] [{}]", soulPlugin.named(), soulPlugin.getClass().getName()));
return new SoulWebHandler(soulPlugins);
}
}
所以 插件的顺序是定义好的,每次请求的第一个肯定是GlobalPlugin。GlobalPlugin是最先执行的插件
//org.dromara.soul.plugin.global.GlobalPlugin
@Override
public Mono<Void> execute(final ServerWebExchange exchange, final SoulPluginChain chain) {
final ServerHttpRequest request = exchange.getRequest();
final HttpHeaders headers = request.getHeaders();
final String upgrade = headers.getFirst("Upgrade");
SoulContext soulContext;
//先忽略Upgrade,普通请求upgrade为空
if (StringUtils.isBlank(upgrade) || !"websocket".equals(upgrade)) {
soulContext = builder.build(exchange);
} else {
final MultiValueMap<String, String> queryParams = request.getQueryParams();
soulContext = transformMap(queryParams);
}
exchange.getAttributes().put(Constants.CONTEXT, soulContext);
return chain.execute(exchange);
}
这里会走到DefaultSoulContextBuilder
//org.dromara.soul.plugin.global.DefaultSoulContextBuilder
@Override
public SoulContext build(final ServerWebExchange exchange) {
final ServerHttpRequest request = exchange.getRequest();
//获取到请求的path
String path = request.getURI().getPath();
//http先不关注metaData
MetaData metaData = MetaDataCache.getInstance().obtain(path);
if (Objects.nonNull(metaData) && metaData.getEnabled()) {
exchange.getAttributes().put(Constants.META_DATA, metaData);
}
//将请求和元数据转换成SoulContext
return transform(request, metaData);
}
//org.dromara.soul.plugin.global.DefaultSoulContextBuilder#transform
//构造Soul的上下文信息
private SoulContext transform(final ServerHttpRequest request, final MetaData metaData) {
//Constants.APP_KEY = appKey
final String appKey = request.getHeaders().getFirst(Constants.APP_KEY);
//Constants.SIGN = sign
final String sign = request.getHeaders().getFirst(Constants.SIGN);
//Constants.TIMESTAMP = timestamp
final String timestamp = request.getHeaders().getFirst(Constants.TIMESTAMP);
//从header获取信息
SoulContext soulContext = new SoulContext();
String path = request.getURI().getPath();
soulContext.setPath(path);
//判断元数据信息,通过元数据来拍断当前的请求是属于什么类型
if (Objects.nonNull(metaData) && metaData.getEnabled()) {
if (RpcTypeEnum.SPRING_CLOUD.getName().equals(metaData.getRpcType())) {
setSoulContextByHttp(soulContext, path);
soulContext.setRpcType(metaData.getRpcType());
} else if (RpcTypeEnum.DUBBO.getName().equals(metaData.getRpcType())) {
setSoulContextByDubbo(soulContext, metaData);
} else if (RpcTypeEnum.SOFA.getName().equals(metaData.getRpcType())) {
setSoulContextBySofa(soulContext, metaData);
} else if (RpcTypeEnum.TARS.getName().equals(metaData.getRpcType())) {
setSoulContextByTars(soulContext, metaData);
} else {
setSoulContextByHttp(soulContext, path);
soulContext.setRpcType(RpcTypeEnum.HTTP.getName());
}
//默认当成http处理
} else {
setSoulContextByHttp(soulContext, path);
soulContext.setRpcType(RpcTypeEnum.HTTP.getName());
}
//注入必要信息
soulContext.setAppKey(appKey);
soulContext.setSign(sign);
soulContext.setTimestamp(timestamp);
soulContext.setStartDateTime(LocalDateTime.now());
Optional.ofNullable(request.getMethod()).ifPresent(httpMethod -> soulContext.setHttpMethod(httpMethod.name()));
return soulContext;
}
GlobalPlugin是最关键的插件,通过上面流程,构造Soul的上下文,从而使得后面的插件判断才有依据来决定是走哪一个插件
通过责任链,依次循环调用所有的插件,直到中间某个插件匹配调用生效为止。我们来看下http请求,最终会命中Divide插件,我们来看下DividePlugin插件
//org.dromara.soul.plugin.divide.DividePlugin#skip
@Override
public Boolean skip(final ServerWebExchange exchange) {
//GlobalPlugin构造的上下文
final SoulContext soulContext = exchange.getAttribute(Constants.CONTEXT);
//DividePlugin会判断当前的SoulContext的RpcType是否是Http
return !Objects.equals(Objects.requireNonNull(soulContext).getRpcType(), RpcTypeEnum.HTTP.getName());
}
发现不需要跳过后,会进入AbstractSoulPlugin的execute。包括刚才的GlobalPlugin也会经过这里
//org.dromara.soul.plugin.base.AbstractSoulPlugin#execute
public Mono<Void> execute(final ServerWebExchange exchange, final SoulPluginChain chain) {
String pluginName = named();
//从本地缓存中获取PluginData数据,这里的本地缓存就是之前我们讲的数据同步所维护的缓存
final PluginData pluginData = BaseDataCache.getInstance().obtainPluginData(pluginName);
//判断plugin是否为空并且开启
if (pluginData != null && pluginData.getEnabled()) {
//在获取本地Selector数据缓存。获取SelectorData
final Collection<SelectorData> selectors = BaseDataCache.getInstance().obtainSelectorData(pluginName);
//如果该插件selector为空会走这里,里面逻辑实际上就是调用下一个插件
if (CollectionUtils.isEmpty(selectors)) {
return handleSelectorIsNull(pluginName, exchange, chain);
}
//如果Selectors不为空,则去看是否匹配,匹配逻辑暂时先不展开讲,之后再看
final SelectorData selectorData = matchSelector(exchange, selectors);
//如果匹配为空则继续调用下一个插件
if (Objects.isNull(selectorData)) {
return handleSelectorIsNull(pluginName, exchange, chain);
}
selectorLog(selectorData, pluginName);
//在获取规则数据
final List<RuleData> rules = BaseDataCache.getInstance().obtainRuleData(selectorData.getId());
//规则为空则继续执行下一个插件
if (CollectionUtils.isEmpty(rules)) {
return handleRuleIsNull(pluginName, exchange, chain);
}
RuleData rule;
//如果是全流量
if (selectorData.getType() == SelectorTypeEnum.FULL_FLOW.getCode()) {
//则获取最后一个规则
rule = rules.get(rules.size() - 1);
} else {
//判断是否有匹配规则,具体匹配校验之后再说
rule = matchRule(exchange, rules);
}
//如果规则为空,继续执行下一个插件
if (Objects.isNull(rule)) {
return handleRuleIsNull(pluginName, exchange, chain);
}
ruleLog(rule, pluginName);
//如果规则不为空,则调用doExecute。对应插件的具体实现
return doExecute(exchange, chain, selectorData, rule);
}
return chain.execute(exchange);
}
//org.dromara.soul.plugin.divide.DividePlugin
public class DividePlugin extends AbstractSoulPlugin {
@Override
protected Mono<Void> doExecute(final ServerWebExchange exchange, final SoulPluginChain chain, final SelectorData selector, final RuleData rule) {
//这里拿到之前GlobalPlugin的上下文
final SoulContext soulContext = exchange.getAttribute(Constants.CONTEXT);
assert soulContext != null;
//获取Divide规则处理器,DivideRuleHandle包含了负载均衡策略以及重试次数和超时时间信息
final DivideRuleHandle ruleHandle = GsonUtils.getInstance().fromJson(rule.getHandle(), DivideRuleHandle.class);
//这里根据之前的探活机制,获取到对应选择器的上游服务列表
final List<DivideUpstream> upstreamList = UpstreamCacheManager.getInstance().findUpstreamListBySelectorId(selector.getId());
//如果为空则抛异常,并直接返回WebFlux结果
if (CollectionUtils.isEmpty(upstreamList)) {
log.error("divide upstream configuration error: {}", rule.toString());
Object error = SoulResultWrap.error(SoulResultEnum.CANNOT_FIND_URL.getCode(), SoulResultEnum.CANNOT_FIND_URL.getMsg(), null);
return WebFluxResultUtils.result(exchange, error);
}
//获取当前调用方IP
final String ip = Objects.requireNonNull(exchange.getRequest().getRemoteAddress()).getAddress().getHostAddress();
//通过负载均衡获取对应的上游
DivideUpstream divideUpstream = LoadBalanceUtils.selector(upstreamList, ruleHandle.getLoadBalance(), ip);
//如果上游为空则返回异常信息
if (Objects.isNull(divideUpstream)) {
log.error("divide has no upstream");
Object error = SoulResultWrap.error(SoulResultEnum.CANNOT_FIND_URL.getCode(), SoulResultEnum.CANNOT_FIND_URL.getMsg(), null);
return WebFluxResultUtils.result(exchange, error);
}
// 注入必要信息
String domain = buildDomain(divideUpstream);
String realURL = buildRealURL(domain, soulContext, exchange);
exchange.getAttributes().put(Constants.HTTP_URL, realURL);
//在调用下一个插件
exchange.getAttributes().put(Constants.HTTP_TIME_OUT, ruleHandle.getTimeout());
exchange.getAttributes().put(Constants.HTTP_RETRY, ruleHandle.getRetry());
return chain.execute(exchange);
}
}
我们发现Divide插件并没有真正的去调用,而是主要做一些获取上游服务器列表以及根据负载均衡选择一个有效的远端服务,并注入到对应的属性中,供后面使用,真正调用远端的插件式WebClientPlugin插件
//org.dromara.soul.plugin.httpclient.WebClientPlugin#execute
@Override
public Mono<Void> execute(final ServerWebExchange exchange, final SoulPluginChain chain) {
//获取上下文
final SoulContext soulContext = exchange.getAttribute(Constants.CONTEXT);
assert soulContext != null;
//获取url地址
String urlPath = exchange.getAttribute(Constants.HTTP_URL);
if (StringUtils.isEmpty(urlPath)) {
Object error = SoulResultWrap.error(SoulResultEnum.CANNOT_FIND_URL.getCode(), SoulResultEnum.CANNOT_FIND_URL.getMsg(), null);
return WebFluxResultUtils.result(exchange, error);
}
long timeout = (long) Optional.ofNullable(exchange.getAttribute(Constants.HTTP_TIME_OUT)).orElse(3000L);
int retryTimes = (int) Optional.ofNullable(exchange.getAttribute(Constants.HTTP_RETRY)).orElse(0);
log.info("The request urlPath is {}, retryTimes is {}", urlPath, retryTimes);
HttpMethod method = HttpMethod.valueOf(exchange.getRequest().getMethodValue());
WebClient.RequestBodySpec requestBodySpec = webClient.method(method).uri(urlPath);
//调用远端服务
return handleRequestBody(requestBodySpec, exchange, timeout, retryTimes, chain);
}
//org.dromara.soul.plugin.httpclient.WebClientPlugin#handleRequestBody
private Mono<Void> handleRequestBody(final WebClient.RequestBodySpec requestBodySpec,
final ServerWebExchange exchange,
final long timeout,
final int retryTimes,
final SoulPluginChain chain) {
//使用异步编程方式调用远端服务并返回结果
return requestBodySpec.headers(httpHeaders -> {
httpHeaders.addAll(exchange.getRequest().getHeaders());
httpHeaders.remove(HttpHeaders.HOST);
})
.contentType(buildMediaType(exchange))
.body(BodyInserters.fromDataBuffers(exchange.getRequest().getBody()))
.exchange()
.doOnError(e -> log.error(e.getMessage()))
.timeout(Duration.ofMillis(timeout))
.retryWhen(Retry.onlyIf(x -> x.exception() instanceof ConnectTimeoutException)
.retryMax(retryTimes)
.backoff(Backoff.exponential(Duration.ofMillis(200), Duration.ofSeconds(20), 2, true)))
.flatMap(e -> doNext(e, exchange, chain));
}
到这里我们就完整走了一遍http的链路跟踪,中间还有很多其他的细节需要之后在讲解
网友评论