1、yml
rabbitmq:
host:192.168.62.129
port:5672
username:gtsy
password:gtsy
virtual-host:vhost
publisher-confirms:true
publisher-returns:true
2配置
2.1
@Configuration
@SuppressWarnings("SpringJavaAutowiringInspection")
public classRabbitConfig {
private staticLoggerlogger= LoggerFactory.getLogger(RabbitConfig.class);
private static finalStringqueueName="order_queue";
public static finalStringexchangeName="order_exchange";
public static finalStringroutingKey="gt.order.notify";
@Autowired
ConnectionFactoryconnectionFactory;
@Bean
publicTopicExchange topicExchange() {
TopicExchange topicExchange =newTopicExchange(exchangeName);
topicExchange.setDelayed(true);
topicExchange.setIgnoreDeclarationExceptions(true);
returntopicExchange;
}
@Bean
publicQueue queue() {
//持久化队列 第二个参数
return newQueue(queueName,true);
}
@Bean
Binding binding() {
returnBindingBuilder.bind(queue()).to(topicExchange()).with(routingKey);
}
@Bean
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
publicRabbitTemplate rabbitTemplate() {
RabbitTemplate template =newRabbitTemplate(connectionFactory);
template.setQueue(queueName);
template.setExchange(exchangeName);
returntemplate;
}
@Autowired
RabbitMqSenderrabbitMqSender;
@Bean
/**
* 监听队列 queue
*/
publicSimpleMessageListenerContainer messageContainer() {
SimpleMessageListenerContainer container =newSimpleMessageListenerContainer(connectionFactory);
container.setQueues(queue());
container.setExposeListenerChannel(true);
container.setMaxConcurrentConsumers(1);
container.setConcurrentConsumers(1);
container.setAcknowledgeMode(AcknowledgeMode.MANUAL);//设置确认模式手工确认
container.setMessageListener(newChannelAwareMessageListener() {
public voidonMessage(Message message, com.rabbitmq.client.Channel channel)throwsException {
byte[] body = message.getBody();
logger.info("接受收到消息:-->消息Id{}\n\t消息内容:\n{}",message.getMessageProperties().getCorrelationIdString(),newString(body));
//开始处理body
JSONObject notifyJson =JSONObject.parseObject(newString(body));
String notifyUrl = notifyJson.getString("notifyUrl");
String notifyContent = notifyJson.getString("notifyContent");
// String result = HttpConnectionUtil.postMessage(notifyUrl, notifyContent);
String result ="";
if(StringUtils.isEmpty(result)) {// 通知失败 进入重发机制
intnewNotifyCount = notifyJson.getIntValue("notifyCount") +1;//已经通知的次数
if(newNotifyCount <4) {
notifyJson.put("notifyCount", newNotifyCount);
intspacingInterval = EnumNoticeDelayTime.geByCount(newNotifyCount).getSecond();
rabbitMqSender.send(notifyJson.toJSONString(),spacingInterval);
logger.info("发送了延迟消息ID【{}】\n发送时间 {}\n延迟{}秒\n第{}次",message.getMessageProperties().getCorrelationIdString(), DateUtils.simpleDatetimeFormatter().format(newDate()),spacingInterval/1000,newNotifyCount);
}else{
logger.info("通知5次都失败,等待后台手工处理!");
}
}
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);//确认消息成功消费
}
});
returncontainer;
}
}
2.2
@Component
public classRabbitMqSenderimplementsRabbitTemplate.ConfirmCallback {
privateLoggerlogger= LoggerFactory.getLogger(RabbitMqSender.class);
@Autowired
privateRabbitTemplaterabbitTemplate;
public voidsend(String msg,intdelayTime){
CorrelationData correlationId =newCorrelationData("GTSY"+ DateUtils.simpleDatetimeFormatter().format(newDate()));
rabbitTemplate.setBeforePublishPostProcessors(newMessagePostProcessor() {
@Override
publicMessage postProcessMessage(Message message)throwsAmqpException {
message.getMessageProperties().setDelay(delayTime);
message.getMessageProperties().setCorrelationIdString(correlationId+"");
returnmessage;
}
});
rabbitTemplate.setConfirmCallback(this);
logger.info("【消息发送】-->延迟{}\n内容{}",delayTime,msg);
rabbitTemplate.convertAndSend(RabbitConfig.exchangeName, RabbitConfig.routingKey, msg,
correlationId);
}
/**
* 消息的回调,主要是实现RabbitTemplate.ConfirmCallback接口
* 注意,消息回调只能代表成功消息发送到RabbitMQ服务器,不能代表消息被成功处理和接受
*/
@Override
public voidconfirm(CorrelationData correlationData,booleanack, String cause) {
if(ack) {
logger.info(" 回调id:"+ correlationData +"-------"+"消息已到达MQ");
}else{
logger.info(" 回调id:"+ correlationData +"-------"+"消息没到达MQ????");
}
}
}
3使用
@RestController
@RequestMapping("/mq")
public classMqRabbitController {
@Autowired
privateRabbitMqSenderrabbitMqSender;
@RequestMapping("/mq/send/{gameType:^.*$}/{appid:^\\d$}")
@ResponseBody
publicObject send(@PathVariableString gameType,@PathVariableString appid){
rabbitMqSender.send(JSONObject.toJSONString(wxPayRequest), 0);
}
}
网友评论