美文网首页
RabbitMQ & RabbitMQ集群

RabbitMQ & RabbitMQ集群

作者: a9104fed92a0 | 来源:发表于2018-09-17 22:04 被阅读0次

    RabbitMQ

    Erang开发
    支持集群
    支持内存节点和磁盘节点

    RabbitMQ的模式

    (1)单一模式
    (2)普通模式(默认集群模式):只复制交换机和队列,不复制数据。
    (3)镜像模式:复制交换机队列和数据

    RabbitMQ 安装

    配置yum源
    yum install -y epel-release
    安装RabbitMQ-Server
    yum install -y rabbitmq-server
    启动服务

    /usr/lib/rabbitmq/bin/rabbitmq-plugins list //查看插件安装情况
    /usr/lib/rabbitmq/bin/rabbitmq-plugins enable rabbitmq_management //启用rabbitmq_management服务
    service rabbitmq-server start
    

    打开后台管理
    打开浏览器输入http://192.168.100.143:15672, 输入默认的Username:guest,输入默认的Password:guest ,登录后出现如图所示的界面。


    后台管理界面

    集群配置

    1.环境

    IP 主机名 用处
    192.168.33.10 centos01 磁盘
    192.168.33.11 centos02 内存
    192.168.33.12 centos03 内存

    配置本地域名,以centos01为例
    vi /etc/hostname
    修改为:
    centos01.localdomain
    vi /etc/hosts
    添加hosts
    192.168.33.10 centos01 192.168.33.11 centos02 192.168.33.12 centos03
    centos02,centos03,做类似配置。
    <font color='#A52A2A'>注意:一定要先配置域名和hosts,确保RabbitMQ Managerment中的Nodes 为 Rabbit@centos01</font>

    2.安装软件

    三台机器上安装RabbitMQ

    yum install -y epel-release
    yum install -y rabbitmq-server
    

    3.复制erlang.cookie

    # cat /var/lib/rabbitmq/.erlang.cookie 
    XAHPZVPYUQDWWJIOHUPQ
    

    复制XAHPZVPYUQDWWJIOHUPQ到centos02和centos03

    三台服务器上启动
    service rabbitmq-server start
    如果过程如果报错:
    {{shutdown,{failed_to_start_child,net_sup,{shutdown,{failed_to_start_child,auth,{"Cookie file /var/lib/rabbitmq/.erl
    则修改:

    chown rabbitmq:rabbitmq /var/lib/rabbitmq/.erlang.cookie 
    chmod 600 /var/lib/rabbitmq/.erlang.cookie
    

    4.将centos02、centos03作为内存节点加入centos01节点集群中

    rabbitmqctl stop_app    //停掉rabbit应用
    rabbitmqctl join_cluster --ram rabbit@centos01 //加入到磁盘节点
    rabbitmqctl start_app  //启动rabbit应用
    

    如果执行:rabbitmqctl join_cluster --ram rabbit@centos01 失败。要确保rabbitmq management的nodes中显示的名字为:rabbit@centos01。如果不是,
    则要修改 /etc/hostname 为:centos01.localdomain,并重启

    5.rabbitmq management

    打开http://localhost:15672/#/,nodes中显示为三个,则代表成功

    集群节点
    从图中可以看出,centos01 为磁盘节点(Disk),centos02,centos03 为内存节点(RAM)

    6.启动trace功能

    rabbitmq-plugins enable rabbitmq_tracing
    rabbitmq trace_on
    

    7.负载均衡

    Rabbitmq集群配置好之后,需要使用负载均衡技术来将所有的机器进行串联起来,可以使用HaProxy来进行负载均衡。

    RabbitMQ基本概念

    Broker:可以理解为消息队列服务器实体,负责接收消息生产者的消息,然后将消息发送至消息接收者或者其他的Broker。
    Exchange:消息交换机,是消息第一个到达的地方,消息通过它指定的路由规则,分发到不同的消息队列中去。
    Queue:消息队列,消息通过发送和路由之后最终到达的地方。
    Binding:绑定,作用是把Exchange和Queue按照路由规则绑定起来。
    Routing Key:路由关键字,Exchange根据这个关键字进行消息投递。
    Virtual host:虚拟主机,他是对Broker的虚拟划分,将消费者、生产这和他们依赖的AMQP相关结构进行隔离,一般是为了安全考虑。
    Connection:连接,代表生产者、消费者、Broker之间惊喜通信的物理网络。
    Channel:消息通道,用于连接生产者和消费者的逻辑结构。
    Producer:消息生产者
    Consumer:消息消费者

    RabbitMQ Sprint Boot集成

    1.项目配置

    pom.xml依赖

                    <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-web</artifactId>
            </dependency>
    

    application.properties rabbitmq 配置

    spring.application.name=spirng-boot-rabbitmq-sender
    spring.rabbitmq.host=localhost
    spring.rabbitmq.port=5672
    spring.rabbitmq.username=guest
    spring.rabbitmq.password=guest
    

    .direct

    队列配置

    @Configuration
    public class MQConfig {
        @Bean
        public Queue queue(){
            return new Queue("direct.queue");
        }
    }
    

    消息体Bean

    public class User implements Serializable{
        private long id;
        private String name;
    
        public long getId() {
            return id;
        }
    
        public void setId(long id) {
            this.id = id;
        }
    
        public String getName() {
            return name;
        }
    
        public void setName(String name) {
            this.name = name;
        }
    }
    

    生产者

    @Component
    public class MQProducer {
        @Autowired
        AmqpTemplate amqpTemplate;
        public void direct(User msg){
            amqpTemplate.convertAndSend("direct.queue",msg);
        }
    }
    

    消费者
    @Component
    public class MQReceiver {
    @RabbitListener(queues = "direct.queue")
    public void direct(User msg){
    System.out.println(msg.getName());
    }
    }

    2.topic

    队列配置

    @Component
    @Configuration
    public class MQTopicProducer {
        @Autowired
        AmqpTemplate amqpTemplate;
    
        @Bean(name = "queueA")
        public Queue queueA(){
            return new Queue("topic.queue.a");
        }
    
        @Bean(name = "queueB")
        public Queue queueB(){
            return new Queue("topic.queue.b");
        }
    
        @Bean(name = "topicExchange")
        public TopicExchange exchange(){
            return new TopicExchange("topic.exchange");
        }
    
        @Bean
        Binding bindingExchangeQueueA(@Qualifier("queueA") Queue queue, TopicExchange exchange){
            return BindingBuilder.bind(queue).to(exchange).with("topic.#");
        }
    
        @Bean
        Binding bindingExchangeQueueB(@Qualifier("queueB") Queue queue, TopicExchange exchange){
            return BindingBuilder.bind(queue).to(exchange).with("topic.queue.b");
        }
    
    
      
    }
    

    生产者

      public void topic(String routingKey,User user){
            amqpTemplate.convertAndSend("topic.exchange",routingKey,user);
        }
    

    消费者

    @Component
    public class MQTopicConsumer {
        @RabbitListener(queues = "topic.queue.a")
        public void queueA(User user){
            System.out.println("queueA:"+user.getName());
        }
    
        @RabbitListener(queues = "topic.queue.b")
        public void queueB(User user){
            System.out.println("queueB:"+user.getName());
        }
    }
    

    3.fanout

    队列配置

    @Configuration
    public class MQFanoutProducer {
        @Bean(name = "fanoutQueueA")
        Queue queueA(){
            return new Queue("fanout.queue.a");
        }
        @Bean(name = "fanoutQueueB")
        Queue queueB(){
            return new Queue("fanout.queue.b");
        }
        @Bean(name = "fanoutQueueC")
        Queue queueC(){
            return new Queue("fanout.queue.c");
        }
    
        @Bean
        FanoutExchange fanoutExchange(){
            return new FanoutExchange("fanout.exchange");
        }
    
        @Bean
        Binding bindingQueueA(@Qualifier("fanoutQueueA") Queue queue,FanoutExchange fanoutExchange){
            return BindingBuilder.bind(queue).to(fanoutExchange);
        }
    
        @Bean
        Binding bindingQueueB(@Qualifier("fanoutQueueB") Queue queue,FanoutExchange fanoutExchange){
            return BindingBuilder.bind(queue).to(fanoutExchange);
        }
        @Bean
        Binding bindingQueueC(@Qualifier("fanoutQueueC") Queue queue,FanoutExchange fanoutExchange){
            return BindingBuilder.bind(queue).to(fanoutExchange);
        }
    }
    
    

    生产者

    @Autowired
        AmqpTemplate amqpTemplate;
        public void fanout(User user){
            amqpTemplate.convertAndSend("fanout.exchange","",user);
        }
    

    消费者

    @Component
    public class MQFanoutConsumer {
        @RabbitListener(queues = "fanout.queue.a")
        public void queueA(User user){
            System.out.println("QueueA:"+user.getName());
        }
        @RabbitListener(queues = "fanout.queue.b")
        public void queueB(User user){
            System.out.println("QueueB:"+user.getName());
        }
    
        @RabbitListener(queues = "fanout.queue.c")
        public void queueC(User user){
            System.out.println("QueueC:"+user.getName());
        }
    
    }
    

    参考地址:http://blog.51cto.com/11134648/2155934

    相关文章

      网友评论

          本文标题:RabbitMQ & RabbitMQ集群

          本文链接:https://www.haomeiwen.com/subject/xviciftx.html