RocketMQ (3): Sending and Receiving Messages

Author: 我犟不过你 | Published: 2020-11-26 17:28

1. Set up the project and add the dependency

Go to https://mvnrepository.com/ and look up the RocketMQ starter dependency.

[Image: Maven repository]

Add the following dependency to the pom:

<!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-spring-boot-starter -->
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>2.1.1</version>
</dependency>

After refreshing the Maven dependencies, add the following to the configuration file:

rocketmq:
  name-server: http://101.200.36.168:9876
  producer:
    # Producer group name; used when querying in the console
    group: test
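
The `name-server` value above is a single address; RocketMQ also accepts several name servers separated by semicolons. As a rough sketch of sanity-checking such a value before startup, the plain-Java helper below splits the list and validates each `host:port` entry. `NameServerAddresses` and `parse` are hypothetical names invented for this example, not part of RocketMQ or the starter, and stripping a scheme prefix such as `http://` is an illustrative choice made here.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration; not part of RocketMQ or the Spring Boot starter.
class NameServerAddresses {

    /** Splits a semicolon-separated name-server list into "host:port" entries. */
    static List<String> parse(String value) {
        List<String> result = new ArrayList<>();
        for (String part : value.split(";")) {
            String addr = part.trim();
            if (addr.isEmpty()) {
                continue;
            }
            // Assumption of this sketch: drop an optional scheme such as "http://".
            int scheme = addr.indexOf("://");
            if (scheme >= 0) {
                addr = addr.substring(scheme + 3);
            }
            int colon = addr.lastIndexOf(':');
            if (colon <= 0 || colon == addr.length() - 1) {
                throw new IllegalArgumentException("Expected host:port but got: " + part);
            }
            // parseInt rejects non-numeric ports with a NumberFormatException.
            int port = Integer.parseInt(addr.substring(colon + 1));
            if (port < 1 || port > 65535) {
                throw new IllegalArgumentException("Port out of range: " + port);
            }
            result.add(addr);
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(parse("http://101.200.36.168:9876"));
    }
}
```

A check like this only catches malformed values; it does not tell you whether the name server is actually reachable.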

2. Test code

2.1 Producer (message sender)

package com.cloud.bssp.message;

import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

/**
 * RocketMqProducer
 * @date: 2020/11/26
 * @author weirx
 * @version 3.0
 */
@Component
public class RocketMqProducer {

    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    /**
     * Send a normal message
     */
    public void sendMsg(String msgBody) {
        rocketMQTemplate.syncSend("queue_test_topic", MessageBuilder.withPayload(msgBody).build());
    }
}
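
The first argument to `syncSend` is the destination. In rocketmq-spring it can be a bare topic, as in the producer above, or a `topic:tags` string. To make that convention concrete, here is a minimal plain-Java sketch of the split. `Destination` and `parse` are hypothetical names used only for illustration (they are not the starter's API), and `TagA` is an example tag that does not appear in this article's code.

```java
// Hypothetical model of a rocketmq-spring destination string; not the starter's own class.
class Destination {
    final String topic;
    final String tags; // empty when the destination has no ":tags" suffix

    Destination(String topic, String tags) {
        this.topic = topic;
        this.tags = tags;
    }

    /** Splits "topic" or "topic:tags" at the first colon. */
    static Destination parse(String destination) {
        String[] parts = destination.split(":", 2);
        String tags = parts.length > 1 ? parts[1] : "";
        return new Destination(parts[0], tags);
    }

    public static void main(String[] args) {
        Destination plain = parse("queue_test_topic");
        Destination tagged = parse("queue_test_topic:TagA"); // "TagA" is an example tag
        System.out.println(plain.topic + " / '" + plain.tags + "'");
        System.out.println(tagged.topic + " / '" + tagged.tags + "'");
    }
}
```

With the untagged destination used in this article, every message lands on `queue_test_topic` without a tag.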

2.2 Consumer (message receiver)

package com.cloud.bssp.message;

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

import java.nio.charset.StandardCharsets;

/**
 * RocketMqListener
 * @date: 2020/11/26
 * @author weirx
 * @version 3.0
 */
@Slf4j
@Component
@RocketMQMessageListener(topic = "queue_test_topic", selectorExpression = "*", consumerGroup = "queue_group_test")
public class RocketMqListener implements RocketMQListener<MessageExt> {

    @Override
    public void onMessage(MessageExt messageExt) {
        // Decode explicitly as UTF-8 instead of relying on the platform default charset
        String msg = new String(messageExt.getBody(), StandardCharsets.UTF_8);
        log.info("Received message: {}", msg);
    }
}
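
In the listener annotation, `selectorExpression = "*"` subscribes to every tag on the topic. With tag-based selection, an expression can also list several tags separated by `||`. The sketch below models that matching rule in plain Java as an aid to reading the annotation. `TagSelector` is a hypothetical name, the tags are examples, and the real filtering is done by RocketMQ itself rather than by code like this.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical model of tag-expression matching; RocketMQ performs the real filtering.
class TagSelector {

    /** Returns true when a message tag is accepted by the expression. */
    static boolean matches(String expression, String tag) {
        if (expression == null || expression.trim().isEmpty() || expression.trim().equals("*")) {
            return true; // "*" (or no expression) accepts every message
        }
        Set<String> accepted = new HashSet<>();
        for (String candidate : expression.split("\\|\\|")) {
            String trimmed = candidate.trim();
            if (!trimmed.isEmpty()) {
                accepted.add(trimmed);
            }
        }
        return tag != null && accepted.contains(tag);
    }

    public static void main(String[] args) {
        System.out.println(matches("*", "TagA"));
        System.out.println(matches("TagA || TagB", "TagB"));
        System.out.println(matches("TagA || TagB", "TagC"));
    }
}
```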

2.3 Test code

Here we simply create a Controller:

package com.cloud.bssp.message;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * RocketMqTest
 * @date: 2020/11/26
 * @author weirx
 * @version 3.0
 */
@Slf4j
@RestController
@RequestMapping("/message")
public class RocketMqTest {

    @Autowired
    private RocketMqProducer rocketMqProducer;

    // Sends a fixed test message through the producer
    @RequestMapping("/send")
    public void sendMsg() {
        rocketMqProducer.sendMsg("Let me test rocketmq");
    }
}

2.4 Message sending test

Start the project and call the endpoint from the previous step: http://localhost:8085/message/send
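
A browser or curl is enough to call the endpoint. If you would rather trigger it from code, one possible sketch uses the JDK's built-in `java.net.http.HttpClient` (Java 11+). `SendEndpointClient` and its methods are names made up for this example, and the URL assumes the application is listening on port 8085 as above.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Illustrative client; the class and method names are made up for this sketch.
class SendEndpointClient {

    /** Builds a GET request for the given URL. */
    static HttpRequest buildRequest(String url) {
        return HttpRequest.newBuilder(URI.create(url)).GET().build();
    }

    /** Sends the request and returns the HTTP status code. */
    static int call(String url) throws IOException, InterruptedException {
        HttpClient client = HttpClient.newHttpClient();
        HttpResponse<String> response =
                client.send(buildRequest(url), HttpResponse.BodyHandlers.ofString());
        return response.statusCode();
    }

    public static void main(String[] args) throws InterruptedException {
        try {
            System.out.println("HTTP status: " + call("http://localhost:8085/message/send"));
        } catch (IOException e) {
            // Reached when the application is not running on port 8085.
            System.out.println("Request failed: " + e);
        }
    }
}
```

Because the controller method returns `void`, the response body is empty; the interesting output is the log line written by the listener.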

The following result is received, which shows that our code has successfully connected to RocketMQ:

[Image: message send result]
