美文网首页
rabbitmq消息队列入门

rabbitmq消息队列入门

作者: qiuxiaojie | 来源:发表于2019-04-18 22:11 被阅读0次

    介绍

    rabbitmq是一个消息代理,它接收和转发消息,类似一个邮局,把你投递的邮件送给指定收件人。
    相关术语:

    • producing: 消息生产者,用于发送消息
    • queue: 队列,用于存储消息
    • consuming: 消息消费者,用于接收消息

    HelloWorld

    P为生产者,C是消费者,中间的框是队列,消息的缓冲区。


    image.png

    消息发送

    image.png

    send.java

    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    public class Send {
    
        private static final String QUEUE_NAME = "hello";
    
        public static void main(String[] args) {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            try {
                // 创建一个连接
                Connection connection = factory.newConnection();
                // 创建一个频道,用于复用连接
                Channel channel = connection.createChannel();
                // 声明消息发送的队列
                channel.queueDeclare(QUEUE_NAME, false, false, false, null);
                String message = "Hello World!";
                // 往队列中发出一条消息
                channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
                System.out.println("[x] Sent'" + message + "'");
            } catch (IOException | TimeoutException e) {
                e.printStackTrace();
            }
        }
    
    }
    

    消息接收

    image.png

    Recv.java

    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    public class Send {
    
        private static final String QUEUE_NAME = "hello";
    
        public static void main(String[] args) {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            try {
                // 创建一个连接
                Connection connection = factory.newConnection();
                // 创建一个频道,用于复用连接
                Channel channel = connection.createChannel();
                // 指定发送的队列
                channel.queueDeclare(QUEUE_NAME, false, false, false, null);
                String message = "Hello World!";
                // 往队列中发出一条消息
                channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
                System.out.println("[x] Sent'" + message + "'");
            } catch (IOException | TimeoutException e) {
                e.printStackTrace();
            }
        }
    
    }
    

    Work Queues

    image.png

    在HelloWorld中写了发送/接收消息的程序,现在我们创建一个Work Queues(也称为Task Queues),来在多个耗时的消息之间分配任务。

    NewTask.java

    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.MessageProperties;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    public class NewTask {
    
        private static final String QUEUE_NAME = "hello";
    
        public static void main(String[] args) {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            try {
                Connection connection = factory.newConnection();
                Channel channel = connection.createChannel();
                // durable true 持久化
                channel.queueDeclare(QUEUE_NAME, true, false, false, null);
                // 发送当前时间
                String message = String.valueOf(System.currentTimeMillis());
                // PERSISTENT_TEXT_PLAIN 持久化
                channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
                System.out.println("[x] Sent'" + message + "'");
            } catch (IOException | TimeoutException e) {
                e.printStackTrace();
            }
        }
    
    }
    

    声明消息持久化后rabbitmq宕机也能从存储中恢复消息。

    Worker.java

    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.DeliverCallback;
    
    import java.io.IOException;
    import java.util.concurrent.TimeoutException;
    
    public class Worker {
    
        private static final String QUEUE_NAME = "hello";
    
        public static void main(String[] args) {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            try {
                Connection connection = factory.newConnection();
                Channel channel = connection.createChannel();
                channel.queueDeclare(QUEUE_NAME, false, false, false, null);
                System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
                // 未回复消息处理完,消息队列不会给它发新的消息
                channel.basicQos(1);
                DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                    String message = new String(delivery.getBody(), "UTF-8");
                    System.out.println(" [x] Received '" + message + "'");
    
                    try {
                        doWork();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } finally {
                        System.out.println("[x] Done");
                        channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
                    }
                };
                // 手动确认
                boolean autoAck = false;
                channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, consumerTag -> { });
            } catch (IOException | TimeoutException e) {
                e.printStackTrace();
            }
        }
    
        private  static  void  doWork () throws InterruptedException {
            Thread.sleep(1000);
        }
    
    }
    

    设置prefetchCount为1,在没有处理完一条消息的时候,消息队列不会給它继续下发消息,在它确认完消息后,消息队列继续下发新消息。


    image.png

    发送/订阅

    在上面我们把消息发给相同的一个消费者,现在把消息发送给多个消费者,这种模式称为发布订阅模式。为了演示这种模式,我们创建一个简单的日志记录系统,生产者发出日志,消费者接收并打印它们,发布的消息将被广播给所有消费者。

    Exchange (交换机)

    前面简单的展示了如何接收发送消息,现在介绍完整的rabbitmq概念。简单重复一下前面介绍的内容:

    • 生产者是发送消息的程序
    • 队列是消息的缓冲器
    • 消费者是接收处理消息的程序

    rabbitmq消息模型的核心思想是,生产者从来不会直接发送消息给一个队列。又或者说生产者甚至不知道它的消息将会发送到哪个队列。

    生产者只能发送消息给一个交换机。交换机是一个很简单的概念。它接收生产者的消息,然后推送消息到队列中。交换机必须明确知道自己要对接收到的消息进行何种处理: 是推送到特定队列,还是推送到所有的队列还是直接丢弃,这些规则由交换机的类型来定义。


    image.png

    有许多可供选择的交换机类型: direct、topic、headers、fanout。 我详细介绍fanout。创建一个fanout类型的交换机,称它为logs。

    EmitLog.java

    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    
    public class EmitLog {
    
        private static final String EXCHANGE_NAME = "logs";
    
        public static void main(String[] argv) throws Exception {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            try (Connection connection = factory.newConnection();
                 Channel channel = connection.createChannel()) {
                // fanout广播模式,会广播所有接收到的消息给所有它的已知队列
                channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
    
                String message = argv.length < 1 ? "info: Hello World!" :
                        String.join(" ", argv);
    
                channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));
                System.out.println(" [x] Sent '" + message + "'");
            }
        }
    
    }
    

    ReveiveLogs.java

    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.DeliverCallback;
    
    public class ReceiveLogs {
    
        private static final String EXCHANGE_NAME = "logs";
    
        public static void main(String[] argv) throws Exception {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();
    
            channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
            // 临时队列,非持久化的,唯一的,断开连接自动删除的并且随机名称的队列
            String queueName = channel.queueDeclare().getQueue();
            // 绑定队列和交换机,告诉交换机给我们发送消息,如果没有绑定到交换机上,消息会丢失
            channel.queueBind(queueName, EXCHANGE_NAME, "");
    
            System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
    
            DeliverCallback deliverCallback = (consumerTag, delivery) -> {
                String message = new String(delivery.getBody(), "UTF-8");
                System.out.println(" [x] Received '" + message + "'");
            };
            channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { });
        }
    
    }
    

    相关文章

      网友评论

          本文标题:rabbitmq消息队列入门

          本文链接:https://www.haomeiwen.com/subject/neyxgqtx.html