(Based on apache-shenyu 2.4.3) Apache ShenYu, formerly known as the Soul gateway, is a high-performance Java API gateway built on Spring 5's WebFlux (project-reactor), reactor-netty and friends. It has since entered the Apache Incubator; its author is yu199195 (xiaoyu) on GitHub.
The author is also the founder of the well-known Chinese open-source community Dromara and maintains several open-source projects; apache/incubator-shenyu ("ShenYu is a High-Performance Java API Gateway", github.com) is one of them.
This article only looks at how ShenYu uses the Disruptor API and what problems it solves with it, and along the way at which APIs Disruptor offers us.
Disruptor is a classic producer-consumer model: it uses a lock-free ring buffer (a circular array) as its queue and offers multiple consumer wait strategies, which greatly improves performance.
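To get a feel for the raw API before looking at ShenYu's wrapper, here is a minimal standalone sketch. It assumes the com.lmax:disruptor 3.x artifact; the DisruptorHelloWorld / LongEvent names are made up for illustration.

import com.lmax.disruptor.BlockingWaitStrategy;
import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.dsl.ProducerType;
import java.util.concurrent.Executors;

public class DisruptorHelloWorld {

    // The event object that lives in each ring buffer slot; slots are pre-allocated and reused.
    static class LongEvent {
        long value;
    }

    public static void main(String[] args) {
        // Ring buffer size must be a power of two.
        Disruptor<LongEvent> disruptor = new Disruptor<>(
                LongEvent::new,                   // EventFactory: pre-fills every slot
                1024,
                Executors.defaultThreadFactory(), // threads for the consumers
                ProducerType.MULTI,               // more than one publishing thread
                new BlockingWaitStrategy());      // how consumers wait for new sequences

        // Consumer: called once per published event.
        disruptor.handleEventsWith((EventHandler<LongEvent>) (event, sequence, endOfBatch) ->
                System.out.println("consumed " + event.value + " at seq " + sequence));

        disruptor.start();

        // Producer: claim a sequence, fill the slot, publish - all hidden behind publishEvent().
        RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
        for (long i = 0; i < 10; i++) {
            ringBuffer.publishEvent((event, sequence, v) -> event.value = v, i);
        }

        disruptor.shutdown();
    }
}

ShenYu's shenyu-disruptor module wraps exactly these pieces: the event factory, the translator passed to publishEvent, the wait strategy, and the consumer handlers.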
The shenyu-disruptor module
The entry point that exposes the module's API to the outside is the DisruptorProviderManage class. Let's dig into the code.
// A generic class with public constructors; it is used per instance.
public class DisruptorProviderManage<T> {

    public static final Integer DEFAULT_SIZE = 4096 << 1 << 1;

    // The default consumer count is the machine's available processors * 2
    // (which may mean cores, hardware threads, or something else, depending on the platform).
    private static final Integer DEFAULT_CONSUMER_SIZE = Runtime.getRuntime().availableProcessors() << 1;

    private final Integer size;

    private final Integer consumerSize;

    // Our own consumer factory. It carries the business logic and acts as the consumer
    // for both the client side and the server side of ShenYu's registration interaction.
    private final QueueConsumerFactory<T> consumerFactory;

    // The disruptor wrapper, also generic, bound to this instance.
    private DisruptorProvider<T> provider;

    public DisruptorProviderManage(final QueueConsumerFactory<T> consumerFactory, final Integer ringBufferSize) {
        this(consumerFactory,
                DEFAULT_CONSUMER_SIZE,
                ringBufferSize);
    }

    public DisruptorProviderManage(final QueueConsumerFactory<T> consumerFactory) {
        this(consumerFactory, DEFAULT_CONSUMER_SIZE, DEFAULT_SIZE);
    }

    public DisruptorProviderManage(final QueueConsumerFactory<T> consumerFactory,
                                   final int consumerSize,
                                   final int ringBufferSize) {
        this.consumerFactory = consumerFactory;
        this.size = ringBufferSize;
        this.consumerSize = consumerSize;
    }

    // Start the disruptor.
    public void startup() {
        this.startup(false);
    }

    public void startup(final boolean isOrderly) {
        // An "orderly" task executor. The real ordering logic only kicks in when true is passed in;
        // the default is false.
        OrderlyExecutor executor = new OrderlyExecutor(isOrderly, consumerSize, consumerSize, 0, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(),
                // This is clearly the task executor for the consumers.
                DisruptorThreadFactory.create("shenyu_disruptor_consumer_", false), new ThreadPoolExecutor.AbortPolicy());
        int newConsumerSize = this.consumerSize;
        EventFactory<DataEvent<T>> eventFactory;
        if (isOrderly) {
            // In orderly mode there is exactly one consumer, i.e. a single consuming thread.
            newConsumerSize = 1;
            // The event factory for the orderly disruptor.
            eventFactory = new OrderlyDisruptorEventFactory<>();
        } else {
            eventFactory = new DisruptorEventFactory<>();
        }
        // The disruptor bootstrap class.
        Disruptor<DataEvent<T>> disruptor = new Disruptor<>(eventFactory,
                size,
                // The producer-side thread factory. Disruptor is a classic producer-consumer model.
                DisruptorThreadFactory.create("shenyu_disruptor_provider_" + consumerFactory.fixName(), false),
                // Multi-producer mode.
                ProducerType.MULTI,
                // The wait strategy: a cursor wait strategy that decides how to wait for the next available sequence.
                // Because a lock-free ring buffer is used, an available sequence (i.e. a slot index) must be claimed
                // before an event can be placed into it and then consumed.
                // BlockingWaitStrategy is the slowest strategy, but it consumes the least CPU.
                new BlockingWaitStrategy());
        // Initialize the consumer array.
        QueueConsumer<T>[] consumers = new QueueConsumer[newConsumerSize];
        for (int i = 0; i < newConsumerSize; i++) {
            consumers[i] = new QueueConsumer<>(executor, consumerFactory);
        }
        disruptor.handleEventsWithWorkerPool(consumers);
        // The exception handler, i.e. the hook called when a consumer throws; errors are ignored by default.
        disruptor.setDefaultExceptionHandler(new IgnoreExceptionHandler());
        // Start it.
        disruptor.start();
        // Grab the queue.
        RingBuffer<DataEvent<T>> ringBuffer = disruptor.getRingBuffer();
        // Hand the disruptor pieces to the wrapper class exposed to callers.
        provider = new DisruptorProvider<>(ringBuffer, disruptor, isOrderly);
    }

    public DisruptorProvider<T> getProvider() {
        return provider;
    }
}
The code above shows that DisruptorProviderManage is a bootstrap class: it builds the Disruptor instance for us and binds it to the current instance through the generic type.
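Before moving on, here is a minimal usage sketch of this bootstrap class. Imports from the shenyu-disruptor module are omitted, and the StringConsumerFactory / StringConsumerExecutor names are hypothetical; they only illustrate the QueueConsumerFactory / QueueConsumerExecutor contract that appears later in this article.

public final class DisruptorProviderManageDemo {

    // Hypothetical consumer: processes whatever the producer publishes.
    static class StringConsumerExecutor extends QueueConsumerExecutor<String> {
        @Override
        public void run() {
            System.out.println("consumed: " + getData());
        }
    }

    // Hypothetical factory: tells DisruptorProviderManage how to create consumers and how to name its threads.
    static class StringConsumerFactory implements QueueConsumerFactory<String> {
        @Override
        public StringConsumerExecutor create() {
            return new StringConsumerExecutor();
        }

        @Override
        public String fixName() {
            return "demo_string";
        }
    }

    public static void main(String[] args) {
        DisruptorProviderManage<String> manage = new DisruptorProviderManage<>(new StringConsumerFactory());
        manage.startup();                                 // builds and starts the Disruptor
        manage.getProvider().onData("hello disruptor");   // producer side: publish an event
        manage.getProvider().shutdown();                  // release the Disruptor when done
    }
}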
Next, let's look at the disruptor wrapper itself, the DisruptorProvider class.
// Also generic; this class is the real wrapper around the disruptor.
public class DisruptorProvider<T> {

    private final Logger logger = LoggerFactory.getLogger(DisruptorProvider.class);

    // The famous ringBuffer.
    private final RingBuffer<DataEvent<T>> ringBuffer;

    // Disruptor's public API object, also wrapped into this class.
    private final Disruptor<DataEvent<T>> disruptor;

    // Whether this provider is orderly. ShenYu offers an "orderly" disruptor mode.
    private final boolean isOrderly;

    // A translator. The ringBuffer wrapped by disruptor is a circular array, different from an ordinary queue.
    // To publish, a producer first claims a sequence from the ringBuffer, i.e. an available slot index,
    // and only then can it publish an event into the ringBuffer.
    // publishEvent() is driven by the sequence rather than the data itself; the data is attached to the claimed slot
    // through this translator, and consumers likewise read by consumable sequence.
    // RingBuffer is parameterized, but the argument passed in is not necessarily the slot's type,
    // so disruptor provides this functional interface to convert the argument.
    // ShenYu's conversion wraps the payload in its own generic DataEvent and sets it via setData().
    private final EventTranslatorOneArg<DataEvent<T>, T> translatorOneArg = (event, sequence, t) -> event.setData(t);

    // Same as above, but it additionally sets the hash used for ordering.
    private final EventTranslatorTwoArg<DataEvent<T>, T, String> orderlyArg = (event, sequence, t, orderly) -> {
        if (event instanceof OrderlyDataEvent) {
            ((OrderlyDataEvent<T>) event).setHash(orderly);
        }
        event.setData(t);
    };

    public DisruptorProvider(final RingBuffer<DataEvent<T>> ringBuffer, final Disruptor<DataEvent<T>> disruptor, final boolean isOrderly) {
        this.ringBuffer = ringBuffer;
        this.disruptor = disruptor;
        this.isOrderly = isOrderly;
    }

    public void onData(final T data) {
        // isOrderly is immutable; it is fixed at construction time.
        if (isOrderly) {
            throw new IllegalArgumentException("The current provider is of orderly type. Please use onOrderlyData() method.");
        }
        try {
            // Publish the event on the ringBuffer, passing the payload and the translator.
            ringBuffer.publishEvent(translatorOneArg, data);
        } catch (Exception ex) {
            logger.error("ex", ex);
        }
    }

    public void onOrderlyData(final T data, final String... hashArray) {
        if (!this.isOrderly) {
            throw new IllegalArgumentException("The current provider is not of orderly type. Please use onData() method.");
        }
        try {
            // Additionally pass a hash used for ordering.
            String hash = String.join(":", hashArray);
            ringBuffer.publishEvent(orderlyArg, data, hash);
        } catch (Exception ex) {
            logger.error("ex", ex);
        }
    }

    // Remember to release the resources.
    public void shutdown() {
        if (null != disruptor) {
            disruptor.shutdown();
        }
    }
}
After the producer-side wrapper above, let's look at how the consumer class, QueueConsumer, is built.
// WorkHandler is the consumer abstraction provided by disruptor; we implement it to run our consumer-side business logic.
public class QueueConsumer<T> implements WorkHandler<DataEvent<T>> {

    // The orderly executor, which also serves as the executor for the normal (non-orderly) path.
    private final OrderlyExecutor executor;

    // ShenYu's own consumer factory, used to create consumer executors (execution units that implement Runnable).
    private final QueueConsumerFactory<T> factory;

    public QueueConsumer(final OrderlyExecutor executor, final QueueConsumerFactory<T> factory) {
        this.executor = executor;
        this.factory = factory;
    }

    @Override
    public void onEvent(final DataEvent<T> t) {
        if (t != null) {
            ThreadPoolExecutor executor = orderly(t);
            // Create the executor that carries the business logic.
            QueueConsumerExecutor<T> queueConsumerExecutor = factory.create();
            // Hand over the event payload.
            queueConsumerExecutor.setData(t.getData());
            // Help GC: the ring buffer slots are pre-allocated and reused, so clearing the slot's data
            // reference prevents the payload from staying reachable and blocking collection.
            t.setData(null);
            // Run the business logic.
            executor.execute(queueConsumerExecutor);
        }
    }

    // For an orderly event with a non-empty hash, let the orderly executor select a SingletonExecutor by that hash.
    private ThreadPoolExecutor orderly(final DataEvent<T> t) {
        if (t instanceof OrderlyDataEvent && !isEmpty(((OrderlyDataEvent<T>) t).getHash())) {
            return executor.select(((OrderlyDataEvent<T>) t).getHash());
        } else {
            return executor;
        }
    }

    private boolean isEmpty(final String t) {
        return t == null || t.isEmpty();
    }
}
//--------------- ShenYu's "orderly" thread pool ---------------
public class OrderlyExecutor extends ThreadPoolExecutor {

    // A thread-safe sorted map; ConcurrentSkipListMap is currently the JDK's only sorted ConcurrentMap implementation.
    private final ConcurrentSkipListMap<Long, SingletonExecutor> virtualExecutors = new ConcurrentSkipListMap<>();

    // The thread selector.
    private final ThreadSelector threadSelector = new ThreadSelector();

    public OrderlyExecutor(
            final boolean isOrderly,
            final int corePoolSize,
            final int maximumPoolSize,
            final long keepAliveTime,
            final TimeUnit unit,
            final BlockingQueue<Runnable> workQueue,
            final ThreadFactory threadFactory,
            final RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
        orderlyThreadPool(isOrderly, corePoolSize, threadFactory);
    }

    private void orderlyThreadPool(final boolean isOrderly, final int corePoolSize, final ThreadFactory threadFactory) {
        // Only the orderly mode needs the extra setup.
        if (isOrderly) {
            // For each index up to the configured pool size, create a single-threaded executor and put it into the ConcurrentSkipListMap.
            IntStream.range(0, corePoolSize).forEach(index -> {
                SingletonExecutor singletonExecutor = new SingletonExecutor(threadFactory);
                // Hash the single-threaded executor together with its index.
                String hash = singletonExecutor.hashCode() + ":" + index;
                byte[] bytes = threadSelector.sha(hash);
                for (int i = 0; i < 4; i++) {
                    // Derive four hash values (0..3) from the digest and store them all, so the map holds four keys
                    // pointing at the same executor. These act as virtual nodes, as in consistent hashing,
                    // spreading the executors more evenly around the hash ring.
                    this.virtualExecutors.put(threadSelector.hash(bytes, i), singletonExecutor);
                }
            });
        }
    }

    public SingletonExecutor select(final String hash) {
        long select = threadSelector.select(hash);
        // Look up the executor by hash.
        if (!virtualExecutors.containsKey(select)) {
            // On a miss, take the nearest executor clockwise, i.e. the first entry whose key is >= the computed hash.
            SortedMap<Long, SingletonExecutor> tailMap = virtualExecutors.tailMap(select);
            if (tailMap.isEmpty()) {
                // If the computed hash lies beyond the last key, wrap around to the first entry of the map.
                select = virtualExecutors.firstKey();
            } else {
                select = tailMap.firstKey();
            }
        }
        return virtualExecutors.get(select);
    }

    private static final class ThreadSelector {

        public long select(final String hash) {
            byte[] digest = sha(hash);
            return hash(digest, 0);
        }

        private long hash(final byte[] digest, final int number) {
            return (((long) (digest[3 + number * 4] & 0xFF) << 24)
                    | ((long) (digest[2 + number * 4] & 0xFF) << 16)
                    | ((long) (digest[1 + number * 4] & 0xFF) << 8)
                    | (digest[number * 4] & 0xFF))
                    & 0xFFFFFFFFL;
        }

        private byte[] sha(final String hash) {
            byte[] bytes = hash.getBytes(StandardCharsets.UTF_8);
            return Hashing
                    .sha256()
                    .newHasher()
                    .putBytes(bytes)
                    .hash().asBytes();
        }
    }
}
The "orderly" thread pool above is really a consistent-hash ring of single-threaded executors: each event's hash is mapped onto the ring, and events that share the same hash always land on the same SingletonExecutor, so they are processed in the order they were published.
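A stripped-down sketch of that selection idea (the class and method names here are illustrative, not ShenYu's): a ConcurrentSkipListMap as the hash ring, single-threaded executors as nodes, and a ceiling lookup with wrap-around.

import java.util.Map;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class HashRingExecutorDemo {

    // The "ring": sorted hash -> single-threaded executor, mirroring virtualExecutors above.
    private final ConcurrentSkipListMap<Long, ExecutorService> ring = new ConcurrentSkipListMap<>();

    public HashRingExecutorDemo(final int executors) {
        // One single-threaded executor per node; the single thread is what guarantees per-key ordering.
        for (int i = 0; i < executors; i++) {
            ring.put(hash("node-" + i), Executors.newSingleThreadExecutor());
        }
    }

    public ExecutorService select(final String key) {
        // First node clockwise from the key's hash; wrap around to the first node if we ran off the end.
        Map.Entry<Long, ExecutorService> entry = ring.ceilingEntry(hash(key));
        return (entry != null ? entry : ring.firstEntry()).getValue();
    }

    // Toy hash for illustration only; ShenYu uses the first four bytes of a SHA-256 digest instead.
    private long hash(final String s) {
        return s.hashCode() & 0xFFFFFFFFL;
    }

    public static void main(String[] args) {
        HashRingExecutorDemo demo = new HashRingExecutorDemo(4);
        // Tasks that share a key always hit the same single-threaded executor, so they run in submission order.
        for (int i = 0; i < 3; i++) {
            final int step = i;
            demo.select("order-42").execute(() -> System.out.println("order-42 step " + step));
        }
        demo.ring.values().forEach(ExecutorService::shutdown);
    }
}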
Next, let's look at how the events are wrapped.
// ShenYu's own event class, generic over the payload.
public class DataEvent<T> {

    private T data;

    public T getData() {
        return data;
    }

    public void setData(final T data) {
        this.data = data;
    }
}

// --------------- The event factory, implementing disruptor's EventFactory ---------------
public class DisruptorEventFactory<T> implements EventFactory<DataEvent<T>> {

    @Override
    public DataEvent<T> newInstance() {
        return new DataEvent<>();
    }
}

//--------- The event used by the orderly mode, carrying the ordering hash -------
public class OrderlyDataEvent<T> extends DataEvent<T> {

    private String hash;

    public String getHash() {
        return hash;
    }

    public void setHash(final String hash) {
        this.hash = hash;
    }
}

// -------- The event factory for the orderly mode ------
public class OrderlyDisruptorEventFactory<T> implements EventFactory<DataEvent<T>> {

    @Override
    public OrderlyDataEvent<T> newInstance() {
        return new OrderlyDataEvent<>();
    }
}
That covers ShenYu's wrapping of Disruptor. Now let's look at the two entry points: the producer and the consumer.
Producer entry: DisruptorProvider#onData. Consumer entry: QueueConsumerExecutor#run (it implements Runnable and serves as the consumer's unit of work).
DisruptorProvider (the producer)
Let's start with ShenyuClientRegisterEventPublisher#publishEvent. This is the registration-event publisher on the ShenYu client side, used to publish the registration events of the clients (i.e. the backend services routed by ShenYu).
public class ShenyuClientRegisterEventPublisher {

    // A static singleton.
    private static final ShenyuClientRegisterEventPublisher INSTANCE = new ShenyuClientRegisterEventPublisher();

    // A replaceable field holding the disruptor producer. The generic type is ShenYu's data interface,
    // whose two subtypes are the metadata payload and the URI payload.
    private DisruptorProviderManage<DataTypeParent> providerManage;

    public static ShenyuClientRegisterEventPublisher getInstance() {
        return INSTANCE;
    }

    // In theory, calling start() from several places at once could overwrite providerManage.
    // It is safe here because the callers are mutually exclusive: a client chooses exactly one way
    // to report its registration events.
    public void start(final ShenyuClientRegisterRepository shenyuClientRegisterRepository) {
        // The disruptor consumer implementation (for the client side).
        RegisterClientExecutorFactory factory = new RegisterClientExecutorFactory();
        // Add the consumer hooks. The business logic is not written directly in the consumer's run() method;
        // instead a Set<ExecutorSubscriber<T>> subscribers is embedded and invoked observer-style.
        // Register a metadata subscriber.
        factory.addSubscribers(new ShenyuClientMetadataExecutorSubscriber(shenyuClientRegisterRepository));
        // Register a URI subscriber.
        factory.addSubscribers(new ShenyuClientURIExecutorSubscriber(shenyuClientRegisterRepository));
        providerManage = new DisruptorProviderManage<>(factory);
        // Start the disruptor.
        providerManage.startup();
    }

    // Publish an event: the producer side.
    public void publishEvent(final DataTypeParent data) {
        DisruptorProvider<DataTypeParent> provider = providerManage.getProvider();
        provider.onData(data);
    }
}
Looking at the call sites of the publishEvent method above: the server side, after receiving the client's HTTP call, directly reuses this producer logic.
Still on the producer side, there are three pairs of call sites, paired because each registers both the URI and the metadata.
Open any of those classes and you will find that they all implement the ShenyuClientServerRegisterRepository interface, an SPI interface whose implementations push the data reported by the various clients into the disruptor. In other words, towards the disruptor it is a producer interface, while towards the clients it is the server side receiving their reports. Let's look at how each of them works.
The server-side endpoints that receive the data reported by clients:
// consul: data changes are observed through an ApplicationEvent listener and then pushed into the disruptor.
@EventListener
public void onMetadataChange(final ConsulConfigChangedEvent event) {
    Map<String, GetValue> metadataMap = event.getMetadataMap();
    metadataMap.forEach((path, getValue) -> {
        long modifyIndex = getValue.getModifyIndex();
        if (metadataChanged(path, modifyIndex)) {
            publishMetadata(getValue.getDecodedValue());
        }
    });
}
// --------- etcd: a listener is registered on the etcd client and changes are fed into the disruptor ----
private void subscribeMetaData(final String rpcType) {
    String rpcPath = RegisterPathConstants.buildMetaDataContextPathParent(rpcType);
    List<String> metadataPaths = client.getChildren(rpcPath);
    for (String metadataPath : metadataPaths) {
        String data = client.read(metadataPath);
        publishMetadata(data);
    }
    LOGGER.info("subscribe metadata change: {}", rpcPath);
    client.subscribeChildChanges(rpcPath, new EtcdListenHandler() {

        @Override
        public void updateHandler(final String path, final String value) {
            publishMetadata(client.read(path));
        }

        @Override
        public void deleteHandler(final String path, final String value) {
        }
    });
}
//----------- nacos also registers listeners, but metadata is watched on the config server while URI data is watched on the naming server; the code below is the naming-server side -----------
private void subscribeRpcTypeService(final RpcTypeEnum rpcType) {
    final String serviceName = RegisterPathConstants.buildServiceInstancePath(rpcType.getName());
    try {
        Map<String, List<URIRegisterDTO>> services = new HashMap<>();
        List<Instance> healthyInstances = namingService.selectInstances(serviceName, true);
        healthyInstances.forEach(healthyInstance -> {
            String contextPath = healthyInstance.getMetadata().get("contextPath");
            String serviceConfigName = RegisterPathConstants.buildServiceConfigPath(rpcType.getName(), contextPath);
            subscribeMetadata(serviceConfigName);
            metadataConfigCache.add(serviceConfigName);
            String metadata = healthyInstance.getMetadata().get("uriMetadata");
            URIRegisterDTO uriRegisterDTO = GsonUtils.getInstance().fromJson(metadata, URIRegisterDTO.class);
            services.computeIfAbsent(contextPath, k -> new ArrayList<>()).add(uriRegisterDTO);
            uriServiceCache.computeIfAbsent(serviceName, k -> new ConcurrentSkipListSet<>()).add(contextPath);
        });
        if (RPC_URI_TYPE_SET.contains(rpcType)) {
            services.values().forEach(this::publishRegisterURI);
        }
        LOGGER.info("subscribe uri : {}", serviceName);
        namingService.subscribe(serviceName, event -> {
            if (event instanceof NamingEvent) {
                List<Instance> instances = ((NamingEvent) event).getInstances();
                instances.forEach(instance -> {
                    String contextPath = instance.getMetadata().get("contextPath");
                    uriServiceCache.computeIfAbsent(serviceName, k -> new ConcurrentSkipListSet<>()).add(contextPath);
                });
                refreshURIService(rpcType, serviceName);
            }
        });
    } catch (NacosException e) {
        throw new ShenyuException(e);
    }
}
//------- http is the simplest case ---------
@PostMapping("/register-metadata")
@ResponseBody
public String registerMetadata(@RequestBody final MetaDataRegisterDTO metaDataRegisterDTO) {
    publisher.publish(metaDataRegisterDTO);
    return ShenyuResultMessage.SUCCESS;
}
// ---------- zk: a listener is added through the zookeeper (Curator) client -------
private void subscribeMetaData(final String rpcType) {
    String contextPathParent = RegisterPathConstants.buildMetaDataContextPathParent(rpcType);
    CuratorCacheListener listener = CuratorCacheListener.builder()
            .forCreatesAndChanges((oldNode, node) -> {
                if (PathMatchUtils.match(RegisterPathConstants.REGISTER_METADATA_INSTANCE_PATH, node.getPath())) {
                    String data = new String(node.getData(), StandardCharsets.UTF_8);
                    publishMetadata(data);
                    LOGGER.info("zookeeper register metadata success: {}", data);
                }
            }).build();
    client.addCache(contextPathParent, listener);
}
We will not dig into the client- and server-side reporting logic here. The point is that whenever a server-side endpoint receives a client report, or observes a client data change, it immediately drops the data into the disruptor. That provides concurrent processing capacity and a peak-shaving effect, at the cost of some extra latency.
Now let's look at the consumer side, the QueueConsumerExecutor class.
// Implements Runnable, so it can serve as the unit of work executed for a disruptor consumer.
public abstract class QueueConsumerExecutor<T> implements Runnable {

    private T data;

    public T getData() {
        return data;
    }

    public void setData(final T data) {
        this.data = data;
    }
}
Now back to the consumer factory implementations.
Both the client side and the server side do their real processing in disruptor consumers,
and QueueConsumerExecutor is the consumer execution logic for both sides.
The consumer factories need little discussion; they simply create the different QueueConsumerExecutor implementations.
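For reference, the factory base class used below boils down to something like the following sketch. It is reconstructed from the usages in this article (addSubscribers() in the publisher, getSubscribers() in the concrete factory), so the exact field and method shapes are assumptions rather than the literal ShenYu source; imports of the shenyu-disruptor types are omitted.

import java.util.HashSet;
import java.util.Set;

// A sketch of the consumer-factory base: it only collects the subscriber hooks and leaves
// create()/fixName() to concrete factories such as RegisterClientExecutorFactory.
public abstract class AbstractQueueConsumerFactory<T> implements QueueConsumerFactory<T> {

    // The observer-style subscribers that the created executors call back into.
    private final Set<ExecutorSubscriber<T>> subscribers = new HashSet<>();

    public AbstractQueueConsumerFactory<T> addSubscribers(final ExecutorSubscriber<T> subscriber) {
        subscribers.add(subscriber);
        return this;
    }

    public Set<ExecutorSubscriber<T>> getSubscribers() {
        return subscribers;
    }
}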
So let's look at the different QueueConsumerExecutor implementations.
RegisterClientConsumerExecutor
public final class RegisterClientConsumerExecutor<T extends DataTypeParent> extends QueueConsumerExecutor<T> {

    private final Map<DataType, ExecutorTypeSubscriber<T>> subscribers;

    private RegisterClientConsumerExecutor(final Map<DataType, ExecutorTypeSubscriber<T>> executorSubscriberMap) {
        this.subscribers = new HashMap<>(executorSubscriberMap);
    }

    @Override
    public void run() {
        final T data = getData();
        // The client-side consumer runs through the subscriber hooks registered in the factory.
        // Going through the disruptor at this step effectively adds rate limiting: the ringBuffer
        // buffers bursts of concurrent client registrations, and the abstracted hooks then invoke
        // the real business calls (http requests, nacos requests, zk data-change handling, and so on).
        subscribers.get(data.getType()).executor(Lists.newArrayList(data));
    }

    // The factory for client-side consumers; it creates the executor above.
    public static class RegisterClientExecutorFactory<T extends DataTypeParent> extends AbstractQueueConsumerFactory<T> {

        @Override
        public RegisterClientConsumerExecutor<T> create() {
            Map<DataType, ExecutorTypeSubscriber<T>> map = getSubscribers()
                    .stream()
                    .map(e -> (ExecutorTypeSubscriber<T>) e)
                    .collect(Collectors.toMap(ExecutorTypeSubscriber::getType, e -> e));
            return new RegisterClientConsumerExecutor<>(map);
        }

        @Override
        public String fixName() {
            return "shenyu_register_client";
        }
    }
}
RegisterServerConsumerExecutor works much the same way: once the client's data changes reach the server side, they also pass through a disruptor for decoupling and peak shaving, and the subscriber hooks inside the consumer carry out the business logic.
The ExecutorSubscriber interface is ShenYu's callback-hook interface for disruptor consumers,
and ExecutorTypeSubscriber<T extends DataTypeParent> extends ExecutorSubscriber<T> distinguishes the different data types, so that each data type gets its own consumer handling.
The different data types
boil down to the client's metadata and URI. ShenyuClientRegisterRepository is used for exchanging the client's URI and metadata with ShenYu (on top of the multiple data-synchronization mechanisms ShenYu provides); the client registers its URI and metadata through it.
ShenyuClientRegisterService determines what kind of service the client registers to the admin as, i.e. which kind of client the ShenYu gateway uses to call the backend service (this depends on what kind of backend service the user exposes for the gateway to call); the metadata and URI are then registered through this register service.
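To make the hook contract concrete, a hypothetical ExecutorTypeSubscriber implementation might look roughly like this. The getType()/executor(Collection) method shapes and the DataType.META_DATA constant are inferred from the usages shown above, so treat them as assumptions; imports of the ShenYu types are omitted.

import java.util.Collection;

// Hypothetical subscriber: handles one data type and is looked up by DataType in the consumer executor.
public class DemoMetadataExecutorSubscriber implements ExecutorTypeSubscriber<MetaDataRegisterDTO> {

    @Override
    public DataType getType() {
        // Tells the factory which kind of payload this subscriber is responsible for.
        return DataType.META_DATA;
    }

    @Override
    public void executor(final Collection<MetaDataRegisterDTO> metaDataList) {
        // The real business logic would go here: e.g. persist the metadata or forward it to shenyu-admin.
        metaDataList.forEach(metaData -> System.out.println("register metadata: " + metaData));
    }
}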
This part is a bit convoluted; a flow diagram would help.
In a follow-up article I hope to walk through how the backend services routed by apache-shenyu register themselves with shenyu-admin and shenyu-bootstrap, and how data changes are propagated dynamically; that part is the core of apache-shenyu's support for multiple protocols.