美文网首页日常学习
Linux环境下安装RocketMQ(MetaQ)

Linux环境下安装RocketMQ(MetaQ)

作者: 木叶之荣 | 来源:发表于2017-01-12 23:15 被阅读879次

    一:RocketMQ简介

    RocketMQ是一款分布式、队列模型的消息中间件,具有以下特点:

    1.能够保证严格的消息顺序

    2.提供丰富的消息拉取模式

    3.高效的订阅者水平扩展能力

    4.实时的消息订阅机制

    5.亿级消息堆积能力

    二:安装RocketMQ

    下载源码

    首先我们从githup上获取RocketMQ的源码,目前最新的版本为3.5.8,下载地址为:https://github.com/alibaba/RocketMQ/releases 或者 wget https://github.com/alibaba/RocketMQ/releases/alibaba/RocketMQ/archive/v3.5.8.tar.gz。请注意:此时我们下载的是源码,直接解压时不能用的,所以我们需要编译之后才能使用。

    编译源码

    在进行编译源码之前我们需要安装JDK。如果你已经安装过了,请跳过这里。如果你还没有安装过JDK,请参考这篇文章(Linux环境下安装JDK)。然后我们还需要安装一下Maven。Maven的安装还是比较简单,只需要去官方上下载的安装吧,然后直接解压,再配置一下环境变量就OK。接下来我们把刚才下载来的RockeMQ的源码解压到/usr/local/rockemq-source文件夹中。在源码中有一个Install.sh。如图所示:

    ![Install.sh(https://img.haomeiwen.com/i2382618/9508b0581377d386.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)

    。运行sh install.sh。在编译完成之后,我们只要target目录下的alibaba-rocketmq这个文件夹中内容,把alibaba-rocketmq文件夹中的内容移动到/usr/local/rocketmq中。如果你不想编译的话,可以从这里下载编译之后的rocketmq。(rocketmq3.5.8)。

    配置环境变量

    接下来我们需要配置一下环境变量。在终端中输入以下命令:vi /etc/profile ,在文件的末尾中添加如下两句话:export rocketmq=/usr/local/rocketmq export PATH=$PATH:$rocketmq/bin。接下来我们使配置的换将变量生效:source /etc/profile.

    三:启动RocketMQ

    接下来我们启动一下刚才编译的RocketMQ.在启动之前我们需要修改一下RocketMQ启动的内存大小(如果你的系统内存比较大的话,请忽略)。我们进入到/usr/local/rocketmq/bin中,在终端中输入以下命令修改mqnamesrv的内存大小:vi runserver.sh.修改为如图的内容:

    runserver.sh.

    ,接下来修改broker的内存大小:vi runbroker.sh:

    runbroker.sh

    启动mqnameserver

    进入到/usr/local/rocketmq/bin中输入以下命令:nohup sh mqnamesrv > ~/logs/rocketmqlogs/namesrv.log 2>&1 &。注意最后的这个 & 不要少。

    启动mqbroker

    进入到/usr/local/rocketmq/bin中输入以下命令:nohup sh mqbroker -n localhost:9876 autoCreateTopicEnable=true > ~/logs/rocketmqlogs/broker.log 2>&1 &。注意:localhost可以换成你刚才启动mqnamesrv的IP。autoCreateTopicEnable=true 这句话不要少了。最后的 & 也不要少了。
    我们可以通过 ps aux | grep java命令来查看启动的情况。

    Paste_Image.png

    到此,rocketmq的安装完毕。

    四:RocketMQ的小例子

    producer:

    package com.zkn.newlearn.rocketmq;
    
    import com.alibaba.rocketmq.client.exception.MQBrokerException;
    import com.alibaba.rocketmq.client.exception.MQClientException;
    import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
    import com.alibaba.rocketmq.client.producer.SendResult;
    import com.alibaba.rocketmq.common.message.Message;
    import com.alibaba.rocketmq.remoting.exception.RemotingException;
    
    import java.util.concurrent.TimeUnit;
    
    /**
     * Created by zkn on 2016/10/27.
     */
    public class ProducerTest01 {
    
        public static void main(String[] args) {
    
            /**
             * 一个应用创建一个Producer,由应用来维护此对象,可以设置为全局对象或者单例<br>
             * 注意:ProducerGroupName需要由应用来保证唯一<br>
             * ProducerGroup这个概念发送普通的消息时,作用不大,但是发送分布式事务消息时,比较关键,
             * 因为服务器会回查这个Group下的任意一个Producer
             */
            DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
            //producer.setNamesrvAddr("192.168.180.1:9876");
            producer.setNamesrvAddr("192.168.180.133:9876");
            producer.setInstanceName("Producer");
            /**
             * Producer对象在使用之前必须要调用start初始化,初始化一次即可<br>
             * 注意:切记不可以在每次发送消息时,都调用start方法
             */
            try {
                producer.start();
            } catch (MQClientException e) {
                e.printStackTrace();
            }
            for (int i = 0; i < 100; i++) {
                try {
                    /**
                     * 下面这段代码表明一个Producer对象可以发送多个topic,多个tag的消息。
                     * 注意:send方法是同步调用,只要不抛异常就标识成功。但是发送成功也可会有多种状态,<br>
                     * 例如消息写入Master成功,但是Slave不成功,这种情况消息属于成功,但是对于个别应用如果对消息可靠性要求极高,<br>
                     * 需要对这种情况做处理。另外,消息可能会存在发送失败的情况,失败重试由应用来处理。
                     */
                    {
                        Message msg = new Message("TopicTest1",// topic
                                "TagA",// tag
                                "OrderID001",// key
                                ("Hello MetaQ").getBytes());// body
                        SendResult sendResult = producer.send(msg);
                        System.out.println(sendResult);
                    }
    
                    {
                        Message msg = new Message("TopicTest2",
                                "TagB",
                                "OrderID001",
                                ("Hello MetaQ TagB".getBytes()));
    
                        SendResult sendResult = producer.send(msg);
                        System.out.println(sendResult);
                    }
    
                    {
                        Message msg = new Message("TopicTest3",
                                "TagC",
                                "OrderID001",
                                ("Hello MetaQ TagC").getBytes());
    
                        SendResult sendResult = producer.send(msg);
    
                        System.out.println(sendResult);
                    }
    
                    TimeUnit.MILLISECONDS.sleep(1000);
    
                } catch (MQClientException e) {
                    e.printStackTrace();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                } catch (RemotingException e) {
                    e.printStackTrace();
                } catch (MQBrokerException e) {
                    e.printStackTrace();
                }
            }
            /**
             * 应用退出时,要调用shutdown来清理资源,关闭网络连接,从MetaQ服务器上注销自己
             * 注意:我们建议应用在JBOSS、Tomcat等容器的退出销毁方法里调用shutdown方法
             */
            producer.shutdown();
        }
    }
    

    consumer:

    package com.zkn.newlearn.rocketmq;
    
    import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
    import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
    import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import com.alibaba.rocketmq.client.exception.MQClientException;
    import com.alibaba.rocketmq.common.message.MessageExt;
    
    import java.util.List;
    
    /**
     * Created by zkn on 2016/10/27.
     */
    public class ConsumerTest01 {
    
        /**
         * 当前例子是PushConsumer用法,使用方式给用户感觉是消息从RocketMQ服务器推到了应用客户端。<br>
         * 但是实际PushConsumer内部是使用长轮询Pull方式从MetaQ服务器拉消息,然后再回调用户Listener方法<br>
         */
        public static void main(String[] args) {
    
            /**
             * 注意:ConsumerGroupName需要由应用来保证唯一
             */
            DefaultMQPushConsumer pushConsumer = new DefaultMQPushConsumer("ProducerGroupName");
            //pushConsumer.setNamesrvAddr("192.168.180.1:9876");
            pushConsumer.setNamesrvAddr("192.168.180.133:9876");
            pushConsumer.setInstanceName("Consumer");
            try {
                /**
                 * 订阅指定topic下tags分别等于TagA或TagC或TagD
                 * 两个参数:第一个参数是topic第二个参数是tags
                 */
                pushConsumer.subscribe("TopicTest1", "TagA || TagC || TagD");
                /**
                * 订阅指定topic下所有消息<br>
                * 注意:一个consumer对象可以订阅多个topic
                */
                //pushConsumer.subscribe("TopicTest2", "*");
                pushConsumer.registerMessageListener(new MessageListenerConcurrently() {
                    @Override
                    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                                    ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                        System.out.println(Thread.currentThread().getName() + " Receive New Messages: " + msgs.size());
                        MessageExt messageExt = msgs.get(0);
                        if("TopicTest1".equals(messageExt.getTopic())){
                            // 执行TopicTest1的消费逻辑
                            if (messageExt.getTags() != null && messageExt.getTags().equals("TagA")) {
                                // 执行TagA的消费
                                System.out.println(new String(messageExt.getBody()));
                            }else if(messageExt.getTags() != null && messageExt.getTags().equals("TagB")){
                                System.out.println(new String(messageExt.getBody()));
                            }else if(messageExt.getTags() != null && messageExt.getTags().equals("TagC")) {
                                System.out.println(new String(messageExt.getBody()));
                            }
                        }else if("TopicTest2".equals(messageExt.getTopic())){
                            System.out.println(new String(messageExt.getBody()));
                        }
                        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                    }
                });
            } catch (MQClientException e) {
                e.printStackTrace();
            }
            /**
             * Consumer对象在使用之前必须要调用start初始化,初始化一次即可<br>
             */
            try {
                pushConsumer.start();
            } catch (MQClientException e) {
                e.printStackTrace();
            }
            System.out.println("Consumer Started.");
        }
    }
    
    package com.zkn.newlearn.rocketmq;
    
    import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
    import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
    import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import com.alibaba.rocketmq.client.exception.MQClientException;
    import com.alibaba.rocketmq.common.message.MessageExt;
    
    import java.util.List;
    
    /**
     * Created by zkn on 2016/10/30.
     */
    public class ConsumerTest02 extends ConsumerTest01 {
    
        public static void main(String[] args) {
    
            DefaultMQPushConsumer pushConsumer = new DefaultMQPushConsumer("ProducerGroupName");
            //pushConsumer.setNamesrvAddr("192.168.180.1:9876");
            pushConsumer.setNamesrvAddr("192.168.180.133:9876");
            pushConsumer.setInstanceName("Consumer");
            /**
             * 订阅指定topic下所有消息<br>
             * 注意:一个consumer对象可以订阅多个topic
             */
            try {
                pushConsumer.subscribe("TopicTest2", "*");
                pushConsumer.registerMessageListener(new MessageListenerConcurrently() {
                     @Override
                      public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                            MessageExt messageExt = msgs.get(0);
                            if("TopicTest2".equals(messageExt.getTopic())){
                                System.out.println(new String(messageExt.getBody()));
                            }
                            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                     }
                }
                );
            } catch (MQClientException e) {
                e.printStackTrace();
            }
            try {
                pushConsumer.start();
            } catch (MQClientException e) {
                e.printStackTrace();
            }
        }
    }
    

    相关文章

      网友评论

      本文标题:Linux环境下安装RocketMQ(MetaQ)

      本文链接:https://www.haomeiwen.com/subject/umpjbttx.html