美文网首页
rocketmq集群消费以及广播消费

rocketmq集群消费以及广播消费

作者: 小小的小帅 | 来源:发表于2022-09-01 18:06 被阅读0次

    pom依赖:

            <dependency>
                <groupId>org.apache.rocketmq</groupId>
                <artifactId>rocketmq-client</artifactId>
                <version>4.6.0</version>
                <scope>compile</scope>
            </dependency>
    

    生产者:

    package com.hscs.cux;
    
    import org.apache.rocketmq.client.producer.DefaultMQProducer;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.common.message.Message;
    import org.apache.rocketmq.remoting.common.RemotingHelper;
    
    
    /**
     * Created by hand on 2018/5/2.
     */
    public class Producer {
    
        public static void main(String[] args) throws Exception {
            // 创建DefaultMQProducer类并设定生产者名称HSCS_DEV_TST_MSG
            DefaultMQProducer mqProducer = new DefaultMQProducer("hscs_producer-group-test");
            // 设置NameServer地址,如果是集群的话,使用分号;分隔开
            mqProducer.setNamesrvAddr("xxx.xxx.xxx.xxx:9876");
            // 消息最大长度 默认4M
            mqProducer.setMaxMessageSize(4096);
            // 发送消息超时时间,默认3000
            mqProducer.setSendMsgTimeout(3000);
            // 发送消息失败重试次数,默认2
            mqProducer.setRetryTimesWhenSendAsyncFailed(2);
            // 启动消息生产者
            mqProducer.start();
    
            // 循环十次,发送十条消息
            for (int i = 1; i <= 10; i++) {
                String msg = "hello, 这是第" + i + "条同步消息";
                // 创建消息,并指定Topic(主题),Tag(标签)和消息内容
                Message message = new Message("HSCS_DEV_TST_MSG", "", msg.getBytes(RemotingHelper.DEFAULT_CHARSET));
                // 发送同步消息到一个Broker,可以通过sendResult返回消息是否成功送达
                SendResult sendResult = mqProducer.send(message);
                System.out.println(sendResult);
            }
    
            // 如果不再发送消息,关闭Producer实例
            mqProducer.shutdown();
        }
    }
    
    

    消费者:

    package com.hscs.cux;
    
    import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
    import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
    import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
    import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
    import org.apache.rocketmq.common.message.MessageExt;
    import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
    import java.nio.charset.StandardCharsets;
    import java.util.List;
    
    public class Consumer {
    
        public static void main(String[] args) throws Exception {
            // 创建DefaultMQPushConsumer类并设定消费者名称
            DefaultMQPushConsumer mqPushConsumer = new DefaultMQPushConsumer("hscs_consumer-group-test");
    
            // 设置NameServer地址,如果是集群的话,使用分号;分隔开
            mqPushConsumer.setNamesrvAddr("xxx.xxx.xxx.xxx:9876");
    
            // 设置Consumer第一次启动是从队列头部开始消费还是队列尾部开始消费
            // 如果不是第一次启动,那么按照上次消费的位置继续消费
            mqPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
    
            // 设置消费模型,集群还是广播,默认为集群
            //mqPushConsumer.setMessageModel(MessageModel.CLUSTERING);
            mqPushConsumer.setMessageModel(MessageModel.BROADCASTING);
            // 消费者最小线程量
            mqPushConsumer.setConsumeThreadMin(5);
    
            // 消费者最大线程量
            mqPushConsumer.setConsumeThreadMax(10);
    
            // 设置一次消费消息的条数,默认是1
            mqPushConsumer.setConsumeMessageBatchMaxSize(1);
    
            // 订阅一个或者多个Topic,以及Tag来过滤需要消费的消息,如果订阅该主题下的所有tag,则使用*
            mqPushConsumer.subscribe("HSCS_DEV_TST_MSG", "*");
    
            // 注册回调实现类来处理从broker拉取回来的消息
            mqPushConsumer.registerMessageListener(new MessageListenerConcurrently() {
                // 监听类实现MessageListenerConcurrently接口即可,重写consumeMessage方法接收数据
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgList, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                    MessageExt messageExt = msgList.get(0);
                    String body = new String(messageExt.getBody(), StandardCharsets.UTF_8);
                    System.out.println("消费者接收到消息: " + messageExt.toString() + "---消息内容为:" + body);
                    // 标记该消息已经被成功消费
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });
    
            // 启动消费者实例
            mqPushConsumer.start();
        }
    }
    

    spring集成:
    application.yml修改:

    rocketmq:
      # rocketmq地址
      name-server: 192.168.101.11:9876
      producer:
        # 必须填写 group
        group: test-group
    

    生产端:

    package com.anker.scs.ankerinterface.infra.message;
    
    import org.apache.rocketmq.spring.core.RocketMQTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.web.bind.annotation.GetMapping;
    import org.springframework.web.bind.annotation.RestController;
    
    @RestController
    public class TestController {
        @Autowired
        private RocketMQTemplate rocketMQTemplate;
    
        /**
         * 普通消息投递
         */
        @GetMapping("/test")
        public String test(String msg) {
            rocketMQTemplate.convertAndSend("add-bonus", "testMessaging");
            return "投递消息 => " + msg + " => 成功";
        }
    }
    

    消费端:

    package com.anker.scs.ankerinterface.infra.message;
    
    import com.alibaba.fastjson.JSONObject;
    import lombok.extern.slf4j.Slf4j;
    import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
    import org.apache.rocketmq.spring.annotation.ConsumeMode;
    import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
    import org.apache.rocketmq.spring.core.RocketMQListener;
    import org.apache.rocketmq.spring.core.RocketMQPushConsumerLifecycleListener;
    import org.springframework.stereotype.Component;
    import org.springframework.transaction.annotation.Transactional;
    
    @Slf4j
    @Component
    @RocketMQMessageListener(topic = "MDP_TOPIC_COMPANY_INFO_SIT",
            consumerGroup = "TMS_GROUP_AMDP_COMPANY",
            consumeThreadMax = 8, consumeMode = ConsumeMode.ORDERLY)
    public class AmdpCompanyListener implements RocketMQListener<JSONObject>, RocketMQPushConsumerLifecycleListener {
    
        @Override
        public void prepareStart(DefaultMQPushConsumer defaultMQPushConsumer) {
    
        }
    
        @Override
        @Transactional(rollbackFor = Exception.class)
        public void onMessage(JSONObject jsonObject) {
            log.info("MDP_TOPIC_COMPANY_INFO_SIT:{}", jsonObject);
            //List<CompanyDTO> companyDTOList = JSON.parseArray(jsonObject.getString("data"), CompanyDTO.class);
        }
    
    }
    
    

    相关文章

      网友评论

          本文标题:rocketmq集群消费以及广播消费

          本文链接:https://www.haomeiwen.com/subject/oussnrtx.html