美文网首页
C+++ 多线程

C+++ 多线程

作者: 何亮hook_8285 | 来源:发表于2022-11-20 13:09 被阅读0次

    C++11线程编程的内容

    thread、mutux、atomic、condition_variable、unique_lock、semaphore、future等。

    c++11多线程理论

    多线程基本知识、线程互斥、线程同步、原子操作、CAS等。

    并发和并行

    • CPU单核
    • CPU多核、多CPU

    并发

    单核上,多个线程占用不同的CPU时间片段,物理上还是串行执行的,但是由于每个线程占用的CPU时间片非常短(比如10ms),看起来就像多个线程都在同时执行一样,这样的场景称作并发(concurrent)。

    并行

    在多核或者多CPU上,多个线程是真正的同时执行,这样的场景称为并行(parallel)。

    使用多线程考虑

    编写程序使用多线程,需要考虑程序是IO密集型还是CPU密集型。

    程序是IO密集型

    IO密集型程序涉及一些IO操作,比如设备、文件、网络操作时存在阻塞问题,那就是IO密集型。

    IO密集型的程序建议将线程数是CPU核心数两倍,通过线程上下文切换,换取处理速度。

    程序是CPU密集型

    多线程存在上下文切换,是额外的花销,线程越多上下文切换所花费的额外时间也越多,倒不如控制线程的数量进行计算。

    CPU密集型的程序建议将线程数设置为CPU核心数。

    线程的消耗

    为了完成任务,创建很多的线程可以吗?线程真的是越多越好?

    • 线程的创建和销毁都是非常"重"的操作。
    • 线程栈本身占用大量内存。
    • 线程的上下文切换要占用大量时间。
    • 大量线程同事唤醒会使系统经常出现锯齿状态或瞬间负载量很大导致宕机。

    线程池的优势

    操作系统上创建线程和销毁线程都是很“重”的操作,耗时耗性能都比较多,那么程序执行的过程中,如果业务量比较大,实时的去创建线程、执行业务、业务完成后销毁线程,那么会导致系统实时性能降低,业务的处理能力也会降低。

    线程池的优势就是(每个池都有自己的优势),在程序进程启动之初,就事先创建好线程池里的线程,当业务流量到来时需要分配线程,直接从线程池中获取一个空闲线程执行task任务即可,task执行完成后,也不用释放线程,而是把线程归还到线程池中继续给后续的task提供服务。

    fixed模式线程池

    线程池里面的线程个数是固定不变的,一般是threadPool创建时根据当前机器的CPU核心数量进行指定。

    cached模式线程池

    线程池里面的线程个数是可动态增长的,根据任务的数量动态的增加线程的数量,但是会设置一个线程数量的阈值,任务处理完成,如果动态增长的线程空闲了60s还没有处理其他任务,那么关闭线程,保持池中最初数量的线程即可。

    线程同步

    • 线程互斥
      • 互斥锁metux
      • atomic原子类型
    • 线程通信
      • 条件变量condition_variable
      • 信号量semaphore

    C++线程实现

    //引入线程头文件
    #include <thread>
    #include <chrono>
    #include <iostream>
    #include <string>
    
    void run(int num, std::string str)
    {
        //get_id 获取线程id
        std::cout << std::this_thread::get_id()  << std::endl;
    }
    
    int main()
    {
        std::thread t1(run,1,"hello world");
        //等待子线程对任务函数执行完成
        t1.join();
        //与主线程分离
        //t1.detach();
        // 让主线程休眠
        std::this_thread::sleep_for(chrono::seconds(5));
        //获取CPU的核心数
        int num = std::thread::hardware_concurrency();
        std::cout << "CPU number: " << num << std::endl;
    }
    
    

    this_thread函数

    调用命名空间 std::this_thread 中的 get_id() //方法可以得到当前线程的线程 ID
    this_thread::sleep_for(chrono::seconds(1));// 当前线程休眠
    
    // 获取当前系统时间点
    auto now = chrono::system_clock::now();
    // 时间间隔为2s
    chrono::seconds sec(2);
    // 当前时间点之后休眠两秒
    this_thread::sleep_until(now + sec); //指定线程阻塞到某一个指定的时间点 time_point类型,之后解除阻塞
    
    //在线程中调用这个函数之后,处于运行态的线程会主动让出自己已经抢到的 CPU 时间片,最终变为就绪态,这样其它的线程就有更大的概率能够抢到 CPU 时间片了。使用这个函数的时候需要注意一点,线程调用了 yield () 之后会主动放弃 CPU 资源,但是这个变为就绪态的线程会马上参与到下一轮 CPU 的抢夺战中,不排除它能继续抢到 CPU 时间片的情况,这是概率问题。
    this_thread::yield();
    

    std::mutex

    //于给临界区加锁,并且只能有一个线程获得锁的所有权
    std::mutex
    
    //可以简化mutex互斥锁 lock() 和 unlock() 的写法,同时也更安全
    std::lock_guard
    
    //递归互斥锁 std::recursive_mutex 允许同一线程多次获得互斥锁,可以用来解决同一线程需要多次获取互斥量时死锁的问题
    std::recursive_mutex
    
    //std::timed_mutex 是超时独占互斥锁,主要是在获取互斥锁资源时增加了超时等待功能,因为不知道获取锁资源需要等待多长时间,为了保证不一直等待下去,设置了一个超时时长,超时后线程就可以解除阻塞去做其他事情了。
    std::timed_mutex
    

    原子性

    CAS 简介
    它的英文全称是 Compare-And-Swap,中文叫做“比较并交换”,它是一种思想、一种算法

    在多线程的情况下,各个代码的执行顺序是不能确定的,所以为了保证并发安全,我们可以使用互斥锁。而 CAS 的特点是避免使用互斥锁,当多个线程同时使用 CAS 更新同一个变量时,只有其中一个线程能够操作成功,而其他线程都会更新失败。不过和同步互斥锁不同的是,更新失败的线程并不会被阻塞,而是被告知这次由于竞争而导致的操作失败,但还可以再次尝试

    CAS 被广泛应用在并发编程领域中,以实现那些不会被打断的数据交换操作,从而就实现了无锁的线程安全

    atomic及部分函数

    atomic是c++11推出的原子变量,使用需要C++11及更高标准,包含的头文件为#include<atomic>

    atomic变量

    std::atomic<int> a(0);
    std::atomic<char> b('0');
    
    #include <iostream>
    #include <thread>
    #include <atomic>
    #include <functional>
    using namespace std;
    
    struct Counter
    {
        void increment()
        {
            for (int i = 0; i < 10; ++i)
            {
                m_value++;
                cout << "increment number: " << m_value
                    << ", theadID: " << this_thread::get_id() << endl;
                this_thread::sleep_for(chrono::milliseconds(500));
            }
        }
    
        void decrement()
        {
            for (int i = 0; i < 10; ++i)
            {
                m_value--;
                cout << "decrement number: " << m_value
                    << ", theadID: " << this_thread::get_id() << endl;
                this_thread::sleep_for(chrono::milliseconds(500));
            }
        }
        // atomic<int> == atomic_int
       volatile atomic_int m_value = 0;
    };
    
    int main()
    {
        Counter c;
        auto increment = bind(&Counter::increment, &c);
        auto decrement = bind(&Counter::decrement, &c);
        thread t1(increment);
        thread t2(decrement);
    
        t1.join();
        t2.join();
    
        return 0;
    }
    

    C++ 线程同步之条件变量

    #include <iostream>
    #include <thread>
    #include <mutex>
    #include <list>
    #include <functional>
    #include <condition_variable>
    using namespace std;
    
    class SyncQueue
    {
    public:
        SyncQueue(int maxSize) : m_maxSize(maxSize) {}
        
        void put(const int& x)
        {
            unique_lock<mutex> locker(m_mutex);
            // 判断任务队列是不是已经满了
            while (m_queue.size() == m_maxSize)
            {
                cout << "任务队列已满, 请耐心等待..." << endl;
                // 阻塞线程
                m_notFull.wait(locker);
            }
            // 将任务放入到任务队列中
            m_queue.push_back(x);
            cout << x << " 被生产" << endl; 
            // 通知消费者去消费
            m_notEmpty.notify_one();
        }
    
        int take()
        {
            unique_lock<mutex> locker(m_mutex);
            while (m_queue.empty())
            {
                cout << "任务队列已空,请耐心等待。。。" << endl;
                m_notEmpty.wait(locker);
            }
            // 从任务队列中取出任务(消费)
            int x = m_queue.front();
            m_queue.pop_front();
            // 通知生产者去生产
            m_notFull.notify_one();
            cout << x << " 被消费" << endl;
            return x;
        }
    
        bool empty()
        {
            lock_guard<mutex> locker(m_mutex);
            return m_queue.empty();
        }
    
        bool full()
        {
            lock_guard<mutex> locker(m_mutex);
            return m_queue.size() == m_maxSize;
        }
    
        int size()
        {
            lock_guard<mutex> locker(m_mutex);
            return m_queue.size();
        }
    
    private:
        list<int> m_queue;     // 存储队列数据
        mutex m_mutex;         // 互斥锁
        condition_variable m_notEmpty;   // 不为空的条件变量
        condition_variable m_notFull;    // 没有满的条件变量
        int m_maxSize;         // 任务队列的最大任务个数
    };
    
    int main()
    {
        SyncQueue taskQ(50);
        auto produce = bind(&SyncQueue::put, &taskQ, placeholders::_1);
        auto consume = bind(&SyncQueue::take, &taskQ);
        thread t1[3];
        thread t2[3];
        for (int i = 0; i < 3; ++i)
        {
            t1[i] = thread(produce, i+100);
            t2[i] = thread(consume);
        }
    
        for (int i = 0; i < 3; ++i)
        {
            t1[i].join();
            t2[i].join();
        }
    
        return 0;
    }
    
    

    call_once

    在某些特定情况下,某些函数只能在多线程环境下调用一次,比如:要初始化某个对象,而这个对象只能被初始化一次,就可以使用 std::call_once() 来保证函数在多线程环境下只能被调用一次。使用 call_once() 的时候,需要一个 once_flag 作为 call_once() 的传入参数,该函数的原型如下:

    // 定义于头文件 <mutex>
    template< class Callable, class... Args >
    void call_once( std::once_flag& flag, Callable&& f, Args&&... args );
    

    示例:

    #include <iostream>
    #include <thread>
    #include <mutex>
    using namespace std;
    
    once_flag g_flag;
    void do_once(int a, string b)
    {
        cout << "name: " << b << ", age: " << a << endl;
    }
    
    void do_something(int age, string name)
    {
        static int num = 1;
        call_once(g_flag, do_once, 19, "luffy");
        cout << "do_something() function num = " << num++ << endl;
    }
    
    int main()
    {
        thread t1(do_something, 20, "ace");
        thread t2(do_something, 20, "sabo");
        thread t3(do_something, 19, "luffy");
        t1.join();
        t2.join();
        t3.join();
    
        return 0;
    }
    

    初版线程池实现

    threadpool.h

    //
    // Created by HL .
    //
    
    #ifndef UNTITLED2_THREADPOOL_H
    #define UNTITLED2_THREADPOOL_H
    
    #include <vector>
    #include <queue>
    #include <atomic>
    #include <mutex>
    #include <memory>
    #include <condition_variable>
    #include <functional>
    
    
    
    ///任务抽象基类
    class Task{
    public:
        virtual void run()=0;
    };
    ///线程模式
    enum class PoolMode{
        MODE_FIXED, //固定数量的线程
        MODE_CACHED, //线程数量可动态增长
    };
    
    class Thread
    {
    public:
        //线程函数对象
        using TreadFunc=std::function<void()>;
        //线程构造
        Thread(TreadFunc func);
        //线程析构
        ~Thread();
    
        //启动线程
        void start();
    
    private:
        TreadFunc func_;
    
    };
    
    //线程池类型
    class ThreadPool{
    public:
        ThreadPool();
        ~ThreadPool();
    
        //设置线程池的工作模式
        void setMode(PoolMode mode);
    
        //设置task任务队列的阈值
        void setTaskQueMaxThreshHold(int threshHold);
    
        //给线程池提交任务
        void submitTask(std::shared_ptr<Task> sp);
    
        //开启线程池
        void start(int initThreadSize=4);
    
        //禁止拷贝
        ThreadPool(const ThreadPool&)=delete;
        ThreadPool operator=(const ThreadPool&)=delete;
    
    private:
        //定义线程函数
        void threadFunc();
    private:
        //线程列表
        std::vector<std::unique_ptr<Thread>> threads_;
        //初始化线程的数量
        int initThreadSize_;
        //任务队列
        std::queue<std::shared_ptr<Task>> taskQue_;
        //任务的数量
        std::atomic_int taskSize_;
        //任务队列数量上线阈值
        int taskQueMaxThreshHold_;
    
        //保证任务队列的线程安全
        std::mutex taskQueMtx_;
    
        //表示任务队列不满
        std::condition_variable notFull;
    
        //表示任务队列不空
        std::condition_variable notEmpty;
    
        //当前线程池的工作模式
        PoolMode poolMode_;
    
    };
    #endif //UNTITLED2_THREADPOOL_H
    
    

    threadpool.cpp

    //
    // Created by HL
    //
    
    #include "threadpool.h"
    #include <thread>
    #include <iostream>
    
    const int TASK_MAX_THRESHHOLD=1024;
    
    //线程池构造
    ThreadPool::ThreadPool():initThreadSize_(4),
                taskSize_(0),
                taskQueMaxThreshHold_(TASK_MAX_THRESHHOLD),
                poolMode_(PoolMode::MODE_FIXED)
    {
    
    }
    
    //线程池析构
    ThreadPool::~ThreadPool() {
    
    }
    
    
    //设置线程池的工作模式
    void ThreadPool::setMode(PoolMode mode)
    {
        this->poolMode_=mode;
    }
    
    //设置task任务队列的阈值
    void ThreadPool::setTaskQueMaxThreshHold(int threshHold)
    {
        this->taskQueMaxThreshHold_=threshHold;
    }
    
    
    //给线程池提交任务
    void ThreadPool::submitTask(std::shared_ptr<Task> sp)
    {
        //获取锁
        std::unique_lock<std::mutex> lock(taskQueMtx_);
        //线程通信 等待任务有空余
        //用户提交任务,最长不能阻塞超过1s,否则判断提交任务失败了
        if(!notFull.wait_for(lock,std::chrono::seconds(1),[&]()->bool {return taskQue_.size()<taskQueMaxThreshHold_ ;})){
            std::cerr << "task queue is full ,submit task fail ." << std::endl;
            return;
        }
    
        //如果有空余,把任务放入任务队里中
        taskQue_.emplace(sp);
        taskSize_++;
    
        //因为新放了任务,任务队里肯定不空了,在notEmpty上进行通知
        notEmpty.notify_all();
    
    }
    
    //开启线程池
    void ThreadPool::start(int initThreadSize)
    {
        this->initThreadSize_= initThreadSize;
    
        //创建线程对象
        for(int i=0;i<this->initThreadSize_;i++)
        {
            //省去了拷贝或移动元素的过程
            std::unique_ptr<Thread> ptr=std::make_unique<Thread>(std::bind(&ThreadPool::threadFunc,this));
            threads_.emplace_back(std::move(ptr));
        }
    
        //启动所有线程
        for(int i=0; i<initThreadSize_;i++)
        {
            threads_[i]->start();
        }
    }
    
    //定义线程函数
    void ThreadPool::threadFunc() {
         for(;;)
         {
             std::shared_ptr<Task> task;
             {
                 //先获取锁
                 std::unique_lock<std::mutex> lock(taskQueMtx_);
    
                 //等待notEmpty条件
                 notEmpty.wait(lock,[&]()->bool {return taskQue_.size()>0;});
    
                 //从任务队列中取一个任务出来
                 task=taskQue_.front();
                 taskQue_.pop();
                 taskSize_--;
    
                 //如果依然有剩余任务,继续通知其它得线程执行任务
                 if(taskQue_.size()>0)
                 {
                     notEmpty.notify_all();
                 }
    
                 //取出一个任务,进行通知,通知可以继续提交任务
                 notFull.notify_all();
    
             } //就应该把锁释放
    
             //当前线程负责执行这个任务
             if(task!= nullptr)
             {
                 task->run();
             }
    
         }
    }
    
    //线程构造
    Thread::Thread(TreadFunc func) {
        this->func_=func;
    }
    
    //线程析构
    Thread::~Thread() {
    
    }
    
    void Thread::start() {
      //创建线程
      std::thread t(func_);
      //设置分离线程
      t.detach();
    
    }
    
    
    

    main.cpp

    #include <iostream>
    #include "threadpool.h"
    #include <thread>
    #include <chrono>
    
    
    class MyTask: public Task{
    public:
        void run()
        {
            std::cout << "tid:" << std::this_thread::get_id() << ",begin!" << std::endl << std::flush;
            std::this_thread::sleep_for(std::chrono::seconds(2));
            std::cout << "tid:" << std::this_thread::get_id() << ",end!" << std::endl << std::flush;
        }
    };
    
    int main() {
    
        ThreadPool pool;
        pool.start(4);
        pool.submitTask(std::make_shared<MyTask>());
        pool.submitTask(std::make_shared<MyTask>());
        pool.submitTask(std::make_shared<MyTask>());
        pool.submitTask(std::make_shared<MyTask>());
        pool.submitTask(std::make_shared<MyTask>());
        std::this_thread::sleep_for(std::chrono::milliseconds(5000));
        return 0;
    }
    
    

    相关文章

      网友评论

          本文标题:C+++ 多线程

          本文链接:https://www.haomeiwen.com/subject/ppebxdtx.html