1、RabbitMQ实时监听消费消息队列
2、使用缓存减轻数据库压力
package com.chinaso.modules.rank.mq;
import com.alibaba.fastjson.JSONObject;
import com.chinaso.common.ajax.AjaxResponse;
import com.chinaso.modules.rank.bean.FinacialMediaBean;
import com.chinaso.modules.rank.dao.FinacialMediaDao;
import com.chinaso.modules.rank.service.FinacialMediaService;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeoutException;
/**
* @author boying.cheng on 2020/6/12
*/
//类实现ApplicationRunner的run方法,容器启动的时候设置缓存数据
@Component
public class RabbitMQConsumer implements ApplicationRunner {
@Resource(name="recieveRabbitTemplate")
private RabbitTemplate rabbitTemplate;
//通过注解把配置文件队列名注入
@Value("${mq.socialmedia.routingKey.queues}")
private String queues;
@Autowired
private FinacialMediaDao finacialMediaDao;
@Autowired
private FinacialMediaService finacialMediaService;
private final Logger logger = LoggerFactory.getLogger("RabbitMQConsumerLog");
//设置缓存 存储融媒体数据 当需要更新的数据在缓存中有才更新mysql 避免频繁的数据库IO操作
public static Cache<Object, Object> finacialMediaBeansCache = CacheBuilder.newBuilder()
.maximumSize(10000)
.build();
//
@RabbitHandler
//指定某方法作为消息消费的方法,监听某 Queue 里面的消息,当有收到消息的时候,就交给 @RabbitHandler 的方法处理
@RabbitListener(queues ={"${mq.socialmedia.routingKey.queues}"},containerFactory="websearchFactory")
public void recieved() throws IOException, TimeoutException, InterruptedException {
//接收mq队列的数据方法
Object convert = rabbitTemplate.receiveAndConvert(queues);
if (convert == null){
return;
}
//parseObject解析mq队列的数据
JSONObject tao_social_media = JSONObject.parseObject(convert.toString().trim());
String userid = (String)tao_social_media.get("userid");
String publish_time = (String)tao_social_media.get("publish_time");
String source = (String) tao_social_media.get("source");
logger.info("=======消费mq消息成功:userid:{},publish_time:{},source:{}=======",userid,publish_time,source);
//数据库业务逻辑更新方法
finacialMediaService.updateDetailUpdateTime(source, userid, publish_time);
}
//
@Override
public void run(ApplicationArguments applicationArguments) throws Exception {
List<String> uuids = new ArrayList<>();
List<FinacialMediaBean> finacialMediaBeans = new ArrayList<FinacialMediaBean>();
try {
finacialMediaBeans = finacialMediaDao.getFinacialMedias();
if((finacialMediaBeans != null && finacialMediaBeans.size() > 0)){
for(FinacialMediaBean bean : finacialMediaBeans){
bean.set_total(null);
if (bean.getWechatUuid() != null){
uuids.add(bean.getWechatUuid());
}if (bean.getWeiboUuid() != null){
uuids.add(bean.getWeiboUuid());
}
}
}
} catch (Exception e) {
e.printStackTrace();
}
//缓存设置融媒体上线成功的的数据
finacialMediaBeansCache.put("finacialMediaBeans",uuids);
}
}
@Value("${spring.rabbitmq.websearch.concurrentConsumers}") int concurrentConsumers;
@Value("${spring.rabbitmq.websearch.maxConcurrentConsumers}") int maxConcurrentConsumers;
@Value("${spring.rabbitmq.websearch.prefetchCount}") int prefetchCount;
//配置工厂方法
@Bean(name="recieveConnectionFactory")
public ConnectionFactory recieveConnectionFactory(
@Value("${spring.rabbitmq.websearch.addresses}") String addresses,
@Value("${spring.rabbitmq.websearch.username}") String username,
@Value("${spring.rabbitmq.websearch.password}") String password,
@Value("${spring.rabbitmq.websearch.virtual-host}") String virtualHost
){
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setAddresses(addresses);
connectionFactory.setUsername(username);
connectionFactory.setPassword(password);
connectionFactory.setVirtualHost(virtualHost);
connectionFactory.setConnectionTimeout(150000);
return connectionFactory;
}
@Bean(name="websearchFactory")
public SimpleRabbitListenerContainerFactory websearchFactory(
SimpleRabbitListenerContainerFactoryConfigurer configurer,
@Qualifier("recieveConnectionFactory") ConnectionFactory connectionFactory
) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
configurer.configure(factory, connectionFactory);
factory.setAcknowledgeMode(AcknowledgeMode.AUTO);
factory.setConcurrentConsumers(concurrentConsumers);
factory.setMaxConcurrentConsumers(maxConcurrentConsumers);
factory.setPrefetchCount(prefetchCount);
factory.setDefaultRequeueRejected(true);
return factory;
}
@Bean(name="recieveRabbitTemplate")
public RabbitTemplate sendRabbitTemplate(
@Qualifier("recieveConnectionFactory") ConnectionFactory connectionFactory
){
RabbitTemplate websearchRabbitTemplate = new RabbitTemplate(connectionFactory);
return websearchRabbitTemplate;
}
pom.xml文件引入依赖
//缓存
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>23.0</version>
</dependency>
</dependencies>
<!-- RabbitMQ -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
业务代码
public AjaxResponse updateDetailUpdateTime(String source, String userId, String detailUpdateTime) {
AjaxResponse ajaxResponse = new AjaxResponse(AjaxResponse.AJAX_CODE_FAIL);
//消费的mq数据 更新mysql逻辑
List<String> uuidList = (List<String>) finacialMediaBeansCache.getIfPresent("finacialMediaBeans");
if (uuidList != null) {
//如果缓存中不存在该条数据则直接返回
if(!uuidList.contains(userId)){
logger.info("====此数据不在排行榜!userid:{},publish_time:{},source:{}=====",userId,detailUpdateTime,source);
return new AjaxResponse(AjaxResponse.AJAX_CODE_SUCCESS);
}
}
try {
finacialMediaDao.updateDetailUpdateTime(source,userId,detailUpdateTime);
Query query = Query.query(Criteria.where("weiboUuid").is(userId).andOperator(Criteria.where("detailUpdateTime").lt(detailUpdateTime)));
if (Objects.equals(source, "weixin")){
query = Query.query(Criteria.where("wechatUuid").is(userId).andOperator(Criteria.where("detailUpdateTime").lt(detailUpdateTime)));
}
Update update = new Update();
update.set("detailUpdateTime", detailUpdateTime);
WriteResult user = mongoTemplate.updateFirst(query, update, FinacialMediaBean.class,mediaCollection);
ajaxResponse = new AjaxResponse(AjaxResponse.AJAX_CODE_SUCCESS);
} catch (Exception e) {
loggerError.error("====mysql、mongo数据库更新异常!userid:{},publish_time:{},source:{}=====",userId,detailUpdateTime,source);
e.printStackTrace();
}
logger.info("====mysql、mongo数据库更新完成!userid:{},publish_time:{},source:{}=====",userId,detailUpdateTime,source);
return ajaxResponse;
}
网友评论