美文网首页
消息发送基本类型

消息发送基本类型

作者: 洛美萨斯 | 来源:发表于2020-02-24 00:37 被阅读0次

    1. 步骤分析

    • 导入MQ客户端依赖
    <dependency>
        <groupId>org.apache.rocketmq</groupId>
        <artifactId>rocketmq-client</artifactId>
        <version>4.3.0</version>
    </dependency>
    
    • 消息发送者步骤分析
    1.创建消息生产者producer,并制定生产者组名
    2.指定Nameserver地址
    3.启动producer
    4.创建消息对象,指定主题Topic、Tag和消息体
    5.发送消息
    6.关闭生产者producer
    
    • 消息消费者步骤分析
    1.创建消费者Consumer,制定消费者组名
    2.指定Nameserver地址
    3.订阅主题Topic和Tag
    4.设置回调函数,处理消息
    5.启动消费者consumer
    

    2. 基本样例

    2.1 消息发送

    1) 发送同步消息

    这种可靠性同步地发送方式使用比较广发,比如:重要的消息通知,短信通知。

    public class SyncProducerTest {
        public static void main(String[] args) throws Exception {
            //1.创建消息生产者producer,并制定生产者组名
            DefaultMQProducer producer = new DefaultMQProducer("group1");
            //2.指定Nameserver地址
            producer.setNamesrvAddr("192.168.52.139:9876;192.168.52.140:9876");
            //3.启动producer
            producer.start();
            for (int i = 0; i < 10; i++) {
                //4.创建消息对象,指定主题Topic、Tag和消息体
                /**
                 * 参数1:消息主题Topic
                 * 参数2:消息Tag
                 * 参数3:消息内容
                 */
                Message msg = new Message("base", "Tag1", ("Hello World!+ " + i).getBytes());
                //5.发送消息
                SendResult result = producer.send(msg);
                //发送状态
                SendStatus staus = result.getSendStatus();
                //消息ID
                String msgId = result.getMsgId();
                //消息接收队列ID
                int queueId = result.getMessageQueue().getQueueId();
                System.out.println("发送状态:" + staus + ",消息ID:" + msgId + ",队列:" + queueId);
    
                System.out.println("发送结果:" + result);
                //线程睡一秒
                TimeUnit.MILLISECONDS.sleep(100);
            }
            //6.关闭生产者producer
            producer.shutdown();
        }
    }
    

    2)发送异步消息

    异步消息通常用在对响应时间敏感的业务场景,即发送端不能容忍长时间地等待Broker的响应。

    public class AsyncProducerTest {
        public static void main(String[] args) throws Exception {
            //1.创建消息生产者producer,并制定生产者组名
            DefaultMQProducer producer = new DefaultMQProducer("group1");
            //2.指定Nameserver地址
            producer.setNamesrvAddr("192.168.52.139:9876;192.168.52.140:9876");
            //3.启动producer
            producer.start();
            for (int i = 0; i < 10; i++) {
                //4.创建消息对象,指定主题Topic、Tag和消息体
                /**
                 * 参数1:消息主题Topic
                 * 参数2:消息Tag
                 * 参数3:消息内容
                 */
                Message msg = new Message("base", "Tag2", ("Hello World!+ " + i).getBytes());
                //5.发送消息
                producer.send(msg, new SendCallback() {
                    @Override
                    public void onSuccess(SendResult result) {
                        //发送状态
                        SendStatus staus = result.getSendStatus();
                        //消息ID
                        String msgId = result.getMsgId();
                        //消息接收队列ID
                        int queueId = result.getMessageQueue().getQueueId();
                        System.out.println("发送状态:" + staus + ",消息ID:" + msgId + ",队列:" + queueId);
    
                        System.out.println("发送结果:" + result);
    
                    }
    
                    @Override
                    public void onException(Throwable e) {
                        e.printStackTrace();
                    }
                });
                //线程睡一秒
                TimeUnit.MILLISECONDS.sleep(100);
            }
            //6.关闭生产者producer
            producer.shutdown();
        }
    

    3) 单向发送消息

    这种方式主要用在不特别关心发送结果的场景,比如,日志发送

    public class OnewayProducerTest {
        public static void main(String[] args) throws Exception {
            //1.创建消息生产者producer,并制定生产者组名
            DefaultMQProducer producer = new DefaultMQProducer("group1");
            //2.指定Nameserver地址
            producer.setNamesrvAddr("192.168.52.139:9876;192.168.52.140:9876");
            //3.启动producer
            producer.start();
            for (int i = 0; i < 100; i++) {
                //4.创建消息对象,指定主题Topic、Tag和消息体
                /**
                 * 参数1:消息主题Topic
                 * 参数2:消息Tag
                 * 参数3:消息内容
                 */
                Message msg = new Message("base", "Tag3", ("Hello World!+单向消息 " + i).getBytes());
                //5.发送消息
                producer.sendOneway(msg);
                // 睡眠100ms
                TimeUnit.MILLISECONDS.sleep(100);
            }
            //6.关闭生产者producer
            producer.shutdown();
        }
    }
    
    

    2.2 消费消息

    1)负载均衡模式

    消费者采用负载均衡方式消费消息,多个消费者共同消费队列消息,每个消费者处理的消息不同。

    public class ClusterConsumerTest {
        public static void main(String[] args) throws Exception {
            //1.创建消费者Consumer,制定消费者组名
            DefaultMQPushConsumer comsumer = new DefaultMQPushConsumer("group1");
            //2.指定Nameserver地址
            comsumer.setNamesrvAddr("192.168.52.139:9876;192.168.52.140:9876");
            //3.订阅主题Topic和Tag
            comsumer.subscribe("base", "Tag2");
            //设置负载均衡消费(默认模式)
            comsumer.setMessageModel(MessageModel.CLUSTERING);
            //4.设置回调函数,处理消息
            comsumer.registerMessageListener(new MessageListenerConcurrently() {
                //接受消息内容
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                    for (MessageExt msg : msgs) {
                        System.out.println(new String(msg.getBody()));
                    }
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
            //5.启动消费者consumer
            comsumer.start();
        }
    }
    

    2) 广播模式

    消费者采用广播的方式消费消息,每个消费者消费的消息都是相同的。

    public class BroadcastConsumerTest {
        public static void main(String[] args) throws Exception {
            //1.创建消费者Consumer,制定消费者组名
            DefaultMQPushConsumer comsumer = new DefaultMQPushConsumer("group1");
            //2.指定Nameserver地址
            comsumer.setNamesrvAddr("192.168.52.139:9876;192.168.52.140:9876");
            //3.订阅主题Topic和Tag
            comsumer.subscribe("base", "Tag1");
            //广播模式消费
            comsumer.setMessageModel(MessageModel.BROADCASTING);
            //4.设置回调函数,处理消息
            comsumer.registerMessageListener(new MessageListenerConcurrently() {
                //接受消息内容
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                    for (MessageExt msg : msgs) {
                        System.out.println(new String(msg.getBody()));
                    }
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
            //5.启动消费者consumer
            comsumer.start();
        }
    }
    

    相关文章

      网友评论

          本文标题:消息发送基本类型

          本文链接:https://www.haomeiwen.com/subject/ykscfhtx.html