美文网首页
rocketmq自测

rocketmq自测

作者: zxy_3197 | 来源:发表于2019-02-22 13:39 被阅读0次

    1、生产者
    package com.example.demo.rocketmq;

    /**
     * @Description:
     * @author: YuanTong-ZXY
     * @Date: 2019/2/22 9:34
     */
      import org.apache.rocketmq.client.producer.DefaultMQProducer;
      import org.apache.rocketmq.client.producer.SendResult;
      import org.apache.rocketmq.common.message.Message;

    /**
     * Minimal RocketMQ producer used for a manual smoke test.
     *
     * <p>Starts a {@link DefaultMQProducer}, synchronously sends three test
     * messages (two to {@code PushTopic}, one to {@code PullTopic}), prints the
     * message id and send status of each, and finally shuts the producer down.
     */
    public class Producer {

        /**
         * Entry point: sends the three test messages and prints each result.
         *
         * @param args command-line arguments (unused)
         */
        public static void main(String[] args) {
            DefaultMQProducer producer = new DefaultMQProducer("Producer");
            producer.setNamesrvAddr("192.168.205.196:9876");
            try {
                producer.start();

                sendAndReport(producer, "PushTopic", "push", "1", "Just for test.");
                sendAndReport(producer, "PushTopic", "push", "2", "Just for test.zhangxinyu111");
                sendAndReport(producer, "PullTopic", "pull", "1", "Just for test.");
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                // Always release the producer, even when start() or a send failed.
                producer.shutdown();
            }
        }

        /**
         * Builds one message, sends it synchronously and prints its id and send status.
         *
         * @param producer the producer to send through
         * @param topic    destination topic
         * @param tags     message tag
         * @param keys     message key
         * @param body     message payload; encoded as UTF-8 so the bytes on the wire
         *                 do not depend on the JVM's platform default charset
         * @throws Exception if the send fails; declared broadly because the caller
         *                   handles every failure in a single catch block
         */
        private static void sendAndReport(DefaultMQProducer producer,
                                          String topic,
                                          String tags,
                                          String keys,
                                          String body) throws Exception {
            Message msg = new Message(topic,
                    tags,
                    keys,
                    body.getBytes(java.nio.charset.StandardCharsets.UTF_8));

            SendResult result = producer.send(msg);
            System.out.println("id:" + result.getMsgId() +
                    " result:" + result.getSendStatus());
        }
    }

    2、消费者
    package com.example.demo.rocketmq;

    /**
     * @Description:
     * @author: YuanTong-ZXY
     * @Date: 2019/2/22 9:42
     */

    import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.common.message.MessageExt;

    import java.util.List;

    public class Consumer {
    public static void main(String[] args){
    DefaultMQPushConsumer consumer =
    new DefaultMQPushConsumer("PushConsumer");
    consumer.setNamesrvAddr("192.168.205.196:9876");
    try {
    //订阅PushTopic下Tag为push的消息
    consumer.subscribe("PushTopic", "push");
    //程序第一次启动从消息队列头取数据
    consumer.setConsumeFromWhere(
    ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
    consumer.registerMessageListener(
    new MessageListenerConcurrently() {
    public ConsumeConcurrentlyStatus consumeMessage(
    List<MessageExt> list,
    ConsumeConcurrentlyContext Context) {
    Message msg = list.get(0);
    System.out.println(msg.toString());
    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
    }
    );
    consumer.start();
    } catch (Exception e) {
    e.printStackTrace();
    }
    }
    }

    相关文章

      网友评论

          本文标题:rocketmq自测

          本文链接:https://www.haomeiwen.com/subject/sifeyqtx.html