美文网首页SpringBoot程序员
SpringBoot 与 RabbitMQ

SpringBoot 与 RabbitMQ

作者: 纯正it狗 | 来源:发表于2017-09-14 17:38 被阅读102次

    简介

    RabbitMQ是一款开源的消息队列中间件, 使用Advanced Message Queuing Protocol (AMQP) 为协议来处理队列消息。由于当下分布式系统越来越普及,消息队列中间件也越来越被得到使用。

    和RabbitMQ同类型的消息中间件也有

    • ActiveMQ
    • RocketMQ
    • Kafka

    这篇文章暂时不分析几种MQ的比较,主要是写一些SpringBoot与RabbitMQ的使用

    Ubuntu 上简单安装 RabbitMQ

    首先找一台Linux服务器.

    这里我用的是Ubuntu 16.04(ubuntu-xenial)

    添加下载信息

    > echo "deb http://www.rabbitmq.com/debian/ testing main" >> /etc/apt/sources.list
    > curl http://www.rabbitmq.com/rabbitmq-signing-key-public.asc | sudo apt-key add -
    > apt-get update
    

    安装RabbitMQ

    > apt-get install rabbitmq-server
    

    安装完后咋们看一下RabbitMQ 有没有跑起来

    > service rabbitmq-server status
    

    如果跑起来的话会显示RabbitMQ的基本信息,没有的话运行

    > service rabbitmq-server start
    

    接下来我们需要一个后台的控制页面来管理我们的RabbitMQ服务, RabbitMQ其实自带了一个管理后台,所以直接激活一下就行了

    > rabbitmq-plugins enable rabbitmq_management
    

    接下来我们需要创建一个可以登录的用户,刚开始可以创建一个admin管理员,并给这个用户最高权限

    > rabbitmqctl add_user [username] [password]
    > rabbitmqctl set_user_tags [username] administrator
    > rabbitmqctl set_permissions -p "/" [username] ".*" ".*" ".*"
    

    当用户创建完毕后访问 http://[你的服务器IP]:15672 , 然后会出现登录画面,用刚才创建的用户登录即可

    RabbitMQ 的简单概念与使用

    首先看一下面的模型,简单了来讲就是Publisher将消息发送到Exchange上,然后Exchange通过Routes的健值来发布到Queue里,然后Consumer去Queue里消费消息.

    RabbitMQ消息队列模型图

    这边有三种常用的Exchange的方式

    • Direct Exchange
    • Topic Exchange
    • Fanout Exchange

    下面就对上面三种方式我们来进行一下代码的实现

    Direct Exchange

    Direct Exchange 是RabbitMQ 默认的交换形式,从图上可以看出, 消息发送方直接将带有Routing Key: green 这个参数发送到Exchange中,然后Exchange再将消息分配到对应的Queue中

    Direct Exchange Type

    Spring 代码实现如下

    Producer 部分

    @Component
    public class DirectProducer {
    
        @Autowired
        private AmqpTemplate rabbitTemplate;
    
        public void send() {
            String context = "This direct exchange message";
            this.rabbitTemplate.convertAndSend("testqueue" , context);
        }
    }
    

    代码中将“testqueue” 做为routing key, exchange会将消息发送到名为“testqueue”的Queue里面

    Consumer部分

    @Component
    @RabbitListener(queues = "testqueue")
    public class DirectConsumer {
    
        @RabbitHandler
        public void consume(String context) {
            System.out.println("Direct Exchange Consumer  : " + context);
        }
    }
    

    Consumer对“testqueue” 进行监听然后消费消息。

    Topic Exchange

    Topic Exchange的模式跟Direct 相似,也需要传routing key,但是转发消息是通过通配符来做的。比如:

    • 路由键必须是一串字符,用句号(.) 隔开,比如说 main.queue

    • 路由模式必须包含一个 星号(*),主要用于匹配路由键指定位置的一个单词,比如说,一个路由模式是这样子:main.subqueue.*,那么就只能匹配路由键是这样子的:第一个单词是 main,第二个单词是 subqueue。

    • 井号(#)就表示相当于一个或者多个单词,例如一个匹配模式是main.subqueue.test.#,那么,以main.subqueue.test.one开头的路由键都是可以的。

    Topic Exchange type

    首先我们先造3个Consumer分别监听不同的queue,但是这里面监听的queue有绑定不同的通配符规则, 其中Annotation里的key包含了通配符规则

    @Component
    @RabbitListener(
            bindings = @QueueBinding(
                    value = @Queue(value = "topicQueueA",durable = "true"),
                    exchange = @Exchange(value = "testTopicExchange",type = ExchangeTypes.TOPIC),
                    key = "*.topic.A")
    )
    public class TopicConsumerA {
        @RabbitHandler
        public void consume(String message) {
            System.out.println("Topic Consumer A  : " + message);
        }
    }
    
    @Component
    @RabbitListener(
            bindings = @QueueBinding(
                    value = @Queue(value = "topicQueueB",durable = "true"),
                    exchange = @Exchange(value = "testTopicExchange",type = ExchangeTypes.TOPIC),
                    key = "*.topic.*" )
    )
    public class TopicConsumerB {
        @RabbitHandler
        public void consume(String message) {
            System.out.println("Topic Consumer B  : " + message);
        }
    }
    
    @Component
    @RabbitListener(
            bindings = @QueueBinding(
                    value = @Queue(value = "topicQueueC",durable = "true"),
                    exchange = @Exchange(value = "testTopicExchange",type = ExchangeTypes.TOPIC),
                    key = "main.topic.*")
    )
    public class TopicConsumerC {
        @RabbitHandler
        public void consume(String message) {
            System.out.println("Topic Consumer C  : " + message);
        }
    }
    

    然后我们通过发送不同routing key来控制那些Consumer能收到消息

    this.rabbitTemplate.convertAndSend("testTopicExchange", "main.topic.A", context);
    

    main.topic.A 这条routing key 应该是满足上面三个通配符规则的,所有三个Consumer 都能收到发送的消息

    this.rabbitTemplate.convertAndSend("testTopicExchange", "test.topic.A", context);
    

    test.topic.A 只能满足A和B的通配符规则,即只有ConsumerA和ConsumerB能收到消息

    this.rabbitTemplate.convertAndSend("testTopicExchange", "main.topic.KK", context);
    

    main.topic.KK 只有ConsumerB和C能收到

    Fanout Exchange

    Fanout Exchange模式是,不管routing key 只要这个queue绑定到fanout exchange 这些queues 都会收到消息,有点像广播的形式.

    Fanout Exchange type

    接下来我看一下Spring代码的实现部分

    @Component
    public class FanoutProducer {
        @Autowired
        private AmqpTemplate rabbitTemplate;
    
        public void send() {
            String context = "This fanout exchange message";
            rabbitTemplate.convertAndSend("testFanoutExchange","", context);
        }
    }
    

    Producer将消息发送到"testFanoutExchange"的exchange中,第二个参数因为是Fanout模式所以我们不需要传routing key。

    @Component
    @RabbitListener(
            bindings = @QueueBinding(
                    value = @Queue(value = "fanoutqueueA",durable = "true"),
                    exchange = @Exchange(value = "testFanoutExchange",type = ExchangeTypes.FANOUT))
    )
    public class FanoutConsumerA {
    
        @RabbitHandler
        public void consume(String message) {
            System.out.println("Fanout Exchange Consumer A  : " + message);
        }
    }
    
    @Component
    @RabbitListener(
            bindings = @QueueBinding(
                    value = @Queue(value = "fanoutqueueB",durable = "true"),
                    exchange = @Exchange(value = "testFanoutExchange",type = ExchangeTypes.FANOUT))
    )
    public class FanoutConsumerB {
    
        @RabbitHandler
        public void consume(String message) {
            System.out.println("Fanout Exchange Consumer B  : " + message);
        }
    }
    

    这边consumer中只要将监听的queue绑定到接收消息的"testFanoutExchange" 即可.

    这样在fanout这种模式下只要是绑定到指定的exchange上所有的queues都能收到消息

    演示代码

    以上演示代码可以到 https://github.com/dreamcatchernick/spring-boot-samples 的spring-boot-rabbitmq 目录下载并运行

    相关文章

      网友评论

        本文标题:SpringBoot 与 RabbitMQ

        本文链接:https://www.haomeiwen.com/subject/hbydsxtx.html