日常开发中,可能会遇到一些延迟处理的消息任务,例如以下场景
①订单支付超时未支付
②考试时间结束试卷自动提交
③身份证或其他验证信息超时未提交等场景。
④用户申请退款,一天内没有响应默认自动退款等等。
如何处理这类任务,最简单的方法就是将消息插入到数据库,然后使用定时任务扫描数据库。但是如果如果大量用户请求需要处理,就需要线程频繁的连接数据库,这样可能会对其他数据库请求造成影响,这样情况下我们可以使用延迟队列方式解决此类问题。
1.DelayQueue实现方案
首先使用java自带的DelayQueue完成此方案。
DelayQueue内部使用优先级队列PriorityQueue完成任务存储,而 PriorityQueue 采用二叉堆的思想确保在数据插入到队列中时最小值的排在堆顶,每次从拿数据只要从堆顶取即可。
同时 DelayQueue 还是用了可重入锁 ReentrantLock来确保线程并发安全。
DelayQueue的源码解析可以查看DelayQueue源码解析
使用DelayQueue完成延迟需要定义Delayed 实现类来充当任务元素,具体使用方法:
//DelayQueue的元素必须是Delayed的实现类 class DelayTask implements Delayed { private long time; private Consumer consumer; public DelayTask(long time, Consumer consumer) { this.time = time; this.consumer = consumer; } @Override public long getDelay(TimeUnit unit) { return time - System.currentTimeMillis(); } @Override public int compareTo(Delayed o) { DelayTask delayTask = (DelayTask) o; return (int) (time - delayTask.getTime()); } public long getTime() { return time; } public void call() { this.consumer.accept(this); } } public class QueueTest { public static void main(String[] args) throws InterruptedException { long startTime = System.currentTimeMillis(); DelayTask d1 = new DelayTask(10000 + startTime, (o) -> System.out.println("3333")); DelayTask d2 = new DelayTask(1000 + startTime, (o) -> System.out.println("1111")); DelayTask d3 = new DelayTask(2000 + startTime, (o) -> System.out.println("2222")); DelayQueue<DelayTask> delayQueue = new DelayQueue<>(); delayQueue.add(d1); delayQueue.add(d2); delayQueue.add(d3); while (!delayQueue.isEmpty()) { //阻塞等待,如果有任务到期就取出,如果没有任务到期就等待 DelayTask delayTask = delayQueue.take(); delayTask.call(); } } }
①
getDelay()
方法用于从队列中取任务时查看是否到期,如果小于等于0则表示可以取出,如果大于,当前线程需要根据是否是leader判断等待时间。
②compareTo()
方法用于任务入队时,判断该任务元素在堆位置时比较的逻辑。
③Consumer consumer;
存储实际执行的任务,也可以使用Runnable,Callable以及其他自定义类。
note:该方法支持动态添加和删除任务,而且线程安全,但是只适用于单机环境,而且需要自己定义查询逻辑,实现稍微复杂。
2.定时任务实现方案
通过线程池对象ScheduledExecutorService
也可以实现延迟处理任务的功能,而且操作更简单。ScheduledExecutorService
是jdk提供的类来完成指定时间或定期执行某些任务。代码如下:
class Task implements Callable { private int idx; public Task(Integer idx) { this.idx = idx; } @Override public Object call() throws Exception { System.out.println("---" + this.idx); return null; } } public class DelayedTest { public static void main(String[] args) { ScheduledExecutorService scheduledExecutorService = new ScheduledThreadPoolExecutor(10); scheduledExecutorService.schedule(new Task(3), 1, TimeUnit.SECONDS); scheduledExecutorService.schedule(new Task(2), 2, TimeUnit.SECONDS); scheduledExecutorService.schedule(new Task(1), 1, TimeUnit.SECONDS); } }
ScheduledExecutorService
继承了ExecutorService
,与ExecutorService
的逻辑大致相同
①schedule()
方法是ScheduledExecutorService
特有的方法,这个方法会将我们定义的Task
封装成ScheduledFutureTask
②然后生成Worker线程(内部存在一个Thread,是真正的执行类)
③Worker线程在执行的时候会先判断当前firstTask(就是要执行的Runnable)属性是否为空,
如果有就先执行firstTask,执行完成firstTask之后,然后再从workQueue中取任务
,红字也是ExecutorService
的执行逻辑,但是ScheduledExecutorService的
schedule()
会先生成null 的firstTask,Worker会直接从workQueue
中阻塞的获取任务Worker类获取task并执行的逻辑 workQueue获取task任务的逻辑
④workQueue是在我们new对象的时候生成的DelayedWorkQueue
,它的逻辑定义和DelayedQueue
基本相同,下面是DelayedWorkQueue
take()方法的代码逻辑
DelayedWorkQueue take()方法代码public ScheduledThreadPoolExecutor(int corePoolSize) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue()); }
scheduleAtFixedRate()
是在Worker执行完task后,计算任务下次的执行时间,并重新将任务放入workQueue中来实现循环执行。下面的代码就是ScheduledFutureTask
再执行过程中判断逻辑,如果periodic是true
则执行run方法。如果period是false
则执行计算下次执行时间和重新放入任务的逻辑。
定时器ScheduledExecutorService原理分析
note:ScheduledExecutorService
是jdk提供的非常方便的延迟消息处理类,支持多线程处理消息,前一个任务的阻塞并不会影响下一个任务的运行。内部使用的是类似DelayQueue
的逻辑,而且不需要再实现轮询过程。但是它和DelayQueue
一样,只能单机使用。
3.Redis实现方案
通过Redis的Zset结构也可以实现延迟队列的功能。通过将过期时间的时间戳作为score存入Zset中,然后调用zrangebyscore key 0 当前时间
命令定时扫描Zset数据,如果返回的结果那一定是已经过期的数据,然后再执行删除命令删除指定的key-value。为了防止多线程执行过程中可能存在的问题,需要配置lua脚本使用。
//lua脚本:定义查询zset数据和删除数据的原子操作 //定义查询的最大值和最小值 local minscore = ARGV[1] local maxscore = ARGV[2] local key = KEYS[1] local tables = redis.call("zrangebyscore", key, minscore, maxscore) for i, value in ipairs(tables) do redis.call("zrem", key, value) end return tables
//java代码:定义轮询线程 @Test public void test() throws InterruptedException { String script = "lua脚本"; String key = "test"; long time = System.currentTimeMillis(); redisTemplate.opsForZSet().add(key, "123", time + 1000 * 3); Thread scanExpireThread = new Thread(() -> { System.out.println("开始扫描过期数据..."); while (true) { try { long currentTime = System.currentTimeMillis(); long count = redisTemplate.opsForZSet().count(key, 0, currentTime); if (count == 0) { // 查询最小等待时间并睡眠,减少cpu空转 sleep(key); } System.out.println("获取数据..."); RedisScript redisScript = new DefaultRedisScript(script, List.class); List<String> expireList = (List) redisTemplate.execute(redisScript, Arrays.asList(key), 0, >System.currentTimeMillis()); System.out.println(expireList); if (!CollectionUtils.isEmpty(expireList)) { String msg = RandomStringUtils.random(3, "1234567890"); Integer delayedTime = RandomUtils.nextInt(0, 10); System.out.println("随机生成延迟信息:" + msg + ", 延迟时间:" + delayedTime); redisTemplate.opsForZSet().add(key, msg, System.currentTimeMillis() + 1000 * >delayedTime); } else { // 查询最小等待时间并睡眠,减少cpu空转 sleep(key); } TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } } }); scanExpireThread.start(); TimeUnit.MINUTES.sleep(10); } public void sleep(String key) throws InterruptedException { Set<DefaultTypedTuple> objs = redisTemplate.opsForZSet().rangeWithScores(key, 0, 0); for (DefaultTypedTuple<String> typedTuple : objs) { Long minTime = typedTuple.getScore().longValue(); long diffTime = minTime - System.currentTimeMillis(); TimeUnit.MINUTES.sleep(diffTime / (1000 * 60)); } }
以上的示例代码只是延迟队列的简单时间。并没有考虑任务失败重试的问题。而且上面的方案还可以优化,比如当获取的元素的集合是空的时候,可以使用LockSupport.park()
阻塞线程。只有有延迟任务被推送到redis中时,才重新唤醒轮询线程,避免轮询线程空转。
note:redis实现版本中,并发性高。需要自己定义轮询线程。在消息量较少的时候,会浪费资源,在消息量非常多的时候,又会出现因为轮询间隔设置不合理导致延时时间不准确的问题。
4.Rabbitmq/Rocketmq实现
很多MQ消息中间件自带延迟消息功能,如果系统本身RocketMQ组件,则可以使用MQ来完成。不仅使用方便,而且可能存在的诸多细节问题。
①RocketMQ
RocketMQ本身支持延迟消息功能,但是RocketMQ4.x只支持固定级别的延迟消息,并没有自定义延迟时间的功能。如果想实现自定义延迟消息的功能,可以使用Rocket5.x或者RabbitMQ
实现原理:RocketMQ实现延迟消息的过程是先将消息写入到SCHEDULE_TOPIC_XXXX的topic中,然后根据 level 存入特定的queue,每个queue都有一个调度线程消费消息,如果发现消息到期,就会将消息投递到指定的topic中。
rocketmq延迟消息过程
以下是RocketMQ5.x文档中的示例程序:
//生产者 延时消息发送 MessageBuilder messageBuilder = new MessageBuilderImpl();; //以下示例表示:延迟时间为10分钟之后的Unix时间戳。 Long deliverTimeStamp = System.currentTimeMillis() + 10L * 60 * 1000; Message message = messageBuilder.setTopic("topic") //设置消息索引键,可根据关键字精确查找某条消息。 .setKeys("messageKey") //设置消息Tag,用于消费端根据指定Tag过滤消息。 .setTag("messageTag") .setDeliveryTimestamp(deliverTimeStamp) //消息体 .setBody("messageBody".getBytes()) .build(); try { //发送消息,需要关注发送结果,并捕获失败等异常。 SendReceipt sendReceipt = producer.send(message); System.out.println(sendReceipt.getMessageId()); } catch (ClientException e) { e.printStackTrace(); } //消费示例一:使用PushConsumer消费定时消息,只需要在消费监听器处理即可。 MessageListener messageListener = new MessageListener() { @Override public ConsumeResult consume(MessageView messageView) { System.out.println(messageView.getDeliveryTimestamp()); //根据消费结果返回状态。 return ConsumeResult.SUCCESS; } };
note:RocketMQ的实现版本性能较好,可靠性较高,但是不支持动态添加或删除队列。
RabbitMQ
RabbitMQ也可以根据自身的特性实现延迟消息的功能。比如利用RabbitMQ的TTL和DLX特性
TTL是指存活时间(可以作用在消息中,也可以作用在队列中)
DLX是指死信队列(是指消息被拒绝或者消息过期后存放的)
使用TTL与DLX存在的问题:
1)TTL作用在队列中,需要为每一个延迟时间定义一种队列,灵活性太差。
2)TTL作用在消息上,消息是在即将投递到消费者之前判定是否过期的,所以如果前一个消息阻塞了太长,将导致后面的消息不能即时的被执行。
而且使用上面的方式需要定义普通交换机和死信交换机,所以一般使用延迟消息插件 rabbitmq-delayed-message-exchange
来完成。使用插件生成的消息不会立即进入对应队列,而是先将消息保存至 Mnesia (RabbitMQ中的一种数据存储形式) ,然后插件会尝试确认是否过期,再投递到对应绑定的队列之中
下载地址: rabbitmq-delayed-message-exchange
插件使用步骤:
- 下载延迟插件,然后解压放置到 RabbitMQ 的插件目录。注意一定要解压并且把版本名字取消
如果是使用docker安装的rabbitmq,可以使用docker cp rabbitmq_delayed_message_exchange containerId:/RabbitMQ_HOME/plugins/
拷贝到RabbitMQ容器中。 - 进入 RabbitMQ 的安装目录下的 plgins 目录,执行下面命令让该插件生效
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
可以使用rabbitmq-plugins list
查看生效的插件,插件前面带上*号的才是真正生效的插件
RabbitMQ启用插件 - 最后重启 RabbitMQ,就可以在管理界面看到Type是
x-delayed-message
exchange
RabbitMQ管理界面
更加详细的安装步骤可以查看链接 RabbitMQ 学习笔记 -- 13 使用插件方式实现延迟队列
接下来就可以测试RabbitMQ的延迟消息功能了,以下是示例代码:
Rabbitmq配置
@Configuration public class RabbitMqConfig { //定义队列交换机,队列,路由 public static final String DELAYED_QUEUE = "delayed_queue"; public static final String DELAYED_EXCHANGE = "delayed_exchange"; public static final String DELAYED_ROUTINGKEY = "delayed_test"; @Bean(DELAYED_EXCHANGE) public Exchange DELAYED_EXCHANGE() { HashMap<String, Object> map = new HashMap<>(1); map.put("x-delayed-type", "direct"); return new CustomExchange(DELAYED_EXCHANGE, "x-delayed-message", true, false, map); } @Bean(DELAYED_QUEUE) public Queue DELAYED_QUEUE() { return new Queue(DELAYED_QUEUE); } //队列绑定交换机, @Bean public Binding >BINDING_DELAYED_QUEUE(@Qualifier(DELAYED_QUEUE) Queue queue, @Qualifier(DELAYED_EXCHANGE) Exchange exchange) { return BindingBuilder.bind(queue).to(exchange).with(DELAYED_ROUTINGKEY).noargs(); } }
延迟消息生产者
@RestController public class TestController { @Autowired private RabbitTemplate rabbitTemplate; @RequestMapping("/test") public String test(@RequestParam(required = false) Integer delay) { rabbitTemplate.convertAndSend(RabbitMqConfig.DELAYED_EXCHANGE, RabbitMqConfig.DELAYED_ROUTINGKEY, "hello", msg -> { //设置消息的延迟时间 msg.getMessageProperties().setDelay(delay * 1000); //设置优先级 msg.getMessageProperties().setPriority(9); //设置消息的持久化方式 >msg.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT); //设置唯一标识 msg.getMessageProperties().setMessageId(UUID.randomUUID().toString()); return msg; }); return "success"; } }
延迟消息消费者
@Component public class RabbitmqHandler { //监听delayed_test队列 @RabbitListener(queues = {RabbitMqConfig.DELAYED_QUEUE}) public void receive_delayed_test(Message message, Channel channel) { System.out.println("----delayed_test----"); System.out.println("properties:" + message.getMessageProperties().toString()); System.out.println("body:" + new String(message.getBody())); System.out.println(); } }
note:RabbitMQ支持集群,分布式,高并发场景,性能较好,可靠性高,不需要自己处理轮询线程。
网友评论