美文网首页
跨进程无锁队列

跨进程无锁队列

作者: 谭英智 | 来源:发表于2023-09-21 19:16 被阅读0次

    介绍

    实现多进程单读单写共享内存无锁队列
    不使用信号量来同步
    而是通过原子操作来同步

    performance

    produce 1000000000 record use 33425683us  qps 2991w
    produce 1000000000 record use 39856828us  qps 2508w
    produce 1000000000 record use 41187954us  qps 2427w
    

    优化点

    • 使用局部变量,而不是类成员,减少编译优化的难度
    • 使用padding,避免伪共享
    • 优先是用relaxed来操作atomit变量
    • 使用peek来获取内存,减少内存拷贝
    • 使用加法来代替取余操作
    • 去除不必要的变量
    • 循环内重复计算外提,减少不必要的计算

    code

    #include <sys/types.h>
    #include <unistd.h>
    #include <sys/wait.h>
    #include <sys/shm.h>
    #include <atomic>
    #include <string>
    #include <thread>
    #include <iostream>
    #include <cstring>
    #include <chrono>
    #include <sys/time.h>
    #include <signal.h>
    struct Idx {
        std::atomic<uint32_t> pidx;
        uint32_t padding_1[15];
        std::atomic<uint32_t> cidx;
        uint32_t padding_2[15];
    };
    
    const uint32_t max_number = 10000;
    const uint32_t stop_number = 20000;
    const uint32_t g_queue_size = 2047;
    const uint32_t g_element_size = 2048;
    const std::string  path = "./";
    const uint32_t magic = 100;
    
    class ShmLockFreeProducer {
    private:
        Idx* meta;
        uint8_t* memory_addr;
        uint8_t* queue_addr;
        uint32_t mem_size;
        uint32_t element_size;
        uint32_t queue_size;
        int shmid;
    public:
        ShmLockFreeProducer(uint32_t element_size, uint32_t queue_size);
        ~ShmLockFreeProducer();
        int create(const std::string& path, uint32_t magic);
        int load(const std::string& path, uint32_t magic);
        void produce(void* data, uint32_t size);
    private:
        void distory();
        bool isFull(uint32_t cur_idx);
    };
    ShmLockFreeProducer::ShmLockFreeProducer(uint32_t element_size, uint32_t queue_size) {
        meta = nullptr;
        memory_addr = nullptr;
        queue_addr = nullptr;
        shmid = -1;
        this->element_size = element_size;
        this->queue_size = queue_size;
        mem_size = (queue_size + 1)*element_size;
    }
    ShmLockFreeProducer::~ShmLockFreeProducer() {
        distory();
    }
    int ShmLockFreeProducer::create(const std::string& path, uint32_t magic) {
        key_t key = ftok(path.c_str(), magic);
        shmid = shmget(key, mem_size, IPC_CREAT|0666|IPC_EXCL);
        if(shmid == -1) {
            shmid = shmget(key, 0, 0);
            shmctl(shmid, IPC_RMID, NULL);
            shmid = shmget(key, mem_size, IPC_CREAT|0666|IPC_EXCL);
            if(shmid == -1) {
                std::cout<<"create shm fail"<<std::endl;
                return -1;
            }
        }
        memory_addr = (uint8_t*)shmat(shmid, 0, 0);
        if((void*)-1 == memory_addr) {
            std::cout<<"shmat error" <<std::endl;
            return -1;
        }
        meta = (Idx*)(memory_addr + queue_size*element_size);
        memset(meta, 0, sizeof(Idx));
        queue_addr = memory_addr;
        return 0;
    }
    int ShmLockFreeProducer::load(const std::string& path, uint32_t magic) {
        key_t key = ftok(path.c_str(), magic);
        shmid = shmget(key, 0, 0);
        if(shmid == -1) {
            std::cout<<"create shm error"<<std::endl;
            return -1;
        }
        memory_addr = (uint8_t*)shmat(shmid, 0, 0);
        if((void*)-1 == memory_addr) {
            std::cout<<"shmat error" <<std::endl;
            return -1;
        }
        meta = (Idx*)(memory_addr + queue_size*element_size);
        queue_addr = memory_addr;
        return 0;
    }
    bool ShmLockFreeProducer::isFull(uint32_t cur_idx) {
        return (cur_idx == meta->cidx.load(std::memory_order_acquire));
    }
    void ShmLockFreeProducer::produce(void* data, uint32_t size) {
        uint32_t cur_idx = meta->pidx.load(std::memory_order_relaxed) + 1;
        if(cur_idx == queue_size) {
            cur_idx = 0;
        }
        while(isFull(cur_idx)) {        
        }
        
        uint8_t* cur_addr = queue_addr + cur_idx * element_size;
        memcpy(cur_addr, data, size);
        meta->pidx.store(cur_idx, std::memory_order_release);
    }
    void ShmLockFreeProducer::distory() {
        if(shmid == -1) {
            return;
        }
        shmctl(shmid, IPC_RMID, NULL);
        shmdt(memory_addr);
    }
    
    class ShmLockFreeConsumer {
    private:
        Idx* meta;
        uint8_t* memory_addr;
        uint8_t* queue_addr;
        uint32_t mem_size;
        uint32_t element_size;
        uint32_t queue_size;
        int shmid;
    public:
        ShmLockFreeConsumer(uint32_t element_size, uint32_t queue_size);
        ~ShmLockFreeConsumer();
        int create(const std::string& path, uint32_t magic);
        int load(const std::string& path, uint32_t magic);
        bool peek(void*& data, uint32_t size);
        void ack();
    private:
        void distory();
        bool isEmpty();
    };
    ShmLockFreeConsumer::ShmLockFreeConsumer(uint32_t element_size, uint32_t queue_size) {
        meta = nullptr;
        memory_addr = nullptr;
        queue_addr = nullptr;
        shmid = -1;
        this->element_size = element_size;
        this->queue_size = queue_size;
        mem_size = (queue_size + 1)*element_size;
    }
    ShmLockFreeConsumer::~ShmLockFreeConsumer() {
        distory();
    }
    bool ShmLockFreeConsumer::peek(void*& data, uint32_t size) {
        if(isEmpty()) {
            return false;
        }
        uint32_t cur_idx = (meta->cidx.load(std::memory_order_relaxed) + 1);
        if(cur_idx == queue_size) {
            cur_idx = 0;
        }
    
        data = queue_addr + cur_idx * element_size;
        return true;
    }
    void ShmLockFreeConsumer::ack() {
        uint32_t cur_idx = (meta->cidx.load(std::memory_order_relaxed) + 1);
        if(cur_idx == queue_size) {
            cur_idx = 0;
        }
        meta->cidx.store(cur_idx, std::memory_order_release);
    }
    int ShmLockFreeConsumer::create(const std::string& path, uint32_t magic) {
        key_t key = ftok(path.c_str(), magic);
        shmid = shmget(key, mem_size, IPC_CREAT|0666|IPC_EXCL);
        if(shmid == -1) {
            shmid = shmget(key, 0, 0);
            shmctl(shmid, IPC_RMID, NULL);
            shmid = shmget(key, mem_size, IPC_CREAT|0666|IPC_EXCL);
            if(shmid == -1) {
                std::cout<<"create shm fail"<<std::endl;
                return -1;
            }
        }
        memory_addr = (uint8_t*)shmat(shmid, 0, 0);
        if((void*)-1 == memory_addr) {
            std::cout<<"shmat error" <<std::endl;
            return -1;
        }
        meta = (Idx*)(memory_addr + queue_size*element_size);
        memset(meta, 0, sizeof(Idx));
        queue_addr = memory_addr;
        return 0;
    }
    int ShmLockFreeConsumer::load(const std::string& path, uint32_t magic) {
        key_t key = ftok(path.c_str(), magic);
        shmid = shmget(key, 0, 0);
        if(shmid == -1) {
            std::cout<<"create shm error"<<std::endl;
            return -1;
        }
        memory_addr = (uint8_t*)shmat(shmid, 0, 0);
        if((void*)-1 == memory_addr) {
            std::cout<<"shmat error" <<std::endl;
            return -1;
        }
        meta = (Idx*)(memory_addr + queue_size*element_size);
        queue_addr = memory_addr;
        return 0;
    }
    
    void ShmLockFreeConsumer::distory() {
        if(shmid == -1) {
            return;
        }
        shmctl(shmid, IPC_RMID, NULL);
        shmdt(memory_addr);
        
    }
    bool ShmLockFreeConsumer::isEmpty() {
        return meta->pidx.load(std::memory_order_acquire) == meta->cidx.load(std::memory_order_relaxed);
    }
    
    int main () {
        pid_t c_pid = fork();
        if(c_pid == 0) {
            
            ShmLockFreeConsumer c(g_element_size, g_queue_size);
            if(0 != c.create(path, magic)) {
                std::cout<<"consumer create error " <<std::endl;
                return 0;
            }
            void* data;
            uint32_t size = sizeof(uint32_t);
            while(true) {
                if(c.peek(data, size)) {
                    static uint32_t i = 0;
                    uint32_t p = *(uint32_t*)data;
                    if (i != p) {
                        break;
                    }
                    i++;
                    if(i == max_number) {
                        i = 0;
                    }
                    c.ack();
                }
                
            }
        } else {
            std::this_thread::sleep_for(std::chrono::milliseconds(1000));
            ShmLockFreeProducer p(g_element_size, g_queue_size);
            if(0 != p.load(path, magic)) {
                std::cout<<"produce create error"<<std::endl;
                return 0;
            }
            
            for(int m=0; m<3; m++) {
                struct timeval start;
                gettimeofday(&start, NULL);
                uint64_t count = 0;
                for(int k=0; k<100000; ++k) {
                    for(uint32_t i=0; i<max_number; ++i) {
                        p.produce(&i, sizeof(i));
                        count++;
                    }
                }
                struct timeval end;
                gettimeofday(&end, NULL);
                
                uint64_t token = end.tv_sec*1000000 + end.tv_usec - start.tv_sec*1000000 - start.tv_usec;
                std::cout<<"produce "<<count<<" record use "<<token<<"us "<<" qps "<<count*100/token<<"w"<<std::endl;
            }
            int i = stop_number;
            p.produce(&i, sizeof(i));
            wait(nullptr);
        }
        
        
        return 0;
    }
    
    g++ -std=c++11 main.cpp -lpthread -O3
    

    相关文章

      网友评论

          本文标题:跨进程无锁队列

          本文链接:https://www.haomeiwen.com/subject/glxavdtx.html