参考视频:https://www.imooc.com/video/17854
实现思路:
一个order表和一个记录消息的表。
每次新增订单时,同时写入一条订单记录和一条消息记录;消息记录的初始重试次数为0,投递状态为"投递中"。
写一个定时器每10秒扫描一次,把已超时(已过下次重试时间)且投递状态仍为"投递中"的消息重新投递;
已重试三次仍未成功的消息,将其投递状态置为"投递失败",交由人工处理。
利用rabbitmq的confirm消息确认机制:每次收到ack后,将borker_msg中对应记录的投递状态改为"投递成功"。
个人疑问:
这样的设计方案并没有感觉什么必要,不如写一个定时器定时扫描未处理的订单,执行方法去修改而已,这样可以防止未处理的订单。说到底,投递消息也不过是要处理订单而已。
以下是实现代码:
sql文件
-- Orders table: one row per order, with an auto-generated numeric id.
-- The statement is terminated with ';' so the script can be run as a whole.
CREATE TABLE orders (
    id BIGINT AUTO_INCREMENT PRIMARY KEY COMMENT '订单id',
    msg VARCHAR(100)
);
-- Message log table: one row per message published for an order.
-- send_status: 0 = in flight, 1 = delivered, 2 = failed.
-- msg_id is the primary key: the application generates a unique id per message
-- and looks rows up by it, so duplicates must be rejected by the database.
CREATE TABLE borker_msg(
    msg_id VARCHAR(64) PRIMARY KEY COMMENT '消息id',
    order_id BIGINT COMMENT '订单id',
    retry int COMMENT '重试次数',
    next_retry_time datetime COMMENT '下次重试时间',
    send_status int DEFAULT 0 COMMENT '投递状态 0:投递中,1:投递成功,2:投递失败'
);
rabbitconfig文件
package com.ganlong.config;
import java.util.HashMap;
import java.util.Map;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.CustomExchange;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * RabbitMQ configuration: declares the demo queue, the demo direct exchange,
 * and the binding between the two.
 *
 * @author yl
 * @time 2019-05-09
 */
@Configuration
public class RabbitConfig {

    private static final String QUEUE_NAME = "demo_queue";
    private static final String EXCHANGE_NAME = "demo_exchange";
    private static final String ROUTING_KEY = "key";

    /**
     * Declares the queue.
     *
     * @return the queue definition; the second constructor argument is the durable flag
     */
    @Bean
    public Queue demoQueue() {
        final boolean durable = true;
        return new Queue(QUEUE_NAME, durable);
    }

    /**
     * Declares the direct exchange messages are published to.
     *
     * @return the exchange definition
     */
    @Bean
    public DirectExchange demoExchange() {
        return new DirectExchange(EXCHANGE_NAME);
    }

    /**
     * Binds the queue to the exchange under the routing key {@code "key"}.
     *
     * @return the binding definition
     */
    @Bean
    public Binding bindingNotify() {
        Queue queue = demoQueue();
        DirectExchange exchange = demoExchange();
        return BindingBuilder.bind(queue).to(exchange).with(ROUTING_KEY);
    }
}
消费者
/**
 * Consumer for {@code demo_queue}: prints each message body as text and then
 * acknowledges the delivery explicitly.
 */
@Component
public class Receiver {

    /**
     * Handles one delivery from {@code demo_queue}.
     *
     * @param message the delivered message; its body is decoded as UTF-8 text
     * @param channel the channel used to acknowledge the delivery
     * @throws IOException if the acknowledgement cannot be sent
     */
    @RabbitListener(queues = "demo_queue")
    public void get(Message message, Channel channel) throws IOException {
        // Print the decoded text. Calling toString() on the byte[] would only
        // print the array's identity (e.g. "[B@1a2b3c"), not the payload.
        String msg = new String(message.getBody(), java.nio.charset.StandardCharsets.UTF_8);
        System.out.println(msg);
        // multiple=false: acknowledge only this delivery tag.
        // NOTE(review): an explicit ack presumably requires the listener container
        // to be configured for manual acknowledgement - confirm in the app config.
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }
}
发送消息方法
package com.ganlong.config;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import com.ganlong.meta.DO.BorkerMsg;
import com.ganlong.repo.BrokerMsgRepo;
@Component
public class SendMsg {
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private BrokerMsgRepo brokerMsgRepo;
public void sendDelayMessageByPlugins(Integer orderId,String msg){
//绑定异步监听回调函数
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
if (ack) {
//修改状态为已投递
BorkerMsg borkerMsg=brokerMsgRepo.findByOrderId(orderId);
borkerMsg.setSendStatus(1);
brokerMsgRepo.save(borkerMsg);
System.out.println("投递成功");
} else {
System.out.println("投递失败");
}
});
CorrelationData correlationData=new CorrelationData();
correlationData.setId(orderId+"");
rabbitTemplate.convertAndSend("demo_exchange", "key", msg, correlationData);
}
}
BrokerMsgRepo
package com.ganlong.repo;
import java.util.List;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.Query;
import org.springframework.stereotype.Repository;
import com.ganlong.meta.DO.BorkerMsg;
/**
 * Spring Data JPA repository for {@link BorkerMsg} records (table {@code borker_msg}),
 * keyed by a {@code String} id.
 */
@Repository
public interface BrokerMsgRepo extends JpaRepository<BorkerMsg, String>{
/**
 * Looks up the message record belonging to an order.
 * NOTE(review): presumably implemented by Spring Data from the method name - confirm
 * that {@code BorkerMsg} exposes an {@code orderId} property of a matching type.
 *
 * @param orderId id of the order
 * @return the message record for that order
 */
BorkerMsg findByOrderId(Integer orderId);
/**
 * Returns the records that are due for a retry: rows whose {@code send_status} is 0
 * and whose {@code next_retry_time} is earlier than the database's current time.
 *
 * @return the records due for a retry
 */
@Query(value="SELECT * FROM `borker_msg` WHERE borker_msg.`send_status`=0 AND `next_retry_time` <NOW() ",nativeQuery=true)
List<BorkerMsg> findByNextRetryTime();
}
OrderRepo
package com.ganlong.repo;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.stereotype.Repository;
import com.ganlong.meta.DO.OrdersDO;
/**
 * Spring Data JPA repository for {@link OrdersDO} entities, keyed by an {@code Integer} id.
 * Declares no query methods of its own; only the inherited {@code JpaRepository}
 * operations are available.
 */
@Repository
public interface OrderRepo extends JpaRepository<OrdersDO,Integer>{
}
orderService
@Service
@Transactional(rollbackOn= {Exception.class})
public class OrderService {
@Autowired
private OrderRepo orderRepo;
@Autowired
private BrokerMsgRepo brokerMsgRepo;
@Autowired
private SendMsg sendMsg;
public String createOrder() {
OrdersDO ordersDO=new OrdersDO();
ordersDO.setMsg("消息");
OrdersDO order=orderRepo.save(ordersDO);
BorkerMsg borkerMsg=new BorkerMsg();
borkerMsg.setMsgId(UUID.randomUUID().toString());
borkerMsg.setOrderId(order.getId());
borkerMsg.setRetry(0);
borkerMsg.setSendStatus(0);
borkerMsg.setNextRetryTime(LocalDateTime.now().plusMinutes(1L));
System.out.println("超时="+borkerMsg.getNextRetryTime());
brokerMsgRepo.save(borkerMsg);
sendMsg.sendDelayMessageByPlugins(order.getId(), "消息");
return "创建成功";
}
}
timer定时器
package com.ganlong.repo;
import java.time.LocalDateTime;
import java.util.List;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import com.ganlong.config.SendMsg;
import com.ganlong.meta.DO.BorkerMsg;
/**
 * Scheduled job: every 10 seconds, re-sends messages that are still in flight
 * (status 0) and past their next retry time.
 */
@Configuration
@EnableScheduling
public class Timer {
    @Autowired
    private BrokerMsgRepo brokerMsgRepo;
    // Injected by Spring. An instance created with "new SendMsg()" has no
    // RabbitTemplate or repository wired in and fails on first use.
    @Autowired
    private SendMsg sendMsg;

    /**
     * Scans for overdue in-flight messages. A message that has been retried fewer
     * than 3 times gets its retry count bumped and its next retry time pushed one
     * minute out, and is then re-sent; otherwise it is marked as failed (status 2).
     */
    @Scheduled(cron = "*/10 * * * * ?")
    public void reSend() {
        System.out.println("定时任务执行");
        // Messages that are still in flight and past their retry time.
        List<BorkerMsg> timeOutMsgList = brokerMsgRepo.findByNextRetryTime();
        for (BorkerMsg borkerMsg : timeOutMsgList) {
            if (borkerMsg.getRetry() >= 3) {
                // Retries exhausted: mark as failed for manual handling.
                borkerMsg.setSendStatus(2);
                brokerMsgRepo.save(borkerMsg);
                continue;
            }
            System.out.println("borkerMsg=" + borkerMsg.getOrderId());
            // Persist the retry bookkeeping BEFORE publishing: saving this (stale)
            // entity after the send could overwrite a "delivered" status written
            // by the confirm callback in the meantime.
            borkerMsg.setRetry(borkerMsg.getRetry() + 1);
            borkerMsg.setNextRetryTime(LocalDateTime.now().plusMinutes(1));
            brokerMsgRepo.save(borkerMsg);
            try {
                sendMsg.sendDelayMessageByPlugins(borkerMsg.getOrderId(), "消息");
            } catch (RuntimeException e) {
                // One failed send must not abort the scan; the attempt is already
                // counted and the message is picked up again on a later run.
                System.out.println("resend failed for order " + borkerMsg.getOrderId() + ": " + e);
            }
        }
    }
}
网友评论