消息中间件
什么是消息中间件
消息中间件:是利用高效可靠的消息传递机制进行异步的数据传输,并基于数据通信进行分布式系统的集成。通过提供消息队列模型和消息传递机制,可以在分布式环境下扩展进程间的通信
消息中间件可以做什么
应用程序之间不采取直接通信,而是使用消息中间作为中介,做到数据的异步通信。开发人员不需要考虑网络协议和远程调用的问题,只需要通过各消息中间件所提供的api,就可以简单的完成消息推送,和消息接收的业务功能。
消息的生产者将消息存储到队列中,消息的消费者不一定马上消费消息,可以等到自己想要用到这个消息的时候,再从相应的队列中去获取消息。这样的设计可以很好的解决,大数据量数据传递所占用的资源,使数据传递和平台分开,不再需要分资源用于数据传输,可以将这些资源用去其他想要做的事情上。
关于为什么使用RocketMQ
1 . 支持严格的消息顺序
2 .支持Topic与Queue两种模式
3 .亿级消息堆积能力
4 .比较友好的分布式特性
5 .同时支持Push与Pull方式消费消息
6 .历经多次天猫双十一海量消息考验
7 .RocketMQ是纯java编写,基于通信框架Netty。
SpringBoot整合
看一下pom.xml的文件内容
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.suning.mq</groupId>
<artifactId>rocketmq</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.2.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.alibaba/fastjson -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.44</version>
</dependency>
</dependencies>
</project>
先定义一个消息保存的载体:
public class Producer {
public static void main(String[] args) throws MQClientException {
DefaultMQProducer producer = new DefaultMQProducer("test-group");
producer.setNamesrvAddr("localhost:9876");
producer.setInstanceName("rmq-instance");
producer.start();
try {
for (int i=0;i<100;i++){
User user = new User();
user.setLoginName("abc"+i);
user.setPwd(String.valueOf(i));
Message message = new Message("log-topic", "user-tag",JSON.toJSONString(user).getBytes());
System.out.println("生产者发送消息:"+JSON.toJSONString(user));
producer.send(message);
}
} catch (Exception e) {
e.printStackTrace();
}
producer.shutdown();
}
/**
* 发送用户消息
*/
static class User implements Serializable{
private String loginName;
private String pwd;
public String getLoginName() {
return loginName;
}
public void setLoginName(String loginName) {
this.loginName = loginName;
}
public String getPwd() {
return pwd;
}
public void setPwd(String pwd) {
this.pwd = pwd;
}
}
定义消息的发送者:
@Component
public class UserProducer {
/**
* 生产者的组名
*/
@Value("${suning.rocketmq.producerGroup}")
private String producerGroup;
/**
* NameServer 地址
*/
@Value("${suning.rocketmq.namesrvaddr}")
private String namesrvAddr;
@PostConstruct
public void produder() {
DefaultMQProducer producer = new DefaultMQProducer(producerGroup);
producer.setNamesrvAddr(namesrvAddr);
try {
producer.start();
for (int i = 0; i < 100; i++) {
UserContent userContent = new UserContent(String.valueOf(i),"abc"+i);
String jsonstr = JSON.toJSONString(userContent);
System.out.println("发送消息:"+jsonstr);
Message message = new Message("user-topic", "user-tag", jsonstr.getBytes(RemotingHelper.DEFAULT_CHARSET));
SendResult result = producer.send(message);
System.err.println("发送响应:MsgId:" + result.getMsgId() + ",发送状态:" + result.getSendStatus());
}
} catch (Exception e) {
e.printStackTrace();
} finally {
producer.shutdown();
}
}
}
定义消息的消费者:
public class Consumer {
public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test-group");
consumer.setNamesrvAddr("localhost:9876");
consumer.setInstanceName("rmq-instance");
consumer.subscribe("log-topic", "user-tag");
consumer.registerMessageListener(new MessageListenerConcurrently() {
public ConsumeConcurrentlyStatus consumeMessage(
List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
System.out.println("消费者消费数据:"+new String(msg.getBody()));
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
consumer.start();
}
}
网友评论