美文网首页
读取大量数据批量更新

读取大量数据批量更新

作者: 10xjzheng | 来源:发表于2018-12-24 17:48 被阅读11次

    1. 先看结果对比

    • 1.1 测试数据

    测试租户:znfangcadmin

    经纪人数据量:199003

    做的事情:將经纪人表的mobile_tel_encrypted更新为mobile_tel加密后的字符串,原本mobile_tel_encrypted字段为空。

    • 1.2 优化前

    CPU使用率和内存使用率:

    image.png

    可以看到CPU实用率基本在60%徘徊,内存是在11%徘徊。

    耗时:更新工具记录上一次是执行时间:


    image.png
    • 1.3优化后

    image.png

    可以看到CPU实用率基本在16%徘徊,内存是在0.3徘徊。

    耗时:测过3次,基本是3分钟左右


    image.png

    2. 优化思路

    首先优化前用的是yii2框架的each/batch获取数据,每次拿到100条数据后就处理更新(更新在foreach里面完成,直接是赋值后$model->save()),看源码用的是pdo的fetch方法,正常的话,这个获取数据是用游标的方式,但是执行测试过程中,发现cpu和内存居高不下。并且在中途还会报php内存用完的错:
    Allowed memory size of 1073741824 bytes exhausted (tried to allocate 82 bytes)
    太变态了,1g都用完了。。。。什么垃圾代码呀,说好的batch能节省内存的呢?这里头肯定有哪个地方出差错了。刚开始觉得有可能是php版本太低(现在公司是php5.6 历史原因,不会轻易升级的),后来记起来前公司同事之前也试过处理大量权限代码的,前公司用的是php7也是报内存耗尽。

    下午要发版本,不管了,先用mysqli的MYSQLI_USE_RESULT,这个之前试过,可行的。

    于是优化主要方向:

    1. 将pdo的fetch查询改完mysqli的query(并采用MYSQLI_USE_RESULT);
    2. 一条一条更新改完批量更新。

    于是优化后的代码:
    这是自己写的mysqli的一个trait

    <?php
    /**
     * Created by PhpStorm.
     * User: zhengxj
     * Date: 2018/12/24
     * Time: 15:23
     */
    
    namespace common\traits;
    
    trait MysqliTrait
    {
    
        /**
         * 解析租户库连接字符串,并创建数据库连接对象返回
         * @return \mysqli
         */
        private function getOrgDbConn(){
            $result = $dbConnection; //你是数据库连接
            //自己定义读写超时常量
            if (!defined('MYSQL_OPT_READ_TIMEOUT')) {
                define('MYSQL_OPT_READ_TIMEOUT',  11);
            }
            if (!defined('MYSQL_OPT_WRITE_TIMEOUT')) {
                define('MYSQL_OPT_WRITE_TIMEOUT', 12);
            }
            $mysqli = mysqli_init();
            $mysqli->options(MYSQL_OPT_READ_TIMEOUT, 10000);
            $mysqli->options(MYSQL_OPT_WRITE_TIMEOUT, 10000);
    
            //连接数据库
            $port = isset($result['port'])?$result['port']:null;
            $mysqli->real_connect($result['server'], $result['uid'], $result['pwd'],$result['database'],$port);
            if ($mysqli->connect_errno) {
                return null;
            }
            $mysqli->set_charset("utf8");
            return $mysqli;
        }
    
        /**
         * 对每一行进行处理的函数,可被重写
         * @param $row
         * @return array
         */
        public function dealRow(&$row)
        {
            return $row;
        }
    
        /**
         * 从mysql服务器读取大量数据并处理数据
         * @param string $sql 执行的sql
         * @param int $limit 每次处理多少条数据
         * @param callable $handleFunction 处理方法
         * @throws \Exception
         * @return int
         */
        public function handleData($sql, $limit = 500, callable $handleFunction)
        {
            $db = $this->getOrgDbConn();
            $total = 0;
            try {
                $records = [];
                //读取大量的数据时使用 MYSQLI_USE_RESULT
                if ($result = $db->query($sql,MYSQLI_USE_RESULT)){
                    while ($row = $result->fetch_assoc()) {
                        $total ++;
                        $records[] = $this->dealRow($row);
                        if(count($records) >= $limit){
                            call_user_func_array($handleFunction, [$records]);
                            unset($records);
                            $records = [];
                        }
                    }
                    if(count($records)> 0){
                        call_user_func_array($handleFunction, [$records]);
                    }
                    $result->free();
                }else{
                    echo "mysql 查询失败:errno:".$db->errno.",error:".$db->error;
                }
                return $total;
            }  catch(\Exception $e){
                $db->close();
                throw $e;
            }
        }
    }
    

    controller代码

    use MysqliTrait; //使用trait
    public function actionTest($action, $mobiles='')
    {
                $sql = 'SELECT b_regbrokerId,mobile_tel,capacity_des,mobile_tel_encrypted FROM `b_regbroker`';
                if($action == 'part') {
                    $sql .= ' WHERE mobile_tel != "" and mobile_tel_encrypted = ""';
                } elseif ($action == 'mobile') {
                    if(empty($mobiles)) {
                        die('Error mobiles !');
                    }
                    $mobilesStr = '("'. implode(',"', $mobiles) .'")';
                    $sql .= ' WHERE mobile_tel IN '.$mobilesStr;
                }
                echo '开始处理时间: ' . date('Y-m-d H:i:s', time()) . PHP_EOL;
                $db = BrokerEntity::getDb();
                $logger = $this->logger;
                //回调函数
                $function = function ($updateData) use ($db, $logger) {
                    if(empty($updateData)) {
                        return;
                    }
                    $updateSql = 'Update `b_regbroker` set `mobile_tel_encrypted` =  CASE `b_regbrokerId`';
                    foreach ($updateData as $item) {
                        $updateSql .= " WHEN '{$item['b_regbrokerId']}' THEN '{$item['mobile_tel_encrypted']}'";
                    }
                    $updateSql .= ' END WHERE `b_regbrokerId` IN ("'.implode('","', array_column($updateData, 'b_regbrokerId')).'")';
                    try {
                        $db->createCommand($updateSql)->execute();
                    } catch (\Exception $e) {
                        $logger->error('update error:'.$e->getMessage());
                    }
                };
                $total = $this->handleData($sql, 1000, $function); //此方法就是用trait的handleData代码
                echo '完成处理时间: ' . date('Y-m-d H:i:s', time()) . PHP_EOL;
    }
      /**
         * 对每一行进行处理的函数,可被重写
         * @param $row
         * @return array
         */
        public function dealRow(&$row)
        {
            $row['mobile_tel_encrypted'] = TelSecurity::encrypt($row['mobile_tel']);
            return $row;
        }
    

    20181225更新
    昨天完成了优化,今天有点空,研究一下为什么yii2的each/batch没有作用。
    在网上查了很久,发现这个:

    image.png
    直接打开连接:
    http://php.net/manual/en/mysqlinfo.concepts.buffering.php
    • 主要解释:

    Buffered and Unbuffered queries

    Queries are using the buffered mode by default. This means that query results are immediately transferred from the MySQL Server to PHP and then are kept in the memory of the PHP process. This allows additional operations like counting the number of rows, and moving (seeking) the current result pointer. It also allows issuing further queries on the same connection while working on the result set. The downside of the buffered mode is that larger result sets might require quite a lot memory. The memory will be kept occupied till all references to the result set are unset or the result set was explicitly freed, which will automatically happen during request end the latest. The terminology "store result" is also used for buffered mode, as the whole result set is stored at once.

    Note:

    When using libmysqlclient as library PHP's memory limit won't count the memory used for result sets unless the data is fetched into PHP variables. With mysqlnd the memory accounted for will include the full result set.

    Unbuffered MySQL queries execute the query and then return a resource while the data is still waiting on the MySQL server for being fetched. This uses less memory on the PHP-side, but can increase the load on the server. Unless the full result set was fetched from the server no further queries can be sent over the same connection. Unbuffered queries can also be referred to as "use result".

    Following these characteristics buffered queries should be used in cases where you expect only a limited result set or need to know the amount of returned rows before reading all rows. Unbuffered mode should be used when you expect larger results.

    Because buffered queries are the default, the examples below will demonstrate how to execute unbuffered queries with each API.

    我的理解是:默认情况下,查询是以缓存模式进行,这意味着mysql服务器查询的数据返回后会存储在php的内存中。如果查询大数据,就要求给php分配的内存必须足够大。

    缓存模式适用于读取一个有限集合,或者在读取所有行之前先读取一部分数据。而大批量数据则得用到非缓存模式。

    下面是mysqli和pdo的例子:

    • mysqli (之前优化采取的方式就是用这个)
    <?php
    $mysqli  = new mysqli("localhost", "my_user", "my_password", "world");
    $uresult = $mysqli->query("SELECT Name FROM City", MYSQLI_USE_RESULT);
    
    if ($uresult) {
       while ($row = $uresult->fetch_assoc()) {
           echo $row['Name'] . PHP_EOL;
       }
    }
    $uresult->close();
    ?>
    
    • pdo
    $pdo = new PDO("mysql:host=localhost;dbname=world", 'my_user', 'my_pass');
    $pdo->setAttribute(PDO::MYSQL_ATTR_USE_BUFFERED_QUERY, false);
    
    $uresult = $pdo->query("SELECT Name FROM City");
    if ($uresult) {
       while ($row = $uresult->fetch(PDO::FETCH_ASSOC)) {
           echo $row['Name'] . PHP_EOL;
       }
    }
    

    我马上试了一下,测试代码:

        /**
         * 初始化经纪人加密手机号
         * php yii qdgj/paas-accounts/test-mobile (part/init/mobile) (18812342234,18812343234)  --orgcode=fangzhiadmin_test
         * @param string $action
         * @param string $mobiles 多个,隔开
         * @return mixed
         */
        public function actionTestMobile($action = 'part', $mobiles = '')
        {
            if(!in_array($action, ['part','init','mobile'])) {
                die('Error Params !');
            }
            try {
                /** @var Connection $connection */
                $connection = BrokerEntity::getDb();
                $pdo = new \PDO($connection->dsn, $connection->username, $connection->password);
                $pdo->setAttribute(\PDO::MYSQL_ATTR_USE_BUFFERED_QUERY, false);
    
                $uresult = $pdo->query('SELECT b_regbrokerId,mobile_tel,capacity_des,mobile_tel_encrypted FROM `b_regbroker` WHERE mobile_tel != "" and mobile_tel_encrypted = ""');
                if ($uresult) {
                    while ($row = $uresult->fetch(\PDO::FETCH_ASSOC)) {
                        print_r($row);
                    }
                }
            } catch (\Exception $e) {
                var_dump($e->getMessage());
            }
            return true;
        }
    

    cpu和内存使用率果然降下来了:

    $ ps aux | grep test-mobile
    www      18847  7.4  0.2 336084 23180 pts/1    S+   11:44   0:01 php yii qdgj/paas-accounts/test-mobile part --orgcode=znfangcadmin
    

    回到正题,yii2的each/batch为啥无效,因为人家用的pdo默认都是用缓存模式,代码又没有设置这个模式,当然就没有用了,如果要生效,还必须自己设置那个pdo模式。

    相关文章

      网友评论

          本文标题:读取大量数据批量更新

          本文链接:https://www.haomeiwen.com/subject/itkzkqtx.html