美文网首页prog
消息队列——了解一下

消息队列——了解一下

作者: lucode | 来源:发表于2018-07-05 23:31 被阅读162次

    本文将从以下几点展开


    image.png

    为什么使用消息队列?

    image.png

    消息队列的缺点

    image.png

    消息队列如何选型

    image.png

    RabbitMQ

    RabbitMQ 是使用Erlang编写的一个开源的消息队列,本身支持很多的协议:AMQP,XMPP, SMTP, STOMP,也正因如此,它非常重量级,更适合于企业级的开发。同时实现了Broker构架,这意味着消息在发送给客户端时先在中心队列排队。对路由,负载均衡或者数据持久化都有很好的支持。

    阿里云MNS

    阿里云消息服务(Message Service,原MQS)是阿里云唯一商用的消息中间件服务。与传统的消息中间件不同,消息服务一开始就是基于阿里云自主研发的飞天分布式系统来设计和实现,具有大规模,高可靠、高并发访问和超强消息堆积能力的特点。消息服务API采用HTTP RESTful标准,接入方便,跨网络能力强;已全面接入资源访问控制服务(RAM)、专有网络(VPC),支持各种安全访问控制;接入云监控,提供完善的监控及报警机制。消息服务提供丰富的SDK、解决方案、最佳实践和7x24小时的技术支持,帮助应用开发者在应用组件之间自由地传递数据和构建松耦合、分布式、高可用系统。

    阿里云ONS / RocketMQ

    消息队列(Message Queue,简称MQ)是企业级互联网架构的核心服务,基于高可用分布式集群技术,搭建了包括发布订阅、接入、管理、监控报警等一套完整的高性能消息云服务,帮您实现分布式计算场景中所有异步解耦功能。经过多年积累,在交易、商品、营销等核心链路包括在双11场景下都有广泛使用,服务于阿里内部上千个核心应用,每天消息量达上千亿条,MQ由阿里巴巴集团中间件技术部自主研发,是原汁原味的阿里集团中间件技术精华之沉淀。

    Kafka

    Kafka是Apache下的一个子项目,是一个高性能跨语言分布式发布/订阅消息队列系统。具有以下特性:快速持久化,可以在O(1)的系统开销下进行消息持久化;高吞吐,在一台普通的服务器上既可以达到10W/s的吞吐速率;完全的分布式系统,Broker、Producer、Consumer都原生自动支持分布式,自动实现负载均衡.

    Kafka的用户中包括LinkedIn, Yahoo, Twitter, Uber, PayPal, Airbnb, Tumblr等, 被用于日志收集, 离线分析, 实时分析, 消息管道等, 详情见 Powerd By Kafka

    Kafka官方提供了Java版本的客户端API, Kafka社区产生了多种语言的客户端, 包括PHP, Python, Go, C/C++, Ruby, NodeJS等, 详情见 Kafka 客户端列表

    Kafka Broker较为轻量, 不保存consumer的消费进度, 由consumer自己控制。 因此使用起来非常灵活, 可以针对不同场景定制不同的消费服务.

    • Exactly Once: 消费且仅消费一次
    • 回溯数据, 进行重复消费

    产品总结

    事务支持方面,ONS/RocketMQ较为优秀,但是不支持消息批量操作, 不保证消息至少被消费一次.
    Kafka提供完全分布式架构, 并有replica机制, 拥有较高的可用性和可靠性, 理论上支持消息无限堆积, 支持批量操作, 消费者采用Pull方式获取消息, 消息有序, 通过控制能够保证所有消息被消费且仅被消费一次. 但是官方提供的运维工具不友好,开源社区的运维工具支持的版本一般落后于最新版本的Kafka.
    目前使用的MNS服务,拥有HTTP REST API, 使用简单, 数据可靠性高, 但是不保证消息有序,不能回溯数据.
    RabbitMQ为重量级消息系统, 支持多协议(很多协议是目前业务用不到的), 但是不支持回溯数据, master挂掉之后, 需要手动从slave恢复, 可用性略逊一筹.

    如何保证消息队列的可用性

    首先单机是不可能的保证高可用的。
    以rcoketMQ为例,他的集群就有

    • 多master 模式
    • 多master多slave异步复制模式
    • 多 master多slave同步双写模式


      多master多slave模式部署架构图

      第一眼看到这个图,就觉得和kafka好像,只是NameServer集群,在kafka中是用zookeeper代替,都是用来保存和发现master和slave用的。通信过程如下:
      Producer 与 NameServer集群中的其中一个节点(随机选择)建立长连接,定期从 NameServer 获取 Topic 路由信息,并向提供 Topic 服务的 Broker Master 建立长连接,且定时向 Broker 发送心跳。Producer 只能将消息发送到 Broker master,但是 Consumer 则不一样,它同时和提供 Topic 服务的 Master 和 Slave建立长连接,既可以从 Broker Master 订阅消息,也可以从 Broker Slave 订阅消息。
      那么kafka呢,为了对比说明直接上kafka的拓补架构图


      image.png
      如上图所示,一个典型的Kafka集群中包含若干Producer(可以是web前端产生的Page View,或者是服务器日志,系统CPU、Memory等),若干broker(Kafka支持水平扩展,一般broker数量越多,集群吞吐率越高),若干Consumer Group,以及一个Zookeeper集群。Kafka通过Zookeeper管理集群配置,选举leader,以及在Consumer Group发生变化时进行rebalance。Producer使用push模式将消息发布到broker,Consumer使用pull模式从broker订阅并消费消息。

    如何保证消息不被重复消费

    换一个说法,如何保证消息队列的幂等性?
    另外说一点,幂等性的保证需要在一次请求中所有链路都是幂等的,再能最终保证这次请求的幂等,比如前段按钮点击两次,后端认为都是这是两次不同的请求,当然处理成两次请求,所以说一个请求的幂等性,需要全局的幂等才能保证。
    其实无论是那种消息队列,造成重复消费原因其实都是类似的。正常情况下,消费者在消费消息时候,消费完毕后,会发送一个确认信息给消息队列,消息队列就知道该消息被消费了,就会将该消息从消息队列中删除。只是不同的消息队列发送的确认信息形式不同。
    例如RabbitMQ是发送一个ACK确认消息,RocketMQ是返回一个CONSUME_SUCCESS成功标志,kafka实际上有个offset的概念,简单说一下(如果还不懂,出门找一个kafka入门到精通教程),就是每一个消息都有一个offset,kafka消费过消息后,需要提交offset,让消息队列知道自己已经消费过了。那造成重复消费的原因?,就是因为网络传输等等故障,确认信息没有传送到消息队列,导致消息队列不知道自己已经消费过该消息了,再次将该消息分发给其他的消费者。

    如何解决?这个问题针对业务场景来答分以下几点

    • (1)你拿到这个消息做数据库的insert操作。那就容易了,给这个消息做一个唯一主键,那么就算出现重复消费的情况,就会导致主键冲突,避免数据库出现脏数据。
    • (2)你拿到这个消息做redis的set的操作,那就容易了,不用解决,因为你无论set几次结果都是一样的,set操作本来就算幂等操作。
    • (3)如果上面两种情况还不行,上大招。准备一个第三方介质,来做消费记录。以redis为例,给消息分配一个全局id,只要消费过该消息,将<id,message>以K-V形式写入redis。那消费者开始消费前,先去redis中查询有没消费记录即可。

    如何保证消费的可靠传输

    其实这个可靠性传输,每种MQ都要从三个角度来分析:生产者弄丢数据、消息队列弄丢数据、消费者弄丢数据。

    生产者丢数据

    从生产者弄丢数据这个角度来看,RabbitMQ提供transaction和confirm模式来确保生产者不丢消息。
    transaction(事物机制)机制就是说,发送消息前,开启事物(channel.txSelect()),然后发送消息,如果发送过程中出现什么异常,事物就会回滚(channel.txRollback()),如果发送成功则提交事物(channel.txCommit())。然而缺点就是吞吐量下降了。
    生产上用confirm模式的居多。一旦channel进入confirm模式,所有在该信道上面发布的消息都将会被指派一个唯一的ID(从1开始),一旦消息被投递到所有匹配的队列之后,rabbitMQ就会发送一个Ack给生产者(包含消息的唯一ID),这就使得生产者知道消息已经正确到达目的队列了.如果rabiitMQ没能处理该消息,则会发送一个Nack消息给你,你可以进行重试操作。
    简单来讲 confirm模式就是生产者发送请求,到了消息队列,消息队列会回复一个消息收到的应答,如果没收到,生产者开始重试。

    消息队列丢数据

    处理消息队列丢数据的情况,一般是开启持久化磁盘的配置。这个持久化配置可以和confirm机制配合使用,你可以在消息持久化磁盘后,再给生产者发送一个Ack信号。这样,如果消息持久化磁盘之前,rabbitMQ阵亡了,那么生产者收不到Ack信号,生产者会自动重发。

    消费者丢数据

    消费者丢数据一般是因为采用了自动确认消息模式。这种模式下,消费者会自动确认收到信息。这时rahbitMQ会立即将消息删除,这种情况下如果消费者出现异常而没能处理该消息(但是消息队列那边已经认为消息被消费了),就会丢失该消息。
    至于解决方案,采用手动确认消息即可。
    kafka为例


    kafka Replication的数据流向图

    Producer在发布消息到某个Partition时,先通过ZooKeeper找到该Partition的Leader,然后无论该Topic的Replication Factor为多少(也即该Partition有多少个Replica),Producer只将该消息发送到该Partition的Leader。Leader会将该消息写入其本地Log。每个Follower都从Leader中pull数据。

    针对上述情况,得出如下分析:
    (1)生产者丢数据
    在kafka生产中,基本都有一个leader和多个follwer。follwer会去同步leader的信息。因此,为了避免生产者丢数据,做如下两点配置
    第一个配置要在producer端设置acks=all。这个配置保证了,follwer同步完成后,才认为消息发送成功。
    在producer端设置retries=MAX,一旦写入失败,这无限重试

    (2)消息队列丢数据
    针对消息队列丢数据的情况,无外乎就是,数据还没同步,leader就挂了,这时zookpeer会将其他的follwer切换为leader,那数据就丢失了。针对这种情况,应该做两个配置。
    replication.factor参数,这个值必须大于1,即要求每个partition必须有至少2个副本min.insync.replicas参数,这个值必须大于1,这个是要求一个leader至少感知到有至少一个follower还跟自己保持联系这两个配置加上上面生产者的配置联合起来用,基本可确保kafka不丢数据

    (3)消费者丢数据
    这种情况一般是自动提交了offset,然后你处理程序过程中挂了。kafka以为你处理好了。再强调一次offset是干嘛的
    offset:指的是kafka的topic中的每个消费组消费的下标。简单的来说就是一条消息对应一个offset下标,每次消费数据的时候如果提交offset,那么下次消费就会从提交的offset加一那里开始消费。
    比如一个topic中有100条数据,我消费了50条并且提交了,那么此时的kafka服务端记录提交的offset就是49(offset从0开始),那么下次消费的时候offset就从50开始消费。
    解决方案也很简单,改成手动提交即可。

    如何保证消息的顺序性

    回答:针对这个问题,通过某种算法,将需要保持先后顺序的消息放到同一个消息队列中(kafka中就是partition,rabbitMq中就是queue)。然后只用一个消费者去消费该队列。
    有的人会问:那如果为了吞吐量,有多个消费者去消费怎么办?
    这个问题,没有固定回答的套路。比如我们有一个微博的操作,发微博、写评论、删除微博,这三个异步操作。如果是这样一个业务场景,那只要重试就行。比如你一个消费者先执行了写评论的操作,但是这时候,微博都还没发,写评论一定是失败的,等一段时间。等另一个消费者,先执行写评论的操作后,再执行,就可以成功。
    总之,针对这个问题,我的观点是保证入队有序就行,出队以后的顺序交给消费者自己去保证,没有固定套路。
    这点以后还需要研究

    相关文章

      网友评论

        本文标题:消息队列——了解一下

        本文链接:https://www.haomeiwen.com/subject/ebvmuftx.html