美文网首页开发
springboot集成Rabbit Mq

springboot集成Rabbit Mq

作者: 丶君为红颜酔 | 来源:发表于2018-12-26 18:39 被阅读0次

    http://www.cnblogs.com/boshen-hzb/p/6841982.html

    部署服务器(docker)

    ###  docker-hub
    docker pull ...
    
    ###  运行
    sudo docker run -d --hostname my-rabbit --name rabbit -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=123456  -p 15672:15672 -p 5672:5672 -p 25672:25672 -p 61613:61613 -p 1883:1883  Rabbit的镜像号
    
    ## 开放端口
    firewall-cmd --list-ports
    firewall-cmd --zone=public --add-port=2181/tcp --permanent
    firewall-cmd --reload
    systemctl stop firewalld.service #停止firewall
    systemctl disable firewalld.service #禁止firewall开机启动
    

    依赖

     <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-amqp</artifactId>
            </dependency>
    

    使用过程发现的一些情况

    1. 流程是:客户端推送消息到exchange,如果有router-key(topic模式)则会推送到符合条件的队列里,如果使用订阅模式(fanout exchange) 则会推送给所有订阅的队列。
    2. 回调的使用是指:消息成功推送到指定队列,就会接受到回调。
    3. 多个java类监听同个队列,会轮流处理。
    4. topic模式(topic exchange)和订阅模式(fanout exchange)的区别在于 topic会使用到router-key,支持模糊匹配。

    使用方法

    ##  配置文件
    spring:
     rabbitmq:
          password: guest
          port: 5672
          virtual-host: /
          host: 127.0.0.1
          publisher-confirms: true
          username: guest
    
    ## 配置类
    public class RabbitConfig {
        @Value("${spring.rabbitmq.host}")
        private String addresses;
    
        @Value("${spring.rabbitmq.port}")
        private String port;
    
        @Value("${spring.rabbitmq.username}")
        private String username;
    
        @Value("${spring.rabbitmq.password}")
        private String password;
    
        @Value("${spring.rabbitmq.virtual-host}")
        private String virtualHost;
    
        @Value("${spring.rabbitmq.publisher-confirms}")
        private boolean publisherConfirms;
    
        @Bean
        public ConnectionFactory connectionFactory() {
            CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
            connectionFactory.setAddresses(addresses + ":" + port);
            connectionFactory.setUsername(username);
            connectionFactory.setPassword(password);
            connectionFactory.setVirtualHost(virtualHost);
            /** 如果要进行消息回调,则这里必须要设置为true */
            connectionFactory.setPublisherConfirms(publisherConfirms);
            return connectionFactory;
        }
    
        @Bean
        /** 因为要设置回调类,所以应是prototype类型,如果是singleton类型,则回调类为最后一次设置 */
        @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
        public RabbitTemplate rabbitTemplatenew() {
            RabbitTemplate template = new RabbitTemplate(connectionFactory());
            return template;
        }
    }
    
    ###  RabitMq 特色:回调确认(回调是指推送到队列即返回)
    @Slf4j
    @Component
    public class CallBackSender implements RabbitTemplate.ConfirmCallback {
    
        @Autowired
        private RabbitTemplate rabbitTemplatenew;
    
        public void send() {
            rabbitTemplatenew.setConfirmCallback(this);
            String msg = "callbackSender : i am callback sender";
            System.out.println(msg);
            CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
            System.out.println("callbackSender UUID: " + correlationData.getId());
            this.rabbitTemplatenew.convertAndSend("exchange", "topic.messages", msg, correlationData);
        }
    
        @Override
        public void confirm(CorrelationData correlationData, boolean ack, String cause) {
            log.info("callbakck confirm:{}", correlationData);
        }
    }
    
    ###  创建队列
      @Bean
        public Queue helloQueue() {
            return new Queue("helloQueue");
        }
    
    ###  创建exchange
        @Bean
        TopicExchange exchange() {
            return new TopicExchange("exchange");
        }
    
        @Bean
        FanoutExchange fanoutExchange() {
            return new FanoutExchange("fanoutExchange");
        }
    
    ### 生产者
     /**
         * 多消费者轮流消费:要求消费者处理相同逻辑
         *        this.rabbitTemplate.convertAndSend("helloQueue", sendMsg);
         * @return
         */
        @NotAuth
        @GetMapping("/normal")
        public Object test() {
            for (int i = 0; i < 5; i++) {
                helloSender1.send("hellomsg:" + i);
                helloSender2.send("hellomsg:" + i);
            }
            return ResultUtil.success();
        }
    
        /*=======================================  主题发布  ======================================*/
    
        @Autowired
        TopicSender topicSender;
    
        /**
         * 主题发布模式,和订阅模式最大区别是支持模糊路由
         * 1. this.rabbitTemplate.convertAndSend("exchange", "topic.message", msg1);
         * 2. BindingBuilder.bind(new Queue("topic.messages")).to(exchange).with("topic.#");//模糊匹配成功,也能发送到这个队列
         * 3. RabbitListener(queues = "topic.messages")
         *
         * @return
         */
        @NotAuth
        @GetMapping("/topic")
        public Object topic() {
            topicSender.send();
            return ResultUtil.success();
        }
    
        /*=======================================  广播模式或者订阅模式  ======================================*/
    
        @Autowired
        FanoutSender fanoutSender;
    
        /**
         * 发布订阅模式(有订阅就能获取)
         * 1. this.rabbitTemplate.convertAndSend("fanoutExchange",null, msgString);
         * 2. BindingBuilder.bind(new Queue("fanout.A")).to(fanoutExchange);
         * 3. RabbitListener(queues = "fanout.A")
         *
         * @return
         */
        @NotAuth
        @GetMapping("/fanout")
        public Object fanout() {
            fanoutSender.send();
            return ResultUtil.success();
        }
    
    
    ###  消费者
    @Component
    @RabbitListener(queues = "helloQueue")
    @RabbitListener(queues = "topic.messages")
    @RabbitListener(queues = "fanout.C")
    public class Receiver {
        @RabbitHandler
        public void process(String msg) {
            System.out.println("Receiver : " + msg);
        }
    
    }
    
    
    

    相关文章

      网友评论

        本文标题:springboot集成Rabbit Mq

        本文链接:https://www.haomeiwen.com/subject/cikklqtx.html