美文网首页
RocketMQ单Master搭建以及快速入门(含java De

RocketMQ单Master搭建以及快速入门(含java De

作者: 小偷阿辉 | 来源:发表于2021-05-06 14:16 被阅读0次

    1.背景

    RocketMQ 是由阿里用java语言开发的一款高性能、高吞吐量的分布式消息中间件,于2017年正式捐赠 Apache 基金会并成为顶级开源项目。
    目前主流的MQ主要是Rocketmq、kafka、Rabbitmq,Rocketmq相比于Rabbitmq、kafka具有主要优势特性有:
    • 支持事务型消息(消息发送和DB操作保持两方的最终一致性,rabbitmq和kafka不支持)
    • 支持结合rocketmq的多个系统之间数据最终一致性(多方事务,二方事务是前提)
    • 支持18个级别的延迟消息(rabbitmq和kafka不支持)
    • 支持指定次数和时间间隔的失败消息重发(kafka不支持,rabbitmq需要手动确认)
    • 支持consumer端tag过滤,减少不必要的网络传输(rabbitmq和kafka不支持)
    • 支持重复消费(rabbitmq不支持,kafka支持)

    2.安装环境

    官网:http://rocketmq.apache.org

    1.安装环境:
    官网要求:

    64bit OS, Linux/Unix/Mac is recommended;
    64bit JDK 1.8+;
    Maven 3.2.x;
    Git;
    4g+ free disk for Broker server

    本次安装环境:

    jdk1.8
    centos7

    官网给出的是源码版本,需要自己打包编译,所以需要git&maven等环境,我本地直接用的编译后版本,可以直接按照我给的步骤安装,下面给出了jdk1.8 linux下载地址,下载jdk后解压安装并配置环境变量
    链接:https://pan.baidu.com/s/1bZdBVx4RbzBwmwvsADP4SA
    提取码:qsuj
    rocketmq下载地址:
    https://www.apache.org/dyn/closer.cgi?path=rocketmq/4.8.0/rocketmq-all-4.8.0-bin-release.zip

    2.解压并配置环境变量(环境变量也就是bin目录下的启动文件,请自行配置,这样就不用每次都进入部署目录执行启动命令)

    unzip rocketmq-all-4.8.0-source-release.zip
    cd rocketmq-all-4.8.0/

    3.修改默认配置:

    默认rocketmq的配置4G的堆内存,测试环境/开发环境如果服务器内存不支持容器卡死,开发调试的话,不需要使用太高的配置.默认的配置占用的内存太高.

    vim bin/runserver.sh
    vim bin/runbroker.sh
    vim bin/tools.sh
    复制代码修改JAVA_HOME及Xms,Xmx,Xmn等内存配置,默认最小4G

    JAVA_OPT="${JAVA_OPT} -server -Xms256m -Xmx256m -Xmn128m -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"

    4.启动nameserver

    注意 -n 指定必须是公网/可访问内网ip,不能是localhost/127.0.0.1,否则会报org.apache.rocketmq.remoting.exception.RemotingTooMuchRequestException: sendDefaultImpl call timeout错误

    nohup sh bin/mqnamesrv -n 公网/可访问内网ip:9876 &
    启动后可进入以下目录查看启动日志
    tail -f ~/logs/rocketmqlogs/namesrv.log
    The Name Server boot success...

    5.启动broker

    启动前修改conf/broker.conf文件
    以下两项配置
    namesvrAddr:nameserver可访问地址
    brokerIP1:broker可访问ip

    broker.conf

    同理这里也不可使用localhost/127.0.0.1作为ip地址

    nohup sh bin/mqbroker -n 公网/可访问内网ip:9876 -c conf/broker.conf &
    启动后进入以下目录查看启动日志
    tail -f ~/logs/rocketmqlogs/broker.log
    The broker[%s, 172.30.30.233:10911] boot success...

    6.新建主题

    sh mqadmin updateTopic -n 192.168.30.150:9876 -b 192.168.30.150:10911 -t VictorTopic

    7.mqadmin查看命令

    1.首先进入 RocketMQ 工程,进入/RocketMQ/bin 在该目录下有个 mqadmin 脚本 .
    查看帮助: 在 mqadmin 下可以查看有哪些命令
    a: 查看具体命令的使用 : sh mqadmin
    b: sh mqadmin help 命令名称
    例如,查看 updateTopic 的使用

    sh mqadmin help updateTopic

    1. 关闭nameserver和所有的broker:
      进入到bin下:

      sh mqshutdown namesrv
      sh mqshutdown broker

    2. 查看所有消费组group:

      sh mqadmin consumerProgress -n 192.168.1.23:9876

    3. 查看指定消费组下的所有topic数据堆积情况:

      sh mqadmin consumerProgress -n 192.168.1.23:9876 -g warning-group

    4. 查看所有topic :

      sh mqadmin topicList -n 192.168.1.23:9876

    5. 查看topic信息列表详情统计

      sh mqadmin topicstatus -n 192.168.1.23:9876 -t topicWarning

    6. 新增topic

    sh mqadmin updateTopic –n 192.168.1.23:9876 –c DefaultCluster –t topicWarning

    1. 删除topic

    sh mqadmin deleteTopic –n 192.168.1.23:9876 –c DefaultCluster –t topicWarning

    9、查询集群消息

    sh mqadmin clusterList -n 192.168.1.23:9876

    8.查看进程

    由于rocketmq是java进程,可以直接使用jps命令查看,可以看到NamesrvStartup&BrokerStartup后台进程则表示启动成功


    image.png

    以上就是安装的全部步骤,按照官方文档跑起来还是很多坑

    9.Java client demo

    9.1.maven依赖

    注意:rocketmq java客户端必须和rocketmq版本一致

     <dependency>
                <groupId>org.apache.rocketmq</groupId>
                <artifactId>rocketmq-client</artifactId>
                <version>4.8.0</version>
            </dependency>
    

    9.1.生产者(producer):

    
    import com.alibaba.fastjson.JSONObject;
    import lombok.extern.slf4j.Slf4j;
    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.common.message.Message;
    
    @Slf4j
    public final class MQProducer {
    
        /**
         * 生产者
         */
        static private DefaultMQProducer _producer = null;
    
        private MQProducer(){}
    
        /**
         * 初始化
         */
        static public void init() {
            try {
                // 创建生产者
                DefaultMQProducer producer = new DefaultMQProducer("herostory");
                // 指定 nameServer 地址
                producer.setNamesrvAddr("192.168.1.188:9876");
                // 启动生产者
                producer.start();
                producer.setRetryTimesWhenSendAsyncFailed(3);
    
                _producer = producer;
            } catch (Exception ex) {
                log.error(ex.getMessage(), ex);
            }
        }
    
        /**
         * 发送消息
         *
         * @param topic 主题
         * @param msg   消息对象
         */
        static public void sendMsg(String topic, Object msg) {
            if (null == topic ||
                    null == msg) {
                return;
            }
    
            if (null == _producer) {
                throw new RuntimeException("_producer 尚未初始化");
            }
    
            Message mqMsg = new Message();
            mqMsg.setTopic(topic);
            mqMsg.setBody(JSONObject.toJSONBytes(msg));
    
            try {
                _producer.send(mqMsg);
            } catch (Exception ex) {
                log.error(ex.getMessage(), ex);
            }
        }
    
    
    }
    
    

    9.2.消费者(consumer):

    
    import com.alibaba.fastjson.JSONObject;
    import com.shine.herostory.rank.RankService;
    import lombok.extern.slf4j.Slf4j;
    import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.common.message.MessageExt;
    
    @Slf4j
    public final class GameMsgConsumer {
    
        private GameMsgConsumer(){}
    
    
    
        static public void init(){
            DefaultMQPushConsumer consumer=new DefaultMQPushConsumer("herostory");
            consumer.setNamesrvAddr("192.168.1.188:9876");
            try {
                consumer.subscribe("VictorTopic","*");//订阅所有消息
                consumer.registerMessageListener((MessageListenerConcurrently) (list, consumeConcurrentlyContext) -> {
                    for (MessageExt messageExt : list) {
                        VictorMsg msg= JSONObject.parseObject(messageExt.getBody(), VictorMsg.class);
                        log.info("receive msg:{}",msg.toString());
                        RankService.getInstance().refreshRank(msg);
                    }
    
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                });
                consumer.start();
            } catch (MQClientException ex) {
                log.error(ex.getMessage(), ex);
            }
    
        }
    
    
    }
    
    

    完结撒花

    相关文章

      网友评论

          本文标题:RocketMQ单Master搭建以及快速入门(含java De

          本文链接:https://www.haomeiwen.com/subject/cyardltx.html