美文网首页 移动 前端 Python Android Java
MMKV (三) POSIX线程和文件锁

MMKV (三) POSIX线程和文件锁

作者: zcwfeng | 来源:发表于2020-10-13 00:37 被阅读0次

    前言:由于知识点多,分了多个记录。

    MMKV( 一) 了解原理
    MMKV (二)基础知识点和实现流程解析
    MMKV (三) POSIX线程和文件锁

    POSIX线程

    POSIX,全称为可移植性操作系统接口。它包括了系统应用程序接口(简称API)。该标准的目的是定义了标准的 基于UNIX操作系统的系统接口和环境来支持源代码级的可移植性,致力于提供基于不同语言的规范。POSIX的线程 标准,定义了创建和操纵线程的一套API。

    基本使用

    #include <iostream>                                   
    #include <queue>                                      
    #include <pthread.h>                                  
    void *run(void* args) { //异步方法                        
        int i = *(int*)args; // 1                         
        printf("%d\n",i);                                 
        return 0;                                         
    }                                                     
    int main() {                                          
        int i = 1; //线程参数                                 
        pthread_t pid;                                    
        pthread_create(&pid, 0, run, &i);//创建线程           
        pthread_join(pid,0);                              
        // 等待线程结束                                         
        system("pause");                                  
        return 0;                                         
                                                          
    }                                                     
    

    线程同步

    多线程同时读写同一份共享资源的时候,可能会引起冲突。需要引入线程“同步”机制,即各位线程之间有序 地对共享资源进行操作。

    #include <pthread.h>
    using namespace std;
    queue<int> q;
    void *pop(void* args) {
    //线程未同步导致的多线程安全问题 // 会有重复的数据取出并出现异常 if (!q.empty())
    {
    printf("取出数据:%d\n", q.front());
    q.pop(); }
    else { printf("无数据\n");
    }
    
    return 0; }
    int main() {
    for (size_t i = 0; i < 5; i++) {
    q.push(i); }
    pthread_t pid[10];
    for (size_t i = 0; i < 10; i++) {
            pthread_create(&pid[i], 0, pop, &q);
        }
    system("pause");
    return 0; }
      
    

    互斥量

    pthread_mutex_t互斥量就是一把锁。 当一个线程要访问一个共享变量时,先用锁把变量锁住,操作完了之后再 释放掉锁。
    当另一个线程也要访问这个变量时,发现这个变量被锁住了,此时无法访问,一直等待直到锁没了,才能够上锁与 使用。
    使用互斥量前要先初始化,使用的函数如下:

    加入互斥锁

    queue<int> q;
    pthread_mutex_t mutex; //互斥量:锁 void *pop(void* args) {
    // 锁 pthread_mutex_lock(&mutex); if (!q.empty())
    {
    printf("取出数据:%d\n", q.front());
    q.pop(); }
    else { printf("无数据\n");
    }
    // 放 pthread_mutex_unlock(&mutex); return 0;
    }
    int main() {
    //初始化互斥锁 
    pthread_mutex_init(&mutex, 0); for (size_t i = 0; i < 5; i++)
    {
    q.push(i);
    }
    pthread_t pid[10];
    for (size_t i = 0; i < 10; i++) {
            pthread_create(&pid[i], 0, pop, &q);
        }
    for (size_t i = 0; i < 10; i++) {
            pthread_join(pid[i], 0);
        }
    //需要释放 pthread_mutex_destroy(&mutex); system("pause");
    return 0;
    }
    #include <iostream> #include <queue> #include <pthread.h>
    using namespace std; //互斥量 : 锁 pthread_mutex_t mutex;
    queue<int> q;
    void test(){
    pthread_mutex_lock(&mutex); //线程阻塞,死锁 printf("队列大小:%d\n", q.size()); pthread_mutex_unlock(&mutex);
    }
    void *pop(void* args) {
    int ret= pthread_mutex_lock(&mutex); if (!q.empty()) {
    printf("取出数据:%d\n", q.front());
    q.pop(); }
    else { printf("无数据\n");
    }
    test(); //死锁 pthread_mutex_unlock(&mutex); return 0;
    }
     
    

    但是pthread_mutex_t锁是默认是非递归的,即不可重入锁。如果一个线程多次获取同一个非递归锁,则会产生死 锁:

    #include <iostream> 
    #include <queue> 
    #include <pthread.h>
    using namespace std; //互斥量 : 锁
     pthread_mutex_t mutex;
    queue<int> q;
    void test(){
    pthread_mutex_lock(&mutex); //线程阻塞,死锁
     printf("队列大小:%d\n", q.size()); pthread_mutex_unlock(&mutex);
    }
    void *pop(void* args) {
    int ret= pthread_mutex_lock(&mutex); if (!q.empty()) {
    printf("取出数据:%d\n", q.front());
    q.pop(); }
    else { printf("无数据\n");
    }
    test(); //死锁 
    pthread_mutex_unlock(&mutex); 
    return 0;
    }
      pthread_mutex_destroy(&mutex);
    

    创建递归锁需要在初始化 pthread_mutex_t 时指明:

     
    // 锁的属性 : pthread_mutex_t锁默认是非递归的 
    
    pthread_mutexattr_t attr;
    
    pthread_mutexattr_init(&attr);
    
    // 设置为递归锁
    pthread_mutexattr_settype(&attr,PTHREAD_MUTEX_RECURSIVE); 
    
    // 初始化mutex
    
    pthread_mutex_init(&mutex, &attr); // 完成初始化后即可释放 
    
    pthread_mutexattr_destroy(&attr);
    

    条件变量

    条件变量是线程间进行同步的一种机制,主要包括两个动作:一个线程等待"条件变量的条件成立"而挂起; 另一个线程使"条件成立",从而唤醒挂起线程

    template<class T>
    class SafeQueue {
    public:
        SafeQueue() {
            pthread_mutex_init(&mutex, 0);
        }
    
        ~SafeQueue() {
            pthread_mutex_destory(&mutex);
        }
    
        void enqueue(T t) {
            pthread_mutex_lock(&mutex);
            q.push(t);
            pthread_mutex_unlock(&mutex);
        }
    
        int dequeue(T &t) {
            pthread_mutex_lock(&mutex);
            if (!q.empty()) {
                t = q.front();
                q.pop();
                pthread_mutex_unlock(&mutex);
                return 1;
            }
            pthread_mutex_unlock(&mutex);
            return 0;
        }
    
    private:
        queue <T> q;
        pthread_mutex_t mutex;
    };
      
    

    上面的模板类存放数据T,并使用互斥锁保证对queue的操作是线程安全的。这就是一个生产/消费模式。 如果需要在取出数据的时候,queue为空,则一直等待,直到下一次enqueue加入数据。 此时可以加入条件变量使 “dequeue” 挂起,直到由其他地方唤醒:

    
    #include <queue>
    #include <pthread.h>
    
    using namespace std;
    
    template<class T>
    class SafeQueue {
        queue<T> q;
        pthread_mutex_t mutex;
        pthread_cond_t cond; //条件变量
    public:
        SafeQueue() {
            pthread_mutex_init(&mutex, 0);
            pthread_cond_init(&cond, 0); //初始化
        }
    
        ~SafeQueue() {
            pthread_mutex_destory(&mutex);
            pthread_cond_destory(&cond); //销毁
        }
    
        void enqueue(T t) {
            pthread_mutex_lock(&mutex);
            q.push(t);
    //发出信号 通知挂起线程
    // 1、由系统唤醒一个线程(随机)
    // pthread_cond_signal(&cond);
    // 2、广播 唤醒所有等待条件线程
            pthread_cond_broadcast(&cond);
            pthread_mutex_unlock(&mutex);
        }
    
        int dequeue(T &t) {
            pthread_mutex_lock(&mutex);
    //可能因为某些特殊条件虚假唤醒 所以while循环等待唤醒。(与Java的wait一样)
            while (q.empty()) {
                pthread_cond_wait(&cond, &mutex); //等待并自动释放互斥锁
            }
            t = q.front();
            q.pop();
            pthread_mutex_unlock(&mutex);
            return 1;
        }
    };
    

    存在三个线程,分别为:生产者P、消费者C1与C2。
    1、C1从队列中取出数据,此时队列为空;
    2、C2也想从队列中获取一个元素,但此时队列为空,C2进入阻塞(cond.wait()),等待队列非空;
    3、 P将一个元素入队,并唤醒条件变量;
    4、C1与C2接收到唤醒信号,解除阻塞状态,上锁并获取队列中的元素;
    5、C2优先获取到锁,移除队列元素并释放锁;
    6、C1此时操作的队列为空,被虚假唤醒。

    自动管理

    在使用pthread_mutex_t时,lock之后,一定需要unlock。为了防止忘记解锁,同时方便使用,可以利用C++中构 造方法与析构方法对锁进行封装,实现锁的自动管理。

    #include <pthread.h>
    
    //封装互斥量: 锁
    class ThreadLock {
    private:
        pthread_mutex_t m_lock;
    public:
        ThreadLock() {
    //递归锁
            pthread_mutexattr_t attr;
            pthread_mutexattr_init(&attr);
            pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE);
            pthread_mutex_init(&m_lock, &attr);
            pthread_mutexattr_destroy(&attr);
        }
    
        ~ThreadLock() {
            pthread_mutex_destroy(&m_lock);
        }
    
        void lock() {
            auto ret = pthread_mutex_lock(&m_lock);
            if (ret != 0) { //失败
            }
        }
    
        bool try_lock() {
            auto ret = pthread_mutex_trylock(&m_lock);
            return (ret == 0);
        }
    
        void unlock() {
            auto ret = pthread_mutex_unlock(&m_lock);
            if (ret != 0) {
    //失败
            }
        }
    };
    
    
    //模板类
    template<typename T>
    class ScopedLock {
        T *m_lock;
    
    //禁止使用编译器默认生成的函数
    //默认拷贝构造与=赋值
        ScopedLock(const ScopedLock<T> &other) = delete;
    
        ScopedLock &operator=(const ScopedLock<T> &other) = delete;
    
    public: //构造方法中上锁
        ScopedLock(T *oLock) : m_lock(oLock) {
            assert(m_lock);
            lock();
        }
    
    //析构方法解除锁
        ~ScopedLock() {
            unlock();
    
            m_lock = nullptr;
        }
    
        void lock() {
            if (m_lock) {
                m_lock->lock();
            }
        }
    
        bool try_lock() {
            if (m_lock) {
                return m_lock->try_lock();
            }
            return false;
        }
    
        void unlock() {
            if (m_lock) {
                m_lock->unlock();
            }
        }
    
    };
    //宏函数 __COUNTER__:初始值为0,编译单元内每出现一次出现该宏,便会加1。
    #define SCOPEDLOCK(lock) _SCOPEDLOCK(lock, __COUNTER__)
    #define _SCOPEDLOCK(lock, counter) __SCOPEDLOCK(lock, counter)
    //decltype:推断变量类型;__scopedLock##counter(&lock):##为连接符
    #define __SCOPEDLOCK(lock, counter) ScopedLock<decltype(lock)> __scopedLock##counter(&lock)
     
    //使用 
    ThreadLock lock;
    
    void test() {
    //创建 ScopedLock<ThreadLock> __scopedLock1对象,使用lock上锁 
        SCOPEDLOCK(lock);
    //退出方法 执行__scopedLock1析构,解锁lock
    }
      
    
    

    文件锁

    在多个进程同时操作同一份文件的过程中,很容易导致文件中的数据混乱,需要锁操作来保证数据的完整性。 在在
    最新版本的MMKV中使用flock文件锁来完成多进程操作文件的同步

     
    #include <sys/file.h>
    // Returns 0 on success, or -1 on error
    int flock (int fd, int operation);
    

    flock()系统调用是在整个文件中加锁,通过对传入的fd所指向的文件进行操作,然后在通过operation参数所设置 的值来确定做什么样的操作。operation可以赋如下值:

    • LOCK_SH,共享锁,多个进程可以使用同一把锁:读锁;
    • LOCK_EX,排他锁,同时只允许一个进程使用:写锁;
    • LOCK_UN,释放锁
    • LOCK_BN,发起非阻塞请求,如:LOCK_SH|LOCK_BN。
      任意数量的进程可同时持有一个文件上的共享锁(读锁),但只能有一个进程能够持有一个文件上的互斥锁(写 锁)。flock支持锁升级:只有自己进程存在读锁,可以直接升级为写锁,在转换的过程中首先会删除既有的锁,然 后创建新锁 。若其他进程存在读锁,需要等待释放读锁;

    在设计MMKV中的文件锁需要实现:

    • 递归锁
      意思是如果一个进程/线程已经拥有了锁,那么后续的加锁操作不会导致卡死,并且解锁也不会导致外层的锁 被解掉。对于文件锁来说,前者是满足的,后者则不然。因为文件锁是状态锁,没有计数器,无论加了多少 次锁,一个解锁操作就全解掉。只要用到子函数,就非常需要递归锁。

    • 锁升级/降级
      锁升级是指将已经持有的共享锁,升级为互斥锁,亦即将读锁升级为写锁;锁降级则是反过来。文件锁支持 锁升级,但是容易死锁:假如 A、B 进程都持有了读锁,现在都想升级到写锁,就会陷入相互等待的困境,发 生死锁。另外,由于文件锁不支持递归锁,也导致了锁降级无法进行,一降就降到没有锁。

    为了解决这两个难题,需要对文件锁进行封装,增加读锁、写锁计数器。

    • 加写锁时,如果当前已经持有读锁,那么先尝试加写锁(try_lock ),try_lock 失败说明其他进程持有了读 锁,我们需要先将自己的读锁释放掉,再进行加写锁操作,以避免死锁的发生。
    • 解写锁时,假如之前曾经持有读锁,那么我们不能直接释放掉写锁,这样会导致读锁也解了。我们应该加一 个读锁,将锁降级
    读锁计数器 写锁计数器 加读锁 加写锁 解读锁 解写锁
    0 0 加读锁 加写锁 - -
    0 1 +1 +1 - 解读锁
    0 N +1 +1 - -1
    1 0 +1 解读锁再加写锁 解读锁 -
    1 1 +1 +1 -1 加读锁
    1 N +1 +1 -1 -1
    N 0 +1 解读锁再加写锁 -1 -
    N 1 +1 +1 -1 加读锁
    N N +1 +1 -1 -1

    读技技术器和写计数器一起作为条件,在单独看其他列

    基于上诉原理,封装flock文件锁C++类为:

    class FileLock {
    
        //文件句柄
        int m_fd;
        //文件锁
        flock m_lockInfo;
        //读计数
        size_t m_sharedLockCount;
        //写计数
        size_t m_exclusiveLockCount;
    
        bool doLock(LockType lockType, bool wait);
    
        bool isFileLockValid() { return m_fd >= 0; }
    
        FileLock(const FileLock &other) = delete;
    
        FileLock &operator=(const FileLock &other) = delete;
    
    public:
        FileLock(int fd) : m_fd(fd), m_sharedLockCount(0), m_exclusiveLockCount(0) {}
    
    
        bool lock(LockType lockType);
    
        bool try_lock(LockType lockType);
    
        bool unlock(LockType lockType);
    };
    
    

    在实现中,关键在于对读写计数器的操作,加锁:

    
    
    bool FileLock::doLock(LockType lockType, bool wait) {
        if (!isFileLockValid()) {
            return false;
        }
        bool unLockFirstIfNeeded = false;
    
        if (lockType == SharedLockType) {
    
            m_sharedLockCount++;
            // 如果本进程之前被上过读锁或者写锁 还未释放,那么不再加读锁
            if (m_sharedLockCount > 1 || m_exclusiveLockCount > 0) {
                return true;
            }
        } else {
            m_exclusiveLockCount++;
            // 如果本进程之前上过写锁还未释放
            if (m_exclusiveLockCount > 1) {
                return true;
            }
            // 如果当前已经持有读锁,那么先尝试加写锁,
            // try_lock 失败说明其他进程持有了读锁,需要先将自己的读锁释放掉,再进行加写锁操作,以免其他进程也在请求加写锁造成死锁
            if (m_sharedLockCount > 0) {
                unLockFirstIfNeeded = true;
            }
        }
    
        int realLockType = LockType2FlockType(lockType);
        // LOCK_NB: 不阻塞
        int cmd = wait ? realLockType : (realLockType | LOCK_NB);
    
        if (unLockFirstIfNeeded) {
            // try lock,这里肯定就是 LOCK_EX|LOCK_NB ,
            auto ret = flock(m_fd, realLockType | LOCK_NB);
            if (ret == 0) { //加锁成功
                return true;
            }
            // 加锁失败, 先把自己的读锁释放
             flock(m_fd, LOCK_UN);
        }
    
        auto ret = flock(m_fd, cmd); //加锁lock方法都是阻塞
        if (ret != 0) {
            return false;
        } else {
            return true;
        }
    }
    
    

    解锁:

    bool FileLock::lock(LockType lockType) {
        return doLock(lockType, true);
    }
    
    bool FileLock::try_lock(LockType lockType) {
        return doLock(lockType, false);
    }
    
    bool FileLock::unlock(LockType lockType) {
        if (!isFileLockValid()) {
            return false;
        }
        bool unlockToSharedLock = false;
    
        if (lockType == SharedLockType) {
            if (m_sharedLockCount == 0) {
                //没锁解,失败
                return false;
            }
            m_sharedLockCount--;
            // 计数器不为0,不解锁
            if (m_sharedLockCount > 0 || m_exclusiveLockCount > 0) {
                //本次解锁完成
                return true;
            }
        } else {
            if (m_exclusiveLockCount == 0) {
                return false;
            }
            m_exclusiveLockCount--;
            if (m_exclusiveLockCount > 0) {
                return true;
            }
            // 写锁解除完了(计数为0)并且读锁还有计数,还原锁为读锁
            if (m_sharedLockCount > 0) {
                unlockToSharedLock = true;
            }
        }
    
        int cmd = unlockToSharedLock ? LOCK_SH : LOCK_UN;
        auto ret = flock(m_fd, cmd);
        if (ret != 0) {
            return false;
        } else {
            return true;
        }
    }
    
    

    MMKV多进程设计

    上面我们讲过了flock文件锁能够实现同一时间只有一个进程在操作持久化文件,但是如果存在AB进程,在B进程修 改完成之后,A进程如何知道B进程的修改?

    MMKV 本质上是将文件 mmap 到内存块中,将新增的 key-value 统统 append 到内存中;到达边界后,进行重整 回写以腾出空间,空间还是不够的话,就 double 内存空间;对于内存文件中可能存在的重复键值,MMKV 只选用 最后写入的作为有效键值。那么其他进程为了保持数据一致,就需要处理这三种情况:写指针增长、内存重整、内 存增长。但首先还得解决一个问题:怎么让其他进程感知这三种情况?

    • 写指针的同步
      我们可以在每个进程内部缓存自己的写指针,然后在写入键值的同时,还要把最新的写指针位置也写到 mmap 内存中;这样每个进程只需要对比一下缓存的指针与 mmap 内存的写指针,如果不一样,就说明其他 进程进行了写操作。事实上 MMKV 原本就在文件头部保存了有效内存的大小,这个数值刚好就是写指针的内 存偏移量,我们可以重用这个数值来校对写指针。
    • 内存重整的感知
      考虑使用一个单调递增的序列号,每次发生内存重整,就将序列号递增。将这个序列号也放到 mmap 内存 中,每个进程内部也缓存一份,只需要对比序列号是否一致,就能够知道其他进程是否触发了内存重整。
    • 内存增长的感知
      事实上 MMKV 在内存增长之前,会先尝试通过内存重整来腾出空间,重整后还不够空间才申请新的内存。所 以内存增长可以跟内存重整一样处理。至于新的内存大小,可以通过查询文件大小来获得,无需在 mmap 内 存另外存放。

    在MMKV中会生成一份与数据文件同名的.crc文件,此文件中记录两个关键数据:数据内容的crc校验码与单调递增 的序列号。

    CRC校验码:循环冗余校验 ,类似文件MD5值。我们下载软件往往会附带MD5值,如AS。比较MD5值就能 知道文件是否合法、完整,一旦修改了文件数据,MD5值将不匹配。
    递增的序列号:每次去重、扩容即执行全量更新,序列号+1并记录在crc文件中,不匹配则需要重新解析全部 文件。

    每次写入与获取数据需要执行以下检查:

    void MMKV::checkLoadData() {
        if (!m_isInterProcess) {
            return;
        }
        MMKVMetaInfo metaInfo;
        metaInfo.read(m_metaFile.getMemory());
        //本次读取和记录的不同
        if (m_metaInfo.m_sequence != metaInfo.m_sequence) {
            //内存重整,序列号递增
            // 当一个进程发现内存被重整了,就意味着原写指针前面的键值全部失效,那么最简单的做法是全部抛弃掉,从头开始重新加载一遍。
            LOGI("checkData:序列号改变");
            SCOPEDLOCK(m_sharedProcessLock);
            clearMemoryState();
            loadFromFile();
        } else if (m_metaInfo.m_crcDigest != metaInfo.m_crcDigest) {
            LOGI("checkData:校验码改变");
            SCOPEDLOCK(m_sharedProcessLock);
            size_t fileSize = 0;
            struct stat st = {0};
            if (fstat(m_fd, &st) != -1) {
                fileSize = (size_t) st.st_size;
            }
            if (m_size != fileSize) {
                // 发生文件增长,必然已经先发生了内存重整,与内存重整一样的处理
                LOGI("checkData:文件大小改变");
                clearMemoryState();
                loadFromFile();
            } else {
                LOGI("checkData:写指针增长");
                // 文件大小不变,可能写指针增长
                partialLoadFromFile();
            }
        }
    }
    

    相关文章

      网友评论

        本文标题:MMKV (三) POSIX线程和文件锁

        本文链接:https://www.haomeiwen.com/subject/dpqmpktx.html