美文网首页
RocketMq总结

RocketMq总结

作者: 知止9528 | 来源:发表于2019-01-22 21:52 被阅读2次

    架构图

    架构图.png

    基本概念

    Producer

    消息生产者,负责产生消息,一般由业务系统负责产生消息

    Consumer

    消息消费者,负责消费消息,一般是后台系统负责异步消费

    Broker

    消息中转角色,负责存储消息,转収消息,一般也称为 Server。

    NameServer

    类似于zookeeper


    概念模型

    image.png

    即消息是根据主题(即我们图里面的Topic)进行订阅,而每个Topic下面又可以有多个队列,只是这里的队列并不真正存储消息,而是起到类似索引的作用,消息真正存储在CommitLog里面,如下图


    RocketMq消息实际存储结构.png

    所有数据单独存储到一个 Commit Log,完全顺序写,随机读。对最终用户展现的队列实际只存储消息在 Commit Log 的位置信息

    Message Queue

    在RocketMQ中,所有消息队列都是持久化,长度无限的数据结构,所谓长度无限是指队列中的每个存储单元都是定长,访问其中的存储单元使用Offset来访问,offset为java long类型,64位,理论上在100年内不会溢出,所以认为是长度无限,另外队列中只保存最近几天的数据,之前的数据会按照过期时间来删除。也可以认为Message Queue是一个长度无限的数组,offset就是下标。

    这样做的好处
    (1)队列轻量化,单个队列数据量非常少
    (2)对磁盘的访问串行化,避免磁盘竟争,丌会因为队列增加导致 IOWAIT 增高

    这样做的缺点
    (1)写虽然完全是顺序写,但是读却发成了完全的随机读。
    (2)读一条消息,会先读 Consume Queue,再读 Commit Log,增加了开销。
    (3)要保证 Commit Log 不 Consume Queue 完全的一致,增加了编程的复杂度

    RocketMq的解决方案

    随机读(主要是指磁盘随机读),尽可能让读命中 PAGECACHE,减少 IO 读操作,所以内存越大越好。同时由于缓存的局部性原理,可以很快的在内存上读取到消息


    RocketMq里面的消息类型

    顺序消息

    消费消息的顺序要同収送消息的顺序一致,在 RocketMQ 中,主要指的是局部顺序,即一类消息为满足顺序性,必须 Producer 单线程顺序发送到同一个队列,返样 Consumer 就可以按照 Producer 发送的顺序去消费消息。

    普通顺序消息

    顺序消息的一种,正常情况下可以保证完全的顺序消息,但是一旦发生通信异常,Broker重启,由于队列总数发生发化,哈希取模后定位的队列会发化,产生短暂的消息顺序不一致。如果业务能容忍在集群异常情况(如某个Broker宕机或者重启)下,消息短暂的乱序,使用普通顺序方式比较合适。

    严格顺序消息

    严格顺序消息顺序消息的一种,无论正常异常情况都能保证顺序,但是牺牲了分布式Failover特性,即Broker集群中只要有一台机器不可用,则整个集群都不可用,服务可用性大大降低。如果服务器部署为同步双写模式,此缺陷可通过备机自动切换为主避免,不过仍然会存在几分钟的服务不可用。(依赖同步双写,主备自动切换,自动切换功能目前还未实现)目前已知的应用只有数据库binlog同步强依赖严格顺序消息,其他应用绝大部分都可以容忍短暂乱序,推荐使用普通的顺序消息。


    获取消息的方式

    • Broker主动进行推送至消费者
      缺点:消费者可能消费过慢造成堆积,同时如果有很多消费者对于Broker也是一件很繁重的事情

    • 长轮询
      即消费者会主动去拉取,缺点是可能获取不及时,但长轮询指的是我会多等一会,类似于长连接


    RocketMq里面消息的几种消费方式


    涉及到磁盘,就会有零拷贝,RocketMq也不例外,常用的零拷贝有如下两种方式

    内存映射

    image.png

    对应的java代码

    File file = new File("data.zip");
            RandomAccessFile raf = new RandomAccessFile(file, "rw");
            FileChannel fileChannel = raf.getChannel();
            MappedByteBuffer buffer = fileChannel.map(FileChannel.MapMode.READ_ONLY, 0, fileChannel.size());
    

    真正的零拷贝


    image.png

    对应的java代码

     File file = new File("test.zip");
            RandomAccessFile raf = new RandomAccessFile(file, "rw");
            FileChannel fileChannel = raf.getChannel();
            SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress("", 1234));
            // 直接使用了transferTo()进行通道间的数据传输
            fileChannel.transferTo(0, fileChannel.size(), socketChannel);
    

    这两种方式的比较

    使用mmap + write方式
    优点:即使频繁调用,使用小块文件传输,效率也很高
    缺点:不能很好的利用DMA方式,会比sendfile多消耗CPU,内存安全性控制复杂,需要避免JVM Crash问题。

    使用sendfile方式
    优点:可以利用DMA方式,消耗CPU较少,大块文件传输效率高,无内存安全新问题。
    缺点:小块文件效率低于mmap方式,只能是BIO方式传输,不能使用NIO。

    RocketMq采用的是基于内存映射的方式,因为小块数据传输的更为频繁


    消息的持久化

    同步刷盘与异步刷盘.png

    异步刷盘
    写入到Page Cache后就立马返回了,然后再调用fsync函数异步的去将数据刷到磁盘

    优点

    效率高又快

    缺点

    断点或者重启,内存里面的数据还没来得及刷入到磁盘就没有了,所以会有丢消息的概率


    同步刷盘

    当然就是写入到Page Cache后就立马调用fsync函数立马刷入到磁盘

    优点

    可以做到不丢消息

    缺点

    当然就是牺牲性能了


    接着再来分析下 几种消费消息的方式

    At least Once

    是指每个消息必须投递一次,RocketMQConsumer先pull消息到本地,消费完成后,才吐服务器返回ack,如果没有消费一定不会ack消息,所以RocketMQ可以很好的支持此特性

    Exactly Only Once

    (1).发送消息阶段,不允许収送重复的消息。
    (2).消费消息阶段,不允许消费重复的消息。

    只有以上两个条件都满足情况下,才能称为消息是“Exactly Only Once”,而要实现以上两点,在分布式系统环境下,不可避免要产生巨大的开销。所以RocketMQ为了追求高性能,并不保证此特性,要求在业务上进行去重,也就是说消费消息要做到幂等性。RocketMQ虽然步能严格保证不重复,但是正常情冴下很少会出现重复収送、消费情况,只有网络异常,Consumer启停等异常情况下会出现消息重复。此问题的本质原因是网络调用存在不确定性,即不成功也不失败的第三种状态,所以才产生了消息重复性问题。

    定时消息

    定时消息是指消息发到Broker后,不能立刻被Consumer消费,要到特定的时间点或者等待特定的时间后才能被消费。如果要支持任意的时间精度,在Broker局面,必须要做消息排序,如果再涉及到持久化,那消息排序要不可避免的产生巨大性能开销。RocketMQ支持定时消息,但是不支持任意时间精度,支持特定的level,例如定时5s,10s,1m等。


    消息过滤

    RocketMQ的消息过滤方式有别于其他消息中间件,是在订阅时,再做过滤,先来看下Consume Queue的存储结构


    ConsumeQueue单个存储单元结构.png

    (1)在Broker端迕行Message Tag比对,先遍历Consume Queue,如果存储的Message Tag与订阅的Message Tag不符合,则跳过,继续比对下一个,符合则传输给Consumer。注意:Message Tag是字符串形式,Consume Queue中存储的是其对应的hashcode,比对时也是比对hashcode。(2).Consumer收到过滤后的消息后,同样也要执行在Broker端的操作,但是比对的是真实的Message Tag字符串,而不是Hashcode

    这么做的原因?
    (1)Message Tag存储Hashcode,是为了在Consume Queue定长方式存储,节约空间
    (2)过滤过程中不会访问Commit Log数据,可以保证堆积情况下也能高效过滤
    (3)即使存在Hash冲突,也可以在Consumer端进行修正,保证万无一失


    高可用

    谈到高可用,自然就想到集群,那么多台机器间消息的同步方式就有同步双写和异步复制两种

    异步复制

    异步复制的实现思路非常简单,Slave启劢一个线程,不断从Master拉取Commit Log中的数据,然后在异步build出Consume Queue数据结构。整个实现过程基本同Mysql主从同步类似。

    同步双写

    也类似于Mysql的半同步复制,即主上写完,其中一台从也要写完才统一返回给客户端ok.整体思想是类似的


    上面我们谈到RocketMq没有使用Zookeeper而是自己实现了NameServer

     public boolean initialize() {
                    .....        
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    
                @Override
                public void run() {
                    NamesrvController.this.routeInfoManager.scanNotActiveBroker();
                }
            }, 5, 10, TimeUnit.SECONDS);
    
            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    
                @Override
                public void run() {
                    NamesrvController.this.kvConfigManager.printAllPeriodically();
                }
            }, 1, 10, TimeUnit.MINUTES);
    
            .....
        }
    
    • 定时任务1:NameServer每隔10s扫描一次Broker,移除处于不激活状态的Broker
    • 定时任务2:NameServer每隔10分钟打印一次KV配置

    我们可以看到,集群其实就是维护心跳,这里面其实还有很多细节,还没看完,看完再更新吧


    Producer最佳实践
    发送消息注意事项
    (1)一个应用尽可能用一个Topic,消息子类型用tags来标识,tags可以由应用自由设置。只有发送消息设置了tags,消费方在订阅消息时,才可以利用tags在broker做消息过滤。message.setTags("TagA");

    (2)每个消息在业务局面的唯一标识码,要设置到keys字段,方便将来定位消息丢失问题。服务器会为每个消息创建索引(哈希索引),应用可以通过topic,key来查询返条消息内容,以及消息被谁消费。由于是哈希索引,请务必保证key尽可能唯一,返样可以避免潜在的哈希冲突。// 订单IdString orderId = "20034568923546";message.setKeys(orderId);

    (3)消息发送成功或者失败,要打印消息日志,务必要打印sendresult和key字段。

    (4)send消息方法,只要不抛异常,就代表发送成功。但是发送成功会有多个状态,在sendResult里定义
    SEND_OK消息发送成功
    FLUSH_DISK_TIMEOUT:消息发送成功,但是服务器刷盘超时,消息已经进入服务器队列,只有此时服务器宕机,消息才会丢失
    FLUSH_SLAVE_TIMEOUT:消息发送成功,但是服务器同步到Slave时超时,消息已经进入服务器队列,只有此时服务器宕机,消息才会丢失
    SLAVE_NOT_AVAILABLE:消息发送成功,但是此时slave不可用,消息已经进入服务器队列,只有此时服务器宕机,消息才会丢失对与精确发送顺序消息的应用,由亍顺序消息的局限性,可能会涉及到主备自动切换问题,所以如果sendresult中的status字段不等于SEND_OK,就应该尝试重试。对于其他应用,则没有必要这样。
    (5)对于消息不可丢失应用,务必要有消息重发机制
    例如如果消息发送失败,存储到数据库,能有定时程序尝试重发,或者人工触发重发。

    Consumer最佳实践
    (1)将消息的唯一键,可以是msgId,也可以是消息内容中的唯一标识字段,例如订单Id等,消费之前判断是否在Db或(全局KV存储)中存在,如果不存在则插入,并消费,否则跳过。(实际过程要考虑原子性问题,判断是否存在可以尝试插入入,如果报主键冲突,则插入失败,直接跳过)msgId一定是全局唯一标识符,但是可能会存在同样的消息有两个不同msgId的情冴(有多种原因),返种情况可能会使业务上重复消费,建议最好使用消息内容中的唯一标识字段去重。
    2.使用业务局面的状态机去重

    具体可见幂等总结


    最后讲一下集群的搭建

    Master-Slave 方式

    1.服务器环境

    序号 IP 用户名 密码 角色 模式
    1 192.168.11.128 root *** nameServer1,brokerServer1 Master1
    2 192.168.11.129 root *** nameServer2,brokerServer2 Master2

    2.Hosts 添加信息

    IP NAME

    192.168.11.128 rocketmq-nameserver1
    192.168.11.128 rocketmq-master1
    192.168.11.129 rocketmq-nameserver2
    192.168.11.129 rocketmq-master1-slave

    vi /etc/hosts
    

    3.上传解压【两台机器】

    #   上传 apache-rocketmq.tar.gz 文件至/usr/local
    
    #   tar -zxvf apache-rocketmq.tar.gz -C /usr/local
    
    #   ln -s apache-rocketmq rocketmq ll /usr/local
    
    

    4.创建存储路径【两台机器】

    #   mkdir /usr/local/rocketmq/store
    
    #   mkdir /usr/local/rocketmq/store/commitlog
    
    #   mkdir /usr/local/rocketmq/store/consumequeue
    
    #   mkdir /usr/local/rocketmq/store/index
    
    

    5.RocketMQ 配置文件【两台机器】

    #   vim /usr/local/rocketmq/conf/2m-2s-async/broker-a.properties
    
    #   vim /usr/local/rocketmq/conf/2m-2s-async /broker-a-s.properties
    
    

    配置文件如下

    #所属集群名字
    
    brokerClusterName=rocketmq-cluster
    
    #broker 名字,注意此处不同的配置文件填写的不一样 brokerName=broker-a|broker-b
    
    #0 表示 Master,>0 表示 Slave
    
    brokerId=0
    
    #nameServer 地址,分号分割
    
    namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
    
    #在发送消息时,自动创建服务器不存在的 topic,默认创建的队列数 defaultTopicQueueNums=4
    
    #是否允许 Broker 自动创建 Topic,建议线下开启,线上关闭 autoCreateTopicEnable=true
    
    #是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭 autoCreateSubscriptionGroup=true
    
    #Broker 对外服务的监听端口
    
    listenPort=10911
    
    #删除文件时间点,默认凌晨 4 点
    
    deleteWhen=04
    
    #文件保留时间,默认 48 小时
    
    fileReservedTime=120
    
    #commitLog 每个文件的大小默认 1G
    
    mapedFileSizeCommitLog=1073741824
    
    #ConsumeQueue 每个文件默认存 30W 条,根据业务情况调整 mapedFileSizeConsumeQueue=300000 #destroyMapedFileIntervalForcibly=120000 #redeleteHangedFileInterval=120000 #检测物理文件磁盘空间
    
    diskMaxUsedSpaceRatio=88
    
    #存储路径
    
    storePathRootDir=/usr/local/rocketmq/store
    
    #commitLog 存储路径
    
    storePathCommitLog=/usr/local/rocketmq/store/commitlog
    
    #消费队列存储路径存储路径
    
    storePathConsumeQueue=/usr/local/rocketmq/store/consumequeue
    
    #消息索引存储路径
    
    storePathIndex=/usr/local/rocketmq/store/index
    
    #checkpoint 文件存储路径
    
    storeCheckpoint=/usr/local/rocketmq/store/checkpoint
    
    #abort 文件存储路径
    
    abortFile=/usr/local/rocketmq/store/abort
    
    #限制的消息大小
    
    maxMessageSize=65536
    
    #flushCommitLogLeastPages=4
    
    #flushConsumeQueueLeastPages=2
    
    #flushCommitLogThoroughInterval=10000
    
    #flushConsumeQueueThoroughInterval=60000
    
    #Broker 的角色
    
    #- ASYNC_MASTER 异步复制 Master
    
    #- SYNC_MASTER  同步双写 Master
    
    #- SLAVE
    
    brokerRole=ASYNC_MASTER
    
    #刷盘方式
    
    #- ASYNC_FLUSH  异步刷盘
    
    #- SYNC_FLUSH   同步刷盘
    
    flushDiskType=ASYNC_FLUSH
    
    #checkTransactionMessageEnable=false
    
    #发消息线程池数量
    
    #sendMessageThreadPoolNums=128
    
    #拉消息线程池数量
    
    #pullMessageThreadPoolNums=128
    
    

    6.修改日志配置文件【两台机器】

    #   mkdir -p /usr/local/rocketmq/logs
    
    #   cd /usr/local/rocketmq/conf && sed -i 's#${user.home}#/usr/local/rocketmq#g' *.xml
    
    

    7.修改启动脚本参数【两台机器】

    runbroker.sh脚本

    #   vim /usr/local/rocketmq/bin/runbroker.sh
    

    脚本如下
    修改为1个g就好了

    #开发环境    
    JVM Configuration JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn512m -XX:PermSize=128m - XX:MaxPermSize=320m"
    

    runserver.sh脚本

    #   vim /usr/local/rocketmq/bin/runserver.sh
    
    JAVA_OPT="${JAVA_OPT} -server -Xms1g -Xmx1g -Xmn512m -XX:PermSize=128m - XX:MaxPermSize=320m"
    

    8.启动 NameServer【两台机器】

    #   cd /usr/local/rocketmq/bin
    
    #   nohup sh mqnamesrv &
    
    

    9.启动 BrokerServer A【192.168.11.128】

    #   cd /usr/local/rocketmq/bin
    #   nohup sh mqbroker -c /usr/local/rocketmq/conf/2m-noslave/broker-a.properties
    #   netstat -ntlp
    #   jps
    #   tail -f -n 500 /usr/local/rocketmq/logs/rocketmqlogs/broker.log
    #   tail -f -n 500 /usr/local/rocketmq/logs/rocketmqlogs/namesrv.log
    

    10.启动 BrokerServer B【192.168.11.129】

    #   cd /usr/local/rocketmq/bin
    #   nohup sh mqbroker -c /usr/local/rocketmq/conf/2m-noslave/broker-a-s.properties
    #   netstat -ntlp
    #   jps
    #   tail -f -n 500 /usr/local/rocketmq/logs/rocketmqlogs/broker.log
    #   tail -f -n 500 /usr/local/rocketmq/logs/rocketmqlogs/namesrv.log
    
    

    11.停止命令

    #   cd /usr/local/rocketmq/bin
    
    #   sh mqshutdown broker
    
    #   sh mqshutdown namesrv
    
    #   --等待停止
    
    #   rm -rf /usr/local/rocketmq/store
    
    #   mkdir /usr/local/rocketmq/store
    
    #   mkdir /usr/local/rocketmq/store/commitlog
    
    #   mkdir /usr/local/rocketmq/store/consumequeue
    
    #   mkdir /usr/local/rocketmq/store/index
    
    #   --按照上面步骤重启 NameServer 与 BrokerServer
    
    

    相关文章

      网友评论

          本文标题:RocketMq总结

          本文链接:https://www.haomeiwen.com/subject/hitdjqtx.html