美文网首页
两个系统大数据量对接手记

两个系统大数据量对接手记

作者: 董二弯 | 来源:发表于2019-10-02 16:24 被阅读0次

在前几天做了一个需求:外围系统下发业务数据到我方系统做业务处理。当时对方负责人说最多每次只有6万数据量,他们分1000条数据一个包传输到我方系统。

实现方式

提供实时的rest 接口

这种方式是写好处理程序,暴露出去,提供给外围系统rest接口。完成开发后交付测试。然后对方不按约定出牌,2000条一个包,一共发了50几个包,最终结果就是把我方系统测试机跑挂了。所以说最重要的是保证自己系统的健壮性,需求往往是变化的。这种实时方式只适合少量的数据,太大的数据会对系统产生影响。于是我修改模式,换为了第二种方式。

消息队列中间存储

整体流程

  • 接收外围系统数据,先不做处理,500条一次存到消息队列。
  • 消费者处理消息队列中的数据,每处理一个就提交事务,这样避免了在第一种实时方法因为提交事务时间长了而超时。
  • 消费设计:建一个线程池,最大为10,后续为等待线程,消费者使用线程池启动线程,每个线程放500数据。这种方式根据不同的消息队列做不同的设计,有的消息队列自带并发消费模式,这时可以使用自带的模式,不用开线程池。

代码实现

客户为我提供了rocketmq集群的队列,我直接用,省去了我在服务器安装等操作。这里直接上主要代码。

  • 消息生成者
    单例模式,通过spring @value注入消息队列的集群地址。
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

/**
 * rocketmq消息队列生产者
 *
 */
@Component
public class Producer {
   // 定义group
    private static final String GROUP = "";

    private static String namesrvAddr;

    private static DefaultMQProducer producer = new DefaultMQProducer(GROUP);
    private static int initialState = 0;

    private Producer() {

    }

    public static DefaultMQProducer getDefaultMQProducer() {
        if (producer == null) {
            producer = new DefaultMQProducer(GROUP);
        }
        if (initialState == 0) {
            producer.setNamesrvAddr(getNamesrvAddr());
            try {
                producer.start();
            } catch (MQClientException e) {
                e.printStackTrace();
                return null;
            }

            initialState = 1;
        }
        return producer;
    }

    public static String getNamesrvAddr() {
        return namesrvAddr;
    }

    @Value("${rocketmq.namesrvAddr}")
    public void setNamesrvAddr(String namesrvAddr) {
        this.namesrvAddr = namesrvAddr;
    }
}

  • 消息消费者
    单例模式
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

/**
 * rocketmq消息队列消费者
 *
 */
@Component
public class Consumer {
   // 要和生产者的group相同
    private static final String GROUP = "";
    private static DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(GROUP);
    private static int initialState = 0;
    private static String namesrvAddr;

    private Consumer() {

    }

    public static DefaultMQPushConsumer getDefaultMQPushConsumer() {
        if (consumer == null) {
            consumer = new DefaultMQPushConsumer(GROUP);
        }

        if (initialState == 0) {
            consumer.setNamesrvAddr(getNamesrvAddr());
            consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
            // 设置并发数量
            consumer.setConsumeThreadMin(5);
            consumer.setConsumeThreadMax(10);
            initialState = 1;
        }

        return consumer;
    }

    public static String getNamesrvAddr() {
        return namesrvAddr;
    }

    @Value("${rocketmq.namesrvAddr}")
    public void setNamesrvAddr(String namesrvAddr) {
        Consumer.namesrvAddr = namesrvAddr;
    }
}
  • 消息存储
    封装发送方法,其中topic需要到rocketmq控制台配置,tag自定义,生产和消费保持一致即可。
private void sendMsg(List newList) {
        // 获取消息生产者
        DefaultMQProducer producer = Producer.getDefaultMQProducer();
        try {
            Message msg = new Message(
                    TOPIC,
                    TAG,
                    JSON.toJSONString(newList).getBytes());
            SendResult sendResult = producer.send(msg);
            LOGGER.info("sendResult:{}", sendResult);
        } catch (MQClientException e) {
            LOGGER.info("-----发送失败:" + e.toString());
        } catch (RemotingException e) {
            LOGGER.info("-----发送失败:" + e.toString());
        } catch (MQBrokerException e) {
            LOGGER.info("-----发送失败:" + e.toString());
        } catch (InterruptedException e) {
            LOGGER.info("-----发送失败:" + e.toString());
        }
    }

接收到数据,分500一次,调用封装的发送方法即可存到消息队列。

  • 消息消费
    设置消费者为开机启动,只要有消息就消费
import com.alibaba.fastjson.JSONArray;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;

import java.io.UnsupportedEncodingException;
import java.util.List;

/**
 * 消费开启执行
 *
 */
@Component
public class ConsumerInit implements CommandLineRunner {
    private static final Logger LOGGER = LoggerFactory.getLogger(ConsumerInit.class);
   //和生产者保持一致
    private static final String TAG = "";
    //和控制台配置保持一致
    private static final String TOPIC = "";
    @Autowired
    private FixedAssetsService fixedAssetsService;

    @Override
    public void run(String... strings) throws Exception {
        receiveMsg();
    }

    private void receiveMsg() {

        // 获取消息生产者
        DefaultMQPushConsumer consumer = Consumer.getDefaultMQPushConsumer();

        // 订阅主体
        try {
            consumer.subscribe(TOPIC, TAG);
            //MessageListenerConcurrently 并行消费
            consumer.registerMessageListener(new MessageListenerConcurrently() {

                /**
                 * * 默认msgs里唯独一条消息,能够通过设置consumeMessageBatchMaxSize參数来批量接收消息
                 */
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(
                        List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                    MessageExt msg = msgs.get(0);
                    if (msg.getTopic().equals(TOPIC)) {
                        if (msg.getTags() != null && msg.getTags().equals(TAG)) {
                            String message = null;
                            try {
                                message = new String(msg.getBody(),"UTF-8");
                            } catch (UnsupportedEncodingException e) {
                                LOGGER.info("message转换失败");
                            }
                            List<TransferSapResultVO> list = JSONArray.parseArray(message, TransferSapResultVO.class);
                            // 消费消息
                            fixedAssetsService.consumerMqMessage(list);
                        }
                    }
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                }
            });

            /**
             * Consumer对象在使用之前必须要调用start初始化。初始化一次就可以<br>
             */
            consumer.start();

            LOGGER.info("Consumer Started.");
        } catch (MQClientException e) {
            LOGGER.info("消费消息错误" + e.toString());
        }
    }
}
  • 测试结果
    根据上述步骤,主要的流程代码已经展示。主要就是接收消息-存储消息-并发消费消息。 开发完成后交付测试。和第一种方式一样2000条数据一个包,50几个包,系统很稳定,速度比第一种方式快了至少3倍,数据也能正确处理完成。

这种方式在性能,速度等方面都非常良好。但是好景不长,说推过来的10几万数据少了3条,让我检查为什么少。我第一反应是难道是丢包了? 如果是丢包,我500条一个包,只能少500的整数倍啊。然后百度发现rocketmq的安全性是可以保证的,几乎不会出现丢包的情况。然后我问对方技术是不是根本就没有传这三条数据,但对方就是咬死说传了。为了验证我说重新在传输一次,结果和第一次一样,还是丢了这3条。我自己感觉是对方肯定没有传,但这种消息队列的方式的缺点就暴露了出来,缺少监控。于是又切换到第三种方式。

中间表模式

整体流程

  • 外围系统插入到我方中间表。插入过程不用我自己管,插入不走程序接口,不会影响系统性能。
  • 做定时任务。每30分钟并发消费中间表数据。已经消费的更新消费标识为1。其中消费标识为0为未消费,1为已消费。
  • 做定时任务。每7天执行一次。删除一个星期前已经消费的数据。
  • 消费设计:每30分钟查询所有未消费的数据,并发进行消费。此时有一个问题,如果前一次没有消费完的数据,在下一次任务又会被查询出来,出现了重复消费的问题。自己的解决方式是,增加批次号字段,设定消费标志为2 是处理中。
    -- 第一步查询出所有未消费的数据。
    -- 设置批次号和消费标志为2(批量跟新)。
    -- 查询当前批次的数据。
    -- 消费数据。完成后把标志置为1。

这种方式有了监控的功能,如果说少了数据,直接在中间表中查,看是否外围系统推送了数据。

总结

当数据量较少时,可以用第一种实时的方式,比较方便。
数据量大且不需要监控功能用第二种方式较好。
数据量大且需要监控功能用第三种方式。
这次需求自己相当于开发了3次。用了3种模式。 积累了经验。以后在做这种需求时,两个问题, 数据量大不大? 是否需要监控?

相关文章

  • 两个系统大数据量对接手记

    在前几天做了一个需求:外围系统下发业务数据到我方系统做业务处理。当时对方负责人说最多每次只有6万数据量,他们分10...

  • 分布式基础-负载均衡

    前言 我理解只所以要分布式系统,无非两个原因数据和计算,单机系统无法保存这么大的数据量,所以要分布式系统来保存; ...

  • 产品经理如何处理系统对接(API对接)类需求?

    一、什么是系统对接类需求? 系统对接,是指两个或多个系统间进行数据交互(系统间一般通过API接口来实现数据交互)。...

  • 2018年驾考新规,一定要看一看

    最近出了18年的驾考新规 先来看看新规的四大亮点在哪里: 一、驾培与公安系统对接 系统对接以后科二、科三学员需要完...

  • 后端产品间系统对接

    在后端产品中,各个子系统之间的对接或者子系统与外部系统之间的对接非常常见,对接的本质是为了实现数据信息的传输与交换...

  • AOP实现记录系统对接外围接口日志记录

    一. 概述 在日常开发中, 我们经常会和其他系统做对接, 系统间的对接有时会出现异常, 为了方便排查接口对接异常,...

  • 政采平台

    B端客户对接—分为多种对接模式 过单—模式 第三方客户有的采取这样的模式 业务系统 库房系统 财务系统(进项发票...

  • 记录一次通过性能日志处理线上性能问题的过程

    在项目发展初期,可能由于数据量和用户访问量的原因,系统不会出现性能问题,但是随着项目发展,数据量发生具体变化,系统...

  • 大数据技术生态

    大数据,大数据,首先表示数据量非常大,一般至少是T级或者P级数据。数据量太大,就会遇到两个最直接的问题:数据如何存...

  • Android 应用push ,so无法加载

    作者君主要做SDK开发,对接一些厂商或运行商的普通应用或系统应用。当对接系统应用时,由于系统应用是由于覆盖机型比较...

网友评论

      本文标题:两个系统大数据量对接手记

      本文链接:https://www.haomeiwen.com/subject/sakuyctx.html