1、下载rocketMQ。
地址:https://rocketmq.apache.org/release_notes/
下载bin
包。比如:rocketmq-all-4.5.1-bin-release
2、上传到服务器指定地址。
比如/home/{user}/data
3、添加brokerIP
。
cd ~/data/rocketmq-all-4.5.1-bin-release/conf`
vim broker.conf
添加
brokerIP1=116.196.69.88` //当前安装mq的ip。
保存退出
4、启动rocketMQ
cd /home/www/data/rocketmq-all-4.5.1-bin-release/bin
nohup sh mqnamesrv &
nohup sh mqbroker -n localhost:9876 -c ../conf/broker.conf &
5、查看启动日志
broker
的:
tail -f ~/logs/rocketmqlogs/broker.log
namesrv
的:
tail -f ~/logs/rocketmqlogs/namesrv.log
日志文件路径配置在/rocketmq-all-4.5.1-bin-release/conf/
下的logback_broker.xml
和logback_namesrv.xml
文件中配置。
6、停止:
cd /home/www/rocketmq-all-4.5.1-bin-release/bin
sh mqshutdown broker
sh mqshutdown namesrv
7、java代码测试
package com.business.mq;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;
import java.io.UnsupportedEncodingException;
import java.util.List;
import java.util.concurrent.TimeUnit;
/**
* rocketmq
*/
@Component
public class MqThread implements ApplicationRunner {
private static Logger logger = LoggerFactory.getLogger(MqThread.class);
@Override
public void run(ApplicationArguments args) {
consumer();
new Thread(new MqThread.producerThread()).start();
System.out.println(111);
}
/**
* 消费者
*/
public void consumer() {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group1");
// 服务地址.
consumer.setNamesrvAddr("116.196.122.122:9876");
// 订阅TopicTest. *表示订阅所有tags
try {
consumer.subscribe("TopicTest", "*");
} catch (MQClientException e) {
e.printStackTrace();
}
// 注册回调函数接收信息.
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
ConsumeConcurrentlyContext context) {
logger.info("收到消息: {}", msgs);
for (MessageExt messageExt : msgs) {
try {
//此处string转对象进行数据处理
logger.info("消息内容: {}", new String(messageExt.getBody(), RemotingHelper.DEFAULT_CHARSET));
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
try {
//启动.
consumer.start();
} catch (MQClientException e) {
e.printStackTrace();
}
}
/**
* 生产者
*/
class producerThread implements Runnable {
@Override
public void run() {
//实例化生产者(组名).
DefaultMQProducer producer = new DefaultMQProducer("producer_group1");
// 指定服务器地址:端口.
producer.setNamesrvAddr("116.196.122.122:9876");
//启动.
try {
producer.start();
} catch (MQClientException e) {
e.printStackTrace();
}
int i = 0;
while (true) {
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
//创建消息实例, 指定 topic, tag ,message.
Message msg = null;
try {
msg = new Message("TopicTest" /* Topic */,
"bbb" /* Tag */,
(String.valueOf(i++)).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
try {
//发送.
SendResult sendResult = producer.send(msg);
logger.info("发送结果:{}", sendResult);
} catch (MQClientException | InterruptedException | RemotingException | MQBrokerException e) {
e.printStackTrace();
//关闭.
producer.shutdown();
}
}
}
}
}
网友评论