美文网首页
linux服务器rocketMQ安装

linux服务器rocketMQ安装

作者: 飞鹰雪玉 | 来源:发表于2021-08-25 13:46 被阅读0次

    1、下载rocketMQ。

    地址:https://rocketmq.apache.org/release_notes/
    下载bin包。比如:rocketmq-all-4.5.1-bin-release

    2、上传到服务器指定地址。

    比如/home/{user}/data

    3、添加brokerIP

    cd ~/data/rocketmq-all-4.5.1-bin-release/conf`
    vim broker.conf
    

    添加

    brokerIP1=116.196.69.88` //当前安装mq的ip。
    

    保存退出

    4、启动rocketMQ

    cd /home/www/data/rocketmq-all-4.5.1-bin-release/bin
    nohup sh mqnamesrv &
    nohup sh mqbroker -n localhost:9876 -c ../conf/broker.conf &
    

    5、查看启动日志

    broker的:

    tail -f ~/logs/rocketmqlogs/broker.log
    

    namesrv的:

    tail -f ~/logs/rocketmqlogs/namesrv.log
    

    日志文件路径配置在/rocketmq-all-4.5.1-bin-release/conf/下的logback_broker.xmllogback_namesrv.xml文件中配置。

    6、停止:

    cd /home/www/rocketmq-all-4.5.1-bin-release/bin
    sh mqshutdown broker
    sh mqshutdown namesrv
    

    7、java代码测试

    package com.business.mq;
    
    import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import org.apache.rocketmq.client.exception.MQBrokerException;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.common.message.MessageExt;
    import org.apache.rocketmq.remoting.common.RemotingHelper;
    import org.apache.rocketmq.remoting.exception.RemotingException;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.boot.ApplicationArguments;
    import org.springframework.boot.ApplicationRunner;
    import org.springframework.stereotype.Component;
    
    import java.io.UnsupportedEncodingException;
    import java.util.List;
    import java.util.concurrent.TimeUnit;
    
    /**
     * rocketmq
     */
    @Component
    public class MqThread implements ApplicationRunner {
        private static Logger logger = LoggerFactory.getLogger(MqThread.class);
    
        @Override
        public void run(ApplicationArguments args) {
            consumer();
            new Thread(new MqThread.producerThread()).start();
            System.out.println(111);
        }
    
        /**
         * 消费者
         */
        public void consumer() {
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group1");
            // 服务地址.
            consumer.setNamesrvAddr("116.196.122.122:9876");
            // 订阅TopicTest. *表示订阅所有tags
            try {
                consumer.subscribe("TopicTest", "*");
            } catch (MQClientException e) {
                e.printStackTrace();
            }
            // 注册回调函数接收信息.
            consumer.registerMessageListener(new MessageListenerConcurrently() {
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                                ConsumeConcurrentlyContext context) {
                    logger.info("收到消息: {}", msgs);
                    for (MessageExt messageExt : msgs) {
                        try {
                            //此处string转对象进行数据处理
                            logger.info("消息内容: {}", new String(messageExt.getBody(), RemotingHelper.DEFAULT_CHARSET));
                        } catch (UnsupportedEncodingException e) {
                            e.printStackTrace();
                        }
                    }
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
            try {
                //启动.
                consumer.start();
            } catch (MQClientException e) {
                e.printStackTrace();
            }
    
        }
    
        /**
         * 生产者
         */
        class producerThread implements Runnable {
            @Override
            public void run() {
                //实例化生产者(组名).
                DefaultMQProducer producer = new DefaultMQProducer("producer_group1");
                // 指定服务器地址:端口.
                producer.setNamesrvAddr("116.196.122.122:9876");
                //启动.
                try {
                    producer.start();
                } catch (MQClientException e) {
                    e.printStackTrace();
                }
                int i = 0;
                while (true) {
                    try {
                        TimeUnit.SECONDS.sleep(2);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    //创建消息实例, 指定 topic, tag ,message.
                    Message msg = null;
                    try {
                        msg = new Message("TopicTest" /* Topic */,
                                "bbb" /* Tag */,
                                (String.valueOf(i++)).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
                        );
                    } catch (UnsupportedEncodingException e) {
                        e.printStackTrace();
                    }
    
                    try {
                        //发送.
                        SendResult sendResult = producer.send(msg);
                        logger.info("发送结果:{}", sendResult);
                    } catch (MQClientException | InterruptedException | RemotingException | MQBrokerException e) {
                        e.printStackTrace();
                        //关闭.
                        producer.shutdown();
                    }
                }
            }
        }
    
    
    }
    
    

    相关文章

      网友评论

          本文标题:linux服务器rocketMQ安装

          本文链接:https://www.haomeiwen.com/subject/clggbltx.html