美文网首页
C++11生产者消费者

C++11生产者消费者

作者: Magic11 | 来源:发表于2019-06-19 15:53 被阅读0次
    
    #include <iostream>
    #include <thread>
    #include <mutex>
    #include <condition_variable>
    #include <queue>
    #include <random>
    
    using namespace std;
    
    static const int g_product_max_count = 100;        //生产者生产产品的最大个数
    std::mutex stdoutMutex;                           //多线程标准输出 同步锁
    
    struct ProductManager {
        queue<int> m_product_queue;      
        int queue_max_size = 10;                      // 队列所容纳的产品最大个数
        int m_produce_product_count = 0;
        int m_consume_product_count = 0;
        std::mutex m_mutex;                           // 互斥量,保护产品缓冲区
        std::mutex m_produce_count_mutex;
        std::mutex m_consume_count_mutex;
        std::condition_variable cv_queue_notFull;     // 条件变量, 指产品仓库缓冲区不为满
        std::condition_variable cv_queue_notEmpty;    // 条件变量, 指产品仓库缓冲区不为空
    } g_productManager;                               // 产品库全局变量,生产者和消费者操作该变量.
    
    
    void ProduceProduct(ProductManager &pManager, int product)
    {
        std::unique_lock<std::mutex> lock(pManager.m_mutex);
        while (pManager.m_product_queue.size() >= pManager.queue_max_size) {
            {
                std::lock_guard<std::mutex> lock(stdoutMutex);
                cout << "仓库满了,生产者等待中..." << "thread id = " << std::this_thread::get_id() << endl;
            }
            pManager.cv_queue_notFull.wait(lock);
    
        }
    
        pManager.m_product_queue.push(product);         // 仓库放入产品
        pManager.cv_queue_notEmpty.notify_all();        // 通知消费者仓库不为空
        lock.unlock(); 
    }
    
    
    int ConsumeProduct(ProductManager &pManager)
    {
        int data;
        std::unique_lock<std::mutex> lock(pManager.m_mutex);
    
        while (pManager.m_product_queue.empty()) {
            {
                std::lock_guard<std::mutex> lock(stdoutMutex);
                cout << "仓库空了,消费者等待中..." << "thread id = " << std::this_thread::get_id() << endl;
            }
            pManager.cv_queue_notEmpty.wait(lock);
        }
    
        data = pManager.m_product_queue.front();
        pManager.m_product_queue.pop();
        pManager.cv_queue_notFull.notify_all();
        lock.unlock();
        return data;
    }
    
    // 生产者任务
    void ProducerTask(int th_ID)
    {
        bool readyToExit = false;
        while (true) {
            default_random_engine e;
            uniform_int_distribution<unsigned >u(0, 9);
            this_thread::sleep_for(std::chrono::seconds(u(e)));
            std::unique_lock<std::mutex> lock(g_productManager.m_produce_count_mutex);  // 仓库产品消费计数器保持多线程互斥
    
            if (g_productManager.m_produce_product_count < g_product_max_count) {
                ++g_productManager.m_produce_product_count;
                ProduceProduct(g_productManager, g_productManager.m_produce_product_count);
                {
                    std::lock_guard<std::mutex> lock(stdoutMutex);
                    cout << "Thread " << th_ID << " Produce the " << g_productManager.m_produce_product_count << " th product" << endl;
                }
            } else {
                readyToExit = true;
            }
    
            lock.unlock();
            if (readyToExit)
                break;
        }
    
        std::lock_guard<std::mutex> lock(stdoutMutex);
        cout << "Producer Thread " << th_ID << " exit.... " << endl;
    }
    
    // 消费者任务
    void ConsumerTask(int th_ID)
    {
        while (true) {
            default_random_engine e;
            uniform_int_distribution<unsigned >u(0, 9);
            this_thread::sleep_for(std::chrono::seconds(u(e)));
            std::unique_lock<std::mutex> lock(g_productManager.m_consume_count_mutex);
            g_productManager.m_consume_product_count++;
            if (g_productManager.m_consume_product_count <= g_product_max_count) {
                int product = ConsumeProduct(g_productManager);   
                {
                    std::lock_guard<std::mutex> lock(stdoutMutex);
                    cout << "Thread " << th_ID << " Consume the " << product << " th product" << endl;
                }
            } else {
                break;
            }
    
        }
    
        std::lock_guard<std::mutex> lock(stdoutMutex);
        cout << "Consumer Thread " << th_ID << " exit...." << endl;
    
    }
    
    #define PRODUCTER_NUMS 10
    #define CUSTOMER_NUMS  5
    
    int main()
    {
        std::thread producer[PRODUCTER_NUMS];
        std::thread consumer[PRODUCTER_NUMS];
    
        for (int i = 0; i < PRODUCTER_NUMS; i++) {
            producer[i] = std::thread(ProducerTask, i + 1);
        }
    
        for (int i = 0; i < CUSTOMER_NUMS; i++) {
            consumer[i] = std::thread(ConsumerTask, i + 1);
        }
    
    
    
        for (int i = 0; i < PRODUCTER_NUMS; i++) {
            producer[i].join();
        }
    
        for (int i = 0; i < CUSTOMER_NUMS; i++) {
            consumer[i].join();
        }
    
        system("pause");
        return 0;
    }
    
    
    

    参考:
    https://blog.csdn.net/zy13270867781/article/details/79231775

    https://blog.csdn.net/u013390476/article/details/52067321

    #include <stdio.h>
    #include <mutex>
    #include <queue>
    #include <thread>
    #include <condition_variable>
    #include <atomic>
    
    
    class Product;
    
    std::mutex g_mutex;
    std::condition_variable g_producter_cv;
    std::condition_variable g_customer_cv;
    std::queue<Product *> g_queue_product;
    std::mutex m_mutex_print;
    
    std::atomic<int> g_id = 0;
    
    int MAX_SIZE = 10;
    
    
    
    class Product {
    public:
        Product(int id) {
            std::lock_guard<std::mutex> lock(m_mutex_print);
            c_id = id;
            printf("%ld Product is product \n", c_id);
        }
    
        void print() {
            std::lock_guard<std::mutex> lock(m_mutex_print);
            printf("%ld Product is custom  \n", c_id);
        }
    private:
        int c_id;
        
    };
    
    class Producter {
    public:
        void product() {
            while (true) {
                std::unique_lock<std::mutex> lock(g_mutex); //此处不能使用lock_guard
                while (g_queue_product.size() >= MAX_SIZE) {
                    {
                        std::lock_guard<std::mutex> lock(m_mutex_print);
                        printf("queue is full wait  \n");
                    }
                    g_producter_cv.wait(lock);
                }
                Product *product = new Product(++g_id);
                g_queue_product.push(product);
                g_customer_cv.notify_all();
                lock.unlock();
    
                std::this_thread::sleep_for(std::chrono::milliseconds(500));
            }
        }
    };
    
    class Customer {
    public:
        void custom() {
            while (true) {
                {
                    std::unique_lock<std::mutex> lock(g_mutex);
                    while (g_queue_product.empty()) {
                        {
                            std::lock_guard<std::mutex> lock(m_mutex_print);
                            printf("queue is empty wait  \n");
                        }
                        g_customer_cv.wait(lock);
                    }
                    Product *product = g_queue_product.front();
                    g_queue_product.pop();
                    if (nullptr != product) {
                        product->print();
                        delete product;
                        product = nullptr;
                    }
                    g_producter_cv.notify_all();
                }
    
                std::this_thread::sleep_for(std::chrono::milliseconds(500));
            }
    
        }
    };
    
    
    int main() {
    
    
        Producter producter1;
        std::thread producterThread1(&Producter::product, &producter1);
    
        Producter producter2;
        std::thread producterThread2(&Producter::product, &producter2);
    
        Customer customer;
        std::thread customerThread(&Customer::custom, &customer);
    
        producterThread1.join();
    
        return 0;
    }
    

    相关文章

      网友评论

          本文标题:C++11生产者消费者

          本文链接:https://www.haomeiwen.com/subject/klskqctx.html