美文网首页
springboot集成mq问题总结

springboot集成mq问题总结

作者: Raral | 来源:发表于2021-10-09 18:19 被阅读0次

springboot集成mq

基本使用

  1. pom 和 yml
 <!-- Spring Boot starter for AMQP messaging; the version is pinned explicitly to 2.2.5.RELEASE. -->
 <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
        <version>2.2.5.RELEASE</version>
    </dependency>
# RabbitMQ connection settings plus RabbitTemplate (publisher side) defaults.
# NOTE(review): the broker address and credentials are hard-coded below; for anything
# beyond a local demo they should presumably come from environment-specific configuration.
spring:
    rabbitmq:
        host: 119.29.14.214
        port: 5672
        username: lg123
        password: lg123
        virtualHost: /gzsz
        # Publisher confirms in "correlated" mode (see the Spring Boot property reference).
        publisher-confirm-type: correlated
        template:
          # Retry for template operations: starts at 10 s, multiplier 2, upper bound 300 s.
          retry:
            enabled: true
            initial-interval: 10000ms
            max-interval: 300000ms
            multiplier: 2
          # Exchange used by the template; same name as the topic exchange declared in RabbitmqConfig.
          exchange: topic.exchange
  2. 配置

/**
 * Declares the topic exchange, the e-mail and SMS queues, and the bindings that
 * connect each queue to the exchange with its routing pattern.
 */
@Configuration
public class RabbitmqConfig {
    /** Name of the e-mail queue. */
    public static final String QUEUE_EMAIL = "queue_email";
    /** Name of the SMS queue. */
    public static final String QUEUE_SMS = "queue_sms";
    /** Name of the topic-type exchange. */
    public static final String EXCHANGE_NAME = "topic.exchange";
    /** Routing pattern that binds the e-mail queue to the exchange. */
    public static final String ROUTINGKEY_EMAIL = "topic.#.email.#";
    /** Routing pattern that binds the SMS queue to the exchange. */
    public static final String ROUTINGKEY_SMS = "topic.#.sms.#";

    /**
     * Declares the topic exchange.
     *
     * @return a durable topic exchange, i.e. one that is still there after the broker restarts
     */
    @Bean(EXCHANGE_NAME)
    public Exchange exchange() {
        ExchangeBuilder topicBuilder = ExchangeBuilder.topicExchange(EXCHANGE_NAME);
        topicBuilder.durable(true);
        return topicBuilder.build();
    }

    /**
     * Declares the e-mail queue.
     *
     * <p>Constructor arguments are (name, durable, exclusive, autoDelete):
     * durable so the queue does not have to be re-created after a broker restart,
     * not exclusive to the current connection, and not deleted automatically when unused.
     *
     * @return the e-mail queue
     */
    @Bean(QUEUE_EMAIL)
    public Queue emailQueue() {
        return new Queue(QUEUE_EMAIL, true, false, false);
    }

    /**
     * Declares the SMS queue with the same settings as the e-mail queue.
     *
     * @return the SMS queue
     */
    @Bean(QUEUE_SMS)
    public Queue smsQueue() {
        return new Queue(QUEUE_SMS, true, false, false);
    }

    /**
     * Binds the e-mail queue to the exchange using {@link #ROUTINGKEY_EMAIL}.
     *
     * @param queue    the e-mail queue bean
     * @param exchange the topic exchange bean
     * @return the binding
     */
    @Bean
    public org.springframework.amqp.core.Binding bindingEmail(@Qualifier(QUEUE_EMAIL) Queue queue,
                                                              @Qualifier(EXCHANGE_NAME) Exchange exchange) {
        return bindWithKey(queue, exchange, ROUTINGKEY_EMAIL);
    }

    /**
     * Binds the SMS queue to the exchange using {@link #ROUTINGKEY_SMS}.
     *
     * @param queue    the SMS queue bean
     * @param exchange the topic exchange bean
     * @return the binding
     */
    @Bean
    public org.springframework.amqp.core.Binding bindingSMS(@Qualifier(QUEUE_SMS) Queue queue,
                                                            @Qualifier(EXCHANGE_NAME) Exchange exchange) {
        return bindWithKey(queue, exchange, ROUTINGKEY_SMS);
    }

    /** Builds an argument-less binding of {@code queue} to {@code exchange} under {@code routingKey}. */
    private static org.springframework.amqp.core.Binding bindWithKey(Queue queue,
                                                                     Exchange exchange,
                                                                     String routingKey) {
        return BindingBuilder
                .bind(queue)
                .to(exchange)
                .with(routingKey)
                .noargs();
    }

}

  3. 消费者逻辑

/**
 * Message consumers for the e-mail and SMS queues bound to the {@code topic.exchange}
 * topic exchange.
 */
@Component
public class ReceiveHandler {

    /**
     * Listens on {@code queue_email}, prints the raw message, then un-escapes it,
     * parses it as JSON and prints the {@code age} field.
     *
     * <p>A payload that parses to {@code null} is reported and skipped instead of
     * raising a {@code NullPointerException} inside the listener.
     *
     * @param msg the message body as text
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "queue_email", durable = "true"),
            exchange = @Exchange(
                    value = "topic.exchange",
                    ignoreDeclarationExceptions = "true",
                    type = ExchangeTypes.TOPIC
            ),
            key = {"topic.#.email.#","email.*"}))
    public void rece_email(String msg){
        System.out.println(" [邮件服务] received : " + msg + "!");

        // Demonstration: parsing a hard-coded JSON literal.
        String msg2 = "{\"name\":\"fs\",\"age\":12}";
        JSONObject json = JSONObject.parseObject(msg2);
        System.out.println(json);

        // Presumably the payload can arrive as a Java-escaped JSON string, hence the un-escape step.
        String s = StringEscapeUtils.unescapeJava(msg);
        JSONObject json2 = JSONObject.parseObject(s);
        if (json2 == null) {
            // NOTE(review): assumes JSONObject is fastjson's, whose parseObject returns null for a
            // null or empty text. Dereferencing it would throw and, with default listener settings,
            // get the message redelivered again and again.
            System.out.println(" [邮件服务] 消息体为空,已忽略");
            return;
        }
        System.out.println(json2);
        String age = String.valueOf(json2.get("age"));
        System.out.println(age);
    }

    /**
     * Listens on {@code queue_sms} and prints each message.
     *
     * @param msg the message body as text
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "queue_sms", durable = "true"),
            exchange = @Exchange(
                    value = "topic.exchange",
                    ignoreDeclarationExceptions = "true",
                    type = ExchangeTypes.TOPIC
            ),
            key = {"topic.#.sms.#"}))
    public void rece_sms(String msg){
        System.out.println(" [短信服务] received : " + msg + "!");
    }

}

成熟集成使用

  1. 配置
/**
 * Declares a direct exchange with three queues, each bound under a routing key
 * equal to its own queue name.
 *
 * <p>An earlier, disabled set-up for a merchant queue ({@code queue_merch},
 * {@code exchange_merch}, {@code routing_merch}) used to be kept here as commented-out code.
 */
@Configuration
public class RabbitmqConfig2 {

    /** Name of the routing (direct) exchange. */
    public final static String ROUTING_EXCHANGE = "routing1";
    /** Routing queue 1. */
    public final static String ROUTING_1 = "routing_1";
    /** Routing queue 2. */
    public final static String ROUTING_2 = "routing_2";
    /** Routing queue 3; the original author's note says not to use "queue" in the name. */
    public final static String ROUTING3_3 = "coupon_merch";

    /** @return routing queue 1 */
    @Bean(ROUTING_1)
    public Queue OUTING_1_Queue() {
        return declareQueue(ROUTING_1);
    }

    /** @return routing queue 2 */
    @Bean(ROUTING_2)
    public Queue ROUTING_2_Queue() {
        return declareQueue(ROUTING_2);
    }

    /** @return routing queue 3 */
    @Bean(ROUTING3_3)
    public Queue ROUTING_3_Queue() {
        return declareQueue(ROUTING3_3);
    }

    /** @return the direct exchange all three queues are bound to */
    @Bean(ROUTING_EXCHANGE)
    public Exchange routingExchange() {
        ExchangeBuilder directBuilder = ExchangeBuilder.directExchange(ROUTING_EXCHANGE);
        return directBuilder.build();
    }

    /**
     * Binds routing queue 1 to the exchange under the key {@link #ROUTING_1}.
     *
     * @param queue    the queue bean
     * @param exchange the exchange bean
     * @return the binding
     */
    @Bean
    public Binding binding_Routing_QUEUE_1(@Qualifier(ROUTING_1) Queue queue, @Qualifier(ROUTING_EXCHANGE) Exchange exchange) {
        return bindUnderKey(queue, exchange, ROUTING_1);
    }

    /**
     * Binds routing queue 2 to the exchange under the key {@link #ROUTING_2}.
     *
     * @param queue    the queue bean
     * @param exchange the exchange bean
     * @return the binding
     */
    @Bean
    public Binding binding_Routing_QUEUE_2(@Qualifier(ROUTING_2) Queue queue, @Qualifier(ROUTING_EXCHANGE) Exchange exchange) {
        return bindUnderKey(queue, exchange, ROUTING_2);
    }

    /**
     * Binds routing queue 3 to the exchange under the key {@link #ROUTING3_3}.
     *
     * @param queue    the queue bean
     * @param exchange the exchange bean
     * @return the binding
     */
    @Bean
    public Binding binding_Routing_QUEUE_3(@Qualifier(ROUTING3_3) Queue queue, @Qualifier(ROUTING_EXCHANGE) Exchange exchange) {
        return bindUnderKey(queue, exchange, ROUTING3_3);
    }

    /** Creates a queue with durable = true, exclusive = false, autoDelete = true. */
    private static Queue declareQueue(String queueName) {
        final boolean durable = true;
        final boolean exclusive = false;
        final boolean autoDelete = true;
        return new Queue(queueName, durable, exclusive, autoDelete);
    }

    /** Builds an argument-less binding of {@code queue} to {@code exchange} under {@code routingKey}. */
    private static Binding bindUnderKey(Queue queue, Exchange exchange, String routingKey) {
        return BindingBuilder
                .bind(queue)
                .to(exchange)
                .with(routingKey)
                .noargs();
    }

}
  2. 消费者
/**
 * Message consumers: two listeners bound to the {@code topic.exchange} topic exchange
 * (e-mail and SMS) and three listeners on the queues declared in {@code RabbitmqConfig2}.
 */
@Component
public class ReceiveHandler {

    /**
     * Listens on {@code queue_email}, prints the raw message, parses it as JSON and
     * prints its {@code username} and {@code password} fields.
     *
     * <p>A payload that parses to {@code null} is reported and skipped instead of
     * raising a {@code NullPointerException} inside the listener.
     *
     * @param msg the message body as JSON text
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "queue_email", durable = "true"),
            exchange = @Exchange(
                    value = "topic.exchange",
                    ignoreDeclarationExceptions = "true",
                    type = ExchangeTypes.TOPIC
            ),
            key = {"topic.#.email.#","email.*"}))
    public void rece_email(String msg){
        System.out.println(" [邮件服务] received : " + msg + "!");
        JSONObject jsonObject = JSONObject.parseObject(msg);
        if (jsonObject == null) {
            // NOTE(review): assumes JSONObject is fastjson's, whose parseObject returns null for a
            // null or empty text. Dereferencing it would throw and, with default listener settings,
            // get the message redelivered again and again.
            System.out.println(" [邮件服务] 消息体为空,已忽略");
            return;
        }
        String username = String.valueOf(jsonObject.get("username"));
        String password = String.valueOf(jsonObject.get("password"));
        System.out.println(username);
        // NOTE(review): prints the password in clear text; tolerable only in a demo.
        System.out.println(password);
    }

    /**
     * Listens on {@code queue_sms} and prints each message.
     *
     * @param msg the message body as text
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "queue_sms", durable = "true"),
            exchange = @Exchange(
                    value = "topic.exchange",
                    ignoreDeclarationExceptions = "true",
                    type = ExchangeTypes.TOPIC
            ),
            key = {"topic.#.sms.#"}))
    public void rece_sms(String msg){
        System.out.println(" [短信服务] received : " + msg + "!");
    }

    /**
     * Consumer for queue 1.
     *
     * @param msg the message body as text
     */
    @RabbitListener(queues = RabbitmqConfig2.ROUTING_1)
    public void queues_1(String msg) {
        System.out.println("队列1的消费者:" + msg);
    }

    /**
     * Consumer for queue 2.
     *
     * @param msg the message body as text
     */
    @RabbitListener(queues = RabbitmqConfig2.ROUTING_2)
    public void queues_2(String msg) {
        System.out.println("队列2的消费者:" + msg);
    }

    /**
     * Consumer for queue 3.
     *
     * @param msg the message body as text
     */
    @RabbitListener(queues = RabbitmqConfig2.ROUTING3_3)
    public void queues_333(String msg) {
        System.out.println("队列3的消费者:" + msg);
    }
}
  3. 生产者
//测试mq发送消息
    /**
     * Test endpoint: builds a sample {@code UserInfo}, serializes it to JSON and publishes it
     * to {@code RabbitmqConfig2.ROUTING_EXCHANGE} with routing key {@code RabbitmqConfig2.ROUTING3_3}.
     *
     * @return a fixed success message
     */
    @PostMapping("/getOrderNum2")
    public String getOrderNum2() {
        UserInfo userInfo = new UserInfo();
        userInfo.setUsername("李三55");
        userInfo.setPassword("12355");

        // Serialize once and send that string; previously the object was serialized twice
        // and the first result was never used.
        String payload = JsonUtils.toJSONString(userInfo);
        rabbitTemplate.convertAndSend(RabbitmqConfig2.ROUTING_EXCHANGE, RabbitmqConfig2.ROUTING3_3, payload);
        return "调用成功!";
    }

    /**
     * Test endpoint: publishes the integers 0 through 4, one message each, to
     * {@code RabbitmqConfig2.ROUTING_EXCHANGE} with routing key {@code RabbitmqConfig2.ROUTING_1}.
     *
     * @return a fixed success message
     */
    @PostMapping("/getOrderNum3")
    public String getOrderNum3() {
        final String targetExchange = RabbitmqConfig2.ROUTING_EXCHANGE;
        final String targetKey = RabbitmqConfig2.ROUTING_1;

        int seq = 0;
        while (seq < 5) {
            rabbitTemplate.convertAndSend(targetExchange, targetKey, seq);
            seq++;
        }
        return "调用成功!";
    }

相关文章

网友评论

      本文标题:springboot集成mq问题总结

      本文链接:https://www.haomeiwen.com/subject/pcaqoltx.html