美文网首页
读书《RabbitMQ实战》- 基础

读书《RabbitMQ实战》- 基础

作者: 陈菲TW | 来源:发表于2020-05-09 12:06 被阅读0次

    我们都知道,消息队列提供了一个异步通信机制,消息的发送者不用一直等待,直到消息被成功处理,而是立即返回。消息被暂存在队列中,对消息感兴趣的消费者会订阅消息并处理他们。他分隔了数据的发送与接收来接耦应用。

    回想一下项目中使用消息队列的场景:1)系统A中进行数据更新,两个customer信息发生了合并,该消息需要同步到多个下游系统,用消息的方式进行一对多的消息传递;2)系统B中新注册了一个customer,通过消息的方式告知系统A,这里,消息队列连接的系统A和系统B可以是完全不同的编程语言、技术栈。3)IBM消息总线:各种不同的消息先发送到message broker,message broker会进行消息的路由、格式转换、数据验证与清洗。

    此外,消息队列还可以用于存放作业,并让服务器为你做负载均衡、作业分配;消息通信也是分布式的解决方案。

    项目中用过的消息队列系统包括:JBoss Messaging、IBM MQ、Apache ActiveMQ。不过IBM的收费较高,所以趋势是从IBM MQ向开源的Active MQ、Rabbit MQ迁移。

    项目中的消息包括:correlationID,用于串联消息处理的整个生命周期;source、destination等。

    RabbitMQ也是一种开源的消息队列系统,实现了AMQP协议,可以方便的和spring集成。

    第一章、兔子🐰诞生 - Rabbit MQ

    1. 发布订阅模式 pub-sub出现了。这种生产者消费者模式和服务器客户端模式是不同的。

    2. JMS java message service诞生,试图通过提供公共的java API,隐藏单独MQ供应商提供的实际接口,从而跨越商业壁垒。从技术上来讲,java应用只需针对JMS API编程,选择合适的MQ驱动即可。

    3. AMQP诞生,即高级消息队列协议。

    4. Rabbit MQ诞生,基于Erlang语言,实现了AMQP标准。相当于软件界的路由器,类似于传输层。

    第二章、理解消息通信

    2.1 消费者和生产者

    我们把Rabbit理解为投递服务,就像顺丰一样,应用程序可以发送和接收包裹,Rabbit则承担路由器的角色。消息包含标签和有效载荷,Rabbit没有路由表,而是根据标签决定包裹发给谁,是一种fire-and-forget模式。

    消息通信中的角色包括:生产者、消费者和Rabbit。其中Rabbit又包含:

    1)信道channel:服务必须连接到Rabbit,才能发布、接收消息。这里的连接是一个TCP连接,一旦TCP连接打开,就会创建一个信道,信道是建立在真实TCP连接内的虚拟连接。为什么不直接用TCP连接而采用信道呢?因为对操作系统来说创建、销毁TCP连接是很昂贵的。当一个服务有多个线程连接到Rabbit消费消息,如果每个线程使用一个TCP连接,TCP连接数很快被耗尽;所有线程使用一个TCP连接的多个信道,既保证性能又保证线程的信息隔离。线程启动时,就在当前连接上创建一个信道,一个TCP连接承载的信道数是没有限制的。

    2)Rabbit代理服务器

    举个例子,对于一个停车系统,收到查询某车停放位置的请求消息,那么需要返回汽车位置的应答消息。chan_recv用于接收请求消息,收到消息后查找车辆位置,并创建新线程来返回应答,通过chan_sendX发送给Rabbit,X指的是线程ID。

    2.2 从底层开始构造:队列

    AMQP消息路由的三部分:交换器、队列、绑定。生产者把消息发布到交换器,消息最终达到队列并被接受者接收,绑定决定了消息如何从交换器路由到队列。

    队列就如同具名邮箱,消息到达队列并等待消费。队列又像是菜鸟驿站,为消息提供住所。对负载均衡来说,队列是绝佳方案,只需附加一堆消费者,并让Rabbit以循环方式分配给各个消费者。

    2.2.1 消费者从队列接收消息

    消费者通过两种模式从队列中接收消息:1)AMQP的basic.consume代表持续订阅;2)AMQP的basic.get代表只读一条消息就取消订阅。

    消费者确认接收消息后,消息在队列里删除。这里注意,消费者收到的每条消息都必须进行确认,通过AMQP的basic.ack命令显式的向RabbitMQ发送确认;或者在订阅到队列时就把auto_ack设置为true,这样一旦消费者接受了消息,Rabbit自动视其确认了消息。如果确认之前连接断开或取消了订阅,Rabbit会认为消费者没有收到消息,从而保留消息在队列中;如果没有断开连接但没有收到确认消息,Rabbit将不会给消费者发送更多消息,也就是说如果消息处理费时,我们可以在消息处理完成后才发送确认消息来防止过载。

    当多个消费者订阅了队列,那么消息会循环发送给各个消费者。

    basic.reject用于拒绝一条消息,可以通过配置参数让Rabbit保留或删除该消息,比如对于一条格式错误的消息,无论如何都无法正常处理的,可以用这个命令。这些被拒绝并建议删除的消息会被放在dead letter消息队列中,这个特殊的队列让我们通过检测坏消息来发现问题。

    2.2.2 创建队列

    生产者和消费者都可以通过queue.declare命令来创建队列,需要指定队列名。其他参数:

    1)exclusive:私有,也就是这个队列只有你的应用可以消费。

    2)auto delete:没有消费者则删除队列

    2.3 交换器和绑定

    前面我们说队列就像菜鸟驿站,那么消息是如何到达队列的呢?答案是交换器和绑定。

    当消息到达代理服务器,消息将拥有一个路由键,和绑定使用的路由键进行比对,如果有匹配项则进入相应队列,否则进入黑洞。

    四种类型的交换器:direct、fanout、topic、headers,每种对应着不同的路由算法。header交换器允许你匹配消息的header而非路由键,其他和direct一样,但性能不好,因此不常用。

    1)direct:当创建一个队列时,自动绑定到交换器,路由键是队列名。当生产者调用channel.basic_publish(msg, '', 'queue_name')时,就可以完成发送消息。direct是默认交换器,可声明使用其他交换器。

    2)fanout:把收到的消息广播到绑定的所有队列上。比如一个web应用需要在用户上传图片时晴空相册缓存,同时给予积分奖励, 那么可以绑定两个队列到fanout交换器;当用户上传图片消息到达交换器后,广播给两个队列,两个消费者完成各自的任务;如果后续有其他需求,只需要新建consumer和队列,接入到fanout交换器即可。

    3)topic:把不同生产者的消息发送到同一个队列。比如一个日志系统,我的应用有3个模块,每个模块都会产生不同等级的日志。然后然后消费者希望拿到所有模块的error等级的日志,可以在配置绑定时通过带有通配符的route key实现。channel.queueBind('queueName', 'exchange name', 'route key'),而生产者发布消息会指定具体的route key。

    2.4 多租户模式:虚拟主机和隔离

    每个Rabbit主机都能创建虚拟消息服务器,成为vhost,本质上是一个mini版的Rabbit服务器,有自己的交换器、绑定、队列和权限配置。这让我们能够使用一个Rabbit服务器服务于多个应用,而不用担心应用A删除了应用B的消息队列,也可以避免队列和交换器命名冲突。

    在连接时需要指定vhost,Rabbit提供默认的vhost是“/”。vhost还能保证可移植性,比如应用A和应用B运行在同一台Rabbit的不同vhost,有一天,应用A的负载变大,就可以安全的迁移到新的Rabbit服务器。另外,在Rabbit集群上创建vhost时,整个集群的rabbit服务器都会创建vhost。

    vhost通过rabbit的rabbitmqctl进行管理,包含创建、删除、查看。

    2.5 保证消息投递 - 消息持久化

    默认情况下,重启Rabbit服务器,里面的交换器和队列会消失,因为他们的durable属性默认为false,可以设置为true让Rabbit服务器重启后重建交换器和队列。

    能从服务器崩溃中恢复的消息成为持久化消息。生产者发布消息之前,把消息的delivery mode设置为2,表示持久化;之后还需要发布到持久化的交换器并进入到持久化队列。

    消息持久化是通过把消息写入到磁盘中的持久化日志文件中实现的。如果持久化消息被消费,则在日志中标记为等待垃圾回收。持久化消息的代价是性能,因为涉及到磁盘IO,而且极大减少Rabbit可处理的消息总数。

    也有其他方式保证消息投递,比如生产者发送消息的同时都包含应答队列的名称,这样接受者就可以回发应答表示收到了,如果一定时间范围内没有收到应答,生产者就重新发送消息。

    另一种方案是准备两套集群,一套处理非持久化消息通信,另一套处理持久化消息。

    AMQP事务:事务填补了生产者发布消息和RabbitMQ将他们提交到磁盘这两者最后1公里的差距。这个事务对性能的损害也是很大,不是很推荐。

    更好的消息投递是采用生产者确认模式,把send信道设置为confirm模式,所有在该信道上发布的消息都会有一个ID,一旦消息成功投递,信道会发送确认消息到生产者。该模式的最大好处是异步,一旦发布了一条消息,生产者不会一直等待确认消息,而是会继续发送其他消息;收到消息后采用回调的方式处理。如果消息确认丢失,信道会发送nack消息给生产者。

    2.6 一条消息的一生

    从代码层面来看,生产者需要经过这几个步骤:1)建立到代理服务器的连接host、port、username、password;2)获取信道;3)声明交换器;4)创建并发送消息;5)关闭连接。

    那么消费者呢?1)建立到代理服务器的连接;2)获取信道;3)声明交换器和队列,这里的声明declare的语义是如果不存在就创建,存在则直接使用;4)把队列和交换器绑定起来;5)消费消息;6)关闭连接。

    生产者 消费者

    相关文章

      网友评论

          本文标题:读书《RabbitMQ实战》- 基础

          本文链接:https://www.haomeiwen.com/subject/qekhnhtx.html