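Preface: this blog is mainly a record of my own learning so that I can look things up later, and of course I hope it helps others too.
RocketMQ is must-learn middleware, so let's get straight to it. The link to the complete code is at the end.
Step 1: add the dependency to pom.xml, as follows: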
<!-- RocketMQ -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.0.4</version>
</dependency>
Step 2: configure RocketMQ in the application.yml configuration file:
# RocketMQ configuration
rocketmq:
  name-server: xxx:9876;xxx:9876
  producer:
    group: dubbo-demo-service-manager-producer-group-1 # producer group
  consumer:
    group: dubbo-demo-service-manager-consumer-group-1 # consumer group
Step 3: create a simple message body, Message, and a simple message request entity, AddMessageReq.
Message
import lombok.Data;
import java.io.Serializable;
/**
* @Description: Message body
* @Author: jinhaoxun
* @Date: 2020/2/11 9:52 AM
* @Version: 1.0.0
*/
@Data
public class Message<T> implements Serializable {
private String id;
private T content;
}
AddMessageReq
import com.luoyu.rocketmq.entity.Message;
import lombok.Data;
/**
* @version 1.0
* @author jinhaoxun
* @date 2018-05-09
* @description Request entity for sending a RocketMQ message
*/
@Data
public class AddMessageReq {
private String topic;
private String tag;
private Message<String> message;
}
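For readers who do not use Lombok, the following is a rough sketch of what `@Data` supplies for `Message<T>`: getters, setters, and a `toString`. The class name `PlainMessage` is hypothetical and used only for illustration. Lombok also generates `equals` and `hashCode`, which are omitted here to keep the sketch short.

```java
import java.io.Serializable;

// Hypothetical hand-written counterpart of the Lombok-annotated Message<T>.
// Only getters, setters and toString are shown; equals/hashCode are omitted.
class PlainMessage<T> implements Serializable {
    private static final long serialVersionUID = 1L;

    private String id;
    private T content;

    public String getId() { return id; }
    public void setId(String id) { this.id = id; }

    public T getContent() { return content; }
    public void setContent(T content) { this.content = content; }

    @Override
    public String toString() {
        return "PlainMessage(id=" + id + ", content=" + content + ")";
    }
}
```

Usage mirrors the tests later in this post: create the object, call `setId` and `setContent`, then hand it to the service.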
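Step 4: create the message listener, RocketMQConsumerListener, which the consumer uses to consume messages. It consumes messages as soon as they arrive, and you can specify a different topic, tag, and consumer group.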
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
@Slf4j
@Component
@RocketMQMessageListener(topic = "topic1", selectorExpression = "tag1", consumeMode = ConsumeMode.ORDERLY, consumerGroup = "${rocketmq.consumer.group}")
public class RocketMQConsumerListener implements RocketMQListener<String> {
@Override
public void onMessage(String s) {
log.info("Received message: {}", s);
}
}
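The `selectorExpression` in the listener uses RocketMQ's tag filtering, where `*` subscribes to every tag and an expression such as `tag1 || tag2` subscribes to either tag. The sketch below only imitates that matching rule in plain Java to show which messages a listener would see. `TagSelectorSketch` and `matches` are hypothetical names, not part of RocketMQ, and the real filtering is done by RocketMQ itself.

```java
import java.util.Arrays;

// Illustrative imitation of tag-based subscription matching (not RocketMQ code).
class TagSelectorSketch {

    // Returns true if a message carrying messageTag would pass the given tag expression.
    static boolean matches(String selectorExpression, String messageTag) {
        // A null, empty, or "*" expression means "subscribe to all tags".
        if (selectorExpression == null
                || selectorExpression.trim().isEmpty()
                || "*".equals(selectorExpression.trim())) {
            return true;
        }
        // A message without a tag cannot match a specific tag expression.
        if (messageTag == null) {
            return false;
        }
        // "tag1 || tag2" matches when the message tag equals any listed tag.
        return Arrays.stream(selectorExpression.split("\\|\\|"))
                .map(String::trim)
                .anyMatch(messageTag::equals);
    }
}
```

With the listener above (`selectorExpression = "tag1"`), a message sent to `topic1:tag1` matches, while one sent to `topic1:tag2` does not.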
Step 5: create the service classes, RocketMQService and RocketMQServiceImpl, as follows:
RocketMQService
import com.luoyu.rocketmq.entity.request.AddMessageReq;
public interface RocketMQService {
boolean sendMessage(AddMessageReq addMessageReq);
boolean syncSendMessage(AddMessageReq addMessageReq);
}
RocketMQServiceImpl
import com.luoyu.rocketmq.entity.request.AddMessageReq;
import com.luoyu.rocketmq.service.RocketMQService;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
@Slf4j
@Service
public class RocketMQServiceImpl implements RocketMQService {
@Resource
private RocketMQTemplate rocketMQTemplate;
@Override
public boolean sendMessage(AddMessageReq addMessageReq) {
// Specify the topic and tag; the destination format is "topic:tag"
rocketMQTemplate.convertAndSend(addMessageReq.getTopic() + ":" + addMessageReq.getTag(), addMessageReq.getMessage());
return true;
}
@Override
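public boolean syncSendMessage(AddMessageReq addMessageReq) {
// Despite the method name, this sends asynchronously: asyncSend returns immediately and reports the result through the callback
rocketMQTemplate.asyncSend(addMessageReq.getTopic(), addMessageReq.getMessage(), new SendCallback() {
// Follow-up handling when the message is sent successfully
@Override
public void onSuccess(SendResult sendResult) {
log.info("async onSuccess SendResult:{}", sendResult);
}
// Follow-up handling when sending the message fails
@Override
public void onException(Throwable e) {
log.error("async onException", e);
}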
});
return true;
}
}
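`sendMessage` builds the destination by concatenating the topic and the tag, which would produce `topic1:null` if the request carries no tag. A minimal sketch of a null-safe helper follows. `DestinationSketch.build` is a hypothetical name, and it assumes the `topic:tag` destination format used in the call above.

```java
// Hypothetical helper that builds a "topic:tag" destination string.
class DestinationSketch {

    static String build(String topic, String tag) {
        if (topic == null || topic.trim().isEmpty()) {
            throw new IllegalArgumentException("topic must not be empty");
        }
        // Without a tag, send to the bare topic instead of "topic:null".
        if (tag == null || tag.trim().isEmpty()) {
            return topic;
        }
        return topic + ":" + tag;
    }
}
```

In `sendMessage`, the first argument could then be `DestinationSketch.build(addMessageReq.getTopic(), addMessageReq.getTag())`.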
Step 6: write the unit test class, RocketmqApplicationTests, and run the tests. Only some of the methods are listed.
import com.luoyu.rocketmq.entity.Message;
import com.luoyu.rocketmq.entity.request.AddMessageReq;
import com.luoyu.rocketmq.service.RocketMQService;
import lombok.extern.slf4j.Slf4j;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
@Slf4j
// Loads the Spring application context for the test by locating the main configuration class (the one annotated with @SpringBootApplication)
@SpringBootTest
class RocketmqApplicationTests {
@Autowired
private RocketMQService rocketMQService;
@Test
void syncSendMessageTest() throws InterruptedException {
// Send a message asynchronously (the service method calls asyncSend despite its name)
Message<String> message = new Message<>();
message.setId("123");
message.setContent("Just a test");
AddMessageReq addMessageReq = new AddMessageReq();
addMessageReq.setTopic("topic2");
addMessageReq.setMessage(message);
rocketMQService.syncSendMessage(addMessageReq);
// Sleep for 10 seconds so the asynchronous callback can run before the test exits
Thread.sleep(10000);
}
@Test
void sendMessageTest() throws InterruptedException {
// Specify the topic and tag
AddMessageReq addMessageReq = new AddMessageReq();
addMessageReq.setTopic("topic1");
addMessageReq.setTag("tag1");
Message<String> message = new Message<>();
message.setId("123");
message.setContent("Just a test");
addMessageReq.setMessage(message);
rocketMQService.sendMessage(addMessageReq);
// Sleep for 10 seconds to give the consumer time to receive the message
Thread.sleep(10000);
}
@BeforeEach
void testBefore(){
log.info("Test started!");
}
@AfterEach
void testAfter(){
log.info("Test finished!");
}
}
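The tests above sleep for ten seconds so that the asynchronous callback and the consumer have time to run. When a test only needs to wait for a callback, a `CountDownLatch` is one alternative that returns as soon as the callback fires. The sketch below simulates an asynchronous send with a plain executor. `FakeAsyncSender`, its `Callback` interface, and `LatchWaitSketch` are hypothetical stand-ins, not RocketMQ classes.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for an asynchronous sender; it invokes the callback on another thread.
class FakeAsyncSender {
    interface Callback {
        void onSuccess(String result);
        void onException(Throwable e);
    }

    private final ExecutorService executor = Executors.newSingleThreadExecutor();

    void asyncSend(String payload, Callback callback) {
        // Simulate a successful send that completes on a background thread.
        executor.submit(() -> callback.onSuccess("SEND_OK:" + payload));
    }

    void shutdown() {
        executor.shutdown();
    }
}

class LatchWaitSketch {

    // Sends the payload and waits up to timeoutSeconds for the callback.
    // Returns the callback result, or null on failure, timeout, or interruption.
    static String sendAndWait(String payload, long timeoutSeconds) {
        FakeAsyncSender sender = new FakeAsyncSender();
        CountDownLatch done = new CountDownLatch(1);
        AtomicReference<String> result = new AtomicReference<>();

        sender.asyncSend(payload, new FakeAsyncSender.Callback() {
            @Override
            public void onSuccess(String r) {
                result.set(r);
                done.countDown(); // release the waiting thread
            }

            @Override
            public void onException(Throwable e) {
                done.countDown(); // release the waiting thread on failure too
            }
        });

        try {
            boolean completed = done.await(timeoutSeconds, TimeUnit.SECONDS);
            return completed ? result.get() : null;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            return null;
        } finally {
            sender.shutdown();
        }
    }
}
```

The same idea could be applied to the real test by counting down the latch inside the `SendCallback`, although that would require the service to expose the callback, which the code in this post does not do.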
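Complete code: https://github.com/Jinhx128/springboot-demo
Note: this project contains multiple modules; all code used in this article is in the rocketmq-demo module.
Afterword: that's all for this post. My knowledge is limited, so errors and omissions are inevitable. Corrections are welcome, and feel free to leave a comment.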