美文网首页
RabbitMQ六种队列模式-简单队列模式

RabbitMQ六种队列模式-简单队列模式

作者: 呆叔么么 | 来源:发表于2020-01-15 22:20 被阅读0次

    一.简单队列

    简单队列模式实际上就是work模式的一种特例,生产者和消费者都只有一个
    1.创建Virtual Hosts

    Virtual Hosts

    2.实现消息生产者
    2.1 添加依赖

            <dependency>
                <groupId>com.rabbitmq</groupId>
                <artifactId>amqp-client</artifactId>
                <version>5.7.3</version>
            </dependency>
    

    2.2 获取RabbitMQ连接
    cn.lovingliu.rabbitmq.util.ConnectionUtil

    package cn.lovingliu.rabbitmq.util;
    
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    /**
     * @Author:LovingLiu
     * @Description: 获取rabbitMq连接
     * @Date:Created in 2020-01-15
     */
    public class ConnectionUtil {
        public static Connection getConnection() throws IOException, TimeoutException {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("49.235.110.134");
            factory.setPort(5672);
            factory.setVirtualHost("test001_host");
            factory.setUsername("root");
            factory.setPassword("root");
    
            Connection connection = factory.newConnection();
            return connection;
        }
    }
    

    2.3 生产者
    生产者负责创建消息队列并发送消息入列,简单分为5步:

    • 获取连接
    • 创建通道
    • 创建队列声明
    • 发送消息
    • 关闭队列
      cn.lovingliu.rabbitmq.producer.Send
    package cn.lovingliu.rabbitmq.producer;
    
    
    import cn.lovingliu.common.ConnectionUtil;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    
    
    /**
     * @Author:LovingLiu
     * @Description: 生产者发送消息
     * @Date:Created in 2020-01-15
     */
    public class Send {
        private final static String QUEUE_NAME = "test_queue";
    
        public static void main(String[] args) throws Exception {
            /** 1.获取连接 */
            Connection newConnection = ConnectionUtil.getConnection();
            /** 2.创建通道 */
            Channel channel = newConnection.createChannel();
            /** 3.创建队列声明 */
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            // channel.queueDeclare(QUEUE_NAME, true, false, false, null); // 持久化
            String msg = "我是生产者生成的消息";
            System.out.println("生产者发送消息:" + msg);
            /** 4.发送消息 */
            channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());
            // channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes()); // 持久化
            channel.close();
            newConnection.close();
        }
    }
    
    

    3.消费者
    消费者实现和生产者实现过程差不多,但是没有关闭连接和通道,是因为要消费者一直等待随时可能发来的消息,大致分为如下3步:

    • 获取连接
    • 获取通道
    • 监听队列
      cn.lovingliu.rabbitmq.consumer.MyConsumer
    package cn.lovingliu.rabbitmq.consumer;
    
    import com.rabbitmq.client.AMQP;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.DefaultConsumer;
    import com.rabbitmq.client.Envelope;
    
    import java.io.IOException;
    
    /**
     * @Author:LovingLiu
     * @Description: 自定义消费者。QueueingConsumer 已被废弃,建议使用继承DefaultConsumer的方式
     * @Date:Created in 2020-01-15
     */
    public class MyConsumer extends DefaultConsumer {
        public MyConsumer(Channel channel){
            super(channel);
        }
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
            System.err.println("-----------consume message----------");
            System.err.println("consumerTag: " + consumerTag);
            System.err.println("envelope: " + envelope);
            System.err.println("properties: " + properties);
            System.err.println("body: " + new String(body));
        }
    }
    

    cn.lovingliu.rabbitmq.consumer.Recv

    package cn.lovingliu.rabbitmq.consumer;
    
    import cn.lovingliu.common.ConnectionUtil;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    /**
     * @Author:LovingLiu
     * @Description: 消费者消费消息
     * @Date:Created in 2020-01-15
     */
    public class Recv{
        /** 队列名称 */
        private final static String QUEUE_NAME = "test_queue";
    
        public static void main(String[] argv) throws Exception {
            /** 1.获取连接 */
            Connection newConnection = ConnectionUtil.getConnection();
            /** 2.获取通道 */
            Channel channel = newConnection.createChannel();
            channel.queueDeclare(QUEUE_NAME, false, false, false, null);
            /** 3.监听队列 */
            channel.basicConsume(QUEUE_NAME, true, new MyConsumer(channel));
        }
    }
    

    4.运行截图
    运行生产者



    运行消费者

    相关文章

      网友评论

          本文标题:RabbitMQ六种队列模式-简单队列模式

          本文链接:https://www.haomeiwen.com/subject/vratzctx.html