RocketMQ入门

作者: 王小冬 | 来源:发表于2018-09-10 18:03 被阅读4次

    RocketMQ入门

    1. RocketMQ简介

    RocketMQ是阿里开源的消息中间件,它是纯java开发,具有低延迟、高吞吐量、高可用性和适合大规模分布式系统应用的特点。从名字可以看出Rocket火箭,代表RocketMQ主打速度。RocketMQ思路起源于Kafka,它对消息的可靠传输及事务性做了优化。

    学习MQ必须知道的几个专业术语:

    1)Producer

    消息生产者,生产者的作用就是将消息发送到 MQ,生产者本身既可以产生消息,如读取文本信息等。也可以对外提供接口,由外部应用来调用接口,再由生产者将收到的消息发送到 MQ。 (Producer Group生产者组,简单来说就是多个发送同一类消息的生产者称之为一个生产者组。)

    2)Consumer

    消息消费者,简单来说,消费 MQ 上的消息的应用程序就是消费者,至于消息是否进行逻辑处理,还是直接存储到数据库等取决于业务需要。(Consumer Group消费者组,和生产者组类似,消费同一类消息的多个 consumer 实例组成一个消费者组。)

    3)Topic

    Topic 是一种消息的逻辑分类,比如说你有订单类的消息,也有库存类的消息,那么就需要进行分类,一个是订单 Topic 存放订单相关的消息,一个是库存 Topic 存储库存相关的消息。

    4)Message

    Message 是消息的载体。一个 Message 必须指定 topic,相当于寄信的地址。Message 还有一个可选的 tag 设置,以便消费端可以基于 tag 进行过滤消息。也可以添加额外的键值对,例如你需要一个业务 key 来查找 broker 上的消息,方便在开发过程中诊断问题。

    5)Tag

    标签可以被认为是对 Topic 进一步细化。一般在相同业务模块中通过引入标签来标记不同用途的消息。

    6)Broker

    Broker 是 RocketMQ 系统的主要角色,其实就是前面一直说的 MQ。Broker 接收来自生产者的消息,储存以及为消费者拉取消息的请求做好准备。

    7)Name Server

    Name Server 为 producer 和 consumer 提供路由信息。

    RocketMQ架构图:

    rocketMQ架构图.png

    2. 为什么使用消息队列MQ

    使用消息队列主要是为了解决三个问题:松耦合、异步和削峰。下面简单介绍一下这三种作用的应用场景:

    2.1 松耦合

    传统的系统间耦合性太强,系统A在代码中直接调用系统B和系统C的代码,如果将来D系统接入,系统A还需要修改代码,过于麻烦,使用MQ将消息写入消息队列,需要消息的系统自己从消息队列中订阅,从而系统A不需要做任何修改。

    2.2 异步

    一些非必要的业务逻辑以同步的方式运行,太耗费时间。 将消息写入消息队列,非必要的业务逻辑以异步的方式运行,加快响应速度。

    2.3 削峰

    在高并发分布式环境下,在高峰请求的时候,由于来不及处理,请求往往发生堵塞,比如大量的update、insert操作同时到达mysql,直接导致大量的行锁和表锁,甚至触发too many connections错误。通过使用消息队列,我们可以异步处理请求,按照数据处理能力从消息队列中慢慢的拉取消息,在生产中,这个短暂的高峰期积压是允许的。

    缺点:

    引入MQ也会有一些缺点,比如系统的可用性降低,因为多加了一个系统就要保证MQ不出问题,一旦MQ挂了,整个系统也就挂了。

    还有就是系统的复杂性提高了,加入MQ之后要考虑很多问题,比如一致性问题,如何保证消息不被重复消费,如何保证消息可靠性传输。

    3. Linux下安装使用RocketMQ

    简单介绍了RocketMQ之后,我们动手安装一下它吧。环境:

    Ubuntu x86_64 GNU/Linux
    java version "1.8.0_181"
    Apache Maven 3.5.4 
    git version 1.9.1
    

    3.1 下载代码

    git clone https://github.com/apache/rocketmq.git
    

    3.2 使用maven编译打包

    由于默认的中央仓库在国外下载东西可能导致失败,可以将其改为国内镜像,编辑maven的配置文件settings.xml,添加:

    <mirror>
        <id>aliyun</id>
        <mirrorOf>central</mirrorOf>
        <name>aliyun maven</name>
        <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
    </mirror>
    

    然后再编译打包RocketMQ:

    cd rocketmq/
    mvn -Prelease-all -DskipTests clean install -U
    

    成功后会显示build success。

    3.3 启动Name Server

    cd distribution/target/apache-rocketmq
    nohup sh bin/mqnamesrv &
    tail -f bin/nohup.out
    # 启动成功显示
    The Name Server boot success. serializeType=JSON
    

    3.4 启动Broker

    nohup sh bin/mqbroker -n localhost:9876 &
    tail -f bin/nohup.out
    # 启动成功显示
    The broker[**, **.**.**.**:10911] boot success. serializeType=JSON and name server is localhost:9876
    

    3.5 测试发送和接收消息

    在发送和接收之前我们需要告诉客户端name servers的地址,RocketMQ提供了很多种方式来实现,为了演示方便我们使用环境变量NAMESRV_ADDR

     export NAMESRV_ADDR=localhost:9876
     sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
     # 可以看到有很多条消息已经发送成功了
     SendResult [sendStatus=SEND_OK, msgId= ...
     SendResult [sendStatus=SEND_OK, msgId= ...
     SendResult [sendStatus=SEND_OK, msgId= ...
    
     sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
     # 可以看到刚才发送的消息已经成功被消费者消费了
     ConsumeMessageThread_%d Receive New Messages: [MessageExt...
     ConsumeMessageThread_%d Receive New Messages: [MessageExt...
     ConsumeMessageThread_%d Receive New Messages: [MessageExt...
    

    3.6 关闭服务

    sh bin/mqshutdown broker
    # 关闭成功后显示
    The mqbroker(8916) is running...
    Send shutdown request to mqbroker(8916) OK
    
    
    sh bin/mqshutdown namesrv
    # 关闭成功后显示
    The mqnamesrv(8892) is running...
    Send shutdown request to mqnamesrv(8892) OK
    

    4. 遇到的问题

    1. 启动服务时报内存不足

    执行

    nohup sh mqnamesrv &
    

    时,报错

    # There is insufficient memory for the Java Runtime Environment to continue.
    # Native memory allocation (mmap) failed to map 2147483648 bytes for committing reserved memory.
    # An error report file with more information is saved as:
    # /home/wangjun/rocketmq/distribution/bin/hs_err_pid8300.log
    
    

    解决方案

    修改rocketmq/distribution/target/apache-rocketmq/bin/runserver.sh和runbroker.sh。

    JAVA_OPT="${JAVA_OPT} -server -Xms4g -Xmx4g -Xmn2g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
    

    改为:

    JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn125m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
    

    可以看出之前的默认设置是4g内存,如果你的机器没有这么大只是自己搭建着玩就把它设置小一点就行了。

    顺便将tools.sh的内存也改成256m,不然运行消息的发送和接收的demo的时候也会报错。

    PS:网上很多教程因为时间和版本问题有各种问题,最稳妥的办法就是看官方教程,及时英文不好也好坚持看,必定事半功倍

    参考:

    消息队列系列(1)为什么使用MQ:https://blog.csdn.net/dadadie/article/details/51553780

    *为什么要是用消息队列以及消息队列的优缺点分析:https://blog.csdn.net/alinshen/article/details/80583214

    *RocketMQ 实战之快速入门:https://www.jianshu.com/p/824066d70da8

    官网教程:http://rocketmq.apache.org/docs/quick-start/

    Kafka、RabbitMQ、RocketMQ等消息中间件的对比:https://blog.csdn.net/yunfeng482/article/details/72856762

    阿里中间件,十分钟入门RocketMQ:http://jm.taobao.org/2017/01/12/rocketmq-quick-start-in-10-minutes/

    相关文章

      网友评论

        本文标题:RocketMQ入门

        本文链接:https://www.haomeiwen.com/subject/zsfegftx.html