RocketMQ(三)消息发送与接收

作者: 我犟不过你 | 来源:发表于2020-11-26 17:28 被阅读0次

    一、搭建项目引入依赖

    访问https://mvnrepository.com/获取rocketmq的starter依赖。

    mvn仓库

    在pom中添加下依赖:

    <!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-spring-boot-starter -->
            <dependency>
                <groupId>org.apache.rocketmq</groupId>
                <artifactId>rocketmq-spring-boot-starter</artifactId>
                <version>2.1.1</version>
           </dependency>
    

    刷新maven依赖后,在配置文件增加以下配置:

    rocketmq:
      name-server: http://101.200.36.168:9876
      producer:
        #指定消息发送者的组,在控制台查询时会用到
        group: test
    

    二、测试代码

    2.1 producer消息发送者

    package com.cloud.bssp.message;
    
    import org.apache.rocketmq.client.producer.SendCallback;
    import org.apache.rocketmq.client.producer.SendResult;
    import org.apache.rocketmq.spring.core.RocketMQTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.messaging.support.MessageBuilder;
    import org.springframework.stereotype.Component;
    
    /**
     * RocketMqProducer
     * @date: 2020/11/26
     * @author weirx
     * @version 3.0
     */
    @Component
    public class RocketMqProducer {
    
        @Autowired
        private RocketMQTemplate rocketMQTemplate;
    
        /**
         * 发送普通消息
         */
        public void sendMsg(String msgBody) {
            rocketMQTemplate.syncSend("queue_test_topic", MessageBuilder.withPayload(msgBody).build());
        }
    }
    
    

    2.2 consumer 消息接收者

    package com.cloud.bssp.message;
    
    import lombok.extern.slf4j.Slf4j;
    import org.apache.rocketmq.common.message.MessageExt;
    import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
    import org.apache.rocketmq.spring.core.RocketMQListener;
    import org.apache.rocketmq.spring.core.RocketMQTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    
    /**
     * RocketMqListener 
     * @date: 2020/11/26
     * @author weirx
     * @version 3.0
     */
    @Slf4j
    @Component
    @RocketMQMessageListener(topic = "queue_test_topic", selectorExpression = "*", consumerGroup = "queue_group_test")
    public class RocketMqListener implements RocketMQListener<MessageExt> {
    
        @Override
        public void onMessage(MessageExt messageExt) {
            byte[] body = messageExt.getBody();
            String msg = new String(body);
            log.info("接收到消息:{}", msg);
    
        }
    }
    
    

    2.3 测试代码

    这里直接创建一个Controller:

    package com.cloud.bssp.message;
    
    import com.cloud.bssp.util.R;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;
    
    /**
     * RocketMqTest
     * @date: 2020/11/26
     * @author weirx
     * @version 3.0
     */
    
    @Slf4j
    @Component
    @RestController
    @RequestMapping("/message")
    public class RocketMqTest {
    
        @Autowired
        private  RocketMqProducer rocketMqProducer;
    
        @RequestMapping("/send")
        public void sendMsg(){
            rocketMqProducer.sendMsg("我来测试一下rocketmq");
        }
    }
    

    2.4 发送消息实验

    启动项目并调用上一步中的接口:http://localhost:8085/message/send

    收到以下结果,至此我们的代码与rocketmq成功建立了连接:

    消息发送结果

    相关文章

      网友评论

        本文标题:RocketMQ(三)消息发送与接收

        本文链接:https://www.haomeiwen.com/subject/ehxziktx.html