美文网首页
C++并发开发(1)- 生产者消费者模型

C++并发开发(1)- 生产者消费者模型

作者: Savior2016 | 来源:发表于2017-12-11 15:48 被阅读140次

打算先看一下生产者消费者模型,在进行从头到尾系统的学习。
参考文章:C++11 并发指南九(综合运用: C++11 多线程下生产者消费者模型详解)

1 单生产者-单消费者模型


源码及解析如下:

#include <unistd.h>

#include <cstdlib>
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <thread>

static const int kItemRepositorySize  = 10; // Item buffer size.
static const int kItemsToProduce  = 1000;   // How many items we plan to produce.

struct ItemRepository {
    int item_buffer[kItemRepositorySize]; // 产品缓冲区, 配合 read_position 和 write_position 模型环形队列.
    size_t read_position; // 消费者读取产品位置.
    size_t write_position; // 生产者写入产品位置.
    std::mutex mtx; // 互斥量,保护产品缓冲区
    std::condition_variable repo_not_full; // 条件变量, 指示产品缓冲区不为满.
    std::condition_variable repo_not_empty; // 条件变量, 指示产品缓冲区不为空.
} gItemRepository; // 产品库全局变量, 生产者和消费者操作该变量.

typedef struct ItemRepository ItemRepository;


void ProduceItem(ItemRepository *ir, int item)
{
    std::unique_lock<std::mutex> lock(ir->mtx);//可以简单的理解为开启线程锁
    while(((ir->write_position + 1) % kItemRepositorySize)== ir->read_position) 
    { // item buffer is full, just wait here.这时说明仓库满了,也就是写入的指针已经追着读取的指针追到一圈了
        std::cout << "Producer is waiting for an empty slot...\n";
        (ir->repo_not_full).wait(lock); // 生产者等待"产品库缓冲区不为满"这一条件发生.
    }

    (ir->item_buffer)[ir->write_position] = item; // 在生产者指针位置写入产品.
    (ir->write_position)++; // 写入位置后移.

    if (ir->write_position == kItemRepositorySize) // 写入位置若是在队列最后则重新设置为初始位置.
        ir->write_position = 0;

    (ir->repo_not_empty).notify_all(); // 通知消费者产品库不为空.
    lock.unlock(); // 解锁.
}

int ConsumeItem(ItemRepository *ir)
{
    int data;
    std::unique_lock<std::mutex> lock(ir->mtx);
    // item buffer is empty, just wait here.
    while(ir->write_position == ir->read_position) {
        std::cout << "Consumer is waiting for items...\n";
        (ir->repo_not_empty).wait(lock); // 消费者等待"产品库缓冲区不为空"这一条件发生.
    }

    data = (ir->item_buffer)[ir->read_position]; // 读取某一产品
    (ir->read_position)++; // 读取位置后移

    if (ir->read_position >= kItemRepositorySize) // 读取位置若移到最后,则重新置位.
        ir->read_position = 0;

    (ir->repo_not_full).notify_all(); // 通知消费者产品库不为满.
    lock.unlock(); // 解锁.

    return data; // 返回产品.
}


void ProducerTask() // 生产者任务
{
    for (int i = 1; i <= kItemsToProduce; ++i) {
        // sleep(1);
        std::cout << "Produce the " << i << "^th item..." << std::endl;
        ProduceItem(&gItemRepository, i); // 循环生产 kItemsToProduce 个产品.
    }
}

void ConsumerTask() // 消费者任务
{
    static int cnt = 0;
    while(1) {
        sleep(1);
        int item = ConsumeItem(&gItemRepository); // 消费一个产品.
        std::cout << "Consume the " << item << "^th item" << std::endl;
        if (++cnt == kItemsToProduce) break; // 如果产品消费个数为 kItemsToProduce, 则退出.
    }
}

void InitItemRepository(ItemRepository *ir)
{
    ir->write_position = 0; // 初始化产品写入位置.
    ir->read_position = 0; // 初始化产品读取位置.
}

int main()
{
    InitItemRepository(&gItemRepository);
    std::thread producer(ProducerTask); // 创建生产者线程.
    std::thread consumer(ConsumerTask); // 创建消费之线程.
    producer.join();
    consumer.join();
}

2 多生产者-单消费者模型

这个可能近期会用到,所以记录一下。例如数据库操作,可以把要存储的数据缓存下来,这样可以提高数据采集速度,缓存下来的数据可以采用事务处理批量操作,又提高了数据库存储的速度。多个线程采集数据,单个线程进行数据库存储。

#include <unistd.h>

#include <cstdlib>
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <thread>

static const int kItemRepositorySize  = 4; // Item buffer size.
static const int kItemsToProduce  = 10;   // How many items we plan to produce.

struct ItemRepository {
    int item_buffer[kItemRepositorySize];
    size_t read_position;
    size_t write_position;
    size_t item_counter;
    std::mutex mtx;
    std::mutex item_counter_mtx;
    std::condition_variable repo_not_full;
    std::condition_variable repo_not_empty;
} gItemRepository;

typedef struct ItemRepository ItemRepository;


void ProduceItem(ItemRepository *ir, int item)
{
    std::unique_lock<std::mutex> lock(ir->mtx);
    while(((ir->write_position + 1) % kItemRepositorySize)
        == ir->read_position) { // item buffer is full, just wait here.
        std::cout << "Producer is waiting for an empty slot...\n";
        (ir->repo_not_full).wait(lock);
    }

    (ir->item_buffer)[ir->write_position] = item;
    (ir->write_position)++;

    if (ir->write_position == kItemRepositorySize)
        ir->write_position = 0;

    (ir->repo_not_empty).notify_all();
    lock.unlock();
}

int ConsumeItem(ItemRepository *ir)
{
    int data;
    std::unique_lock<std::mutex> lock(ir->mtx);
    // item buffer is empty, just wait here.
    while(ir->write_position == ir->read_position) {
        std::cout << "Consumer is waiting for items...\n";
        (ir->repo_not_empty).wait(lock);
    }

    data = (ir->item_buffer)[ir->read_position];
    (ir->read_position)++;

    if (ir->read_position >= kItemRepositorySize)
        ir->read_position = 0;

    (ir->repo_not_full).notify_all();
    lock.unlock();

    return data;
}

void ProducerTask()
{
    bool ready_to_exit = false;
    while(1) {
        sleep(1);
        std::unique_lock<std::mutex> lock(gItemRepository.item_counter_mtx);//区别是这里加了个锁
        if (gItemRepository.item_counter < kItemsToProduce) {
            ++(gItemRepository.item_counter);
            ProduceItem(&gItemRepository, gItemRepository.item_counter);
            std::cout << "Producer thread " << std::this_thread::get_id()
                << " is producing the " << gItemRepository.item_counter
                << "^th item" << std::endl;
        } else ready_to_exit = true;
        lock.unlock();
        if (ready_to_exit == true) break;
    }
    std::cout << "Producer thread " << std::this_thread::get_id()
                << " is exiting..." << std::endl;
}

void ConsumerTask()
{
    static int item_consumed = 0;
    while(1) {
        sleep(1);
        ++item_consumed;
        if (item_consumed <= kItemsToProduce) {
            int item = ConsumeItem(&gItemRepository);
            std::cout << "Consumer thread " << std::this_thread::get_id()
                << " is consuming the " << item << "^th item" << std::endl;
        } else break;
    }
    std::cout << "Consumer thread " << std::this_thread::get_id()
                << " is exiting..." << std::endl;
}

void InitItemRepository(ItemRepository *ir)
{
    ir->write_position = 0;
    ir->read_position = 0;
    ir->item_counter = 0;
}

int main()
{
    InitItemRepository(&gItemRepository);
    std::thread producer1(ProducerTask);
    std::thread producer2(ProducerTask);
    std::thread producer3(ProducerTask);
    std::thread producer4(ProducerTask);
    std::thread consumer(ConsumerTask);

    producer1.join();
    producer2.join();
    producer3.join();
    producer4.join();
    consumer.join();

相关文章

  • C++并发开发(1)- 生产者消费者模型

    打算先看一下生产者消费者模型,在进行从头到尾系统的学习。参考文章:C++11 并发指南九(综合运用: C++11 ...

  • pthread中mutex和cond的配合使用

    典型的生产者,消费者多线程并发模型中,cond一般要和mutex配合使用,如下: 1.生产者:pthread_mu...

  • 生产者和消费者模型

    生产者和消费者模型 1. 什么是生产者和消费者模型 生产者消费者模型具体来讲,就是在一个系统中,存在生产者和消费者...

  • Java实现生产者-消费者模型

    考查Java的并发编程时,手写“生产者-消费者模型”是一个经典问题。有如下几个考点: 对Java并发模型的理解 对...

  • 34.Python之生产者消费者模型

    Python之生产者消费者模型(非常重要) 生产者消费者模型模型指的是一种解决问题的套路。 生产者消费者模型中包含...

  • 生产者消费者模型

    - 实现c++的生产者消费者模型 互斥锁+条件变量 - java中的实现方式 Wait + notify

  • 生产者消费者(一)

    生产者消费者模型: 生产者------> 缓存<-------- 消费者

  • 进程并发编程基础

    前言:看了操作系统并发编程的基础,做个笔记并用 C 实现常见的一种并发编程的模型——消费者、生产者模型 进程之间的...

  • 阻塞队列实现生产者消费者模式

    生产者消费者模式是并发、多线程编程中经典的设计模式,生产者和消费者通过分离的执行工作解耦,简化了开发模式,生产者和...

  • 多线程之生产者与消费者问题

    生产者消费者模式是并发、多线程编程中经典的设计模式,生产者和消费者通过分离的执行工作解耦,简化了开发模式,生产者和...

网友评论

      本文标题:C++并发开发(1)- 生产者消费者模型

      本文链接:https://www.haomeiwen.com/subject/zipiixtx.html