How Apache ShenYu Uses Disruptor

Author: 二哈_8fd0 | Source: published 2022-05-29 00:18

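(Apache ShenYu version 2.4.3) Apache ShenYu, formerly the Soul gateway, is a high-performance gateway written in Java on top of the reactive stack that Spring 5 introduced: Project Reactor, WebFlux, and Reactor Netty. It has entered the Apache Incubator. Author: yu199195 (xiaoyu) (github.com)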

The author also founded dromara, a well-known open-source community in China, and maintains several open-source projects, Apache ShenYu among them: apache/incubator-shenyu: ShenYu is High-Performance Java API Gateway. (github.com)

This article looks only at how ShenYu uses the Disruptor API and what problem that solves, and along the way at which APIs Disruptor offers.

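Disruptor follows the classic producer-consumer model. Its queue is a RingBuffer, a lock-free circular array, and it offers several consumer wait strategies, which together greatly improve performance.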

[Figure: all classes in the shenyu-disruptor module]

The entry point of the public API is the DisruptorProviderManage class, so let's read the code.

// A generic class with public constructors, so callers create and use one instance per use case
public class DisruptorProviderManage<T> {
    public static final Integer DEFAULT_SIZE = 4096 << 1 << 1;
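    // Default consumer count: availableProcessors() * 2 (what the JVM reports may reflect cores, hardware threads, or other limits, depending on the environment)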
    private static final Integer DEFAULT_CONSUMER_SIZE = Runtime.getRuntime().availableProcessors() << 1;
    private final Integer size;
    private final Integer consumerSize;
    // Caller-supplied consumer factory; each business case provides its own, here the consumers on the ShenYu client side and on the server side
    private final QueueConsumerFactory<T> consumerFactory;
    // The Disruptor wrapper, also generic, bound to this instance
    private DisruptorProvider<T> provider;
    public DisruptorProviderManage(final QueueConsumerFactory<T> consumerFactory, final Integer ringBufferSize) {
        this(consumerFactory,
                DEFAULT_CONSUMER_SIZE,
                ringBufferSize);
    }
    public DisruptorProviderManage(final QueueConsumerFactory<T> consumerFactory) {
        this(consumerFactory, DEFAULT_CONSUMER_SIZE, DEFAULT_SIZE);
    }
    public DisruptorProviderManage(final QueueConsumerFactory<T> consumerFactory,
                                   final int consumerSize,
                                   final int ringBufferSize) {
        this.consumerFactory = consumerFactory;
        this.size = ringBufferSize;
        this.consumerSize = consumerSize;
    }
    // Start the Disruptor
    public void startup() {
        this.startup(false);
    }
    public void startup(final boolean isOrderly) {
        // An executor that can preserve order; the ordering logic is active only when isOrderly is true, and the default is false
        OrderlyExecutor executor = new OrderlyExecutor(isOrderly, consumerSize, consumerSize, 0, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(),
                // The thread name shows that this is the executor that runs the consumer tasks
                DisruptorThreadFactory.create("shenyu_disruptor_consumer_", false), new ThreadPoolExecutor.AbortPolicy());
        int newConsumerSize = this.consumerSize;
        EventFactory<DataEvent<T>> eventFactory;
        if (isOrderly) {
            // In orderly mode there is exactly one consumer, so a single thread takes events off the ring
            newConsumerSize = 1;
            // Event factory for orderly events
            eventFactory = new OrderlyDisruptorEventFactory<>();
        } else {
            eventFactory = new DisruptorEventFactory<>();
        }
        // The Disruptor bootstrap object
        Disruptor<DataEvent<T>> disruptor = new Disruptor<>(eventFactory,
                size,
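                // Thread factory. Disruptor uses it to create the threads that run the event processors (the WorkHandlers below), despite the "provider" in the thread name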
                DisruptorThreadFactory.create("shenyu_disruptor_provider_" + consumerFactory.fixName(), false),
                // Multi-producer mode: several threads may publish concurrently
                ProducerType.MULTI,
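                // Wait strategy: how a consumer waits on the cursor for the next published sequence.
                // Because the ring is lock-free, a producer first claims a free sequence (a slot index in the ring) and then puts its event there.
                // BlockingWaitStrategy, used here, has lower performance than the other strategies but consumes less CPU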
                new BlockingWaitStrategy());
        // Create the consumer array
        QueueConsumer<T>[] consumers = new QueueConsumer[newConsumerSize];
        for (int i = 0; i < newConsumerSize; i++) {
            consumers[i] = new QueueConsumer<>(executor, consumerFactory);
        }
        disruptor.handleEventsWithWorkerPool(consumers);
        // Exception handler: the hook invoked when a consumer throws; this one ignores the exception
        disruptor.setDefaultExceptionHandler(new IgnoreExceptionHandler());
        // Start
        disruptor.start();
        // Get the ring buffer, which serves as the queue
        RingBuffer<DataEvent<T>> ringBuffer = disruptor.getRingBuffer();
        // Hand the Disruptor pieces to our wrapper class, which is what callers operate on
        provider = new DisruptorProvider<>(ringBuffer, disruptor, isOrderly);
    }
    public DisruptorProvider<T> getProvider() {
        return provider;
    }
}

As the code shows, DisruptorProviderManage is a bootstrap class: it builds the Disruptor and binds it to the current instance through the type parameter T.
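
To make the default size concrete: 4096 << 1 << 1 is 16384, a power of two, which Disruptor requires of its ring buffer size. The sketch below is only an illustration of why that matters. The class and the helpers isPowerOfTwo and slotIndex are hypothetical names, not ShenYu or Disruptor methods; they show how a power-of-two size lets an ever-growing sequence be mapped to a slot with a bit mask.

```java
final class RingSizeSketch {

    // Same expression as DEFAULT_SIZE in the listing above: 4096 * 2 * 2.
    static final int DEFAULT_SIZE = 4096 << 1 << 1;

    // Hypothetical helper: a positive int is a power of two when exactly one bit is set.
    static boolean isPowerOfTwo(final int n) {
        return n > 0 && Integer.bitCount(n) == 1;
    }

    // Hypothetical helper: maps a sequence to a slot index.
    // With a power-of-two size, (sequence & (size - 1)) equals (sequence % size).
    static int slotIndex(final long sequence, final int size) {
        return (int) (sequence & (size - 1));
    }

    public static void main(final String[] args) {
        System.out.println("default size = " + DEFAULT_SIZE);
        System.out.println("power of two = " + isPowerOfTwo(DEFAULT_SIZE));
        // The sequence equal to the size wraps around to slot 0.
        System.out.println("slot of sequence 16384 = " + slotIndex(16384L, DEFAULT_SIZE));
    }
}
```

A caller who passes a custom ringBufferSize to DisruptorProviderManage would want to keep it a power of two for the same reason.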
Next, the class that wraps the running Disruptor, DisruptorProvider.

// Also generic; this class is the actual wrapper around the Disruptor
public class DisruptorProvider<T> {
    // The famous RingBuffer
    private final RingBuffer<DataEvent<T>> ringBuffer;
    // Disruptor's public API object, wrapped by this class
    private final Disruptor<DataEvent<T>> disruptor;
    // Whether this provider is orderly. ShenYu offers an orderly mode on top of Disruptor
    private final boolean isOrderly;
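    // An event translator. The RingBuffer that Disruptor wraps is not an ordinary queue:
    // a producer must first obtain a sequence, i.e. a free slot index in the ring, before it can publish an event.
    // Publishing passes the sequence rather than the payload; the sequence ties the publication to the slot that holds the data, and consumers likewise ask for the next consumable sequence.
    // The RingBuffer is typed by its event class, but the argument a caller passes is not necessarily of that type, so Disruptor provides this functional interface to convert it.
    // ShenYu defines a generic DataEvent and has the translator set the payload on it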
    private final EventTranslatorOneArg<DataEvent<T>, T> translatorOneArg = (event, sequence, t) -> event.setData(t);
    // Same idea, but the translator also sets the hash used for ordering
    private final EventTranslatorTwoArg<DataEvent<T>, T, String> orderlyArg = (event, sequence, t, orderly) -> {
        if (event instanceof OrderlyDataEvent) {
            ((OrderlyDataEvent<T>) event).setHash(orderly);
        }
        event.setData(t);
    };
    public DisruptorProvider(final RingBuffer<DataEvent<T>> ringBuffer, final Disruptor<DataEvent<T>> disruptor, final boolean isOrderly) {
        this.ringBuffer = ringBuffer;
        this.disruptor = disruptor;
        this.isOrderly = isOrderly;
    }
    public void onData(final T data) {
        // isOrderly is immutable; it is fixed at construction
        if (isOrderly) {
            throw new IllegalArgumentException("The current provider is  of orderly type. Please use onOrderlyData() method.");
        }
        try {
            // Publish through the ring buffer, passing the translator and the payload
            ringBuffer.publishEvent(translatorOneArg, data);
        } catch (Exception ex) {
            logger.error("ex", ex);
        }
    }
    public void onOrderlyData(final T data, final String... hashArray) {
        if (!this.isOrderly) {
            throw new IllegalArgumentException("The current provider is not of orderly type. Please use onData() method.");
        }
        try {
            // One extra argument: the hash used for ordering
            String hash = String.join(":", hashArray);
            ringBuffer.publishEvent(orderlyArg, data, hash);
        } catch (Exception ex) {
            logger.error("ex", ex);
        }
    }
    // Remember to release the resources
    public void shutdown() {
        if (null != disruptor) {
            disruptor.shutdown();
        }
    }
}
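
The translator comments above describe publishing as "write the payload into a slot that already exists". A minimal single-threaded sketch of that idea follows, using only the JDK. TinyRing and Slot are hypothetical stand-ins for RingBuffer and DataEvent; the sketch has none of Disruptor's concurrency control or wrap-around protection and is not how Disruptor is implemented internally.

```java
import java.util.function.BiConsumer;

final class SlotReuseSketch {

    /** Stand-in for DataEvent<T>: a mutable holder that lives as long as the ring. */
    static final class Slot<T> {
        T data;
    }

    /** Single-threaded toy ring with preallocated slots. */
    static final class TinyRing<T> {
        private final Slot<T>[] slots;
        private final int mask;
        private long next; // next sequence to hand out

        @SuppressWarnings("unchecked")
        TinyRing(final int sizePowerOfTwo) {
            slots = (Slot<T>[]) new Slot[sizePowerOfTwo];
            for (int i = 0; i < slots.length; i++) {
                slots[i] = new Slot<>(); // allocated once, reused forever
            }
            mask = sizePowerOfTwo - 1;
        }

        // Claim a sequence, let the translator copy the payload into that slot.
        long publish(final BiConsumer<Slot<T>, T> translator, final T payload) {
            final long sequence = next++;
            translator.accept(slotAt(sequence), payload);
            return sequence;
        }

        // Read the payload and clear the reference, so the slot does not pin it.
        T consume(final long sequence) {
            final Slot<T> slot = slotAt(sequence);
            final T payload = slot.data;
            slot.data = null;
            return payload;
        }

        Slot<T> slotAt(final long sequence) {
            return slots[(int) (sequence & mask)];
        }
    }

    public static void main(final String[] args) {
        final TinyRing<String> ring = new TinyRing<>(2);
        final long first = ring.publish((slot, value) -> slot.data = value, "a");
        System.out.println(ring.consume(first));
    }
}
```

Because the slot objects are never replaced, only their contents change between laps of the ring, which is why a translator that sets fields is the natural way to publish.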

With the producer-side class covered, here is how the consumer class, QueueConsumer, is built.

// WorkHandler is Disruptor's consumer abstraction; we implement it to supply the consumer's business logic
public class QueueConsumer<T> implements WorkHandler<DataEvent<T>> {
    // The orderly executor; it also serves as the executor for the normal, unordered path
    private final OrderlyExecutor executor;
    // ShenYu's consumer factory; it creates the consumer task (in practice a Runnable unit of work)
    private final QueueConsumerFactory<T> factory;

    public QueueConsumer(final OrderlyExecutor executor, final QueueConsumerFactory<T> factory) {
        this.executor = executor;
        this.factory = factory;
    }
    
    @Override
    public void onEvent(final DataEvent<T> t) {
        if (t != null) {
            ThreadPoolExecutor executor = orderly(t);
            // Create the business task from the factory that was passed in
            QueueConsumerExecutor<T> queueConsumerExecutor = factory.create();
            // Hand the payload to the task
            queueConsumerExecutor.setData(t.getData());
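            // help gc: ring buffer slots are preallocated and reused, so the DataEvent itself stays reachable; clearing the payload lets it be collected without waiting for the slot to be overwritten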
            t.setData(null);
            // Run the business logic
            executor.execute(queueConsumerExecutor);
        }
    }
    // For an orderly event with a non-empty hash, let the orderly executor pick a SingletonExecutor
    private ThreadPoolExecutor orderly(final DataEvent<T> t) {
        if (t instanceof OrderlyDataEvent && !isEmpty(((OrderlyDataEvent<T>) t).getHash())) {
            return executor.select(((OrderlyDataEvent<T>) t).getHash());
        } else {
            return executor;
        }
    }
    
    private boolean isEmpty(final String t) {
        return t == null || t.isEmpty();
    }
}
//--------------- ShenYu's orderly thread pool --------
public class OrderlyExecutor extends ThreadPoolExecutor {
    // A thread-safe sorted map; ConcurrentSkipListMap is currently the only sorted ConcurrentMap implementation in the JDK
    private final ConcurrentSkipListMap<Long, SingletonExecutor> virtualExecutors = new ConcurrentSkipListMap<>();
    // Thread selector
    private final ThreadSelector threadSelector = new ThreadSelector();
    
    public OrderlyExecutor(
            final boolean isOrderly,
            final int corePoolSize,
            final int maximumPoolSize,
            final long keepAliveTime,
            final TimeUnit unit,
            final BlockingQueue<Runnable> workQueue,
            final ThreadFactory threadFactory,
            final RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
        orderlyThreadPool(isOrderly, corePoolSize, threadFactory);
    }
    
    private void orderlyThreadPool(final boolean isOrderly, final int corePoolSize, final ThreadFactory threadFactory) {
        // Orderly mode needs extra setup
        if (isOrderly) {
            // For each index up to the configured pool size, create a single-thread executor and put it into the ConcurrentSkipListMap
            IntStream.range(0, corePoolSize).forEach(index -> {
                SingletonExecutor singletonExecutor = new SingletonExecutor(threadFactory);
                // Hash the single-thread executor's hashCode plus its index
                String hash = singletonExecutor.hashCode() + ":" + index;
                byte[] bytes = threadSelector.sha(hash);
                for (int i = 0; i < 4; i++) {
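                    // Derive four keys (number 0 to 3) from the digest, so four map entries point at the same executor. This looks like the virtual-node technique of consistent hashing, which spreads each executor over the key space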
                    this.virtualExecutors.put(threadSelector.hash(bytes, i), singletonExecutor);
                }
            });
        }
    }
    
    public SingletonExecutor select(final String hash) {
        long select = threadSelector.select(hash);
        // Look up the executor by hash
        if (!virtualExecutors.containsKey(select)) {
            // No exact hit: move forward to the nearest key, i.e. the first key greater than or equal to this hash
            SortedMap<Long, SingletonExecutor> tailMap = virtualExecutors.tailMap(select);
            if (tailMap.isEmpty()) {
                // Nothing at or after this hash: wrap around to the first key in the map
                select = virtualExecutors.firstKey();
            } else {
                select = tailMap.firstKey();
            }
        }
        return virtualExecutors.get(select);
    }

    private static final class ThreadSelector {
        
        public long select(final String hash) {
            byte[] digest = sha(hash);
            return hash(digest, 0);
        }
        
        private long hash(final byte[] digest, final int number) {
            return (((long) (digest[3 + number * 4] & 0xFF) << 24)
                    | ((long) (digest[2 + number * 4] & 0xFF) << 16)
                    | ((long) (digest[1 + number * 4] & 0xFF) << 8)
                    | (digest[number * 4] & 0xFF))
                    & 0xFFFFFFFFL;
        }
        
        private byte[] sha(final String hash) {
            byte[] bytes = hash.getBytes(StandardCharsets.UTF_8);
            return Hashing
                    .sha256()
                    .newHasher()
                    .putBytes(bytes)
                    .hash().asBytes();
        }
    }
}

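The orderly thread pool above does not order by thread priority. It works like a consistent-hash ring: the hash passed to onOrderlyData selects one single-thread SingletonExecutor, so events that share a hash are handed to the same thread and run there one after another.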
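
The select method above can be read as a lookup on a hash ring. Below is a minimal sketch of that lookup under stated assumptions: it uses the JDK's MessageDigest instead of Guava's Hashing, a TreeMap instead of ConcurrentSkipListMap, plain executor indexes instead of SingletonExecutor instances, and the node name "executor:" + index is made up for the example. The hash function copies the bit layout of ThreadSelector#hash from the listing.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Map;
import java.util.TreeMap;

final class HashRingSketch {

    // Ring position -> executor index. Several positions map to one executor.
    private final TreeMap<Long, Integer> ring = new TreeMap<>();

    HashRingSketch(final int executors, final int virtualNodes) {
        if (executors < 1 || virtualNodes < 1 || virtualNodes > 8) {
            // A SHA-256 digest has 32 bytes, enough for at most 8 four-byte slices.
            throw new IllegalArgumentException("executors >= 1 and 1 <= virtualNodes <= 8 required");
        }
        for (int index = 0; index < executors; index++) {
            final byte[] digest = sha256("executor:" + index); // hypothetical node name
            for (int v = 0; v < virtualNodes; v++) {
                ring.put(hash(digest, v), index);
            }
        }
    }

    // First ring position at or after the key's hash; wrap to the start if none.
    int select(final String key) {
        final long h = hash(sha256(key), 0);
        final Map.Entry<Long, Integer> entry = ring.ceilingEntry(h);
        return (entry != null ? entry : ring.firstEntry()).getValue();
    }

    // Same bit layout as ThreadSelector#hash: four digest bytes, little-endian.
    static long hash(final byte[] digest, final int number) {
        return (((long) (digest[3 + number * 4] & 0xFF) << 24)
                | ((long) (digest[2 + number * 4] & 0xFF) << 16)
                | ((long) (digest[1 + number * 4] & 0xFF) << 8)
                | (digest[number * 4] & 0xFF))
                & 0xFFFFFFFFL;
    }

    static byte[] sha256(final String text) {
        try {
            return MessageDigest.getInstance("SHA-256").digest(text.getBytes(StandardCharsets.UTF_8));
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(final String[] args) {
        final HashRingSketch ring = new HashRingSketch(4, 4);
        System.out.println(ring.select("order-42") == ring.select("order-42"));
    }
}
```

The property that matters for ordering is that the same key always lands on the same executor; if each executor has a single thread, tasks for one key cannot overtake each other.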
Next, how the events are wrapped.

// ShenYu's own event class, generic over the payload
public class DataEvent<T> {
    private T data;
    public T getData() {
        return data;
    }
    public void setData(final T data) {
        this.data = data;
    }
}
// --------------- Event factory, implementing Disruptor's EventFactory ---------------
public class DisruptorEventFactory<T> implements EventFactory<DataEvent<T>> {
    
    @Override
    public DataEvent<T> newInstance() {
        return new DataEvent<>();
    }
}
//--------- Event that carries the ordering hash -------
public class OrderlyDataEvent<T> extends DataEvent<T> {
  
    private String hash;
    public String getHash() {
        return hash;
    }
    public void setHash(final String hash) {
        this.hash = hash;
    }
}
// -------- Event factory for events that carry the ordering hash ------
public class OrderlyDisruptorEventFactory<T> implements EventFactory<DataEvent<T>> {
    
    @Override
    public OrderlyDataEvent<T> newInstance() {
        return new OrderlyDataEvent<>();
    }
}

That covers ShenYu's wrapper around Disruptor. Now for its two entry points, the producer and the consumer.

Producer entry point: DisruptorProvider#onData. Consumer entry point: QueueConsumerExecutor#run (it implements Runnable and is the unit of work a consumer executes).

[Figure: DisruptorProvider, the producer side]

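Start with ShenyuClientRegisterEventPublisher#publishEvent. This is the registration event publisher on the ShenYu client side; it publishes the registration events of clients, that is, of the backend services that ShenYu routes to.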

public class ShenyuClientRegisterEventPublisher {
    // Static singleton
    private static final ShenyuClientRegisterEventPublisher INSTANCE = new ShenyuClientRegisterEventPublisher();
    // A replaceable field holding the Disruptor producer. Its type parameter is ShenYu's data interface, whose two implementations are metadata and URI
    private DisruptorProviderManage<DataTypeParent> providerManage;
    public static ShenyuClientRegisterEventPublisher getInstance() {
        return INSTANCE;
    }
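    // In theory, calling start from several places would replace providerManage. It is safe here because the callers are mutually exclusive: a client picks exactly one way to report registration events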
    public void start(final ShenyuClientRegisterRepository shenyuClientRegisterRepository) {
        // The Disruptor consumer implementation for the client side
        RegisterClientExecutorFactory factory = new RegisterClientExecutorFactory();
        // Add consumer hooks. The business logic is not written in the consumer's run method; run calls into a Set<ExecutorSubscriber<T>> subscribers, in observer-pattern style
        // Register the metadata hook
        factory.addSubscribers(new ShenyuClientMetadataExecutorSubscriber(shenyuClientRegisterRepository));
        // Register the URI reporting hook
        factory.addSubscribers(new ShenyuClientURIExecutorSubscriber(shenyuClientRegisterRepository));
        providerManage = new DisruptorProviderManage<>(factory);
        // Start the Disruptor
        providerManage.startup();
    }
    // Publish an event (producer side)
    public void publishEvent(final DataTypeParent data) {
        DisruptorProvider<DataTypeParent> provider = providerManage.getProvider();
        provider.onData(data);
    }
}
[Figure: callers of the producer method of the class above]

As the figure shows, after the server receives a client's HTTP call it reuses this same producer logic.


[Figure: more producer call sites]

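As the figure shows, there are three pairs; they come in pairs because each handles URI registration and metadata registration.
Open any of these classes and you find an implementation of the ShenyuClientServerRegisterRepository interface, an SPI interface through which data reported by the various kinds of client is put into the Disruptor. From the Disruptor's point of view it is a producer interface; from the client's point of view it is the server that receives what the client reports. Here is how each one works.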


[Figure: the server side for all client data reporting]
// Consul: an ApplicationEvent listener picks up data changes and puts them into the Disruptor
    @EventListener
    public void onMetadataChange(final ConsulConfigChangedEvent event) {
        Map<String, GetValue> metadataMap = event.getMetadataMap();
        metadataMap.forEach((path, getValue) -> {
            long modifyIndex = getValue.getModifyIndex();
            if (metadataChanged(path, modifyIndex)) {
                publishMetadata(getValue.getDecodedValue());
            }
        });
    }
// --------- etcd: a listener added to the etcd client feeds the Disruptor ----
    private void subscribeMetaData(final String rpcType) {
        String rpcPath = RegisterPathConstants.buildMetaDataContextPathParent(rpcType);
        List<String> metadataPaths = client.getChildren(rpcPath);
        for (String metadataPath: metadataPaths) {
            String data = client.read(metadataPath);
            publishMetadata(data);
        }
        LOGGER.info("subscribe metadata change: {}", rpcPath);
        client.subscribeChildChanges(rpcPath, new EtcdListenHandler() {
            @Override
            public void updateHandler(final String path, final String value) {
                publishMetadata(client.read(path));
            }
            @Override
            public void deleteHandler(final String path, final String value) {
            }
        });
    }
//----------- Nacos also adds listeners, but metadata is watched on the config server and URI data on the naming server; the code below is the naming-server part -----------
    private void subscribeRpcTypeService(final RpcTypeEnum rpcType) {
        final String serviceName = RegisterPathConstants.buildServiceInstancePath(rpcType.getName());
        try {
            Map<String, List<URIRegisterDTO>> services = new HashMap<>();
            List<Instance> healthyInstances = namingService.selectInstances(serviceName, true);
            healthyInstances.forEach(healthyInstance -> {
                String contextPath = healthyInstance.getMetadata().get("contextPath");
                String serviceConfigName = RegisterPathConstants.buildServiceConfigPath(rpcType.getName(), contextPath);
                subscribeMetadata(serviceConfigName);
                metadataConfigCache.add(serviceConfigName);
                String metadata = healthyInstance.getMetadata().get("uriMetadata");
                URIRegisterDTO uriRegisterDTO = GsonUtils.getInstance().fromJson(metadata, URIRegisterDTO.class);
                services.computeIfAbsent(contextPath, k -> new ArrayList<>()).add(uriRegisterDTO);
                uriServiceCache.computeIfAbsent(serviceName, k -> new ConcurrentSkipListSet<>()).add(contextPath);
            });
            if (RPC_URI_TYPE_SET.contains(rpcType)) {
                services.values().forEach(this::publishRegisterURI);
            }
            LOGGER.info("subscribe uri : {}", serviceName);
            namingService.subscribe(serviceName, event -> {
                if (event instanceof NamingEvent) {
                    List<Instance> instances = ((NamingEvent) event).getInstances();
                    instances.forEach(instance -> {
                        String contextPath = instance.getMetadata().get("contextPath");
                        uriServiceCache.computeIfAbsent(serviceName, k -> new ConcurrentSkipListSet<>()).add(contextPath);
                    });
                    refreshURIService(rpcType, serviceName);
                }
            });
        } catch (NacosException e) {
            throw new ShenyuException(e);
        }
    }
//------- HTTP is the simplest ---------
    @PostMapping("/register-metadata")
    @ResponseBody
    public String registerMetadata(@RequestBody final MetaDataRegisterDTO metaDataRegisterDTO) {
        publisher.publish(metaDataRegisterDTO);
        return ShenyuResultMessage.SUCCESS;
    }
// ---------- ZooKeeper: a listener added through the ZooKeeper client -------
    private void subscribeMetaData(final String rpcType) {
        String contextPathParent = RegisterPathConstants.buildMetaDataContextPathParent(rpcType);
        CuratorCacheListener listener = CuratorCacheListener.builder()
                .forCreatesAndChanges((oldNode, node) -> {
                    if (PathMatchUtils.match(RegisterPathConstants.REGISTER_METADATA_INSTANCE_PATH, node.getPath())) {
                        String data = new String(node.getData(), StandardCharsets.UTF_8);
                        publishMetadata(data);
                        LOGGER.info("zookeeper register metadata success: {}", data);
                    }
                }).build();
        client.addCache(contextPathParent, listener);
    }

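The client-side and server-side reporting logic itself is out of scope here. What matters is that every server-side receiver, whether it is sent data by a client or observes a client's data change, immediately puts the data into the Disruptor. That gives concurrent processing and smooths traffic peaks, though it may add some latency.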

Now the consumer side, the QueueConsumerExecutor class.
// Implements Runnable, so it can run as the task of a Disruptor consumer
public abstract class QueueConsumerExecutor<T> implements Runnable {

    private T data;

    public T getData() {
        return data;
    }

    public void setData(final T data) {
        this.data = data;
    }
}

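Now back to the implementations of the consumer factory.

[Figure: both the client and the server do their real processing in Disruptor consumers]

QueueConsumerExecutor carries the consumer-side logic for both the client and the server.

There is little to see in the consumer factories themselves; each one just creates a different QueueConsumerExecutor implementation.
So let's look at those implementations.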

RegisterClientConsumerExecutor

public final class RegisterClientConsumerExecutor<T extends DataTypeParent> extends QueueConsumerExecutor<T> {
    
    private final Map<DataType, ExecutorTypeSubscriber<T>> subscribers;
    
    private RegisterClientConsumerExecutor(final Map<DataType, ExecutorTypeSubscriber<T>> executorSubscriberMap) {
        this.subscribers = new HashMap<>(executorSubscriberMap);
    }

    @Override
    public void run() {
        final T data = getData();
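        // The client-side consumer runs through the callback hooks held by the factory. Going through the Disruptor here amounts to rate limiting: the RingBuffer keeps too many concurrent client-side calls from arriving at once
        // and acts as a buffer. The abstract callback hook then invokes the real work, for example an HTTP request, a Nacos request, or a ZooKeeper data change
        subscribers.get(data.getType()).executor(Lists.newArrayList(data));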
    }
    // The client-side consumer factory; it creates the client-side consumer
    public static class RegisterClientExecutorFactory<T extends DataTypeParent> extends AbstractQueueConsumerFactory<T> {
        
        @Override
        public RegisterClientConsumerExecutor<T> create() {
            Map<DataType, ExecutorTypeSubscriber<T>> map = getSubscribers()
                    .stream()
                    .map(e -> (ExecutorTypeSubscriber<T>) e)
                    .collect(Collectors.toMap(ExecutorTypeSubscriber::getType, e -> e));
            return new RegisterClientConsumerExecutor<>(map);
        }

        @Override
        public String fixName() {
            return "shenyu_register_client";
        }
    }
}

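RegisterServerConsumerExecutor follows much the same logic: once a client's data change reaches the server, it also passes through a Disruptor for decoupling and peak smoothing, and the callback hooks in the consumer carry out the business logic.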

The ExecutorSubscriber interface is the callback hook interface for Disruptor in ShenYu.

ExecutorTypeSubscriber<T extends DataTypeParent> extends ExecutorSubscriber<T> then distinguishes data types, so that each data type gets its own consumer logic.
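
The dispatch that RegisterClientConsumerExecutor#run performs, looking up a subscriber by the data's type and handing it a one-element batch, can be sketched without any ShenYu classes. Everything below is a simplified, hypothetical stand-in: DataType, TypedSubscriber, and Recorder only mimic the shape of ShenYu's DataType, ExecutorTypeSubscriber, and a concrete subscriber, and the payload is a plain String.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;

final class TypeDispatchSketch {

    // Hypothetical stand-in for ShenYu's data type enum.
    enum DataType { META_DATA, URI }

    // Hypothetical stand-in for ExecutorTypeSubscriber: a hook bound to one type.
    interface TypedSubscriber {
        DataType getType();

        void executor(Collection<String> batch);
    }

    // Test double that records what it was asked to handle.
    static final class Recorder implements TypedSubscriber {
        final List<String> seen = new ArrayList<>();
        private final DataType type;

        Recorder(final DataType type) {
            this.type = type;
        }

        @Override
        public DataType getType() {
            return type;
        }

        @Override
        public void executor(final Collection<String> batch) {
            seen.addAll(batch);
        }
    }

    // Index the subscribers by type, as the factory's create() does with toMap.
    static Map<DataType, TypedSubscriber> index(final List<? extends TypedSubscriber> subscribers) {
        final Map<DataType, TypedSubscriber> byType = new EnumMap<>(DataType.class);
        for (TypedSubscriber subscriber : subscribers) {
            byType.put(subscriber.getType(), subscriber);
        }
        return byType;
    }

    // Route one payload to the subscriber registered for its type.
    static void dispatch(final Map<DataType, TypedSubscriber> byType, final DataType type, final String payload) {
        byType.get(type).executor(Collections.singletonList(payload));
    }

    public static void main(final String[] args) {
        final Recorder uri = new Recorder(DataType.URI);
        final Map<DataType, TypedSubscriber> byType = index(Collections.singletonList(uri));
        dispatch(byType, DataType.URI, "uri-payload");
        System.out.println(uri.seen);
    }
}
```

Keeping the hooks behind a type-keyed map means a new data type only needs a new subscriber; the consumer's run method stays unchanged.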


[Figure: the different data types]

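There are really two groups. ShenyuClientRegisterRepository handles the client side's URI and metadata exchange with ShenYu (ShenYu offers several data synchronization mechanisms for this); the client registers its URI and metadata through it.
ShenyuClientRegisterService determines in what service form a client registers with the admin, that is, which kind of client the ShenYu gateway uses to call the backend service (this depends on which backend service the user exposes through the gateway); the metadata and URI are then registered through this registrar.
This part is convoluted and really needs a flow chart.
A follow-up article will hopefully walk through how the routed backend services, shenyu-admin, and shenyu-bootstrap register, and how data changes are picked up dynamically; this part is at the core of Apache ShenYu's support for multiple protocols.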


Article link: https://www.haomeiwen.com/subject/tyjsprtx.html