iOS开发进阶:锁的分析

作者: __Null | 来源:发表于2021-12-05 07:25 被阅读0次

    我们在使用多线程的时候,同一时刻可能会有多个线程访问同一内存的内容,这样就很容易引发数据混乱(数据安全)的问题。为了减少或者避免这种问题的出现,我们需要使用锁来保证统一时刻只有一个线程访问这一块内存。锁可以让数据的访问更安全。

    我们常见的锁包括OSSpinLockdispatch_semaphore_tos_unfair_lockpthread_mutex_tNSlockNSConditionpthread_mutex_t(recursive)NSRecursiveLockNSConditionLock@synchronized等10来种。

    按照特性划分,锁可以分为自旋锁互斥锁:
    1.自旋锁:线程反复检查锁变量是否可用。由于线程在这一过程中保持执行,因此是一种忙等待。一旦获取了自旋锁,线程会一直保持该锁,甚至显示释放自旋锁。自旋锁避免了进程上下文调度的开销,因此对于线程只会阻塞很短时间的场合是有效的。
    2.互斥锁:是一种利用于多线程编程中,防止两条线程同时对统一资源进行读写的机制。该目的是通过将代码切片成一个一个临时区二达成的。属于互斥锁的有NSlockpthread_mutex_t@synchronized。互斥锁又分为递归和不递归两种类型。

    那么它们各自的性能优势什么样的呢?我们来做一个简单的测试,测试代码如下:

    - (void)test{
        int loop = 100000;
        
        {
            
            OSSpinLock lock = OS_SPINLOCK_INIT;
            double_t start = CFAbsoluteTimeGetCurrent();
            for (int i = 0; i < loop; i++){
                OSSpinLockLock(&lock);
                OSSpinLockUnlock(&lock);
            }
            double_t end = CFAbsoluteTimeGetCurrent();
            NSLog(@"OSSpinLock:%f", (end - start)*1000);
        }
        
        {
            dispatch_semaphore_t semaphore = dispatch_semaphore_create(1);
            double_t start = CFAbsoluteTimeGetCurrent();
            for (int i = 0; i < loop; i++){
                dispatch_semaphore_wait(semaphore, DISPATCH_TIME_FOREVER);
                dispatch_semaphore_signal(semaphore);
            }
            double_t end = CFAbsoluteTimeGetCurrent();
            NSLog(@"dispatch_semaphore_t:%f", (end - start)*1000);
        }
        
        {
            os_unfair_lock unfair = OS_UNFAIR_LOCK_INIT;
            double_t start = CFAbsoluteTimeGetCurrent();
            for (int i = 0; i < loop; i++){
                os_unfair_lock_lock(&unfair);
                os_unfair_lock_unlock(&unfair);
            }
            double_t end = CFAbsoluteTimeGetCurrent();
            NSLog(@"os_unfair_lock:%f", (end - start)*1000);
        }
        
        {
            pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
            double_t start = CFAbsoluteTimeGetCurrent();
            for (int i = 0; i < loop; i++){
                pthread_mutex_lock(&mutex);
                pthread_mutex_unlock(&mutex);
            }
            double_t end = CFAbsoluteTimeGetCurrent();
            NSLog(@"pthread_mutex_t:%f", (end - start)*1000);
        }
        
        {
            pthread_mutex_t recurive;
            pthread_mutexattr_t attr;
            pthread_mutexattr_init(&attr);
            pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE);
            pthread_mutex_init(&recurive, &attr);
            double_t start = CFAbsoluteTimeGetCurrent();
            for (int i = 0; i < loop; i++){
                pthread_mutex_lock(&recurive);
                pthread_mutex_unlock(&recurive);
            }
            double_t end = CFAbsoluteTimeGetCurrent();
            NSLog(@"pthread_mutex_t(recurive):%f", (end - start)*1000);
        }
        
        {
            NSLock *lock = [NSLock new];
            double_t start = CFAbsoluteTimeGetCurrent();
            for (int i = 0; i < loop; i++){
                [lock lock];
                [lock unlock];
            }
            double_t end = CFAbsoluteTimeGetCurrent();
            NSLog(@"NSLock:%f", (end - start)*1000);
        }
        
        {
            NSRecursiveLock *recursiveLock = [NSRecursiveLock new];
            double_t start = CFAbsoluteTimeGetCurrent();
            for (int i = 0; i < loop; i++){
                [recursiveLock lock];
                [recursiveLock unlock];
            }
            double_t end = CFAbsoluteTimeGetCurrent();
            NSLog(@"NSRecursiveLock:%f", (end - start)*1000);
        }
        
        {
            NSCondition *condition = [NSCondition new];
            double_t start = CFAbsoluteTimeGetCurrent();
            for (int i = 0; i < loop; i++){
                [condition lock];
                [condition unlock];
            }
            double_t end = CFAbsoluteTimeGetCurrent();
            NSLog(@"NSCondition:%f", (end - start)*1000);
        }
        
        {
            NSConditionLock *conditionLock = [NSConditionLock new];
            double_t start = CFAbsoluteTimeGetCurrent();
            for (int i = 0; i < loop; i++){
                [conditionLock lock];
                [conditionLock unlock];
            }
            double_t end = CFAbsoluteTimeGetCurrent();
            NSLog(@"NSConditionLock:%f", (end - start)*1000);
        }
        
        {
            double_t start = CFAbsoluteTimeGetCurrent();
            for (int i = 0; i < loop; i++){
                @synchronized (self) {
                    
                }
            }
            double_t end = CFAbsoluteTimeGetCurrent();
            NSLog(@"@synchronized:%f", (end - start)*1000);
        }
    }
    

    也就是我们尝试加锁解锁10万次,看这个过程耗费的时间(单位:毫秒ms)。
    在模拟器上运行时间如下:(单位:毫秒)


    模拟器运行结果.png

    在真机上运行时间如下:(单位:毫秒)


    真机(iPhone 11)运行结果.png

    真机和模拟器上运行结果基本相近,在真机上@synchronized的表现要比模拟器上号很多,从7.5毫秒提升到2.7毫秒(猜测:@synchronized可能苹果针对arm有专门的优化?)

    一.@synchronized

    @synchronized有加锁的效果,并且递归可重用,我们从源码的角度看看他是如何实现的:
    源文件

    const NSString *syncKey = @"NXXcrun";
    @implementation NXXcrun
    + (void)main {
        @synchronized (syncKey) {
        }
    }
    

    在控制台执行如下命令,将.m文件转成.cpp文件

    xcrun -sdk iphoneos clang -arch arm64e -rewrite-objc NXXcrun.m
    

    整理后的.cpp文件

    const NSString *syncKey = (NSString *)&__NSConstantStringImpl__var_folders_ns_73tnh7591jvg16yqm8fn6ykh0000gn_T_NXXcrun_0a2d9a_mi_0;
    
    // @implementation NXXcrun
    static void _C_NXXcrun_main(Class self, SEL _cmd) {
        {
            id _rethrow = 0;
            id _sync_obj = (id)syncKey;
            objc_sync_enter(_sync_obj);
            
            try {
                struct _SYNC_EXIT {
                    //构造方法首先执行sync_exit(arg)这个方法,给sync_exit变量赋值,函数体为空。
                    _SYNC_EXIT(id arg) : sync_exit(arg) {}
                    //析构函数调用时调用objc_sync_exit,这里的参数sync_exit是构造方法外部传入的_sync_obj。
                    ~_SYNC_EXIT() {objc_sync_exit(sync_exit);}
                    id sync_exit;
                }_sync_exit(_sync_obj);。
            } catch (id e) {_rethrow = e;}
            
            { struct _FIN {
                _FIN(id reth) : rethrow(reth) {}
                ~_FIN() { if (rethrow) objc_exception_throw(rethrow); }
                id rethrow;
            } _fin_force_rethow(_rethrow);}
        }
    }
    // @end
    

    这里的_SYNC_EXIT(id arg)是结构体的构造函数,_sync_exit(_sync_obj)相当于调用_SYNC_EXIT(id arg)参数与构造方法一致,而sync_exit(arg)是给sync_exit变量赋值,函数题为空。~_SYNC_EXIT()是析构函数,在实例销毁时会自动调用,析构函数中调用了objc_sync_exit(sync_exit);函数。我么举个例子看看这段代码的调用顺序:

    NSLog(@"开始");
    {
        struct Tester {
            Tester(id arg) : syncKey(arg){
                NSLog(@"init:arg=%@;syncKey=%@", arg, syncKey);
            }
            ~Tester(){
                NSLog(@"deinit:syncKey=%@", syncKey);
            }
            id syncKey;
        }fn(@"abc");
        
        NSLog(@"需要执行的业务代码");
    }
    NSLog(@"结束");
    

    运行代码后打印结果如下:

    开始
    init:arg=abc;syncKey=abc
    需要执行的业务代码
    deinit:syncKey=abc
    结束
    

    可以看到这里需要执行的业务代码是在Tester的构造函数和析构函数之间打印的,这个设计还是很巧妙的。明白了这一点,我们可以将上面的代码简化为:

    id _sync_obj = (id)syncKey;
    objc_sync_enter(_sync_obj);
    objc_sync_exit(_sync_obj);
    

    添加符号断点:objc_sync_enter、objc_sync_exit,然后运行代码看


    符号断点.png

    断点进入了libobjc.A.dylib中,接下里我们可以去objc的源码中看看

    int objc_sync_enter(id obj){
        int result = OBJC_SYNC_SUCCESS;
        if (obj) {
            SyncData* data = id2data(obj, ACQUIRE);
            ASSERT(data);
            data->mutex.lock();
        } 
        else {
            objc_sync_nil(); // @synchronized(nil) does nothing
        }
        return result;
    }
    
    int objc_sync_exit(id obj){
        int result = OBJC_SYNC_SUCCESS;
        if (obj) {
            SyncData* data = id2data(obj, RELEASE); 
            if (!data) {
                result = OBJC_SYNC_NOT_OWNING_THREAD_ERROR;
            } 
            else {
                bool okay = data->mutex.tryUnlock();
                if (!okay) {
                    result = OBJC_SYNC_NOT_OWNING_THREAD_ERROR;
                }
            }
        } 
        else {
            // @synchronized(nil) does nothing
        }
        return result;
    }
    

    通过objc_sync_enter可以看到,如果@synchronized(nil),那么它是没做事情的,反之调用data->mutex.lock()进行加锁;而objc_sync_exit类似,调用data->mutex.tryUnlock()进行解锁。

    重点都在data中。这个data的创建是在一个名为id2data的函数中(第一个参数是obj标记,第二个参数表示操作,加锁的时候传入ACQUIRE, 解锁的时候传入RELEASE),我们跟随进入id2data函数中,接下来很多的内容都出现在这个函数中(有数函数太长,我就不贴代码了)。

    我们通过如下代码进行断点调试:

    @synchronized (p) {
        @synchronized (p) {
            @synchronized (q) {
            }
        }
    }
    

    进入这个函数首先进入的是

    spinlock_t *lockp = &LOCK_FOR_OBJ(object);
    SyncData **listp = &LIST_FOR_OBJ(object);
    
    #define LOCK_FOR_OBJ(obj) sDataLists[obj].lock
    #define LIST_FOR_OBJ(obj) sDataLists[obj].data
    static StripedMap<SyncList> sDataLists;
    
    struct SyncList {
        SyncData *data;
        spinlock_t lock;
    
       constexpr SyncList() : data(nil), lock(fork_unsafe_lock) { }
    };
    
    typedef struct alignas(CacheLineSize) SyncData {
        struct SyncData* nextData;
        DisguisedPtr<objc_object> object;
        int32_t threadCount;  // number of THREADS using this block
        recursive_mutex_t mutex;
    } SyncData;
    

    可以看出:

    • lockp和listp是从全局静态变量sDataLists中取出来的数据,sDataLists是一个StripedMap结构体类型,存储的是SyncList类型的结构体;SyncList是对spinlock_tSyncData的封装。
    • SyncData是一个链表结构,通过nextData可以找到下一个SyncData,object存储了@synchronized的标记,threadCount记录了线程的数量,并且包含recursive_mutex_t类型的递归锁mutex供外部加锁解锁使用。

    我们打印一下:

    (lldb) p sDataLists[0]
    (SyncList *) $1={
        data = nil
        lock = {
           mLock = (_os_unfair_lock_opaque = 0)
        }
    }
    

    打印结果符合预期,打印整个的sDataLists,则存在64个已经创建好的SyncList结构体,结构如上所示。

    解读完如上基础结构,我们接着往下看,tls_get_direct从线程的栈存空间获取SyncData,首次进来这里为空;接着查询cache,仍然为空;接着进入如下代码

    // Allocate a new SyncData and add to list.
    // XXX allocating memory with a global lock held is bad practice,
    // might be worth releasing the lock, allocating, and searching again.
    // But since we never free these guys we won't be stuck in allocation very often.
    posix_memalign((void **)&result, alignof(SyncData), sizeof(SyncData));
    result->object = (objc_object *)object;
    result->threadCount = 1;
    new (&result->mutex) recursive_mutex_t(fork_unsafe_lock);
    result->nextData = *listp;
    *listp = result;
    

    posix_memalign是给result分配内存空间,还做了内存对齐,内存分配完成则保存object,赋值threadCount,初始化mutex,将result.nextData指向链表的头指针,再将头指针指向result。实际上这是链表的头插法操作,新插入的对象在链表的头部。这样数据就保存到了sDataLists中。再次打印发现hash为32的SyncList的data已经有值了,

    [32] = {
      value = {
        data = 0x0000000101209980
        lock = {
          mLock = (_os_unfair_lock_opaque = 0)
         }
      }
    }
    

    有个细节是这里进行内存分配和初始化的操作还进行了lock和unlock。

    result创建之后,通过tls_set_direct(SYNC_DATA_DIRECT_KEY, result)保存result,以便下一次能直接通过(SyncData *)tls_get_direct(SYNC_DATA_DIRECT_KEY)取出。

    当进入第二个synchronized的时候,通过tls_get_direct取出了上一步存储的data, data.object == object,则进入操作,取出上一步的lockCount。因为是加锁操作ACQUIRE,这里直接lockCount++,然后再执行tls_set_direct(SYNC_COUNT_DIRECT_KEY(void*)lockCount);保存最新的加锁次数。这里没有生成新的SyncData,所以sDataLists没有发生变化。

    当代码进入第三个synchronized的时候,仍然会取出上一步的存储的data,但是因为data.object!=object,则继续往下查询,cache中查询结果为空。继续按照第一步中的流程创建result。执行完*listp = result;后sDataLists中hash为34的SyncList已经有值了,

    [34] = {
       value = {
          data = 0x000000010139a380
          lock = {
             mLock = (_os_unfair_lock_opaque = 259)
          }
       }
    }
    

    如果查询出来过SyncData,则fastCacheOccupied为true,后续新建的SyncData的状态存储都存储在cache中

    if (!cache) cache = fetch_cache(YES);
    cache->list[cache->used].data = result;
    cache->list[cache->used].lockCount = 1;
    cache->used++;
    

    那么objc_sync_exit的流程是什么样的呢?如果查询到tls中存在同object的SyncData,则进行lockCount--操作,并且在lockCount减至0的时候进行如下操作,清楚tls中的数据

    // remove from fast cache
    tls_set_direct(SYNC_DATA_DIRECT_KEY, NULL);
    // atomic because may collide with concurrent ACQUIRE
    OSAtomicDecrement32Barrier(&result->threadCount);
    

    这里的threadCount表示当前SyncData被访问的线程数量,在初始化的时候为1,在lockCount减至0的时候,修改result->threadCount为0,表示当前线程已经使用完毕。外部通过id2data获取到SyncData后,通过SyncData的mutex,进行加锁和解锁。外部传入的对象为空时不具备加锁的效果。

    二.NSLock, NSRecursiveLock,NSCondition,NSRecursiveLock

    这几把锁定义在Foundation中,我们可以通过开源的swift-corelibs-foundation来窥探它底层的实现。
    这几个类定义在Foundation的NSLock文件中,它们都继承于NSObject,遵守NSLocking协议:

    public protocol NSLocking {
        func lock()
        func unlock()
    }
    

    这也就导致了,在常规需求中这几把锁的是使用非常相近,都是先初始化一个实例,然后lock加锁、unlock解锁。

    2.1 NSLock

    NSLock是一把普通的互斥锁, 当一个线程进行访问的时候,该线程获得锁,其他线程被系统挂起,直到该线程释放锁,其他线程才能进行访问,从而确保线程的安全。

    open class NSLock: NSObject, NSLocking {
        //private typealias _MutexPointer = UnsafeMutablePointer<pthread_mutex_t>
        internal var mutex = _MutexPointer.allocate(capacity: 1)
        private var timeoutCond = _ConditionVariablePointer.allocate(capacity: 1)
        private var timeoutMutex = _MutexPointer.allocate(capacity: 1)
        
        public override init() {
            pthread_mutex_init(mutex, nil);...
        }
        
        open func lock();
        open func unlock();
        ...
    }
    

    而NSRecursiveLock是一把递归的互斥锁,也叫递归锁。除了具备NSLock的特性外,它支持同一个线程对齐连续加锁多次而不会发生死锁。

    open class NSRecursiveLock: NSObject, NSLocking {
        //private typealias _MutexPointer = UnsafeMutablePointer<pthread_mutex_t>
        internal var mutex = _RecursiveMutexPointer.allocate(capacity: 1)
        private var timeoutCond = _ConditionVariablePointer.allocate(capacity: 1)
        private var timeoutMutex = _MutexPointer.allocate(capacity: 1)
    
        public override init() {
            super.init()
            var attrib = pthread_mutexattr_t()
            withUnsafeMutablePointer(to: &attrib) { attrs in
                pthread_mutexattr_init(attrs)
                pthread_mutexattr_settype(attrs, Int32(PTHREAD_MUTEX_RECURSIVE))
                pthread_mutex_init(mutex, attrs)
            };...
        }
       
        open func lock();
        open func unlock();
    }
    

    这两把锁的实现很相近,内部封装了pthread_mutex_t这把锁,初始化的时候pthread_mutex_init(mutex, nil)初始化了一个普通的互斥锁,而通过参数设置attrs:pthread_mutexattr_settype(attrs, Int32(PTHREAD_MUTEX_RECURSIVE));再pthread_mutex_init(mutex, attrs)初始化了一个递归锁。
    [lock lock]加锁
    [lock unlock]解锁
    [lock tryLock]尝试获取锁,获取到返回YES;获取不到返回NO,不会使线程进入休眠,会继续向下执行
    [lock lockBeforeDate:]若果锁被占用,则在指定时间之前线程进入修改,如果在时间点之前锁被释放了,线程立即被唤醒获得锁,返回YES,继续执行任务;如果等到指定时间还没获得锁则返回NO,继续执行任务。如果锁没有被占用,则返回YES,继续执行任务。

    2.3 NSCondition

    NSCondition底层还是对pthread_mutex_t的封装,NSCondition对象实际上是作为一个锁和一和一个线程的检查器,锁主要为了当检查条件时保护数据源,执行条件引发的任务;线程检查器主要是根据条件决定是否运行线程,即线程是否阻塞。

    open class NSCondition: NSObject, NSLocking {
        internal var mutex = _MutexPointer.allocate(capacity: 1)
        internal var cond = _ConditionVariablePointer.allocate(capacity: 1)
    
        public override init() {
            pthread_mutex_init(mutex, nil)
            pthread_cond_init(cond, nil)
        }
        
        open func lock();
        open func unlock() ;
        open func wait();
        open func wait(until limit: Date) -> Bool;
        open func signal();
        open func broadcast();
    }
    

    [condition lock]一般用于多线程同时访问、修改统一数据源,保证在统一时间内数据只被访问、修改一次,其他的线程需要在lock外等待,直到unlock。
    [condition unlock]与lock成对使用。
    [condition wait]让当前线程处于等待状态,通常用在加锁后解锁前。(当前操作不满足,需等待...操作后方可解锁)
    [condition signal]发出信号告诉线程不用再等待,可以继续执行。

    2.4 NSConditionLock

    NSConditionLock在一定条件下加锁,内部封装了NSCondition:

    open class NSConditionLock : NSObject, NSLocking {
        internal var _cond = NSCondition()
        internal var _value: Int
        internal var _thread: _swift_CFThreadRef?
        
        public convenience override init() {
            self.init(condition: 0)
        }
        
        public init(condition: Int) {
            _value = condition
        }
    
        open func lock() ;
        open func unlock();
        open var condition;
        open func lock(whenCondition condition: Int) ;
        open func `try`() -> Bool;
        open func tryLock(whenCondition condition: Int) -> Bool;
        open func unlock(withCondition condition: Int) ;
        open func lock(before limit: Date) -> Bool ;
        open func lock(whenCondition condition: Int, before limit: Date) -> Bool;
    }
    

    [conditionLock lock]; 表示conditionLock期待获得锁,如果没有其他线程获得锁(不需要判断内部的condition) 那它能执⾏此⾏以下代码,如果已经有其他线程获得锁(可能是条件锁,或者⽆条件锁),则等待,直⾄其他线程解锁。
    [conditionLock lockWhenCondition:A条件]; 表示如果没有其他线程获得该锁,但是该锁内部的condition不等于A条件,它依然不能获得锁,仍然等待。如果内部的condition等于A条件,并且没有其他线程获得该锁,则进⼊代码区,同时设置它获得该锁,其他任何线程都将等待它代码的完成,直⾄它解锁。
    [conditionLock unlockWithCondition:A条件]; 表示释放锁,同时把内部的condition设置为A条件。
    return = [conditionLock lockWhenCondition:A条件 beforeDate:A时间]; 表示如果被锁定(没获得锁),并超过该时间则不再阻塞线程。但是注意:返回的值是NO,它没有改变锁的状态,这个函数的⽬的在于可以实现两种状态下的处理。
    所谓的condition就是整数,内部通过整数⽐较条件。

    三、总结

    锁名称 描述 类型
    OSSpinLock
    dispatch_semaphore_t
    os_unfair_lock
    pthread_mutex_t 互斥锁
    pthread_mutex_t(recursive) 互斥锁(递归锁)
    NSLock 封装了pthread_mutex_t 互斥锁
    NSRecursiveLock 封装了pthread_mutex_t(recursive) 互斥锁(递归锁)
    NSCondition 封装了pthread_mutex_t 互斥锁(条件锁)
    NSConditionLock 封装了NSCondition 互斥锁(条件锁)
    @synchorized 互斥锁(递归锁)

    相关文章

      网友评论

        本文标题:iOS开发进阶:锁的分析

        本文链接:https://www.haomeiwen.com/subject/oxyvxrtx.html