美文网首页
apache-shenyu之Disruptor如何应用

apache-shenyu之Disruptor如何应用

作者: 二哈_8fd0 | 来源:发表于2022-05-29 00:18 被阅读0次

    (apache-shenyu 2.4.3版本)apache shenyu前身soul网关,是一款java中spring5新引入的project-reactor的webflux,reactor-netty等为基础实现的高性能网关,现已进入apache孵化器,作者yu199195 (xiaoyu) (github.com)

    作者也是国内知名开源社区dromara的创始人,并且作有多个开源产品,apache-shenyu是其中之一apache/incubator-shenyu: ShenYu is High-Performance Java API Gateway. (github.com)

    本文只关心shenyu如何使用disruptor的api以及解决什么问题,也会继续了解一下disruptor提供了哪些api供我们使用

    disruptor是典型的消费者生产者模型,通过无锁的环形数组ringBuffer作为队列,并提供多种消费者等待策略大大提高了性能

    shenyu-disruptor模块
    所有的类了

    对外暴露api的入口为DisruptorProviderManage类,开始撸代码

    // 这是一个泛型化的类,并且对外暴露了构造方法,以实例维度使用当前类
    public class DisruptorProviderManage<T> {
        public static final Integer DEFAULT_SIZE = 4096 << 1 << 1;
        // 消费者数量默认使用当前及其计算资源 * 2(可能是核心数或者线程数或者其他,取决于很多因素)
        private static final Integer DEFAULT_CONSUMER_SIZE = Runtime.getRuntime().availableProcessors() << 1;
        private final Integer size;
        private final Integer consumerSize;
        // 我们自己定义的消费者工厂,用于承接不同业务,分别作为shenyu的交互的client和server侧两边的消费者
        private final QueueConsumerFactory<T> consumerFactory;
        // disruptor,也是泛型,绑定的当前实例
        private DisruptorProvider<T> provider;
        public DisruptorProviderManage(final QueueConsumerFactory<T> consumerFactory, final Integer ringBufferSize) {
            this(consumerFactory,
                    DEFAULT_CONSUMER_SIZE,
                    ringBufferSize);
        }
        public DisruptorProviderManage(final QueueConsumerFactory<T> consumerFactory) {
            this(consumerFactory, DEFAULT_CONSUMER_SIZE, DEFAULT_SIZE);
        }
        public DisruptorProviderManage(final QueueConsumerFactory<T> consumerFactory,
                                       final int consumerSize,
                                       final int ringBufferSize) {
            this.consumerFactory = consumerFactory;
            this.size = ringBufferSize;
            this.consumerSize = consumerSize;
        }
    // 启动disruptor
        public void startup() {
            this.startup(false);
        }
        public void startup(final boolean isOrderly) {
    //一个可排序的任务执行器,当然如果是传入 true才会做真正的可排序逻辑,现在默认是false
            OrderlyExecutor executor = new OrderlyExecutor(isOrderly, consumerSize, consumerSize, 0, TimeUnit.MILLISECONDS,
                    new LinkedBlockingQueue<>(),
    // 可以看出这是消费者的任务执行器
                    DisruptorThreadFactory.create("shenyu_disruptor_consumer_", false), new ThreadPoolExecutor.AbortPolicy());
            int newConsumerSize = this.consumerSize;
            EventFactory<DataEvent<T>> eventFactory;
            if (isOrderly) {
    // 如果是可排序的任务执行,那么消费者数量就是一个,即一个线程进行消费
                newConsumerSize = 1;
    // 可排序的 disruptor事件工厂
                eventFactory = new OrderlyDisruptorEventFactory<>();
            } else {
                eventFactory = new DisruptorEventFactory<>();
            }
    // disruptor 启动类
            Disruptor<DataEvent<T>> disruptor = new Disruptor<>(eventFactory,
                    size,
    // 生产者任务执行器,disruptor是典型的消费者生产者模型,通过
                    DisruptorThreadFactory.create("shenyu_disruptor_provider_" + consumerFactory.fixName(), false),
    // 多生产者模型
                    ProducerType.MULTI,
    // 生产者阻塞策略,这其实是一个游标阻塞策略,用于等待可用的序列
    // 因为其使用无锁的环形数组,需要先请求一个可用序列即环形数组下标来放入事件
    // 当前默认使用性能较低的策略,但是消耗cpu较少
                    new BlockingWaitStrategy());
            // 初始化消费者数组
            QueueConsumer<T>[] consumers = new QueueConsumer[newConsumerSize];
            for (int i = 0; i < newConsumerSize; i++) {
                consumers[i] = new QueueConsumer<>(executor, consumerFactory);
            }
            disruptor.handleEventsWithWorkerPool(consumers);
    // 事件处理器,也就是消费者执行侧报错的钩子,默认忽略
            disruptor.setDefaultExceptionHandler(new IgnoreExceptionHandler());
    // 启动
            disruptor.start();
    // 获取队列
            RingBuffer<DataEvent<T>> ringBuffer = disruptor.getRingBuffer();
    // 将disruptor要素放入我们封装好的 对外暴露的操作类
            provider = new DisruptorProvider<>(ringBuffer, disruptor, isOrderly);
        }
        public DisruptorProvider<T> getProvider() {
            return provider;
        }
    }
    

    上述代码可以看出DisruptorProviderManage类是一个引导类,帮助我们去构造disruptor实例,并将当前实例通过泛型与其绑定。
    下面来看disruptor的实例DisruptorProvider类

    // 也是泛型,当前类为真正disruptor的封装
    public class DisruptorProvider<T> {
        // 大名鼎鼎的 ringBuffer
        private final RingBuffer<DataEvent<T>> ringBuffer;
        // disruptor的对外api类,我们这里将他封装到当前类
        private final Disruptor<DataEvent<T>> disruptor;
        // 是否可排序。shenyu提供了一个可排序的disruptor运行机制
        private final boolean isOrderly;
        // 这个是一个转换器,disruptor封装的ringBuffer环形数组区别于普通的队列。
    // 这里的生产者发布一个事件需要先从ringBuffer获取sequence,也就是获取到环形数组可用的下标才可以发布事件到ringBuffer中
    // 发布事件时直接传入sequence,并不会传入数据,通过sequence和要发布的数据关联。消费时也去拿可消费的sequence。
    // 因为 RingBuffer提供泛型化的参数,但是,传入的参数不一定是指定的泛型化的类型,disruptor提供这个函数式接口来转换参数
    // shenyu转换方式是又定义了一个 DataEvent的泛型,通过参数转换set到DataEvent中
        private final EventTranslatorOneArg<DataEvent<T>, T> translatorOneArg = (event, sequence, t) -> event.setData(t);
        // 同上,通过转换set了排序用的hash值
        private final EventTranslatorTwoArg<DataEvent<T>, T, String> orderlyArg = (event, sequence, t, orderly) -> {
            if (event instanceof OrderlyDataEvent) {
                ((OrderlyDataEvent<T>) event).setHash(orderly);
            }
            event.setData(t);
        };
        public DisruptorProvider(final RingBuffer<DataEvent<T>> ringBuffer, final Disruptor<DataEvent<T>> disruptor, final boolean isOrderly) {
            this.ringBuffer = ringBuffer;
            this.disruptor = disruptor;
            this.isOrderly = isOrderly;
        }
        public void onData(final T data) {
    //是否可排序 不可变,初始化时就指定了
            if (isOrderly) {
                throw new IllegalArgumentException("The current provider is  of orderly type. Please use onOrderlyData() method.");
            }
            try {
    // 调用ringBuffer发布事件,传入参数和转换器
                ringBuffer.publishEvent(translatorOneArg, data);
            } catch (Exception ex) {
                logger.error("ex", ex);
            }
        }
        public void onOrderlyData(final T data, final String... hashArray) {
            if (!this.isOrderly) {
                throw new IllegalArgumentException("The current provider is not of orderly type. Please use onData() method.");
            }
            try {
    // 多加一个 用于排序的 hash值
                String hash = String.join(":", hashArray);
                ringBuffer.publishEvent(orderlyArg, data, hash);
            } catch (Exception ex) {
                logger.error("ex", ex);
            }
        }
    //记得关闭资源
        public void shutdown() {
            if (null != disruptor) {
                disruptor.shutdown();
            }
        }
    }
    
    

    看到上面生产者操作类后,再看看 消费者类的构建QueueConsumer

    // WorkHandler是 disruptor抽象出来的 消费者处理类,提供给我们实现来做消费者的业务逻辑
    public class QueueConsumer<T> implements WorkHandler<DataEvent<T>> {
        // 可排序的 执行器,也可以用作正常逻辑的执行器
        private final OrderlyExecutor executor;
        // shenyu 自己封装的 消费者工厂,用于创建消费者执行器(实际就是实现runnable的执行单元)
        private final QueueConsumerFactory<T> factory;
    
        public QueueConsumer(final OrderlyExecutor executor, final QueueConsumerFactory<T> factory) {
            this.executor = executor;
            this.factory = factory;
        }
        
        @Override
        public void onEvent(final DataEvent<T> t) {
            if (t != null) {
                ThreadPoolExecutor executor = orderly(t);
    // 获取传入的 执行业务的执行器
                QueueConsumerExecutor<T> queueConsumerExecutor = factory.create();
    // 放入事件
                queueConsumerExecutor.setData(t.getData());
                // help gc 切断引用链,具体逻辑不是很清楚,应该是防止一直存在引用链导致无法被gc回收
                t.setData(null);
    // 执行业务
                executor.execute(queueConsumerExecutor);
            }
        }
        // 如果是 可排序的事件,并且hash值不为空则使用可排序执行器选择出一个SingletonExecutor执行器
        private ThreadPoolExecutor orderly(final DataEvent<T> t) {
            if (t instanceof OrderlyDataEvent && !isEmpty(((OrderlyDataEvent<T>) t).getHash())) {
                return executor.select(((OrderlyDataEvent<T>) t).getHash());
            } else {
                return executor;
            }
        }
        
        private boolean isEmpty(final String t) {
            return t == null || t.isEmpty();
        }
    }
    //---------------shenyu封装的可排序的线程池--------
    public class OrderlyExecutor extends ThreadPoolExecutor {
        // 线程安全的 可排序的map,目前jdk内只有这一个可排序的concurrentMap的实现
        private final ConcurrentSkipListMap<Long, SingletonExecutor> virtualExecutors = new ConcurrentSkipListMap<>();
        // 线程选择器
        private final ThreadSelector threadSelector = new ThreadSelector();
        
        public OrderlyExecutor(
                final boolean isOrderly,
                final int corePoolSize,
                final int maximumPoolSize,
                final long keepAliveTime,
                final TimeUnit unit,
                final BlockingQueue<Runnable> workQueue,
                final ThreadFactory threadFactory,
                final RejectedExecutionHandler handler) {
            super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
            orderlyThreadPool(isOrderly, corePoolSize, threadFactory);
        }
        
        private void orderlyThreadPool(final boolean isOrderly, final int corePoolSize, final ThreadFactory threadFactory) {
    // 如果是可排序则特殊处理一下
            if (isOrderly) {
    // 将设置的线程池的数量按下标分别初始化一个 单个线程池放入ConcurrentSkipListMap中
                IntStream.range(0, corePoolSize).forEach(index -> {
                    SingletonExecutor singletonExecutor = new SingletonExecutor(threadFactory);
    // 将当前单线程的线程池 + 下标 进行hash
                    String hash = singletonExecutor.hashCode() + ":" + index;
                    byte[] bytes = threadSelector.sha(hash);
                    for (int i = 0; i < 4; i++) {
    // 将hash过的值 再从 0 ~ 3 哈希一次存入。也就是map中有4个key执行同一个线程池执行器。为什么这么做不太清楚
                        this.virtualExecutors.put(threadSelector.hash(bytes, i), singletonExecutor);
                    }
                });
            }
        }
        
        public SingletonExecutor select(final String hash) {
            long select = threadSelector.select(hash);
    // 通过hash值命中 线程池
            if (!virtualExecutors.containsKey(select)) {
    // 如果没有命中,则选择最靠近自己的hash值的线程池,但是是向后取(也就是hash值大于等于当前值的第一个)
                SortedMap<Long, SingletonExecutor> tailMap = virtualExecutors.tailMap(select);
                if (tailMap.isEmpty()) {
    // 如果计算出来的hash值已经是最后一个了,则取当前map的第一个值
                    select = virtualExecutors.firstKey();
                } else {
                    select = tailMap.firstKey();
                }
            }
            return virtualExecutors.get(select);
        }
    
        private static final class ThreadSelector {
            
            public long select(final String hash) {
                byte[] digest = sha(hash);
                return hash(digest, 0);
            }
            
            private long hash(final byte[] digest, final int number) {
                return (((long) (digest[3 + number * 4] & 0xFF) << 24)
                        | ((long) (digest[2 + number * 4] & 0xFF) << 16)
                        | ((long) (digest[1 + number * 4] & 0xFF) << 8)
                        | (digest[number * 4] & 0xFF))
                        & 0xFFFFFFFFL;
            }
            
            private byte[] sha(final String hash) {
                byte[] bytes = hash.getBytes(StandardCharsets.UTF_8);
                return Hashing
                        .sha256()
                        .newHasher()
                        .putBytes(bytes)
                        .hash().asBytes();
            }
        }
    }
    
    

    看到上面的可排序的线程池逻辑,其实就是结合了thread的priority属性的一个优先级线程池逻辑。
    下面再来看看关于event的封装

    // 自己封装的 事件类,基于泛型
    public class DataEvent<T> {
        private T data;
        public T getData() {
            return data;
        }
        public void setData(final T data) {
            this.data = data;
        }
    }
    // ---------------事件工厂,实现了disruptor的事件工厂---------------
    public class DisruptorEventFactory<T> implements EventFactory<DataEvent<T>> {
        
        @Override
        public DataEvent<T> newInstance() {
            return new DataEvent<>();
        }
    }
    //---------带有优先级逻辑的事件-------
    public class OrderlyDataEvent<T> extends DataEvent<T> {
      
        private String hash;
        public String getHash() {
            return hash;
        }
        public void setHash(final String hash) {
            this.hash = hash;
        }
    }
    // -------- 带有优先级逻辑的 事件工厂------
    public class OrderlyDisruptorEventFactory<T> implements EventFactory<DataEvent<T>> {
        
        @Override
        public OrderlyDataEvent<T> newInstance() {
            return new OrderlyDataEvent<>();
        }
    }
    

    上面看完了shenyu对于 disruptor的封装然后来看看两个入口,消费者和生产者

    生产者入口:DisruptorProvider#onData 消费者入口:QueueConsumerExecutor#run(实现了runnable作为消费者执行任务单元)

    DisruptorProvider


    生产者

    先看ShenyuClientRegisterEventPublisher#publishEvent,这是一个shenyuClient端注册事件发布器,用于发布客户端(也就是被shenyu路由的后端服务们)的注册事件

    public class ShenyuClientRegisterEventPublisher {
        // 静态的单例
        private static final ShenyuClientRegisterEventPublisher INSTANCE = new ShenyuClientRegisterEventPublisher();
        //这里其实是一个可替换的变量,用于存放disruptor生产者,泛型是shenyu封装的数据接口,两个子类分别是元数据,和接口。
        private DisruptorProviderManage<DataTypeParent> providerManage;
        public static ShenyuClientRegisterEventPublisher getInstance() {
            return INSTANCE;
        }
    //这里的start 理论上多个地方同时使用会造成providerManage 更改。这里可以这么用是因为不同的调用方是互斥的,也就是客户端会选择一种方式上报注册事件
        public void start(final ShenyuClientRegisterRepository shenyuClientRegisterRepository) {
    // disruptor的消费者 实现(针对于客户端)
            RegisterClientExecutorFactory factory = new RegisterClientExecutorFactory();
    // 添加消费者钩子,我们的业务逻辑没有直接写在消费者实现的run方法中,而是嵌入了一个Set<ExecutorSubscriber<T>> subscribers,观察者模式的调用
    // 注册一个 元数据钩子
            factory.addSubscribers(new ShenyuClientMetadataExecutorSubscriber(shenyuClientRegisterRepository));
    // 注册一个 资源路径上报
            factory.addSubscribers(new ShenyuClientURIExecutorSubscriber(shenyuClientRegisterRepository));
            providerManage = new DisruptorProviderManage<>(factory);
    // 启动disruptor
            providerManage.startup();
        }
       // 发布事件 生产者
        public void publishEvent(final DataTypeParent data) {
            DisruptorProvider<DataTypeParent> provider = providerManage.getProvider();
            provider.onData(data);
        }
    }
    
    上述类的生成者方法调用

    如上图可以看出是,server端接收client端 通过http调用后直接复用了生产者逻辑


    还是生产者

    如上图,有三对,成对是因为都是 uri和metadata注册。
    随便点进去看一个类 都是ShenyuClientServerRegisterRepository接口的子类,并且是一个SPI接口,用于各种客户端上报数据后放入disruptor中。也就是对于disruptor他是一个生产者接口,但是对于client端,是一个接收client上报的的server端,我们分别看看他们如何运作


    所有client上报数据的server端
    // consul 通过applicationEvent监听到数据变化再放入 disruptor
        @EventListener
        public void onMetadataChange(final ConsulConfigChangedEvent event) {
            Map<String, GetValue> metadataMap = event.getMetadataMap();
            metadataMap.forEach((path, getValue) -> {
                long modifyIndex = getValue.getModifyIndex();
                if (metadataChanged(path, modifyIndex)) {
                    publishMetadata(getValue.getDecodedValue());
                }
            });
        }
    // ---------etcd 通过一个添加一个监听到 etcd客户端来处理disruptor----
        private void subscribeMetaData(final String rpcType) {
            String rpcPath = RegisterPathConstants.buildMetaDataContextPathParent(rpcType);
            List<String> metadataPaths = client.getChildren(rpcPath);
            for (String metadataPath: metadataPaths) {
                String data = client.read(metadataPath);
                publishMetadata(data);
            }
            LOGGER.info("subscribe metadata change: {}", rpcPath);
            client.subscribeChildChanges(rpcPath, new EtcdListenHandler() {
                @Override
                public void updateHandler(final String path, final String value) {
                    publishMetadata(client.read(path));
                }
                @Override
                public void deleteHandler(final String path, final String value) {
                }
            });
        }
    //-----------nacos也是通过添加监听,但是元数据监听的是configServer,uri数据监听的是nameServer,如下代码是nameServer-----------
        private void subscribeRpcTypeService(final RpcTypeEnum rpcType) {
            final String serviceName = RegisterPathConstants.buildServiceInstancePath(rpcType.getName());
            try {
                Map<String, List<URIRegisterDTO>> services = new HashMap<>();
                List<Instance> healthyInstances = namingService.selectInstances(serviceName, true);
                healthyInstances.forEach(healthyInstance -> {
                    String contextPath = healthyInstance.getMetadata().get("contextPath");
                    String serviceConfigName = RegisterPathConstants.buildServiceConfigPath(rpcType.getName(), contextPath);
                    subscribeMetadata(serviceConfigName);
                    metadataConfigCache.add(serviceConfigName);
                    String metadata = healthyInstance.getMetadata().get("uriMetadata");
                    URIRegisterDTO uriRegisterDTO = GsonUtils.getInstance().fromJson(metadata, URIRegisterDTO.class);
                    services.computeIfAbsent(contextPath, k -> new ArrayList<>()).add(uriRegisterDTO);
                    uriServiceCache.computeIfAbsent(serviceName, k -> new ConcurrentSkipListSet<>()).add(contextPath);
                });
                if (RPC_URI_TYPE_SET.contains(rpcType)) {
                    services.values().forEach(this::publishRegisterURI);
                }
                LOGGER.info("subscribe uri : {}", serviceName);
                namingService.subscribe(serviceName, event -> {
                    if (event instanceof NamingEvent) {
                        List<Instance> instances = ((NamingEvent) event).getInstances();
                        instances.forEach(instance -> {
                            String contextPath = instance.getMetadata().get("contextPath");
                            uriServiceCache.computeIfAbsent(serviceName, k -> new ConcurrentSkipListSet<>()).add(contextPath);
                        });
                        refreshURIService(rpcType, serviceName);
                    }
                });
            } catch (NacosException e) {
                throw new ShenyuException(e);
            }
        }
    //-------http比较简单---------
        @PostMapping("/register-metadata")
        @ResponseBody
        public String registerMetadata(@RequestBody final MetaDataRegisterDTO metaDataRegisterDTO) {
            publisher.publish(metaDataRegisterDTO);
            return ShenyuResultMessage.SUCCESS;
        }
    // ----------zk 通过zk客户端添加监听-------
        private void subscribeMetaData(final String rpcType) {
            String contextPathParent = RegisterPathConstants.buildMetaDataContextPathParent(rpcType);
            CuratorCacheListener listener = CuratorCacheListener.builder()
                    .forCreatesAndChanges((oldNode, node) -> {
                        if (PathMatchUtils.match(RegisterPathConstants.REGISTER_METADATA_INSTANCE_PATH, node.getPath())) {
                            String data = new String(node.getData(), StandardCharsets.UTF_8);
                            publishMetadata(data);
                            LOGGER.info("zookeeper register metadata success: {}", data);
                        }
                    }).build();
            client.addCache(contextPathParent, listener);
        }
    

    这里不具体讨论shenyu的数据上报的client和server端逻辑,这里可以看到所有server端在接收到client端上报或者监听到client端的数据变化都会第一时间放入disruptor处理,提供并发处理能力,也可以有削峰的效果,但是可能会带来一些延迟。

    下面再看看 消费者 QueueConsumerExecutor类
    // 实现了 runnable接口,可以作为disruptor的消费者执行任务
    public abstract class QueueConsumerExecutor<T> implements Runnable {
    
        private T data;
    
        public T getData() {
            return data;
        }
    
        public void setData(final T data) {
            this.data = data;
        }
    }
    

    再回过头看看 消费者工厂的实现


    client和server都会使用disrupotr的消费者真正的处理

    而QueueConsumerExecutor就是client和server两边的消费者执行逻辑


    image.png
    消费者工厂不用看了,就是创建不同的QueueConsumerExecutor实现
    那么来看看QueueConsumerExecutor不同的实现

    RegisterClientConsumerExecutor

    public final class RegisterClientConsumerExecutor<T extends DataTypeParent> extends QueueConsumerExecutor<T> {
        
        private final Map<DataType, ExecutorTypeSubscriber<T>> subscribers;
        
        private RegisterClientConsumerExecutor(final Map<DataType, ExecutorTypeSubscriber<T>> executorSubscriberMap) {
            this.subscribers = new HashMap<>(executorSubscriberMap);
        }
    
        @Override
        public void run() {
            final T data = getData();
    // client端消费者通过   工厂中的回调钩子来执行,其实通过disruptor这一步相当于做了限流,通过一个ringBuffer来防止client端过多的并发打进来
    //我们可以做一个缓冲作用,通过抽象出来的回调钩子调用真正的业务,例如http请求,nacos请求,zk的数据变化请求等等      subscribers.get(data.getType()).executor(Lists.newArrayList(data));
        }
    // 这里也是 client端消费者的工厂类,创建当前client端的消费者
        public static class RegisterClientExecutorFactory<T extends DataTypeParent> extends AbstractQueueConsumerFactory<T> {
            
            @Override
            public RegisterClientConsumerExecutor<T> create() {
                Map<DataType, ExecutorTypeSubscriber<T>> map = getSubscribers()
                        .stream()
                        .map(e -> (ExecutorTypeSubscriber<T>) e)
                        .collect(Collectors.toMap(ExecutorTypeSubscriber::getType, e -> e));
                return new RegisterClientConsumerExecutor<>(map);
            }
    
            @Override
            public String fixName() {
                return "shenyu_register_client";
            }
        }
    }
    

    RegisterServerConsumerExecutor逻辑大同小异,是client端的数据变化到了server端后,也经过一层disruptor来解耦,并且削峰,由消费者中回调钩子执行业务

    ExecutorSubscriber 接口为shenyu中disrupor的回调钩子接口

    然后通过ExecutorTypeSubscriber<T extends DataTypeParent> extends ExecutorSubscriber<T>区分不同数据类型,来处理不同数据类型的消费者


    不同数据类型

    其实就是分为了 client的metadata,uri,即ShenyuClientRegisterRepository,用于client端的uri和metadata与shenyu的数据交互(这个是shenyu提供的多种数据同步机制)由client端注册uri和metadata
    ShenyuClientRegisterService client是以什么服务形式注册到admin,让shenyu的网关基于什么样的client来调用后端服务(这里取决于用户使用什么后端服务提供shenyu的网关调用),然后通过这个注册器来注册metadata和uri
    这里有点绕。需要一个流程图~
    期待下一篇文章梳理一下apache-shenyu的被路由的后端服务,以及shenyu-admin,shenyu-bootstrap的注册以及数据动态感知的设计,这部分算是apache-shenyu支持多种协议的核心。

    相关文章

      网友评论

          本文标题:apache-shenyu之Disruptor如何应用

          本文链接:https://www.haomeiwen.com/subject/tyjsprtx.html