美文网首页
SpringBoot SpringAmqp(rabbitmq)

SpringBoot SpringAmqp(rabbitmq)

作者: a朋友请叫我苏哥 | 来源:发表于2017-05-26 17:39 被阅读0次

    1、yml

    # RabbitMQ connection settings.
    # NOTE(review): presumably nested under the `spring:` key of application.yml — confirm.
    # YAML requires a space after each colon; `host:192.168.62.129` would be read as a
    # single plain scalar instead of a key/value pair.
    rabbitmq:
      host: 192.168.62.129
      port: 5672
      username: gtsy
      password: gtsy
      virtual-host: vhost
      # Publisher confirms feed the RabbitTemplate.ConfirmCallback used by the sender.
      publisher-confirms: true
      publisher-returns: true

    2、配置

    2.1 RabbitConfig(声明交换机、队列、绑定和监听容器)

    @Configuration

    @SuppressWarnings("SpringJavaAutowiringInspection")

    public classRabbitConfig {

    private staticLoggerlogger= LoggerFactory.getLogger(RabbitConfig.class);

    private static finalStringqueueName="order_queue";

    public static finalStringexchangeName="order_exchange";

    public static finalStringroutingKey="gt.order.notify";

    @Autowired

    ConnectionFactoryconnectionFactory;

    @Bean

    publicTopicExchange topicExchange() {

    TopicExchange topicExchange =newTopicExchange(exchangeName);

    topicExchange.setDelayed(true);

    topicExchange.setIgnoreDeclarationExceptions(true);

    returntopicExchange;

    }

    @Bean

    publicQueue queue() {

    //持久化队列 第二个参数

    return newQueue(queueName,true);

    }

    @Bean

    Binding binding() {

    returnBindingBuilder.bind(queue()).to(topicExchange()).with(routingKey);

    }

    @Bean

    @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)

    publicRabbitTemplate rabbitTemplate() {

    RabbitTemplate template =newRabbitTemplate(connectionFactory);

    template.setQueue(queueName);

    template.setExchange(exchangeName);

    returntemplate;

    }

    @Autowired

    RabbitMqSenderrabbitMqSender;

    @Bean

    /**

    * 监听队列 queue

    */

    publicSimpleMessageListenerContainer messageContainer() {

    SimpleMessageListenerContainer container =newSimpleMessageListenerContainer(connectionFactory);

    container.setQueues(queue());

    container.setExposeListenerChannel(true);

    container.setMaxConcurrentConsumers(1);

    container.setConcurrentConsumers(1);

    container.setAcknowledgeMode(AcknowledgeMode.MANUAL);//设置确认模式手工确认

    container.setMessageListener(newChannelAwareMessageListener() {

    public voidonMessage(Message message, com.rabbitmq.client.Channel channel)throwsException {

    byte[] body = message.getBody();

    logger.info("接受收到消息:-->消息Id{}\n\t消息内容:\n{}",message.getMessageProperties().getCorrelationIdString(),newString(body));

    //开始处理body

    JSONObject notifyJson =JSONObject.parseObject(newString(body));

    String notifyUrl = notifyJson.getString("notifyUrl");

    String notifyContent = notifyJson.getString("notifyContent");

    //            String result = HttpConnectionUtil.postMessage(notifyUrl, notifyContent);

    String result ="";

    if(StringUtils.isEmpty(result)) {// 通知失败 进入重发机制

    intnewNotifyCount = notifyJson.getIntValue("notifyCount") +1;//已经通知的次数

    if(newNotifyCount <4) {

    notifyJson.put("notifyCount", newNotifyCount);

    intspacingInterval = EnumNoticeDelayTime.geByCount(newNotifyCount).getSecond();

    rabbitMqSender.send(notifyJson.toJSONString(),spacingInterval);

    logger.info("发送了延迟消息ID【{}】\n发送时间 {}\n延迟{}秒\n第{}次",message.getMessageProperties().getCorrelationIdString(), DateUtils.simpleDatetimeFormatter().format(newDate()),spacingInterval/1000,newNotifyCount);

    }else{

    logger.info("通知5次都失败,等待后台手工处理!");

    }

    }

    channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);//确认消息成功消费

    }

    });

    returncontainer;

    }

    }

    2.2 RabbitMqSender(消息发送与发布确认回调)

    @Component

    public classRabbitMqSenderimplementsRabbitTemplate.ConfirmCallback {

    privateLoggerlogger= LoggerFactory.getLogger(RabbitMqSender.class);

    @Autowired

    privateRabbitTemplaterabbitTemplate;

    public  voidsend(String msg,intdelayTime){

    CorrelationData correlationId =newCorrelationData("GTSY"+ DateUtils.simpleDatetimeFormatter().format(newDate()));

    rabbitTemplate.setBeforePublishPostProcessors(newMessagePostProcessor() {

    @Override

    publicMessage postProcessMessage(Message message)throwsAmqpException {

    message.getMessageProperties().setDelay(delayTime);

    message.getMessageProperties().setCorrelationIdString(correlationId+"");

    returnmessage;

    }

    });

    rabbitTemplate.setConfirmCallback(this);

    logger.info("【消息发送】-->延迟{}\n内容{}",delayTime,msg);

    rabbitTemplate.convertAndSend(RabbitConfig.exchangeName, RabbitConfig.routingKey, msg,

    correlationId);

    }

    /**

    * 消息的回调,主要是实现RabbitTemplate.ConfirmCallback接口

    * 注意,消息回调只能代表成功消息发送到RabbitMQ服务器,不能代表消息被成功处理和接受

    */

    @Override

    public voidconfirm(CorrelationData correlationData,booleanack, String cause) {

    if(ack) {

    logger.info(" 回调id:"+ correlationData +"-------"+"消息已到达MQ");

    }else{

    logger.info(" 回调id:"+ correlationData +"-------"+"消息没到达MQ????");

    }

    }

    }


    3、使用

    @RestController

    @RequestMapping("/mq")

    public classMqRabbitController  {

    @Autowired

    privateRabbitMqSenderrabbitMqSender;

    @RequestMapping("/mq/send/{gameType:^.*$}/{appid:^\\d$}")

    @ResponseBody

    publicObject send(@PathVariableString gameType,@PathVariableString appid){

    rabbitMqSender.send(JSONObject.toJSONString(wxPayRequest), 0);

    }

    }

    相关文章

      网友评论

          本文标题:SpringBoot SpringAmqp(rabbitmq)

          本文链接:https://www.haomeiwen.com/subject/rhpxfxtx.html