一、前言
- 本文旨在说明在高并发场景下如何通过聚合请求,充分利用数据库的批量处理更高效地实现业务功能。
- 当然,此示例仅用作抛砖引玉,希望能激发读者更深入的思考。
二、正文
- 本示例选取的背景是并发下单业务。常规情况下,后端创建订单是逐条
insert
的操作。在并发较低的时候,数据库的 insert
操作的确能保持不错的效率,但是当遇到请求数量增多,数据库 频繁地单次 insert
就会让下单业务整体效率变低 (本文简单地假设1次下单=1个 insert
)。
- 通过上面的描述,其实已经很容易想到需要优化的地方了。类比现实生活中乘坐电梯的场景:一架电梯 装满后再上行,可以最快地缓解人流压力。
- 下面我们就来用代码简单实现一下我们思路:
<?php
Swoole\Runtime::enableCoroutine($flags = SWOOLE_HOOK_ALL);
// 最大等待次数
const MAX_TIMES = 10;
// 按批处理时, 每一批的最大请求暂留数量
const MAX_REQUEST = 3;
// 服务端最大超时时间, 避免客户端一直等待
const MAX_TIMEOUT = 5;
Co\run(function () {
// 请求传输的channel, 原因是不要在swoole的协程环境中, 使用多个协程修改同一个全局变量
// 如果是golang, 当然是可以不定义这里的$rqChannel
// 只需要简单的将下面的$rqQueue和$times定义为全局变量即可达到一样的效果
// 但是最好的方式任然是是通过channel共享内存
$rqChannel = new Swoole\Coroutine\Channel(MAX_REQUEST);
// 模拟创建订单
$createOrder = function () use ($rqChannel) {
// 使用数组模拟请求暂留队列
$rqQueue = [];
// 使用等待次数模拟tick效果
$times = MAX_TIMES;
while (true) {
$times--;
// 必须带上timeout参数, 否则channel是阻塞的
$rq = $rqChannel->pop(1);
// 保存1个正常的请求数据
if (!empty($rq)) {
$rqQueue[] = $rq;
}
// 请求数量未达上限或者还有等待次数时, 提前进入下一次循环
if ($times > 0 && count($rqQueue) < MAX_REQUEST) {
continue;
}
// 重置等待次数
$times = MAX_TIMES;
// 初始化SQL
$sql = "INSERT INTO orders VALUES ";
$inserts = [];
// 模拟数据验证
$validator = function ($input): bool {
// 为了缩减代码, 没有真的做数据验证的处理
array_filter($input);
return true;
};
// $rqQueue在协程上下文是并发安全的, 所以遍历时不用担心
foreach ($rqQueue as $index => $rq) {
list($data, $chan) = $rq;
// 这里可以考虑后置执行, 原因是后面可以有一些补救逻辑
unset($rqQueue[$index]);
// 判断$chan是否关闭å
if ($chan->errCode === SWOOLE_CHANNEL_CLOSED) {
$data = null;
continue;
}
$bool = $validator($data);
if ($bool) {
$inserts[] = "({$data['user_name']}, {$data['amount']}, {$data['mobile']})";
$chan->push(['state' => 1]);
} else {
$chan->push(['state' => 0]);
}
// unset($rqQueue[$index]);
}
$sql .= (implode(',', $inserts) . ';');
// 模拟创建订单落库的逻辑
echo $sql;
}
};
// 新手要注意这一句代码的位置, 原因是 $server->start() 之后的代码不会执行
go($createOrder);
// 路由处理器
$orderHandler = function ($rq, $res) use ($rqChannel) {
$chan = new Swoole\Coroutine\Channel(1);
// 使用timeout参数模拟超时
$bool = $rqChannel->push([$rq->post, $chan], MAX_TIMEOUT);
if (!$bool) {
// 关闭$chan
$chan->close();
$res->end('timeout');
}
if (!empty($data = $chan->pop())) {
// 关闭$chan
$chan->close();
// 区分成功或失败状态再输出响应
if ($data['state'] === 1) {
$res->end(microtime());
} else {
$res->end('error');
}
}
};
$server = new Co\Http\Server("0.0.0.0", 9502, false);
$server->handle('/order/create', $orderHandler);
// 当前协程容器的终点
$server->start();
});
- 代码整体上还是很容易理解的,变量
$rqQueue
就是类比电梯,暂留请求等待一定时间的次数 $times
就是类比电梯需要等待人流依次进入。当然最在希望读者注意的一点是:在协程环境下,不要使用共享内存而通信,应该使用通信来共享内存。
三、结语
- 本教程面向新手,更多教程会在日后给出。
- 欢迎联系在下,讨论建议都可以,之后会发布其它的教程。
网友评论