美文网首页
linux服务器rocketMQ安装

linux服务器rocketMQ安装

作者: 飞鹰雪玉 | 来源:发表于2021-08-25 13:46 被阅读0次

1、下载rocketMQ。

地址:https://rocketmq.apache.org/release_notes/
下载bin包。比如:rocketmq-all-4.5.1-bin-release

2、上传到服务器指定地址。

比如/home/{user}/data

3、添加brokerIP

cd ~/data/rocketmq-all-4.5.1-bin-release/conf`
vim broker.conf

添加

brokerIP1=116.196.69.88` //当前安装mq的ip。

保存退出

4、启动rocketMQ

cd /home/www/data/rocketmq-all-4.5.1-bin-release/bin
nohup sh mqnamesrv &
nohup sh mqbroker -n localhost:9876 -c ../conf/broker.conf &

5、查看启动日志

broker的:

tail -f ~/logs/rocketmqlogs/broker.log

namesrv的:

tail -f ~/logs/rocketmqlogs/namesrv.log

日志文件路径配置在/rocketmq-all-4.5.1-bin-release/conf/下的logback_broker.xmllogback_namesrv.xml文件中配置。

6、停止:

cd /home/www/rocketmq-all-4.5.1-bin-release/bin
sh mqshutdown broker
sh mqshutdown namesrv

7、java代码测试

package com.business.mq;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;

import java.io.UnsupportedEncodingException;
import java.util.List;
import java.util.concurrent.TimeUnit;

/**
 * rocketmq
 */
@Component
public class MqThread implements ApplicationRunner {
    private static Logger logger = LoggerFactory.getLogger(MqThread.class);

    @Override
    public void run(ApplicationArguments args) {
        consumer();
        new Thread(new MqThread.producerThread()).start();
        System.out.println(111);
    }

    /**
     * 消费者
     */
    public void consumer() {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group1");
        // 服务地址.
        consumer.setNamesrvAddr("116.196.122.122:9876");
        // 订阅TopicTest. *表示订阅所有tags
        try {
            consumer.subscribe("TopicTest", "*");
        } catch (MQClientException e) {
            e.printStackTrace();
        }
        // 注册回调函数接收信息.
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                            ConsumeConcurrentlyContext context) {
                logger.info("收到消息: {}", msgs);
                for (MessageExt messageExt : msgs) {
                    try {
                        //此处string转对象进行数据处理
                        logger.info("消息内容: {}", new String(messageExt.getBody(), RemotingHelper.DEFAULT_CHARSET));
                    } catch (UnsupportedEncodingException e) {
                        e.printStackTrace();
                    }
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        try {
            //启动.
            consumer.start();
        } catch (MQClientException e) {
            e.printStackTrace();
        }

    }

    /**
     * 生产者
     */
    class producerThread implements Runnable {
        @Override
        public void run() {
            //实例化生产者(组名).
            DefaultMQProducer producer = new DefaultMQProducer("producer_group1");
            // 指定服务器地址:端口.
            producer.setNamesrvAddr("116.196.122.122:9876");
            //启动.
            try {
                producer.start();
            } catch (MQClientException e) {
                e.printStackTrace();
            }
            int i = 0;
            while (true) {
                try {
                    TimeUnit.SECONDS.sleep(2);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                //创建消息实例, 指定 topic, tag ,message.
                Message msg = null;
                try {
                    msg = new Message("TopicTest" /* Topic */,
                            "bbb" /* Tag */,
                            (String.valueOf(i++)).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
                    );
                } catch (UnsupportedEncodingException e) {
                    e.printStackTrace();
                }

                try {
                    //发送.
                    SendResult sendResult = producer.send(msg);
                    logger.info("发送结果:{}", sendResult);
                } catch (MQClientException | InterruptedException | RemotingException | MQBrokerException e) {
                    e.printStackTrace();
                    //关闭.
                    producer.shutdown();
                }
            }
        }
    }


}

相关文章

网友评论

      本文标题:linux服务器rocketMQ安装

      本文链接:https://www.haomeiwen.com/subject/clggbltx.html