美文网首页rocketmq
3分钟快速入门RocketMQ(下)

3分钟快速入门RocketMQ(下)

作者: 云原生实战 | 来源:发表于2017-08-09 07:39 被阅读4062次

    RocketMQ 集群部署模式

    • 单 master 模式

    也就是只有一个 master 节点,称不上是集群,一旦这个 master 节点宕机,那么整个服务就不可用。

    优点:部署简单。
    缺点:存在单点故障。
    注意:该模式一般只用来个人学习,或者作为开发环境使用,生产环境不推荐使用该模式。

    • 多 master 模式

    多个 master 节点组成集群,单个 master 节点宕机或者重启对应用来说没有影响。

    优点:所有模式中性能最高。
    缺点:单个 master 节点宕机期间,未被消费的消息在节点恢复之前不可用,消息的实时性会受到影响。
    注意:使用同步刷盘可以保证消息不丢失,同时每个 Topic 应该均匀分布在集群中每个节点,而不是只在某个节点上,否则,该节点宕机就会对订阅该 topic 的应用造成影响。

    • 多 master 多 slave 异步复制模式

    在多 master 模式的基础上,每个 master 节点都有至少一个对应的 slave。master 节点可读可写,但是 slave 只能读不能写,类似于 mysql 的主备模式。

    优点: 在 master 宕机时,消费者可以从 slave 读取消息,消息的实时性不会受影响,性能几乎和多 master 一样。
    缺点:使用异步复制的同步方式有可能会有消息丢失的问题。

    • 多 master 多 slave 同步双写模式

    同多 master 多 slave 异步复制模式类似,区别在于 master 和 slave 之间的数据同步方式是同步的。

    优点:同步双写模式能保证数据不丢失。
    缺点:发送单个消息 RT 会略长,性能相比异步复制低 10% 左右。
    刷盘策略:同步刷盘和异步刷盘(指的是节点自身数据是同步还是异步存储)。
    同步方式:同步双写和异步复制(指的一组 master 和 slave 之间数据的同步方式)。
    注意:要保证数据可靠,需采用同步刷盘和同步双写的方式,但性能会较其他方式低。

    RocketMQ 单主部署

    鉴于是快速入门,我选择的是第一种单 master 的部署模式。先说明一下我的安装环境:

    1. Centos 7.2
    2. jdk 1.8
    3. Maven 3.2.x
    4. Git

    这里 git 可用可不用,主要是用来直接下载 github 上的源码。也可以选择自己到
    github 上下载,然后上传到服务器上。这里以 git 操作为示例。

    1. clone 源码并用 maven 编译
    > git clone https://github.com/alibaba/RocketMQ.git /opt/RocketMQ
    > cd /opt/RocketMQ && mvn -Dmaven.test.skip=true clean package install assembly:assembly -U
    > cd target/alibaba-rocketmq-broker/alibaba-rocketmq
    
    1. 启动 Name Server
    > nohup sh /opt/RocketMQ/bin/mqnamesrv &
    //执行 jps 查看进程
    > jps
    25913 NamesrvStartup
    //查看日志确保服务已正常启动
    > tail -f ~/logs/rocketmqlogs/namesrv.log
    The Name Server boot success...
    
    1. 启动 broker
    > nohup sh /opt/RocketMQ/bin/mqbroker -n localhost:9876 &
    //执行 jps 查看进程
    > jps
    25954 BrokerStartup
    //查看日志确保服务已正常启动
    > tail -f ~/logs/rocketmqlogs/broker.log
    The broker[broker-a, 10.1.54.121:10911] boot success...
    
    1. 发送和接收消息

    发送/接收消息之前,我们需要告诉客户端 NameServer 地址。RocketMQ 提供了多种方式来实现这一目标。为简单起见,我们使用环境变量 NAMESRV_ADDR。

    > export NAMESRV_ADDR=localhost:9876
    > sh /opt/RocketMQ/bin/tools.sh com.alibaba.rocketmq.example.quickstart.Producer
    SendResult [sendStatus=SEND_OK, msgId= ...
    > sh /opt/RocketMQ/bin/tools.sh com.alibaba.rocketmq.example.quickstart.Consumer
    ConsumeMessageThread_%d Receive New Messages: [MessageExt...
    
    1. 关闭服务
    > sh /opt/RocketMQ/bin/mqshutdown broker
    The mqbroker(36695) is running...
    Send shutdown request to mqbroker(36695) OK
    > sh /opt/RocketMQ/bin/mqshutdown namesrv
    The mqnamesrv(36664) is running...
    Send shutdown request to mqnamesrv(36664) OK
    

    此处可能遇到的问题

    1. 执行"git clone https://github.com/alibaba/RocketMQ.git /home/inspkgs/RocketMQ"时出现以下提示:
    fatal: unable to access 'https://github.com/alibaba/RocketMQ.git/': Could not resolve host: github.com; Unknown error
    

    解决办法:一般是由于网络原因造成的,执行以下命令:

    > ping github.com
    

    确定可以 ping 通之后,再重新执行 git clone 命令。

    1. 执行"mvn -Dmaven.test.skip=true clean package install assembly:assembly -U"编译时,可能出现下载相关 jar 包很慢的情况。

    这也是由于默认 maven 中央仓库在国外的原因,可以根据需要在 /home/maven/conf/setting.xml 中的 <mirrors></mirrors> 添加以下内容后重新编译:

    <mirror>
        <id>aliyun</id>
        <mirrorOf>central</mirrorOf>
        <name>aliyun maven</name>
        <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
    </mirror>
    

    示例代码

    • 生产者
    public class Producer {
        public static void main(String[] args) throws MQClientException, InterruptedException {
    
            //声明并初始化一个producer
            //需要一个producer group名字作为构造方法的参数,这里为producer1
            DefaultMQProducer producer = new DefaultMQProducer("producer1");
    
            //设置NameServer地址,此处应改为实际NameServer地址,多个地址之间用;分隔
            //NameServer的地址必须有,但是也可以通过环境变量的方式设置,不一定非得写死在代码里
            producer.setNamesrvAddr("10.1.54.121:9876;10.1.54.122:9876");
    
            //调用start()方法启动一个producer实例
            producer.start();
    
            //发送10条消息到Topic为TopicTest,tag为TagA,消息内容为“Hello RocketMQ”拼接上i的值
            for (int i = 0; i < 10; i++) {
                try {
                    Message msg = new Message("TopicTest",// topic
                            "TagA",// tag
                            ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET)// body
                    );
    
                    //调用producer的send()方法发送消息
                    //这里调用的是同步的方式,所以会有返回结果
                    SendResult sendResult = producer.send(msg);
    
                    //打印返回结果,可以看到消息发送的状态以及一些相关信息
                    System.out.println(sendResult);
                } catch (Exception e) {
                    e.printStackTrace();
                    Thread.sleep(1000);
                }
            }
    
            //发送完消息之后,调用shutdown()方法关闭producer
            producer.shutdown();
        }
    }
    
    • 消费者
    public class Consumer {
    
        public static void main(String[] args) throws InterruptedException, MQClientException {
    
            //声明并初始化一个consumer
            //需要一个consumer group名字作为构造方法的参数,这里为consumer1
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer1");
    
            //同样也要设置NameServer地址
            consumer.setNamesrvAddr("10.1.54.121:9876;10.1.54.122:9876");
    
            //这里设置的是一个consumer的消费策略
            //CONSUME_FROM_LAST_OFFSET 默认策略,从该队列最尾开始消费,即跳过历史消息
            //CONSUME_FROM_FIRST_OFFSET 从队列最开始开始消费,即历史消息(还储存在broker的)全部消费一遍
            //CONSUME_FROM_TIMESTAMP 从某个时间点开始消费,和setConsumeTimestamp()配合使用,默认是半个小时以前
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
    
            //设置consumer所订阅的Topic和Tag,*代表全部的Tag
            consumer.subscribe("TopicTest", "*");
    
            //设置一个Listener,主要进行消息的逻辑处理
            consumer.registerMessageListener(new MessageListenerConcurrently() {
    
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                                ConsumeConcurrentlyContext context) {
    
                    System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs);
    
                    //返回消费状态
                    //CONSUME_SUCCESS 消费成功
                    //RECONSUME_LATER 消费失败,需要稍后重新消费
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
    
            //调用start()方法启动consumer
            consumer.start();
    
            System.out.println("Consumer Started.");
        }
    }
    

    更多技术干货,可以扫描下面的二维码,关注微信公众号:冯先生的笔记


    冯先生的笔记

    相关文章

      网友评论

      • 0c4f35cc103b:写的很好啊,但是我还有很多疑惑,比如MQ中怎么设置存储的大小和数量,还有多MASTER怎么样实现和部署

      本文标题:3分钟快速入门RocketMQ(下)

      本文链接:https://www.haomeiwen.com/subject/qvmmlxtx.html