美文网首页
rabbitmq入门一简单队列(Hello World!、Wor

rabbitmq入门一简单队列(Hello World!、Wor

作者: weihubeats | 来源:发表于2019-10-29 17:30 被阅读0次

    @[toc]
    docker搭建rabbitm

    mq官网

    "Hello World!":

    官网教程
    点对点,一个生产者,一个消费者,一个队列。
    特点:

    • 没有交换机概念,生产者和消费者直接通过队列进行交流
    在这里插入图片描述

    1. mq创建一个队列

    • 安装完rabbitm直接访问 118.25.188.37:15672
    • 进入登入界面:默认密码都guest
    • ==这里不创建队列也行,java中绑定队列如果队列没有会创建==


      在这里插入图片描述
      在这里插入图片描述

      完成后点击add queue


      在这里插入图片描述

    2. 创建生产者消费者

    1. 引入依赖
         dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-amqp</artifactId>
                <version>5.73</version>
            </dependency>
        </dependencies>
    
    1. 获取mq连接工具类(类似jdbc连接)
    • MQConnectionUtils
    public class MQConnectionUtils {
        private static final String IP = "118.25.188.37";
        private static final Integer PORT = 5672;
        private static final String USERNAME = "guest";
        private static final String PASSWORD = "guest";
    
        public static Connection newConnection() throws Exception {
            //定义连接工厂
            ConnectionFactory factory = new ConnectionFactory();
            //设置服务地址
            factory.setHost(IP);
            //设置端口号
            factory.setPort(5672);
            //设置账号信息,用户名、密码、vhost
            factory.setUsername(USERNAME);
            factory.setPassword(PASSWORD);
            //创建连接
            Connection connection = factory.newConnection();
            return connection;
    
        }
    }
    
    
    1. Producer 生产者
    • Producer
    public class Producer {
        private static final String QUEUE_NAME = "mq";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            // 1.获取连接
            Connection newConnection = MQConnectionUtils.newConnection();
            // 2.创建通道
            Channel channel = newConnection.createChannel();
            // 3.创建队列声明
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            String msg = "直接模式消息发送";
            System.out.println("生产者发送消息:" + msg);
            // 4.发送消息
            channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
            channel.close();
            newConnection.close();
        }
    
    }
    
    

    发送消息后mq会出现待消费的消息


    在这里插入图片描述
    1. Customer 消费者
    • Customer
    public class Customer {
        private static final String QUEUE_NAME = "mq";
    
        public static void main(String[] args) throws IOException, TimeoutException {
            // 1.获取连接
            Connection newConnection = MQConnectionUtils.newConnection();
            // 2.获取通道
            Channel channel = newConnection.createChannel();
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, byte[] body)
                        throws IOException {
                    String msgString = new String(body, "UTF-8");
                    System.out.println("消费者获取消息:" + msgString);
                }
            };
            // 3.监听队列  true表示自动应答,false表示手动应答
            channel.basicConsume(QUEUE_NAME, true, defaultConsumer);
    
        }
    
    }
    
    

    消费完后mq消息就没有了

    工作队列 Work queues

    在这里插入图片描述

    与点对点不同的是,消费者由1个变成了两个,消费者集群了
    我们这里启动两个消费者


    在这里插入图片描述
    在这里插入图片描述

    然后发送10条消息
    看看结果:


    在这里插入图片描述 在这里插入图片描述

    可以看到实现的是均摊消费

    应答模式

    channel.basicConsume(QUEUE_NAME, true, defaultConsumer);
    这里第二个参数表示应答模式为true,表示自动签收

    • 自动应答:不会在乎消费者对这个消息处理是否成功,都会告诉队列删除该消息,如果消息获取失败的情况,实现自动补偿
    • 手动应答:消费者处理完业务逻辑,手动返回一个ack(通知)告诉队列服务器是否删除该消息

    这里我们将 应答模式设置为false
    channel.basicConsume(QUEUE_NAME, false, defaultConsumer);
    然后向消费者发送10个消息

    在这里插入图片描述
    可以看到消费者接收到了10个消息,但是我现在如果停止消费者
    在这里插入图片描述
    发现队列中还是有10个消息未消费,原因我我们没有手动返回ask
    这里我们需要加上这个channel.basicAck(envelope.getDeliveryTag(), false);
    public class Customer {
        private static final String QUEUE_NAME = "mq";
    
        public static void main(String[] args) throws  Exception {
            System.out.println("消费者2启动");
            // 1.获取连接
            Connection newConnection = MQConnectionUtils.newConnection();
            /* 2.获取通道 */
            Channel channel = newConnection.createChannel();
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            //监听队列
            DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
                        throws IOException {
                    String msgString = new String(body, "UTF-8");
                    System.out.println("消费者获取消息:" + msgString);
                    //手动应答
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            };
            // 3.监听队列  true表示自动应答,false表示手动应答
            channel.basicConsume(QUEUE_NAME, false, defaultConsumer);
    
        }
    
    }
    

    这样就表示消费者接受消息成功了
    实现:添加如下代码channel.basicQos(1);

    public class Customer {
        private static final String QUEUE_NAME = "mq";
    
        public static void main(String[] args) throws  Exception {
            System.out.println("消费者2启动");
            // 1.获取连接
            Connection newConnection = MQConnectionUtils.newConnection();
            /* 2.获取通道 */
            Channel channel = newConnection.createChannel();
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            //表示一次只消费一个消息
            channel.basicQos(1);
            //监听队列
            DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
                        throws IOException {
                    String msgString = new String(body, "UTF-8");
                    System.out.println("消费者获取消息:" + msgString);
                    //手动应答
                    channel.basicAck(envelope.getDeliveryTag(), false);
                }
            };
            // 3.监听队列  true表示自动应答,false表示手动应答
            channel.basicConsume(QUEUE_NAME, false, defaultConsumer);
    
        }
    
    }
    

    公平队列

    在上面我们消费者如果集群,消费者接受采用的均摊消费,但每个消费者处理业务时间不同,这样就不能让性能更好的消费者消费更多的消息(能者多劳)

    • 解决方案:消费者都采用应答模式实现公平队列,即谁消费快,消费的消息多

    集成springboot

    demo 路径结构:


    在这里插入图片描述

    2.代码测试

    创建一个springboot项目,然后加入mq依赖

    dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-amqp</artifactId>
            </dependency>
        </dependencies>
    

    配置 application.properties

    # web端口
    server.port=8089
    # mq地址
    spring.rabbitmq.host=118.25.188.37
    

    测试生产者

    @RunWith(SpringRunner.class)
    @SpringBootTest(classes = RabbitmqApplication.class)
    public class ProductTest {
    
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        @Test
        public void senMes() {
            rabbitTemplate.convertAndSend("mq","直接模式消息发送");
    
        }
    }
    
    

    运行sendMes
    无错误后访问mq web管理页面发现多了一条待消费的消息


    在这里插入图片描述

    编写消费者

    @Component
    @RabbitListener(queues = "mq")  //指定消费队列
    public class Customer1 {
    
        @RabbitHandler
        public void getMsg(String msg) {
            System.out.println("直接模式消费消息" + msg);
        }
    }
    
    

    然后直接运行main
    可以看到效果


    在这里插入图片描述
    在这里插入图片描述

    也可以启动多个消费者等待消息,具体idea启动多个实例请看这里

    相关文章

      网友评论

          本文标题:rabbitmq入门一简单队列(Hello World!、Wor

          本文链接:https://www.haomeiwen.com/subject/bypgvctx.html