中间件大比拼
主流的中间有ActiveMq、kafka、rocketMq、rabbitMq
- ActiveMq是jms规范的中间,是老牌的中间件了,其优点就是API丰富、支持多种集群模式、并且与其他组件有非常完善和稳定的集成体系,但是在性能方面比上述的几个要低很多,适用于低并发场景
- kafka是中间件业界的性能之王,追求高吞吐量,是开源的分布式发布订阅消息系统,其优点就是高性能,高可用,支持持久化,在廉价的机器中也能达到每秒处理100kb以上数据的能力,但是对消息的重复、丢失、错误没有严格的要求,也就是说kafka的特点就是对消息精准度要求低,对性能吞吐量要求高,低并发、高并发场景都可以适用,适用于对数据精度度要求不高的业务场景,常用于互联网的数据收集业务
- rocketMq阿里巴巴开源的中间件,经过了数次淘宝双十一的洗礼后依然屹立不倒而声名鹊起,是纯java语言编写,rocketMq是基于kafka的基础上做的优化,其对kafka的消息传输可靠性上做了提升,并支持事务,所以rocketMq同样具备了高性能,高可用性的特点,适合大规模高并发的分布式系统,广泛应用于充值交易、消息推送等业务场景
- rabbitMq是Erlang语言编写,它是开源的基于AMQP协议的消息代理和消息队列服务器,由于内部结构是基于AMQP规范,其消息通过内部路由,路由键,消息队列等组件的传送,对消息传递的稳定性,可靠性,安全性的要求比较高,甚至可以做到消息百分之百的精准投递,但是对性能的要求排在其次,适用于对数据的传输精准度、数据的一致性、安全性、可靠性很高的场景。
总结
在性能要求上kafka是最好的,在数据的传输的可靠性上rabbitMq是最好的,在性能和数据传输可靠性上择中的话就是rocketMq比较适合
AMQP简述
AMQP用于提供统一消息模式服务,它是应用层的高级消息队列模式,也是消息中间件设计规范(jms也是消息中间件设计规范)的一种,rabbitma就是遵循AMQP的规范
AMQP模型结构
image.pngAMQP核心概念
- server:又称broker,是AMQP的服务端,接受客户端的连接,发送消息到给客户端,实现了AMQP的实体服务
- connection:连接,客户端与服务端的网络连接
- channel:网络信道,客户端与服务端进行消息读写的通道。客户端可以建立多个channel,每个channel都代表着一个会话任务
- message:是客户端与服务之间传送的消息,message由properties和body组成,body主要由消息的实际内容组成,而properties是由消息的高级特性组成,properties可以设用于置消息的优先级、延迟等属性
- vitaulhost:虚拟地址,主要是进行逻辑隔离,rabbitMq中可以包含多个VitaulHost,每个vitaulhost中可以包含多个exchange和queue,但是同一个VitaulHost中不能含有相同名称的exchange和queue,客户端可以选择投递到哪个VitaulHost中,vitaulhost默认是"/"
- exchange:交互机,绑定queue,并接收客户端消息,通过routingKey将消息投递到绑定的queue中
- binding:exchange和queue的虚拟连接,bingding中包含了routingKey,每个binding都绑定着一个队列,当binding的routingKey和exchange的routingKey有对应的关系时(这个对应的关系还得看exchange的类型,例如Direct exchange就要求routingKey要完全相等才对应的上)exchange才会把相应的信息投递到bingding绑定的队列中
- routingKey:路由规则,exchange通过routingKey知道如何进行路由
- queue:消息队列,是存放消息的物理结构的定义,客户端通过监听queue即可获取消息
rabbitmq
概述
rabbitmq是Erlang语言编写,它是开源的基于AMQP协议的消息代理和消息队列服务器,rabbitmq具有高性能、高可用、高可靠的特性,具有消息投递模式丰富的特点,rabbitmq用于使完全不同的应用之间能够共享数据互相通信。由于rabbitmq是使用Erlang编写,Erlang广泛应用于交换机领域,Erlang语言在数据交互和数据同步方面性能优秀与原生Socket一样有着出色的延迟,使得rabbitmq在数据交互上具备了天然的优势
rabbitMq消息流转过程
客户端(生产者)将预先指定消息投递到哪个VitaulHost中的路由(exchange)上,同时客户端也会指定routingKey,exchange通过消息的routingKey决定将消息投递到哪个binding的queue中,监听了相应的queue的客户端(消费者)可以从queue中获取相应的消息
image.pngrabbitMq收发消息的简单代码演示
//properties文件配置
rabbitmq.id=127.0.0.1
rabbitmq.port=5672
rabbitmq.username=guest
rabbitmq.password=guest
rabbitmq.virtualHost=testhost
将rabbitMq注册的spring容器
@Configuration
@PropertySource("classpath:/myApplication.properties")
@ComponentScan(value = "com.example.rabbitmq.demo")
public class RabbitMqConfig {
@Value("${rabbitmq.id}")
private String ip;
@Value("${rabbitmq.port}")
private int port;
@Value("${rabbitmq.username}")
private String username;
@Value("${rabbitmq.password}")
private String password;
/* @Value("${rabbitmq.virtualHost}")
private String virtualHost;*/
@Bean
public Connection connectionFactory(){
try {
ConnectionFactory connectionFactory=new ConnectionFactory();
connectionFactory.setHost(ip);
connectionFactory.setPort(port);
connectionFactory.setVirtualHost("/");
connectionFactory.setUsername(username);
connectionFactory.setPassword(password);
return connectionFactory.newConnection();
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
}
消息调用
@RestController
public class Test {
@Autowired
private Connection connection;
private Channel channel=null;
@PostConstruct
public void newChannel() throws Exception{
this.channel = this.connection.createChannel();
}
@PreDestroy
public void destroyChannel() throws Exception{
this.channel.close();
this.connection.close();
}
@GetMapping("/consummer")
public String consummer(){
if (this.connection!=null){
try {
Channel channel = this.channel
DefaultConsumer defaultConsumer=new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
long deliveryTag = envelope.getDeliveryTag();
//消息id
System.out.println("消息id: " + deliveryTag);
//交换机
System.out.println("交换机: " + envelope.getExchange());
//路由key
System.out.println("路由key: " + envelope.getRoutingKey());
//接受到的消息
System.out.println("收到的消息: " + new String(body, "utf-8"));
System.out.println("---------------------------------");
/*channel.basicAck(deliveryTag, false);*/
}
};
String exchangeName="myexchange";
String exchangeType="direct";
String queueName="rutingkey";
String routingKey="rutingkey";
channel.exchangeDeclare(exchangeName, exchangeType, true, false, null);
//创建队列("rutingkey","持久化","独占队列","脱离exchange是否被自动删除","拓展参数")
channel.queueDeclare(queueName,true,false,false,null);
//设置exchange和queue的绑定关系,并且声明队列的routingKey
channel.queueBind(queueName, exchangeName, routingKey);
channel.basicConsume(queueName, true, defaultConsumer);
} catch (Exception e) {
e.printStackTrace();
}
return "不是空的";
}else {
return "是空的";
}
}
@GetMapping("/procuder")
public String procuder(){
if (this.connection!=null){
try {
Channel channel = this.channel;
String exchangeName="myexchange";
String routingKey="rutingkey";
String s="helloword"+UUID.randomUUID().toString();
//发布消息时使用rutingkey规则上
channel.basicPublish(exchangeName,routingKey,null,s.getBytes());
} catch (Exception e) {
e.printStackTrace();
}
return "不是空的";
}else {
return "是空的";
}
}
}
confirm模式
当生产者发送消息的同时监听一个事件,如果消费者消费成功则监听到相应的ack
生产者
public class Producer {
public static void main(String[] args) throws Exception {
//1 创建ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.11.76");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setPassword("guest");
connectionFactory.setUsername("guest");
//2 获取Connection
Connection connection = connectionFactory.newConnection();
//3 通过Connection创建一个新的Channel
Channel channel = connection.createChannel();
//4 指定我们的消息投递模式: 消息的确认模式
channel.confirmSelect();
String exchangeName = "test_confirm_exchange";
String routingKey = "confirm.save";
//5 发送一条消息
String msg = "Hello RabbitMQ Send confirm message!";
channel.basicPublish(exchangeName, routingKey, null, msg.getBytes());
//6 添加一个确认监听
channel.addConfirmListener(new ConfirmListener() {
@Override
public void handleNack(long deliveryTag, boolean multiple) throws IOException {
System.err.println("-------no ack!-----------");
}
@Override
public void handleAck(long deliveryTag, boolean multiple) throws IOException {
System.err.println("-------ack!-----------");
}
});
}
}
消费者
public class Consumer {
public static void main(String[] args) throws Exception {
//1创建ConnectionFactory
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.11.76");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setPassword("guest");
connectionFactory.setUsername("guest");
//2 获取C onnection
Connection connection = connectionFactory.newConnection();
//3 通过Connection创建一个新的Channel
Channel channel = connection.createChannel();
String exchangeName = "test_confirm_exchange";
String routingKey = "confirm.#";
String queueName = "test_confirm_queue";
//4 声明交换机和队列 然后进行绑定设置, 最后制定路由Key
channel.exchangeDeclare(exchangeName, "topic", true);
channel.queueDeclare(queueName, true, false, false, null);
channel.queueBind(queueName, exchangeName, routingKey);
//5 创建消费者
QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
channel.basicConsume(queueName, true, queueingConsumer);
while(true){
Delivery delivery = queueingConsumer.nextDelivery();
String msg = new String(delivery.getBody());
System.err.println("消费端: " + msg);
}
}
}
限流
如果消费端的消费速度跟不上生产端的消息生产速度,那么很有可能会造成,消息大量堆积,每次都会有大量的消息一次打到消费端,给消费端造成很大的压力
设置手工签收,并且消费成功后向broker发送ack,才会让消费者继续消费消息,不然消息就会排队等待消费
basicQos的参数:
- prefetchSize:消费消息的大小,0表示不做限制;
- prefetchCount:同时消费消息的数量;
- global:限流策略应用的级别,true:Channel级别;false:consumer级别;
生产者
public class Producer {
public static void main(String[] args) throws Exception {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.11.76");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
String exchange = "test_qos_exchange";
String routingKey = "qos.save";
String msg = "Hello RabbitMQ QOS Message";
for(int i =0; i<5; i ++){
channel.basicPublish(exchange, routingKey, true, null, msg.getBytes());
}
}
}
消费者
public class Consumer {
public static void main(String[] args) throws Exception {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.11.76");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
String exchangeName = "test_qos_exchange";
String queueName = "test_qos_queue";
String routingKey = "qos.#";
channel.exchangeDeclare(exchangeName, "topic", true, false, null);
channel.queueDeclare(queueName, true, false, false, null);
channel.queueBind(queueName, exchangeName, routingKey);
//参数1:消费消息的大小,0表示不做限制
//参数2:消费者一次最多消费消息的数量
//参数3:限流策略应用的级别,true:Channel级别;false:consumer级别
channel.basicQos(0, 1, false);
//参数2 限流方式 第一件事就是 autoAck设置为 false,也就是手工签收消息
//参数3:消费端自定义监听
channel.basicConsume(queueName, false, new MyConsumer(channel));
}
}
消费端自定义监听
public class MyConsumer extends DefaultConsumer {
private Channel channel ;
public MyConsumer(Channel channel) {
super(channel);
this.channel = channel;
}
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.err.println("-----------consume message----------");
System.err.println("consumerTag: " + consumerTag);
System.err.println("envelope: " + envelope);
System.err.println("properties: " + properties);
System.err.println("body: " + new String(body));
//消费成功则发送ACK到broker中
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
不可路由消息的处理
image.png生产者
public class Producer {
public static void main(String[] args) throws Exception {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.11.76");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
String exchange = "test_return_exchange";
String routingKey = "return.save";
String routingKeyError = "abc.save";
String msg = "Hello RabbitMQ Return Message";
channel.addReturnListener(new ReturnListener() {
@Override
public void handleReturn(int replyCode, String replyText, String exchange,
String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.err.println("---------handle return----------");
System.err.println("replyCode: " + replyCode);
System.err.println("replyText: " + replyText);
System.err.println("exchange: " + exchange);
System.err.println("routingKey: " + routingKey);
System.err.println("properties: " + properties);
System.err.println("body: " + new String(body));
}
});
//参数3,true:监听器会接收到不可路由消息,然后我们可以对消息进行后续处理:存库、重发或者记录日志等。false:broker端会自动删除不可路由消息,监听器是监听不到的
channel.basicPublish(exchange, routingKeyError, true, null, msg.getBytes());
//channel.basicPublish(exchange, routingKeyError, true, null, msg.getBytes());
}
}
消费者
public class Consumer {
public static void main(String[] args) throws Exception {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.11.76");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
String exchangeName = "test_return_exchange";
String routingKey = "return.#";
String queueName = "test_return_queue";
channel.exchangeDeclare(exchangeName, "topic", true, false, null);
channel.queueDeclare(queueName, true, false, false, null);
channel.queueBind(queueName, exchangeName, routingKey);
channel.basicConsume(queueName, true, new MyConsumer(channel));
}
}
public class MyConsumer extends DefaultConsumer {
public MyConsumer(Channel channel) {
super(channel);
}
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.err.println("-----------consume message----------");
System.err.println("consumerTag: " + consumerTag);
System.err.println("envelope: " + envelope);
System.err.println("properties: " + properties);
System.err.println("body: " + new String(body));
}
}
ACK与重回队列
Nack:失败应答,重回队列的尾端
Ack:成功应答
生产者
public class Producer {
public static void main(String[] args) throws Exception {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.11.76");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
String exchange = "test_ack_exchange";
String routingKey = "ack.save";
for(int i =0; i<5; i ++){
//自定义头信息
Map<String, Object> headers = new HashMap<String, Object>();
headers.put("num", i);
//RabbitMQ发送消息附带BasicProperties属性详解
//BasicPropertie属性字段详解
// contentType:消息的内容类型,如:text/plain
// contentEncoding:消息内容编码
// headers:设置消息的header,类型为Map<String,Object>
// deliveryMode:1(nopersistent)非持久化,2(persistent)持久化
// priority:消息的优先级
// correlationId:关联ID
//replyTo:用于指定回复的队列的名称
// expiration:消息的失效时间
// messageId:消息ID
// timestamp:消息的时间戳
// type:类型
// userId:用户ID
// appId:应用程序ID
// custerId:集群ID
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
.deliveryMode(2)
.contentEncoding("UTF-8")
.headers(headers)
.build();
String msg = "Hello RabbitMQ ACK Message " + i;
channel.basicPublish(exchange, routingKey, true, properties, msg.getBytes());
}
}
}
消费者
public class Consumer {
public static void main(String[] args) throws Exception {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.11.76");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
String exchangeName = "test_ack_exchange";
String queueName = "test_ack_queue";
String routingKey = "ack.#";
channel.exchangeDeclare(exchangeName, "topic", true, false, null);
channel.queueDeclare(queueName, true, false, false, null);
channel.queueBind(queueName, exchangeName, routingKey);
// 手工签收 必须要关闭 autoAck = false
channel.basicConsume(queueName, false, new MyConsumer(channel));
}
}
public class MyConsumer extends DefaultConsumer {
private Channel channel ;
public MyConsumer(Channel channel) {
super(channel);
this.channel = channel;
}
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.err.println("-----------consume message----------");
System.err.println("body: " + new String(body));
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
if((Integer)properties.getHeaders().get("num") == 0) {
// 参数2:设置为true 批量消息处理,设置为false单条消息处理
// 参数3:设置为true 会把消费失败的消息从新添加到队列的尾端,设置为false不会重新回到队
channel.basicNack(envelope.getDeliveryTag(), false, true);
} else {
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
}
DLX:死信队列
- 死信队列,英文缩写:DLX 。Dead Letter Exchange(死信交换机),当消息成为Dead message后,可以被重新发送到另一个交换机,这个交换机就是DLX。
- DLX是一个正常的交换器,和一般的交换器没有区别,它能在任何的队列上被指定,实际上就是设置某个队列的属性。当这个队列中存在死信时,RabbitMQ就会自动地将这个消息重新发布到设置的DLX上去,进而被路由到另一个队列,即死信队列
消息变成死信有以下几种情况 :
- 队列消息长度到达限制;
- 消费者拒接消费消息,basicNack/basicReject,并且不把消息重新放入原目标队列,requeue=false;
- 原队列存在消息过期设置,消息到达超时时间未被消费
死信队列的设置 :
- 首先要设置死信队列的exchange和queue, 然后进行绑定
Exchange : dlx.exchange
Queue : dlx.queue
RoutingKey : # - 然后正常声明交换机, 队列, 绑定, 只不过需要在队列加上一个扩展参数即可 : arguments.put(“x-dead-letter-exchange”, “dlx.exchange”);
- 这样消息在过期, reject或nack(requeue要设置成false), 队列在达到最大长度时, 消息就可以直接路由到死信队列
生产者
public class Producer {
public static void main(String[] args) throws Exception {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.11.76");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
String exchange = "test_dlx_exchange";
String routingKey = "dlx.save";
String msg = "Hello RabbitMQ DLX Message";
for(int i =0; i<1; i ++){
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder()
.deliveryMode(2)
.contentEncoding("UTF-8")
.expiration("10000")
.build();
channel.basicPublish(exchange, routingKey, true, properties, msg.getBytes());
}
}
}
消费者
public class Consumer {
public static void main(String[] args) throws Exception {
ConnectionFactory connectionFactory = new ConnectionFactory();
connectionFactory.setHost("192.168.11.76");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
// 这就是一个普通的交换机 和 队列 以及路由
String exchangeName = "test_dlx_exchange";
String routingKey = "dlx.#";
String queueName = "test_dlx_queue";
channel.exchangeDeclare(exchangeName, "topic", true, false, null);
Map<String, Object> agruments = new HashMap<String, Object>();
agruments.put("x-dead-letter-exchange", "dlx.exchange");
//这个agruments属性,要设置到声明队列上
channel.queueDeclare(queueName, true, false, false, agruments);
channel.queueBind(queueName, exchangeName, routingKey);
//要进行死信队列的声明:
channel.exchangeDeclare("dlx.exchange", "topic", true, false, null);
channel.queueDeclare("dlx.queue", true, false, false, null);
channel.queueBind("dlx.queue", "dlx.exchange", "#");
channel.basicConsume(queueName, true, new MyConsumer(channel));
}
}
public class MyConsumer extends DefaultConsumer {
public MyConsumer(Channel channel) {
super(channel);
}
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.err.println("-----------consume message----------");
System.err.println("consumerTag: " + consumerTag);
System.err.println("envelope: " + envelope);
System.err.println("properties: " + properties);
System.err.println("body: " + new String(body));
}
}
image.png
exchange:接收消息,并根据路由键转发消息所绑定的队列
exchange的属性
- name:交换机名称
- type:交换机类型,direct、topic、fanout、headers
- durable:是否持久化,RabbitMQ关闭后,没有持久化的Exchange将被清除
- autoDelete:是否自动删除exchange,如果没有与之绑定的Queue,直接删除该exchange
- internal:Exchange是否RabbitMQ内部使用,如果为true,只能通过Exchange到Exchange,默认为false
- arguments:扩展参数,用于Exchange自定化属性
exchange的类型:
-
Direct exchange(直连交换机): 所有发送到Direct exchange中的消息都会被转发到RoutingKey指定的队列中,该模式下exchange binding的队列和消息的routingKey必须匹配才能进行消息的流转,否则该消息将会被丢弃。如果消息没有被指定使用的是哪种exchange,那么消息将会默认的使用rabbitMq中的default exchange,default exchange 默认使用的模式是Direct
image.png
-
Topic exchange(主题交换机): exchange将消息的routingKey和队列的routingKey进行模糊匹配(使用通配符)
image.png
-
Fanout exchange(广播交换机): 不处理routingKey,将消息简单的投递到exchange上binding的所有queue,由于不处理routingKey,是性能最好的模式
image.png
100%投递方案:
step1:将数据落地,先将业务数据存库,同时将消息记录也存库
step2:将消息投递到Mq服务器
step3:设置消息的响应状态,例如0:发送中,1:成功,2:失败,消费者等待服务端回应。
step4:消费者获取回应状态,并且将状态存库,更新库中消息的状态
step5:定时任务将超过固定时间内状态为未发送的消息抽取
step6:重新发送消息
step7:重发超过一定次数的消息,对其进行失败的标记,然后人工发送或者其他的方式进行消息的补偿
image.png
网友评论