并发下资源的访问控制

作者: 大蟒传奇 | 来源:发表于2016-11-04 17:02 被阅读569次

    背景

    在开发微信公众号的时候,会和access_token打交道,参照微信的文档

    access_token是公众号的全局唯一票据,公众号调用各接口时都需使用access_token。开发者需要进行妥善保存。access_token的存储至少要保留512个字符空间。access_token的有效期目前为2个小时,需定时刷新,重复获取将导致上次获取的access_token失效。

    按照文档上推荐的做法,需要一个用来获取和刷新access_token的中控服务器,其他业务逻辑服务器所使用的access_token均来自该中控服务器。

    出于安全的考虑,微信对于获取access_token的调用有一定的次数限制,超过这个限制,就无法再刷新token。

    问题

    在实际开发中,中控服务器的做法如下图

    请求access token流程

    假设这样一种情景,在redis中缓存的token刚好过期时,第三方向中控服务器同时发送了大量的请求。为了让问题简化,这里假设收到了A和B两条请求。

    中控服务收到请求A时,查询缓存,没有命中,于是调用微信api,重新获取token,然后写入缓存,实测这个过程大概需要0.1到0.2秒(这个值和所处的网络环境也有关系)。在请求A将token写入缓存前,请求B来了,查询redis,也没有命中,也会调用微信的api来重新获取token。

    实际上在业务中,只需要调用一次微信api来获取token即可。可是在上面的例子中,却调用了两次。如果并发量足够大,让中控服务反复去调用微信的api,很有可能就会超出微信的限制,一旦这种情况发生,对于业务的运营将是灾难性的。

    测试

    为了说明上面的问题,笔者编写了一个小的例子来模拟这种情况。
    服务端采用Django,客户端使用go语言来高并发调用服务端的接口。

    服务端

    服务端代码,这里只是列出关键代码,其他一些配置项之类的代码在这里略过不计。
    创建项目

    django-admin startproject accessTokenTest
    python manage.py startapp index
    

    编写返回token的api

    # view函数
    def index(request):
        cache.incr(settings.CounterKey)
        token = cache.get(settings.TokenKey)
        if token is None:
            token = create_access_token()
            cache.set(settings.TokenKey, token, 5 * 60)
        return HttpResponse(json.dumps({'token': token}), content_type='text/json')
            
    # 模拟调用微信api生成access token
    def create_access_token():
        time.sleep(0.3)
        cache.incr(settings.CreateKey)
        return str(uuid4())
    

    测试的例子采用redis作为缓存,通过sleep来模拟一个网络请求,并且将请求的次数和生成token的次数存在redis里,便于我们得到测试结果。

    使用gunicorn,启用4个进程来模拟服务端

    gunicorn accessTokenTest.wsgi --workers 4
    [2016-11-04 13:04:29 +0800] [12720] [INFO] Starting gunicorn 19.6.0
    [2016-11-04 13:04:29 +0800] [12720] [INFO] Listening at: http://127.0.0.1:8000 (12720)
    [2016-11-04 13:04:29 +0800] [12720] [INFO] Using worker: sync
    [2016-11-04 13:04:29 +0800] [12723] [INFO] Booting worker with pid: 12723
    [2016-11-04 13:04:29 +0800] [12724] [INFO] Booting worker with pid: 12724
    [2016-11-04 13:04:29 +0800] [12725] [INFO] Booting worker with pid: 12725
    [2016-11-04 13:04:29 +0800] [12726] [INFO] Booting worker with pid: 12726
    

    客户端通过GET请求http://127.0.0.1:8000 来请求token

    客户端

    客户端代码如下

    // filename accessToken.go
    
    package main
    
    import(
        "net/http"
        "encoding/json"
    )
    
    type AccessToken struct {
        Token string
    }
    
    func main(){
        channel := make(chan error)
        for ;;{
            token := new(AccessToken)
            go func(){
                channel <- getJson("http://127.0.0.1:8000", token)
            }()
    
        }
    }
    
    func getJson(url string, target interface{}) error {
        r, err := http.Get(url)
        if err != nil {
            return err
        }
        defer r.Body.Close()
        return json.NewDecoder(r.Body).Decode(target)
    }
    
    

    编译后生成可执行文件accessToken

    在测试开始前,启动redis服务,设置对应的key

    127.0.0.1:6379[1]> persist ":1:counter"
    (integer) 0
    127.0.0.1:6379[1]> persist ":1:create"
    (integer) 0
    

    启动客户端,进行测试,运行一段时间后,手动杀死

    ./tokenTest
    ^C
    

    查看测试数据,可以看到在测试的时间内,服务端一共收到了客户端1522次请求,4次生成了新的token。

    127.0.0.1:6379[1]> get ":1:create"
    "4"
    127.0.0.1:6379[1]> get ":1:counter"
    "1522"
    

    分析问题

    这个场景要求获取access token这个操作必须是原子的。
    可以进一步得抽象为在某段时间内对"access_token"这个资源只能有一个进程进行访问。

    解决方法

    说到原子操作,笔者第一反应就是信号量。下面我们将使用信号量来解决这个问题。
    采用posix_ipc模块,只修改服务端的代码

    为了确保每次运行项目,信号量的状态保持一致,修改index/apps.py这个文件,在启动时初始化信号量。

    服务端v2

    from posix_ipc import Semaphore, ExistentialError, O_CREAT
    
    class IndexConfig(AppConfig):
        name = 'index'
    
        def ready(self):
            try:
                sem = Semaphore(settings.TokenSemaphoreName)
                sem.unlink()
            except ExistentialError:
                pass
            finally:
                Semaphore(settings.TokenSemaphoreName, flags=O_CREAT, initial_value=1)
    

    修改视图函数,如下

    def index(request):
        cache.incr(settings.CounterKey)
        sem = Semaphore(settings.TokenSemaphoreName)
        sem.acquire()
        token = cache.get(settings.TokenKey)
        if token is None:
            token = create_access_token()
            cache.set(settings.TokenKey, token, 5 * 60)
        sem.release()
    
        return HttpResponse(json.dumps({'token': token}), content_type='text/json')
    

    测试2

    在测试前,清空之前的数据,并删除缓存的token

    127.0.0.1:6379[1]> del ":1:token"
    (integer) 1
    127.0.0.1:6379[1]> set ":1:counter" 0
    OK
    127.0.0.1:6379[1]> set ":1:create" 0
    OK
    

    和之前一样启动服务端和客户端,在运行一段时间后,退出客户端。
    查看结果,可以看到客户端请求了980次,服务端只生成了一次token,这个结果正是我们想要的。

    127.0.0.1:6379[1]> get ":1:counter"
    "980"
    127.0.0.1:6379[1]> del ":1:create"
    (integer) 1
    

    多主机场景

    看起来问题好像得到解决了?并没有!

    在实际的生产环境中,为了保持服务的高可用,经常会使用负载均衡这样的技术。

    负载均衡

    在这样的场景下使用上面的方案,每台服务器都会生成自己的信号量,在高并发的情况下依然会出现多次请求access token的情况。

    测试

    这里使用nginx来实现负载均衡,使用docker来模拟多主机。

    nginx的相关配置如下

    ...
    upstream back {
        server 127.0.0.1:8080;
        server 127.0.0.1:8087;
    }
    ...
    ...
    location / {
         proxy_pass http://back;
    }
    ...
    

    启动container

    CONTAINER ID        IMAGE                    COMMAND                  CREATED             STATUS                   PORTS                    NAMES
    c845d3123a08        python:2.7               "python2"                40 minutes ago      Up 10 minutes            0.0.0.0:8080->8000/tcp   python
    

    在container中启动线程

    gunicorn accessTokenTest.wsgi --workers 4 -b 0.0.0.0:8000
    

    同时在宿主机上也启动线程

    gunicorn accessTokenTest.wsgi --workers 4 -b 0.0.0.0:8087
    

    和之前一样,测试前清除缓存中的token,并将counter和create设置为0

    在宿主机上启动客户端,在token缓存失效前断掉
    查看结果,可以看到客户端一共请求了2062次,生成了2次token,和预期的一致。

    127.0.0.1:6379[1]> get ":1:create"
    "2"
    127.0.0.1:6379[1]> get ":1:counter"
    "2062"
    

    在多主机的情况下,如果要确保请求access token的原子性,需要一种“分布式锁”。

    新的解决方案

    采用redis来辅助实现分布式锁。尽管有着一定的争论,但是能满足现在的需求。

    实现的算法来自redis作者的文章,这里直接采用redlock-py

    服务端v3

    def index(request):
        cache.incr(settings.CounterKey)
        dlm = Redlock([{"host": "your-host-ip", "port": 6379, "db": 0}, ])
        my_lock = dlm.lock("my_resource_name", 1000)
        token = cache.get(settings.TokenKey)
        
        if token is None:
            token = create_access_token()
            cache.set(settings.TokenKey, token, 5 * 60)
        dlm.unlock(my_lock)
    
        return HttpResponse(json.dumps({'token': token}), content_type='text/json')
    

    测试3

    测试环境和之前一样。更新代码后,重启启动服务端,处理之前的redis缓存

    启动客户端一段时间后断掉。
    查看测试结果, 客户端一共请求了88次,生成了1次token,和预期也是一致的

    127.0.0.1:6379[1]> get ":1:counter"
    "88"
    127.0.0.1:6379[1]> get ":1:create"
    "1"
    

    相关文章

      网友评论

        本文标题:并发下资源的访问控制

        本文链接:https://www.haomeiwen.com/subject/hgtwuttx.html