美文网首页面试精选redis
使用OpenResty+Redis+RocketMQ构建秒杀系统

使用OpenResty+Redis+RocketMQ构建秒杀系统

作者: 肥兔子爱豆畜子 | 来源:发表于2021-10-23 22:22 被阅读0次

看了https://mp.weixin.qq.com/s/8DFA36YvDdRSaM4JwNBWpQ之后,也仿写了一个秒杀系统,称为v1版。最近对openresty和lua有了一些技术储备、就开发了一套新的v2版秒杀。

seckill-v2秒杀系统

一、功能介绍

秒杀系统v2,主要提供3个接口方法:

  1. /seckill/rest/OutletAndStock GET

    查询接口,返回当前开放的预约网点以及库存信息。

  2. /seckill/rest/appointment POST

    下单预约接口,接收姓名、身份证、手机号、预约网点等信息,执行预约逻辑。

  3. /seckill/rest/refreshRation POST

    更新了数据库里的配给和库存之后,同步到redis里。给管理端调用的。

二、软件架构设计

seckill-v2.png

1、负责均衡层

openresty + lua 来做负载均衡层,可以集群部署,上面用F5或lvs来做接入。

这一层主要是解决限流(nginx限流),防刷逻辑(比如同一个ip+token每x秒只允许一次请求),还一个是做库存感知、没货以后马上阻挡(比如返回一个静态页面)接下来的无效请求到后端核心服务。

(1)库存感知timer 每1s查询redis里网点的剩余库存,更新到openresty本地share-dict,share-dict里边存放剩余库存给前端展示用,另外下单预约请求先查一下share-dict、如果库存没了则直接返回前端“活动已结束”。

(2)网点和库存查询接口 返回本地share-dict里的剩余库存给前端。

(3)预约购买次数限制校验 每人每5天只能预约购买一次。redis里边维护一份预约购买列表,由timer加载到share_dict。(核心服务层还会做一次这个校验)

(4)限流模块
虽然前面的库存感知和预约次数限制校验可以在这里限制大部分无效请求进入,但是考虑到极端情况,比如1秒内有10万这种级别的流量涌入、库存感知和次数校验还来不及与redis同步一致,所以这些请求流量会穿过openresty到达后面的核心服务层,对于核心服务层的java应用来说这个级别的请求处理起来是比较吃力的,所以我们需要在负载均衡层这里做一下限流。这里采用nginx自带的限流功能:
nginx.conf http:

limit_req_zone $binary_remote_addr zone=perip_rps:10m rate=5r/s; #单ip每秒限制5个请求
limit_req_zone $server_name zone=perserver_rps:10m rate=3000r/s; #每个server每秒限制处理3000个请求

server:

limit_req zone=perserver_rps burst=2000 nodelay;  #server每秒请求限流

location:

limit_req zone=perip_rps burst=10 nodelay; #每个ip每秒请求如果超过limit_req_zone的配置,最多可以缓冲10个

这里我们利用limit_req做了两个维度的限流,首先是单个ip限制每秒5个请求、突发最多允许10个,这里配置了nodelay意思是正常情况下应该是200ms漏桶通过一个请求,但如果一下来了10个请求的话也是可以给通过,只不过后续要过2s才可以通过下一个请求。
然后是整个server限制3000的rps,允许突发2000。

2、核心服务层

由springboot + redis组成,redis里边存订单和库存信息。

(1)下单预约接口,由redis lua script保证扣库存操作的原子性,然后将订单生成请求提交给mq。

除了库存判断之外,这里还要做一次预约购买次数限制的校验,原因在于nginx上面是定时(比如500ms一次)去redis读取blacklist的。在扣库存之前一定要校验一下预约次数的规则。

再一个可选的查询商品信息和库存的接口、之所以可选是因为如果是特异化的秒杀系统,完全可以把商品信息静态化到cdn上的页面上去。另外库存不显示也没太大问题,库存没了会由负载均衡层动态判定并拦截掉、或者活动结束以后直接挂一个活动结束的页面上去。

3、异步服务层

核心服务层我们尽量做到功能单一化,把可以异步处理的逻辑用RocketMQ从核心服务中剥离出来,只保留必要的逻辑供负载均衡层过来的流量进行同步调用。RocketMQ这里起到的便是削峰缓冲的作用了,提高整体的吞吐能力。这样异步逻辑由于不直接承载C端的流量,并且异步服务作为末端业务逻辑相比最前端的负载均衡层流量要下降几个数量级(想象10万人抢100个商品,最后走到异步服务层去生成订单落库),可以作为mq的消费端以较少的算力资源进行部署。这些异步逻辑可能包括订单写入数据库等等。

(1)异步订单入库: 从RocketMQ中拿订单,然后写入MySQL。消费和入库都使用批量处理,以提高效率。

(2)每天去redis更新维护一下购买记录,做每日购买次数限制用。

三、数据结构

redis数据结构

某网点库存
key:outlet:{id}:date:{date}:stock
value:String类型,存放网点的库存
例如,key: outlet:1:date:2021-10-01:stock , value: 3000

某日内已预约的身份证名单
key:appointment:idNos:{date}
value:Set类型 idNo1 ... idNon

mysql数据结构

网点表

CREATE TABLE `t_seckill_outlet` (
  `outlet_id` bigint(20) NOT NULL AUTO_INCREMENT,
  `address` varchar(150) DEFAULT NULL COMMENT '网点地址',
  `outlet_name` varchar(50) DEFAULT NULL COMMENT '网点名称',
  PRIMARY KEY (`outlet_id`)
) 

预约记录明细表

CREATE TABLE `t_seckill_appointment` (
  `appointment_id` bigint(20) NOT NULL AUTO_INCREMENT,
  `date` varchar(10) DEFAULT NULL COMMENT '日期',
  `id_card` varchar(18) DEFAULT NULL COMMENT '身份证号',
  `mobile` varchar(11) DEFAULT NULL COMMENT '手机号',
  `name` varchar(15) DEFAULT NULL COMMENT '姓名',
  `outlet_id` mediumtext COMMENT '网点ID',
  PRIMARY KEY (`appointment_id`)
) 

四、核心代码

openresty上的lua代码和nginx配置文件:
定时timer我们放在nginx.conf的http位置
init_worker_by_lua_file lua/wangan/seckill/task_timer.lua;
task_timer.lua代码如下:

--[[
    定时从redis加载网点和库存到本地内存
]]
local redis = require "wangan.common.redis_iresty"
local cache = require "wangan.common.share_cache"

local red = redis:new({
    ip = "122.51.114.183",
    port = 6379,
    password = "7474@628",
    timeout = 2000,
    db_index = 0,
    max_idle_ms = 60000,
    pool_size = 32
})


local delay = 1  --每delay秒跟redis同步一次数据

local handlerRepeat

handlerRepeat = function ( ... )
    --ngx.log(ngx.INFO, "从redis加载网点和库存到本地内存...")
    local len = red:llen("outlets:ids")

    local outlets = red:lrange("outlets:ids", 0, len)

    local today = ngx.today()

    for _, v in pairs(outlets) do --遍历outlet id
        local stock_key = "outlet:" .. v .. ":date:" ..today .. ":stock"

        local stock = red:get(stock_key)    --从redis查到当日这个网点的库存

        if stock then
            --ngx.log(ngx.INFO, stock_key)
            --ngx.log(ngx.INFO, stock)
            local cache_v = cache.get_from_cache(stock_key)

            local ok, err = cache.set_to_cache(stock_key, stock, 30)  --缓存在本地内存shared_dict

            if not ok then
                ngx.log(ngx.ERR , "写入本地缓存失败:", err)
            end

            --[[
            if cache_v then
                ngx.log(ngx.INFO, "shared_dict中" .. stock_key .. "的库存是" .. cache_v)
            end

            if not cache_v then
                ngx.log(ngx.INFO, cache_v)
            end
            ]]
        end

    end
end

local ok, err = ngx.timer.every(delay, handlerRepeat)
if not ok then
    ngx.log(ngx.ERR, "创建timer.every(delay, handlerRepeat)失败:", err)
    return
end

如上面这样openresty的本地内存里边就有每个网点的剩余库存了,且1秒跟redis同步一次,数据比较实时。当库存没了可以直接从本地内存查到并返回客户端,不用再去redis或者去后边的核心服务去查了。很大程度上提高了性能。

所以接下来就是在请求的access阶段去做这个库存校验:

--[[
    预约校验
]]

local cache = require "wangan.common.share_cache"

local json = require "cjson"

--先读request body
ngx.req.read_body()
--从request body里获取参数
--local args = ngx.req.get_post_args()


--获取request body data
local request_body_data = ngx.req.get_body_data()

if not request_body_data then
    ngx.say("request body is nil")
    return
end

ngx.log(ngx.INFO, "request body string", request_body_data)
--将request body data解析为json
local request_body_json = json.decode(request_body_data)

--ngx.log(ngx.INFO, "request body json", request_body_json)


--请求参数校验
local outletId = request_body_json.outletId

if not outletId then
    ngx.say("网点id不可为空")
    return
end


--库存剩余校验
local stock_key = "outlet:" .. outletId .. ":date:" .. ngx.today() .. ":stock"
ngx.log(ngx.INFO, "stock_key ", stock_key)
local stock = cache.get_from_cache(stock_key);

if not stock then
    ngx.say("未查到库存,该网点尚未开始预约")
    return
end

ngx.log(ngx.INFO, stock_key .. " , 当前库存: ", stock)

if tonumber(stock)<=0 then
    ngx.say("库存已空,已预约完毕,感谢参与");
    return
end

关于openresty的执行阶段,可以进一步参考:https://blog.51cto.com/lisea/2425794

然后是nginx.conf配置:

lua_shared_dict my_cache 128m;
    
init_worker_by_lua_file lua/wangan/seckill/task_timer.lua;

upstream seckillcore {
    server 127.0.0.1:8080;
}
server {
        listen       80;
        server_name  localhost;

        #开发调试模式、关闭lua代码缓存,生产环境请勿关闭
        lua_code_cache off;

        #charset koi8-r;

        #access_log  logs/host.access.log  main;

        location / {
            root   html;
            index  index.html index.htm;
        }

        location /seckill/rest/appointment {
            default_type text/html;
            access_by_lua_file lua/wangan/seckill/appointment_check.lua;
            proxy_pass http://seckillcore;
            proxy_redirect default;
        }
}

核心服务里的java代码:

/**
 * 核心业务逻辑
 * */
@Slf4j
@Service
public class SeckillService {
    
    @Autowired
    private RedisDao redisDao;
    
    @Autowired
    private DefaultRedisScript<List> deductMyStock;
    
    @Autowired
    private GeneralMqProducer generalMqProducer;

    /**
     * 预约业务逻辑
     * 
     * */
    public String appointment(@RequestBody AppointmentDto dto) {

        AppointmentDetail appointDetail = new AppointmentDetail();
        appointDetail.setName(dto.getName());
        appointDetail.setDate(LocalDate.now().toString());
        appointDetail.setIdCard(dto.getIdCard());
        appointDetail.setMobile(dto.getMobile());
        appointDetail.setOutletId(dto.getOutletId());
        log.info(JSON.toJSONString(appointDetail));
        
        //使用redis script扣库存, 如成功则添加此身份证号到redis里的已预约列表
        List<String> keys = new ArrayList<>();
        keys.add("outlet:"+dto.getOutletId()+":date:"+LocalDate.now().toString()+":stock");
        
        Map<String, Object> args = new HashMap<>();
        args.put("buyNum", 5);
        args.put("idCard", dto.getIdCard());
        args.put("dates", dateList5());
        
        List result = redisDao.executeScript(deductMyStock, keys, args);
        Long errCode = (Long)result.get(0);
        String errMsg = (String)result.get(1);
        
        //预约,扣库存。
        if (errCode.longValue()==0) {

            //用rocketmq异步写预约记录
            sendAppointmentToMq(appointDetail);

            return "预约成功";
            
        } else {
            if(errCode.longValue()==1) {
                return "您" + errMsg + "预约过";
            }else if(errCode.longValue()==2) {
                return "网点尚未开放预约,请耐心等待";
            }else if(errCode.longValue()==3) {
                return "库存不足";
            }
            return "预约失败";
        }
    }
    
    /**
     * 异步写预约记录
     * */
    private void sendAppointmentToMq(AppointmentDetail appointDetail) {
        String appointJson = JSON.toJSONString(appointDetail);
        EventMessage eventMsg = new EventMessage();
        eventMsg.setTopic("order");
        eventMsg.setTag("newOrder");
        eventMsg.setMsgBody(appointJson);
        
        generalMqProducer.asyncPublish(eventMsg, new SendCallback() {

            @Override
            public void onSuccess(SendResult sendResult) {
                log.info("预约订单入库消息写入rocketmq成功,消息ID:{}", sendResult.getMsgId());
                
            }

            @Override
            public void onException(Throwable e) {
                //如果与mq通信故障了,那么可以从日志文件里找到预约记录,手工执行写入mysql
                log.error("预约订单写入rocketmq失败:{}, exception detail:{}" , appointJson , e.getMessage());
            }
            
        });
    }
    
    /**
     * 返回从今天还是算往前5天的日期列表
     * */
    private List<String> dateList5(){
        List<String> dates = new ArrayList<>();
        LocalDate today = LocalDate.now();
        for(int i=0; i<5; i++) {
            dates.add(today.minusDays(i).toString());
        }
        return dates;
    }
}

扣库存、进行5日内已预约校验的redis lua脚本:

--[[
    扣减redis库存lua script
    KEYS[1] 库存key名称,例如outlet:1:date:2021-10-01:stock
    ARGV[1] 参数,json字符串
    buyNum表示一次扣多少库存
    idCard表示预约人身份证号
    dates:从当日开始往前倒排5天的日期的一个列表{"2021-10-06","2021-10-05","2021-10-04","2021-10-03","2021-10-02"}
    
    返回 {int, string} 
    0成功, 1已经5天内预约过, 2网点尚未开放, 3库存不足 
]]

local stock_key = KEYS[1]
local args = ARGV[1]

redis.log(redis.LOG_NOTICE, stock_key)
redis.log(redis.LOG_NOTICE, args)

local args_json =  cjson.decode(args)
local buy_num = args_json.buyNum
local id_card = args_json.idCard
local dates = args_json.dates

--查询该身份证是否已预约过, 5日内
for _,v in pairs(dates) do
    local is= redis.call("sismember", "appointment:idNos:" .. v, id_card)
    if is==1 then
        return {1, v} --返回在哪天预约过
    end
end

--扣库存
local current_stock = redis.call("get", stock_key)
--redis.log(redis.LOG_NOTICE, type(current_stock))

if not current_stock then
    return {2, "该网点尚未正式开放预约"}
end

if tonumber(current_stock) >= buy_num then
        redis.call("set", stock_key, tonumber(current_stock) - buy_num) --库存减去buy_num
        redis.call("sadd", "appointment:idNos:" .. dates[1], id_card) --把身份证号写入当日预约记录
        return {0, "ok"}
end

return {3, "库存不足"} --库存不足

异步服务批量处理一次从RocketMQ轮询到的订单、批量入库:

//注册consumer,并使其订阅相应的topic、tag
private void registConsumer(MQMsgHandler msgHandler, String consumerGroup, String topic, String tag) {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup, getAclRPCHook(), new AllocateMessageQueueAveragely());
    try {
        consumer.setNamesrvAddr(mqurl);
        consumer.setConsumeThreadMin(consumeThreadCorePoolSize);
        consumer.setConsumeThreadMax(consumeThreadCorePoolSize);
        consumer.setPullBatchSize(32); //一次长轮询最多从mq里拿多少个消息,默认32
        consumer.subscribe(topic, tag);
        consumer.registerMessageListener(new MessageListenerConcurrently() {

            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                ConsumeConcurrentlyContext context) {
                List<EventMessage> eventMsgs = new ArrayList<>();
                String msgContent = null;
                try {
                    for(MessageExt msg : msgs) {
                        msgContent = new String(msg.getBody(),"utf-8");
                        EventMessage eventMsg = JSON.parseObject(msgContent, EventMessage.class);
                        log.debug(JSON.toJSONString(eventMsg));
                        eventMsgs.add(eventMsg);
                    }
                    
                    msgHandler.handleMsg(eventMsgs);    //批量处理本次拉取的消息,执行业务逻辑
                } catch (UnsupportedEncodingException e) {
                    e.printStackTrace();
                    log.error("消息编码错误:" + e.getMessage(), e);
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }
                
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        consumer.start();
    }catch(Exception e) {
        log.error("注册消费者出错" + e.getMessage(), e);
        
    }
}

批量入库:

@Slf4j
@Component
@MsgConsumer(consumerGroup = "newOrder-consumer-group", tag = "newOrder", topic = "order")
public class NewOrderMsgHandler implements MQMsgHandler{
    
    @Autowired
    private AppointmentDetailRepository appointmentDetailRepository;

    @Override
    public void handleMsg(List<EventMessage> eventMessages) {
        
        log.debug("收到mq消息: {}", JSON.toJSONString(eventMessages));
        
        List<AppointmentDetail> appointmentDetails = new ArrayList<>();
        for(EventMessage eventMsg : eventMessages) {
            AppointmentDetail appointmentDetail = JSON.parseObject(eventMsg.getMsgBody(), AppointmentDetail.class);
            appointmentDetails.add(appointmentDetail);
        }

        appointmentDetailRepository.saveAll(appointmentDetails); //批量入库
    }

}

五、改进与优势

相比V1版,相当于把原来本地java内存里的操作搬到redis上,然后一些个接口服务提前:由openresty调用redis,把一些业务逻辑直接在负载均衡层做掉。

V2版的优势还在于可以横向扩展算力来增加整体系统的性能。其实如果v1版单机承受范围内的话,直接读写本地内存不见得比v2版性能差、可能还略好。但是请求量再大的话,v1就没办法了单机算力配置是有限的,而v2的优势就发挥出来了、因为可以继续扩容算力,也就是说是可以横向扩展的架构。

相关文章

网友评论

    本文标题:使用OpenResty+Redis+RocketMQ构建秒杀系统

    本文链接:https://www.haomeiwen.com/subject/xgulaltx.html