美文网首页
C++11 实现线程池

C++11 实现线程池

作者: leon_tly | 来源:发表于2024-09-05 23:00 被阅读0次

    线程池

    在程序开始运行前预先创建多个线程,这样程序在运行时只需要从线程池中取出线程来用即可,省去了反复创建和销毁线程的开销,大大提高了程序运行效率。

    1. 线程池中预先设置了多个线程。
    2. 线程池中有一个任务队列,用于存储函数。
    3. 线程池中有一个push任务的函数,将需要在线程中调用的函数以及函数参数push到队列中。
    4. 线程池中的线程不断监听任务队列,利用条件变量(std::condition_variable)机制判断是否有任务到达队列,监听到任务则取出执行。

    线程池的实现

    #include <iostream>
    #include <thread>
    #include <mutex>
    #include <string>
    #include <condition_variable>
    #include <queue>
    #include <vector>
    #include <functional>
    
    /*
     * Fixed-size thread pool for fire-and-forget tasks.
     *
     * The constructor starts the worker threads. Each worker sleeps on a
     * condition variable until a task is queued or the pool is being destroyed,
     * and tasks are taken from the queue in FIFO order. The destructor lets the
     * workers drain whatever is still queued and then joins them.
     */
    class ThreadPool{
    public:
        /*
         * Starts `num` worker threads. With num <= 0 no worker is created, so
         * tasks queued afterwards are never executed.
         */
        ThreadPool(int num) : stop(false)
        {
            for (int i = 0; i < num; i++)
            {
                threads.emplace_back([this](){
                    while(true)
                    {
                        std::function<void()> job;
                        {
                            std::unique_lock<std::mutex> lock(mtx);
                            // The predicate form of wait() re-checks the condition
                            // after every wake-up, so spurious wake-ups are harmless.
                            cv.wait(lock, [this]{
                                return !tasks.empty() || stop;
                            });
                            // Leave only when shutdown was requested AND the queue
                            // is drained, so already queued tasks still run.
                            if (stop && tasks.empty())
                            {
                                return;
                            }
                            job = std::move(tasks.front());
                            tasks.pop();
                        }   // mutex released here: the task runs without holding it
                        try
                        {
                            job();
                        }
                        catch (...)
                        {
                            // An exception leaving a std::thread entry function
                            // calls std::terminate; report it and keep the worker
                            // alive so one bad task cannot take the process down.
                            std::cerr << "ThreadPool: task threw an exception; worker keeps running" << std::endl;
                        }
                    }
                });
            }
        }

        // The workers capture `this`, so the pool must never be copied.
        ThreadPool(const ThreadPool&) = delete;
        ThreadPool& operator=(const ThreadPool&) = delete;

        /*
         * Requests shutdown, wakes every worker and waits for all of them.
         * Tasks that are still queued are executed before the workers exit.
         */
        ~ThreadPool()
        {
            {
                std::unique_lock<std::mutex> lock(mtx);
                stop = true;
            }
            cv.notify_all();
            for (auto& t : threads)
            {
                t.join();
            }
        }
        
        /*
         * Queues the call f(args...) for execution on a worker thread.
         *
         * F&& / Args&&... are forwarding references, so any callable and any
         * argument list is accepted. std::bind stores decayed copies of the
         * callable and its arguments, producing a nullary callable that fits
         * std::function<void()>; the return value of f is discarded. Wrap an
         * argument in std::ref to pass it by reference instead of by copy.
         */
        template<typename F, typename... Args>  
        void enqueue(F&& f, Args&&... args)
        {
            std::function<void()> task = 
                std::bind(std::forward<F>(f), std::forward<Args>(args)...);
            {
                std::unique_lock<std::mutex> lock(mtx);
                tasks.emplace(std::move(task));
            }
            // Notify after unlocking so the woken worker can take the mutex at once.
            cv.notify_one();
        }
        
    private:
        /* set under mtx by the destructor to request shutdown */
        bool stop;
        /* worker threads */
        std::vector<std::thread> threads;
        /* pending tasks, FIFO */
        std::queue<std::function<void()>> tasks;
        
        /* protects `stop` and `tasks` */
        std::mutex mtx;
        /* signalled when a task is queued or shutdown is requested */
        std::condition_variable cv;
    };
    
    
    /*
     * Demo job: prints one "name : <name>, data : <data>" line to std::cout.
     *
     * The function is run concurrently by several pool workers. Characters
     * written by separate << insertions from different threads may interleave,
     * so the whole line is written while holding a mutex.
     */
    void task(std::string name, int data)
    {
        // Function-local static: initialised once, thread-safely, on first use.
        static std::mutex output_mutex;
        std::lock_guard<std::mutex> guard(output_mutex);
        std::cout << "name : " << name << ", data : " << data << std::endl;
    }
    
    // Demo driver: hands ten numbered jobs to a pool of four workers.
    int main()
    {
        ThreadPool pool(4);

        int i = 0;
        while (i < 10)
        {
            pool.enqueue(task, "task", i);
            ++i;
        }

        // `pool` goes out of scope here; its destructor joins the workers.
        return 0;
    }
    

    带有返回结果的线程池

    #include <condition_variable>
    #include <functional>
    #include <future>
    #include <iostream>
    #include <memory>
    #include <mutex>
    #include <queue>
    #include <stdexcept>
    #include <thread>
    #include <utility>
    #include <vector>
    
    /*
     * Fixed-size thread pool whose enqueue() returns a std::future for the
     * result of each task.
     *
     * Workers sleep on a condition variable and take tasks from the queue in
     * FIFO order. The destructor lets the workers drain the queue and joins them.
     */
    class ThreadPool {
    public:
        /*
         * Starts `numThreads` workers. With numThreads == 0 no worker exists, so
         * queued tasks never run and their futures never become ready.
         */
        ThreadPool(size_t numThreads) : stop(false) {
            for (size_t i = 0; i < numThreads; ++i) {
                workers.emplace_back([this] {
                    while (true) {
                        std::function<void()> task;
                        {
                            std::unique_lock<std::mutex> lock(this->queueMutex);
                            // Predicate form of wait(): spurious wake-ups are re-checked.
                            this->condition.wait(lock, [this] { return this->stop || !this->tasks.empty(); });
                            // Exit only after shutdown was requested and the queue is empty.
                            if (this->stop && this->tasks.empty()) {
                                return;
                            }
                            task = std::move(this->tasks.front());
                            this->tasks.pop();
                        }
                        // Runs without the mutex held. The queued wrapper invokes a
                        // std::packaged_task, which stores any exception thrown by
                        // the user callable in the future's shared state.
                        task();
                    }
                });
            }
        }

        // The workers capture `this`, so the pool must never be copied.
        ThreadPool(const ThreadPool&) = delete;
        ThreadPool& operator=(const ThreadPool&) = delete;

        /*
         * Queues the call f(args...) and returns a future for its result.
         *
         * The callable and its arguments are stored as decayed copies by
         * std::bind. The result type is deduced from invoking that bound object,
         * so everything std::bind accepts works here, including a pointer to
         * member function followed by the object (or a pointer to it).
         *
         * Throws std::runtime_error if the pool is already stopping.
         */
        template<class F, class... Args>
        auto enqueue(F&& f, Args&&... args)
            -> std::future<decltype(std::bind(std::forward<F>(f), std::forward<Args>(args)...)())> {
            using return_type = decltype(std::bind(std::forward<F>(f), std::forward<Args>(args)...)());
            // std::packaged_task is move-only but std::function needs a copyable
            // target, hence the shared_ptr captured by the queued lambda.
            auto task = std::make_shared<std::packaged_task<return_type()>>(std::bind(std::forward<F>(f), std::forward<Args>(args)...));
            std::future<return_type> res = task->get_future();
            {
                std::unique_lock<std::mutex> lock(queueMutex);
                if (stop) {
                    throw std::runtime_error("enqueue on stopped ThreadPool");
                }
                tasks.emplace([task]() { (*task)(); });
            }
            // Notify after unlocking so the woken worker can take the mutex at once.
            condition.notify_one();
            return res;
        }

        /*
         * Requests shutdown, wakes every worker and waits for all of them.
         * Tasks that are still queued are executed before the workers exit.
         */
        ~ThreadPool() {
            {
                std::unique_lock<std::mutex> lock(queueMutex);
                stop = true;
            }
            condition.notify_all();
            for (std::thread &worker : workers) {
                worker.join();
            }
        }

    private:
        std::vector<std::thread> workers;          // worker threads
        std::queue<std::function<void()>> tasks;   // pending tasks, FIFO
        std::mutex queueMutex;                     // protects `tasks` and `stop`
        std::condition_variable condition;         // signalled on new task / shutdown
        bool stop;                                 // set under queueMutex by the destructor
    };
    
    // Sample job for the pool demo: yields the sum of its two operands.
    int add(int a, int b) {
        const int sum = a + b;
        return sum;
    }
    
    // Demo driver: submits two additions to a four-worker pool and prints the sums.
    int main() {
        ThreadPool pool(4);

        // Each enqueue() hands back a future that becomes ready once a worker
        // has run the corresponding call.
        auto result1 = pool.enqueue(add, 2, 3);
        auto result2 = pool.enqueue(add, 5, 7);

        // get() blocks until the value is available.
        const int first = result1.get();
        const int second = result2.get();

        std::cout << "Result1: " << first << std::endl;
        std::cout << "Result2: " << second << std::endl;

        return 0;
    }
    
    

    std::packaged_task 说明

    相关文章

      网友评论

          本文标题:C++11 实现线程池

          本文链接:https://www.haomeiwen.com/subject/zledljtx.html