美文网首页
线程池实现

线程池实现

作者: 谭英智 | 来源:发表于2024-05-01 13:31 被阅读0次

使用方法

  • 创建一个拥有五个线程的线程池
thread_pool pool(5);
  • 提交一个任务到线程池执行
int i = 10; // initialize the variable the task needs (captured by value below)
auto f1 = pool.submit_task([i] {  
    sleep(i);
}); // submit the task as a lambda
f1.get();  // wait for the task to finish
  • 取消线程池中的所有任务
pool.stop();

具体实现

#include <chrono>          
#include <condition_variable> 
#include <cstddef>           
#include <functional>         
#include <future>            
#include <memory>             
#include <mutex>              
#include <queue>              
#include <thread>             
#include <type_traits>        
#include <utility>            
#include <vector>            
#include <iostream>
#include <unistd.h> 

/// Fixed-size thread pool: worker threads pull `std::function<void()>` jobs
/// from a FIFO queue guarded by a mutex and a condition variable.
///
/// Every access to the queue and to the stop flag happens while holding
/// `tasks_mutex`, so no separate atomic is needed.
class thread_pool
{
public:
    /// Starts `size` worker threads. A non-positive `size` starts none, in
    /// which case submitted tasks are queued but never executed.
    ///
    /// If a thread fails to start, the workers already running are stopped
    /// and joined before the exception is rethrown.
    thread_pool(int size) {
        if (size > 0) {
            // reserve, not resize: resize would insert `size` empty
            // placeholder threads in front of the real workers.
            m_threads.reserve(static_cast<std::size_t>(size));
        }
        try {
            for (int i = 0; i < size; ++i) {
                m_threads.emplace_back(&thread_pool::worker, this);
            }
        } catch (...) {
            // The destructor does not run for a partially constructed
            // object; destroying a joinable std::thread would terminate.
            shutdown();
            throw;
        }
    }

    /// Stops the pool and joins every worker. A task that is already running
    /// is allowed to finish; tasks still queued are discarded.
    ~thread_pool() {
        shutdown();
    }

    // Workers hold a pointer to this object, so it must stay where it is.
    thread_pool(const thread_pool&) = delete;
    thread_pool& operator=(const thread_pool&) = delete;

    /// Queues `task` (a callable taking no arguments) for execution.
    ///
    /// @return a future for the task's result. If the pool has been stopped,
    ///         or is stopped before the task starts, the future reports
    ///         `std::future_errc::broken_promise` instead of blocking.
    ///
    /// `Args` is kept for source compatibility; it is never deduced and is
    /// expected to be empty.
    template <class F, class... Args>
    std::future<typename std::result_of<typename std::decay<F>::type(typename std::decay<Args>::type...)>::type> submit_task(F&& task)
    {
        using R = typename std::result_of<typename std::decay<F>::type(typename std::decay<Args>::type...)>::type;
        using packaged = std::packaged_task<R(typename std::decay<Args>::type...)>;

        // Forward rather than copy so move-only callables are accepted.
        // shared_ptr keeps the queued closure copyable for std::function.
        auto task_promise = std::make_shared<packaged>(std::forward<F>(task));

        // Fetch the future before publishing the task: once queued, a worker
        // may invoke the packaged_task at any moment.
        std::future<R> result = task_promise->get_future();

        detach_task(
            [task_promise] {
                (*task_promise)();
            }
        );

        return result;
    }

    /// Asks all workers to exit and discards every task that has not started.
    /// Futures of discarded tasks report `broken_promise`. Does not join the
    /// workers; the destructor does that.
    void stop() {
        std::queue<std::function<void()>> discarded;
        {
            std::lock_guard<std::mutex> tasks_lock(tasks_mutex);
            m_stop = true;
            discarded.swap(tasks);
        }
        task_available_cv.notify_all();
        // `discarded` is destroyed here, outside the lock, so task
        // destructors never run while the mutex is held.
    }

private:
    /// Pushes `task` onto the queue and wakes one worker. After `stop()` the
    /// task is dropped instead, so it cannot sit in the queue forever.
    template <class F>
    void detach_task(F&& task)
    {
        {
            std::lock_guard<std::mutex> tasks_lock(tasks_mutex);
            if (m_stop) {
                return;
            }
            tasks.emplace(std::forward<F>(task));
        }
        task_available_cv.notify_one();
    }

    /// Worker loop: sleep until there is work or a stop request, then run one
    /// task with the mutex released.
    void worker() {
        while (true) {
            std::function<void()> task;
            {
                std::unique_lock<std::mutex> tasks_lock(tasks_mutex);
                // The flag is only changed under the mutex, so a wake-up
                // cannot be missed and no polling timeout is needed.
                task_available_cv.wait(tasks_lock, [this] { return m_stop || !tasks.empty(); });
                if (m_stop) {
                    return;
                }
                task = std::move(tasks.front());
                tasks.pop();
            }
            task();
        }
    }

    /// Requests a stop and joins every started worker.
    void shutdown() {
        stop();
        for (auto& thread_ins : m_threads) {
            if (thread_ins.joinable()) {
                thread_ins.join();
            }
        }
    }

private:
    std::mutex tasks_mutex;                       // guards `tasks` and `m_stop`
    std::condition_variable task_available_cv;    // signalled on new work or stop
    std::queue<std::function<void()>> tasks;      // pending jobs, FIFO
    std::vector<std::thread> m_threads;           // worker threads
    bool m_stop = false;                          // set once by stop()
};

相关文章

网友评论

      本文标题:线程池实现

      本文链接:https://www.haomeiwen.com/subject/qdntfjtx.html