我们为什么要用mq?
我们试想一下这个应用场景:用户下单支付,一个请求通过前端,调用服务端的支付系统,订单系统请求第三方支付,比如支付宝,微信,连连支付等,再返回支付结果给到前端,前端展示支付结果。非常简单的一个应用场景,但是中间会有这样的问题:
请求三方支付的时候,三方支付的处理时间一般会比较久,可能并不一定直接返回的就是支付成功,也可能返回支付处理中,那么当支付成功后(一般都是三方支付通过notify或者我们主动查询来得知支付结果)后续我们如何来通知订单系统呢?http或者dubbo吗,http和dubbo这类型的通讯方式都需要同步等待返回结果,当系统并发量不高,资源够用的时候,没问题,但是一旦资源紧张,这样的处理方式无疑是致命的,这时候mq就是比较好的解决方案
我们可以用这样的思路来解决这个问题,当支付系统拿到支付结果后,我们将支付结果的报文放入消息队列中,由订单系统来消费这条消息,订单系统根据消息的报文内容去处理自己的业务逻辑(比如修改订单状态为支付成功),至于订单系统什么时候去消费这条消息,是否成功消费了这条消息,支付系统是不care的。总结一下,在多系统之间进行的异步通信,可以采用mq来进行通信,是节约资源非常好的选择
rabbitmq简介
rabbitmq是AMQP协议(一个提供统一消息服务的应用层标准高级消息队列协议)的一个商业实现。consumer,producer和rabbitmq server之间通过TCP来建立连接。
Channels
消费者和生产者采用TCP连接和broker通信,但是频繁的建立和关闭TCP连接有很大的系统开销,所以在TCP之上建立channels,消费者和生产者可以并发的重复利用这些Channels来和broker进行通信
Queue
Queue是用于存储消息的对象,生产者生产消息存放在Queue中,消费者从Queue获取消息并消费,这里需要注意的是,多个消费者可以订阅同一个Queue,但是这些消费者只能均摊(由rabbitmq采用轮训的方式来做负载均衡)这个Queue里面的消息,而不是每一个消费者都收到所有消息,比如订单系统在集群中有A,B,C三台服务,Queue中有十条支付系统发来的消息,这十条会被均摊A,B,C这三台服务器上。
问题1: 这样会存在一个问题,当C机器上的订单系统从Queue中获取到一个支付成功的消息后,还未及时修改完订单状态,减少库存,这时候C机器或者订单服务挂了,发生了消息丢失,这样就会发生很严重的bug,比如订单状态不正确,客户一看,卧槽钱都扣了,怎么还是处理中状态,那心态还能不炸?
解决方案: rabbitmq提供了ack机制,当一个消费者消费完了消息后,会发送一个回执给到rabbitmq服务,rabbitmq收到这个回执后,才会将消息删除,注意,这里消费方消费消息并没有超时机制(这是和ActiveMq的一个不同的地方),意思是只要rabbitmq判断这个消费方没有断开连接,那么这个消费方可以无限时长的消费这条消息,直到发送回执给rabbitmq服务,rabbitmq服务才会将这条消息删除,一旦rabbitmq判断这个消费者断开了连接却并没有发送ack,那么会将这条消息派发给其他消费者去消费,这样就解决了以上问题。这里切记,消费者一定要发送ack,否则rabbitmq服务里面的消息会越积越多,最终导致资源耗尽。
看一段spring+rabbitmq关于ack的配置(acknowledge="auto"就是自动发送ack回执):
<!-- queue litener,观察/监听模式,当有消息到达时会通知监听在对应的队列上的监听对象 -->
<rabbit:listener-container connection-factory="connectionFactory" acknowledge="auto" concurrency="1">
<rabbit:listener queues="xxxQueue" ref="xxxQueueListener" />
</rabbit:listener-container>
问题1解决了,但是由问题1也会引发另外一个问题,如果消费者是将消息处理完了(比如已经修改订单状态,也已经修改了库存),这时候再挂掉了,那么支付成功的那条消息又会被发送给其他消费者处理,这样就会导致库存被扣除两次,我一般的处理方式是,在收到mq进行业务处理的时候,做一次幂等性判断,比如订单系统收到了支付成功的MQ消息进行处理的时候,先判断一次当前订单的状态是否是处理中状态,如果已经是支付成功状态了,那么说明消息已经被消费过一次,这时候就不需要再进行处理了。
我们来看一段申明Queue的配置:
<rabbit:queue id="xxxQueueId" durable="true" auto-delete="false" exclusive="false" name="xxxQueue">
</rabbit:queue>
其中durable="true"为设置消息可持久化,即rabbitmq重启后,也能从磁盘中加载消息,
auto-delete即Queue中的消息被消费完了之后这个Queue会不会被删除,
exclusive为排他性设置,如果设置为true,则这个Queue只能在申明这个Queue的连接中被访问,其他连接访问这个Queue的时候,会报错,Connection conn = factory.newConnection(); 其中conn就是一个连接。
exchange
上面提到了,生产者生产消息投递到Queue中,其实不是这样子的,生产者生产的消息是投递到某一个exchange里面的,再由这个exchange根据路由策略路由到Queue中,exchange就是一个消息中转站。
说到消息的路由策略,就得提到bindingkey和routingkey,exchange,bindingkey,queue组成一个绑定关系,可以这么形象的解释,我们把消息当成一个快递,生产者就是商家,exchange就是快递公司的中转站,queue就是你家(或者公司),而收件人就是消息的消费者,bindingkey就是快递中转站到你家的一条路线。快递从商家出来,到快递公司的中转站,再由中转站选定一条路线,将快递送到你家里面,这样,你就可以享用这个快递了(快递里面是娃娃??)
影响路由规则的还有一个东西,就是exchange的type,exchange分四种类型fanout、direct、topic、headers,这四种类型各有不同的路由规则
exchange type
- fanout
这种路由规则是最简单的,exchange会将生产者投递过来的消息,路由到所有与它绑定的queue中(所有queue均分这些消息) - direct
direct的路由规则在fanout的基础上添加了routingkey,exchange会将消息路由到那些bindingkey和routingkey完全匹配的queue上面去(生产者生产消息并投递的时候会指明routingkey) - topic
topic的规则是direct的加强版,direct只能匹配完全一样的bindingkey,而topic类型的bindingkey可以模糊匹配routingkey。bindingkey和routingkey是由英文的句号(.)分割开来的单词,bindingkey中存在*和#,分别来匹配一个和多个单词,比如一个bindingkey是: *.pay.notify,那么以下routingkey的消息都将被路由到与这个bindingkey绑定的queue中:
order.pay.notify,stock.pay.notify - headers
这种类型是根据发送消息时的headers来确定路由规则的,和routingkey与bindingkey无关,用的比较少,不做过多介绍
我在实际开发中rabbitmq的使用场景
- 普通的异步消息处理
投保在承保的时候会做支付动作且下单,但是,一定是需要在支付完成之后才能下单到保险公司,如果这些操作都是同步在一个线程里面做的,情况应该是这样的:客户填完一堆材料后app上点击承保,开始转圈圈,后端先调用支付请求,支付到银联,银联再给返回结果,这个请求需要很长的时间,网络不好就更久了,碰上银行清算的话,那就完了,支付完成后再调用保险公司的承保接口,也需要很长时间,所以前端客户需要等待很久的时间才能看见承保的结果,如果http设置的超时时间比较短的话,就直接http超时了(tomcat还有超时时间,也会导致超时)。
所以这种体验是非常不好的,那么现在有了mq后,我们可以将这个过程异步处理:
客户点击承保按钮,服务端开始调用支付接口,调用银联支付后银联立即给到支付结果(http同步给到的结果一般是支付中),前端客户立即看到承保结果为承保支付中,服务端任务调度系统定时去查询支付结果或者等待银联支付的notify,支付系统拿到支付结果后mq异步通知订单系统(因为支付系统只需要知道通知到了,不需要知道订单系统的处理结果),订单系统修改订单状态,前端刷新承保状态,这时候客户看见的应该是承保成功或者失败 - RPC消息(用的非常少,更多的时候RPC用dubbo了)
在实际应用中,有时候我们需要同步处理,我们需要知道消息的消费者的消费结果才能进行下一步操作,在做一款保险产品的时候,我们就用到了rabbitmq的RPC特性。在发送消息的时候会给消息设置特性的属性,比如ContentEncoding,设置消息的编码,要实现RPC需要设置两个属性:replyTo和correlationId,其中replyTo设置的是一个queue的名称,用于告诉服务器处理完成后将通知我的消息发送到这个Queue中,correlationId则是某一次请求的标示号,消息生产者生产消息的时候设置这个属性,消费者消费完消息后再发送消息的时候将这个属性重新设置到消息中,消息生产者根据这个id就能知道这个是那条消息的处理返回。 - 延迟队列
实际生产中很多时候会用到延迟队列,比如公募非交易时间下单后,需要延迟到开市之后才推恒生或者金正,又比如,在一个异步请求请求失败后(比如网络问题),需要每隔一定时间重新请求一次,也可以用到延迟队列。但是rabbitmq本身是没有提供延迟队列的,这时候我们可以通过一个比较巧妙的设置来实现延迟队列。rabbitmq可以对queue设置两个属性,使得queue中的消息在一定时间里面没有被消费掉的话(成为死信),就会被重新push到指定的exchange+routingkey中,消息成为死信的时间使用Expiration属性来设置,成为死信后重新push的exchange通过x-dead-letter-exchange来设置,routingkey通过x-dead-letter-routing-key来设置,至于重试的次数可以通过算法来实现。在做公募的时候,当用户申购完成了,需要选一定的客户进行做回访,回访的数据需要调用另外部门的服务做留痕,很明显,做留痕是异步的,不应该阻塞申购主流程,所以当用户回访的时候,服务端会立即将回访的请求放进rabbitmq中,然后同步返回前端,服务端监听mq消息,将回访消息拿出来(消费者消费消息)做留痕(http调用其他部门的服务将回访的数据发送给他们),这种情况会发生网络延迟,或者对方服务重启之类的事情,我们的做法是,在http请求发生异常的时候,将消息重新放到一个队列中,并设置消息的Expiration超时时间,这个队列我们设置了x-dead-letter-exchange,x-dead-letter-routing-key,这样就实现了这条消息在Expiration时间之后会重新push到x-dead-letter-exchange,x-dead-letter-routing-key对应的queue中被重新消费,这样就实现了消息的重试和延迟功能(很棒!!!)。
下面放一份完整的spring + rabbitmq的配置和监听重试机制代码
配置如下:
<rabbit:connection-factory id="connectionFactory" host="${rabbitmq.host}" username="${rabbitmq.username}" password="${decryption.rabbitmq.password}" port="${rabbitmq.port}" virtual-host="${rabbitmq.virtual-host}"/>
<rabbit:admin connection-factory="connectionFactory" />
<!-- 延迟队列 -->
<rabbit:queue id="returnVisitErrorQueue" durable="true" auto-delete="false" exclusive="false" name="${return.visit.error.queue}" >
<rabbit:queue-arguments>
<entry key="x-dead-letter-exchange" value="${return.visit.exchange}"></entry>
<entry key="x-dead-letter-routing-key" value="${return.visit.routingkey}"></entry>
</rabbit:queue-arguments>
</rabbit:queue>
<!--消费者queue-->
<rabbit:queue id="returnVisitQuquq" durable="true" auto-delete="false"
exclusive="false" name="${return.visit.queue}" >
</rabbit:queue>
<!-- exchange和queue的binging -->
<rabbit:direct-exchange id="returnVisitExchange" durable="true"
auto-delete="false" name="${return.visit.exchange}">
<rabbit:bindings>
<rabbit:binding queue="${return.visit.queue}" key="${return.visit.routingkey}" />
<rabbit:binding queue="${return.visit.error.queue}" key="${return.visit.error.routingkey}" />
</rabbit:bindings>
</rabbit:direct-exchange>
<bean id="returnVisitConsumerQueueListener" class="com.noah.fund.service.publicfund.consumer.ReturnVisitQueueListener" />
<!-- queue litener,观察/监听模式,当有消息到达时会通知监听在对应的队列上的监听对象 -->
<rabbit:listener-container connection-factory="connectionFactory" acknowledge="auto" concurrency="1">
<rabbit:listener queues="${return.visit.queue}" ref="returnVisitConsumerQueueListener" />
</rabbit:listener-container>
<!-- 消息转换器,可以在生产消息的时候设置一些消息的属性 -->
<bean id="jsonMessageConverter" class="com.noah.fund.service.publicfund.consumer.FastJsonMessageConverter"></bean>
<!-- 消息发送模板 -->
<rabbit:template exchange="${return.visit.exchange}" id="sendAmqpTemplate" connection-factory="connectionFactory" message-converter="jsonMessageConverter" />
监听代码如下:
/**
* queue监听类,队列收到信息,会触发onMessage方法
* @author wuzhen
*
*/
public class ReturnVisitQueueListener implements MessageListener {
private static final String RETRY_FAILED_COUNT = "retryFailedCount";
Logger logger = LoggerFactory.getLogger(ReturnVisitQueueListener.class);
@Autowired
private AmqpTemplate sendAmqpTemplate;
@Autowired
private KoConfigMapper koInsuranceConfigMapper;
@Autowired
private ReturnVisitMqHandler returnVisitMqHandler;
@Autowired
private MsgSenderImpl msgSenderImpl;
public void onMessage(Message message) {
logger.info("监听到一条消息,消息内容为:{}",message.toString());
try{
execute(message);
}catch(Exception ex){
logger.error("异常信息为:{}", ex);
logger.info("当前处理线程为:{}",Thread.currentThread().getName());
String configValue = getConfigByKey("return_visit_mq_retry_interval").getConfigValue();
logger.info("重试间隔时长为:{}",configValue);
message.getMessageProperties().setExpiration(configValue);
message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
Map<String,Object> headerMap = message.getMessageProperties().getHeaders();
if(isNeedToRetry(headerMap)){//消息重试策略
logger.info("将消息推入延迟队列,开始重试消息");
sendAmqpTemplate.send(DecryptPropertyPlaceholderConfigurer.getPropertyByName("return.visit.error.routingkey"), message);
}else{
// 重试n次后仍然发生异常,短信提醒
Map<String, String> m = new HashMap<>();
m.put("message", JSON.toJSONString(message));
msgSenderImpl.sendMsgByTemplate(MessageTemplates.RETURN_VISIT_TRANSFINITE_ERROR.getTemplateAlias(), m, "15202161542,13564232413");
}
}
}
protected void execute(Message message) throws Exception{
message.getMessageProperties().getMessageId();
String jsonMessage = new String(message.getBody(),"UTF-8");
logger.info("消息转换为json格式:{}",jsonMessage);
returnVisitMqHandler.handler(message);
logger.info("处理器处理消息完成");
}
/**
* 获取数据库中配置的重试次数
* @param key
*/
public KoConfig getConfigByKey(String key){
KoConfigExample koInsuranceConfigExample = new KoConfigExample();
koInsuranceConfigExample.createCriteria().andConfigNameEqualTo(key);
List<KoConfig> list = koInsuranceConfigMapper.selectByExample(koInsuranceConfigExample);
if(!list.isEmpty()){
return list.get(0);
}
return null;
}
/**
* queue里的消息处理失败了,需要重试。这个方法就是判断是否需要重试
* @param headerMap
* @return
*/
private boolean isNeedToRetry(Map<String,Object> headerMap){
boolean needToSend = true;
if(null == headerMap){
logger.info("headerMap为空");
headerMap = new HashMap<>();
headerMap.put(RETRY_FAILED_COUNT, 1);
logger.info("当前重试次数为:{}",headerMap.get(RETRY_FAILED_COUNT));
}else if(!headerMap.containsKey(RETRY_FAILED_COUNT)){
logger.info("未匹配到retryFailedCount");
headerMap.put(RETRY_FAILED_COUNT, 1);
logger.info("当前重试次数为:{}",headerMap.get(RETRY_FAILED_COUNT));
}else{
int retryFailedCount = (int) headerMap.get(RETRY_FAILED_COUNT);
String configValue = getConfigByKey("return_visit_mq_retry_count").getConfigValue();
logger.info("mq重试总次数为:{}",configValue);
if(retryFailedCount < Integer.parseInt(configValue)){
headerMap.put(RETRY_FAILED_COUNT, ++retryFailedCount);
logger.info("当前重试次数为:{}",retryFailedCount);
}else{
needToSend = false;
}
}
return needToSend;
}
}
rabbitmq和rocketmq的区别
- rabbitmq不保证消息的顺序性,rocketmq可以有顺序
-- rabbitmq顺序性解决方案:保证 生产者 - broker - queue - 消费者 是一对一对一对一的关系,但是这样性能瓶颈就有了
-- rocketmq的解决方案:使用订单号来做取模操作,这样保证同一个订单号的消息,会被发送到同一个队列中 - rabbitmq和rocketmq都不保证消息不重复,由消息消费者自己去做幂等判断或者记录消息的id来做去重判断
- rabbitmq和rocketmq均会持久化消息到磁盘
- rabbitmq的消费者拿到消息处理成功后会发送回执(ack)给到broker,而rocketmq仅仅只是知道消息是否发送成功
- rabbitmq不支持事务,而rocketmq支持事务(可以用来解决分布式事务)
- rabbitmq本身不支持消息重试(可以使用死信队列来重新消费队列),rocketmq支持消息重试
web应用中的各种通讯方式
在web应用中我们通常能用到的各个系统之间的通讯方式有以下几种方式:
- http(https)
- socket
- dubbo
- mq
那么这几种通讯方式有什么不同呢?
http是请求-响应式的通讯,底层是TCP协议,<font color=" red">适用于客户端和服务端通讯频次较低的场景。</font>而socket一般底层协议也是TCP,建立socket连接需要客户端IP+服务port和服务端的IP+服务port,<font color="cadetBlue">客户端和服务端一旦建立连接,会一直处于连接状态,除非某一端断开连接,通过轮训的方式确认服务是否断开,服务端一旦有数据变化,可以主动推送给客户端,这个是http不具有的特性</font>,而且http每一次发送都包含大量的头信息,频繁建立http请求非常占用带宽,所以像聊天室一般都是用socket来实现的。我们再来说dubbo,一个分布式的java RPC框架,<font color="darkOliveGreen">他使得个java系统之间远程调用可以像调用本地方法一样,同时还实现了负载均衡,所以在多个java系统之间,可以采用这种方式来远程调用</font>,非常棒!!而mq则是系统时间的异步消息通讯,前面3中通讯方式均是同步通讯(其中dubbo可以实现异步通信,个人生产中没有用过),<font color="IndianRed">何谓异步,就是我消息发送出去就可以了,不需要等待消息消费方的处理结果,同时我也不需要知道是谁来消费了我的消息,使得各系统之间可以松耦合的进行通讯,且mq也实现了负载均衡,如果使用http和socket是达不到负载均衡的效果的,需要借助其他的软件(nginx)或者硬件(F5)来做负载,同时socket使得各应用系统之间严重耦合</font>。
网友评论