美文网首页
rocketmq-producer

rocketmq-producer

作者: 划水者 | 来源:发表于2018-08-13 09:49 被阅读0次

rocketmq的producer发送消息,大致会分为如下几种消息

同步消息,发送者必须同步等待;

    DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");

    producer.setNamesrvAddr("localhost:9876");

    producer.start();

    for (int i = 0; i < 128; i++)
        try {
            {
                Message msg = new Message("TopicTest",
                    "TagA",
                    "OrderID188",
                    "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                SendResult sendResult = producer.send(msg);
                System.out.printf("%s%n", sendResult);
            }

        } catch (Exception e) {
            e.printStackTrace();
        }

    producer.shutdown();

异步消息,消息发送成功,异步通知;

    DefaultMQProducer producer = new DefaultMQProducer("Jodie_Daily_test");
    producer.setNamesrvAddr("localhost:9876");
    producer.start();
    producer.setRetryTimesWhenSendAsyncFailed(0);

    for (int i = 0; i < 10000000; i++) {
        try {
            final int index = i;
            Message msg = new Message("Jodie_topic_1023",
                "TagA",
                "OrderID188",
                "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
            producer.send(msg, new SendCallback() {
                @Override
                public void onSuccess(SendResult sendResult) {
                    System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId());
                }

                @Override
                public void onException(Throwable e) {
                    System.out.printf("%-10d Exception %s %n", index, e);
                    e.printStackTrace();
                }
            });
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    producer.shutdown();

单向消息,消息直接发送,不管成功失败;

    DefaultMQProducer producer = new DefaultMQProducer("ExampleProducerGroup");
    producer.setNamesrvAddr("localhost:9876");
    producer.start();
    for (int i = 0; i < 100; i++) {
        Message msg = new Message("TopicTest" , "TagA", ("Hello RocketMQ " +
                        i).getBytes(RemotingHelper.DEFAULT_CHARSET)
        );
        producer.sendOneway(msg);

    }
    producer.shutdown();

乱序消息,消费者不能保证按照生产者发送的顺序消费;在发送消息的时候不需要指定消息的keys

    DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");

    producer.setNamesrvAddr("localhost:9876");

    producer.start();

    for (int i = 0; i < 128; i++)
        try {
            {
                Message msg = new Message("TopicTest",
                    "TagA",
                    "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                SendResult sendResult = producer.send(msg);
                System.out.printf("%s%n", sendResult);
            }

        } catch (Exception e) {
            e.printStackTrace();
        }

    producer.shutdown();

顺序消息,消费者按照生产者生产顺序消费;发送消息时,需要指定消息的发送队列,broker会需要保持有序的消息放到同一个队列中,消费者消费同一个队列中的消息可以保证有序,注意不同队列之间消息不能保证有序性

    MQProducer producer = new DefaultMQProducer("example_group_name");
    producer.start();
    String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
    for (int i = 0; i < 100; i++) {
        int orderId = i % 10;
        Message msg = new Message("TopicTestjjj", tags[i % tags.length], "KEY" + i,
                ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
        SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
            @Override
            public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                Integer id = (Integer) arg;
                int index = id % mqs.size();
                return mqs.get(index);
            }
        }, orderId);

        System.out.printf("%s%n", sendResult);
    }
    producer.shutdown();

实时消息,发送者发送的消息,对消费者立刻可见;上述所写所有demo其实都是属于实时消息,只要发送,对消费者是实时可见的

延迟消息,发送者发送的消息,需要等待一段时间对消费者可见;

    DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");

    producer.setNamesrvAddr("localhost:9876");

    producer.start();

    for (int i = 0; i < 128; i++)
        try {
            {
                //指定keys为OrderID188
                Message msg = new Message("TopicTest",
                    "TagA",
                    "OrderID188",
                    "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                //设置延迟消费级别
                msg.setDelayTimeLevel(3);
                SendResult sendResult = producer.send(msg);
                System.out.printf("%s%n", sendResult);
            }

        } catch (Exception e) {
            e.printStackTrace();
        }

    producer.shutdown();

rocketmq的延迟消费,只支持特定时间的延迟,通过设置延迟时间的级别来确定延迟的时间
默认的延迟级别的配置如下,可进行修改
messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
如果设置delayTimeLevel为3,表示延迟时间为10s

单个消息,发送者每次发送到服务端是一条消息;上述列举的demo都是属于单个消息发送

批量消息,发送者每次发送到服务端是多条消息;

    DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");

    producer.setNamesrvAddr("localhost:9876");

    producer.start();

    for (int i = 0; i < 128; i++)
        try {
            {
                //指定keys为OrderID188
                Message msg1 = new Message("TopicTest",
                    "TagA",
                    "OrderID188",
                    "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                Message msg2 = new Message("TopicTest",
                        "TagA",
                        "OrderID189",
                        "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                List<Message> msgs = new ArrayList<>();
                msgs.add(msg1);
                msgs.add(msg2);
                SendResult sendResult = producer.send(msgs);
                System.out.printf("%s%n", sendResult);
            }

        } catch (Exception e) {
            e.printStackTrace();
        }

    producer.shutdown();

发送批量消息主要目的是为了减少io交互

相关文章

  • rocketmq-producer

    rocketmq的producer发送消息,大致会分为如下几种消息 同步消息,发送者必须同步等待; 异步消息,消息...

  • RocketMQ-Producer

    rocketmq 发送端的设计1、需要考虑什么? 容错设计 如果投递到某个服务端失败了,在多主的情况下可以选择可用...

  • rocketMq-producer介绍

    producer介绍 producer在rocketMq扮演的角色是消息的发送过程,其实宏观上来讲其实就包括两大块...

  • 三、RocketMQ-Producer启动流程

    一、UML图例 二、大致流程说明: 流程为非事务消息流程 在main方法中调用 new DefaultMQProd...

  • RocketMQ-Producer生产者解析

    Producer 概念说明* 初始化流程&流程图&相关类关系说明* 消息发送过程* 批量消息发送* 发送顺序消息...

  • 四、RocketMQ-Producer的Send方法

    一、概述 RocketMQ的producer默认有两个,一个是DefaultMQProducer,另一个是Tran...

网友评论

      本文标题:rocketmq-producer

      本文链接:https://www.haomeiwen.com/subject/barybftx.html