美文网首页
ring buffer实现

ring buffer实现

作者: 谭英智 | 来源:发表于2023-10-31 21:35 被阅读0次

    背景

    多线程通讯,如果是单读单写,一般使用带atomic操作的ring buffer来实现

    ring buffer原理非常简单,而然,如果需要实现高性能,则需要在原始的设计上,扣细节,才能实现高性能的队列

    从原始到高速

    原始设计

    read/write index各自占一个cache line

    避免伪共享

    class io_uring {
      atomic<uint32_t> writeIdx;
      uint32_t padding[cacheline_size - 1];
      atomic<uint32_t> readIdx;
      uint32_t padding[cacheline_size - 1];
    };
    

    通过操作atomic idx,实现无锁入队列和出队列

    bool push(int val) {
      auto const writeIdx = writeIdx_.load(std::memory_order_relaxed);
      auto nextWriteIdx = writeIdx + 1;
      if (nextWriteIdx == data_.size()) {
        nextWriteIdx = 0;
      }
      if (nextWriteIdx == readIdx_.load(std::memory_order_acquire)) {
        return false;
      }
      data_[writeIdx] = val;
      writeIdx_.store(nextWriteIdx, std::memory_order_release);
      return true;
    }
    bool pop(int &val) {
      auto const readIdx = readIdx_.load(std::memory_order_relaxed);
      if (readIdx == writeIdx_.load(std::memory_order_acquire)) {
        return false;
      }
      val = data_[readIdx];
      auto nextReadIdx = readIdx + 1;
      if (nextReadIdx == data_.size()) {
        nextReadIdx = 0;
      }
      readIdx_.store(nextReadIdx, std::memory_order_release);
      return true;
    }
    

    加入idx线程内cache

    reader thread加入对writeIdx的本线程cache:writeIdxCached_

    writer thread 加入对writeIdx的本线程cache:readIdxCached_

    这样,可以减少对另外一个线程的atomic变量的获取,减少memory order的次数

    因为获取另外一个线程的atomic idx,一般需要使用memory_order_acquire

    struct ringbuffer2 {
      std::vector<int> data_{};
      alignas(64) std::atomic<uint32_t> readIdx{0};
      alignas(64) uint32_t writeIdxCached_{0};
      alignas(64) std::atomic<uint32_t> writeIdx{0};
      alignas(64) uint32_t readIdxCached_{0};
    }
    

    writer thread在判断本线程过期的readIdxCached_无法满足写入时,才去拉去read thread的atomic变量readIdx

    如果本线程的cache readIdxCached_能够满足写入,则可以省去获取真实的readIdx的开销

    从而实现更高的性能

    if (nextWriteIdx == readIdxCached_) {
      readIdxCached_ = readIdx.load(std::memory_order_acquire);
      if (nextWriteIdx == readIdxCached_) {
        return false;
      }
    }
    

    相同的,reader thread判断本线程过期的writeIdxCached_无法满足读取时,才去拉去writer thread 的atomic变量writerIdx

    如果本线程的writeIdxCached_可以满足读取,则可以省去获取真实writeIdx的开销

    从而实现更高的性能

    if (readIdx == writeIdxCached_) {
      writeIdxCached_ = writeIdx_.load(std::memory_order_acquire);
      if (readIdx == writeIdxCached_) {
        return false;
      }
    }
    

    提升结果

    • 原始版本:551w ops/s
    • 加入cache后的版本:11228w ops/s

    相关文章

      网友评论

          本文标题:ring buffer实现

          本文链接:https://www.haomeiwen.com/subject/afrwidtx.html