美文网首页dockerRabbitMQ
RabbitMQ的安装及入门

RabbitMQ的安装及入门

作者: AbstractCulture | 来源:发表于2020-06-27 14:11 被阅读0次

    什么是消息中间件?

    消息队列中间件(Message Queue Middleware,简称为MQ)是指利用高效可靠的消息传递机制进行与平台无关的数据交流,并基于数据通信来进行分布式系统的集成。通过提供消息传递和消息排队模型,它可以在分布式环境下扩展进程间的通信。

    消息中间件的作用

    • 解耦
    • 存储
    • 扩展性
    • 削峰
    • 可恢复性
    • 顺序
    • 缓冲
    • 异步通信

    Linux环境下安装RabbitMQ

    这里我们使用docker对RabbitMQ进行安装
    启动docker,执行以下指令,即可安装RabbitMQ

    docker run --name rabbitmq -p 5672:5672 -p 15672:15672 -d rabbitmq
    

    安装完毕后,添加新用户(默认账户guest默认只能通过localhost进行访问)
    添加新用户,用户名为"root" 密码为"root"

    rabbitmqctl add_user root root
    

    为root用户设置所有权限

    rabbitmqctl set_permissions -p / root ".*" ".*" ".*"
    

    设置root用户为管理员的角色

    rabbitmqctl set_user_tags root administrator
    

    Spring集成RabbitMQ

    注意,这里引入的包为amqp-client,主要用于手动调用amqp的方法,来理解整个通信模型。

    <dependency>
        <groupId>com.rabbitmq</groupId>
        <artifactId>amqp-client</artifactId>
        <version>4.2.1</version>
    </dependency>
    

    Hello World

    • 生产者代码
    package com.xjm.rabbitmq;
    
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.MessageProperties;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    public class RabbitProducer {
        //交换机名称
        private static final String EXCHANGE_NAME = "exchange_demo";
        //路由key
        private static final String ROUTING_KEY = "routingkey_demo";
        //队列名称
        private static final String QUEUE_NAME = "queue_demo";
        //ip地址
        private static final String IP_ADDRESS = "192.168.11.131";
        //端口
        private static final Integer PORT = 5672;
    
        public static void main(String[] args) throws IOException, TimeoutException {
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setHost(IP_ADDRESS);
            connectionFactory.setPort(PORT);
            connectionFactory.setUsername("root");
            connectionFactory.setPassword("root");
            //创建连接
            Connection connection = connectionFactory.newConnection();
            //创建信道
            Channel channel = connection.createChannel();
            //创建type为direct,持久化的,非自动化删除的交换机
            channel.exchangeDeclare(EXCHANGE_NAME,"direct",true,false,null);
            //创建一个持久化的,非排他的,非自动删除的队列
            channel.queueDeclare(QUEUE_NAME,true,false,false,null);
            //将交换机与队列进行绑定
            channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,ROUTING_KEY);
            //发送消息
            String message = "Hello,World!";
            channel.basicPublish(EXCHANGE_NAME,ROUTING_KEY, MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());
            //关闭资源
            channel.close();
            connection.close();
        }
    
    }
    
    
    • 消费者代码
    package com.xjm.rabbitmq;
    
    import com.rabbitmq.client.*;
    import jdk.nashorn.internal.ir.CallNode;
    
    import java.io.IOException;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.TimeoutException;
    
    public class RabbitConsumer {
        private static final String QUEUE_NAME = "queue_demo";
        private static final String IP_ADDRESS = "192.168.11.131";
        private static final Integer PORT = 5672;
    
        public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
            Address[] addresses = new Address[]{new Address(IP_ADDRESS,PORT)};
            ConnectionFactory connectionFactory = new ConnectionFactory();
            connectionFactory.setUsername("root");
            connectionFactory.setPassword("root");
            //创建连接
            Connection connection = connectionFactory.newConnection(addresses);
            //创建信道
            Channel channel = connection.createChannel();
            //设置客户端最多接收未被ack的消息个数
            channel.basicQos(64);
            //使用DefaultConsumer的方式实现消费
            Consumer consumer = new DefaultConsumer(channel){
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    System.out.println("recv message:"+new String(body));
                    try {
                        TimeUnit.SECONDS.sleep(1);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    channel.basicAck(envelope.getDeliveryTag(),false);
                }
            };
            channel.basicConsume(QUEUE_NAME,consumer);
            //等待回调函数执行完毕后,关闭资源
            TimeUnit.SECONDS.sleep(5);
            channel.close();
            connection.close();
        }
    }
    
    

    相关文章

      网友评论

        本文标题:RabbitMQ的安装及入门

        本文链接:https://www.haomeiwen.com/subject/jrnmfktx.html