美文网首页
ring buffer实现

ring buffer实现

作者: 谭英智 | 来源:发表于2023-10-31 21:35 被阅读0次

背景

多线程通讯,如果是单读单写,一般使用带atomic操作的ring buffer来实现

ring buffer原理非常简单,而然,如果需要实现高性能,则需要在原始的设计上,扣细节,才能实现高性能的队列

从原始到高速

原始设计

read/write index各自占一个cache line

避免伪共享

class io_uring {
  atomic<uint32_t> writeIdx;
  uint32_t padding[cacheline_size - 1];
  atomic<uint32_t> readIdx;
  uint32_t padding[cacheline_size - 1];
};

通过操作atomic idx,实现无锁入队列和出队列

bool push(int val) {
  auto const writeIdx = writeIdx_.load(std::memory_order_relaxed);
  auto nextWriteIdx = writeIdx + 1;
  if (nextWriteIdx == data_.size()) {
    nextWriteIdx = 0;
  }
  if (nextWriteIdx == readIdx_.load(std::memory_order_acquire)) {
    return false;
  }
  data_[writeIdx] = val;
  writeIdx_.store(nextWriteIdx, std::memory_order_release);
  return true;
}
bool pop(int &val) {
  auto const readIdx = readIdx_.load(std::memory_order_relaxed);
  if (readIdx == writeIdx_.load(std::memory_order_acquire)) {
    return false;
  }
  val = data_[readIdx];
  auto nextReadIdx = readIdx + 1;
  if (nextReadIdx == data_.size()) {
    nextReadIdx = 0;
  }
  readIdx_.store(nextReadIdx, std::memory_order_release);
  return true;
}

加入idx线程内cache

reader thread加入对writeIdx的本线程cache:writeIdxCached_

writer thread 加入对writeIdx的本线程cache:readIdxCached_

这样,可以减少对另外一个线程的atomic变量的获取,减少memory order的次数

因为获取另外一个线程的atomic idx,一般需要使用memory_order_acquire

struct ringbuffer2 {
  std::vector<int> data_{};
  alignas(64) std::atomic<uint32_t> readIdx{0};
  alignas(64) uint32_t writeIdxCached_{0};
  alignas(64) std::atomic<uint32_t> writeIdx{0};
  alignas(64) uint32_t readIdxCached_{0};
}

writer thread在判断本线程过期的readIdxCached_无法满足写入时,才去拉去read thread的atomic变量readIdx

如果本线程的cache readIdxCached_能够满足写入,则可以省去获取真实的readIdx的开销

从而实现更高的性能

if (nextWriteIdx == readIdxCached_) {
  readIdxCached_ = readIdx.load(std::memory_order_acquire);
  if (nextWriteIdx == readIdxCached_) {
    return false;
  }
}

相同的,reader thread判断本线程过期的writeIdxCached_无法满足读取时,才去拉去writer thread 的atomic变量writerIdx

如果本线程的writeIdxCached_可以满足读取,则可以省去获取真实writeIdx的开销

从而实现更高的性能

if (readIdx == writeIdxCached_) {
  writeIdxCached_ = writeIdx_.load(std::memory_order_acquire);
  if (readIdx == writeIdxCached_) {
    return false;
  }
}

提升结果

  • 原始版本:551w ops/s
  • 加入cache后的版本:11228w ops/s

相关文章

网友评论

      本文标题:ring buffer实现

      本文链接:https://www.haomeiwen.com/subject/afrwidtx.html