美文网首页
基于pthread创建ThreadPool(线程池)和Qispa

基于pthread创建ThreadPool(线程池)和Qispa

作者: 小凉介 | 来源:发表于2018-07-28 20:48 被阅读190次

    ThreadPool简介

    我们都知道多线程有很多好处,同时创建管理维护等也有很多成本,大致有以下几个缺点:

    1. 空间成本:每个线程占用的空间512kb,意味着更多的线程需要更多的内存空间
    2. 时间成本:创建一个线程大约90毫秒
    3. 线程间的通信,多线程的数据共享,有些数据是在多个线程间共享的,需要防止线程死锁情况的发生
    4. 如果有大量的线程,会影响性能,因为操作系统需要在它们之间切换

    线程池可以节省不断创建线程和释放线程的时间,所谓线程池就是预先创建好一组线程,线程池有一个任务队列,当有一个新任务需要执行时将该任务放入到任务队列中,线程池中空闲的线程会从该队列中取出任务并执行,这样做的好处就是减少了创建线程和销毁线程带来的效率问题,在任务处理时间比较短的时候这个好处非常显著,可以提升任务处理的效率。
    Pool的任务队列是一个优先级任务队列,这意味着新加入的任务如果优先级比较高的话则将可能会被线程池中空闲线程优先考虑执行。

    线程池在多线程编程中经常要用到,其基本模型仍是生产者/消费者模型,线程池一般由线程池管理器(ThreadPool),工作线程(PoolWorker),任务( Task),任务队列(TaskQueue)四部分组成,其中

    ThreadPool:用于创建并管理线程池,包括 创建线程池,销毁线程池,添加新任务;
    PoolWorker:线程池中线程,在没有任务时处于等待状态,可以循环的执行任务;
    Task:每个任务必须实现的接口,以供工作线程调度任务的执行,它主要规定了任务的入口,任务执行完后的收尾工作,任务的执行状态等;
    taskQueue:用于存放没有处理的任务。提供一种缓冲机制。

    简单的工作流程图如下:

    Screen Shot 2018-07-28 at 3.28.16 PM.png

    pthread简介

    POSIX线程(英语:POSIX Threads,常被缩写为Pthreads)是POSIX的线程标准,定义了创建和操纵线程的一套API。
    pthread是一套通用的多线程的API,可以在Unix / Linux / Windows 等系统跨平台使用,使用C语言编写,需要程序员自己管理线程的生命周期,使用难度较大。

    可能很多iOS没有听说过pthread,确实苹果为我们提供了NSThread,使用起来比pthread更加面向对象,简单易用,可以直接操作线程对象。不过也需要需要程序员自己管理线程的生命周期(主要是创建),我们在开发的过程中偶尔使用NSThread。甚至封装了更好用的GCD和NSOperation,我们在平常开发中是没必要去使用pthread的,

    结构体定义

    Condition(任务锁)

    线程池可能会被外部或内部的多个线程同时访问,因此必须保证其线程安全。除了保证读写操作互斥外,该锁还要支持 waitsignal操作,所以这里不要使用NSLockNSCondition是可以满足我们的要求的,但是这一次我们希望尽可能都用pthread的接口。所以这次我们选择pthread_mutex_t和pthread_cond_t,我们把他们封装成Conditon让它具备类似NSCondition的功能,类似这样:

    class Condition{
        
        private var mutex = pthread_mutex_t()
        private var cont = pthread_cond_t()
        
        init() {
            pthread_mutex_init(&self.mutex, nil)
            pthread_cond_init(&self.cont, nil)
        }
        
        deinit {
            pthread_mutex_destroy(&mutex)
            pthread_cond_destroy(&cont)
        }
        
        func lock() {
            pthread_mutex_lock(&mutex)
        }
        
        func unlock() {
            pthread_mutex_unlock(&mutex)
        }
        
        func wait() {
            pthread_cond_wait(&cont, &mutex)
        }
        
        func signal() {
            pthread_cond_broadcast(&cont)
        }
    }
    
    

    Lock(互斥锁)

    我们可以用NSLok,同Condition一样我们选择自己封装

    class Lock{
        
      private var mutex = pthread_mutex_t()
      private var attr = pthread_mutexattr_t()
    
      init() {
        pthread_mutexattr_init(&attr)
        pthread_mutexattr_settype(&attr, PTHREAD_MUTEX_RECURSIVE)
        pthread_mutex_init(&mutex, &attr)
      }
    
      deinit {
        pthread_mutexattr_destroy(&attr)
        pthread_mutex_destroy(&mutex)
      }
    
      func lock() {
        pthread_mutex_lock(&mutex)
      }
    
      func unlock() {
        pthread_mutex_unlock(&mutex)
      }
    }
    
    

    Task(任务)

    Task这里我们的任务不需要做太多事情,所以我们直接选择闭包

    typealias Task = () -> Void
    
    

    taskQueue (任务队列)

    taskQueue我选择用数组,当然你也可以自己构造队列

    ThreadPriority

    用来设置线程的优先级,一共分为三个等级,分别为highnormallow

    enum ThreadPriority {
        
        case high
        case normal
        case low
    }
    
    

    ThreadPool和DispatchQueue的具体结构之后会详细介绍,所以这里先不讲了。

    ThreadPool

    接口

    Thread主要提供三个个方法:

    1. 初始化方法,可以通过初始化方法设置名称,线程池最大线程数和优先级。
    2. addTask方法,提交任务。
    3. workerThreadLoop方法,执行任务。

    ThreadPool实现

    通过最开始那个图我们应该大致能猜到需要哪些东西。首先,我们需要你一个锁用来控制任务执行,也就是我们创建的Condition,提供了和NSCondition一样的waitsigal方法。

    let lock = Condition()
    

    为了确定何时启动新的线程,需要知道当前线程池中有多少线程,有多少线程正在运行,线程池最多支持多少线程:

    var threadCount = 0  // 当前存在的线程个数
    var activeThreadCount = 0 // 当前活跃的线程个数
    var threadCountLimit = 0 // 线程池设置的最大线程个数
    
    

    为了满足高级一点的需求,这里我们提供了设置线程优先级的功能,所以我们需要一个qos

    let qos: qos_class_t  //用来设置线程优先级
    
    

    最后就是任务队列,我们选择是的数组,因为我们的需求也很简单,进队,出队,删除,判空。

    var tasksQueue = [DispatchTask]()  //用来存储任务
    

    初始化很简单就不写了,现在重点讲一讲addTask方法,主要有几点需要注意:

    1.为了保证tasksQueue的数据安全,在存入新的任务之前要先加锁

    lock.lock()
    
    tasksQueue.append(task)
    
    

    2.我们是边添加任务边创建线程,这样可以创建合适的线程数,避免一次性创建过多不需要用的线程。phread创建线程有点麻烦

    let idleThreads = threadCount - activeThreadCount
    if tasksQueue.count > idleThreads && threadCount < threadCountLimit {
        
        let holder = Unmanaged.passRetained(self)
        let pointer = UnsafeMutableRawPointer(holder.toOpaque())
        var thread: pthread_t? = nil
        var user_interactive_qos_attr = pthread_attr_t()
        pthread_attr_init(&user_interactive_qos_attr)
        pthread_attr_set_qos_class_np(&user_interactive_qos_attr, qos, 0)
        
        if pthread_create(&thread, &user_interactive_qos_attr, threadRun, pointer) == 0 && thread != nil {
            holder.release()
        }
        threadCount += 1
    }
    
    private func threadRun(arg: UnsafeMutableRawPointer) -> UnsafeMutableRawPointer? {
        let unmanaged = Unmanaged<ThreadPool>.fromOpaque(arg)
        unmanaged.takeUnretainedValue().workerThreadLoop()
        unmanaged.release()
        return nil
    }
    
    

    workerThreadLoop是线程需要执行的方法。

    接着我们需要唤醒线程开始来执行任务,至于为什么要唤醒,之后讲workerThreadLoop你就知道了。

    lock.signal()
    

    最后就是解锁了

    lock.unlock()
    

    下面我们来看看workerThreadLoop方法

    fileprivate func workerThreadLoop() {
            
            if name != nil {
                pthread_setname_np(name!.cString(using: String.Encoding.utf8)!)
            }
            
            while true {
                
                autoreleasepool {
                    let task = peekTask()
                        
                    task()
                    
                    lock.lock()
                    activeThreadCount -= 1
                    lock.unlock()
                }
            }
        }
    
    

    工作线程中主要是一个无限循环,这个和RunLoop类似。当tasksQueue为空时,线程进入休眠并等待。一旦数组不为空,该任务立即出队并开始执行。任务块执行开始后,当前活跃线程数加一;任务执行完成后,当前活跃线程数减一。这里我把任务出队列放在一个叫peekTask的方法中

    private func peekTask() -> DispatchTask{
            
            lock.lock()
    
            while tasksQueue.isEmpty {
                lock.wait()
            }
            let task = tasksQueue.removeFirst()
            activeThreadCount += 1
            
            lock.unlock()
            
            return task
        }
    

    其实我们整个ThreadPool设计是用的经典的生产者-消费者模式,当然也需要解决这个模式中的老大难的假性唤醒问题

    什么是虚假唤醒?
    举个例子,我们现在有一个生产者-消费者队列和三个线程。
    I.1号线程从队列中获取了一个元素,此时队列变为空。
    II.2号线程也想从队列中获取一个元素,但此时队列为空,2号线程便只能进入阻塞(cond.wait()),等待队列非空。
    III.这时,3号线程将一个元素入队,并调用cond.notify()唤醒条件变量。
    IV.处于等待状态的2号线程接收到3号线程的唤醒信号,便准备解除阻塞状态,执行接下来的任务(获取队列中的元素)。
    V.然而可能出现这样的情况:当2号线程准备获得队列的锁,去获取队列中的元素时,此时1号线程刚好执行完之前的元素操作,返回再去请求队列中的元素,1号线程便获得队列的锁,检查到队列非空,就获取到了3号线程刚刚入队的元素,然后释放队列锁。 (即2号线程获得队列锁,却发现队列为空)
    VI.等到2号线程获得队列锁,判断发现队列仍为空,1号线程“偷走了”这个元素,所以对于2号线程而言,这次唤醒就是“虚假”的,它需要再次等待队列非空。

    防止虚假唤醒
    cond.wait(locker, {return !q.empty(); });

    我们这里防止虚假唤醒也是采用这种方式,也就是这段:

    while tasksQueue.isEmpty {
        lock.wait()
    }
    
    

    下面我们再看看DispatchQueue吧

    DispatchQueue

    DispatchQueue接口

    这我们模拟GCD中的DispatchQueue,一共提供了四个方法:

    1. 初始化方法,构造器,可通过它创建并行或串行队列。
    2. 异步派发方法async
    3. 同步派发方法sync

    这里需要注意串行并行和同步异步,很多人容易搞混,不懂得可以看看这篇文章GCD中的串行/并行和同步/异步理解

    DispatchQueue实现

    这部分我是参照Let's Build dispatch_queue实现的,然后在原有基础上添加了优先级,pthread可以通过qos_class类型值设置优先级,所以在初始化的时候需要做个优先级的类型转换。

    let qos: qos_class_t = {
        switch priority {
        case .high:
            return QOS_CLASS_USER_INITIATED
        case .low:
            return QOS_CLASS_UTILITY
        case .normal:
            return QOS_CLASS_DEFAULT
        }
    }()
        
    threadPool = ThreadPool(name: name, threadCount: threadCout, qos: qos)
    

    分发队列和线程池一样也需要锁,但这里的锁就相对简单些,不需要signalwait,所以可以用我们构造的互斥锁Lock。

    let lock = Lock()
    

    分发队列需要依赖ThreadPool完成任务的执行,所以还需要一个线程池

    let threadPool: ThreadPool
    
    

    我们希望能提供设置队列是串行还是并行,如果是串行我们还需要记录是否有任务在执行

    let serial: Bool  //判断是串行还是并行队列
    
    var serialRunning = false
    

    如果是串行队列我们就需要在DispatchQueue中暂存任务,等到上一个任务执行完成之后再将一下个任务添加到ThreadPool中,所以我们需要一个数组存储任务

    var pendingTasks = [DispatchTask]()   //存储任务
    

    在这里我把所有DispatchQueue的线程池的最大线程数都设置成128,这是一个随机数字,没有任何意义,你可以根据自己需要设置

    let threadCout = 128
    
    

    下面来看看异步派发方法async,异步方法简单些,任务不需要被等待,可以直接添加到线程池中,只需要告诉DispatchQueue有任务就行了。

    同步派发方法就要复杂一点了,需要一个局部条件变量Condition和一个done来记录任务执行完成,获得锁任务未执行完成前condition.wait(),执行完成后condition.signal()并且condition.lock()

    func sync(execute block: @escaping DispatchTask) {
            
        let condition = Condition()  //NSCondition 的对象实际上作为一个锁和一个线程检查器:锁主要为了当检测条件时保护数据源,执行条件引发的任务;线程检查器主要是根据条件决定是否继续运行线程,即线程是否被阻塞。
        var done = false
        
        self.async {
            block()
            condition.lock() //一般用于多线程同时访问、修改同一个数据源,保证在同一时间内数据源只被访问、修改一次,其他线程的命令需要在lock 外等待,只到unlock ,才可访问
            done = true
            condition.signal()  //CPU发信号告诉线程不用在等待,可以继续执行
            condition.unlock()
        }
        
        condition.lock()
        
        while !done {
            condition.wait()  //让当前线程处于等待状态
        }
        
        condition.unlock()
    }
    
    

    asyncsync都是管理DispatchQueue中的任务执行,再来看看怎样将DispatchQueue中任务添加到ThreadPool中执行。

    func dispatchOneTask() {
            
        threadPool.addTask {
            self.lock.lock()
            let block = self.pendingTasks.removeFirst()
            self.lock.unlock()
            
            block()
            
            if self.serial {
                self.lock.lock()
                if self.pendingTasks.count > 0 {
                    self.dispatchOneTask()
                }else{
                    self.serialRunning = false
                }
                self.lock.unlock()
            }
        }
    }
    

    并行队列很简单,只需要需要对pendingTasks加锁就行了。在串行队列上,新增的代码块需要等到前一个代码块执行完后才能执行。每当一个代码块执行完后,dispatchOneBlock 会检查当前队列是否还有代码块未执行。如果有的话,它会调用自身以便最后可以执行到该代码块。如果没有,将队列的运行状态设置回false

    好了,以上就是所有的内容,最后本文的demo在这PThreadPool

    参考文章

    Let's Build dispatch_queue
    Pthread - 线程池(thread pool)实现
    Strand
    Aojet
    Signals
    Strand.swift

    相关文章

      网友评论

          本文标题:基于pthread创建ThreadPool(线程池)和Qispa

          本文链接:https://www.haomeiwen.com/subject/wapomftx.html