美文网首页
GO在秒杀场景下的并发问题(库存超卖)

GO在秒杀场景下的并发问题(库存超卖)

作者: Bill_Li_GB | 来源:发表于2020-03-16 15:42 被阅读0次

先来一下小概念

CAP定理

任何一个分布式系统都无法同时满足一致性(Consistency)、可用性(Availability)和分区容错性(Partition tolerance),最多只能同时满足两项

《在多线程的软件世界里,对共享资源的争抢过程(Data Race)就是并发》

《而对共享资源数据进行访问保护的最直接办法就是引入锁!》

那么库存超卖是如何产生

比如下单时由于以下的情况导致了实际库存只有1000件商品,却出现了两笔999件商品的订单。1000件卖出了1998件商品,那么多出来的998件商品怎么办呢,没货可发了。此时多出来的就是超卖现象。所以看看以下两种情况。


超卖现象

以上一种情况库存出现负数,一个出现实际售出数量已经大于现有库存了。如果并发足够大,100个人都购买10份,并发发生。那么库存只有15的,就会卖出1000份。应该能说明问题了。

如何解决

  • 库存超卖问题是有很多种技术解决方案的,比如悲观锁,分布式锁,乐观锁,队列串行化,Redis原子操作,等等一下子想到了很多解决方案。

  • 高并发秒杀场景下的库存超卖解决方案有那么多,各种方案的优缺点以及实践如何呢?

一、分布式锁

分布式锁应该是怎么样的

基本原理:用一个状态值表示锁,对锁的占用和释放通过状态值来标识。

  • 可以保证在分布式部署的应用集群中,同一个方法在同一时间只能被一台机器上的一个线程执行。
  • 这把锁要是一把可重入锁(避免死锁)
  • 这把锁最好是一把阻塞锁(根据业务需求考虑要不要这条)
  • 有高可用的获取锁和释放锁功能
  • 获取锁和释放锁的性能要好

线程锁:主要用来给方法、代码块加锁。当某个方法或代码使用锁,在同一时刻仅有一个线程执行该方法或该代码段。线程锁只在同一JVM中有效果,因为线程锁的实现在根本上是依靠线程之间共享内存实现的,比如synchronized是共享对象头,显示锁Lock是共享某个变量(state)。
进程锁:为了控制同一操作系统中多个进程访问某个共享资源,因为进程具有独立性,各个进程无法访问其他进程的资源,因此无法通过synchronized等线程锁实现进程锁。
分布式锁:当多个进程不在同一个系统中,用分布式锁控制多个进程对资源的访问。

分布式锁设计考虑维度

单点问题:单点问题又称为单点故障,(HA/keepalived)主备模式\主从模式
失效时间:一旦解锁操作失败,就会导致锁记录一直在数据库中,其他线程无法再获得到锁
非阻塞的:没有获得锁的线程并不会进入排队队列,要想再次获得锁就要再次触发获得锁操作。
非重入的:同一个线程在没有释放锁之前无法再次获得该锁。(主机信息和线程信息)

分布式式锁的实现方式

  • 基于数据库实现分布式锁
    利用数据库的排它锁
    做了唯一性约束,这里如果有多个请求同时提交到数据库的话,
    数据库会保证只有一个操作可以成功,那么我们就可以认为操作成功的那个线程获得了该方法的锁

操作简单、容易理解。但是存在单点问题、数据库性能开销较大、不可重入。

问题

1、锁强依赖数据库的可用性,数据库是一个单点,一旦数据库挂掉,会导致业务系统不可用。
2、锁没有失效时间,一旦解锁操作失败,就会导致锁记录一直在数据库中,其他线程无法再获得到锁。
3、锁只能是非阻塞的,因为数据的insert操作,一旦插入失败就会直接报错。没有获得锁的线程并不会进入排队队列,要想再次获得锁就要再次触发获得锁操作。
4、这把锁是非重入的,同一个线程在没有释放锁之前无法再次获得该锁。因为数据中数据已经存在了。

  • 基于缓存实现分布式锁
    Redis的原子特性:
    Setnx:若给定的key已经存在,则SETNX不做任何动作。
    Lpush:如果key不存在,能插入一个或多个值。如果key存在则返回错误在key对应list的头部添加字符串元素。
    Lrem:根据参数COUNT的值,移除列表中与参数VALUE相等的元素。
    Llen:命令用于返回列表的长度。如果列表key不存在,则key被解释为一个空列表,返回0 。如果key不是列表类型,返回一个错误。

问题

1、锁没有失效时间,一旦解锁操作失败,就会导致锁记录一直在缓存中,其他线程无法再获得到锁。
2、锁只能是非阻塞的,无论成功还是失败都直接返回。
3、锁是非重入的,一个线程获得锁之后,在释放锁之前,无法再次获得该锁,因为使用到的key在缓存中已经存在。无法再执行put操作。

/**
 * 基于Redis实现分布式锁实现
 * 
 * 1、 采用Redis集群模式,避免单点故障问题。
 * 单例模式采用双重锁机制保障Reids集群实例线程安全,避免并发量大的时候消耗内存和CPU资源。
 * vehicle-common. JedisClusterFactory只支持Redis集群模式。
 * 2、 锁失效机制,尝试获取锁机制,死锁防止机制,避免锁操作失败影响其他线程。
 * 3、 线程获取锁排队机制,锁获取获取数限制机制,尝试获得锁机制,基于等待阻塞。
 * 4、 全局唯一事件码,标识同一个线程,解决锁可重入。
 * 获取唯一的事件编码规则:8位长度的随机字符串+YYYYMMDDHHSSmm+并发计数器。
 * 持有锁,且可重新获得锁。
 *
 */

    /**
     * 在时间范围内尝试获取锁,超时则放弃返回失败。
     * 
     * @param timeout
     * @param unit
     * @return
     */
    @Override
    public boolean tryLock(long timeout, TimeUnit unit) {
        boolean holdLock = true;
        //获取redis集群连接
        jedisCluster = coordinator.getJedisCluster();
        try {
            Long waitCount = jedisCluster.lpush(lockName, eventId);
            jedisCluster.expire(lockName, maxLiveSeconds);

            if (waitCount == 1) {
                return holdLock;
            }

            if (waitCount > maxWaitLimit) {
                holdLock = false;
                throw new LockException(LOCK_ERROR_CODE,
                        String.format("RedisDistributeLock Lock[%s] Too many wait", lockName));
            }

            if (timeout == 0) {
                holdLock = false;
                return holdLock;
            }

            holdLock = coordinator.await(lockName, eventId, unit.toMillis(timeout));

            if (!holdLock) {
                throw new LockException(LOCK_ERROR_CODE,
                        String.format("RedisDistributeLock Lock[%s] Timeout", lockName));
            }
            return holdLock;
        } finally {
            if (!holdLock) {
                jedisCluster.lrem(lockName, 1, eventId);
            }
        }
    }

    /**
     * 释放锁
     */
    @Override
    public void unlock() {
        if (Objects.isNull(jedisCluster)) {
            throw new LockException(LOCK_ERROR_CODE, "cann't unlock,because Lock not found ,redis conn is null");
        }
        try {
            Long lrem = jedisCluster.lrem(lockName, 1, eventId);
            LOGGER.debug("unlock lockName:{},eventId:{},lrem:{}", lockName, eventId, lrem);
            coordinator.notifyNext(jedisCluster, lockName);
        } finally {

        }
    }
  • 基于Zookeeper实现分布式锁


    image.png
/**
     * zookeeper节点的监视器
     */
    public void process(WatchedEvent event) {
        if (this.latch != null) {
            if (event.getType() == EventType.NodeDeleted) {
                this.latch.countDown();
            }
        }
    }

    public void lock() {
        try {
            if (this.tryLock()) {
                return;
            } else {
                waitForLock(waitNode, sessionTimeout);
            }
        } catch (KeeperException e) {
            throw new LockException(LOCK_ERR_EXCE, e);
        } catch (InterruptedException e) {
            throw new LockException(LOCK_ERR_EXCE, e);
        }
    }

    public boolean tryLock() {
        try {
            // 创建临时子节点
            myZnode = zk.create(ROOT_PATH + "/" + lockName + LOCK_KEY_SUFFIX, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
            // 取出所有子节点
            List<String> subNodes = zk.getChildren(ROOT_PATH, false);
            if (subNodes.size() == 1) {
                System.out.println("get lock");
                return true;
            }
            // 取出所有lockName的锁
            List<String> lockObjNodes = new ArrayList<String>();
            for (String node : subNodes) {
                if (node.split(LOCK_KEY_SUFFIX)[0].equals(lockName)) {
                    lockObjNodes.add(node);
                }
            }
            Collections.sort(lockObjNodes);
            // 如果是最小的节点,则表示取得锁
            if (myZnode.equals(ROOT_PATH + "/" + lockObjNodes.get(0))) {
                return true;
            }
            // 如果不是最小的节点,找到比自己小1的节点
            String subMyZnode = myZnode.substring(myZnode.lastIndexOf("/") + 1);
            waitNode = lockObjNodes.get(Collections.binarySearch(lockObjNodes, subMyZnode) - 1);
        } catch (KeeperException e) {
            throw new LockException(LOCK_ERR_EXCE, e);
        } catch (InterruptedException e) {
            throw new LockException(LOCK_ERR_EXCE, e);
        }
        return false;
    }

    public boolean tryLock(long time, TimeUnit unit) {
        try {
            if (this.tryLock()) {
                return true;
            }
            return waitForLock(waitNode, time);
        } catch (Exception e) {
            e.printStackTrace();
        }
        return false;
    }

    private boolean waitForLock(String waitNode, long waitTime) throws InterruptedException, KeeperException {
        Stat stat = zk.exists(ROOT_PATH + "/" + waitNode, true);
        // 判断比自己小一个数的节点是否存在,如果不存在则无需等待锁
        if (stat != null) {
            this.latch = new CountDownLatch(1);
            this.latch.await(waitTime, TimeUnit.MILLISECONDS);
            this.latch = null;
        }
        return true;
    }

    public void unlock() {
        try {
            zk.delete(myZnode, -1);
            myZnode = null;
            zk.close();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (KeeperException e) {
            e.printStackTrace();
        }
    }

通过有序临时节点实现锁机制,自己对应的节点需要最小,则被认为是获得了锁。优点:集群可以透明解决单点问题,避免锁不被释放问题,同时锁可以重入。缺点:性能不如缓存方式,吞吐量会随着zk集群规模变大而下降。

三种分布式实现方式对比

数据库分布式锁实现缺点:
1.db操作性能较差,并且有锁表的风险
2.非阻塞操作失败后,需要轮询,占用cpu资源;
3.长时间不commit或者长时间轮询,可能会占用较多连接资源

Redis(缓存)分布式锁实现缺点:
1.锁删除失败 过期时间不好控制
2.非阻塞,操作失败后,需要轮询,占用cpu资源;

ZK分布式锁实现缺点:
性能不如redis实现,主要原因是写操作(获取锁释放锁)都需要在Leader上执行,然后同步到follower。

  • 从理解的难易程度角度(从低到高)数据库 > 缓存 > Zookeeper从实现的复杂性角度 (从低到高)
    Zookeeper >= 缓存 > 数据库
  • 从性能角度(从高到低)缓存 > Zookeeper >= 数据库�从可靠性角度 (从高到低)
    Zookeeper > 缓存 > 数据库

《分布式锁这个东西保证了数据的准确性,但是他天然并发能力有点弱,在高并发场景下如何优化分布式锁的并发性能》

二、分布式锁的优化,或者解决超卖的其他方案

分析分布式锁在高并发场景下的问题

  • 分布式锁一旦加锁之后,对所有客户端都必须对同一个商品的库存锁进行加锁。这样会导致该商品库存被强制串行化执行,一个接一个的处理。
  • 看看处理性能:假如加锁之后,释放锁之前,流程是查库存->创建订单->扣减库存,这个单业务处理能力应该20毫秒左右。
  • 那么1秒1000毫秒,只能容纳50个这样的商品请求串行处理。那相当于1秒钟只能秒杀50个请求,且是一个一个来。这样性能瓶颈是很大的。影响业务体验和服务器性能。

从上面的流程可以知道通过分布式锁解决超卖问题是存在很大缺陷的,同一个商品多用户同时下单的时候,会基于分布式锁串行化处理,导致没法同时处理同一个商品的大量下单请求。

所以分布式锁适合普通电商,低并发、秒杀这种场景就显得不太适合了。

如何对分布式锁进行高并发优化

锁数据分段方式:思路是把数据分成若干段,每个段是一个单独的锁,所以多个线程并发修改数据的时候,可以并发的修改不同段的数据。
如果分成20段,那么1秒就可以处理 20 * 50 = 1000个请求了。

  • 数据分段的不足和留意点
    1、如果一个分段库存不足,要立即释放,再次尝试获得其他分段的锁
    2、实现起来非常的不方便,有点复杂。
    3、需要自己实现优化逻辑,有点工作量,非常麻烦。

三、队列串行化

  • 将存库从MySQL前移到Redis中,所有的写操作放到内存中,由于Redis中不存在锁故不会出现互相等待,并且由于Redis的写性能和读性能都远高于MySQL,这就解决了高并发下的性能问题。然后通过队列等异步手段,将变化的数据异步写入到DB中。

  • 引入队列,然后将所有写DB操作在单队列中排队,完全串行处理。当达到库存阀值的时候就不在消费队列,并关闭购买功能。这就解决了超卖问题。
    优点:解决超卖问题,略微提升性能。
    缺点:性能受限于队列处理机处理性能和DB的写入性能中最短的那个,另外多商品同时抢购的时候需要准备多条队列。

  • 将提交操作变成两段式,先申请后确认。然后利用Redis的原子自增操作(相比较MySQL的自增来说没有空洞),同时利用Redis的事务特性来发号,保证拿到小于等于库存阀值的号的人都可以成功提交订单。然后数据异步更新到DB中。
    优点:解决超卖问题,库存读写都在内存中,故同时解决性能问题。
    缺点:由于异步写入DB,可能存在数据不一致。另可能存在少买,也就是如果拿到号的人不真正下订单,可能库存减为0,但是订单数并没有达到库存阀值。

以上是一些优化高并发场景下解决超卖的一些思路。

四、其他参考

《如何解决秒杀的性能问题和超卖的讨论》
以下链接很有参考价值,可以仔细看看
https://www.jianshu.com/p/ed4f75ef2213

相关文章

网友评论

      本文标题:GO在秒杀场景下的并发问题(库存超卖)

      本文链接:https://www.haomeiwen.com/subject/zccmshtx.html