美文网首页技术进阶
MMKV的原理与实现(三)

MMKV的原理与实现(三)

作者: PanGeng | 来源:发表于2020-01-10 17:26 被阅读0次

    MMKV的原理与实现(三)

    MMKV多线程设计

    1. C/C++中的线程:POSIX线程

    POSIX,全称为可移植性操作系统接口。它包括了系统应用程序接口(简称API)。该标准的目的是定义了标准的基于UNIX操作系统的系统接口和环境来支持源代码级的可移植性,致力于提供基于不同语言的规范。POSIX的线程标准,定义了创建和操纵线程的一套API。

    2. 基本使用

    #include <iostream>
    #include <queue>
    #include <pthread.h>
    void *run(void* args) {
        //异步方法
        int i = *(int*)i; // 10
        cout << "i === " << i << endl;
        return nullptr;
    }
    int main() {
        int i = 10;
        pthread_t pid;
        pthread_create(&pid, nullptr, run, &i);
        pthread_join(pid, nullptr);
        return 0;
    }
    

    上面运行会将10打印出来,pthread_create中有4个参数:

    1. pid, 线程id,传入pthread_t类型的地址引用
    2. attr, 这个是线程的类型,后面会详细讲到
    3. 相当于回调函数
    4. 将值传入到回调函数中

    3.线程同步

    同java一样,多线程同时读写同一份共享资源的时候,可能会引起冲突。需要引入线程“同步”机制,即各线程之间有序地对共享资源进行操作。

    拿以下举例:

    queue<int> q;
    void *pop(void *args) {
    
        if (!q.empty()) {
            printf("取出数据:%d\n", q.front());
            q.pop();
        } else {
            printf("无数据\n");
        }
    
        return 0;
    }
    
    int main() {
        for (size_t i = 0; i < 5; i++) {
            q.push(i);
        }
        pthread_t pid[10];
        for (size_t i = 0; i < 10; i++) {
            pthread_create(&pid[i], 0, pop, &q);
        }
        return 0;
    }
    
    在这里插入图片描述

    我们创建了5个队列,然后创建10个线程分别从队列里面取值,可以看到前3个线程都取了0,这就是线程不同步导致的问题,那么怎么解决呢?

    互斥量

    首先要了解一下互斥量(pthread_mutex_t)的概念:互斥量就是一把锁。 当一个线程要访问一个共享变量时,先用锁把变量锁住,操作完了之后再释放掉锁。
    当另一个线程也要访问这个变量时,发现这个变量被锁住了,此时无法访问,一直等待直到锁没了,才能够上锁与使用。
    使用互斥量前要先初始化,使用的函数如下:

    queue<int> q;
    
    pthread_mutex_t mutex; //加入互斥量:锁
    void *pop(void *args) {
        // 锁
        pthread_mutex_lock(&mutex);
        if (!q.empty()) {
            printf("取出数据:%d\n", q.front());
            q.pop();
        } else {
            printf("无数据\n");
        }
        // 放
        pthread_mutex_unlock(&mutex);
        return 0;
    }
    
    int main() {
        //初始化互斥锁
        pthread_mutex_init(&mutex, 0);
        for (size_t i = 0; i < 5; i++) {
            q.push(i);
        }
        pthread_t pid[10];
        for (size_t i = 0; i < 10; i++) {
            pthread_create(&pid[i], 0, pop, &q);
        }
        //需要释放
        pthread_mutex_destroy(&mutex);
        return 0;
    }
    

    在以上的例子中,我们加入互斥量,这样打印出来就正常了。但是pthread_mutex_t锁是默认是非递归的,即不可重入锁。如果一个线程多次获取同一个非递归锁,则会产生死锁:

    (以下代码在win编辑器中可能不会报错,但是程序并不会正常结束)

    queue<int> q;
    
    pthread_mutex_t mutex; //加入互斥量:
    
    void test(){
        pthread_mutex_lock(&mutex); //线程阻塞,死锁
        printf("队列大小:%d\n", q.size());
        pthread_mutex_unlock(&mutex);
    }
    
    void *pop(void *args) {
        // 锁
        pthread_mutex_lock(&mutex);
        if (!q.empty()) {
            printf("取出数据:%d\n", q.front());
            q.pop();
        } else {
            printf("无数据\n");
        }
        // test中也锁了一下,这样就产生了死锁
        test();
        // 放
        pthread_mutex_unlock(&mutex);
        return 0;
    }
    
    int main() {
        //初始化互斥锁
        pthread_mutex_init(&mutex, 0);
        for (size_t i = 0; i < 5; i++) {
            q.push(i);
        }
        pthread_t pid[10];
        for (size_t i = 0; i < 10; i++) {
            pthread_create(&pid[i], 0, pop, &q);
        }
        //需要释放
        pthread_mutex_destroy(&mutex);
        return 0;
    }
    

    pthread_mutex_init(&mutex, 0);

    互斥量初始化有两个参数,如果要让互斥量成为一把可重入锁,需要在初始化的时候设置attr的类型:

    // 锁的属性 : pthread_mutex_t锁默认是非递归的
    pthread_mutexattr_t attr;
    pthread_mutexattr_init(&attr);
    // 设置为递归锁
    pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE);
    // 初始化mutex
    pthread_mutex_init(&mutex, &attr);
    // 完成初始化后即可释放
    pthread_mutexattr_destroy(&attr);
    

    这样初始化test中的打印也就正常了。

    4. 条件变量(干货,不感兴趣可跳过)

    条件变量是线程间进行同步的一种机制,主要包括两个动作:一个线程等待"条件变量的条件成立"而挂起;另一个线程使"条件成立",从而唤醒挂起线程

    举个例子:

    template <class T>
    class SafeQueue {
        public:
            SafeQueue() {
                pthread_mutex_init(&mutex,0);
            }
            ~SafeQueue() {
                pthread_mutex_destory(&mutex);
            }
            void enqueue(T t) {
                pthread_mutex_lock(&mutex);
                q.push(t);
                pthread_mutex_unlock(&mutex);
            }
            int dequeue(T& t) {
                pthread_mutex_lock(&mutex);
                if (!q.empty())
                {
                    t = q.front();
                    q.pop();
                    pthread_mutex_unlock(&mutex);
                    return 1;
                }
                pthread_mutex_unlock(&mutex);
                return 0;
            }
        private:
            queue<T> q;
            pthread_mutex_t mutex;
    };
    

    上面的模板类存放数据T,并使用互斥锁保证对queue的操作是线程安全的。这就是一个生产/消费模式。如果需要在取出数据的时候,queue为空,则一直等待,直到下一次enqueue加入数据。此时可以加入条件变量使 “dequeue” 挂起,直到由其他地方唤醒:

    #include <queue>
    using namespace std;
    template <class T>
    class SafeQueue {
        queue<T> q;
        pthread_mutex_t mutex;
        pthread_cond_t cond; //条件变量
        public:
            SafeQueue() {
                pthread_mutex_init(&mutex,0);
                pthread_cond_init(&cond, 0); //初始化
            }
            ~SafeQueue() {
                pthread_mutex_destory(&mutex);
                pthread_cond_destory(&cond); //销毁
            }
            void enqueue(T t) {
                pthread_mutex_lock(&mutex);
                q.push(t);
                //发出信号 通知挂起线程
                //1、由系统唤醒一个线程(随机)
                //pthread_cond_signal(&cond);
                //2、广播 唤醒所有等待条件线程
                pthread_cond_broadcast(&cond);
                pthread_mutex_unlock(&mutex);
            }
            int dequeue(T& t) {
                pthread_mutex_lock(&mutex);
                //可能因为某些特殊条件虚假唤醒 所以while循环等待唤醒。(与Java的wait一样)
                while (q.empty())
                {
                    pthread_cond_wait(&cond, &mutex); //等待并自动释放互斥锁
                }
                t = q.front();
                q.pop();
                pthread_mutex_unlock(&mutex);
                return 1;
            }
    };
    

    比如存在三个线程,分别为:生产者P、消费者C1与C2。

    1、C1从队列中取出数据,此时队列为空;
    2、C2也想从队列中获取一个元素,但此时队列为空,C2进入阻塞(cond.wait()),等待队列非空;
    3、 P将一个元素入队,并唤醒条件变量;
    4、C1与C2接收到唤醒信号,解除阻塞状态,上锁并获取队列中的元素;
    5、C2优先获取到锁,移除队列元素并释放锁;
    6、C1此时操作的队列为空,被虚假唤醒。

    5. 锁的自动管理

    每次创建一个线程时,都需要初始化(init)和销毁(destroy), 使用时也需要锁(lock)和解锁(unlock)。为了防止忘记手动销毁和解锁,同时方便使用,MMKV利用了C++中对象的构造函数和析构函数进行了封装,实现锁的自动管理。

    这里我们看一下源码: ThreadLock.cpp

    #include "ThreadLock.h"
    #include "MMKVLog.h"
    
    ThreadLock::ThreadLock() {
        // 创建一把递归锁
        pthread_mutexattr_t attr;
        pthread_mutexattr_init(&attr);
        pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE);
    
        pthread_mutex_init(&m_lock, &attr);
    
        pthread_mutexattr_destroy(&attr);
    }
    
    ThreadLock::~ThreadLock() {
        pthread_mutex_destroy(&m_lock);
    }
    
    void ThreadLock::lock() {
        auto ret = pthread_mutex_lock(&m_lock);
        if (ret != 0) {
            // 失败
            MMKVError("fail to lock %p, ret=%d, errno=%s", &m_lock, ret, strerror(errno));
        }
    }
    
    bool ThreadLock::try_lock() {
        auto ret = pthread_mutex_trylock(&m_lock);
        if (ret != 0) {
            MMKVError("fail to try lock %p, ret=%d, errno=%s", &m_lock, ret, strerror(errno));
        }
        return (ret == 0);
    }
    
    void ThreadLock::unlock() {
        auto ret = pthread_mutex_unlock(&m_lock);
        if (ret != 0) {
            MMKVError("fail to unlock %p, ret=%d, errno=%s", &m_lock, ret, strerror(errno));
        }
    }
    

    这个类只是对锁进行了封装,使用的时候还需要手动调用lock()和unlock()方法。我们是不是可以把这一步也省略掉呢?当然可以,我们来对这把锁进行二次封装:

    ScopedLock.cpp源码

    #ifndef MMKV_SCOPEDLOCK_HPP
    #define MMKV_SCOPEDLOCK_HPP
    
    #include "MMKVLog.h"
    
    template <typename T>
    class ScopedLock {
        T *m_lock;
    
        // just forbid it for possibly misuse
        ScopedLock(const ScopedLock<T> &other) = delete;
    
        ScopedLock &operator=(const ScopedLock<T> &other) = delete;
    
    public:
        // 构造方法中上锁
        ScopedLock(T *oLock) : m_lock(oLock) {
            assert(m_lock);
            lock();
        }
        // 析构方法解除锁
        ~ScopedLock() {
            unlock();
            m_lock = nullptr;
        }
    
        void lock() {
            if (m_lock) {
                m_lock->lock();
            }
        }
    
        bool try_lock() {
            if (m_lock) {
                return m_lock->try_lock();
            }
            return false;
        }
    
        void unlock() {
            if (m_lock) {
                m_lock->unlock();
            }
        }
    };
    
    //宏函数 __COUNTER__:初始值为0,编译单元内每出现一次出现该宏,便会加1。
    #define SCOPEDLOCK(lock) _SCOPEDLOCK(lock, __COUNTER__)
    #define _SCOPEDLOCK(lock, counter) __SCOPEDLOCK(lock, counter)
    #define __SCOPEDLOCK(lock, counter) ScopedLock<decltype(lock)> __scopedLock##counter(&lock)
    
    //#include <type_traits>
    //#define __SCOPEDLOCK(lock, counter) 
    //decltype:推断变量类型;__scopedLock##counter(&lock):##为连接符ScopedLock<std::remove_pointer<decltype(lock)>::type> __scopedLock##counter(lock)
    
    #endif //MMKV_SCOPEDLOCK_HPP
    

    可以看到,这个类在构造方法中上锁,析构方法中解锁,使用的时候只需要初始化这个类就可以了,函数调用完成之后,会自动调用这个类的析构函数,这样就完成了自动上锁 和解锁的操作。

    为了更加方便,下面定义了一系列的宏函数,最终会调用到ScopedLock<decltype(lock)> __scopedLock##counter(&lock)这个方法。具体可以看下注释。

    使用的时候直接调用SCOPEDLOCK()宏函数即可

    //使用
    ThreadLock lock;
    void test(){
        //创建 ScopedLock<ThreadLock> __scopedLock1对象,使用lock上锁
        SCOPEDLOCK(lock);
        //退出方法 执行__scopedLock1析构,解锁lock
    }
    

    这样自动推导lock的类型并自动lock和unlock。我们在MMKV的源码中也可以找到大量这样的语句:

    在这里插入图片描述

    不得不说C++还真是博大精深呐! 这一篇多线程的处理方案就到此为止了。那么,MMKV的跨进程是如何实现的?

    MMKV多进程设计

    1. 文件锁

    和多线程一个道理,在多个进程同时操作同一份文件的过程中,很容易导致文件中的数据混乱,需要锁操作来保证数据的完整性。 在最新版本的MMKV中使用flock文件锁来完成多进程操作文件的同步

    #include <sys/file.h>
    // Returns 0 on success, or -1 on error
    int flock (int fd, int operation);
    

    flock()系统调用是在整个文件中加锁,通过对传入的fd所指向的文件进行操作,然后在通过operation参数所设置
    的值来确定做什么样的操作。operation可以赋如下值:

    • LOCK_SH,共享锁,多个进程可以使用同一把锁:读锁;

    • LOCK_EX,排他锁,同时只允许一个进程使用:写锁;

    • LOCK_UN,释放锁

    • LOCK_BN,发起非阻塞请求,如:LOCK_SH|LOCK_BN。

    任意数量的进程可同时持有一个文件上的共享锁(读锁),但只能有一个进程能够持有一个文件上的互斥锁(写
    锁)。flock支持锁升级:只有自己进程存在读锁,可以直接升级为写锁,在转换的过程中首先会删除既有的锁,然
    后创建新锁 。若其他进程存在读锁,需要等待释放读锁;

    在设计MMKV中的文件锁需要实现:

    以下内容可见官方文档,比较详细,这里把重要 的知识点罗列出来,具体细节可以去深究以下:

    https://github.com/Tencent/MMKV/wiki/android_ipc

    • 递归锁

      意思是如果一个进程/线程已经拥有了锁,那么后续的加锁操作不会导致卡死,并且解锁也不会导致外层的锁
      被解掉。对于文件锁来说,前者是满足的,后者则不然。因为文件锁是状态锁,没有计数器,无论加了多少
      次锁,一个解锁操作就全解掉。只要用到子函数,就非常需要递归锁。

    • 锁升级/降级

      锁升级是指将已经持有的共享锁,升级为互斥锁,亦即将读锁升级为写锁;锁降级则是反过来。文件锁支持
      锁升级,但是容易死锁:假如 A、B 进程都持有了读锁,现在都想升级到写锁,就会陷入相互等待的困境,产生死锁。另外,由于文件锁不支持递归锁,也导致了锁降级无法进行,一降就降到没有锁。

    为了解决这两个难题,需要对文件锁进行封装,增加读锁、写锁计数器

    • 加写锁时,如果当前已经持有读锁,那么先尝试加写锁(try_lock ),try_lock 失败说明其他进程持有了读
      锁,我们需要先将自己的读锁释放掉,再进行加写锁操作,以避免死锁的发生。
    • 解写锁时,假如之前曾经持有读锁,那么我们不能直接释放掉写锁,这样会导致读锁也解了。我们应该加一
      个读锁,将锁降级。

    对于读写锁的计数器,我们来看一下官方文档的图:

    读锁计数器 写锁计数器 加读锁 加写锁 解读锁 解写锁
    0 0 加读锁 加写锁 - -
    0 1 +1 +1 - 解写锁
    0 N +1 +1 - -1
    1 0 +1 解读锁再加写锁 解读锁 -
    1 1 +1 +1 -1 加读锁
    1 N +1 +1 -1 -1
    N 0 +1 解读锁再加写锁 -1 -
    N 1 +1 +1 -1 加读锁
    N N +1 +1 -1 -1

    需要注意的地方有两点:

    • 加写锁时,如果当前已经持有读锁,那么先尝试加写锁,try_lock 失败说明其他进程持有了读锁,我们需要先将自己的读锁释放掉,再进行加写锁操作,以避免死锁的发生。
    • 解写锁时,假如之前曾经持有读锁,那么我们不能直接释放掉写锁,这样会导致读锁也解了。我们应该加一个读锁,将锁降级。

    2. MMKV源码解读

    基于以上原理,封装flock文件锁C++类为:

    #ifndef MMKV_INTERPROCESSLOCK_H
    #define MMKV_INTERPROCESSLOCK_H
    
    #include <cassert>
    #include <fcntl.h>
    
    enum LockType {
        SharedLockType,
        ExclusiveLockType,
    };
    
    // a recursive POSIX file-lock wrapper
    // handles lock upgrade & downgrade correctly
    class FileLock {
        // 文件句柄
        int m_fd;
        // 读锁计数器
        size_t m_sharedLockCount;
        // 解锁计数器
        size_t m_exclusiveLockCount;
        
        bool doLock(LockType lockType, bool wait);
    
        bool isFileLockValid() { return m_fd >= 0; }
    
        // just forbid it for possibly misuse
        FileLock(const FileLock &other) = delete;
    
        FileLock &operator=(const FileLock &other) = delete;
    
    public:
        FileLock(int fd) : m_fd(fd), m_sharedLockCount(0), m_exclusiveLockCount(0) {}
    
        bool lock(LockType lockType);
    
        bool try_lock(LockType lockType);
    
        bool unlock(LockType lockType);
    };
    
    class InterProcessLock {
        FileLock *m_fileLock;
        LockType m_lockType;
    
    public:
        InterProcessLock(FileLock *fileLock, LockType lockType)
            : m_fileLock(fileLock), m_lockType(lockType), m_enable(true) {
            assert(m_fileLock);
        }
    
        bool m_enable;
    
        void lock() {
            if (m_enable) {
                m_fileLock->lock(m_lockType);
            }
        }
    
        bool try_lock() {
            if (m_enable) {
                return m_fileLock->try_lock(m_lockType);
            }
            return false;
        }
    
        void unlock() {
            if (m_enable) {
                m_fileLock->unlock(m_lockType);
            }
        }
    };
    
    #endif //MMKV_INTERPROCESSLOCK_H
    

    在实现中,关键点再于读写计数器的操作,加锁:

    bool FileLock::doLock(LockType lockType, bool wait) {
        if (!isFileLockValid()) {
            return false;
        }
        bool unLockFirstIfNeeded = false;
        // 读锁
        if (lockType == SharedLockType) {
            m_sharedLockCount++;
            // 以下两个判断:如果本进程之前被上过读锁或者写锁 还未释放,那么不再加读锁
            if (m_sharedLockCount > 1 || m_exclusiveLockCount > 0) {
                return true;
            }
        } else {
            m_exclusiveLockCount++;
        
            // 如果本进程之前上过写锁还未释放
            if (m_exclusiveLockCount > 1) {
                return true;
            }
            // 如果当前已经持有读锁,那么先尝试加写锁,
            // try_lock 失败说明其他进程持有了读锁,需要先将自己的读锁释放掉,再进行加写锁操作,以免其他进           程也在请求加写锁造成死锁
            if (m_sharedLockCount > 0) {
                unLockFirstIfNeeded = true;
            }
        }
    
        int realLockType = LockType2FlockType(lockType);
        // LOCK_NB: 不阻塞
        int cmd = wait ? realLockType : (realLockType | LOCK_NB);
        if (unLockFirstIfNeeded) {
            // // try lock,这里肯定就是 LOCK_EX|LOCK_NB ,
            auto ret = flock(m_fd, realLockType | LOCK_NB);
            if (ret == 0) { //加锁成功
                return true;
            }
            // 加锁失败, 先把自己的读锁释放
            ret = flock(m_fd, LOCK_UN);
            if (ret != 0) {
                MMKVError("fail to try unlock first fd=%d, ret=%d, error:%s", m_fd, ret,
                          strerror(errno));
            }
        }
    
        //加锁lock方法都是阻塞
        auto ret = flock(m_fd, cmd);
        if (ret != 0) {
            MMKVError("fail to lock fd=%d, ret=%d, error:%s", m_fd, ret, strerror(errno));
            return false;
        } else {
            return true;
        }
    }
    

    解锁:

    bool FileLock::unlock(LockType lockType) {
        if (!isFileLockValid()) {
            return false;
        }
        bool unlockToSharedLock = false;
        //读锁
        if (lockType == SharedLockType) {
            if (m_sharedLockCount == 0) {
                //如果一个锁都没有,还解什么,失败
                return false;
            }
            m_sharedLockCount--;
            // don't want shared-lock to break any existing locks
            // 还存在读锁(读锁计数器不为0)或者还存在写锁,不执行解锁
            if (m_sharedLockCount > 0 || m_exclusiveLockCount > 0) {
                //本次解锁完成
                return true;
            }
        } else {
            //写锁计数器为0,不操作
            if (m_exclusiveLockCount == 0) {
                return false;
            }
            //写锁计数器-1,不为0,同样不操作
            m_exclusiveLockCount--;
            if (m_exclusiveLockCount > 0) {
                return true;
            }
            // restore shared-lock when all exclusive-locks are done
            //到这一步表示无写锁了(写锁计数器为0)
            // 同时还存在读锁,不能解锁,需要降级写锁为读锁
            if (m_sharedLockCount > 0) {
                unlockToSharedLock = true;
            }
        }
        //是否降级
        int cmd = unlockToSharedLock ? LOCK_SH : LOCK_UN;
        auto ret = flock(m_fd, cmd);
        if (ret != 0) {
            MMKVError("fail to unlock fd=%d, ret=%d, error:%s", m_fd, ret, strerror(errno));
            return false;
        } else {
            return true;
        }
    }
    

    上面加锁和解锁的步骤,具体细节都在注释里面了,大家可以看看。

    多进程的讲解官方文档比较详细,这里再把官方地址贴一下,这里大家只需要了解加锁和解锁的步骤就完全oK

    了。

    <a href="https://github.com/Tencent/MMKV/wiki/android_ipc">https://github.com/Tencent/MMKV/wiki/android_ipc</a>

    相关文章

      网友评论

        本文标题:MMKV的原理与实现(三)

        本文链接:https://www.haomeiwen.com/subject/dblmactx.html