buffer::list,结构如下:
list bufferlist_arch成员变量:
- buffers_t _buffers: ptr循环链表
- ptr* _carriage: 指向list中最后一个可修改的ptr指针
- unsigned _len, _num: _len list数据长度,_num list包含的ptr个数
- static ptr always_empty_bptr: 初始化_carriage
buffer::list迭代器:
class iterator_impl
class iterator: iterator_impl
iterator_impl的成员变量
typedef typename std::conditional<is_const,const list,list>::type bl_t;
typedef typename std::conditional<is_const,const buffers_t,buffers_t >::type list_t;
typedef typename std::conditional<is_const,typename buffers_t::const_iterator,typename buffers_t::iterator>::type list_iter_t;
- bl_t* bl; 指向list本身
- list_t* ls; 指向buffers_t,存储ptr的链表
- list_iter_t p; buffers_t的迭代器
- unsigned off; 相对于list.begin()的绝对偏移量,例如调用seek(o): off+=o;p_off更新为当前ptr内部的偏移量
- unsigned p_off; 在当前ptr中的偏移量,如此iterator指向ptr,则实际数据位置位于:_raw[ptr->_off+p_off]
iterator_impl中提供了copy的接口函数,主要是用于decode,将list中的数据拷贝出来。
template<bool is_const>
void buffer::list::iterator_impl<is_const>::copy(unsigned len, char *dest)
代码实现:
- seek(off): 定位iterator的数据位置,off是绝对偏移量,p指向对应的ptr,p_off在此ptr中的偏移量
- 数据读取,p->lenth()-p_off,ptr中包含的数据
- p->copy_out,调用ptr copy_out方法,读取数据
- *this += howmuch,定位下一次需要读取的数据位置
if (p == ls->end()) seek(off);
while (len > 0) {
if (p == ls->end())
throw end_of_buffer();
unsigned howmuch = p->length() - p_off;
if (len < howmuch) howmuch = len;
p->copy_out(p_off, howmuch, dest);
dest += howmuch;
len -= howmuch;
*this += howmuch;
}
list的接口函数:
向list中存储长度为len的数据
- free_in_last: 获取list中剩余空间,实际即是_carriage的剩余空间;get_append_buffer_unused_tail_length: _carriage->unused_tail_length()
- first_round: min(len, free_in_last),先将数据存储在剩余空间
- 判断_carriage是不是list中的最后一个元素,(1. 非通过list接口存储过ptr*,(2. _buffers被std::move(),或是claim_append(),则_carriage!=_buffers.back());
_carriage != &_buffers.back():
- 创建一个新的ptr,ptr指向的raw与_carriage是同一个raw
- 更新_carriage为新的ptr
- _buffers存储新的ptr
- _num自增
auto bptr = ptr_node::create(*_carriage, _carriage->length(), 0);
_carriage = bptr.get();
_buffers.push_back(*bptr.release());
_num += 1;
- _carriage存储第一部分数据
- 计算剩余待存储的数据
- 创建一个新的ptr=new_back,指向一个新建的raw缓存,更新_carriage = new_back, list::push_back(new_back)
- new_back.append()存储剩余的数据
auto& new_back = refill_append_space(second_round);
new_back.append(data + first_round, second_round);
auto refill_append_space(len):
{
/*
** 数据对齐
*/
size_t need = round_up_to(len, sizeof(size_t)) + sizeof(raw_combined);
size_t alen = round_up_to(need, CEPH_BUFFER_ALLOC_UNIT) - sizeof(raw_combined);
auto new_back = ptr_node::create(raw_combined::create(alen, 0, get_mempool()));
new_back->set_length(0); // unused, so far.
_carriage = new_back.get();
_buffers.push_back(*new_back.release());
_num += 1;
return _buffers.back();
}
encode接口:append的实现代码如下:
void buffer::list::append(const char *data, unsigned len)
{
_len += len;
const unsigned free_in_last = get_append_buffer_unused_tail_length();
const unsigned first_round = std::min(len, free_in_last);
if (first_round) {
// _buffers and carriage can desynchronize when 1) a new ptr
// we don't own has been added into the _buffers 2) _buffers
// has been emptied as as a result of std::move or stolen by
// claim_append.
if (unlikely(_carriage != &_buffers.back())) {
auto bptr = ptr_node::create(*_carriage, _carriage->length(), 0);
_carriage = bptr.get();
_buffers.push_back(*bptr.release());
_num += 1;
}
_carriage->append(data, first_round);
}
const unsigned second_round = len - first_round;
if (second_round) {
auto& new_back = refill_append_space(second_round);
new_back.append(data + first_round, second_round);
}
}
decode的接口: list::iterator_impl<is_const>::copy(unsigned len, char *dest)
:
bufferlist中的数据拷贝至dest,数据长度为len:
- 定位至数据开始位置
- 循环读取迭代器中的数据
template<bool is_const>
void buffer::list::iterator_impl<is_const>::copy(unsigned len, char *dest)
{
if (p == ls->end()) seek(off);
while (len > 0) {
if (p == ls->end())
throw end_of_buffer();
unsigned howmuch = p->length() - p_off;
if (len < howmuch) howmuch = len;
p->copy_out(p_off, howmuch, dest);
dest += howmuch;
len -= howmuch;
*this += howmuch;
}
}
**部分数据结构注释: **
以下数据结构用于获取相邻的可写空间:
struct reserve_t {
char* bp_data; // ptr数据结尾,剩余空间头
unsigned* bp_len; // ptr指针数据长度
unsigned* bl_len; // bufferlist的数据长度
};
class contiguous_appender {
ceph::bufferlist& bl;
ceph::bufferlist::reserve_t space;
char* pos;
bool deep;
/// running count of bytes appended that are not reflected by @pos
size_t out_of_band_offset = 0;
contiguous_appender(bufferlist& bl, size_t len, bool d)
: bl(bl),
space(bl.obtain_contiguous_space(len)),
pos(space.bp_data),
deep(d) {
}
}
/*
** 获取相邻可写空间的接口
** 1. 判断当前_carriage剩余空间是否满足需求
** 1.1 不满足,新建一个raw缓冲区,更新_carriage
** 1.2 满足,利用现有的_carriage缓冲区
** 返回 reserve_t,指向对应的缓冲区
*/
reserve_t obtain_contiguous_space(const unsigned len)
{
// note: if len < the normal append_buffer size it *might*
// be better to allocate a normal-sized append_buffer and
// use part of it. however, that optimizes for the case of
// old-style types including new-style types. and in most
// such cases, this won't be the very first thing encoded to
// the list, so append_buffer will already be allocated.
// OTOH if everything is new-style, we *should* allocate
// only what we need and conserve memory.
if (unlikely(get_append_buffer_unused_tail_length() < len)) {
auto new_back = buffer::ptr_node::create(buffer::create(len)).release();
new_back->set_length(0); // unused, so far.
_buffers.push_back(*new_back);
_num += 1;
_carriage = new_back;
return { new_back->c_str(), &new_back->_len, &_len };
} else {
if (unlikely(_carriage != &_buffers.back())) {
auto bptr = ptr_node::create(*_carriage, _carriage->length(), 0);
_carriage = bptr.get();
_buffers.push_back(*bptr.release());
_num += 1;
}
return { _carriage->end_c_str(), &_carriage->_len, &_len };
}
}
class contiguous_filler: 用于encode时,为元数据预留空间
buffers_t的数据结构如下:
class buffers_t {
/*
** _root用于连接ptr的头尾指针,本身无内容
*/
ptr_hook _root;
ptr_hook* _tail;
public:
/*
** buffers_t的迭代器
*/
template <class T>
class buffers_iterator {
public:
using value_type = T;
using reference = typename std::add_lvalue_reference<T>::type;
using pointer = typename std::add_pointer<T>::type;
using difference_type = std::ptrdiff_t;
using iterator_category = std::forward_iterator_tag;
};
typedef buffers_iterator<const ptr_node> const_iterator;
typedef buffers_iterator<ptr_node> iterator;
typedef const ptr_node& const_reference;
typedef ptr_node& reference;
};
encode、decode方法
前述内容中介绍了raw/ptr/list的定义及数据结构,encoding.h
中定义了对不同类型变量的encode及decode的方法。
如下定义了char float double bool 等类型的encode/decode接口,Bool类型以一字节存储.
template<class T>
inline void encode_raw(const T& t, bufferlist& bl)
{
bl.append((char*)&t, sizeof(t));
}
template<class T>
inline void decode_raw(T& t, bufferlist::const_iterator &p)
{
p.copy(sizeof(t), (char*)&t);
}
#define WRITE_RAW_ENCODER(type) \
inline void encode(const type &v, ::ceph::bufferlist& bl, uint64_t features=0) { ::ceph::encode_raw(v, bl); } \
inline void decode(type &v, ::ceph::bufferlist::const_iterator& p) { ::ceph::decode_raw(v, p); }
WRITE_RAW_ENCODER(char)
WRITE_RAW_ENCODER(float)
WRITE_RAW_ENCODER(double)
inline void encode(const bool &v, bufferlist& bl) {
__u8 vv = v;
encode_raw(vv, bl);
}
inline void decode(bool &v, bufferlist::const_iterator& p) {
__u8 vv;
decode_raw(vv, p);
v = vv;
}
针对自定义类型,实际是调用类中自定义的encode方法:
#define WRITE_CLASS_ENCODER(cl) \
inline void encode(const cl& c, ::ceph::buffer::list &bl, uint64_t features=0) { \
ENCODE_DUMP_PRE(); c.encode(bl); ENCODE_DUMP_POST(cl); } \
inline void decode(cl &c, ::ceph::bufferlist::const_iterator &p) { c.decode(p); }
#define WRITE_CLASS_MEMBER_ENCODER(cl) \
inline void encode(const cl &c, ::ceph::bufferlist &bl) const { \
ENCODE_DUMP_PRE(); c.encode(bl); ENCODE_DUMP_POST(cl); } \
inline void decode(cl &c, ::ceph::bufferlist::const_iterator &p) { c.decode(p); }
#define WRITE_CLASS_ENCODER_FEATURES(cl) \
inline void encode(const cl &c, ::ceph::bufferlist &bl, uint64_t features) { \
ENCODE_DUMP_PRE(); c.encode(bl, features); ENCODE_DUMP_POST(cl); } \
inline void decode(cl &c, ::ceph::bufferlist::const_iterator &p) { c.decode(p); }
#define WRITE_CLASS_ENCODER_OPTIONAL_FEATURES(cl) \
inline void encode(const cl &c, ::ceph::bufferlist &bl, uint64_t features = 0) { \
ENCODE_DUMP_PRE(); c.encode(bl, features); ENCODE_DUMP_POST(cl); } \
inline void decode(cl &c, ::ceph::bufferlist::const_iterator &p) { c.decode(p); }
以自定义类型示例:
- 定义encode方法
- 定义decode方法
- 调用WRITE_CLASS_ENCODER(rgw_data_change),生成关于此自定义类型的encode/decode方法
struct rgw_data_change {
DataLogEntityType entity_type;
string key;
real_time timestamp;
void encode(bufferlist& bl) const {
ENCODE_START(1, 1, bl);
uint8_t t = (uint8_t)entity_type;
encode(t, bl);
encode(key, bl);
encode(timestamp, bl);
ENCODE_FINISH(bl);
}
void decode(bufferlist::const_iterator& bl) {
DECODE_START(1, bl);
uint8_t t;
decode(t, bl);
entity_type = (DataLogEntityType)t;
decode(key, bl);
decode(timestamp, bl);
DECODE_FINISH(bl);
}
void dump(Formatter *f) const;
void decode_json(JSONObj *obj);
};
WRITE_CLASS_ENCODER(rgw_data_change)
ENCODE_START(v, compat, bl)
- 计算元数据大小
- 在list中为元数据预留空间
- 预留空间的起始位置即为filler
/*
** 开始encoding模块
**
** @param v 当前encoding版本
** @param 能够decode的最老版本
** @param bl bufferlist
** Feature:
** bufferlist中预留空间用于存储v,compat,及数据长度
** filler: list::contiguous_filler,包含一个char* pos,指向预留空间的头
**
*/
#define ENCODE_START(v, compat, bl) \
__u8 struct_v = v; \
__u8 struct_compat = compat; \
ceph_le32 struct_len; \
auto filler = (bl).append_hole(sizeof(struct_v) + sizeof(struct_compat) + sizeof(struct_len)); \
const auto starting_bl_len = (bl).length(); \
using ::ceph::encode; \
do {
ENCODE_FINISH(bl)
encode收尾工作:
- 将struct_v、struct_compat、struct_len数据写入预留空间
/**
* finish encoding block
*
* @param bl bufferlist we were encoding to
* @param new_struct_compat struct-compat value to use
*/
#define ENCODE_FINISH_NEW_COMPAT(bl, new_struct_compat) \
} while (false); \
if (new_struct_compat) { \
struct_compat = new_struct_compat; \
} \
struct_len = (bl).length() - starting_bl_len; \
filler.copy_in(sizeof(struct_v), (char *)&struct_v); \
filler.copy_in(sizeof(struct_compat), (char *)&struct_compat); \
filler.copy_in(sizeof(struct_len), (char *)&struct_len);
#define ENCODE_FINISH(bl) ENCODE_FINISH_NEW_COMPAT(bl, 0)
decode(v,bl)
- decode出struct_compat,判断当前版本能否decode,v>=struct_compat说明可以decode
- 解析其余元数据,如struct_v、struct_len等数据
- 依次decode其余数据
/*
** start a decoding block
**
** @param v current version of the encoding that the code supports/encodes
** @param bl bufferlist::iterator for the encoded data
*/
#define DECODE_START(v, bl) \
__u8 struct_v, struct_compat; \
using ::ceph::decode; \
decode(struct_v, bl); \
decode(struct_compat, bl); \
if (v < struct_compat) \
throw ::ceph::buffer::malformed_input(DECODE_ERR_OLDVERSION(__PRETTY_FUNCTION__, v, struct_compat)); \
__u32 struct_len; \
decode(struct_len, bl); \
if (struct_len > bl.get_remaining()) \
throw ::ceph::buffer::malformed_input(DECODE_ERR_PAST(__PRETTY_FUNCTION__)); \
unsigned struct_end = bl.get_off() + struct_len; \
do {
DECODE_FINISH(bl)
- 对数据长度等做校验
- bl.get_off: return off;返回绝对偏移量
- struct_end: 根据struct_len计算出的数据结尾的绝对偏移量
- 如果off>struct_end: 说明读取了不该读取的内容
/**
* finish decode block
*
* @param bl bufferlist::iterator we were decoding from
*/
#define DECODE_FINISH(bl) \
} while (false); \
if (struct_end) { \
if (bl.get_off() > struct_end) \
throw ::ceph::buffer::malformed_input(DECODE_ERR_PAST(__PRETTY_FUNCTION__)); \
if (bl.get_off() < struct_end) \
bl += struct_end - bl.get_off(); \
}
使用
- 定义类中的encode/decode方法
- 类外: WRITE_CLASS_ENCODER(rgw_data_change),声明针对此类的encode、decode方法
encode:
rgw_data_change instance;
bufferlist bl;
encode(instance,bl);
decode:
bufferlist bl;
rgw_data_change instance;
read(bl);
auto iter = bl.cbegin();
decode(instance, iter);
网友评论