秒杀系统介绍
秒杀,是电商网站中常见的功能。
如果采用普通的开发结构系统,那么最大的瓶颈是在底层的数据库端。因为底层数据库(MySQL)是磁盘存储的,所以读写 IO 较慢,而且连接数有限。在秒杀业务场景,最大的特点就是瞬时的高并发,即在短时间内会有大量的请求到来。让所有请求都打到底层数据库上,很大可能会造成数据库直接崩掉,即使数据库能承受住大量的连接请求,但大量的请求读写都会导致大量的锁冲突,导致响应速度大大减慢。而响应速度对于用户体验来说,无疑是十分重要的。
通过上面的描述,需要明确第一个目标:让尽可能少,尽可能有效的请求打到底层数据库。
一般商品库存可能只有抢购用户数的百分之一,甚至更少。这样,可以用队列来对这些抢购请求做排队,在排队当中有抢购成功的用户和抢购不成功的商品,抢购成功的商品需要用户在指定时间内完成支付,如果完成不了支付,这个商品就应该有被用户抢购的可能,而不能把未支付的商品留到秒杀结束,这就需要抢购成功到支付这一区间段的把控,Redis 缓存的到期时间就很容易满足这样的需求,因此在对商品抢购的用户可以用队列来保存,对抢购成功到支付的时间限定由Redis 到期时间来实现,支付完成后就可以完成对数据库的操作。
20210228132829499.png
商品抢购消息队列的选择
这里保存商品抢购用户的商品队列我们采用消息队列 RabbitMQ。消息队列使用的时候,要求生产者 Producter 和 消费者 Consumer 。生产者负责生成消息,消费者使用处理消息。生产,指的是将消息放入消息队列。消费,指的是读取并处理消息。通常一个消息被消费后,就应该从消息队列中删除。不过使用 RabbitMQ 可能有些人感到疑惑,消息队列不是可以通过 Redis 来实现吗?为什么使用 RabbitMQ ,Redis 做消息队列,可以使用 List 这个数据类型。List 里面有两个命令 lpush / rpush 操作来实现入队,然后使用 lpop / rpop 实现出列。但如果需要较高地保证消息的可靠性,就不易使用 Redis ,因为在客户端中,我们会维护一个死循环来不停的从队列中 pop 读取数据,如果队里中有消息,则直接读取,如果没有,就会陷入死循环,直到下一次有消息进入。这种死循环会造成大量的资源浪费,这个时候我们可以使用 b;pop / brpop 去处理,相当于 lpop 的阻塞,当没有消息到来的时候就会休眠,直到消息来临,才唤醒,pop 去读取数据。读取的这个数据发生宕机未处理的时候,就会导致消息意外消失,同时一个消息如果存在重复消费,Redis 也是做不到的,它的特点是一旦消费了就会被删除。
消息队列的特点
解耦性
以用户下单举例,订单系统需要通知库存系统。传统的套路就是订单系统调用库存系统的接口。
20210228132921366.png
如果引入了消息队列
20210228132931185.png
使用这种情况,当用户在下单时,库存系统如果不能正常使用,也不影响下单,因为下单后。订单系统写入消息队列就不再关心其他的后续操作了。实现了订单系统与库存系统的应用解耦。
异步提升效率
以用户注册功能举例,需要用户发送注册邮件和注册短信。传统的做法有两种 “串行”,“并行”。
-
串行:将注册信息写入数据库成功后,发送注册邮件,再发送注册短信。
20210228132941379.png -
并行:将注册信息写入数据库成功后,发送注册邮件的同时发送注册短信。
20210228132950921.png
并行的方式相比串行的方式可以提高处理的时间。
引入消息队列,改造后的架构如下:
20210228133012361.png
流量销峰
流量销峰也是消息队列中的常用场景,一般在秒杀或团抢活动中使用广泛。
以一个秒杀系统每秒请求量 1000 个来举例,在这种情况下,系统是可以稳定运行的。如果系统每天晚上八点有秒杀活动,每秒并发请求量增至 10000 条,但是系统最大的处理能力只能每秒处理 1000 个请求,于是系统崩溃了,服务器宕机了。这里引入消息队列,就变成下图所示:
20210228133021437.png
就算有 100万用户在高峰期的时候发出请求,每秒请求数有 5000个左右,将这 5000 请求一并写入消息队列 MQ 里面,而对系统来说,根据规则每秒最多只能处理 2000 请求。
实现Redis 抢购成功的商品计数
假如当前的秒杀系统只有 100 件商品,但可能会收到 10万条抢购请求。这不要紧,只需要 Redis 中定义规则就可以,满足规则的就抢购成功,不满足规则的就抢购失败。最简单直接的方法,实现一个 count 变量,每个请求进入都加 1 ,当 count 大于 100 时则直接返回失败即可。
redis_pool = redis.ConnectionPool(host="localhost", port=6479, password="", max_connections=512)
redis_conn = redis.Redis(connection_pool=redis_pool)
def plus_counter(goods_id, storage=100):
count = redis_conn.incr("counters: " + str(goods_id))
if count > storage: return False
return True
这段代码中函数pluscounter的逻辑还是很容易理解的,其中的storage参数意义在于只有100件商品需要秒杀。在函数pluscounter前的redis_pool变量实现了redis的连接池,在RedisPool函数中初始化host参数,指代redis服务器地址,port参数指代redis服务的端口号,password参数指代redis服务的密码,这里没有密码,所以是空(“”),maxconnection指代了连接池的最大连接量。然后将这个初始化的连接池RedisPool作为Redis的参数,与redis服务器进行连接。这样才会发生pluscounter函数中逻辑,每次请求就对对抢购中的商品数量增加一个,当这个数量大于100的库存数量时就返回错误,否则返回抢购成功的正确布尔值。
注意,这段代码中如果使用redis的内容,需要使用pip3 install redis来安装redis相关服务,同时也需要从网络上下载redis-server的安装包,使当前电脑成为redis服务器(这里使用本机作为redis服务器)。
抢购成功后商品订单的存储
当用户请求的数量在 Redis 定义的 plus_counter 这个规则数量时候,就成功抢到了商品,Redis 就要完成抢购商品订单的存储,这里可以用 hash 表来存储用户抢到的商品订单,以表明数据的唯一性。hash 表的键可以使用商品的id,用户的id和订单的id号一起构成。
def create_order(order_info):
user_id = order_info.get("user_id")
order_id = order_info.get("order_id")
goods_id = order_info.get("goods_id")
redis_conn.hset("order:"+str(goods_id),str(order_id), str(user_id))
return True
代码中函数 create_order 是在 Redis 中形成用户的一个抢购成功订单信息,这个信息是一个 hset 函数建立的哈希表,键是由商品id 变量 goodsid ,订单id 变量 order_id和用户id user_id 形成,这三个变量的值是通过 create_order 的字典参数 order_info 传递过来的, order_info 就是主程序中形成的。
Python 操作 RabbitMQ 服务器端订单生产者程序
rabbitmq程序可以从网络上下载rabbitmq的安装包,直接安装成功后通过下面命令安装rabbitmq的管理包。
rabbitmq-plugins enable rabbitmq_management
Rabbitmq-plugins的执行需要在windows环境中设定环境变量,在环境变量中设定包含rabbitmq安装路径,就可以通过http://localhost:15672的网页地址访问rabbitmq的管理程序。访问页面如下。
20210228133034231.png上图界面中默认的username和password均是guest。进入界面后就可以用网页界面的格式观看rabbitmq的管理信息了。
这里把当前服务器也当成了rabbitmq服务器。
首先Python需要连接这样的服务器,Python连接rabbitmq服务器需要使用pika这个模块,可以使用pip3管理包进行安装,安装命令如下。
pip3 install pika
接下来,就可以使用 pika 模块来连接 RabbitMQ 服务器。由于我们用网页管理的形式连接 RabbitMQ 时需要用户名和密码认证,用 pika 模块连接 RabbitMQ 时也需要认证用户名和密码信息。
credentials=pika.PlainCredentials("guest","guest")
rabbitmq_conn=pika.BlockingConnection(pika.ConnectionParameters(host="localhost",credentials=credentials))
代码中首先用 pika 中 PlanCredentials 类中指定用户名和密码实现验证信息,接下来调用 BlockingConnection 阻塞连接模块完成对 RabbitMQ 的连接。
下面就进行 RabbitMQ 订单信息处理
RabbitMQ 不能单纯地理解成把信息直接放在队列中去等待,实际上任何 message model 消息不直接发送到 queue 队列中,中间有个 exchange 是做消息分发的,producer 甚至不知道发送到那个队列中。因此,当 exchange 收到 message 时,必须准确知道如何分发,这种信息通过 exchange 分门别类放在一起,有助于信息的管理,并不是所有的信息都聚在一个队列中,不利于管理。然后再 append 到一定规则的 queue ,还是 append 到多个 queue 中,还是被丢弃?这些规则都是通过 exchange 的 type 去定义的。
topic exchange 是最灵活的 exchange ,它会把 exchange 的 routing_key 与绑定队列的 routing_key 进行模式匹配。Routing_key 中可以包含"" 和 "#" 两种符号。# 号可以用来匹配一个或多个单词, 用来匹配一个单词。
在使用 exchange 交换机进行 queue 分配前,调用 rabbitmq 的 channel 通道,通道就消息的通信流。获取通道的代码语句如下:
channel = rabbitmq_conn.channel()
生产者需要把信息在通道内交给交换机,exchange_declare 函数起到了对交换器进行声明,函数中的参数 exchange 是交换器的名称,type 是交换器的类型,这里有很多中类型,订单的生产者我们使用 topic 类型,通过 routing_key 进行队列模式匹配,参数 durable 是设置是否持久化,其值为 true 表示持久化,反之是非持久化,持久化可以将交换器存盘,在服务器重启的时候不会丢失相关信息。声明 exchange 交换器语句如下:
exchange="order.exchange"
channel.exchange_declare(exchange=exchange,exchange_type="topic",durable=True)
紧接着,通过 exchange 交换器参数进行匹配队列的绑定,queue_bind 就是对 rabbitmq 队列进行绑定的函数。代码语句如下:
queue = "order.queue"
channel.queue_bind(exchange=exchange, queue=queue)
将队列绑定和交换器声明后,就可以通过 basicPublish 发送消息,指明发送消息时 exchange 名称, routing_key 的名称,在body 参数中指发送的消息内容,消息内容就是订单的具体信息,其中一定包含有商品的id,订单的id,用户的id 。
routing_key="order."+str(goods_id)+"."+str(user_id)
channel.basic_publish(exchange=exchange,routing_key=routing_key,body=message)
代码语句中的 rongting_key 是由 goods_id 和 user_id 构成的,对应的用户和商品名称下的订单信息,也可以只由 user_id 构成,对应的用户下面的订单信息,这样,订单信息队列的 rabbitmq 生产者代码如下:
def enter_order_queue(order_info):
user_id=order_info.get("user_id")
order_id=order_info.get("order_id")
goods_id=order_info.get("goods_id")
if user_id is None or order_id is None or goods_id is None:
return False
channel=rabbitmq_conn.channel()
exchange="order.exchange"
queue="order.queue"
routing_key="order."+str(goods_id)+"."+str(user_id)
channel.exchange_declare(exchange=exchange,exchange_type="topic",durable=True)
channel.queue_bind(exchange=exchange,queue=queue)
message=json.dumps(order_info)
channel.basic_publish(exchange=exchange,routing_key=routing_key,body=message)
return True
代码中的函数使用orderinfo订单信息做为参数,代码中首先获取orderinfo中的goodsid、orderid和userid等参数信息,如果这些信息都为空,就证明是错误的信息,将返回错误,接下来代码获取rabbitmq的通道信息,exchangedeclare语句进行交换机声明,queuebind进行队列绑定,orderinfo的json信息转化,最后通过basic_publish进行消息的发送。
Python操作 RabbitMQ 服务器支付过期生产者程序
前面完成了 RabbitMQ 服务器订单生产者程序,通过 exchange_declare 交换机声明,queue_bind 队列额绑定,然后 basic_publish 消息发送。支付过期生产者与订单生产者程序类似,只不过需要过期时间队列的声明,队列声明时指定参数 x-message-ttl 来更改队列 TTL 值的有效时间,参数x-dead-letter-exchange 指明出现死信 dead letter 之后将 dead letter 重新发送到指定的 exchange ,参数 x-dead-letter-routing-key 指明出现死信 dead letter 之后将 dead letter 重新按照指定的 routing-key 发送。这种声明也是 RabbitMQ 对死信队列的操作,死信,死掉的信息,就是消费者未处理就已经丢失,例如消费者未启动,生产者发出消息至交换机,交换机没有找到相应的队列,此消息就会丢失。如果这些消息很重要,而我们又需要,现在就有一种方法可将这些死信消息存下来,那就是死信交换机DLX(Dead Letter Exchange),DLX 也没那么复制,它就是个普通的交换机,它可以是 topic 也可以是 fanout 等类型。
fanout 类型是处理逻辑最简单的 exchange 类型。实际上它没有任何逻辑,它把进入该 exchange 的消息全部转发给每一个绑定的队列中,如果这个 exchange 没有队列与之绑定,消息会被丢弃。原理图如下: 20210228133055628.png除了 fanout 类型外,还有一种 direct 类型,这种类型的交换机更智能一些,它会根据 routing key 来决定把消息具体扔到哪个队列中。通过 exchange 发消息的时候会指定一个 routing key ,只有当 routing key 和与队列绑定的 routing key 一样的时候,消息才对发送到对应的消息队列。即,如果与某个队列绑定的 routing key 叫 hello.world 则通过 exchange 发送的 routing key 必须也是 hello.world ,该队列才能接收到消息。这种情况下,队列之间是互斥关系,一个消息最多只能进入一个队列。
20210228133102234.png
我们在处理死信队列时使用 fanout 类型。死信队列的实现步骤:
- 业务队列里配置好死信交换机和 routing key
- 消息发送到业务交换机
- 业务交换机转发到业务队列
- 业务队里将死信发送到死信交换机
- 再由死信交换机转发到死信队列
逻辑的具体流程如下:
20210228133110292.png
代码如下:
def enter_overtime_queue(order_info,timeout=15):
user_id=order_info.get("user_id")
order_id=order_info.get("order_id")
goods_id=order_info.get("goods_id")
if user_id is None or goods_id is None or order_id is None:
return False
channel=rabbitmq_conn.channel()
delay_exchange="overtime.exchange.delay"
delay_queue="overtime.queue.delay"
exchange="overtime.exchange"
queue="overtime.queue"
channel.exchange_declare(exchange=exchange,exchange_type="fanout",durable=True)
channel.queue_bind(exchange=exchange,queue=queue)
arguments={
"x-message-ttl":1000*60*timeout,
"x-dead-letter-exchange":exchange,
"x-dead-letter-routing-key":queue
}
channel.exchange_declare(exchange=delay_exchange,exchange_type="fanout",durable=True)
channel.queue_declare(queue=delay_queue,durable=True,arguments=arguments)
channel.queue_bind(exchange=delay_exchange,queue=delay_queue)
message=json.dumps(order_info)
channel.basic_publish(exchange=delay_exchange,body=message,routing_key="")
return True
代码定义的函数有两个参数,一个是 order_info 订单信息,一个是 delay 死信的 ttl 时间,程序开始也是把 order_info 中的相关信息如 user_id,goods_id,order_id 等内容提取出来,如果提取出来的为空,则返回错误。声明 业务交换机,绑定在 delay 延时的业务队列中,接下来进行死信交换机的声明,死信队里的声明,queue_declare 声明的就是一个死信队列,arguments 参数指明了 ttl 的死信时间以及出现死信发送到 exchange 和 routing_key 。再把 order_info 作为 json 数据,最后 basic_publish 发送消息。这样就完成了死信队列的逻辑代码。
flask 实现秒杀系统的整合
这里通过用户输入地址访问抢购接口的逻辑是通过 flask 框架实现的,通过用户请求地址时传送的user_id 和 goods_id 构建 order_info 的订单信息,通过 Redis 的秒杀总数逻辑控制,RabbitMQ 的订单信息生产者和死信队列逻辑,如果执行成功,就会返回 True 标志,最终将订单抢购成功的 json 信息发送前端。这一过程中,如果发生错误,就将订单抢购失败的 json 信息返回前端。代码如下:
from flask import Flask,request
from tst7 import plus_counter,create_order
from tst9 import enter_order_queue
from tst9 import enter_overtime_queue
import uuid
import jsonify
import json
app=Flask(__name__)
@app.route("/purchase")
def purchase():
user_id=request.args.get("user_id")
goods_id=request.args.get("goods_id")
res={
"status":False,
"msg":""
}
flag=plus_counter(goods_id)
if flag:
order_id=uuid.uuid1()
order_info={
"goods_id":goods_id,
"user_id":user_id,
"order_id":str(order_id)
}
try:
create_order(order_info)
enter_order_queue(order_info)
enter_overtime_queue(order_info)
res["status"]=True
res["msg"]="抢购成功,请在15分钟内付款"
res["order_id"]=str(order_id)
res=json.dumps(res,ensure_ascii=False)
print(res)
print("成功")
return app.response_class(res,content_type="application/json")
except Exception as e:
print("log:",e)
res["status"]=False
res["msg"]="抢购出错,请重试"+str(e)
res=json.dumps(res,ensure_ascii=False)
print(res)
print("失败")
return app.response_class(res,content_type="application/json")
else:
res["status"]=False
res["msg"]="商品已售馨"
res=json.dumps(res,ensure_ascii=False)
print(res)
print("失败")
return app.response_class(res,content_type="application/json")
if __name__=="__main__":
app.run()
网友评论