美文网首页
用C++实现数据总线的方法系列(中):数据总线的实现

用C++实现数据总线的方法系列(中):数据总线的实现

作者: JasonLiThirty | 来源:发表于2020-03-14 09:38 被阅读0次

    视频教程:https://www.bilibili.com/video/av95993899/

    用C++实现数据总线的方法,本文主要介绍实现一个完整功能的带超时的数据总线DataQueue的方法。

    数据总线以及传输的数据的定义和实现

    传输数据类Data,它是由数据的地址和数据的大小两个成员组成的。

    数据总线类DataQueue,需要有存放总线数据的数据链表,构成锁的互斥量和用于多线程同步的条件变量,同时也需要具有最基本的Push和Pop函数,还有就是Clear和Empty函数。

    #ifndef DATA_QUEUE_H
    #define DATA_QUEUE_H
    
    #include<stdint.h>
    #include<mutex>
    #include<condition_variable>
    #include<list>
    
    class Data
    {
    public:
        Data(): m_address(NULL), m_size(0)
        {
    
    
        }
    
        Data(uint64_t size): m_size(size)
        {
            m_address = new char[m_size];
        }
    
        ~Data()
        {
            if (m_address != NULL)
            {
                delete[] m_address;
                m_address = NULL;
            }
        }
    
        char* m_address;
        uint64_t m_size;
    };
    class DataQueue
    {
    public:
        static void Clear();
        static bool Empty();
    
        static void Push(Data* dataPtr);
        static void Push(void* data, uint64_t size);
        static Data* Pop();
    private:
        static std::mutex               s_mt;
        static std::condition_variable  s_cv;
        static std::list<Data *>        s_queue;
    };
    
    
    #endif
    
    #include "DataQueue.h"
    
    static int POP_TIMEOUT = 1000;
    
    std::mutex              DataQueue::s_mt;
    std::condition_variable DataQueue::s_cv;
    std::list<Data *>       DataQueue::s_queue;
    void DataQueue::Clear()
    {
        std::list<Data *>::iterator it = s_queue.begin();
        for (; it != s_queue.end(); ++it)
        {
            delete *it;
        }
        s_queue.clear();
    }
    bool DataQueue::Empty()
    {
        std::unique_lock<std::mutex> lock(s_mt);
        return s_queue.empty();
    }
    
    void DataQueue::Push(Data* dataPtr)
    {
        std::unique_lock<std::mutex> lock(s_mt);
        s_queue.push_back(dataPtr);
        s_cv.notify_all();
    }
    void DataQueue::Push(void* data, uint64_t size)
    {
        Data *dataPtr = new Data(size);
        memcpy(dataPtr->m_address, data, size);
        Push(dataPtr);
    }
    
    Data* DataQueue::Pop()
    {
        Data* dataPtr = NULL;
        {
            std::unique_lock<std::mutex> lock(s_mt);
            while (s_queue.empty())
            {
                if (s_cv.wait_for(lock, std::chrono::milliseconds(POP_TIMEOUT)) == std::cv_status::timeout)
                {
                    break;
                }
            }
    
            if (!s_queue.empty())
            {
                dataPtr = s_queue.front();
                s_queue.pop_front();
            }
        }
        
        return dataPtr;
    
    }
    

    这个示例所实现的数据总线呢,虽然可以传递任意类型的数据,但是为了提高效率,所以我们传递的实际上是原始数据的地址,这样减少了数据的构造和拷贝,确实提高了效率,但在使用的过程中就需要留意使用方法,以免造成程序异常。

    读取数据会出错的情况

    传输的数据类User,它是由id和info两个数据局成员来组成的。同时我们定义了UserPtr作为User类的shared_ptr智能指针。

    #ifndef USER_H
    #define USER_H
    
    #include<string>
    #include<memory>
    class User
    {
    public:
        User(int id, std::string info):m_id(id), m_info(info) {}
    
        int ID()
        {
            return m_id;
        }
    
        std::string Info()
        {
            return m_info;
        }
    private:
        int m_id;
        std::string m_info;
    };
    
    using UserPtr = std::shared_ptr<User>;
    

    调用程序,在SetData线程函数中,我们新建了一个原始数据User对象,并用一个shared_ptr智能指针pUser指向它,然后我们用了一个变量addr存放了这个User对象的地址,然后将addr里的值,也就是User对象的地址放入拷贝到传输数据对象data中,存放到数据总线中。
    在GetData线程函数中,获得了传输的数据对象data,然后得到了data里面的值,也就是原始数据User对象的地址,然后使用reinterpret_cast将这个地址值强转为一个目标数据User对象,因为原始对象和目标对象的结构一致,所以也就得到了原始数据值。

    #include "stdafx.h"
    #include "User.h"
    #include "DataQueue.h"
    #include<atomic>
    #include<thread>
    #include<string>
    #include<iostream>
    static std::atomic_bool getStop = false;
    
    void SetData()
    {
        std::this_thread::sleep_for(std::chrono::seconds(1));
        int i = 1;
        while (i <= 1) //insert one data
        {   
            UserPtr pUser(new User(i, std::to_string(i * 10)));
            User* user = pUser.get();
            uint64_t addr = (uint64_t)user;
            DataQueue::Push(&addr, sizeof(addr));
            i++;
            std::this_thread::sleep_for(std::chrono::seconds(1));
        }
    }
    
    
    void GetData()
    {
        while (true)
        {
            Data *data = DataQueue::Pop();
            if (data != NULL)
            {
                uint64_t addr = 0;
                memcpy(&addr, data->m_address, sizeof(addr));
                UserPtr pUser(reinterpret_cast<User*>(addr));
                User* user = pUser.get();
                std::cout << "---user id: " << user->ID() << "---user info: " << user->Info().c_str() << std::endl;
    
    
                //std::cout << "---user id: " << pUser->ID() << "---user info: " << pUser->Info().c_str() << std::endl;
            }
    
    
        }
    }
    int main()
    {
        std::thread thGet(GetData);
        std::thread thSet(SetData);
    
    
        thGet.join();
        thSet.join();
        return 0;
    }
    

    但是报错了,而且是个内存错误,原因是为了提高效率,我们在数据总线上传输的是原始数据的地址,但是,因为在SetData新建的原始数据User使用的是shared_ptr智能指针来指向它,在Push函数放入原始数据地址后,shared_ptr智能指针这个局部变量也将释放,这样原始数据的引用计数就归零了,所以原始数据就被释放了,导致读取线程在总线Pop数据时崩溃。

    正确的数据总线的使用方法

    正确的使用方法是:使用boost::intrusive_ptr智能指针,同时需要用总线的数据类自己来实现引用计数,这样就可以保证原始数据不被释放,具体方法如下:

    建立一个提供引用计数功能的基类ReferenceCounter,这个基类需要提供intrusive_ptr_add_ref和intrusive_ptr_release两个接口供boost::intrusive_ptr来进行引用计数的增加和减少。同时,要提供两个接口来供我们自己来进行进行引用计数的增加和减少,这里我们实现了IncReference和DecReference这两个接口来完成相应的功能。

    #ifndef ADVANCED_USER_H
    #define ADVANCED_USER_H
    #include <assert.h>
    #include <atomic>
    #include <string>
    #include <boost/intrusive_ptr.hpp>
    
    class ReferenceCounter
    {
    public:
        friend void intrusive_ptr_add_ref(ReferenceCounter *p)
        {
            assert(p);
            assert(p->ref_count >= 0);
            ++p->ref_count;
        }
    
        friend void intrusive_ptr_release(ReferenceCounter *p)
        {
            assert(p);
            assert(p->ref_count > 0);
            if (--p->ref_count == 0)
            {
                delete p;
            }
        }
        friend uint64_t IncReference(ReferenceCounter *p)
        {
            intrusive_ptr_add_ref(p);
            return reinterpret_cast<uint64_t>(p);
        }
    
        friend void DecReference(ReferenceCounter *p)
        {
            intrusive_ptr_release(p);
        }
    
        int RefCount()
        {
            return ref_count;
        }
    protected:
        ReferenceCounter() { ref_count = 0; }
        virtual ~ReferenceCounter() {};
    
    private:
        std::atomic_int ref_count;
    };
    
    class AdvancedUser: public ReferenceCounter
    {
    public:
        AdvancedUser(int id, std::string info) :m_id(id), m_info(info) {}
    
        int ID()
        {
            return m_id;
        }
    
        std::string Info()
        {
            return m_info;
        }
    private:
        int m_id;
        std::string m_info;
    };
    
    using AdvancedUserPtr = boost::intrusive_ptr<AdvancedUser>;
    #endif
    

    如果调用总线的方式不改变,还是会报错,因为其实和之前使用shared_ptr没有本质的区别,还是会因为数据释放问题而报错,只不过我们能更清楚的了解数据释放的过程。

    #include "stdafx.h"
    #include "User.h"
    #include "AdvancedUser.h"
    #include "DataQueue.h"
    #include<atomic>
    #include<thread>
    #include<string>
    #include<iostream>
    
    void SetData()
    {
        std::this_thread::sleep_for(std::chrono::seconds(1));
        int i = 1;
        while (i <= 1) 
        {
            AdvancedUserPtr pUser(new AdvancedUser(i, std::to_string(i * 10)));
            AdvancedUser*  user = pUser.get();
            uint64_t addr = (uint64_t)user;
            DataQueue::Push(&addr, sizeof(addr));
            i++;
        }
        std::cout << "---SetData End!" << std::endl;
    }
    
    void GetData()
    {
        while (true)
        {
            std::this_thread::sleep_for(std::chrono::seconds(5));
            Data *data = DataQueue::Pop();
            if (data != NULL)
            {
                uint64_t addr = 0;
                memcpy(&addr, data->m_address, sizeof(data->m_size));
                AdvancedUserPtr pUser(reinterpret_cast<AdvancedUser*>(addr));
                std::cout << "---user id: " << pUser->ID() << "---user info: " << pUser->Info().c_str() << std::endl;
            }
        }
        std::cout << "---GetData End!" << std::endl;
    }
    
    
    int main()
    {
        std::thread thGet(GetData);
        std::thread thSet(SetData);
    
        thGet.join();
        thSet.join();
        return 0;
    }
    
    • 在建立智能指针对象时,对象的引用计数加1后为1

    在push完成,智能指针对象出作用域之后,对象的引用计数减1后为0,释放了原始对象,在Pop数据时同样会崩溃

    正确的使用方法如下,在SetData线程函数里的Push数据到总线之前,调用IncReference,使得对象的计数加1,这样原始数据的引用计数就为2了,当智能指针出作用域,对象计数减1后,原始数据的引用计数为从2变为1,但没有清零,所以原始数据比不会被释放。

    当然,那在GetData函数中的得到原始数据之后,还需要调用DecReference将对象计数减1,也就是减掉我们之前人为调用IncReference的那个计数加1。代码如下:

    #include "stdafx.h"
    #include "User.h"
    #include "AdvancedUser.h"
    #include "DataQueue.h"
    #include<atomic>
    #include<thread>
    #include<string>
    #include<iostream>
    
    static bool StopFlag = false;
    void SetData()
    {
        std::this_thread::sleep_for(std::chrono::seconds(1));
        int i = 1;
        while (i <= 10) 
        {
            AdvancedUserPtr pUser(new AdvancedUser(i, std::to_string(i * 10)));
            AdvancedUser*  user = pUser.get();
            uint64_t addr = (uint64_t)user;
            IncReference(pUser.get());
            std::cout << "Set---user id: " << pUser->ID() << "---user info: " << pUser->Info().c_str() << std::endl;
            DataQueue::Push(&addr, sizeof(addr));
            i++;
            std::this_thread::sleep_for(std::chrono::seconds(1));
        }
        std::cout << "---SetData End!" << std::endl;
        getStop = true;
    }
    void GetData()
    {
        while (!getStop)
        {
            //std::this_thread::sleep_for(std::chrono::seconds(1));
            Data *data = DataQueue::Pop();
            if (data != NULL)
            {
                uint64_t addr = 0;
                memcpy(&addr, data->m_address, sizeof(data->m_size));
                AdvancedUserPtr pUser(reinterpret_cast<AdvancedUser*>(addr));
                std::cout << "Get---user id: " << pUser->ID() << "---user info: " << pUser->Info().c_str() << std::endl;
                DecReference(pUser.get());
            }
        }
        std::cout << "---GetData End!" << std::endl;
    }
    
    int main()
    {
        std::thread thGet(GetData);
        std::thread thSet(SetData);
    
    
        thGet.join();
        thSet.join();
        return 0;
    }
    
    //output
    Set---user id: 1---user info: 10
    Get---user id: 1---user info: 10
    Set---user id: 2---user info: 20
    Get---user id: 2---user info: 20
    Set---user id: 3---user info: 30
    Get---user id: 3---user info: 30
    Set---user id: 4---user info: 40
    Get---user id: 4---user info: 40
    Set---user id: 5---user info: 50
    Get---user id: 5---user info: 50
    Set---user id: 6---user info: 60
    Get---user id: 6---user info: 60
    Set---user id: 7---user info: 70
    Get---user id: 7---user info: 70
    Set---user id: 8---user info: 80
    Get---user id: 8---user info: 80
    Set---user id: 9---user info: 90
    Get---user id: 9---user info: 90
    Set---user id: 10---user info: 100
    Get---user id: 10---user info: 100
    ---SetData End!
    ---GetData End!
    

    相关文章

      网友评论

          本文标题:用C++实现数据总线的方法系列(中):数据总线的实现

          本文链接:https://www.haomeiwen.com/subject/oafmshtx.html