rabbitmq

作者: 78f6ced3a012 | 来源:发表于2019-06-07 22:34 被阅读0次

先说一下安装rabbitmq
1,安装opt_win64_22.0.exe (Erlang )
2,安装rabbitmq
3,计算机--管理--服务 找到rabbitmq服务右键属性,更改为当前登录人并填写密码,然后重启服务
4,去安装目录C:\Program Files\RabbitMQ Server\rabbitmq_server-3.7.15\sbin的路径的cmd窗口 执行:rabbitmq-plugins enable rabbitmq_management 打开管理窗口
5,管理地址:http://localhost:15672 登录名:guest,密码:guest(注意:本地登录才有效,远程登录不行)

概述:
rabbitmq是一个消息队列,主要用于系统之间的异步和解耦,同时也能起到消息缓存,消息分发的作用。
一般的消息队列有3个关键部分,生产者,队列,消费者;但是rabbitmq还有一个exchange(交换机);交换机的作用是:生产者把消息给交换机然后根据策略路由到队列上面去保存;交换机不能保存消息。
rabbitmq还有一个虚拟主机的概念:其作用就是权限隔离,A虚拟机的交换机和队列跟B虚拟机的交换机和队列是不能互相访问(针对用户),就是用户仅有A虚拟机的权限就不能去访问B虚拟机里的交换机和队列。

交换机四种模式(每个队列中的消息只能被消费一次):
交换机的作用就是把消息路由到绑定的队列中去。
direct:简单模式,一个发,另一个收
topic:主题模式,比direct灵活
fanout:广播模式,绑定的队列全部都能消费
headers:设置 header attribute 参数类型的交换机(使用的少,后期再琢磨)
特性:
ack:应答机制,用于很重要的消息,必须确保送到且消费。默认是自动应答;如需手动应答需要自己设置配置文件。
durable:持久化机制,把队列中的数据持久化到硬盘,避免rabbitmq服务器down机,而数据不会丢失。

贴代码(springboot整合rabbitmq):
properties配置:

#rabbitmq配置
spring.rabbitmq.addresses=127.0.0.1
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.port=5672

# 开启ACK
spring.rabbitmq.listener.direct.acknowledge-mode=manual
spring.rabbitmq.listener.simple.acknowledge-mode=manual

direct模式(不用配置交换机,有默认的)
配置队列:

package com.example.rabbitmq1.config;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * rabbitmq的默认模式:direct,最简单的队列模式,一个生产者,一个消费者,交换机也是默认的不用创建,路由key默认是队列名
 * 1个消息只能被消费一次,如果有多个消费者,rabbitmq服务器会自己做负载均衡,平均消费
 */
@Configuration
public class DirectConfig {
    public static final String queue_direct_a = "queue.direct.a";
    public static final String queue_direct_user = "queue.direct.user";

    @Bean(name = queue_direct_a)
    public Queue queue1(){
        return new Queue(queue_direct_a);
    }

    @Bean(name = queue_direct_user)
    public Queue queue2(){
        return new Queue(queue_direct_user);
    }
}

生产者:

package com.example.rabbitmq1.producer;
import com.example.rabbitmq1.config.DirectConfig;
import com.example.rabbitmq1.model.User;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
/**
 * 生产者
 */
@Component
public class DirectProducer {
    @Autowired
    private AmqpTemplate amqpTemplate;

    public void sendMsg(String info){
        String now = LocalDateTime.now().format(DateTimeFormatter.ISO_DATE_TIME);
        String msg = "时间: " + now + ", info: " + info;
        amqpTemplate.convertAndSend(DirectConfig.queue_direct_a, msg);
    }

    /**
     * mq之间传递Java对象(Java对象要实现序列化接口)
     * @param user
     */
    public void sendMsg(User user){
        amqpTemplate.convertAndSend(DirectConfig.queue_direct_user, user);
    }
}

消费者:

package com.example.rabbitmq1.consumer;
import com.example.rabbitmq1.config.DirectConfig;
import com.example.rabbitmq1.model.User;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@RabbitListener(queues = DirectConfig.queue_direct_user)
public class DirectConsumer3 {

    @RabbitHandler
    public void receive(User user){
        // 如果是String类型,接收类型就改为String
        System.out.println("-----------------------------user--------------------------");
        System.out.println(user);
    }
}

topic模式
配置队列,交换机,互相绑定

package com.example.rabbitmq1.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * topic模式相比direct模式就是灵活一点(在同一个队列中也是1个消息实体只能被消费1次)
 * 一个交换机可以绑定到多个队列上,也可以使用匹配模式来匹配符合规则的队列
 * 匹配模式:*代表一个元素,#代表0个或多个元素
 * 队列命名原则:aaa.bbbb.ccc   以.隔开
 */
@Configuration
public class TopicConfig {

    public static final String queue_topic_a = "queue.topicA";  //不支持queue.topic.a    三级模式(这个坑,我刚踩完)

    public static final String queue_topic_b = "queue.topicB";

    public static final String exchange_topic = "exchange_topic";


    @Bean(name = queue_topic_a)
    public Queue queue1(){
        return new Queue(queue_topic_a);
    }

    @Bean(name = queue_topic_b)
    public Queue queue2(){
        return new Queue(queue_topic_b);
    }

    @Bean(name = exchange_topic)
    public TopicExchange exchange(){
        return new TopicExchange(exchange_topic);
    }

    /**
     * 把队列绑定到交换机上面
     * with方法中的参数是路由key
     * 在使用的时候会根据交换和路由队列名来推送消息,
     * 如果队列名符合路由key的规则就会把消息推送到绑定的队列中去
     * @param queue
     * @param topicExchange
     * @return
     */
    @Bean
    public Binding bindingExchangeA(@Qualifier(queue_topic_a) Queue queue, TopicExchange topicExchange){
        return BindingBuilder.bind(queue).to(topicExchange).with(queue_topic_a);
    }

    @Bean
    public Binding bindingExchangeB(@Qualifier(queue_topic_b) Queue queue, TopicExchange topicExchange){
        return BindingBuilder.bind(queue).to(topicExchange).with("queue.*");
    }
}

生产者:

package com.example.rabbitmq1.producer;
import com.example.rabbitmq1.config.TopicConfig;
import org.springframework.amqp.core.AmqpTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class TopicProducer {

    @Autowired
    private AmqpTemplate amqpTemplate;

    public void sendMsg(String text){
        amqpTemplate.convertAndSend(TopicConfig.exchange_topic, TopicConfig.queue_topic_b, text);
    }
}

消费者:

package com.example.rabbitmq1.consumer;

import com.example.rabbitmq1.config.TopicConfig;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class TopicConsumer {

    @RabbitListener(queues = TopicConfig.queue_topic_a)
    public void receiveA(String info){
        System.out.println("-----------------------队列A:" + info);
    }

    @RabbitListener(queues = TopicConfig.queue_topic_b)
    public void receiveB(String info){
        System.out.println("-----------------------队列B:" + info);
    }
}

fanout模式就是广播模式,和topic类似,把不同点说说

    @Bean
    public FanoutExchange exchange() {
        // 是fanout的交换机
        return new FanoutExchange(exchange_fanout);
    }

    @Bean
    public Binding bindingFanoutA(@Qualifier(queue_fanout_a) Queue queue, FanoutExchange fanoutExchange){
        // 没有路由key,(后面没有with)
        return BindingBuilder.bind(queue).to(fanoutExchange);
    }

    // 生产者,注意队列名为null
    public void sendMsg(String info){
//        amqpTemplate.convertAndSend(FanoutConfig.exchange_fanout, info);   此用法无效
        amqpTemplate.convertAndSend(FanoutConfig.exchange_fanout, null, info);
    }

应答机制(properties记得开启配置):

package com.example.rabbitmq1.consumer;
import com.example.rabbitmq1.config.FanoutConfig;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.io.IOException;

@Component
public class FanoutConsumer {

    @RabbitListener(queues = FanoutConfig.queue_fanout_a)
    public void receive1(String info, Channel channel, Message message){
        try {
            // 应答这条消息,如果没有应答,下次连接消息依然会收到
            channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
            System.out.println("----------------------------队列A: "+info);
        }catch (Exception e){
            e.printStackTrace();
        }
    }

    @RabbitListener(queues = FanoutConfig.queue_fanout_b)
    public void receive2(String info, Channel channel, Message message){
        try {
            // 销毁这条消息(在队列中销毁)
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,false);
        } catch (IOException e) {
            e.printStackTrace();
        }
        System.out.println("----------------------------队列B: "+info);
    }
}

持久化机制:默认是持久化
看源码的构造方法:队列和交换机
队列:

public class Queue extends AbstractDeclarable {
    public static final String X_QUEUE_MASTER_LOCATOR = "x-queue-master-locator";
    private final String name;
    private final boolean durable;
    private final boolean exclusive;
    private final boolean autoDelete;
    private final Map<String, Object> arguments;
    private volatile String actualName;

    public Queue(String name) {
        this(name, true, false, false);  //看第2个参数默认true
    }

    public Queue(String name, boolean durable) {
        this(name, durable, false, false, (Map)null);
    }

    public Queue(String name, boolean durable, boolean exclusive, boolean autoDelete) {
        this(name, durable, exclusive, autoDelete, (Map)null);
    }
}

交换机:

public abstract class AbstractExchange extends AbstractDeclarable implements Exchange {
    private final String name;
    private final boolean durable;
    private final boolean autoDelete;
    private final Map<String, Object> arguments;
    private volatile boolean delayed;
    private boolean internal;

    public AbstractExchange(String name) {
        this(name, true, false);  //看第2个参数默认为true
    }

    public AbstractExchange(String name, boolean durable, boolean autoDelete) {
        this(name, durable, autoDelete, (Map)null);
    }
}

相关文章

网友评论

      本文标题:rabbitmq

      本文链接:https://www.haomeiwen.com/subject/vlgvxctx.html