Bufferlist的设计与实现
Buffer::list是Ceph中用于序列化的工具。将所有数据结构能够转换成二进制的数据,用于存储及网络传输。每一个数据结构需要提供encode及decode方法。
Bufferlist,主要三个结构体:
- buffer::raw
- buffer::ptr
- buffer::list
自定义智能指针:
- 自定义智能指针的析构函数,此处智能指针不释放资源
- unique_leakable_ptr只涉及控制权的管理,仍需要手动(delete)释放对应资源
template <class T>
struct nop_delete {
void operator()(T*) {}
};
template <class T>
struct unique_leakable_ptr : public std::unique_ptr<T, ceph::nop_delete<T>> {
using std::unique_ptr<T, ceph::nop_delete<T>>::unique_ptr;
};
三个数据结构体之间的关系如下图:
buffer::raw
数据结构如下,其中mempool用于跟踪容器使用的内存大小,从raw中剔除出去已更清晰的查看raw的数据结构:
class raw {
public:
/*
* 大小为sizeof(ptr_node),alignof(ptr_node)对齐的类型。用于ptr_node的构造(new placement方式),实际并未使用
*/
std::aligned_storage<sizeof(ptr_node),alignof(ptr_node)>::type bptr_storage;
/*
* buffer
*/
char *data;
unsigned len;
/*
* 引用计数
*/
ceph::atomic<unsigned> nref { 0 };
/*
* 指定数据段的校验值
*/
std::pair<size_t, size_t> last_crc_offset {std::numeric_limits<size_t>::max(), std::numeric_limits<size_t>::max()};
std::pair<uint32_t, uint32_t> last_crc_val;
/*
* spinlock: std::atomic_flag(原子布尔类型),实现自旋互斥,免锁结构
* class spinlock{
std::atomoc_flag af = ATOMIC_FLAG_INIT;
void lock(){
while(af.test_and_set(std::memory_order_acquire));
}
void unlock(){
af.clear(std::memory_order_release);
}
};
*/
mutable ceph::spinlock crc_spinlock;
/*
* raw构造函数,raw是作为基类,实际数据分配方式是由子类实现
*/
explicit raw(unsigned l)
: data(nullptr), len(l), nref(0){
}
raw(char *c, unsigned l)
: data(c), len(l), nref(0) {
}
virtual ~raw() {
}
void _set_len(unsigned l) {
len = l;
}
private:
/*
* 禁止拷贝复制
*/
raw(const raw &other) = delete;
const raw& operator=(const raw &other) = delete;
public:
/*
* 获取原始数据
*/
char *get_data() {
return data;
}
/*
* clone data,具体实现由子类定义
*/
virtual raw* clone_empty() = 0;
/*
* 数据拷贝
*/
ceph::unique_leakable_ptr<raw> clone() {
raw* const c = clone_empty();
memcpy(c->data, data, len);
return ceph::unique_leakable_ptr<raw>(c);
}
/*
* crc相关设置
*/
bool get_crc(const std::pair<size_t, size_t> &fromto,std::pair<uint32_t, uint32_t> *crc) const {
std::lock_guard lg(crc_spinlock);
if (last_crc_offset == fromto) {
*crc = last_crc_val;
return true;
}
return false;
}
void set_crc(const std::pair<size_t, size_t> &fromto,const std::pair<uint32_t, uint32_t> &crc) {
std::lock_guard lg(crc_spinlock);
last_crc_offset = fromto;
last_crc_val = crc;
}
void invalidate_crc() {
std::lock_guard lg(crc_spinlock);
last_crc_offset.first = std::numeric_limits<size_t>::max();
last_crc_offset.second = std::numeric_limits<size_t>::max();
}
};
raw不同buffer分配方式的子类:
- class raw_malloc;
- class raw_static;
- class raw_posix_aligned;
- class raw_hack_aligned;
- class raw_char;
- class raw_claimed_char;
- class raw_unshareable;
- class raw_combined;
- class raw_claim_buffer;
raw_combined的定义如下:
- raw_combined的object与data buffer分配在一个buffer上,data处于buffer的开头,object为buffer尾
- buffer大小以alignment对齐
class buffer::raw_combined : public buffer::raw {
/*
* 对齐大小
*/
size_t alignment;
public:
/*
* 构造函数,dataptr指向已分配好的buffer
*/
raw_combined(char *dataptr, unsigned l, unsigned align)
:raw(dataptr, l),
alignment(align) {}
/*
* 定义本类的clone的方法,创建一个新的buffer,调用release()返回裸指针
*/
raw* clone_empty() override {
return create(len, alignment).release();
}
/*
* 入口静态函数:
1. 使用posix_memalign,创建align对齐的buffer,buffer大小为:data+self(raw_combined)
2. 在新分配的buffer上,构造raw_combined对象,使用operator new (buf) class
3. operator delete,定义本类的delete函数,在使用delete {*raw_combined}时,调用此函数
*/
static ceph::unique_leakable_ptr<buffer::raw> create(unsigned len, unsigned alig)
{
if (!align)
align = sizeof(size_t);
size_t rawlen = round_up_to(sizeof(buffer::raw_combined),alignof(buffer::raw_combined));
size_t datalen = round_up_to(len, alignof(buffer::raw_combined));
char *ptr = 0;
int r = ::posix_memalign((void**)(void*)&ptr, align, rawlen + datalen);
if (r)
throw bad_alloc();
return ceph::unique_leakable_ptr<buffer::raw>(new (ptr + datalen) raw_combined(ptr, len, align));
}
static void operator delete(void *ptr) {
raw_combined *raw = (raw_combined *)ptr;
::free((void *)raw->data);
}
};
raw_malloc定义如下:
- 使用mollac进行buffer的分配
- 无对齐要求
class buffer::raw_malloc : public buffer::raw {
public:
/*
* 构造函数,使用malloc buffer的分配
*/
explicit raw_malloc(unsigned l) : raw(l) {
if (len) {
data = (char *)malloc(len);
if (!data)
throw bad_alloc();
} else {
data = 0;
}
}
/*
* 利用创建好的buffer初始化raw_malloc
*/
raw_malloc(unsigned l, char *b) : raw(b, l) {
}
/*
* 资源释放
*/
~raw_malloc() override {
free(data);
}
raw* clone_empty() override {
return new raw_malloc(len);
}
};
raw_posix_aligned定义
- 利用posix_memalign, 创建align对齐的data buffer
- 与raw_combined的区别是,buffer就是data buffer,不包含raw_posix_aligned object
实现如下:
class buffer::raw_posix_aligned : public buffer::raw {
/*
* data buffer对齐大小
*/
unsigned align;
public:
/*
* 构造函数。利用posix_memalign,构造大小为len,以align对齐的data buffer
*/
raw_posix_aligned(unsigned l, unsigned _align) : raw(l) {
align = _align;
assert((align >= sizeof(void *)) && (align & (align - 1)) == 0);
int r = ::posix_memalign((void**)(void*)&data, align, len);
if (r)
throw bad_alloc();
}
/*
* 释放data buffer
*/
~raw_posix_aligned() override {
::free(data);
}
/*
* clone,大小与此data buffer相等,内容为空
*/
raw* clone_empty() override {
return new raw_posix_aligned(len, align);
}
};
raw_static定义
- data buffer使用static buffer
- 无需对齐
class buffer::raw_static : public buffer::raw {
public:
/*
* 构造函数:d是static buffer
* 析构函数:无需释放
*/
raw_static(const char *d, unsigned l) : raw((char*)d, l) { }
~raw_static() override {}
/*
* clone 的buffer,利用new raw_char创建新的buffer
*/
raw* clone_empty() override {
return new buffer::raw_char(len);
}
};
raw_char、raw_claimed_char、raw_claim_buffer定义
- 创建char类型的buffer
- 无需对齐
- raw_char在析构函数中释放资源,因此只能接受new分配的buffer
- raw_claimed_char:不负责释放资源,因此可以是局部变量。针对new创建的buffer,则需手动显示释放
- raw_claim_buffer: 可以接收自定义删除器
定义代码如下:
class buffer::raw_char : public buffer::raw {
public:
/*
* 构造函数:使用new 创建buffer
*/
explicit raw_char(unsigned l) : raw(l) {
if (len)
data = new char[len];
else
data = 0;
}
/*
* 传入已建好的buffer(char *b),作为此raw的buffer,此buffer需要是new创建的
*/
raw_char(unsigned l, char *b) : raw(b, l) {
}
/*
* 释放资源
*/
~raw_char() override {
delete[] data;
}
/*
* clone buffer,new新建
*/
raw* clone_empty() override {
return new raw_char(len);
}
};
class buffer::raw_claimed_char : public buffer::raw {
public:
/*
* 构造函数,传入buffer
* 析构函数不负责释放资源,因此外部buffer是局部变量,或是new资源,则需手动释放
*/
explicit raw_claimed_char(unsigned l, char *b) : raw(b, l) {
}
~raw_claimed_char() override {
}
raw* clone_empty() override {
return new raw_char(len);
}
};
class buffer::raw_claim_buffer : public buffer::raw {
deleter del;
public:
raw_claim_buffer(const char *b, unsigned l, deleter d)
: raw((char*)b, l), del(std::move(d)) { }
~raw_claim_buffer() override {}
raw* clone_empty() override {
return new buffer::raw_char(len);
}
};
RAW创建接口
创建接口如下,剔除mempool相关的信息:
ceph::unique_leakable_ptr<raw> copy(const char *c, unsigned len);
ceph::unique_leakable_ptr<raw> create(unsigned len);
ceph::unique_leakable_ptr<raw> create_in_mempool(unsigned len, int mempool);
ceph::unique_leakable_ptr<raw> claim_char(unsigned len, char *buf);
ceph::unique_leakable_ptr<raw> create_malloc(unsigned len);
ceph::unique_leakable_ptr<raw> claim_malloc(unsigned len, char *buf);
ceph::unique_leakable_ptr<raw> create_static(unsigned len, char *buf);
ceph::unique_leakable_ptr<raw> create_aligned(unsigned len, unsigned align);
ceph::unique_leakable_ptr<raw> create_aligned_in_mempool(unsigned len, unsigned align, int mempool);
ceph::unique_leakable_ptr<raw> create_page_aligned(unsigned len);
ceph::unique_leakable_ptr<raw> create_small_page_aligned(unsigned len);
ceph::unique_leakable_ptr<raw> claim_buffer(unsigned len, char *buf, deleter del);
copy(const char *c, unsigned len):
调用:
auto r = create_aligned(len, sizeof(size_t));
memcpy(r->data, c, len);
return r;
create(unsigned len);
create_aligned(unsigned len, unsigned align); /* align = sizeof(size_t)*/
create_in_mempool(unsigned len, int mempool)
create_aligned(unsigned len, unsigned align)
调用:create_aligned_in_mempool(unsigned len, unsigned align, int mempool):
/*
* 如果对齐align 是系统PAGE_SIZE的整数倍,或者分配的大小大于PAGE_SIZE的两倍,则使用raw_posix_aligned接口
* 否则采用raw_combined
*/
if ((align & ~CEPH_PAGE_MASK) == 0 ||
len >= CEPH_PAGE_SIZE * 2) {
#ifndef __CYGWIN__
return ceph::unique_leakable_ptr<buffer::raw>(new raw_posix_aligned(len, align));
#else
return ceph::unique_leakable_ptr<buffer::raw>(new raw_hack_aligned(len, align));
#endif
}
return raw_combined::create(len, align, mempool);
}
claim_char(unsigned len, char *buf):
: new raw_claimed_char(len, buf)
create_malloc(unsigned len):
: new raw_malloc(len)
claim_malloc(unsigned len, char *buf):
: new raw_malloc(len, buf)
create_static(unsigned len, char *buf):
: new raw_static(buf, len)
create_page_aligned(unsigned len):
: 已系统PAGE_SIZE默认对齐
: create_aligned(len, CEPH_PAGE_SIZE)
create_small_page_aligned(unsigned len):
: 当len < CEPH_PAGE_SIZE: create_aligned(len, CEPH_BUFFER_ALLOC_UNIT) /*CEPH_BUFFER_ALLOC_UNIT: 4096u, 4KB*/
: > CEPH_PAGE_SIZE: create_aligned(len, CEPH_PAGE_SIZE)
claim_buffer(unsigned len, char *buf, deleter del):
: new raw_claim_buffer(buf, len, std::move(del))
网友评论