美文网首页
异步消息系统的几种实现

异步消息系统的几种实现

作者: habit_learning | 来源:发表于2019-01-04 13:58 被阅读66次

    在互金行业中,支付系统是连接消费者、商家(或平台)和金融机构的桥梁,管理支付数据,调用第三方支付平台接口,记录支付信息(对应订单号,支付金额等),金额对账等功能。而支付系统中一个重要的环节就是将交易结果异步通知业务方,一般我们会多次通知业务方,直到收到业务方的确认接收回馈或者达到最大通知次数,并且异步通知的时间间隔应该是逐次拉大的。下面,我就介绍几种实现延迟消息的方案。

    1、DelayQueue

    DelayQueue是 JDK 自带的一个延迟队列,它存放的是实现了 Delayed 接口的消息。

    package yongda.vo;
    
    import java.util.concurrent.Delayed;
    import java.util.concurrent.TimeUnit;
    
    /**
     * 延迟执行的消息
     * @author K. L. Mao
     * @create 2018/12/19
     */
    public class Message implements Delayed {
    
        private static final String ROUTER_SUCCESS_CODE = "ROUTE_0000";
    
    
        // 单位 秒
        private static final int[] delayTimeArray = {0, 3, 15, 60, 300, 600, 900, 900, 900, 900};
    
        private static final int TIME_UNIT = 1000;
    
        // 支付流水号
        private String orderNo;
        // 通知数组下标
        private int notifyIndex;
        // 执行时间 纳秒
        private long activeTime;
        // 结果码
        private String resultCode;
    
        public String getOrderNo() {
            return orderNo;
        }
    
        public int getNotifyIndex() {
            return notifyIndex;
        }
    
        public long getActiveTime() {
            return activeTime;
        }
    
        public String getResultCode() {
            return resultCode;
        }
    
        public Message() {
        }
    
        public Message(String orderNo) {
            this(orderNo, ROUTER_SUCCESS_CODE);
        }
    
        public Message(String orderNo, String resultCode) {
            this.orderNo = orderNo;
            this.resultCode = resultCode;
            this.activeTime = System.currentTimeMillis() + delayTimeArray[notifyIndex] * TIME_UNIT;
        }
    
        /**
         * 递增通知次数
         */
        public void increaseNotify() {
            notifyIndex++;
            this.activeTime = System.currentTimeMillis() + delayTimeArray[notifyIndex] * TIME_UNIT;
        }
    
        public int getArrayLength(){
            return delayTimeArray.length;
        }
    
        /**
         * 方法返回一个小于等于 0 的值时,将发生到期
         * @param unit
         * @return
         */
        @Override
        public long getDelay(TimeUnit unit) {
            return this.activeTime - System.currentTimeMillis();
        }
    
        /**
         * 队列元素根据执行时间的大小正序排列,即执行时间最小的在队首
         * @param delayed
         * @return
         */
        @Override
        public int compareTo(Delayed delayed) {
            Message msg = (Message) delayed;
            long timeout = this.activeTime - msg.getActiveTime();
            return timeout > 0 ? 1 : timeout < 0 ? -1 : 0;
        }
    
    }
    
    

    Message 实现 Delayed 接口,并且重写 getDelay 和 compareTo 两个方法。当 getDelay 的值小于等于0时,说明该队列已经不再延迟,可以出队列。compareTo 方法是为了队列元素的排序,我们希望的是队列元素根据执行时间的大小正序排列,即执行时间最小的在队首。
    声明一个抽象类,来定义 DelayQueue:

    package yongda.message;
    
    import yongda.vo.Message;
    
    import java.util.concurrent.DelayQueue;
    
    public abstract class AbstractDelayMessage {
    
        protected static DelayQueue<Message> queue = new DelayQueue<>();
    
        /**
         * 再次通知
         * @param message
         */
        protected void againNotify(Message message) {
            message.increaseNotify();
            queue.add(message);
        }
    
    }
    
    

    消息处理类 DelayMessageService ,是获取到消息之后的通知业务方操作;

    package yongda.baseService;
    
    import com.alibaba.dubbo.config.annotation.Reference;
    import com.alibaba.fastjson.JSONObject;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.scheduling.annotation.Async;
    import org.springframework.stereotype.Service;
    import yongda.cashier.pay.facade.CmfOrderFacade;
    import yongda.cashier.pay.vo.CmfOrderVO;
    import yongda.constant.Constants;
    import yongda.message.AbstractDelayMessage;
    import yongda.service.OrderService;
    import yongda.vo.Message;
    
    /**
     * @author K. L. Mao
     * @create 2018/12/20
     */
    @Slf4j
    @Service
    public class DelayMessageService extends AbstractDelayMessage {
    
        @Reference(version = Constants.DUBBO_VERSION)
        private CmfOrderFacade cmfOrderFacade;
        @Reference(version = Constants.DUBBO_VERSION)
        private OrderService orderService;
    
        /**
         * 处理消息
         *
         * @param message
         */
        @Async
        public void dealDelayMessage(Message message) {
            log.info("【{}】异步处理延迟消息, 通知次数:{}", message.getOrderNo(), message.getNotifyIndex() + 1);
            String orderNo = message.getOrderNo();
            CmfOrderVO cmfOrderVO = cmfOrderFacade.acquireCmfOrderByPaymentSeqNo(orderNo);
            if (cmfOrderVO == null) {
                return;
            }
            JSONObject jsonObject = orderService.notifyInvoker(cmfOrderVO, message.getResultCode());
            log.info("接收方返回信息:{}", jsonObject);
            if (jsonObject == null || !Constants.CHANNEL_SUCCESS_CODE.equals(jsonObject.getString("code"))) {
                if (message.getNotifyIndex() >= message.getArrayLength() - 1) {
                    log.warn("【{}】异步通知次数大于[{}]次,不再通知。", message.getOrderNo(), message.getArrayLength());
                    return;
                }
                againNotify(message);
                return;
            }
            log.info("【{}】异步处理延迟消息,接收到业务方回馈信息,不再通知。", message.getOrderNo());
        }
    }
    

    这里采用了异步线程的注解@Async,需要添加 AsyncConfig;

    package yongda.config;
    
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.scheduling.annotation.EnableAsync;
    import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
    
    import java.util.concurrent.Executor;
    import java.util.concurrent.ThreadPoolExecutor;
    
    /**
     * 异步处理配置
     */
    @Configuration
    @EnableAsync
    public class AsyncConfig {
        /**
         * 核心线程数,默认情况下核心线程数会一直存活,即使处于闲置状态也不会受keepAliveTime限制
         */
        private int corePoolSize = 10;
        /**
         * 最大线程数
         */
        private int maxPoolSize = 200;
        /**
         * 队列容量
         */
        private int queueCapacity = 100;
    
        private String ThreadNamePrefix = "AsyncExecutor-";
    
        @Bean
        public Executor asyncExecutor() {
            ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
            executor.setCorePoolSize(corePoolSize);
            executor.setMaxPoolSize(maxPoolSize);
            executor.setQueueCapacity(queueCapacity);
            executor.setThreadNamePrefix(ThreadNamePrefix);
    
            // rejection-policy:当pool已经达到max size的时候,怎样处理新任务
            // CALLER_RUNS:不在新线程中运行任务。而是由调用者所在的线程来运行
            executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
            executor.initialize();
            return executor;
        }
    
    }
    

    声明一个消费者,死循环来监听延迟队列,一有元素到了激活时间就消费,监听操作新建一个线程处理:

    package yongda.message;
    
    import lombok.extern.slf4j.Slf4j;
    import yongda.baseService.DelayMessageService;
    import yongda.vo.Message;
    
    /**
     * @author K. L. Mao
     * @create 2018/12/20
     */
    @Slf4j
    public class DelayMessageConsumer extends AbstractDelayMessage implements Runnable {
    
        private DelayMessageService delayMessageService;
    
        public DelayMessageConsumer(DelayMessageService delayMessageService){
            this.delayMessageService = delayMessageService;
        }
    
        @Override
        public void run() {
            Message message = new Message();
            while(true){
                try {
                    message = queue.take();
                    delayMessageService.dealDelayMessage(message);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }catch (Exception e) {
                    log.error("【{}】异步通知异常,message:{}", message.getOrderNo(), e.getMessage());
                    againNotify(message);
                }
            }
        }
    
    }
    

    让监听工作随着程序的启动就开始:

    package yongda.config;
    
    import org.springframework.context.ApplicationListener;
    import org.springframework.context.event.ContextRefreshedEvent;
    import org.springframework.stereotype.Component;
    import yongda.message.DelayMessageConsumer;
    import yongda.baseService.DelayMessageService;
    
    import javax.annotation.Resource;
    
    /**
     * 当spring容器初始化完成后,执行 Consumer 监听
     *
     * @author K. L. Mao
     * @create 2018/11/12
     */
    @Component
    public class ConsumerBootstrap implements ApplicationListener<ContextRefreshedEvent> {
    
        @Resource
        private DelayMessageService delayMessageService;
    
        @Override
        public void onApplicationEvent(ContextRefreshedEvent event) {
            try {
                new Thread(new DelayMessageConsumer(delayMessageService)).start();
            }catch (Exception e){
                e.printStackTrace();
            }
        }
    }
    
    

    至此,我们往 DelayQueue 中添加 Message 即可:queue.add(message);
    注:本示例中关于Dubbo的知识点请自行查阅资料。

    2、Spring-Retry

    spring retry是从spring batch独立出来的一个能功能,主要实现了重试和熔断。在spring retry中可以指定需要重试的异常类型,并设置每次重试的间隔以及如果重试失败是继续重试还是熔断(停止重试)。
    添加 spring retry 依赖:

            <!-- Spring Retry -->
            <dependency>
                <groupId>org.springframework.retry</groupId>
                <artifactId>spring-retry</artifactId>
            </dependency>
    

    入口类:

    package yongda;
    
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.retry.annotation.EnableRetry;
    
    @SpringBootApplication
    @EnableRetry
    public class NotifyApplication {
    
        public static void main(String[] args) {
            SpringApplication.run(NotifyApplication.class, args);
        }
    }
    

    @EnableRetry 开启重试机制。
    服务类,RemoteService:

    package yongda.baseService;
    
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.remoting.RemoteAccessException;
    import org.springframework.retry.annotation.Backoff;
    import org.springframework.retry.annotation.Recover;
    import org.springframework.retry.annotation.Retryable;
    import org.springframework.stereotype.Service;
    
    import java.time.LocalDateTime;
    
    /**
     * @author K. L. Mao
     * @create 2018/12/25
     */
    @Slf4j
    @Service
    public class RemoteService {
    
        /**
         * @Retryable
         * value: 指定发送的异常进行重试,如果未抛出指定异常,则不重试
         * include: 和 value 一样,默认为空,当 exclude 也为空时,所有异常都重试
         * exclude: 指定异常不重试
         * maxAttemps: 重试次数,默认为3
         * backoff: 重试补偿机制,默认没有
         *
         * @Backoff
         * delay: 指定延迟时间,单位毫秒
         * multiplier: 指定延迟的倍数,比如 delay = 1000, multiplier = 2 时,
         * 第一次重试为 1 秒,第二次重试为 2 秒,第三次重试为 4 秒
         *
         */
        @Retryable(value = RemoteAccessException.class, maxAttempts = 30, backoff = @Backoff(delay = 2*60*1000, multiplier = 1))
        public void call() {
            log.info(LocalDateTime.now() + " do something...");
            throw new RemoteAccessException("异步通知异常");
        }
    
        /**
         * 当重试到达指定次数时,被注解的方法将被回调,可以在该方法进行日志处理。
         * 需要注意的是发生的异常和入参类型一致时才会回调。
         * @param e
         */
        @Recover
        public void recover(RemoteAccessException e) {
            log.info(e.getMessage());
        }
    }
    
    

    @Retryable
    value:指定发送的异常进行重试,如果未抛出指定异常,则不重试;
    include: 和 value 一样,默认为空,当 exclude 也为空时,所有异常都重试;
    exclude: 指定异常不重试;
    maxAttemps: 重试次数,默认为3;
    backoff: 重试补偿机制,默认没有。

    @Backoff
    delay: 指定延迟时间,单位毫秒;
    multiplier: 指定延迟的倍数,比如 delay = 1000, multiplier = 2 时, 第一次重试为 1 秒,第二次重试为 2 秒,第三次重试为 4 秒。

    @Recover
    当重试到达指定次数时,被注解的方法将被回调,可以在该方法进行日志处理。
    需要注意的是发生的异常和入参类型一致时才会回调。

    • spring retry的核心原理
      以Cglib作为代理工具,先来写个Callback实现,这也是重试的实现的核心逻辑
    package retry.interceptor;
    import net.sf.cglib.proxy.MethodInterceptor;
    import net.sf.cglib.proxy.MethodProxy;
    import retry.annotation.Retryable;
     
    import java.lang.reflect.Method;
     
    /**
     * @author K. L. Mao
     * @create 2019/1/3
     */
    public class AnnotationAwareRetryOperationsInterceptor implements MethodInterceptor{
     
      //记录重试次数
      private int times = 0;
     
      @Override
      public Object intercept(Object obj, Method method, Object[] args, MethodProxy proxy) throws Throwable {
        //获取拦截的方法中的Retryable注解
        Retryable retryable = method.getAnnotation(Retryable.class);
        if(retryable == null){
          return proxy.invokeSuper(obj,args);
        }else{ //有Retryable注解,加入异常重试逻辑
          int maxAttemps = retryable.maxAttemps();
          try {
            return proxy.invokeSuper(obj,args);
          } catch (Throwable e) {
            if(times++ == maxAttemps){
              System.out.println("已达最大重试次数:" + maxAttemps + ",不再重试!");
            }else{
              System.out.println("调用" + method.getName() + "方法异常,开始第" + times +"次重试。。。");
              //注意这里不是invokeSuper方法,invokeSuper会退出当前interceptor的处理
              proxy.invoke(obj,args);
            }
          }
        }
        return null;
      }
    }
    

    然后是写个代理类,使用AnnotationAwareRetryOperationsInterceptor作为拦截器

    package retry.core;
    import net.sf.cglib.proxy.Enhancer;
    import retry.interceptor.AnnotationAwareRetryOperationsInterceptor;
     
    /**
     * @author K. L. Mao
     * @create 2019/1/3
     */
    public class SpringRetryProxy {
    
        @SuppressWarnings("unchecked")
        public <T> T getProxy(Class<T> clazz) {
            Enhancer enhancer = new Enhancer();
            enhancer.setSuperclass(clazz);
            enhancer.setCallback(new AnnotationAwareRetryOperationsInterceptor());
            return (T) enhancer.create();
        }
    }
    

    3、Redis 的发布订阅

    Redis提供了发布订阅功能,可以用于消息的传输,Redis的发布订阅机制包括三个部分,发布者,订阅者和Channel。



    发布者和订阅者都是Redis客户端,Channel则为Redis服务器端,发布者将消息发送到某个的频道,订阅了这个频道的订阅者就能接收到这条消息。Redis的这种发布订阅机制与基于主题的发布订阅类似,Channel相当于主题。
    所以,我们可以利用 Redis 的发布订阅功能,监听 key 过期事件,从而实现延迟处理消息的场景。

    首先,修改配置文件redis.conf中的:notify-keyspace-events Ex,默认为notify-keyspace-events " ";
    配置参数说明:

    # K    键空间通知,以__keyspace@<db>__为前缀 
    # E    键事件通知,以__keysevent@<db>__为前缀
    # g    del , expipre , rename 等类型无关的通用命令的通知, ...  
    # $    String命令  
    # l    List命令  
    # s    Set命令  
    # h    Hash命令  
    # z    有序集合命令  
    # x    过期事件(每次key过期时生成)  
    # e    驱逐事件(当key在内存满了被清除时生成)  
    # A    g$lshzxe的别名,因此”AKE”意味着所有的事件
    

    所以, Ex 表示监听 key 过期事件。修改好配置文件后,redis会对设置了 expire 的数据进行监听,当redis中的数据超时失效时,将会通知客户端。
    接下来就是要设置客户端代码,让其监听服务端发来的 key 过期事件。由于 Redis 与 Spring 集成有两种方式,一种是 spring-data-redis,一种是 jedis。不同的集成方式,客户端监听方式也不一样。

    • 与 spring-data-redis 集成
      spring-data-redis:通过org.springframework.data.redis.connection.jedis.JedisConnectionFactory来管理,即通过工厂类管理,然后通过配置的模版bean(RedisTemplate),操作redis服务。
      首先,自定义一个监听器,让其实现 KeyExpirationEventMessageListener:
    package yongda.config.redis;
    
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.data.redis.connection.Message;
    import org.springframework.data.redis.listener.KeyExpirationEventMessageListener;
    import org.springframework.data.redis.listener.RedisMessageListenerContainer;
    import yongda.enums.CacheTypeEnum;
    
    /**
     * Key 过期会触发该事件
     * @author K. L. Mao
     * @create 2019/1/3
     */
    @Slf4j
    public class TopicMessageListener extends KeyExpirationEventMessageListener {
        public TopicMessageListener(RedisMessageListenerContainer listenerContainer) {
            super(listenerContainer);
        }
    
        /**
         * 针对过期事件处理业务逻辑
         * @param message
         * @param pattern
         */
        @Override
        public void onMessage(Message message, byte[] pattern) {
            // message.toString() 可以获取失效的 key
            String expiredKey = message.toString();
            log.info("过期的 key :{}", expiredKey);
            if (expiredKey.startsWith(CacheTypeEnum.DELAY_ORDER.getText())){
                log.info("收到延迟的消息,开始处理...");
    
            }
    
        }
    }
    
    

    然后将自定义的监听器 TopicMessageListener 放入 RedisMessageListenerContainer 容器管理:

    package yongda.config.redis;
    
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.data.redis.connection.RedisConnectionFactory;
    import org.springframework.data.redis.listener.PatternTopic;
    import org.springframework.data.redis.listener.RedisMessageListenerContainer;
    
    /**
     * 配置监听频道 与spring-data-redis(不建议采用这个,因为代码段中充斥大量与业务无关的模版片段代码,代码冗余,不易维护)
     * @author K. L. Mao
     * @create 2019/1/3
     */
    @Configuration
    public class RedisListenerConfig {
    
        private static final String EXPIRED_KEY_TOPIC = "__keyevent@1__:expired";
    
        @Bean
        public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory){
            RedisMessageListenerContainer container = new RedisMessageListenerContainer();
            container.setConnectionFactory(connectionFactory);
            /**
             *  __keyevent@<db>__:expired
             *  格式是固定的,db是数据库编号,由于订阅开启之后这个库的所有key过期时间都会被推送过来,所以最好单独使用一个数据库来进行隔离,这里用的是数据库1。
             */
            container.addMessageListener(new TopicMessageListener(container), new PatternTopic(EXPIRED_KEY_TOPIC));
            return container;
        }
    
    }
    
    
    • 与 jedis 集成
      jedis:通过redis.clients.jedis.JedisPool来管理,即通过池来管理,通过池对象获取jedis实例,然后通过jedis实例直接操作redis服务,剔除了与业务无关的冗余代码。
      同样自定义一个监听器,实现 JedisPubSub 接口:
    package yongda.config.redis;
    
    import lombok.extern.slf4j.Slf4j;
    import redis.clients.jedis.JedisPubSub;
    import yongda.enums.CacheTypeEnum;
    
    /**
     * 与JedisPool集成,建议使用,代码简单,目标明确。
     * 修改配置文件 redis.conf 中的:notify-keyspace-events 为 Ex,默认为notify-keyspace-events ""
     * 然后在项目启动时,订阅该监听事件,并且 pattern 为:__keyevent@0__:expired:
     * jedis.psubscribe(new KeyExpiredListener(), "__keyevent@0__:expired");
     * @author K. L. Mao
     * @create 2019/1/3
     */
    @Slf4j
    public class KeyExpiredListener extends JedisPubSub {
    
        /**
         * 触发 key 过期事件
         * @param pattern
         * @param channel
         * @param message
         */
        @Override
        public void onPMessage(String pattern, String channel, String message) {
            System.out.println(
                    "pattern = [" + pattern + "], channel = [" + channel + "], message = [" + message + "]");
            log.info("过期的 key :{}", message);
            if (message.startsWith("delayOrder:"){
                log.info("收到延迟的消息,开始处理...");
    
            }
        }
    
        /**
         * 订阅该channel
         * @param channel
         * @param subscribedChannels
         */
        @Override
        public void onSubscribe(String channel, int subscribedChannels) {
            System.out.println("onPSubscribe " + channel + " " + subscribedChannels);
        }
    }
    

    然后在项目启动时,订阅该监听事件,如 pattern 为:__keyevent@1__:expired:
    jedis.psubscribe(new KeyExpiredListener(), "__keyevent@1__:expired");
    小结:现在大多数公司并不使用 spring-data-redis,而是使用 Jedis,因为前者代码段中充斥大量与业务无关的模版片段代码,代码冗余,不易维护。

    4、Rabbitmq 的延迟队列

    Rabbitmq 的延迟队列可以实现延迟的消息,请参考我的另一篇文章《RabbitMQ详解以及和SpringBoot整合》

    各个方案的优缺点

    DelayQueue 和 Spring-Retry 的消息是存储在内存中的,故当程序重启时,会丢失消息,并且是单机模式的,不适用集群环境;
    Redis 的发布订阅功能对于数据量不大的场景,是能够应付的。但是数据量很大的场景,由于发布订阅的功能会影响 Redis 的主功能(缓存)的性能,所以在大数据量场景下是不适合使用的;

    Redis 发布订阅的缺点:
    1、数据可靠性无法保证
    一个redis-cli发布消息n个redis-cli接受消息。消息的发布是无状态的,即发布完消息后该redis-cli便在理会该消息是否被接受到,是否在传输过程中丢失,即对于发布者来说,消息是”即发即失”的.
    2、扩展性太差
    不能通过增加消费者来加快消耗发布者的写入的数据,如果发布者发布的消息很多,则数据阻塞在通道中已等待被消费着来消耗。阻塞时间越久,数据丢失的风险越大(网络或者服务器的一个不稳定就会导致数据的丢失)
    3、资源消耗较高
    在pub/sub中消息发布者不需要独占一个Redis的链接,而消费者则需要单独占用一个Redis的链接,在java中便不得独立出分出一个线程来处理消费者。这种场景一般对应这多个消费者,此时则有着过高的资源消耗。

    Rabbitmq 的延迟消息,其能够适应对数据一致性、稳定性和可靠性要求很高的场景,所以是业界大厂主流的方案。

    相关文章

      网友评论

          本文标题:异步消息系统的几种实现

          本文链接:https://www.haomeiwen.com/subject/gqdqrqtx.html