使用插件模式实现延时任务
rabbitmqConfig类:建立路由和队列并且绑定
package com.plugins.config;
import java.util.HashMap;
import java.util.Map;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.CustomExchange;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @title rabbitmq配置类
* @author yl
* @time 2019年5月9日
* @Description
*/
@Configuration
public class RabbitConfig {
// 拼团超时延时交换机
public static final String DELAY_GROUP_ACTIVITTY_EXCHANGE = "DELAY_GROUP_ACTIVITTY_EXCHANGE";
// 拼团超时队列
public static final String DELAY_GROUP_ACTIVITTY_QUEUE = "DELAY_GROUP_ACTIVITTY_QUEUE";
// 拼团路由键
public static final String DELAY_GROUP_ACTIVITTY_ROUTING_KEY = "DELAY_GROUP_ACTIVITTY_ROUTING_KEY";
// 创建一个超时消费队列
@Bean
public Queue delayGroupActivityQueue() {
// 第一个参数是创建的queue的名字,第二个参数是是否支持持久化
return new Queue(DELAY_GROUP_ACTIVITTY_QUEUE, true);
}
/**
* 死信交换机
*
* @return
*/
@Bean
public CustomExchange delayGroupActivityExchange() {
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-delayed-type", "direct");
return new CustomExchange(DELAY_GROUP_ACTIVITTY_EXCHANGE, "x-delayed-message", true, false, args);
}
/**
* 将交换机和队列绑定
*
* @return
*/
@Bean
public Binding bindingNotify() {
return BindingBuilder.bind(delayGroupActivityQueue()).to(delayGroupActivityExchange())
.with(DELAY_GROUP_ACTIVITTY_ROUTING_KEY).noargs();
}
}
Receiver:消费者类
package com.plugins.config;
import java.io.IOException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;
import com.rabbitmq.client.Channel;
@Component
public class Receiver {
@RabbitListener(queues = RabbitConfig.DELAY_GROUP_ACTIVITTY_QUEUE)
public void get(Message message, Channel channel) throws IOException {
String msg=new String(message.getBody(),"utf-8");
System.out.println(message.getBody().toString());
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}
sender:生产者类
package com.plugins.config;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service
public class Sender {
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* 通过延迟消息插件发动延迟消息
* @param msg
* @param expiration
*/
public void sendDelayMessageByPlugins(Long groupActivityId,Long expiration){
//消息发送失败返回到队列中, yml需要配置 publisher-returns: true
// rabbitTemplate.setMandatory(true);
// // 消息返回, yml需要配置 publisher-returns: true
// rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
// String correlationId = message.getMessageProperties().getCorrelationId();
// System.out.println("消息发送失败"+replyText+exchange+routingKey);
// });
//绑定异步监听回调函数
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
if (ack) {
System.out.println("成功");
} else {
System.out.println("失败");
}
});
rabbitTemplate.convertAndSend(RabbitConfig.DELAY_GROUP_ACTIVITTY_EXCHANGE,RabbitConfig.DELAY_GROUP_ACTIVITTY_ROUTING_KEY, groupActivityId,(message)->{
System.out.println("过期时间"+expiration);
message.getMessageProperties().setHeader("x-delay", expiration);//设置延迟时间
return message;
});
}
}
生产者类注释掉的代码因为即使消息发送成功也会触发setReturnCallback(消息未到队列时,触发该方法)这个方法,不知道怎么解决这个问题?而使用死信队列的方式不会有该问题。
网友评论