美文网首页
RabbitMQ发布/订阅模式(Publish/Subscrib

RabbitMQ发布/订阅模式(Publish/Subscrib

作者: 裂开的汤圆 | 来源:发表于2020-11-20 18:53 被阅读0次

模型

模型解读:
1.一个生产者,多个消费者
2.每个消费者都有属于自己的队列
3.生产者没有直接将消息发送到队列中,而是发送到转发器(exchange),再由转发器分发到队列中去
4.订阅模式可以实现,一个消息被多个消费者消费

生产者代码

public class Send {
    
    private static final String EXCHANGE_NAME = "exchange";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();

        // 声明交换机
        // fanout:分发类型,表示绑定到这个交换机的队列都收到这个消息
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

        // 发送消息
        String msg = "hello publish model";
        channel.basicPublish(EXCHANGE_NAME, "", null, msg.getBytes());

        channel.close();
        connection.close();
    }

}

消费者代码

public class Rec1 {

    private static final String QUEUE_NAME = "publish_queue_1";
    private static final String EXCHANGE_NAME = "exchange";

    public static void main(String[] args) throws IOException, TimeoutException {
        Connection connection = ConnectionUtils.getConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);

        // 将队列绑定到交换机上
        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");

        // 接收到消息后的回调函数
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String message = new String(delivery.getBody(), "UTF-8");
            System.out.println(LocalDateTime.now().toString() + " [x] Received '" + message + "'");
        };

        // 监听队列,每当队列中接收到新消息后会触发回调函数
        channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });

    }
}

可以在RabbitMQ管理后台中查看该交换机

总结

对比简单消息队列以及发布/订阅模式消息队列,我们可以发现,在发布/订阅模式中,生产者通道需要绑定到交换机上,而不能直接绑定到队列上。消费者的队列需要绑定到交换机上,统一由交换机复制数据的派发,可以参考文章一开始的模型图加深印象

相关文章

网友评论

      本文标题:RabbitMQ发布/订阅模式(Publish/Subscrib

      本文链接:https://www.haomeiwen.com/subject/tmaccktx.html