美文网首页
如何使用两个条件变量实现简单的生产者和消费者

如何使用两个条件变量实现简单的生产者和消费者

作者: crazyhank | 来源:发表于2021-09-25 19:51 被阅读0次

在这个模型中生成两个线程:

  • 生产者线程:发送数据,通知消费者线程进行处理;
  • 消费者线程:接受数据进行处理,通知生产者线程处理完成;

代码如下:

#include <iostream>
#include <thread>
#include <mutex>
#include <condition_variable>

std::mutex mtx;
std::condition_variable ready_cv, processed_cv;
bool ready = false;
bool processed = false;

void Worker()
{
    std::unique_lock<std::mutex> lk(mtx);

    // 等待分发线程发送数据
    ready_cv.wait(lk, [] () -> bool {return ready;});

    // 处理数据,睡眠模拟处理过程
    std::cout << "Worker thread: processing data ..." << std::endl;
    std::this_thread::sleep_for(std::chrono::seconds(1));

    // 处理完毕,设置标志位
    processed = true;
    std::cout << "Worker thread: done!" << std::endl;

    // 通知分发线程处理完毕
    processed_cv.notify_one();
}
void Dispatcher()
{
    std::unique_lock<std::mutex> lk(mtx);
    // 分发数据给处理线程
    std::cout << "Dispatcher thread: send data ..." << std::endl;
    ready = true;
    ready_cv.notify_one();

    // 等待处理线程处理完毕
    processed_cv.wait(lk, [] () -> bool {return processed;});
    std::cout << "Dispatcher thread: worker thread returned!" << std::endl;
}

int main()
{
    std::thread worker(Worker), dispatcher(Dispatcher);

    worker.join();
    dispatcher.join();

    return 0;
}

------------------------------------分割线----------------------------------------
模拟更加接近实际业务场景的一个例子:有一个任务队列,生产者线程不断发送任务到这个队列,消费者线程从这个队列中读取任务,并处理。要求生产者线程的发送任务函数和消费者线程中的获取任务函数都是阻塞式的。
使用两个条件变量实现上述的业务代码如下:

#include <iostream>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <queue>

struct Job {
    std::string jobName;    // 任务名称
    int jobId;              // 任务ID号
};

class JobQueue {
public:
    JobQueue(int queueMax = 10) : queueMax_(queueMax) {}  // 默认队列长度限制为10
    ~JobQueue() {}
    void submitJob(std::shared_ptr<Job> job);   // 队列满时会阻塞
    std::shared_ptr<Job> getJob();              // 队列空时会阻塞
private:
    int queueMax_;
    std::queue<std::shared_ptr<Job>> jobQ_;     // 使用智能指针队列,可以避免动态Job对象资源管理的问题。
    std::mutex queueMutex_;
    std::condition_variable notEmptyCv_;        // 队列非空条件变量
    std::condition_variable notFullCv_;         // 队列非满条件变量
};
void JobQueue::submitJob(std::shared_ptr<Job> job)
{
    // 这里用大括号,在右括号后面queueMutex_自动解锁
    {
        std::unique_lock<std::mutex> lk(queueMutex_);
        notFullCv_.wait(lk, [this]{return jobQ_.size() < queueMax_;});
        jobQ_.push(job);
    }
    // 通知函数可以不用在保持锁的状态下执行
    notEmptyCv_.notify_one();
}

std::shared_ptr<Job> JobQueue::getJob()
{
    std::shared_ptr<Job> retJob;
    // 这里用大括号,理由同上
    {
        std::unique_lock<std::mutex> lk(queueMutex_);
        notEmptyCv_.wait(lk, [this]{return jobQ_.size() > 0;});
        retJob = jobQ_.front();
        jobQ_.pop();
    }
    // 通知函数可以不用在保持锁的状态下执行
    notFullCv_.notify_one();

    return retJob;
}
void ProducerFunc(int producerId, JobQueue& jobQ)
{
    for (size_t i = 0; i < 100; i++) {
        auto job = std::make_shared<Job>();
        job->jobName = std::to_string(producerId) + "-" + std::to_string(i);
        job->jobId = i;
        jobQ.submitJob(job);
    }
}

void ConsumerFunc(JobQueue& jobQ)
{
    while(true) {
        auto job = jobQ.getJob();
        std::cout << "Processing Job: " << job->jobName << std::endl;
    }
}
int main()
{
    JobQueue jobQ(10);

    // 产生三个生产线程,分别往任务队列中添加任务。
    std::thread producerThread1(ProducerFunc, 1, std::ref(jobQ));
    std::thread producerThread2(ProducerFunc, 2, std::ref(jobQ));
    std::thread producerThread3(ProducerFunc, 3, std::ref(jobQ));
    // 产生一个消费者线程用于处理生产线程发送过来的任务。
    std::thread consumerThread(ConsumerFunc, std::ref(jobQ));

    producerThread1.join();
    producerThread2.join();
    producerThread3.join();
    consumerThread.join();

    return 0;
}

相关文章

网友评论

      本文标题:如何使用两个条件变量实现简单的生产者和消费者

      本文链接:https://www.haomeiwen.com/subject/wiaxnltx.html