美文网首页JAVA
手写线程池: thread pool with modern c

手写线程池: thread pool with modern c

作者: 张夜白哥哥 | 来源:发表于2019-02-15 19:42 被阅读0次

    C++线程池

    进程的创建和销毁,代价是昂贵的,除去操作系统的实现及其本身的原理,跟线程相比它更显得重量级。
    这几年互联网的迅速发展,让线程正面临着跟进程一样的“重量级”困扰。尤其是GO等语言推出协程(纤程)
    后,线程更是不堪其重。那么有没有改进的方向呢?有,将线程池化——线程池。

    由于C++版本推进的历程(C++98, C++03, C++11, C++14, C++17, C++20)以及其弱鸡般的ABI兼容性,导致很多框架用起来得自己造轮子。C++版本再吐槽一句好了, C++即使版本推进到0xff, 对很多人来说还是c with class, 包括我。
    我们的目标是, 造一个很Java中的ThreadPool类似的线程池。目前的进度:

    • [x] 极其简易的线程池(header only)
    • [x] 支持设置线程池核心线程数
    • [ ] 支持设置线程池最大线程数
    • [ ] 支持设置最大缓存的任务数
    • [ ] 支持设置任务提交拒绝策略


    线程池中的概念:

    • job: 需要在线程中执行的代码
      • 例如: void read(const string & path, const HANDLE handle);
      • 该函数从文本中读取内容然后交给窗口渲染到界面上
    • task: 将job封装成一个task, 由于job的函数签名各异,所以需要封装(Java的job是Runnable,接口签名一致)。
      • 例如: auto task = [=]()->void { return read(path, handle); }
      • 这样就将签名各异的job统一封装成了std::function<void()>类型的task
      • 通过std::packaged_task和std::future处理job被异步调用的返回值
    • queue: 缓存task的队列, 队列操作和线程池中的线程耦合度很高, 原因如下:
      • 队列中的任务少时, 池中的空闲线程如何做到真正的不占用cpu?
        • 目前此项目是通过std::condition_variable的条件判断让空闲线程阻塞从而让出cpu
        • Java中是通过实现BlockQueue实现的,也就是队列中没有任务时,线程从队列中get会阻塞, 从而让出cpu
        • 也可以通过信号量 互斥量实现
      • 队列read write操作时, 可根据现实情况实现读优先、写优先的锁来平衡队列task的生产和消费, 目前此项目不支持
      • 设置queue的最大缓存task数
      • 为什么采取队列, 是为了保证task被执行的优先级(队列可以保证先提交的task被先执行,但是不保证先提交的task被先执行完)
    • thread: 采用C++11 标准库中的std::thread
      • 根据std::thread::hardware_concurrency()获取cpu数量进行任务cpu友好性优化, 目前此项目不支持
      • 设置thread的cpu亲和性优化程序执行(Windows平台:通过SetThreadAffinityMask指定线程在cpu哪个核心上运行)

    Code

    Code On GitHub

    #ifndef _THREAD_POOL_H
    #define _THREAD_POOL_H
    
    #include <memory>
    #include <functional>
    #include <future>
    #include <condition_variable>
    #include <queue>
    #include <algorithm>
    #include <vector>
    #include <thread>
    #include <typeinfo>
    /**
     * 几个需要注意的点:
     * 1、tasks的读写锁需要优化成带优先级的锁, 可以肯定线程池的绝大部分使用场景commit task比run task更密集
     * 2、根据tasks以及cpu扩展线程数
     * 3、支持允许缓存的task数,如果超出此数将采取拒绝策略
     * 4、拒绝策略
    */
    class ThreadPool{
    public:
    
        ThreadPool(int core, int max = 0, int cache = 0): core(core),//由于max和cache暂时没用到,因此赋值0
                        max(max), cache(cache), quit(false), force(false){
            
        }  
    
        ~ThreadPool(){
            this->quit.store(true);
            this->enable.notify_all();
            std::for_each(this->pool.begin(), this->pool.end(), [](std::thread & t){
                if(t.joinable()){
                    t.join();
                }
            });
        }
    public:
        void start(){
            for(auto idx = 0; idx < core; ++idx){
                pool.push_back(std::thread([this](){
                    // 第一次退出,判断是否要强制退出
                    bool quit = this->force.load() ? this->quit.load() : false;
                    for(; !quit;){
                        std::unique_lock<std::mutex> lock(this->oper_lock);
                        this->enable.wait(lock, [this](){
                            return this->quit.load() || !this->tasks.empty();
                        });
                        // 不是强制退出时可从这里退出
                        if(this->quit.load() && this->tasks.empty()){
                            return;
                        }
                        std::function<void()> task = std::move(this->tasks.front());
                        this->tasks.pop();
    
                        task();
                    }
                }));
            }
        }
    
        void shutdown(bool force = false){
            this->quit.store(true);
            this->force.store(force);
        }
    
        //void commit(std::function<void (void * param)> task);
        template<class T, class... Args>
        auto commit(T && t, Args&&...args)->std::future<decltype(t(args...))>{
            using TYPE = decltype(t(args...));
            if(this->quit.load()){
                //dont know return what, so throw an exception
                throw std::runtime_error("thread pool is alreay shutdown.");
            }
            // 1、std::packaged_task<decltype(f(args...))() 类似std::function\
                但是会将其封装的可调用元素的结果封装在std::future中
            // 2、std::make_shared 创建std::packaged_task<decltype(f(args...))()\
                类型的智能指针
            // 3、std::bind(std::forward<T>(t), std::forward<Args>(args)...)当做\
                std::packaged_task的构造参数
            auto task = std::make_shared<std::packaged_task<TYPE()> >( 
                std::bind(std::forward<T>(t), std::forward<Args>(args)...)
            );
            std::future<TYPE> result = task->get_future();
            std::lock_guard<std::mutex> lock(this->oper_lock);
            //将packaged_task 包裹在一个签名为void()的lambda函数中调用,因为此lambda函数符合std::function<void()>\
                的签名,所以可以放到queue中
            this->tasks.emplace([task](){
                (*task)();  //调用packaged_task
            });
            this->enable.notify_one();  // 在线程池中唤醒一个休眠的线程
            return result;
        }
    private:
        //void move();
    
    
    
    private:
        std::vector<std::thread> pool;
        std::queue<std::function<void()> > tasks;
        int core;   //线程池核心线程数
        int max;    //线程池根据tasks量以及cpu数最大可扩展的量
        int cache;  //运行tasks可缓存的最大task数,超出次数后commit将采取拒绝策略
    
        std::atomic<bool> quit;     //线程池shutdown条件, true时shutdown
        std::atomic<bool> force;    //是否强制shutdown,true时有剩余的task将不执行直接退出, false时等待执行完所有的task再退出
        std::condition_variable enable;     //
        std::mutex oper_lock;   // queue的读写锁
    };
    #endif
    

    Test Code

    #include <iostream>
    #include <algorithm>
    #include <random>
    #include <chrono>
    
    #include "./pool/ThreadPool.hpp"
    
    int main(int argc, char** argv){
        ThreadPool pool(4);
        pool.start();
        std::default_random_engine rd;
        std::uniform_int_distribution<int> rang(100, 1000);
        for(int idx = 0; idx < 20; ++idx){
            pool.commit([=](int x, int y, int t){
                std::cout << "thread id : " << std::this_thread::get_id() 
                    << " x = " << x << " y = " << y <<
                    " sleep time = " << t << " ms" <<
                    " id = " << idx << std::endl;
                    
                std::this_thread::sleep_for(std::chrono::milliseconds(t));
            }, rang(rd), rang(rd), rang(rd));
        }
        std::vector<std::future<int> > results;
        for (auto index = 20; index < 50; ++index){
            results.push_back(
                pool.commit([=]()->int{
                    return index;
                })
            );
        }
        for ( auto & r : results){
            std::cout << "get result from thread "
            << " index = " << r.get() << std::endl;
        }
        char command = std::cin.get();
        if (command == 'q'){
            pool.shutdown(true);
        }else if (command == 'e'){
            pool.shutdown(true);
            try
            {
                pool.commit([](){
                    std::cout << "i want to get an exception" << std::endl;
                });
            }
            catch(const std::exception& e)
            {
                std::cerr << e.what() << '\n';
            } 
        }
        std::cout << "test finish, OY!" << std::endl;
        return 0;
    }
    

    Compile & Link

    g++ -g -O3 -Wall -std=c++11 main.cpp -o ./out/test
    

    Run

    thread id : 0x70000a352000 x = 242 y = 937 sleep time = 480 ms id = 0
    thread id : 0x70000a352000 x = 340 y = 390 sleep time = 692 ms id = 1
    thread id : 0x70000a352000 x = 188 y = 294 sleep time = 738 ms id = 2
    thread id : 0x70000a352000 x = 390 y = 978 sleep time = 270 ms id = 3
    thread id : 0x70000a4db000 x = 432 y = 780 sleep time = 102 ms id = 4
    thread id : 0x70000a458000 x = 652 y = 661 sleep time = 498 ms id = 5
    thread id : 0x70000a3d5000 x = 839 y = 452 sleep time = 487 ms id = 6
    thread id : 0x70000a352000 x = 698 y = 540 sleep time = 183 ms id = 7
    thread id : 0x70000a4db000 x = 157 y = 983 sleep time = 638 ms id = 8
    thread id : 0x70000a3d5000 x = 232 y = 823 sleep time = 766 ms id = 9
    thread id : 0x70000a458000 x = 801 y = 411 sleep time = 314 ms id = 10
    thread id : 0x70000a458000 x = 359 y = 912 sleep time = 294 ms id = 11
    thread id : 0x70000a458000 x = 260 y = 142 sleep time = 372 ms id = 12
    thread id : 0x70000a458000 x = 618 y = 499 sleep time = 831 ms id = 13
    thread id : 0x70000a458000 x = 108 y = 319 sleep time = 376 ms id = 14
    thread id : 0x70000a3d5000 x = 870 y = 490 sleep time = 519 ms id = 15
    thread id : 0x70000a352000 x = 446 y = 998 sleep time = 496 ms id = 16
    thread id : 0x70000a3d5000 x = 321 y = 308 sleep time = 610 ms id = 17
    thread id : 0x70000a3d5000 x = 247 y = 256 sleep time = 629 ms id = 18
    thread id : 0x70000a3d5000 x = 186 y = 484 sleep time = 703 ms id = 19
    get result from thread  index = 20
    get result from thread  index = 21
    get result from thread  index = 22
    get result from thread  index = 23
    get result from thread  index = 24
    get result from thread  index = 25
    get result from thread  index = 26
    get result from thread  index = 27
    get result from thread  index = 28
    get result from thread  index = 29
    get result from thread  index = 30
    get result from thread  index = 31
    get result from thread  index = 32
    get result from thread  index = 33
    get result from thread  index = 34
    get result from thread  index = 35
    get result from thread  index = 36
    get result from thread  index = 37
    get result from thread  index = 38
    get result from thread  index = 39
    get result from thread  index = 40
    get result from thread  index = 41
    get result from thread  index = 42
    get result from thread  index = 43
    get result from thread  index = 44
    get result from thread  index = 45
    get result from thread  index = 46
    get result from thread  index = 47
    get result from thread  index = 48
    get result from thread  index = 49
    e
    thread pool is alreay shutdown.
    test finish, OY!
    

    相关文章

      网友评论

        本文标题:手写线程池: thread pool with modern c

        本文链接:https://www.haomeiwen.com/subject/xvgmeqtx.html