Producer.java
package edu.hgnu.one2one;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import java.nio.charset.StandardCharsets;
public class Provider {
public static void main(String[] args) throws Exception {
//1.谁来发
DefaultMQProducer producer = new DefaultMQProducer("group1");
//2.发给谁
producer.setNamesrvAddr("localhost:9876");
producer.start();
//3.怎么发
//4.发什么
String msg = " 盼望着,盼望着,东风来了,春天的脚步近了。\n" +
" 一切都像刚睡醒的样子,欣欣然张开了眼。山朗润起来了,水涨起来了,太阳的脸红起来了。\n" +
" 小草偷偷地从土地里钻出来,嫩嫩的,绿绿的。园子里,田野里,瞧去,一大片一大片满是的。\n" +
" 坐着,躺着,打两个滚,踢几脚球,赛几趟跑,捉几回迷藏。风轻俏俏的`,草软绵绵的。";
Message message = new Message("topic1", "tag1", msg.getBytes(StandardCharsets.UTF_8));
SendResult sendResult = producer.send(message);
//5.发的结果是什么
System.out.println(sendResult.toString());
//6.打扫战场
producer.shutdown();
}
}
Consumer.java
package edu.hgnu.one2one;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
/**
* @author admin
*/
public class Consumer {
public static void main(String[] args) throws Exception {
//1.创建一个接收消息的对象Consumer
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
//2.设定接收的命名服务器地址
consumer.setNamesrvAddr("localhost:9876");
//3.设置接收信息对应的topic,对应的sub标签为任意
consumer.subscribe("topic1","*");
//4.启动监听,接收消息
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
for (MessageExt msg:list) {
System.out.println("msg=====>"+msg);
String s = new String(msg.getBody());
System.out.println(s);
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
//5.启动接收消息的服务
consumer.start();
}
}
1.run Provider.java
控制台:
"C:\Program Files\Java\jdk1.8.0_101\bin\java.exe" ...
SendResult [sendStatus=SEND_OK, msgId=7F0000013BCC18B4AAC20330096D0000, offsetMsgId=AC10328300002A9F00000000002977AE, messageQueue=MessageQueue [topic=topic1, brokerName=SK-20210719XOMZ, queueId=1], queueOffset=3]
14:51:19.807 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[127.0.0.1:9876] result: true
14:51:19.820 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[127.0.0.1:9876] result: true
14:51:19.821 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[172.16.50.131:10911] result: true
2.run Consumer.java
控制台:
msg=====>MessageExt [brokerName=SK-20210719XOMZ, queueId=1, storeSize=636, queueOffset=3, sysFlag=0, bornTimestamp=1630479079790, bornHost=/172.16.50.131:57667, storeTimestamp=1630479079795, storeHost=/172.16.50.131:10911, msgId=AC10328300002A9F00000000002977AE, commitLogOffset=2717614, bodyCRC=1403423844, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='topic1', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=4, CONSUME_START_TIME=1630479232738, UNIQ_KEY=7F0000013BCC18B4AAC20330096D0000, CLUSTER=DefaultCluster, WAIT=true, TAGS=tag1}, body=[-29, -128, -128, -29, -128, -128, -25, -101, -68, -26, -100, -101, -25, -99, -128, -17, -68, -116, -25, -101, -68,], transactionId='null'}]
盼望着,盼望着,东风来了,春天的脚步近了。
一切都像刚睡醒的样子,欣欣然张开了眼。山朗润起来了,水涨起来了,太阳的脸红起来了。
小草偷偷地从土地里钻出来,嫩嫩的,绿绿的。园子里,田野里,瞧去,一大片一大片满是的。
坐着,躺着,打两个滚,踢几脚球,赛几趟跑,捉几回迷藏。风轻俏俏的,草软绵绵的。
网友评论