概述
阿里云消息服务(Message Service,原 MQS)是阿里云商用的消息中间件服务,基于阿里的飞天系统,具有大规模、高可靠、高可用以及较强的消息堆积能力。由于项目架构中,对数据的及时性没有过高的要求,故采用消息服务来解耦接口的数据同步功能。
场景描述
在项目中有个前端管理界面称为manager模块,主要功能是配置管理一些配置数据,支持新增、修改、删除以及批量新增、批量修改、批量删除功能,甚至批量从excel文件中导入的功能。对数据操作的同时,需要将数据同步到各个agent的模块中,agent模块需要对这些数据进行入库、入redis的操作,系统架构图如下。
初版架构图.png
在不使用消息服务前,通过接口的方式,在manager模块增删改的时候,将修改的数据实时同步到各个agent中。这会出现一个问题,如果数据量较小的话,同步速度还是可以理解,如果同步量过大的话,会出现很明显的卡顿情况。可能很多朋友会问,为何不做异步尼? 异步的确也是一种解决方法,不过当某一条数据同步失败的时候,我们不好获取这些异常,并且无法追踪这些异常并进行回滚操作,因为需要保持manager模块与agent模块同步回滚,只要agent模块失败,manager模块必须要回滚。由于对这些数据的实时性没有太高的要求,为此引入消息队列。
架构设计
阿里云消息服务MNS 已经提供队列(queue)和主题(topic)两种模型。其中队列提供的是一对多的共享消息消费模型,采用客户端主动拉取(Pull)模式;主题模型提供一对多的广播消息消费模型,并且采用服务端主动推送(Push)模式。上面两种模型基本能满足我们大多数应用场景。
推送模式的好处是即时性能比较好,但是需要暴露客户端地址来接收服务端的消息推送。有些情况下,比如企业内网,我们无法暴露推送地址,希望改用拉取(Pull)的方式。虽然MNS不直接提供这种消费模型,但是我们可以结合主题和队列来实现一对多的拉取消息消费模型。
阿里的官方文档中,其广播拉取消息模型最佳实践正好符合项目的架构模型,故采用其模型。下图是广播拉取消息模型。
广播拉取消息模型.png
根据该模型的改造,改造了符合当前项目场景的业务模型。
架构图.png
接口说明
Java SDK(1.1.5)中的CloudPullTopic 默认支持上述解决方案。其中MNSClient 提供下面两个接口来快速创建CloudPullTopic:
public CloudPullTopic createPullTopic(TopicMeta topicMeta, Vector<String> queueNameList, boolean needCreateQueue, QueueMeta queueMetaTemplate)
public CloudPullTopic createPullTopic(TopicMeta topicMeta, Vector<String> queueNameList)
其中,TopicMeta 是创建topic的meta 设置, queueNameList里指定topic消息推送的队列名列表;needCreateQueue表明queueNameList是否需要创建;queueMetaTemplate是创建queue需要的queue meta 参数设置;
生产者manager代码
作为数据生产者的manager端,bean配置文件中配置如下代码的实例bean,目的为不断的插入数据到队列为消费者各个agent提供数据源。
@Value("${aliyun.mns.accessKeyId}")
private String accessKeyId;
@Value("${aliyun.mns.accessKeySecret}")
private String accessKeySecret;
@Value("${aliyun.mns.accountEndpoint}")
private String accountEndpoint;
@Value("${aliyun.mns.queueNameProducers}")
private String queueNameProducers;
@Value("${aliyun.mns.topicNameProducer}")
private String topicName;
@Bean
public MNSClient mnsClient() {
CloudAccount account = new CloudAccount(accessKeyId, accessKeySecret, accountEndpoint);
MNSClient client = account.getMNSClient();
return client;
}
@Bean
public CloudPullTopic cloudPullTopic(@Qualifier("mnsClient")MNSClient mnsClient){
String[] queueArr = queueNameProducers.split(",");
List list = Arrays.asList(queueArr);
Vector<String> queueNameList = new Vector<>();
queueNameList.addAll(list);
for (String queueName : queueNameList) {
if (!mnsClient.getQueueRef(queueName).isQueueExist()) {
QueueMeta qMeta = new QueueMeta();
qMeta.setQueueName(queueName);
qMeta.setPollingWaitSeconds(WAIT_SECONDS);
mnsClient.createQueue(qMeta);
}
}
TopicMeta topicMeta = new TopicMeta();
topicMeta.setTopicName(topicName);
return mnsClient.createPullTopic(topicMeta,queueNameList,false,null);
}
如上代码中,对当前队列做了一个为空判断,如果第一个队列不存在,采用新增队列的方式:createPullTopic(topicMeta,queueNameList,true,queueMetaTemplate);
其中第三个参数true代表当前是重新创建队列的方式。
同步到队列的统一数据格式:
/**
* 结果类
*
* @author huwk
* @date 2018/12/5
*/
public class SynModel {
/**
* 操作方法
*/
private String action;
/**
* 返回对象
*/
private Object result;
public SynModel(){}
public SynModel(String action, Object result) {
this.action = action;
this.result = result;
}
public String getAction() {
return action;
}
public void setAction(String action) {
this.action = action;
}
public Object getResult() {
return result;
}
public void setResult(Object result) {
this.result = result;
}
}
因为业务需要,考虑到队列中会推入不同的格式的数据,故统一定义一个实体类,统一下数据结构,action表示具体操作的类型,result表示放入的实体对象。
如下是发送消息代码:
/* 同步队列 */
try {
/*封装对象*/
EnterpriseWhite white = new EnterpriseWhite();
white.setStatus(status);
white.setEnterpriseId(enterpriseId);
white.setComment(comment);
white.setCreateTime(new Date());
SynModel synModel = new SynModel(QueueAction.CreateEnterpriseWhite.name(),white);
/*发送消息*/
publishObjectMessage(synModel);
} catch (Exception e) {
logger.error("数据接口异常", e);
}
使用了一个QueueAction枚举类型,定义了各种操作某种实体的类型,这样下次消费者去取数据的时候,根据其类型可以给指定的消费者消费。
抽取的推送数据给队列的公共方法
/**
* 抽取解析对象以及发送消息方法
*
* @param synModel
* @throws JsonProcessingException
*/
private void publishObjectMessage(SynModel synModel) throws JsonProcessingException {
/*转换为json字符串*/
String objStr = objectMapper.writeValueAsString(synModel);
/*主题消息封装*/
TopicMessage topicMessage = new Base64TopicMessage();
topicMessage.setBaseMessageBody(objStr);
/*发送主题消息*/
cloudPullTopic.publishMessage(topicMessage);
}
消费者agent代码
定义一个线程类,让其在项目启动的时候就开启线程。agent模块根据FIFO,根据不同的SynModel类型去从消息队列中取消息并消费掉,完成一个完整的消息队列的过程。
@Component
public class ManagerSynDataEngine extends Thread {
private Logger logger = LoggerFactory.getLogger(getClass());
@Autowired
private ObjectMapper objectMapper;
@Autowired
private MNSClient mnsClient;
@Autowired
@Qualifier("managerSynDataTreadPoolTaskExecutor")
private ThreadPoolTaskExecutor threadPoolTaskExecutor;
@Override
public void run() {
try {
MessageReceiver receiver = new MessageReceiver(mnsClient, queueName);
while (true) {
Message message = receiver.receiveMessage();
String messageBody = message.getMessageBody();
logger.info("队列中的消息: " + messageBody);
// 当线程池内的线程全部繁忙时,暂停向线程池派发任务,等待线程释放
while (threadPoolTaskExecutor.getActiveCount() >= threadPoolTaskExecutor.getMaxPoolSize()) {
logger.warn("ManagerSynDataEngine thread busy!");
Thread.sleep(1000);
}
threadPoolTaskExecutor.execute(() -> {
try {
SynModel synModel = objectMapper.readValue(messageBody, SynModel.class);
Class clazz = QueueAction.valueOf(synModel.getAction()).getClazz();
Object object = objectMapper.convertValue(synModel.getResult(), clazz);
operate(synModel, object);
} catch (Exception e) {
logger.error("写入数据库出错。" + "队列名:;" + "消息详情:" + messageBody, e);
} finally {
/*删除队列元素*/
mnsClient.getQueueRef(queueName).deleteMessage(message.getReceiptHandle());
}
});
}
} catch (Exception e) {
logger.error("解析对象错误", e);
}
}
/**
* 业务代码
**/
private void operate(SynModel synModel, Object object) throws Exception {
/*业务代码*/
if (QueueAction.CreateEnterpriseWhite.name().equals(synModel.getAction())) {
enterpriseWhiteService.insert((EnterpriseWhite) object);
} else if (QueueAction.UpdateEnterpriseWhite.name().equals(synModel.getAction())) {
enterpriseWhiteService.updateById((EnterpriseWhite) object);
} else if (QueueAction.DeleteEnterpriseWhite.name().equals(synModel.getAction())) {
enterpriseWhiteService.deleteByEnterpriseId(((EnterpriseWhite) object).getEnterpriseId());
}
}
}
附录
长轮询模式
MNS提供了LongPolling类型的ReceiveMessage的方法,只需要在ReceiveMessage的时候把WaitSecond设为一个1-30之间的数就可以了。使用LongPolling可以让Request一直挂在Server上,等到有Message的时候才返回,在保证了第一时间收到消息的同时也避免用户发送大量无效Request。LongPolling也是MNS的推荐用法。
LongPolling是需要挂HTTP层的长连接在Server上,而对于Server来说,HTTP层的长连接的资源是比较有限的。为了避免受到一些恶意攻击,所以MNS对单用户的LongPolling连接数是有限制的。
这里使用了阿里的推荐的最佳实践长轮询模式,防止当队列中没有数据时候,大量线程去请求的问题,减少不必要的调用量,节省成本。
/**
* 队列消息接收
*
* @author huwk
* @date 2018/12/4
*/
public class MessageReceiver {
private Logger logger = LoggerFactory.getLogger(getClass());
public static final int WAIT_SECONDS = 30;
protected static final Map<String, Object> sLockObjMap = new HashMap<String, Object>();
protected static Map<String, Boolean> sPollingMap = new ConcurrentHashMap<String, Boolean>();
protected Object lockObj;
protected String queueName;
protected CloudQueue cloudQueue;
public MessageReceiver(MNSClient mnsClient, String queue) {
cloudQueue = mnsClient.getQueueRef(queue);
cloudQueue.popMessage();
queueName = queue;
synchronized (sLockObjMap) {
lockObj = sLockObjMap.get(queueName);
if (lockObj == null) {
lockObj = new Object();
sLockObjMap.put(queueName, lockObj);
}
}
}
public boolean setPolling() {
synchronized (lockObj) {
Boolean ret = sPollingMap.get(queueName);
if (ret == null || !ret) {
sPollingMap.put(queueName, true);
return true;
}
return false;
}
}
public void clearPolling() {
synchronized (lockObj) {
sPollingMap.put(queueName, false);
lockObj.notifyAll();
logger.info("唤醒所有线程开始工作!");
}
}
public Message receiveMessage() {
boolean polling = false;
while (true) {
synchronized (lockObj) {
Boolean p = sPollingMap.get(queueName);
if (p != null && p) {
try {
logger.info(" 线程睡眠!");
polling = false;
lockObj.wait();
} catch (InterruptedException e) {
logger.error("MessageReceiver 中断! 队列名为 " + queueName);
return null;
}
}
}
try {
Message message = null;
if (!polling) {
message = cloudQueue.popMessage();
if (message == null) {
polling = true;
continue;
}
} else {
if (setPolling()) {
logger.info("线程" + " Polling!");
} else {
continue;
}
do {
logger.info("线程" + " 保持 Polling!");
try {
message = cloudQueue.popMessage(WAIT_SECONDS);
} catch (Exception e) {
logger.error("线程异常 polling popMessage: " + e);
}
} while (message == null);
clearPolling();
}
return message;
} catch (Exception e) {
// it could be network exception
logger.error("popMessage时发生异常: " + e);
}
}
}
}
当队列为空的时候,只会保持一个线程去请求,并且保持长连接为30秒,如果30秒内有数据后,唤醒所有的线程去消费,一旦发现队列为空,继续让所有线程等待,只保留一个线程。
至此,一个简单的消息服务实战结束,后续还会有更多的高级应用,一步步等待去开辟。
不会写文章的程序员不是好的吉他手o( ̄︶ ̄)o
网友评论