美文网首页studying
生产者消费者缓冲池模型

生产者消费者缓冲池模型

作者: IT孤独者 | 来源:发表于2017-02-10 15:44 被阅读6次

    有一个生产者在生产产品,这些产品将提供给若干个消费者去消费,为了使生产者和消费者能并发执行,在两者之间设置一个具有多个缓冲区的缓冲池,生产者将它生产的产品放入一个缓冲区中,消费者可以从缓冲区中取走产品进行消费,显然生产者和消费者之间必须保持同步,即不允许消费者到一个空的缓冲区中取产品,也不允许生产者向一个已经放入产品的缓冲区中再次投放产品。

    代码如下:

    #include <stdio.h>
    #include <windows.h>
    #include <time.h>
    #include <iostream>
    #include <thread>
    #include <condition_variable>
    #include <mutex>
    #include <vector>
    #include <deque>
    #include <assert.h>
    #include <set>
    using namespace std;
    
    #define POOL_SIZE      4        // 缓冲池含有的缓冲区的个数
    #define BUF_SIZE       1        // 缓冲区能够缓冲元素的个数
    #define PRODUCT_NUM    1000     // 生产的总产品数目
    #define PRODUCER_NUM   1        // 生产者的数目
    #define CONSUMER_NUM   2        // 消费者的数目
    
    vector<deque<int> > pool(POOL_SIZE, deque<int>());   // 缓冲池定义
    int g_num = 0;
    
    struct s_buf_info
    {
        s_buf_info(int n) : nSize(n), bLock(false) {}
        int nSize;
        bool bLock;
    };
    vector<s_buf_info> poolStatus(POOL_SIZE, s_buf_info(BUF_SIZE));   // 缓冲池使用状态
    set<int> InitSetWrite() {
        set<int> s;
        for (int i = 0; i < POOL_SIZE; ++i) {
            s.insert(i);
        }
        return s;
    }
    set<int> setWrite(InitSetWrite());
    set<int> setRead;
    
    mutex mtxPoolStatus;
    condition_variable conWrite;
    condition_variable conRead;
    
    mutex mtx;  // for output data
    
    BOOL SetConsoleColor(WORD wAttributes)
    {
        HANDLE hConsole = GetStdHandle(STD_OUTPUT_HANDLE);
        if (hConsole == INVALID_HANDLE_VALUE)
            return FALSE;
    
        return SetConsoleTextAttribute(hConsole, wAttributes);
    }
    
    int GetWriteID()
    {
        if (setWrite.empty()) return -1;
    
        auto itr = setWrite.begin();
        int nID = *itr;
    
        setWrite.erase(itr);
    
        if (!setRead.empty()) {
            auto itr = setRead.find(nID);
            if (itr != setRead.end())
                setRead.erase(itr);
        }
    
        return nID;
    }
    
    int GetReadID()
    {
        if (setRead.empty()) return -1;
    
        auto itr = setRead.begin();
        int nID = *itr;
    
        setRead.erase(itr);
    
        if (!setWrite.empty()) {
            auto itr = setWrite.find(nID);
            if (itr != setWrite.end())
                setWrite.erase(itr);
        }
    
        return nID;
    }
    
    int ProductLock()
    {
        unique_lock<mutex> lck(mtxPoolStatus);
    
        int nID = GetWriteID();
        while (nID == -1) {
            conWrite.wait(lck);
            nID = GetWriteID();
        }
    
        poolStatus[nID].bLock = true;
    
        return nID;
    }
    
    void ProductUnLock(int nID)
    {
        unique_lock<mutex> lck(mtxPoolStatus);
    
        poolStatus[nID].bLock = false;
        poolStatus[nID].nSize -= 1;
    
        if (poolStatus[nID].nSize < BUF_SIZE) {
            setRead.insert(nID);
            conRead.notify_all();
        }
    
        if (poolStatus[nID].nSize > 0) {
            setWrite.insert(nID);
            conWrite.notify_all();
        }
    }
    
    int ConsumeLock()
    {
        unique_lock<mutex> lck(mtxPoolStatus);
    
        int nID = GetReadID();
        while (nID == -1) {
            conRead.wait(lck);
            nID = GetReadID();
        }
    
        poolStatus[nID].bLock = true;
    
        return nID;
    }
    
    void ConsumeUnLock(int nID)
    {
        unique_lock<mutex> lck(mtxPoolStatus);
    
        poolStatus[nID].bLock = false;
        poolStatus[nID].nSize += 1;
    
        if (poolStatus[nID].nSize < BUF_SIZE) {
            setRead.insert(nID);
            conRead.notify_all();
        }
    
        if (poolStatus[nID].nSize > 0) {
            setWrite.insert(nID);
            conWrite.notify_all();
        }
    
    }
    
    void ProducerThreadFun(int nProductNum)
    {   
        for (int i=0 ; i < nProductNum; ++i) {
            int nID = ProductLock();
            pool[nID].push_back(g_num++);
            mtx.lock();
            printf("编号为%d生产者在缓冲池第%d个缓冲区中投放数据%d\n", GetCurrentThreadId(), nID, g_num - 1);
            mtx.unlock();
            ProductUnLock(nID);
        }
    }
    
    void ConsumerThreadFun(int nConsumNum)
    {
        for (int i = 0; i < nConsumNum; ++i) {
            int nID = ConsumeLock();
            int nVal = pool[nID].front();
            pool[nID].pop_front();
            ConsumeUnLock(nID);
            mtx.lock();
            SetConsoleColor(FOREGROUND_GREEN);
            printf("  编号为%d的消费者从缓冲池中第%d个缓冲区取出数据%d\n", GetCurrentThreadId(), nID, nVal);
            SetConsoleColor(FOREGROUND_RED | FOREGROUND_GREEN | FOREGROUND_BLUE);
            mtx.unlock();
        }
    
        SetConsoleColor(FOREGROUND_GREEN);
        printf("  编号为%d的消费者收到通知,线程结束运行\n", GetCurrentThreadId());
        SetConsoleColor(FOREGROUND_RED | FOREGROUND_GREEN | FOREGROUND_BLUE);
    }
    
    int main(int argc, char ** argv)
    {
        time_t t1 = time(NULL);
    
        vector<thread> pro(PRODUCER_NUM);
        vector<thread> con(CONSUMER_NUM);
    
        assert(PRODUCT_NUM % PRODUCER_NUM == 0);
        assert(PRODUCT_NUM % CONSUMER_NUM == 0);
    
        for (auto &th : pro) th = thread(ProducerThreadFun, PRODUCT_NUM / PRODUCER_NUM);
        for (auto &th : con) th = thread(ConsumerThreadFun, PRODUCT_NUM / CONSUMER_NUM);
    
        for (auto &th : pro) th.join();
        for (auto &th : con) th.join();
    
        time_t t2 = time(NULL);
        
        cout << t2 - t1 << " s" << endl;
        return 0;
    }
    

    相关文章

      网友评论

        本文标题:生产者消费者缓冲池模型

        本文链接:https://www.haomeiwen.com/subject/vuggittx.html