美文网首页
Bufferlist::list设计与实现

Bufferlist::list设计与实现

作者: 圣地亚哥_SVIP | 来源:发表于2020-06-17 23:15 被阅读0次

buffer::list,结构如下:

list bufferlist_arch

成员变量:

  • buffers_t _buffers: ptr循环链表
  • ptr* _carriage: 指向list中最后一个可修改的ptr指针
  • unsigned _len, _num: _len list数据长度,_num list包含的ptr个数
  • static ptr always_empty_bptr: 初始化_carriage

buffer::list迭代器:

class iterator_impl
class iterator: iterator_impl

iterator_impl的成员变量

typedef typename std::conditional<is_const,const list,list>::type bl_t;
typedef typename std::conditional<is_const,const buffers_t,buffers_t >::type list_t;
typedef typename std::conditional<is_const,typename buffers_t::const_iterator,typename buffers_t::iterator>::type list_iter_t;
  • bl_t* bl; 指向list本身
  • list_t* ls; 指向buffers_t,存储ptr的链表
  • list_iter_t p; buffers_t的迭代器
  • unsigned off; 相对于list.begin()的绝对偏移量,例如调用seek(o): off+=o;p_off更新为当前ptr内部的偏移量
  • unsigned p_off; 在当前ptr中的偏移量,如此iterator指向ptr,则实际数据位置位于:_raw[ptr->_off+p_off]

iterator_impl中提供了copy的接口函数,主要是用于decode,将list中的数据拷贝出来。

template<bool is_const>
void buffer::list::iterator_impl<is_const>::copy(unsigned len, char *dest)

代码实现:

  • seek(off): 定位iterator的数据位置,off是绝对偏移量,p指向对应的ptr,p_off在此ptr中的偏移量
  • 数据读取,p->lenth()-p_off,ptr中包含的数据
  • p->copy_out,调用ptr copy_out方法,读取数据
  • *this += howmuch,定位下一次需要读取的数据位置
if (p == ls->end()) seek(off);
while (len > 0) {
  if (p == ls->end())
    throw end_of_buffer();

  unsigned howmuch = p->length() - p_off;
  if (len < howmuch) howmuch = len;
  p->copy_out(p_off, howmuch, dest);
  dest += howmuch;

  len -= howmuch;
  *this += howmuch;
}

list的接口函数:

向list中存储长度为len的数据

  1. free_in_last: 获取list中剩余空间,实际即是_carriage的剩余空间;get_append_buffer_unused_tail_length: _carriage->unused_tail_length()
  2. first_round: min(len, free_in_last),先将数据存储在剩余空间
  3. 判断_carriage是不是list中的最后一个元素,(1. 非通过list接口存储过ptr*,(2. _buffers被std::move(),或是claim_append(),则_carriage!=_buffers.back());
    _carriage != &_buffers.back():
- 创建一个新的ptr,ptr指向的raw与_carriage是同一个raw
- 更新_carriage为新的ptr
- _buffers存储新的ptr
- _num自增
auto bptr = ptr_node::create(*_carriage, _carriage->length(), 0);
_carriage = bptr.get();
_buffers.push_back(*bptr.release());
_num += 1;
  1. _carriage存储第一部分数据
  2. 计算剩余待存储的数据
  3. 创建一个新的ptr=new_back,指向一个新建的raw缓存,更新_carriage = new_back, list::push_back(new_back)
  4. new_back.append()存储剩余的数据
auto& new_back = refill_append_space(second_round);
new_back.append(data + first_round, second_round);

auto refill_append_space(len):
  {
    /*
    ** 数据对齐
    */
    size_t need = round_up_to(len, sizeof(size_t)) + sizeof(raw_combined);
    size_t alen = round_up_to(need, CEPH_BUFFER_ALLOC_UNIT) - sizeof(raw_combined);
    auto new_back = ptr_node::create(raw_combined::create(alen, 0, get_mempool()));
    new_back->set_length(0);   // unused, so far.
    _carriage = new_back.get();
    _buffers.push_back(*new_back.release());
    _num += 1;
    return _buffers.back();
  }

encode接口:append的实现代码如下:

void buffer::list::append(const char *data, unsigned len)
{
  _len += len;

  const unsigned free_in_last = get_append_buffer_unused_tail_length();
  const unsigned first_round = std::min(len, free_in_last);
  if (first_round) {
    // _buffers and carriage can desynchronize when 1) a new ptr
    // we don't own has been added into the _buffers 2) _buffers
    // has been emptied as as a result of std::move or stolen by
    // claim_append.
    if (unlikely(_carriage != &_buffers.back())) {
      auto bptr = ptr_node::create(*_carriage, _carriage->length(), 0);
      _carriage = bptr.get();
      _buffers.push_back(*bptr.release());
      _num += 1;
    }
    _carriage->append(data, first_round);
  }

  const unsigned second_round = len - first_round;
  if (second_round) {
    auto& new_back = refill_append_space(second_round);
    new_back.append(data + first_round, second_round);
  }
}

decode的接口: list::iterator_impl<is_const>::copy(unsigned len, char *dest):
bufferlist中的数据拷贝至dest,数据长度为len:

  1. 定位至数据开始位置
  2. 循环读取迭代器中的数据
template<bool is_const>
void buffer::list::iterator_impl<is_const>::copy(unsigned len, char *dest)
{
  if (p == ls->end()) seek(off);
  while (len > 0) {
    if (p == ls->end())
throw end_of_buffer();

    unsigned howmuch = p->length() - p_off;
    if (len < howmuch) howmuch = len;
    p->copy_out(p_off, howmuch, dest);
    dest += howmuch;

    len -= howmuch;
    *this += howmuch;
  }
}

**部分数据结构注释: **
以下数据结构用于获取相邻的可写空间:

struct reserve_t {
  char* bp_data;    // ptr数据结尾,剩余空间头
  unsigned* bp_len; // ptr指针数据长度
  unsigned* bl_len; // bufferlist的数据长度
};

class contiguous_appender {
  ceph::bufferlist& bl;
  ceph::bufferlist::reserve_t space;
  char* pos;
  bool deep;

  /// running count of bytes appended that are not reflected by @pos
  size_t out_of_band_offset = 0;
  contiguous_appender(bufferlist& bl, size_t len, bool d)
    : bl(bl),
      space(bl.obtain_contiguous_space(len)),
      pos(space.bp_data),
      deep(d) {
  }
}

/*
** 获取相邻可写空间的接口
** 1. 判断当前_carriage剩余空间是否满足需求
** 1.1 不满足,新建一个raw缓冲区,更新_carriage
** 1.2 满足,利用现有的_carriage缓冲区
** 返回 reserve_t,指向对应的缓冲区
*/
reserve_t obtain_contiguous_space(const unsigned len)
{
  // note: if len < the normal append_buffer size it *might*
  // be better to allocate a normal-sized append_buffer and
  // use part of it.  however, that optimizes for the case of
  // old-style types including new-style types.  and in most
  // such cases, this won't be the very first thing encoded to
  // the list, so append_buffer will already be allocated.
  // OTOH if everything is new-style, we *should* allocate
  // only what we need and conserve memory.
  if (unlikely(get_append_buffer_unused_tail_length() < len)) {
    auto new_back = buffer::ptr_node::create(buffer::create(len)).release();
    new_back->set_length(0);   // unused, so far.
    _buffers.push_back(*new_back);
    _num += 1;
    _carriage = new_back;
    return { new_back->c_str(), &new_back->_len, &_len };
  } else {
    if (unlikely(_carriage != &_buffers.back())) {
      auto bptr = ptr_node::create(*_carriage, _carriage->length(), 0);
      _carriage = bptr.get();
      _buffers.push_back(*bptr.release());
      _num += 1;
    }
    return { _carriage->end_c_str(), &_carriage->_len, &_len };
  }
}

class contiguous_filler: 用于encode时,为元数据预留空间

buffers_t的数据结构如下:

class buffers_t {
  /*
  ** _root用于连接ptr的头尾指针,本身无内容
  */
  ptr_hook _root;
  ptr_hook* _tail;

public:
  /*
  ** buffers_t的迭代器
  */
  template <class T>
  class buffers_iterator {
    public:
      using value_type = T;
      using reference = typename std::add_lvalue_reference<T>::type;
      using pointer = typename std::add_pointer<T>::type;
      using difference_type = std::ptrdiff_t;
      using iterator_category = std::forward_iterator_tag;
  };

  typedef buffers_iterator<const ptr_node> const_iterator;
  typedef buffers_iterator<ptr_node> iterator;

  typedef const ptr_node& const_reference;
  typedef ptr_node& reference;
};

encode、decode方法

前述内容中介绍了raw/ptr/list的定义及数据结构,encoding.h中定义了对不同类型变量的encode及decode的方法。
如下定义了char float double bool 等类型的encode/decode接口,Bool类型以一字节存储.

template<class T>
inline void encode_raw(const T& t, bufferlist& bl)
{
  bl.append((char*)&t, sizeof(t));
}
template<class T>
inline void decode_raw(T& t, bufferlist::const_iterator &p)
{
  p.copy(sizeof(t), (char*)&t);
}

#define WRITE_RAW_ENCODER(type)           \
  inline void encode(const type &v, ::ceph::bufferlist& bl, uint64_t features=0) { ::ceph::encode_raw(v, bl); } \
  inline void decode(type &v, ::ceph::bufferlist::const_iterator& p) { ::ceph::decode_raw(v, p); }

WRITE_RAW_ENCODER(char)
WRITE_RAW_ENCODER(float)
WRITE_RAW_ENCODER(double)

inline void encode(const bool &v, bufferlist& bl) {
  __u8 vv = v;
  encode_raw(vv, bl);
}
inline void decode(bool &v, bufferlist::const_iterator& p) {
  __u8 vv;
  decode_raw(vv, p);
  v = vv;
}

针对自定义类型,实际是调用类中自定义的encode方法:

#define WRITE_CLASS_ENCODER(cl)           \
  inline void encode(const cl& c, ::ceph::buffer::list &bl, uint64_t features=0) { \
    ENCODE_DUMP_PRE(); c.encode(bl); ENCODE_DUMP_POST(cl); }    \
  inline void decode(cl &c, ::ceph::bufferlist::const_iterator &p) { c.decode(p); }

#define WRITE_CLASS_MEMBER_ENCODER(cl)          \
  inline void encode(const cl &c, ::ceph::bufferlist &bl) const { \
    ENCODE_DUMP_PRE(); c.encode(bl); ENCODE_DUMP_POST(cl); }    \
  inline void decode(cl &c, ::ceph::bufferlist::const_iterator &p) { c.decode(p); }

#define WRITE_CLASS_ENCODER_FEATURES(cl)        \
  inline void encode(const cl &c, ::ceph::bufferlist &bl, uint64_t features) { \
    ENCODE_DUMP_PRE(); c.encode(bl, features); ENCODE_DUMP_POST(cl); }  \
  inline void decode(cl &c, ::ceph::bufferlist::const_iterator &p) { c.decode(p); }

#define WRITE_CLASS_ENCODER_OPTIONAL_FEATURES(cl)       \
  inline void encode(const cl &c, ::ceph::bufferlist &bl, uint64_t features = 0) { \
    ENCODE_DUMP_PRE(); c.encode(bl, features); ENCODE_DUMP_POST(cl); }  \
  inline void decode(cl &c, ::ceph::bufferlist::const_iterator &p) { c.decode(p); }

以自定义类型示例:

  • 定义encode方法
  • 定义decode方法
  • 调用WRITE_CLASS_ENCODER(rgw_data_change),生成关于此自定义类型的encode/decode方法
struct rgw_data_change {
  DataLogEntityType entity_type;
  string key;
  real_time timestamp;

  void encode(bufferlist& bl) const {
    ENCODE_START(1, 1, bl);
    uint8_t t = (uint8_t)entity_type;
    encode(t, bl);
    encode(key, bl);
    encode(timestamp, bl);
    ENCODE_FINISH(bl);
  }

  void decode(bufferlist::const_iterator& bl) {
     DECODE_START(1, bl);
     uint8_t t;
     decode(t, bl);
     entity_type = (DataLogEntityType)t;
     decode(key, bl);
     decode(timestamp, bl);
     DECODE_FINISH(bl);
  }

  void dump(Formatter *f) const;
  void decode_json(JSONObj *obj);
};
WRITE_CLASS_ENCODER(rgw_data_change)

ENCODE_START(v, compat, bl)

  1. 计算元数据大小
  2. 在list中为元数据预留空间
  3. 预留空间的起始位置即为filler
/*
** 开始encoding模块
**
** @param v 当前encoding版本
** @param 能够decode的最老版本
** @param bl bufferlist
** Feature:
**  bufferlist中预留空间用于存储v,compat,及数据长度
**  filler: list::contiguous_filler,包含一个char* pos,指向预留空间的头
**
*/
#define ENCODE_START(v, compat, bl)          \
  __u8 struct_v = v;                                         \
  __u8 struct_compat = compat;                         \
  ceph_le32 struct_len;                    \
  auto filler = (bl).append_hole(sizeof(struct_v) + sizeof(struct_compat) + sizeof(struct_len));       \
  const auto starting_bl_len = (bl).length();        \
  using ::ceph::encode;              \
  do {

ENCODE_FINISH(bl)
encode收尾工作:

  • 将struct_v、struct_compat、struct_len数据写入预留空间
/**
 * finish encoding block
 *
 * @param bl bufferlist we were encoding to
 * @param new_struct_compat struct-compat value to use
 */
#define ENCODE_FINISH_NEW_COMPAT(bl, new_struct_compat)      \
  } while (false);                                           \
  if (new_struct_compat) {                                   \
    struct_compat = new_struct_compat;                       \
  }                                                          \
  struct_len = (bl).length() - starting_bl_len;              \
  filler.copy_in(sizeof(struct_v), (char *)&struct_v);       \
  filler.copy_in(sizeof(struct_compat), (char *)&struct_compat);             \
  filler.copy_in(sizeof(struct_len), (char *)&struct_len);

#define ENCODE_FINISH(bl) ENCODE_FINISH_NEW_COMPAT(bl, 0)

decode(v,bl)

  • decode出struct_compat,判断当前版本能否decode,v>=struct_compat说明可以decode
  • 解析其余元数据,如struct_v、struct_len等数据
  • 依次decode其余数据
/*
** start a decoding block
**
** @param v current version of the encoding that the code supports/encodes
** @param bl bufferlist::iterator for the encoded data
*/
#define DECODE_START(v, bl)           \
  __u8 struct_v, struct_compat;           \
  using ::ceph::decode;             \
  decode(struct_v, bl);           \
  decode(struct_compat, bl);            \
  if (v < struct_compat)            \
    throw ::ceph::buffer::malformed_input(DECODE_ERR_OLDVERSION(__PRETTY_FUNCTION__, v, struct_compat)); \
  __u32 struct_len;             \
  decode(struct_len, bl);           \
  if (struct_len > bl.get_remaining())          \
    throw ::ceph::buffer::malformed_input(DECODE_ERR_PAST(__PRETTY_FUNCTION__)); \
  unsigned struct_end = bl.get_off() + struct_len;      \
  do {

DECODE_FINISH(bl)

  • 对数据长度等做校验
  • bl.get_off: return off;返回绝对偏移量
  • struct_end: 根据struct_len计算出的数据结尾的绝对偏移量
  • 如果off>struct_end: 说明读取了不该读取的内容
/**
 * finish decode block
 *
 * @param bl bufferlist::iterator we were decoding from
 */
#define DECODE_FINISH(bl)           \
  } while (false);              \
  if (struct_end) {             \
    if (bl.get_off() > struct_end)          \
      throw ::ceph::buffer::malformed_input(DECODE_ERR_PAST(__PRETTY_FUNCTION__)); \
    if (bl.get_off() < struct_end)          \
      bl += struct_end - bl.get_off();          \
  }

使用

  • 定义类中的encode/decode方法
  • 类外: WRITE_CLASS_ENCODER(rgw_data_change),声明针对此类的encode、decode方法
    encode:
rgw_data_change instance;
bufferlist bl;
encode(instance,bl);

decode:

bufferlist bl;
rgw_data_change instance;
read(bl);
auto iter = bl.cbegin();
decode(instance, iter);

相关文章

网友评论

      本文标题:Bufferlist::list设计与实现

      本文链接:https://www.haomeiwen.com/subject/cioexktx.html