RocketMQ

作者: 索伦x | 来源:发表于2019-03-18 10:12 被阅读0次

    什么是 MQ?

    MQ全称为Message Queue, 消息队列(MQ)是一种应用程序对应用程序的通信方法。应用程序通过写和检索出入列队的针对应用程序的数据(消息)来通信,而无需专用连接来链接它们。消息传递指的是程序之间通过在消息中发送数据进行通信,而不是通过直接调用彼此来通信,直接调用通常是用于诸如远程过程调用的技术。

    MQ的好处
    1. 实现开发语言间的解耦,比如生产者是Java,消费者是.NET
    2. 实现高并发场景下的分布式线程池
    3. 将无需即时返回且耗时的操作进行异步处理,比如短信群发
    有 Broker 的 MQ

    这个流派通常有一台服务器作为 Broker,所有的消息都通过它中转。生产者把消息发送给它就结束自己的任务了,Broker 则把消息主动推送给消费者(或者消费者主动轮询)

    无 Broker 的 MQ

    无 Broker 的 MQ 的代表是 ZeroMQ。该作者非常睿智,他非常敏锐的意识到——MQ 是更高级的 Socket,它是解决通讯问题的。所以 ZeroMQ 被设计成了一个“库”而不是一个中间件,这种实现也可以达到——没有 Broker 的目的

    image

    节点之间通讯的消息都是发送到彼此的队列中,每个节点都既是生产者又是消费者。ZeroMQ 做的事情就是封装出一套类似于 Socket 的 API 可以完成发送数据,读取数据

    ZeroMQ 其实就是一个跨语言的、重量级的 Actor 模型邮箱库。你可以把自己的程序想象成一个 Actor,ZeroMQ 就是提供邮箱功能的库;ZeroMQ 可以实现同一台机器的 RPC 通讯也可以实现不同机器的 TCP、UDP 通讯,如果你需要一个强大的、灵活、野蛮的通讯能力,别犹豫 ZeroMQ

    附:Queue 和 Topic 的区别

    • Queue:一个发布者发布消息,下面的接收者按队列顺序接收,比如发布了 10 个消息,两个接收者 A,B 那就是 A,B总共会收到 10 条消息,不重复。
    • Topic:一个发布者发布消息,有两个接收者 A,B 来订阅,那么发布了 10 条消息,A,B各收到10 条消息。
    类型 Topic Queue
    概要 Publish Subscribe Messaging 发布订阅消息 Point-to-Point 点对点
    有无状态 Topic 数据默认不落地,是无状态的。 Queue 数据默认会在 MQ 服务器上以文件形式保存,比如 ActiveMQ 一般保存在?$AMQ_HOME\data\kr-store\data?下面。也可以配置成 DB 存储。
    完整性保障 并不保证 Publisher 发布的每条数据,Subscriber 都能接受到。 Queue 保证每条数据都能被 Receiver 接收。
    消息是否会丢失 一般来说 Publisher 发布消息到某一个 Topic 时,只有正在监听该 Topic 地址的 Sub 能够接收到消息;如果没有 Sub 在监听,该 Topic 就丢失了。 Sender 发送消息到目标 Queue,Receiver 可以异步接收这个 Queue 上的消息。Queue 上的消息如果暂时没有 Receiver 来取,也不会丢失。
    消息发布接收策略 一对多的消息发布接收策略,监听同一个 Topic 地址的多个 Sub 都能收到 Publisher 发送的消息。Sub 接收完通知 MQ 服务器 一对一的消息发布接收策略,一个 Sender 发送的消息,只能有一个 Receiver 接收。Receiver 接收完后,通知 MQ 服务器已接收,MQ 服务器对 Queue 里的消息采取删除或其他操作。

    RocketMQ 简介

    概述

    消息队列作为高并发系统的核心组件之一,能够帮助业务系统解构提升开发效率和系统稳定性。主要具有以下优势:

    • 削峰填谷:主要解决瞬时写压力大于应用服务能力导致消息丢失、系统奔溃等问题
    • 系统解耦:解决不同重要程度、不同能力级别系统之间依赖导致一死全死
    • 提升性能:当存在一对多调用时,可以发一条消息给消息系统,让消息系统通知相关系统
    • 蓄流压测:线上有些链路不好压测,可以通过堆积一定量消息再放开来压测

    RocketMQ

    Apache Alibaba RocketMQ 是一个消息中间件。消息中间件中有两个角色:消息生产者和消息消费者。RocketMQ 里同样有这两个概念,消息生产者负责创建消息并发送到 RocketMQ 服务器,RocketMQ 服务器会将消息持久化到磁盘,消息消费者从 RocketMQ 服务器拉取消息并提交给应用消费。

    RocketMQ 特点

    RocketMQ 是一款分布式、队列模型的消息中间件,具有以下特点:

    • 支持严格的消息顺序
    • 支持 Topic 与 Queue 两种模式
    • 亿级消息堆积能力
    • 比较友好的分布式特性
    • 同时支持 Push 与 Pull 方式消费消息
    • 历经多次天猫双十一海量消息考验

    RocketMQ 优势

    目前主流的 MQ 主要是 RocketMQ、kafka、RabbitMQ,其主要优势有:

    • 支持事务型消息(消息发送和 DB 操作保持两方的最终一致性,RabbitMQ 和 Kafka 不支持)
    • 支持结合 RocketMQ 的多个系统之间数据最终一致性(多方事务,二方事务是前提)
    • 支持 18 个级别的延迟消息(RabbitMQ 和 Kafka 不支持)
    • 支持指定次数和时间间隔的失败消息重发(Kafka 不支持,RabbitMQ 需要手动确认)
    • 支持 Consumer 端 Tag 过滤,减少不必要的网络传输(RabbitMQ 和 Kafka 不支持)
    • 支持重复消费(RabbitMQ 不支持,Kafka 支持)

    消息队列对比参照表

    image

    基于 Docker 安装 RocketMQ

    docker-compose.yml

    ** 注意:启动 RocketMQ Server + Broker + Console 至少需要 2G 内存 **

    version: '3.3'
    services:
      rmqnamesrv:
        image: foxiswho/rocketmq:server
        container_name: rmqnamesrv
        ports:
          - 9876:9876
        volumes:
          - ./data/logs:/opt/logs
          - ./data/store:/opt/store
        networks:
            rmq:
              aliases:
                - rmqnamesrv
    
      rmqbroker:
        image: foxiswho/rocketmq:broker
        container_name: rmqbroker
        ports:
          - 10909:10909
          - 10911:10911
        volumes:
          - ./data/logs:/opt/logs
          - ./data/store:/opt/store
          - ./data/brokerconf/broker.conf:/etc/rocketmq/broker.conf
        environment:
            NAMESRV_ADDR: "rmqnamesrv:9876"
            JAVA_OPTS: " -Duser.home=/opt"
            JAVA_OPT_EXT: "-server -Xms128m -Xmx128m -Xmn128m"
        command: mqbroker -c /etc/rocketmq/broker.conf
        depends_on:
          - rmqnamesrv
        networks:
          rmq:
            aliases:
              - rmqbroker
    
      rmqconsole:
        image: styletang/rocketmq-console-ng
        container_name: rmqconsole
        ports:
          - 8080:8080
        environment:
            JAVA_OPTS: "-Drocketmq.namesrv.addr=rmqnamesrv:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false"
        depends_on:
          - rmqnamesrv
        networks:
          rmq:
            aliases:
              - rmqconsole
    
    networks:
      rmq:
        external:
           name: my_net
    

    broker.conf

    RocketMQ Broker 需要一个配置文件,按照上面的 Compose 配置,我们需要在./data/brokerconf/目录下创建一个名为broker.conf的配置文件,内容如下:

    # Licensed to the Apache Software Foundation (ASF) under one or more
    # contributor license agreements.  See the NOTICE file distributed with
    # this work for additional information regarding copyright ownership.
    # The ASF licenses this file to You under the Apache License, Version 2.0
    # (the "License"); you may not use this file except in compliance with
    # the License.  You may obtain a copy of the License at
    #
    #     http://www.apache.org/licenses/LICENSE-2.0
    #
    #  Unless required by applicable law or agreed to in writing, software
    #  distributed under the License is distributed on an "AS IS" BASIS,
    #  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    #  See the License for the specific language governing permissions and
    #  limitations under the License.
    
    # 所属集群名字
    brokerClusterName=DefaultCluster
    
    # broker 名字,注意此处不同的配置文件填写的不一样,如果在 broker-a.properties 使用: broker-a,
    # 在 broker-b.properties 使用: broker-b
    brokerName=broker-a
    
    # 0 表示 Master,> 0 表示 Slave
    brokerId=0
    
    # nameServer地址,分号分割
    # namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876
    
    # 启动IP,如果 docker 报 com.alibaba.rocketmq.remoting.exception.RemotingConnectException: connect to <192.168.0.120:10909> failed
    # 解决方式1 加上一句 producer.setVipChannelEnabled(false);,解决方式2 brokerIP1 设置宿主机IP,不要使用docker 内部IP
    # brokerIP1=192.168.0.253
    
    # 在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
    defaultTopicQueueNums=4
    
    # 是否允许 Broker 自动创建 Topic,建议线下开启,线上关闭 !!!这里仔细看是 false,false,false
    autoCreateTopicEnable=true
    
    # 是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
    autoCreateSubscriptionGroup=true
    
    # Broker 对外服务的监听端口
    listenPort=10911
    
    # 删除文件时间点,默认凌晨4点
    deleteWhen=04
    
    # 文件保留时间,默认48小时
    fileReservedTime=120
    
    # commitLog 每个文件的大小默认1G
    mapedFileSizeCommitLog=1073741824
    
    # ConsumeQueue 每个文件默认存 30W 条,根据业务情况调整
    mapedFileSizeConsumeQueue=300000
    
    # destroyMapedFileIntervalForcibly=120000
    # redeleteHangedFileInterval=120000
    # 检测物理文件磁盘空间
    diskMaxUsedSpaceRatio=88
    # 存储路径
    # storePathRootDir=/home/ztztdata/rocketmq-all-4.1.0-incubating/store
    # commitLog 存储路径
    # storePathCommitLog=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/commitlog
    # 消费队列存储
    # storePathConsumeQueue=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/consumequeue
    # 消息索引存储路径
    # storePathIndex=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/index
    # checkpoint 文件存储路径
    # storeCheckpoint=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/checkpoint
    # abort 文件存储路径
    # abortFile=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/abort
    # 限制的消息大小
    maxMessageSize=65536
    
    # flushCommitLogLeastPages=4
    # flushConsumeQueueLeastPages=2
    # flushCommitLogThoroughInterval=10000
    # flushConsumeQueueThoroughInterval=60000
    
    # Broker 的角色
    # - ASYNC_MASTER 异步复制Master
    # - SYNC_MASTER 同步双写Master
    # - SLAVE
    brokerRole=ASYNC_MASTER
    
    # 刷盘方式
    # - ASYNC_FLUSH 异步刷盘
    # - SYNC_FLUSH 同步刷盘
    flushDiskType=ASYNC_FLUSH
    
    # 发消息线程池数量
    # sendMessageThreadPoolNums=128
    # 拉消息线程池数量
    # pullMessageThreadPoolNums=128
    

    RocketMQ 控制台

    访问 http://rmqIP:8080 登入控制台

    RocketMQ的架构

    RocketMQ架构
    Nameserver

    Nameserver的开发旨在轻量级,多台Nameserver互相独立,彼此间互不通信,这样的设计,保证了单台Nameserver宕机,不影响Nameserver。nameserver不会有频繁的读写,所以性能开销非常小,稳定性很高。

    1. NameServer用来保存活跃的broker列表,包括Master和Slave。
    2. NameServer用来保存所有topic和该topic所有队列的列表。
    3. NameServer用来保存所有broker的Filter列表。

    相关文章

      网友评论

        本文标题:RocketMQ

        本文链接:https://www.haomeiwen.com/subject/eyaauqtx.html