美文网首页C++多线程
用C++11实现一个简单的线程池

用C++11实现一个简单的线程池

作者: 琼蘂无徵朝霞难挹 | 来源:发表于2019-08-22 15:28 被阅读0次

    0.为什么需要线程池?

    当我们需要完成一些持续时间短、发生频率高的工作时,每次为他们开启一个线程既显得繁琐又会造成不必要的开销,所以为这一类工作写一个简单的线程池就很有必要了。

    1.如何设计我们的线程池?

    考虑这样一个应用场景,我们有一个设计软件,当我们需要下载一些模板用作接下来的设计工作。我们显然不能将下载工作放在主线程去完成,如果你的用户网络速度飞快还好说,一旦他的网络稍有波动,你的设计软件将会停止GUI界面的渲染,卡在那边不懂,这显然不是好的体验。正确的做法是另外开启一个线程去下载,一旦这个下载工作完成后,我们可以通知主线程去做一个提示,例如弹出一个对话框告诉用户模板已经可以用了。
    如果这个设计软件需要频繁的做下载工作,那么每次单开一个线程就比较麻烦。我们可以将下载任务打包好,一个一个的推送到队列中,由线程池自动从队列中获取任务,完成下载工作。当队列为空时,线程池里面的线程就进入等待状态,这也不会造成资源的占用。

    2.先完成队列的设计

    STL中提供了一个关于队列的实现std::queue,但是std::queue并不是线程安全的,我们需要在此基础上进行一些包装,让它在多线程的情况下也可以使用。
    我们的队列需要拥有这几个功能:
    1.从队列里弹出一个任务
    2.向队列里推送任务
    3.了解队列里的任务还有多少个
    4.清除任务队列
    下面是一个简单的线程安全队列:

    namespace multi_thread {
        template<typename Task>
        class thread_safe_queue {
        public:
            thread_safe_queue() :done_(false) {
    
            }
    
            void push(const Task& t) {
                std::lock_guard<std::mutex> lock(mtx_);
                queue_.push(t);
                ready_.notify_one();
            }
    
            void wait_and_pop(Task& t) {
                std::unique_lock<std::mutex> lock(mtx_);
                if (queue_.empty() && !done_)
                    ready_.wait(lock);
                if (done_)
                    return;
                t = queue_.front();
                queue_.pop();
            }
    
            bool empty()const {
                std::lock_guard<std::mutex> lock(mtx_);
                return queue_.empty();
            }
    
            void clear() {
                std::lock_guard<std::mutex> lock(mtx_);
                for (int i = 0; i < queue_.size(); ++i)
                    queue_.pop();
            }
    
            void done() {
                done_ = true;
                ready_.notify_all();
            }
        private:
            std::queue<Task> queue_;
            mutable std::mutex mtx_;
            std::condition_variable ready_;
            std::atomic_bool done_;
        };
    }
    

    简单讲解一下:
    我们的线程安全队列thread_safe_queue遵循了STL的命名规范,对于任意容器,他们的判断为空的函数签名都是bool empty()const;,清除容器中的内容的都为void clear()。对于推入任务的函数,我们跟std::queue保持了一致,而弹出函数的命名我们做了一些改变,与函数的操作更加切合,表示对std::queue::pop()的一种扩展。

    这里我们采用了std::mutex作为我们的互斥量,每次访问队列中的内容时,都用std::lock_guard或者std::unique_lock加锁,保证在同一时段只有一个线程可以读写队列。std::lock_guard是一个方便的RAII的体现,他会在析构时自动调用unlock()std::unique_lock与之的区别就是,你可以显式调用unlock()函数完成解锁,它更加的灵活,当然你可以用{}std::lock_guard完成相同的工作。我们将std::mutex_设计成mutable,是因为我们需要在const member functionempty()中使用它。
    push函数中,每次完成push后就调用std::conditional_variable::notify_one()来唤醒一个处于等待状态的线程,通知他队列中已经有任务了,可以开始弹出了。在对应了wait_and_pop函数中,同样有一个std::conditional_variable::wait(),这个函数可以传入一个或者两个参数,第一个参数表示要释放的锁,第二个函数是一个predicate,不接受参数,返回值为bool。他会在调用wait时首先调用这个predicate,如果值是true,那么就会重新获得锁并继续向下执行,如果值是false,他会释放锁并且进入等待状态,直到接收到前面所说的唤醒通知,他才会重新检查predicate并做前述操作。这里我们没有使用predicate,仅使用了简单的版本,他会在接收到唤醒通知时直接向下执行。
    再来看最后一个变量std::atomic_bool,它是标准库提供的一个原子类型,在访问时标准保证在同一时刻只有一个线程在读写他。原子类型是不可拷贝且不可移动的,所以我们不能对他进行类内初始化,这会调用copy constructor,正确的方法是使用constructor initializer list进行初始化,当然你也可以在构造函数体内使用赋值运算符给其赋上一个值,但是请务必在这两个中间选择一个。回到代码,这个变量的作用很简单,就是让弹出的函数wait_and_pop直接结束,我们只有在需要析构这个线程池时才会这么做,具体做法在线程池中细说。

    3.完成线程池的设计

    在启用线程池时,我们需要指定在线程池中存放的线程的数量,当然我们也可以在后续为线程池添加新的线程。
    我们将使用STL提供的容器std::vector来存放我们的线程,这些线程做的工作都是一致的,他们将在队列中不断地弹出任务并执行它,如果队列中没有任务了,他们就将进入等待状态。
    结合上面的队列实现,给出一个简单的线程池实现:

    namespace multi_thread {
        template<typename Task>
        class thread_safe_queue {
        public:
            thread_safe_queue() :done_(false) {
    
            }
    
            void push(const Task& t) {
                std::lock_guard<std::mutex> lock(mtx_);
                queue_.push(t);
                ready_.notify_one();
            }
    
            void wait_and_pop(Task& t) {
                std::unique_lock<std::mutex> lock(mtx_);
                if (queue_.empty() && !done_)
                    ready_.wait(lock);
                if (done_)
                    return;
                t = queue_.front();
                queue_.pop();
            }
    
            bool empty()const {
                std::lock_guard<std::mutex> lock(mtx_);
                return queue_.empty();
            }
    
            void clear() {
                std::lock_guard<std::mutex> lock(mtx_);
                for (int i = 0; i < queue_.size(); ++i)
                    queue_.pop();
            }
    
            void done() {
                done_ = true;
                ready_.notify_all();
            }
        private:
            std::queue<Task> queue_;
            mutable std::mutex mtx_;
            std::condition_variable ready_;
            std::atomic_bool done_;
        };
    
        class thread_pool {
        public:
            thread_pool(const int threadNum) :done_(false) {
                for (int i = 0; i < threadNum; ++i) {
                    threads_.emplace_back(&thread_pool::workerThread, this);
                }
            }
            ~thread_pool() {
                done_.store(true);
                queue_.done();
                for (auto& thread : threads_) {
                    if (thread.joinable())
                        thread.join();
                }
            }
            void submit(const std::function<void(void)>& t) {
                queue_.push(t);
            }
    
            bool isEmpty()const {
                return queue_.empty();
            }
    
            void clearTask() {
                queue_.clear();
            }
        private:
            void workerThread() {
                while (true) {
                    std::function<void(void)> t;
                    queue_.wait_and_pop(t);
                    if (done_)
                        break;
                    if (t)
                        t();
                }
            }
        private:
            std::vector<std::thread> threads_;
            thread_safe_queue<std::function<void(void)>> queue_;
            std::atomic_bool done_;
        };
    }
    

    队列里存放的是std::function<void(void)>,是一个没有参数和返回值的Callable。在C++中,常用的Callable包括lambda表达式、function object(也就是operator())、std::bindstd::function。我个人比较喜欢使用lambda和std::bind,他们都很简单易用,在使用std::bind时,如果你的函数是一个member function,别忘了把this作为第一个参数。
    这个线程池非常简单,当你调用构造函数时,传入一个你想要的线程数量,然后构造函数体里就会开启对应数量的线程。std::thread接受一个Callable和它的参数作为构造函数的参数,构造函数完成后线程立即启动,这里的workerThread是一个member function,所以,我们还得加上this作为参数,跟std::bind一样。
    workerThread函数里面循环地从队列中获取任务并执行,如果没有任务,他就会处于等待状态。可以看到这里同样有一个std::atomic_bool的变量done_,它用于使线程结束。我们会在线程池的析构函数里将这个变量设为true,然后我们调用队列的done函数,让队列退出。最后我们逐个检查线程池中的线程,如果他们还在运行,等待他们结束。一个std::thread在析构前必须指定等待该线程结束还是分离该线程,否则会发生错误。如果我们不做这些工作,那么线程池在析构时要么是其中的线程一直处于等待状态,要么是一直处于循环状态,这会导致析构函数停滞不前,也就是程序的卡死。

    4.总结

    这次实现的线程池只能说是实现了最基础的功能,还缺少一些功能,例如任务返回值的缺失等。本文实现的线程安全队列也十分简单,还有很多可以优化的地方。

    相关文章

      网友评论

        本文标题:用C++11实现一个简单的线程池

        本文链接:https://www.haomeiwen.com/subject/vdbgsctx.html