美文网首页
阿里云之消息服务实战

阿里云之消息服务实战

作者: 明逸读书 | 来源:发表于2018-12-17 15:07 被阅读0次

    概述

    阿里云消息服务(Message Service,原 MQS)是阿里云商用的消息中间件服务,基于阿里的飞天系统,具有大规模、高可靠、高可用以及较强的消息堆积能力。由于项目架构中,对数据的及时性没有过高的要求,故采用消息服务来解耦接口的数据同步功能。

    场景描述

    在项目中有个前端管理界面称为manager模块,主要功能是配置管理一些配置数据,支持新增、修改、删除以及批量新增、批量修改、批量删除功能,甚至批量从excel文件中导入的功能。对数据操作的同时,需要将数据同步到各个agent的模块中,agent模块需要对这些数据进行入库、入redis的操作,系统架构图如下。


    初版架构图.png

    在不使用消息服务前,通过接口的方式,在manager模块增删改的时候,将修改的数据实时同步到各个agent中。这会出现一个问题,如果数据量较小的话,同步速度还是可以理解,如果同步量过大的话,会出现很明显的卡顿情况。可能很多朋友会问,为何不做异步尼? 异步的确也是一种解决方法,不过当某一条数据同步失败的时候,我们不好获取这些异常,并且无法追踪这些异常并进行回滚操作,因为需要保持manager模块与agent模块同步回滚,只要agent模块失败,manager模块必须要回滚。由于对这些数据的实时性没有太高的要求,为此引入消息队列。

    架构设计

    阿里云消息服务MNS 已经提供队列(queue)和主题(topic)两种模型。其中队列提供的是一对多的共享消息消费模型,采用客户端主动拉取(Pull)模式;主题模型提供一对多的广播消息消费模型,并且采用服务端主动推送(Push)模式。上面两种模型基本能满足我们大多数应用场景。
    推送模式的好处是即时性能比较好,但是需要暴露客户端地址来接收服务端的消息推送。有些情况下,比如企业内网,我们无法暴露推送地址,希望改用拉取(Pull)的方式。虽然MNS不直接提供这种消费模型,但是我们可以结合主题和队列来实现一对多的拉取消息消费模型。

    阿里的官方文档中,其广播拉取消息模型最佳实践正好符合项目的架构模型,故采用其模型。下图是广播拉取消息模型。


    广播拉取消息模型.png

    根据该模型的改造,改造了符合当前项目场景的业务模型。


    架构图.png

    接口说明

    Java SDK(1.1.5)中的CloudPullTopic 默认支持上述解决方案。其中MNSClient 提供下面两个接口来快速创建CloudPullTopic:

    public CloudPullTopic createPullTopic(TopicMeta topicMeta, Vector<String> queueNameList, boolean needCreateQueue, QueueMeta queueMetaTemplate)
    public CloudPullTopic createPullTopic(TopicMeta topicMeta, Vector<String> queueNameList)
    

    其中,TopicMeta 是创建topic的meta 设置, queueNameList里指定topic消息推送的队列名列表;needCreateQueue表明queueNameList是否需要创建;queueMetaTemplate是创建queue需要的queue meta 参数设置;

    生产者manager代码

    作为数据生产者的manager端,bean配置文件中配置如下代码的实例bean,目的为不断的插入数据到队列为消费者各个agent提供数据源。

        @Value("${aliyun.mns.accessKeyId}")
        private String accessKeyId;
    
        @Value("${aliyun.mns.accessKeySecret}")
        private String accessKeySecret;
    
        @Value("${aliyun.mns.accountEndpoint}")
        private String accountEndpoint;
    
        @Value("${aliyun.mns.queueNameProducers}")
        private String queueNameProducers;
    
        @Value("${aliyun.mns.topicNameProducer}")
        private String topicName;
        @Bean
        public MNSClient mnsClient() {
            CloudAccount account = new CloudAccount(accessKeyId, accessKeySecret, accountEndpoint);
            MNSClient client = account.getMNSClient();
            return client;
        }
    
        @Bean
        public CloudPullTopic cloudPullTopic(@Qualifier("mnsClient")MNSClient mnsClient){
    
            String[] queueArr = queueNameProducers.split(",");
            List list = Arrays.asList(queueArr);
    
            Vector<String> queueNameList = new Vector<>();
            queueNameList.addAll(list);
    
            for (String queueName : queueNameList) {
                if (!mnsClient.getQueueRef(queueName).isQueueExist()) {
                    QueueMeta qMeta = new QueueMeta();
                    qMeta.setQueueName(queueName);
                    qMeta.setPollingWaitSeconds(WAIT_SECONDS);
                    mnsClient.createQueue(qMeta);
                }
            }
    
            TopicMeta topicMeta = new TopicMeta();
            topicMeta.setTopicName(topicName);
    
            return mnsClient.createPullTopic(topicMeta,queueNameList,false,null);
        }
    

    如上代码中,对当前队列做了一个为空判断,如果第一个队列不存在,采用新增队列的方式:createPullTopic(topicMeta,queueNameList,true,queueMetaTemplate);
    其中第三个参数true代表当前是重新创建队列的方式。
    同步到队列的统一数据格式:

    /**
     * 结果类
     *
     * @author huwk
     * @date 2018/12/5
     */
    public class SynModel {
    
        /**
         * 操作方法
         */
        private String action;
    
        /**
         * 返回对象
         */
        private Object result;
    
        public SynModel(){}
    
        public SynModel(String action, Object result) {
            this.action = action;
            this.result = result;
        }
    
        public String getAction() {
            return action;
        }
    
        public void setAction(String action) {
            this.action = action;
        }
    
        public Object getResult() {
            return result;
        }
    
        public void setResult(Object result) {
            this.result = result;
        }
    }
    

    因为业务需要,考虑到队列中会推入不同的格式的数据,故统一定义一个实体类,统一下数据结构,action表示具体操作的类型,result表示放入的实体对象。
    如下是发送消息代码:

    /* 同步队列 */
            try {
                /*封装对象*/
                EnterpriseWhite white = new EnterpriseWhite();
                white.setStatus(status);
                white.setEnterpriseId(enterpriseId);
                white.setComment(comment);
                white.setCreateTime(new Date());
    
                SynModel synModel = new SynModel(QueueAction.CreateEnterpriseWhite.name(),white);
                /*发送消息*/
                publishObjectMessage(synModel);
    
            } catch (Exception e) {
                logger.error("数据接口异常", e);
            }
    

    使用了一个QueueAction枚举类型,定义了各种操作某种实体的类型,这样下次消费者去取数据的时候,根据其类型可以给指定的消费者消费。
    抽取的推送数据给队列的公共方法

    /**
         * 抽取解析对象以及发送消息方法
         *
         * @param synModel
         * @throws JsonProcessingException
         */
        private void publishObjectMessage(SynModel synModel) throws JsonProcessingException {
            /*转换为json字符串*/
            String objStr = objectMapper.writeValueAsString(synModel);
    
            /*主题消息封装*/
            TopicMessage topicMessage = new Base64TopicMessage();
            topicMessage.setBaseMessageBody(objStr);
    
            /*发送主题消息*/
            cloudPullTopic.publishMessage(topicMessage);
        }
    

    消费者agent代码

    定义一个线程类,让其在项目启动的时候就开启线程。agent模块根据FIFO,根据不同的SynModel类型去从消息队列中取消息并消费掉,完成一个完整的消息队列的过程。

    @Component
    public class ManagerSynDataEngine extends Thread {
    
        private Logger logger = LoggerFactory.getLogger(getClass());
    
        @Autowired
        private ObjectMapper objectMapper;
    
        @Autowired
        private MNSClient mnsClient;
    
        @Autowired
        @Qualifier("managerSynDataTreadPoolTaskExecutor")
        private ThreadPoolTaskExecutor threadPoolTaskExecutor;
    
        @Override
        public void run() {
            try {
                MessageReceiver receiver = new MessageReceiver(mnsClient, queueName);
                while (true) {
                    Message message = receiver.receiveMessage();
    
                    String messageBody = message.getMessageBody();
    
                    logger.info("队列中的消息: " + messageBody);
                    // 当线程池内的线程全部繁忙时,暂停向线程池派发任务,等待线程释放
                    while (threadPoolTaskExecutor.getActiveCount() >= threadPoolTaskExecutor.getMaxPoolSize()) {
                        logger.warn("ManagerSynDataEngine thread busy!");
                        Thread.sleep(1000);
                    }
    
                    threadPoolTaskExecutor.execute(() -> {
                        try {
                            SynModel synModel = objectMapper.readValue(messageBody, SynModel.class);
                            Class clazz = QueueAction.valueOf(synModel.getAction()).getClazz();
                            Object object = objectMapper.convertValue(synModel.getResult(), clazz);
                            operate(synModel, object);
    
                        } catch (Exception e) {
                            logger.error("写入数据库出错。" + "队列名:;" + "消息详情:" + messageBody, e);
                        } finally {
                            /*删除队列元素*/
                            mnsClient.getQueueRef(queueName).deleteMessage(message.getReceiptHandle());
                        }
                    });
                }
            } catch (Exception e) {
                logger.error("解析对象错误", e);
            }
        }
    /**
    * 业务代码
    **/
    private void operate(SynModel synModel, Object object) throws Exception {
            /*业务代码*/
            if (QueueAction.CreateEnterpriseWhite.name().equals(synModel.getAction())) {
                enterpriseWhiteService.insert((EnterpriseWhite) object);
            } else if (QueueAction.UpdateEnterpriseWhite.name().equals(synModel.getAction())) {
                enterpriseWhiteService.updateById((EnterpriseWhite) object);
            } else if (QueueAction.DeleteEnterpriseWhite.name().equals(synModel.getAction())) {
                enterpriseWhiteService.deleteByEnterpriseId(((EnterpriseWhite) object).getEnterpriseId());
            }
        }
    }
    

    附录

    长轮询模式

    MNS提供了LongPolling类型的ReceiveMessage的方法,只需要在ReceiveMessage的时候把WaitSecond设为一个1-30之间的数就可以了。使用LongPolling可以让Request一直挂在Server上,等到有Message的时候才返回,在保证了第一时间收到消息的同时也避免用户发送大量无效Request。LongPolling也是MNS的推荐用法。
    LongPolling是需要挂HTTP层的长连接在Server上,而对于Server来说,HTTP层的长连接的资源是比较有限的。为了避免受到一些恶意攻击,所以MNS对单用户的LongPolling连接数是有限制的。

    这里使用了阿里的推荐的最佳实践长轮询模式,防止当队列中没有数据时候,大量线程去请求的问题,减少不必要的调用量,节省成本。

    /**
     * 队列消息接收
     *
     * @author huwk
     * @date 2018/12/4
     */
    public class MessageReceiver {
    
        private Logger logger = LoggerFactory.getLogger(getClass());
        public static final int WAIT_SECONDS = 30;
    
        protected static final Map<String, Object> sLockObjMap = new HashMap<String, Object>();
        protected static Map<String, Boolean> sPollingMap = new ConcurrentHashMap<String, Boolean>();
    
        protected Object lockObj;
        protected String queueName;
        protected CloudQueue cloudQueue;
    
        public MessageReceiver(MNSClient mnsClient, String queue) {
            cloudQueue = mnsClient.getQueueRef(queue);
            cloudQueue.popMessage();
            queueName = queue;
    
            synchronized (sLockObjMap) {
                lockObj = sLockObjMap.get(queueName);
                if (lockObj == null) {
                    lockObj = new Object();
                    sLockObjMap.put(queueName, lockObj);
                }
            }
        }
    
        public boolean setPolling() {
            synchronized (lockObj) {
                Boolean ret = sPollingMap.get(queueName);
                if (ret == null || !ret) {
                    sPollingMap.put(queueName, true);
                    return true;
                }
                return false;
            }
        }
    
        public void clearPolling() {
            synchronized (lockObj) {
                sPollingMap.put(queueName, false);
                lockObj.notifyAll();
                logger.info("唤醒所有线程开始工作!");
            }
        }
    
        public Message receiveMessage() {
            boolean polling = false;
            while (true) {
                synchronized (lockObj) {
                    Boolean p = sPollingMap.get(queueName);
                    if (p != null && p) {
                        try {
                            logger.info(" 线程睡眠!");
                            polling = false;
                            lockObj.wait();
                        } catch (InterruptedException e) {
                            logger.error("MessageReceiver 中断! 队列名为 " + queueName);
                            return null;
                        }
                    }
                }
    
                try {
                    Message message = null;
                    if (!polling) {
                        message = cloudQueue.popMessage();
                        if (message == null) {
                            polling = true;
                            continue;
                        }
                    } else {
                        if (setPolling()) {
                            logger.info("线程" + " Polling!");
                        } else {
                            continue;
                        }
                        do {
                            logger.info("线程" + " 保持 Polling!");
                            try {
                                message = cloudQueue.popMessage(WAIT_SECONDS);
                            } catch (Exception e) {
                                logger.error("线程异常 polling popMessage: " + e);
                            }
                        } while (message == null);
                        clearPolling();
                    }
                    return message;
                } catch (Exception e) {
                    // it could be network exception
                    logger.error("popMessage时发生异常: " + e);
                }
            }
        }
    }
    

    当队列为空的时候,只会保持一个线程去请求,并且保持长连接为30秒,如果30秒内有数据后,唤醒所有的线程去消费,一旦发现队列为空,继续让所有线程等待,只保留一个线程。

    至此,一个简单的消息服务实战结束,后续还会有更多的高级应用,一步步等待去开辟。

    不会写文章的程序员不是好的吉他手o( ̄︶ ̄)o

    相关文章

      网友评论

          本文标题:阿里云之消息服务实战

          本文链接:https://www.haomeiwen.com/subject/ggdqkqtx.html