RabbitMQ

作者: 小和大大 | 来源:发表于2023-02-26 08:21 被阅读0次

    1.初识MQ
    1.1.同步和异步通讯
    微服务间通讯有同步和异步两种方式:

    同步通讯:就像打电话,需要实时响应。

    异步通讯:就像发邮件,不需要马上回复。

    两种方式各有优劣,打电话可以立即得到响应,但是你却不能跟多个人同时通话。发送邮件可以同时与多个人收发邮件,但是往往响应会有延迟。

    1.1.1.同步通讯
    我们之前学习的Feign调用就属于同步方式,虽然调用可以实时得到结果,但存在下面的问题:

    总结:

    同步调用的优点:

    时效性较强,可以立即得到结果
    同步调用的问题:

    耦合度高
    性能和吞吐能力下降
    有额外的资源消耗
    有级联失败问题
    1.1.2.异步通讯
    异步调用则可以避免上述问题:

    我们以购买商品为例,用户支付后需要调用订单服务完成订单状态修改,调用物流服务,从仓库分配响应的库存并准备发货。

    在事件模式中,支付服务是事件发布者(publisher),在支付完成后只需要发布一个支付成功的事件(event),事件中带上订单id。

    订单服务和物流服务是事件订阅者(Consumer),订阅支付成功的事件,监听到事件后完成自己业务即可。

    为了解除事件发布者与订阅者之间的耦合,两者并不是直接通信,而是有一个中间人(Broker)。发布者发布事件到Broker,不关心谁来订阅事件。订阅者从Broker订阅事件,不关心谁发来的消息。

    Broker 是一个像数据总线一样的东西,所有的服务要接收数据和发送数据都发到这个总线上,这个总线就像协议一样,让服务间的通讯变得标准和可控。

    好处:

    吞吐量提升:无需等待订阅者处理完成,响应更快速

    故障隔离:服务没有直接调用,不存在级联失败问题

    调用间没有阻塞,不会造成无效的资源占用

    耦合度极低,每个服务都可以灵活插拔,可替换

    流量削峰:不管发布事件的流量波动多大,都由Broker接收,订阅者可以按照自己的速度去处理事件

    缺点:

    架构复杂了,业务没有明显的流程线,不好管理
    需要依赖于Broker的可靠、安全、性能
    好在现在开源软件或云平台上 Broker 的软件是非常成熟的,比较常见的一种就是我们今天要学习的MQ技术。

    1.2.技术对比:
    MQ,中文是消息队列(MessageQueue),字面来看就是存放消息的队列。也就是事件驱动架构中的Broker。

    比较常见的MQ实现:

    ActiveMQ
    RabbitMQ
    RocketMQ
    Kafka
    几种常见MQ的对比:

    RabbitMQ ActiveMQ RocketMQ Kafka
    公司/社区 Rabbit Apache 阿里 Apache
    开发语言 Erlang Java Java Scala&Java
    协议支持 AMQP,XMPP,SMTP,STOMP OpenWire,STOMP,REST,XMPP,AMQP 自定义协议 自定义协议
    可用性 高 一般 高 高
    单机吞吐量 一般 差 高 非常高
    消息延迟 微秒级 毫秒级 毫秒级 毫秒以内
    消息可靠性 高 一般 高 一般
    追求可用性:Kafka、 RocketMQ 、RabbitMQ

    追求可靠性:RabbitMQ、RocketMQ

    追求吞吐能力:RocketMQ、Kafka

    追求消息低延迟:RabbitMQ、Kafka

    2.快速入门
    2.1.安装RabbitMQ
    安装RabbitMQ,参考地址

    MQ的基本结构:

    RabbitMQ中的一些角色:

    publisher:生产者
    consumer:消费者
    exchange个:交换机,负责消息路由
    queue:队列,存储消息
    virtualHost:虚拟主机,隔离不同租户的exchange、queue、消息的隔离
    2.2.RabbitMQ消息模型
    RabbitMQ官方提供了5个不同的Demo示例,对应了不同的消息模型:

    2.3.导入Demo工程
    课前资料提供了一个Demo工程,mq-demo:

    导入后可以看到结构如下:

    包括三部分:

    mq-demo:父工程,管理项目依赖
    publisher:消息的发送者
    consumer:消息的消费者
    2.4.入门案例
    简单队列模式的模型图:

    官方的HelloWorld是基于最基础的消息队列模型来实现的,只包括三个角色:

    publisher:消息发布者,将消息发送到队列queue
    queue:消息队列,负责接受并缓存消息
    consumer:订阅队列,处理队列中的消息
    2.4.1.publisher实现
    思路:

    建立连接
    创建Channel
    声明队列
    发送消息
    关闭连接和channel
    代码实现:

    package cn.itcast.mq.helloworld;

    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import org.junit.Test;

    import java.io.IOException;
    import java.util.concurrent.TimeoutException;

    public class PublisherTest {
    @Test
    public void testSendMessage() throws IOException, TimeoutException {
    // 1.建立连接
    ConnectionFactory factory = new ConnectionFactory();
    // 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码
    factory.setHost("192.168.150.101");
    factory.setPort(5672);
    factory.setVirtualHost("/");
    factory.setUsername("itcast");
    factory.setPassword("123321");
    // 1.2.建立连接
    Connection connection = factory.newConnection();

        // 2.创建通道Channel
        Channel channel = connection.createChannel();
    
        // 3.创建队列
        String queueName = "simple.queue";
        channel.queueDeclare(queueName, false, false, false, null);
    
        // 4.发送消息
        String message = "hello, rabbitmq!";
        channel.basicPublish("", queueName, null, message.getBytes());
        System.out.println("发送消息成功:【" + message + "】");
    
        // 5.关闭通道和连接
        channel.close();
        connection.close();
    
    }
    

    }

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    2.4.2.consumer实现
    代码思路:

    建立连接
    创建Channel
    声明队列
    订阅消息
    代码实现:

    package cn.itcast.mq.helloworld;

    import com.rabbitmq.client.*;

    import java.io.IOException;
    import java.util.concurrent.TimeoutException;

    public class ConsumerTest {

    public static void main(String[] args) throws IOException, TimeoutException {
        // 1.建立连接
        ConnectionFactory factory = new ConnectionFactory();
        // 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码
        factory.setHost("192.168.150.101");
        factory.setPort(5672);
        factory.setVirtualHost("/");
        factory.setUsername("itcast");
        factory.setPassword("123321");
        // 1.2.建立连接
        Connection connection = factory.newConnection();
    
        // 2.创建通道Channel
        Channel channel = connection.createChannel();
    
        // 3.创建队列
        String queueName = "simple.queue";
        channel.queueDeclare(queueName, false, false, false, null);
    
        // 4.订阅消息
        channel.basicConsume(queueName, true, new DefaultConsumer(channel){
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope,
                                       AMQP.BasicProperties properties, byte[] body) throws IOException {
                // 5.处理消息
                String message = new String(body);
                System.out.println("接收到消息:【" + message + "】");
            }
        });
        System.out.println("等待接收消息。。。。");
    }
    

    }

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    2.5.总结
    基本消息队列的消息发送流程:

    建立connection

    创建channel

    利用channel声明队列

    利用channel向队列发送消息

    基本消息队列的消息接收流程:

    建立connection

    创建channel

    利用channel声明队列

    定义consumer的消费行为handleDelivery()

    利用channel将消费者与队列绑定
    ————————————————
    版权声明:本文为CSDN博主「look-word」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。
    原文链接:https://blog.csdn.net/qq_50975965/article/details/122931961

    相关文章

      网友评论

          本文标题:RabbitMQ

          本文链接:https://www.haomeiwen.com/subject/viftldtx.html