闲谈MQ

作者: wolf4j | 来源:发表于2018-03-29 11:41 被阅读192次

MQ,顾名思义:就是消息究竟被几个人得到,目前业界有很多的成形的mq,但大体可以分为以下几种:

一对一模型:

direct.png

首先一对一的消息队列,不管C(customer)是否存在,消息A是不能丢的,那么就有一个缺陷,内存会被存爆,所以我的消息需要实时性,其次一对一的消息队列是没有备份的,也就是当消息被取走的时候回给P端一个ACK,当失效的时候把消息记录在文件中。但是我消息的主体放在内存如果修改同一块内存,就会出现伪共享问题,可以尝试用volatile关键字(指Java语言中的)来处理(当然是对于同一台机器)。

其实只要你往队列中写入数据就避免不了伪共享的问题,也就是无法做到零拷贝。

image.png

那么还有一个问题,数据备份???????

为什么要做数据备份呢?现在假设一个场景,如图1所示,假设当我C1拿到数据,也回复给P端ACK了,ACK已经认为C端拿到了数据,但就在这时,C1宕机了,那么我的数据就丢失了,所以需要去做数据备份,但是与其说我去做数据备份,还不如我选择去做负载均衡,但是做负载均衡又有一个问题,我的数据不断负载给C1, C2 ,........依然无法保证数据不丢失啊,所以又有如下两种策略来解决这一问题:
(1) P端只发一次(问题:数据不会重,但是还是无法保证数据不丢失)
(2) 多次发,直到P端收到ACK(问题:没有回复给P端ACK,并不一定就是C死了,也有可能是网络阻塞,当网络畅通时,我本地可能备份了好多一样的数据)

难道就没有好的办法解决这个问题?当然,肯定是有的。

秘诀:变被动为主动,变推送为拉取

a . 变推送为拉取


image.png

我在队列中并不去存放真正的数据,我只放一个msg:1,我C去拿,拿到msg:1就代表P告诉C,
去redis中去拿取所要的数据,因为是主动去拿,所以我肯定知道数据的大小以及数据的详细信息,也就是说,在这条队列中,我只去推送通知,而不去推送数据。

b .变被动为主动:

image.png

当P往队列中写入一条数据时,我叫我的队列来通知C端,队列中有数据了,来拿吧,然后,我C去队列拿取数据,这才是正确的姿势,而不是P写一条数据,我C就必须去拿。

fanout模型

fanout.png

它的方式是,我往队列中丢一条数据,所有人想什么时候拿就什么时候拿,但我有一个失效周期,时间一过,队列中的数据就会消失,也称为扇出方式,能失效周期较长,所以这种模型用在磁盘比较合适(顺序存储),比较典型的例子是kafka队列(拿scala写的),那么我们来研究一下该模型的一些问题以及解决方式:

image.png

首先我们还是来考虑零拷贝的问题,我认为消息在某种程度上是没有只读的概念的,消息更多的是写的(当然需要先删除),那么,就肯定需要先来加锁,涉及到加锁,肯定先尝试使用无锁(CAS),我们可以在消息队列中维护一个atomic,(CAS)在任何人拿的时候把指针下移取走数据,每个人都去维护自己的指针offset,那么还有一个问题????

image.png

假设有一个瞬间,我C2突然死掉了,我里面堆积了很多数据还没有消费掉,当我C2再次启动的时候,你这部分数据已经消失了,但是你的指针还在A的位置,所以无法消费下一个(是空指针),怎么办呢?比较暴力的解决办法:

直接来一个死循环,比如这个姿势:

while(true){
   if(atomic->null){
      doSomething();
   }
}

但是这样肯定很慢(循环执行一次,需要等半天.........)

正确的解决途径:

还记得提到的那个offset么?就用它来解决,我们可以在消息消失的时候,记录一下消息的offset每次当我消费为null的时候,我就脚offest+1,指针就会移动到下一个位置。

那么,我们如何来保证数据不丢失呢?

  1. 数据在内存中(数据量小),那我在做冗备的过程中,数据往内存中写的时候,往硬盘上记录一份日志信息。
  2. 数据在硬盘上的时候,对硬盘做分区。
image.png

这样我数据可以放在不同的机器上,但是我没有办法保证顺序,分区并发与一致性本身就是互斥的一组关系。该如何解决呢?

image.png

加顺序号的思路貌似不错,那我多会儿才算完呢,我肯定有完的时候,所以我需要给每一个数据加一个包的大小这样,大小没够,我就知道内完其实就是引入了事物的概念(消息队列的消息事物),

这种存储方式,类似于hadoop的分布式存储,

Kafka可能出现的坑:

  1. 因为kafka的offset在zookeeper身上或者是etcd,所以你对他更新太频繁呢,他的速度就慢,更新不频繁呢?如果消费者宕机了,你重启,就需要重新去消费一遍,所以会有重复数据,所以没事做的时候,你需要自己记录一个offset,万一重启的时候,你需要把这个offset攻入zk或etcd中
  2. Kafka过一段时间它的数据就会被删除,所以你一段时间内没有消费或采集数据的话,需要去补踩大量的数据,而且他的删除是真的删除,可不是闹着玩的

topic订阅模式

image.png

这种模式具有如下特性:

  1. 有人订阅就拿,没人订阅就扔
  2. 必须现有生产者,后有消费者
  3. 谁订阅就给谁
  4. 在内存中使用(内存数据库),redis只实现了topic模式

所以topic就是先订阅我再给你扔,不订阅,数据就没,如果订阅者稍慢的时候,会作缓存,主要分为以下两种情形:

  1. 数据可丢,也就是消费端C消费不走,就没了。
  2. 数据不可丢,也就是不消费走,就一直驻留在内存。

所以这种队列的消费者是需要有通配符的.

image.png

也就是说我发送一条数据的时候,我应该算出来你发送的数据是那几个人注册(订阅)过的我先形成一个表,等我下次再往这个队列中发数据的时候,就会发给对应的那几个人

下面罗列一下业界比较成功的几个mq:

  1. rabbitMQ(兔子q):实现了direct、fanout、 topic三种模型,具有rpc的功能,erlang语言编写的,速度较快
  2. activeMQ:支持JMS(J2EE框架),支持AMQP协议。

这里大致聊一毛钱的AMQP协议:

image.png

a . 消费者只能去订阅属于自己的消息队列,即C1只能去queue1中去订阅。
b . 需要让P在交换机中看到消息:A1发给了queue1,消息:A2发给queue2。

  1. Disque:redis3.0的时候,把mq分离出来形成了disque,在源码中可以看到getjob底层做了一个重量级锁,速度很慢,C语言写的。
  2. MQTT协议:它的优势在于,字节数特别少,当网络拥挤的时候,扔个包就能出去,但是只能做topic。
  3. Mq也可以用来当抗压使用。

家境清寒,整理不易。

image.png

相关文章

  • 闲谈MQ

    MQ,顾名思义:就是消息究竟被几个人得到,目前业界有很多的成形的mq,但大体可以分为以下几种: 一对一模型: 首先...

  • MQ的使用及QMQ的设计

    1. 为什么要用MQ? MQ带来了什么好处? 带来了什么坏处? 为什么要用MQ?MQ(message queue)...

  • RocketMQ整体介绍

    1. MQ介绍 1.1 什么是MQ?为什么要用MQ? MQ:MessageQueue,消息队列。队列,是一种FIF...

  • MQ

    什么是MQ?MQ全称为Message Queue, 消息队列(MQ)是应用程序“对”应用程序的通信方法。MQ:生产...

  • RocketMQ集群消息与广播消费

    如图A项目的MQ组(mq.group=A) A项目部署了两台tomcatB项目的MQ组(mq.group=B)...

  • centos7 安装rabbitMQ 踩坑

    1、安装mq,首先mq以来erlang插件,所以必须安装 erlang 插件 首先在mq官网查看mq和erlang...

  • 消息队列

    MQ的作用 消息通信MQ的基础功能即为消息通信。使用MQ的客户端可以将消息发送到MQ中,也可以从MQ中消费消息。 ...

  • spring 使用 rabbit mq

    rabbit mq 安装 brew 安装 rabbit mq后台rabbit mq后台 用户名:guest 密码...

  • Prometheus监控MQ集群

    mq 为Rabbitmq 。 下载插件:rabbitmq_exporter(集群中所有的mq node)3台mq ...

  • MQ精华液,你值得更好(MQ青春定制)

    MQ 重塑紧致轮廓,绽现纯净光彩 MQ青春定制:MQ精华+MQ波导入+SMAS剥离术 青春之力注入生命之源 在技术...

网友评论

    本文标题:闲谈MQ

    本文链接:https://www.haomeiwen.com/subject/etlmcftx.html