介绍
实现多进程单读单写共享内存无锁队列
不使用信号量来同步
而是通过原子操作来同步
performance
produce 1000000000 record use 33425683us qps 2991w
produce 1000000000 record use 39856828us qps 2508w
produce 1000000000 record use 41187954us qps 2427w
优化点
- 使用局部变量,而不是类成员,降低编译器优化的难度
- 使用padding,避免伪共享
- 优先使用relaxed来操作atomic变量
- 使用peek来获取内存,减少内存拷贝
- 使用加法来代替取余操作
- 去除不必要的变量
- 循环内重复计算外提,减少不必要的计算
code
#include <sys/types.h>
#include <unistd.h>
#include <sys/wait.h>
#include <sys/shm.h>
#include <atomic>
#include <string>
#include <thread>
#include <iostream>
#include <cstring>
#include <chrono>
#include <sys/time.h>
#include <signal.h>
/// Producer and consumer cursors that live inside the shared memory segment.
///
/// Each cursor is followed by 15 padding words, so with a 4-byte atomic the
/// two cursors are 64 bytes apart; the padding is there so that the producer
/// and the consumer do not write to the same 64-byte line (false sharing).
struct Idx {
    /// Index of the slot most recently published by the producer.
    std::atomic<uint32_t> pidx;
    uint32_t padding_1[15];
    /// Index of the slot most recently acknowledged by the consumer.
    std::atomic<uint32_t> cidx;
    uint32_t padding_2[15];
};

// This struct is mapped by two separate processes, so both its byte layout and
// the lock-freedom of its cursors are part of the protocol: an atomic that is
// implemented with a process-local lock cannot synchronise across processes.
// Fail the build instead of silently violating either assumption.
// NOTE(review): ATOMIC_INT_LOCK_FREE covers uint32_t only where uint32_t is
// (unsigned) int, which is assumed for the targets this file builds on.
static_assert(ATOMIC_INT_LOCK_FREE == 2,
              "Idx cursors must be always lock-free to be shared between processes");
static_assert(sizeof(std::atomic<uint32_t>) == sizeof(uint32_t),
              "Idx padding assumes a 4-byte atomic cursor");
static_assert(sizeof(Idx) == 128,
              "Idx must occupy exactly two 64-byte lines");
// Demo configuration used by main().

// The producer writes the values 0 .. max_number-1 repeatedly; the consumer
// expects the same sequence and wraps its expected value at max_number.
constexpr uint32_t max_number{10000};
// Sentinel value sent last; it never matches the expected sequence, which is
// what makes the consumer loop exit.
constexpr uint32_t stop_number{20000};
// Ring geometry passed to both queue endpoints: number of slots and bytes per slot.
constexpr uint32_t g_queue_size{2047};
constexpr uint32_t g_element_size{2048};
// Arguments handed to ftok() to derive the System V IPC key.
const std::string path{"./"};
constexpr uint32_t magic{100};
/// Producer endpoint of a ring buffer kept in a System V shared memory segment.
///
/// The segment holds `queue_size` slots of `element_size` bytes followed by an
/// Idx cursor block. Per the file's description it is meant for exactly one
/// producer process and one consumer process, synchronised only through the
/// atomic cursors in Idx.
class ShmLockFreeProducer {
private:
    Idx* meta;             // cursor block, placed right after the last slot
    uint8_t* memory_addr;  // address returned by shmat()
    uint8_t* queue_addr;   // first slot (same address as memory_addr)
    uint32_t mem_size;     // segment size in bytes requested by create()
    uint32_t element_size; // bytes per slot
    uint32_t queue_size;   // number of slots
    int shmid;             // System V segment id, -1 while none is held

public:
    ShmLockFreeProducer(uint32_t element_size, uint32_t queue_size);
    ~ShmLockFreeProducer();

    // The destructor detaches and removes the segment, so a copy would release
    // the same attachment twice. Copying is therefore disabled.
    ShmLockFreeProducer(const ShmLockFreeProducer&) = delete;
    ShmLockFreeProducer& operator=(const ShmLockFreeProducer&) = delete;

    /// Creates the segment for the key derived from (path, magic), attaches
    /// it and zeroes the cursors. Returns 0 on success, -1 on failure.
    int create(const std::string& path, uint32_t magic);
    /// Attaches to an already existing segment for the key derived from
    /// (path, magic). Returns 0 on success, -1 on failure.
    int load(const std::string& path, uint32_t magic);
    /// Copies `size` bytes from `data` into the next slot and publishes it,
    /// busy-waiting while the ring is full.
    void produce(void* data, uint32_t size);

private:
    /// Marks the segment for removal and detaches from it.
    void distory();
    /// True when writing slot `cur_idx` would catch up with the consumer cursor.
    bool isFull(uint32_t cur_idx);
};
/// Records the ring geometry and computes the segment size; no IPC call is made.
///
/// @param element_size bytes per slot
/// @param queue_size   number of slots
ShmLockFreeProducer::ShmLockFreeProducer(uint32_t element_size, uint32_t queue_size)
    : meta(nullptr),
      memory_addr(nullptr),
      queue_addr(nullptr),
      mem_size(0),
      element_size(element_size),
      queue_size(queue_size),
      shmid(-1) {
    // The cursor block is stored right after the last slot. Reserving only one
    // extra slot for it is too little when a slot is smaller than Idx, which
    // would make the cursors extend past the end of the segment. Reserve the
    // larger of the two; for element_size >= sizeof(Idx) this is exactly
    // (queue_size + 1) * element_size as before.
    const uint32_t idx_bytes = static_cast<uint32_t>(sizeof(Idx));
    const uint32_t tail_bytes = element_size > idx_bytes ? element_size : idx_bytes;
    mem_size = queue_size * element_size + tail_bytes;
}
/// Releases the shared memory held by this endpoint by delegating to distory().
ShmLockFreeProducer::~ShmLockFreeProducer()
{
    this->distory();
}
int ShmLockFreeProducer::create(const std::string& path, uint32_t magic) {
key_t key = ftok(path.c_str(), magic);
shmid = shmget(key, mem_size, IPC_CREAT|0666|IPC_EXCL);
if(shmid == -1) {
shmid = shmget(key, 0, 0);
shmctl(shmid, IPC_RMID, NULL);
shmid = shmget(key, mem_size, IPC_CREAT|0666|IPC_EXCL);
if(shmid == -1) {
std::cout<<"create shm fail"<<std::endl;
return -1;
}
}
memory_addr = (uint8_t*)shmat(shmid, 0, 0);
if((void*)-1 == memory_addr) {
std::cout<<"shmat error" <<std::endl;
return -1;
}
meta = (Idx*)(memory_addr + queue_size*element_size);
memset(meta, 0, sizeof(Idx));
queue_addr = memory_addr;
return 0;
}
int ShmLockFreeProducer::load(const std::string& path, uint32_t magic) {
key_t key = ftok(path.c_str(), magic);
shmid = shmget(key, 0, 0);
if(shmid == -1) {
std::cout<<"create shm error"<<std::endl;
return -1;
}
memory_addr = (uint8_t*)shmat(shmid, 0, 0);
if((void*)-1 == memory_addr) {
std::cout<<"shmat error" <<std::endl;
return -1;
}
meta = (Idx*)(memory_addr + queue_size*element_size);
queue_addr = memory_addr;
return 0;
}
/// Reports whether slot `cur_idx` is the one the consumer cursor currently
/// points at, i.e. writing it would catch up with the consumer.
bool ShmLockFreeProducer::isFull(uint32_t cur_idx) {
    // Acquire pairs with the consumer's release store in ack().
    const uint32_t consumer_pos = meta->cidx.load(std::memory_order_acquire);
    return consumer_pos == cur_idx;
}
void ShmLockFreeProducer::produce(void* data, uint32_t size) {
uint32_t cur_idx = meta->pidx.load(std::memory_order_relaxed) + 1;
if(cur_idx == queue_size) {
cur_idx = 0;
}
while(isFull(cur_idx)) {
}
uint8_t* cur_addr = queue_addr + cur_idx * element_size;
memcpy(cur_addr, data, size);
meta->pidx.store(cur_idx, std::memory_order_release);
}
void ShmLockFreeProducer::distory() {
if(shmid == -1) {
return;
}
shmctl(shmid, IPC_RMID, NULL);
shmdt(memory_addr);
}
/// Consumer endpoint of a ring buffer kept in a System V shared memory segment.
///
/// The segment holds `queue_size` slots of `element_size` bytes followed by an
/// Idx cursor block. Per the file's description it is meant for exactly one
/// producer process and one consumer process, synchronised only through the
/// atomic cursors in Idx.
class ShmLockFreeConsumer {
private:
    Idx* meta;             // cursor block, placed right after the last slot
    uint8_t* memory_addr;  // address returned by shmat()
    uint8_t* queue_addr;   // first slot (same address as memory_addr)
    uint32_t mem_size;     // segment size in bytes requested by create()
    uint32_t element_size; // bytes per slot
    uint32_t queue_size;   // number of slots
    int shmid;             // System V segment id, -1 while none is held

public:
    ShmLockFreeConsumer(uint32_t element_size, uint32_t queue_size);
    ~ShmLockFreeConsumer();

    // The destructor detaches and removes the segment, so a copy would release
    // the same attachment twice. Copying is therefore disabled.
    ShmLockFreeConsumer(const ShmLockFreeConsumer&) = delete;
    ShmLockFreeConsumer& operator=(const ShmLockFreeConsumer&) = delete;

    /// Creates the segment for the key derived from (path, magic), attaches
    /// it and zeroes the cursors. Returns 0 on success, -1 on failure.
    int create(const std::string& path, uint32_t magic);
    /// Attaches to an already existing segment for the key derived from
    /// (path, magic). Returns 0 on success, -1 on failure.
    int load(const std::string& path, uint32_t magic);
    /// Points `data` at the oldest unread slot inside the shared memory,
    /// without copying. Returns false when the ring is empty.
    bool peek(void*& data, uint32_t size);
    /// Advances the consumer cursor, handing the peeked slot back to the producer.
    void ack();

private:
    /// Marks the segment for removal and detaches from it.
    void distory();
    /// True when the producer and consumer cursors are equal.
    bool isEmpty();
};
/// Records the ring geometry and computes the segment size; no IPC call is made.
///
/// @param element_size bytes per slot
/// @param queue_size   number of slots
ShmLockFreeConsumer::ShmLockFreeConsumer(uint32_t element_size, uint32_t queue_size)
    : meta(nullptr),
      memory_addr(nullptr),
      queue_addr(nullptr),
      mem_size(0),
      element_size(element_size),
      queue_size(queue_size),
      shmid(-1) {
    // The cursor block is stored right after the last slot. Reserving only one
    // extra slot for it is too little when a slot is smaller than Idx, which
    // would make the cursors extend past the end of the segment. Reserve the
    // larger of the two; for element_size >= sizeof(Idx) this is exactly
    // (queue_size + 1) * element_size as before.
    const uint32_t idx_bytes = static_cast<uint32_t>(sizeof(Idx));
    const uint32_t tail_bytes = element_size > idx_bytes ? element_size : idx_bytes;
    mem_size = queue_size * element_size + tail_bytes;
}
/// Releases the shared memory held by this endpoint by delegating to distory().
ShmLockFreeConsumer::~ShmLockFreeConsumer()
{
    this->distory();
}
/// Exposes the oldest unread record in place, without copying it.
///
/// @param data set to the address of the record's slot inside the shared
///             memory when a record is available; left untouched otherwise
/// @param size not used by this implementation
/// @return true when `data` was set, false when the ring is empty
bool ShmLockFreeConsumer::peek(void*& data, uint32_t size) {
    (void)size;
    if (!isEmpty()) {
        // Only the consumer writes cidx, so a relaxed read of our own cursor is enough.
        const uint32_t after_acked = meta->cidx.load(std::memory_order_relaxed) + 1;
        const uint32_t slot = (after_acked == queue_size) ? 0 : after_acked;
        data = queue_addr + slot * element_size;
        return true;
    }
    return false;
}
/// Moves the consumer cursor forward by one slot, wrapping to 0 at queue_size.
void ShmLockFreeConsumer::ack() {
    const uint32_t advanced = meta->cidx.load(std::memory_order_relaxed) + 1;
    const uint32_t next_idx = (advanced == queue_size) ? 0 : advanced;
    // Release pairs with the producer's acquire load in isFull().
    meta->cidx.store(next_idx, std::memory_order_release);
}
int ShmLockFreeConsumer::create(const std::string& path, uint32_t magic) {
key_t key = ftok(path.c_str(), magic);
shmid = shmget(key, mem_size, IPC_CREAT|0666|IPC_EXCL);
if(shmid == -1) {
shmid = shmget(key, 0, 0);
shmctl(shmid, IPC_RMID, NULL);
shmid = shmget(key, mem_size, IPC_CREAT|0666|IPC_EXCL);
if(shmid == -1) {
std::cout<<"create shm fail"<<std::endl;
return -1;
}
}
memory_addr = (uint8_t*)shmat(shmid, 0, 0);
if((void*)-1 == memory_addr) {
std::cout<<"shmat error" <<std::endl;
return -1;
}
meta = (Idx*)(memory_addr + queue_size*element_size);
memset(meta, 0, sizeof(Idx));
queue_addr = memory_addr;
return 0;
}
int ShmLockFreeConsumer::load(const std::string& path, uint32_t magic) {
key_t key = ftok(path.c_str(), magic);
shmid = shmget(key, 0, 0);
if(shmid == -1) {
std::cout<<"create shm error"<<std::endl;
return -1;
}
memory_addr = (uint8_t*)shmat(shmid, 0, 0);
if((void*)-1 == memory_addr) {
std::cout<<"shmat error" <<std::endl;
return -1;
}
meta = (Idx*)(memory_addr + queue_size*element_size);
queue_addr = memory_addr;
return 0;
}
void ShmLockFreeConsumer::distory() {
if(shmid == -1) {
return;
}
shmctl(shmid, IPC_RMID, NULL);
shmdt(memory_addr);
}
/// Reports whether the producer cursor equals the consumer cursor, i.e. there
/// is no published record that has not been acknowledged yet.
bool ShmLockFreeConsumer::isEmpty() {
    // Acquire pairs with the producer's release store in produce().
    const uint32_t published = meta->pidx.load(std::memory_order_acquire);
    const uint32_t acknowledged = meta->cidx.load(std::memory_order_relaxed);
    return published == acknowledged;
}
/// Benchmark driver: forks a consumer child that creates the queue and checks
/// the received sequence, while the parent attaches to it, produces three
/// timed batches and finally sends the stop value.
int main () {
    const pid_t c_pid = fork();
    if (c_pid == -1) {
        // Previously a failed fork() fell through into the producer branch.
        std::cout<<"fork error"<<std::endl;
        return 1;
    }

    if (c_pid == 0) {
        // Child: consumer side.
        ShmLockFreeConsumer c(g_element_size, g_queue_size);
        if (0 != c.create(path, magic)) {
            std::cout<<"consumer create error " <<std::endl;
            return 0;
        }
        void* data = nullptr;
        const uint32_t size = sizeof(uint32_t);
        uint32_t expected = 0;
        while (true) {
            if (!c.peek(data, size)) {
                continue;
            }
            uint32_t received = 0;
            memcpy(&received, data, sizeof(received));
            if (received != expected) {
                // Either the stop value or an out-of-sequence record.
                break;
            }
            ++expected;
            if (expected == max_number) {
                expected = 0;
            }
            c.ack();
        }
        return 0;
    }

    // Parent: producer side. Give the child time to create the segment.
    std::this_thread::sleep_for(std::chrono::milliseconds(1000));
    ShmLockFreeProducer p(g_element_size, g_queue_size);
    if (0 != p.load(path, magic)) {
        std::cout<<"produce create error"<<std::endl;
        // Do not leave the child spinning on an empty queue forever.
        kill(c_pid, SIGTERM);
        wait(nullptr);
        return 0;
    }

    for (int round = 0; round < 3; ++round) {
        // A monotonic clock keeps the measurement immune to wall-clock adjustments.
        const std::chrono::steady_clock::time_point start = std::chrono::steady_clock::now();
        uint64_t count = 0;
        for (int k = 0; k < 100000; ++k) {
            for (uint32_t i = 0; i < max_number; ++i) {
                p.produce(&i, sizeof(i));
                count++;
            }
        }
        const std::chrono::steady_clock::time_point end = std::chrono::steady_clock::now();
        const uint64_t token = static_cast<uint64_t>(
            std::chrono::duration_cast<std::chrono::microseconds>(end - start).count());
        // Records per second, in units of 10^4 ("w"): count * 1e6 / token / 1e4.
        const uint64_t qps = (token == 0) ? 0 : count*100/token;
        std::cout<<"produce "<<count<<" record use "<<token<<"us "<<" qps "<<qps<<"w"<<std::endl;
    }

    // The stop value never matches the expected sequence, which ends the consumer loop.
    uint32_t stop_value = stop_number;
    p.produce(&stop_value, sizeof(stop_value));
    wait(nullptr);
    return 0;
}
g++ -std=c++11 main.cpp -lpthread -O3
网友评论