美文网首页
在代码中实现生产者和消费者one2one

在代码中实现生产者和消费者one2one

作者: 你家门口的两朵云 | 来源:发表于2021-09-01 14:55 被阅读0次

    Producer.java

    package edu.hgnu.one2one;
    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.common.message.Message;
    
    import java.nio.charset.StandardCharsets;
    
    public class Provider {
        public static void main(String[] args) throws Exception {
            //1.谁来发
            DefaultMQProducer producer = new DefaultMQProducer("group1");
            //2.发给谁
            producer.setNamesrvAddr("localhost:9876");
            producer.start();
            //3.怎么发
            //4.发什么
            String msg = "  盼望着,盼望着,东风来了,春天的脚步近了。\n" +
                    "  一切都像刚睡醒的样子,欣欣然张开了眼。山朗润起来了,水涨起来了,太阳的脸红起来了。\n" +
                    "  小草偷偷地从土地里钻出来,嫩嫩的,绿绿的。园子里,田野里,瞧去,一大片一大片满是的。\n" +
                    "   坐着,躺着,打两个滚,踢几脚球,赛几趟跑,捉几回迷藏。风轻俏俏的`,草软绵绵的。";
            Message message = new Message("topic1", "tag1", msg.getBytes(StandardCharsets.UTF_8));
            SendResult sendResult = producer.send(message);
            //5.发的结果是什么
            System.out.println(sendResult.toString());
            //6.打扫战场
            producer.shutdown();
        }
    }
    

    Consumer.java

    package edu.hgnu.one2one;
    
    import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import org.apache.rocketmq.client.exception.MQClientException;
    import org.apache.rocketmq.common.message.MessageExt;
    
    import java.util.List;
    
    /**
     * @author admin
     */
    public class Consumer {
        public static void main(String[] args) throws Exception {
            //1.创建一个接收消息的对象Consumer
            DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group1");
            //2.设定接收的命名服务器地址
            consumer.setNamesrvAddr("localhost:9876");
            //3.设置接收信息对应的topic,对应的sub标签为任意
            consumer.subscribe("topic1","*");
            //4.启动监听,接收消息
            consumer.registerMessageListener(new MessageListenerConcurrently() {
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                    for (MessageExt msg:list) {
                        System.out.println("msg=====>"+msg);
                        String s = new String(msg.getBody());
                        System.out.println(s);
                    }
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
    
            //5.启动接收消息的服务
            consumer.start();
        }
    }
    
    
    1.run Provider.java

    控制台:

    "C:\Program Files\Java\jdk1.8.0_101\bin\java.exe" ...
    SendResult [sendStatus=SEND_OK, msgId=7F0000013BCC18B4AAC20330096D0000, offsetMsgId=AC10328300002A9F00000000002977AE, messageQueue=MessageQueue [topic=topic1, brokerName=SK-20210719XOMZ, queueId=1], queueOffset=3]
    14:51:19.807 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[127.0.0.1:9876] result: true
    14:51:19.820 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[127.0.0.1:9876] result: true
    14:51:19.821 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[172.16.50.131:10911] result: true
    
    
    2.run Consumer.java

    控制台:

    msg=====>MessageExt [brokerName=SK-20210719XOMZ, queueId=1, storeSize=636, queueOffset=3, sysFlag=0, bornTimestamp=1630479079790, bornHost=/172.16.50.131:57667, storeTimestamp=1630479079795, storeHost=/172.16.50.131:10911, msgId=AC10328300002A9F00000000002977AE, commitLogOffset=2717614, bodyCRC=1403423844, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='topic1', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=4, CONSUME_START_TIME=1630479232738, UNIQ_KEY=7F0000013BCC18B4AAC20330096D0000, CLUSTER=DefaultCluster, WAIT=true, TAGS=tag1}, body=[-29, -128, -128, -29, -128, -128, -25, -101, -68, -26, -100, -101, -25, -99, -128, -17, -68, -116, -25, -101, -68,], transactionId='null'}]
      盼望着,盼望着,东风来了,春天的脚步近了。
      一切都像刚睡醒的样子,欣欣然张开了眼。山朗润起来了,水涨起来了,太阳的脸红起来了。
      小草偷偷地从土地里钻出来,嫩嫩的,绿绿的。园子里,田野里,瞧去,一大片一大片满是的。
       坐着,躺着,打两个滚,踢几脚球,赛几趟跑,捉几回迷藏。风轻俏俏的,草软绵绵的。
    
    

    相关文章

      网友评论

          本文标题:在代码中实现生产者和消费者one2one

          本文链接:https://www.haomeiwen.com/subject/uattwltx.html