美文网首页dockerSpring Cloud程序员
docker环境下的RabbitMQ部署,Spring AMQP

docker环境下的RabbitMQ部署,Spring AMQP

作者: 月冷心寒 | 来源:发表于2016-11-26 21:01 被阅读8651次

    AMQP简介

    AMQP,即 Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。消息中间件主要用于组件之间的解耦和通讯。
    AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。
    RabbitMQ是一个开源的AMQP实现,服务器端用 Erlang 语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用于在分布式系统中存储转发消息,具有很高的易用性和可用性。

    在docker环境部署RabbitMQ

    RabbitMQ是用 Erlang 编写的,直接部署的话需要先部署 Erlang 环境,比较麻烦。在 docker 环境下部署就比较简单了,直接使用rabbitmq官方提供的镜像即可。

    登录 docker 节点,运行 docker pull rabbitmq:management,这里使用的是带 web 管理插件的镜像。

    启动容器:

    docker run -d --name rabbitmq --publish 5671:5671 \
     --publish 5672:5672 --publish 4369:4369 --publish 25672:25672 --publish 15671:15671 --publish 15672:15672 \
    rabbitmq:management
    

    容器启动之后就可以访问web 管理端了 http://宿主机IP:15672,默认创建了一个 guest 用户,密码也是 guest

    AMQP协议中的几个重要概念

    • Queue 是RabbitMQ的内部对象,用于存储消息。RabbitMQ中的消息只能存储在 Queue 中,消费者从 Queue 中获取消息并消费。
    • Exchange 生产者将消息发送到 Exchange,由 Exchange 根据一定的规则将消息路由到一个或多个 Queue 中(或者丢弃)。
    • Binding RabbitMQ中通过 BindingExchangeQueue 关联起来。
    • Binding key 在绑定(Binding) ExchangeQueue 的同时,一般会指定一个 binding key
    • Routing key 生产者在将消息发送给 Exchange 的时候,一般会指定一个 routing key,来指定这个消息的路由规则。 Exchange 会根据 routing keyExchange Type 以及 Binding key 的匹配情况来决定把消息路由到哪个 Queue
    • Exchange Types RabbitMQ常用的Exchange Type有 fanoutdirecttopicheaders 这四种。
      • fanout 这种类型的Exchange路由规则非常简单,它会把所有发送到该Exchange的消息路由到所有与它绑定的Queue中,这时 Routing key 不起作用。
      • direct 这种类型的Exchange路由规则也很简单,它会把消息路由到那些 binding keyrouting key完全匹配的Queue中。
      • topic 这种类型的Exchange的路由规则支持 binding keyrouting key 的模糊匹配,会把消息路由到满足条件的Queue。 binding key 中可以存在两种特殊字符 *#,用于做模糊匹配,其中 * 用于匹配一个单词,# 用于匹配多个单词(可以是零个),单词以 .为分隔符。
      • headers 这种类型的Exchange不依赖于 routing keybinding key 的匹配规则来路由消息,而是根据发送的消息内容中的 headers 属性进行匹配。

    使用Spring AMQP收发消息

    新建一个maven工程,修改pom.xml引入 spring amqp 依赖:

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>1.4.2.RELEASE</version>
    </parent>
    
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>
    </dependencies>
    

    java 目录中创建一个包 demo ,在包中创建启动入口 SpringAmqpApplication.java

    public static void main(String[] args) {
        ConfigurableApplicationContext context = SpringApplication.run(SpringAmqpApplication.class, args);
        Sender sender = context.getBean("sender", Sender.class);
        sender.sendMsg("测试Spring AMQP发送消息");
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        context.close();
    }
    
    @Bean
    CachingConnectionFactory myConnectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        connectionFactory.setHost("10.47.160.238");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/");
        return connectionFactory;
    }
    
    @Bean
    Exchange myExchange() {
        return ExchangeBuilder.topicExchange("test.topic").durable().build();
    }
    
    @Bean
    Queue myQueue() {
        return QueueBuilder.durable("myQueue").build();
    }
    
    @Bean
    public Binding myExchangeBinding(@Qualifier("myExchange") Exchange topicExchange,
                                     @Qualifier("myQueue") Queue queue) {
        return BindingBuilder.bind(queue).to(topicExchange).with("test.#").noargs();
    }
    
    @Bean
    public RabbitTemplate myExchangeTemplate(CachingConnectionFactory myConnectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(myConnectionFactory);
        rabbitTemplate.setExchange("test.topic");
        rabbitTemplate.setRoutingKey("test.abc.123");
        return rabbitTemplate;
    }
    

    demo 包下创建 Sender.java

    @Component
    public class Sender {
    
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        public void sendMsg(String content) {
            rabbitTemplate.convertAndSend(content);
            System.out.println("发送消息: '" + content + "'");
        }
    
    }
    

    demo 包下创建Receiver.java

    @Component
    public class Receiver {
    
        @RabbitListener(queues = "myQueue")
        public void processMessage(Message message) {
            byte[] body = message.getBody();
            System.out.println("收到消息: '" + new String(body) + "'");
        }
    
    }
    

    回头看下代码,在 SpringAmqpApplication.java 中创建了程序启动入口 main 方法,为了有时间把收到的消息打印出来,让主线程 sleep 了1秒,配置了几个和 RabbitMQ 相关的重要配置:

    • RabbitMQ 的连接 CachingConnectionFactory
    • 创建了一个名为 test.topic 并且类型为 topicExchange
    • 创建了一个名为 myQueueQueue
    • 把上边创建的 QueueExchange 进行了绑定,并指定 binding keytest.#
    • 最后还配置了一个spring封装的模板工具类 rabbitTemplate,指定了 ExchangeRouting key,用这个 rabbitTemplate 发送消息, 会把消息发送到名为 test.topicExchange,并且带有 Routing key test.abc.123

    Sender.javaReceiver.java 的代码就比较简单了,在 sendMsg 方法中使用 rabbitTemplate 发送消息。在 processMessage 方法上加了一个 @RabbitListener(queues = "myQueue") 注解,指定从 myQueue 这个队列中获取消息。

    运行 main 方法启动工程,可以看到控制台打印出了发送的消息和接收的消息。

    demo源码 spring-amqp-demo

    最后

    RabbitMQ在 spring cloud 中做为消息总线,负责传递和分发系统消息,是非常重要的一个角色,spring cloud bus 动态加载配置就是使用消息总线,把重新拉去配置的消息分发到各个连接到消息总线的微服务。在 spring cloud steam 的消息驱动模型中同样使用了RabbitMQ。
    不仅如此,RabbitMQ本身也是一个非常高效的消息服务器,可以用在服务之间异步调用,以及RPC远程调用(在消息头中增加 Reply Queue 来监听调用返回信息)。

    相关文章

      网友评论

      • 0de8e8618612:IP写死,那要是重启rabbitmq怎么办,还要继续改IP?
      • TilKai:请问
        ```
        docker run -d --name rabbitmq --publish 5671:5671 \
        --publish 5672:5672 --publish 4369:4369 --publish 25672:25672 --publish 15671:15671 --publish 15672:15672 \
        rabbitmq:management
        ```
        中的6个publish分别代表什么呢?
        28609869e965:其实只需要映射主机5672和15672端口就行了
        TilKai:```
        4369:epmd(Erlang Port Mapper Daemon)
        25672:Erlang distribution
        5672, 5671:AMQP 0-9-1 without and with TLS
        15672:if management plugin is enabled
        61613, 61614:if STOMP is enabled
        1883, 8883:if MQTT is enabled
        ```

      本文标题:docker环境下的RabbitMQ部署,Spring AMQP

      本文链接:https://www.haomeiwen.com/subject/exugpttx.html