美文网首页
Ceph Bufferlist: buffer::raw

Ceph Bufferlist: buffer::raw

作者: 圣地亚哥_SVIP | 来源:发表于2020-05-18 14:09 被阅读0次

    Bufferlist的设计与实现

    Buffer::list是Ceph中用于序列化的工具。将所有数据结构能够转换成二进制的数据,用于存储及网络传输。每一个数据结构需要提供encode及decode方法。

    Bufferlist,主要三个结构体:

    • buffer::raw
    • buffer::ptr
    • buffer::list

    自定义智能指针:

    • 自定义智能指针的析构函数,此处智能指针不释放资源
    • unique_leakable_ptr只涉及控制权的管理,仍需要手动(delete)释放对应资源
    template <class T>
    struct nop_delete {
      void operator()(T*) {}
    };
    template <class T>
    struct unique_leakable_ptr : public std::unique_ptr<T, ceph::nop_delete<T>> {
      using std::unique_ptr<T, ceph::nop_delete<T>>::unique_ptr;
    };
    

    三个数据结构体之间的关系如下图:

    buffer::raw数据结构如下,其中mempool用于跟踪容器使用的内存大小,从raw中剔除出去已更清晰的查看raw的数据结构:

    class raw {
    public:
        /*
        * 大小为sizeof(ptr_node),alignof(ptr_node)对齐的类型。用于ptr_node的构造(new placement方式),实际并未使用
        */
        std::aligned_storage<sizeof(ptr_node),alignof(ptr_node)>::type bptr_storage;
        /*
        * buffer
        */
        char *data;
        unsigned len;
        /*
        * 引用计数
        */
        ceph::atomic<unsigned> nref { 0 };
    
        /*
        * 指定数据段的校验值
        */
        std::pair<size_t, size_t> last_crc_offset {std::numeric_limits<size_t>::max(), std::numeric_limits<size_t>::max()};
        std::pair<uint32_t, uint32_t> last_crc_val;
    
        /*
        * spinlock: std::atomic_flag(原子布尔类型),实现自旋互斥,免锁结构
        * class spinlock{
            std::atomoc_flag af = ATOMIC_FLAG_INIT;
            void lock(){
                while(af.test_and_set(std::memory_order_acquire));
            }
            void unlock(){
                af.clear(std::memory_order_release);
            }
        };
        */
        mutable ceph::spinlock crc_spinlock;
    
        /*
        * raw构造函数,raw是作为基类,实际数据分配方式是由子类实现
        */
        explicit raw(unsigned l)
          : data(nullptr), len(l), nref(0){
        }
        raw(char *c, unsigned l)
          : data(c), len(l), nref(0) {
        }
        virtual ~raw() {
        }
    
        void _set_len(unsigned l) {
          len = l;
        }
    
    private:
        /*
        * 禁止拷贝复制
        */
        raw(const raw &other) = delete;
        const raw& operator=(const raw &other) = delete;
    public:
        /*
        * 获取原始数据
        */
        char *get_data() {
          return data;
        }
    
        /*
        * clone data,具体实现由子类定义
        */
        virtual raw* clone_empty() = 0;
    
        /*
        * 数据拷贝
        */
        ceph::unique_leakable_ptr<raw> clone() {
          raw* const c = clone_empty();
          memcpy(c->data, data, len);
          return ceph::unique_leakable_ptr<raw>(c);
        }
        /*
        * crc相关设置
        */
        bool get_crc(const std::pair<size_t, size_t> &fromto,std::pair<uint32_t, uint32_t> *crc) const {
          std::lock_guard lg(crc_spinlock);
          if (last_crc_offset == fromto) {
            *crc = last_crc_val;
            return true;
          }
          return false;
        }
        void set_crc(const std::pair<size_t, size_t> &fromto,const std::pair<uint32_t, uint32_t> &crc) {
          std::lock_guard lg(crc_spinlock);
          last_crc_offset = fromto;
          last_crc_val = crc;
        }
        void invalidate_crc() {
          std::lock_guard lg(crc_spinlock);
          last_crc_offset.first = std::numeric_limits<size_t>::max();
          last_crc_offset.second = std::numeric_limits<size_t>::max();
        }
      };
    

    raw不同buffer分配方式的子类:

    • class raw_malloc;
    • class raw_static;
    • class raw_posix_aligned;
    • class raw_hack_aligned;
    • class raw_char;
    • class raw_claimed_char;
    • class raw_unshareable;
    • class raw_combined;
    • class raw_claim_buffer;

    raw_combined的定义如下:

    • raw_combined的object与data buffer分配在一个buffer上,data处于buffer的开头,object为buffer尾
    • buffer大小以alignment对齐
    class buffer::raw_combined : public buffer::raw {
      /*
      * 对齐大小
      */
      size_t alignment;
    public:
      /*
      * 构造函数,dataptr指向已分配好的buffer
      */
      raw_combined(char *dataptr, unsigned l, unsigned align)
        :raw(dataptr, l),
         alignment(align) {}
    
      /*
      * 定义本类的clone的方法,创建一个新的buffer,调用release()返回裸指针
      */
      raw* clone_empty() override {
        return create(len, alignment).release();
      }
    
      /*
      * 入口静态函数:
        1. 使用posix_memalign,创建align对齐的buffer,buffer大小为:data+self(raw_combined)
        2. 在新分配的buffer上,构造raw_combined对象,使用operator new (buf) class
        3. operator delete,定义本类的delete函数,在使用delete {*raw_combined}时,调用此函数
      */
      static ceph::unique_leakable_ptr<buffer::raw> create(unsigned len, unsigned alig)
      {
        if (!align)
          align = sizeof(size_t);
        size_t rawlen = round_up_to(sizeof(buffer::raw_combined),alignof(buffer::raw_combined));
        size_t datalen = round_up_to(len, alignof(buffer::raw_combined));
    
        char *ptr = 0;
        int r = ::posix_memalign((void**)(void*)&ptr, align, rawlen + datalen);
        if (r)
         throw bad_alloc();
    
        return ceph::unique_leakable_ptr<buffer::raw>(new (ptr + datalen) raw_combined(ptr, len, align));
      }
    
      static void operator delete(void *ptr) {
        raw_combined *raw = (raw_combined *)ptr;
        ::free((void *)raw->data);
      }
    };
    

    raw_malloc定义如下:

    • 使用mollac进行buffer的分配
    • 无对齐要求
    class buffer::raw_malloc : public buffer::raw {
      public:
        /*
        * 构造函数,使用malloc buffer的分配
        */
        explicit raw_malloc(unsigned l) : raw(l) {
          if (len) {
            data = (char *)malloc(len);
            if (!data)
              throw bad_alloc();
          } else {
            data = 0;
          }
        }
        /*
        * 利用创建好的buffer初始化raw_malloc
        */
        raw_malloc(unsigned l, char *b) : raw(b, l) {
        }
        /*
        * 资源释放
        */
        ~raw_malloc() override {
          free(data);
        }
        raw* clone_empty() override {
          return new raw_malloc(len);
        }
    };
    

    raw_posix_aligned定义

    • 利用posix_memalign, 创建align对齐的data buffer
    • 与raw_combined的区别是,buffer就是data buffer,不包含raw_posix_aligned object

    实现如下:

    class buffer::raw_posix_aligned : public buffer::raw {
      /*
      * data buffer对齐大小
      */
      unsigned align;
    
    public:
      /*
      * 构造函数。利用posix_memalign,构造大小为len,以align对齐的data buffer
      */
      raw_posix_aligned(unsigned l, unsigned _align) : raw(l) {
        align = _align;
        assert((align >= sizeof(void *)) && (align & (align - 1)) == 0);
        int r = ::posix_memalign((void**)(void*)&data, align, len);
        if (r)
          throw bad_alloc();
      }
      /*
      * 释放data buffer
      */
      ~raw_posix_aligned() override {
        ::free(data);
      }
    
      /*
      * clone,大小与此data buffer相等,内容为空
      */
      raw* clone_empty() override {
        return new raw_posix_aligned(len, align);
      }
    };
    

    raw_static定义

    • data buffer使用static buffer
    • 无需对齐
    class buffer::raw_static : public buffer::raw {
      public:
    
        /*
        * 构造函数:d是static buffer
        * 析构函数:无需释放
        */
        raw_static(const char *d, unsigned l) : raw((char*)d, l) { }
        ~raw_static() override {}
        /*
        * clone 的buffer,利用new raw_char创建新的buffer
        */
        raw* clone_empty() override {
          return new buffer::raw_char(len);
        }
    };
    

    raw_char、raw_claimed_char、raw_claim_buffer定义

    • 创建char类型的buffer
    • 无需对齐
    • raw_char在析构函数中释放资源,因此只能接受new分配的buffer
    • raw_claimed_char:不负责释放资源,因此可以是局部变量。针对new创建的buffer,则需手动显示释放
    • raw_claim_buffer: 可以接收自定义删除器

    定义代码如下:

    class buffer::raw_char : public buffer::raw {
    public:
      /*
      * 构造函数:使用new 创建buffer
      */
      explicit raw_char(unsigned l) : raw(l) {
        if (len)
          data = new char[len];
        else
          data = 0;
      }
      /*
      * 传入已建好的buffer(char *b),作为此raw的buffer,此buffer需要是new创建的
      */
      raw_char(unsigned l, char *b) : raw(b, l) {
      }
    
      /*
      * 释放资源
      */
      ~raw_char() override {
        delete[] data;
      }
    
      /*
      * clone buffer,new新建
      */
      raw* clone_empty() override {
        return new raw_char(len);
      }
    };
    
    
    class buffer::raw_claimed_char : public buffer::raw {
    public:
    
      /*
      * 构造函数,传入buffer
      * 析构函数不负责释放资源,因此外部buffer是局部变量,或是new资源,则需手动释放
      */
      explicit raw_claimed_char(unsigned l, char *b) : raw(b, l) {
      }
      ~raw_claimed_char() override {
      }
      raw* clone_empty() override {
        return new raw_char(len);
      }
    };
    
    class buffer::raw_claim_buffer : public buffer::raw {
      deleter del;
     public:
      raw_claim_buffer(const char *b, unsigned l, deleter d)
          : raw((char*)b, l), del(std::move(d)) { }
      ~raw_claim_buffer() override {}
      raw* clone_empty() override {
        return new buffer::raw_char(len);
      }
    };
    

    RAW创建接口

    创建接口如下,剔除mempool相关的信息:

      ceph::unique_leakable_ptr<raw> copy(const char *c, unsigned len);
      ceph::unique_leakable_ptr<raw> create(unsigned len);
      ceph::unique_leakable_ptr<raw> create_in_mempool(unsigned len, int mempool);
      ceph::unique_leakable_ptr<raw> claim_char(unsigned len, char *buf);
      ceph::unique_leakable_ptr<raw> create_malloc(unsigned len);
      ceph::unique_leakable_ptr<raw> claim_malloc(unsigned len, char *buf);
      ceph::unique_leakable_ptr<raw> create_static(unsigned len, char *buf);
      ceph::unique_leakable_ptr<raw> create_aligned(unsigned len, unsigned align);
      ceph::unique_leakable_ptr<raw> create_aligned_in_mempool(unsigned len, unsigned align, int mempool);
      ceph::unique_leakable_ptr<raw> create_page_aligned(unsigned len);
      ceph::unique_leakable_ptr<raw> create_small_page_aligned(unsigned len);
      ceph::unique_leakable_ptr<raw> claim_buffer(unsigned len, char *buf, deleter del);
    
    • copy(const char *c, unsigned len):
    调用:
    auto r = create_aligned(len, sizeof(size_t));
    memcpy(r->data, c, len);
    return r;
    
    create(unsigned len);
        create_aligned(unsigned len, unsigned align); /* align = sizeof(size_t)*/
        create_in_mempool(unsigned len, int mempool)
        create_aligned(unsigned len, unsigned align)
    

    调用:create_aligned_in_mempool(unsigned len, unsigned align, int mempool):

    /*
    * 如果对齐align 是系统PAGE_SIZE的整数倍,或者分配的大小大于PAGE_SIZE的两倍,则使用raw_posix_aligned接口
    * 否则采用raw_combined
    */
    if ((align & ~CEPH_PAGE_MASK) == 0 ||
      len >= CEPH_PAGE_SIZE * 2) {
    #ifndef __CYGWIN__
          return ceph::unique_leakable_ptr<buffer::raw>(new raw_posix_aligned(len, align));
    #else
          return ceph::unique_leakable_ptr<buffer::raw>(new raw_hack_aligned(len, align));
    #endif
        }
        return raw_combined::create(len, align, mempool);
      }
    
    • claim_char(unsigned len, char *buf):
    : new raw_claimed_char(len, buf)
    
    • create_malloc(unsigned len):
    : new raw_malloc(len)
    
    • claim_malloc(unsigned len, char *buf):
    : new raw_malloc(len, buf)
    
    • create_static(unsigned len, char *buf):
    : new raw_static(buf, len)
    
    • create_page_aligned(unsigned len):
    : 已系统PAGE_SIZE默认对齐
    : create_aligned(len, CEPH_PAGE_SIZE)
    
    • create_small_page_aligned(unsigned len):
    : 当len < CEPH_PAGE_SIZE: create_aligned(len, CEPH_BUFFER_ALLOC_UNIT) /*CEPH_BUFFER_ALLOC_UNIT: 4096u, 4KB*/
    : > CEPH_PAGE_SIZE: create_aligned(len, CEPH_PAGE_SIZE)
    
    • claim_buffer(unsigned len, char *buf, deleter del):
    : new raw_claim_buffer(buf, len, std::move(del))
    

    相关文章

      网友评论

          本文标题:Ceph Bufferlist: buffer::raw

          本文链接:https://www.haomeiwen.com/subject/zduuohtx.html