美文网首页
rocketmq-producer

rocketmq-producer

作者: 划水者 | 来源:发表于2018-08-13 09:49 被阅读0次

    rocketmq的producer发送消息,大致会分为如下几种消息

    同步消息,发送者必须同步等待;

        DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
    
        producer.setNamesrvAddr("localhost:9876");
    
        producer.start();
    
        for (int i = 0; i < 128; i++)
            try {
                {
                    Message msg = new Message("TopicTest",
                        "TagA",
                        "OrderID188",
                        "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                    SendResult sendResult = producer.send(msg);
                    System.out.printf("%s%n", sendResult);
                }
    
            } catch (Exception e) {
                e.printStackTrace();
            }
    
        producer.shutdown();
    

    异步消息,消息发送成功,异步通知;

        DefaultMQProducer producer = new DefaultMQProducer("Jodie_Daily_test");
        producer.setNamesrvAddr("localhost:9876");
        producer.start();
        producer.setRetryTimesWhenSendAsyncFailed(0);
    
        for (int i = 0; i < 10000000; i++) {
            try {
                final int index = i;
                Message msg = new Message("Jodie_topic_1023",
                    "TagA",
                    "OrderID188",
                    "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                producer.send(msg, new SendCallback() {
                    @Override
                    public void onSuccess(SendResult sendResult) {
                        System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId());
                    }
    
                    @Override
                    public void onException(Throwable e) {
                        System.out.printf("%-10d Exception %s %n", index, e);
                        e.printStackTrace();
                    }
                });
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        producer.shutdown();
    

    单向消息,消息直接发送,不管成功失败;

        DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup");
        producer.setNamesrvAddr("localhost:9876");
        producer.start();
        for (int i = 0; i < 100; i++) {
            Message msg = new Message("TopicTest" , "TagA", ("Hello RocketMQ " +
                            i).getBytes(RemotingHelper.DEFAULT_CHARSET)
            );
            producer.sendOneway(msg);
    
        }
        producer.shutdown();
    

    乱序消息,消费者不能保证按照生产者发送的顺序消费;在发送消息的时候不需要指定消息的keys

        DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
    
        producer.setNamesrvAddr("localhost:9876");
    
        producer.start();
    
        for (int i = 0; i < 128; i++)
            try {
                {
                    Message msg = new Message("TopicTest",
                        "TagA",
                        "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                    SendResult sendResult = producer.send(msg);
                    System.out.printf("%s%n", sendResult);
                }
    
            } catch (Exception e) {
                e.printStackTrace();
            }
    
        producer.shutdown();
    

    顺序消息,消费者按照生产者生产顺序消费;发送消息时,需要指定消息的发送队列,broker会需要保持有序的消息放到同一个队列中,消费者消费同一个队列中的消息可以保证有序,注意不同队列之间消息不能保证有序性

        MQProducer producer = new DefaultMQProducer("example_group_name");
        producer.start();
        String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
        for (int i = 0; i < 100; i++) {
            int orderId = i % 10;
            Message msg = new Message("TopicTestjjj", tags[i % tags.length], "KEY" + i,
                    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
            SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
                @Override
                public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                    Integer id = (Integer) arg;
                    int index = id % mqs.size();
                    return mqs.get(index);
                }
            }, orderId);
    
            System.out.printf("%s%n", sendResult);
        }
        producer.shutdown();
    

    实时消息,发送者发送的消息,对消费者立刻可见;上述所写所有demo其实都是属于实时消息,只要发送,对消费者是实时可见的

    延迟消息,发送者发送的消息,需要等待一段时间对消费者可见;

        DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
    
        producer.setNamesrvAddr("localhost:9876");
    
        producer.start();
    
        for (int i = 0; i < 128; i++)
            try {
                {
                    //指定keys为OrderID188
                    Message msg = new Message("TopicTest",
                        "TagA",
                        "OrderID188",
                        "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                    //设置延迟消费级别
                    msg.setDelayTimeLevel(3);
                    SendResult sendResult = producer.send(msg);
                    System.out.printf("%s%n", sendResult);
                }
    
            } catch (Exception e) {
                e.printStackTrace();
            }
    
        producer.shutdown();
    

    rocketmq的延迟消费,只支持特定时间的延迟,通过设置延迟时间的级别来确定延迟的时间
    默认的延迟级别的配置如下,可进行修改
    messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
    如果设置delayTimeLevel为3,表示延迟时间为10s

    单个消息,发送者每次发送到服务端是一条消息;上述列举的demo都是属于单个消息发送

    批量消息,发送者每次发送到服务端是多条消息;

        DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
    
        producer.setNamesrvAddr("localhost:9876");
    
        producer.start();
    
        for (int i = 0; i < 128; i++)
            try {
                {
                    //指定keys为OrderID188
                    Message msg1 = new Message("TopicTest",
                        "TagA",
                        "OrderID188",
                        "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                    Message msg2 = new Message("TopicTest",
                            "TagA",
                            "OrderID189",
                            "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                    List<Message> msgs = new ArrayList<>();
                    msgs.add(msg1);
                    msgs.add(msg2);
                    SendResult sendResult = producer.send(msgs);
                    System.out.printf("%s%n", sendResult);
                }
    
            } catch (Exception e) {
                e.printStackTrace();
            }
    
        producer.shutdown();
    

    发送批量消息主要目的是为了减少io交互

    相关文章

      网友评论

          本文标题:rocketmq-producer

          本文链接:https://www.haomeiwen.com/subject/barybftx.html