美文网首页
实现自己的数据库四

实现自己的数据库四

作者: 明翼 | 来源:发表于2023-01-29 22:37 被阅读0次

一前言

上一篇已经说明了B+树的一些原理,也讲到,我们目前采用的持久化数据的方式,而且我们是单独的插入数据,没有任何元数据信息,虽然插入的速度很快,因为是采用追加的方式。但是这种方式插入速度很快,像上次所说,查询和删除的速度会很慢。

数据结构性能对比图

我们以前用的就是非排序数组行,保存的是数据,没有其他信息。插入性能最好,但是删除和查找时间复杂度为O(n),排序数组查找很快,可以采用二分法查找,时间复杂度为O(log(n)),但是插入和删除时间复杂度为O(n),而采用B+树方式,且保存了元数据和主键的情况下,无论是查找、插入还是删除,性能都达到了均衡。

二 改造

2.1 元数据信息

采用树的方式来保存数据库的数据时候,就不能是简单的只记录原始信息,还需要记录诸如子节点指针信息,
为了方便遍历到兄弟节点,还需要保存这指向父节点的指针信息(这里面的指针类似c语言的指针,在磁盘保存时候,要看具体的实现,可能是个页号)。


拆分根节点

同样我们为了区分子节点和叶子节点,需要保存节点的类别,以及是否为root节点这些信息。

用这种排序的树保存数据,还有个好处,就是遍历比较方便。

节点类型

typedef enum { NODE_INTERNAL, NODE_LEAF } NodeType;

root节点元数据

/*
 * Common Node Header Layout
 */
// 节点类型数据的大小,其实只有一个bit就可以区分叶子节点和根节点,这里面浪费了点
const uint32_t NODE_TYPE_SIZE = sizeof(uint8_t);
// 节点类型的偏移,放在页节点的开头
const uint32_t NODE_TYPE_OFFSET = 0;
// 是否为root的元数据大小
const uint32_t IS_ROOT_SIZE = sizeof(uint8_t);
// 是否为root的元数据的偏移量
const uint32_t IS_ROOT_OFFSET = NODE_TYPE_SIZE;
// 指向父指针的指针大小
const uint32_t PARENT_POINTER_SIZE = sizeof(uint32_t);
// 指向父指针的偏移量
const uint32_t PARENT_POINTER_OFFSET = IS_ROOT_OFFSET + IS_ROOT_SIZE;
// 整个Node节点的元数据整体尺寸
const uint8_t COMMON_NODE_HEADER_SIZE =
  NODE_TYPE_SIZE + IS_ROOT_SIZE + PARENT_POINTER_SIZE;

除了root节点外,就是叶子节点,叶子节点保存完整的数据信息,所以和root节点的内容有所不同。

/*
 * Leaf Node Header Layout
 */
// 叶子节点保存的cell数量,一个cell由key和value组成 可以看作一个key后面跟着持久化的行
const uint32_t LEAF_NODE_NUM_CELLS_SIZE = sizeof(uint32_t);
// 叶子节点的cell数量的偏移量
const uint32_t LEAF_NODE_NUM_CELLS_OFFSET = COMMON_NODE_HEADER_SIZE;
// 叶子节点的元数据大小
const uint32_t LEAF_NODE_HEADER_SIZE =
  COMMON_NODE_HEADER_SIZE + LEAF_NODE_NUM_CELLS_SIZE;

叶子节点保存的cell数量,一个cell由key和value组成 可以看作一个key后面跟着持久化的行。


叶子节点的结构

示意图很清楚说明了第一个字节是node_type,接着一个字节是is_root,后面四个字节是父节点的指针,再下面4个字节为cell的数量,这里面有个错误就是写了两个,剩下内容就是cell,即key+value,都是这样部署的,Node节点不够存cell的就浪费了。

访问叶子节点方法

// 叶子节点中cell数量地址获取
uint32_t* leaf_node_num_cells(void* node) {
return node + LEAF_NODE_NUM_CELLS_OFFSET;
}
// 叶子节点上第cell_num个cell的偏移量的地址
void* leaf_node_cell(void* node, uint32_t cell_num) {
 return node + LEAF_NODE_HEADER_SIZE + cell_num * LEAF_NODE_CELL_SIZE;
}
// 叶子节点上第cell_num个key的偏移量,因为cell的前面放的是key
uint32_t* leaf_node_key(void* node, uint32_t cell_num) {
  return leaf_node_cell(node, cell_num);
}
// 叶子节点上第cell_num个cell的value地址获取
void* leaf_node_value(void* node, uint32_t cell_num) {
 return leaf_node_cell(node, cell_num) + LEAF_NODE_KEY_SIZE;
}
// 初始化一个节点,将cell_num设置为0
void initialize_leaf_node(void* node) { *leaf_node_num_cells(node) = 0; }

2.2 Table和Pager的改动

首先整个设计考虑简单点,不支持部分页面的读取,每次读取是读取整个页面。

 const uint32_t PAGE_SIZE = 4096;
 const uint32_t TABLE_MAX_PAGES = 100;
 
 
 typedef struct {
   int file_descriptor;
   uint32_t file_length;
+  uint32_t num_pages;
   void* pages[TABLE_MAX_PAGES];
 } Pager;
 
 typedef struct {
   Pager* pager;
-  uint32_t num_rows;
+  uint32_t root_page_num;
 } Table;

在新的定义中,我们固定了每个表的最大行数。另外我们将page的数量保存在Pager中。在Table中保存根页面的page_num,这样我们就可以通过表方便找到root页面了。

下面是一些关键行数的修改:

void* get_page(Pager* pager, uint32_t page_num) {
     pager->pages[page_num] = page;
    if (page_num >= pager->num_pages) {
     pager->num_pages = page_num + 1;
    }
   return pager->pages[page_num];
  }

///////////////////////////////////////////
Pager* pager_open(const char* filename) {
   Pager* pager = malloc(sizeof(Pager));
   pager->file_descriptor = fd;
   pager->file_length = file_length;
  pager->num_pages = (file_length / PAGE_SIZE);
 
  if (file_length % PAGE_SIZE != 0) {
     printf("Db file is not a whole number of pages. Corrupt file.\n");
    exit(EXIT_FAILURE);
   }
 

2.3 游标的更改

游标定位数据,以前通过行来定位,现在通过页面号和cell号来定位数据。

 typedef struct {
   Table* table;
-  uint32_t row_num;
+  uint32_t page_num;
+  uint32_t cell_num;
   bool end_of_table;  // Indicates a position one past the last element
 } Cursor;

游标创建:

 Cursor* table_start(Table* table) {
   Cursor* cursor = malloc(sizeof(Cursor));
   cursor->table = table;
-  cursor->row_num = 0;
-  cursor->end_of_table = (table->num_rows == 0);
+  cursor->page_num = table->root_page_num;
+  cursor->cell_num = 0;
+
+  void* root_node = get_page(table->pager, table->root_page_num);
+  uint32_t num_cells = *leaf_node_num_cells(root_node);
+  cursor->end_of_table = (num_cells == 0);
 
   return cursor;
 }

表开始的游标定位,游标的页面为表的根页面编号,通过leaf_node_num_cells行数获取root节点中cell的数量。

 Cursor* table_end(Table* table) {
   Cursor* cursor = malloc(sizeof(Cursor));
   cursor->table = table;
-  cursor->row_num = table->num_rows;
+  cursor->page_num = table->root_page_num;
+
+  void* root_node = get_page(table->pager, table->root_page_num);
+  uint32_t num_cells = *leaf_node_num_cells(root_node);
+  cursor->cell_num = num_cells;
   cursor->end_of_table = true;
 
   return cursor;
 }

这个是表的结束页的游标,设置end_of_table的标志。

 void* cursor_value(Cursor* cursor) {
-  uint32_t row_num = cursor->row_num;
-  uint32_t page_num = row_num / ROWS_PER_PAGE;
+  uint32_t page_num = cursor->page_num;
   void* page = get_page(cursor->table->pager, page_num);
-  uint32_t row_offset = row_num % ROWS_PER_PAGE;
-  uint32_t byte_offset = row_offset * ROW_SIZE;
-  return page + byte_offset;
+  return leaf_node_value(page, cursor->cell_num);
 }

获取游标处的值,获取cell_num的value,由于每个cell的大小一样,所以也是类似的取值方法。
游标的递增:

 void cursor_advance(Cursor* cursor) {
-  cursor->row_num += 1;
-  if (cursor->row_num >= cursor->table->num_rows) {
+  uint32_t page_num = cursor->page_num;
+  void* node = get_page(cursor->table->pager, page_num);
+
+  cursor->cell_num += 1;
+  if (cursor->cell_num >= (*leaf_node_num_cells(node))) {
     cursor->end_of_table = true;
   }
 }

这里面每次递增,只增加cell的数量,也许你会说cell递增到最后一个cell怎么办,其实增加到最后到最后一个cell后,设置了表结束的标志,循环退出了。

数据库打开
打开数据库,如果是一个新的数据库,初始化一个页面作为叶子节点。

Table* db_open(const char* filename) {
   Pager* pager = pager_open(filename);
   Table* table = malloc(sizeof(Table));
   table->pager = pager;
  table->root_page_num = 0;

  if (pager->num_pages == 0) {
    // New database file. Initialize page 0 as leaf node.
    void* root_node = get_page(pager, 0);
    initialize_leaf_node(root_node);
  }
 
   return table;
 }

下面是个关键行数,先叶子节点插入数据:

void leaf_node_insert(Cursor* cursor, uint32_t key, Row* value) {
 void* node = get_page(cursor->table->pager, cursor->page_num);

  uint32_t num_cells = *leaf_node_num_cells(node);
 
  if (num_cells >= LEAF_NODE_MAX_CELLS) {
    // 节点满了
    printf("Need to implement splitting a leaf node.\n");
   exit(EXIT_FAILURE);
  }

  if (cursor->cell_num < num_cells) {
    // 为一个cell腾出位置位置
  for (uint32_t i = num_cells; i > cursor->cell_num; i--) {
     memcpy(leaf_node_cell(node, i), leaf_node_cell(node, i - 1),
            LEAF_NODE_CELL_SIZE);
    }
  }
// 增加cell_num,设置key和持久化row。
  *(leaf_node_num_cells(node)) += 1;
  *(leaf_node_key(node, cursor->cell_num)) = key;
  serialize_row(value, leaf_node_value(node, cursor->cell_num));
}

这个函数假设树只有一个页面,目前版本先不支持多个页面。
插入操作:

 ExecuteResult execute_insert(Statement* statement, Table* table) {
  void* node = get_page(table->pager, table->root_page_num);
  if ((*leaf_node_num_cells(node) >= LEAF_NODE_MAX_CELLS)) {
     return EXECUTE_TABLE_FULL;
   }
 
   Row* row_to_insert = &(statement->row_to_insert);
   Cursor* cursor = table_end(table);
  leaf_node_insert(cursor, row_to_insert->id, row_to_insert);
   free(cursor);
}

三 打印命令

打印meta信息即元数据信息。

void print_constants() {
 printf("ROW_SIZE: %d\n", ROW_SIZE);
  printf("COMMON_NODE_HEADER_SIZE: %d\n", COMMON_NODE_HEADER_SIZE);
  printf("LEAF_NODE_HEADER_SIZE: %d\n", LEAF_NODE_HEADER_SIZE);
  printf("LEAF_NODE_CELL_SIZE: %d\n", LEAF_NODE_CELL_SIZE);
  printf("LEAF_NODE_SPACE_FOR_CELLS: %d\n", LEAF_NODE_SPACE_FOR_CELLS);
  printf("LEAF_NODE_MAX_CELLS: %d\n", LEAF_NODE_MAX_CELLS);
}

MetaCommandResult do_meta_command(InputBuffer* input_buffer, Table* table) {
   if (strcmp(input_buffer->buffer, ".exit") == 0) {
     db_close(table);
     exit(EXIT_SUCCESS);
  } else if (strcmp(input_buffer->buffer, ".constants") == 0) {
      printf("Constants:\n");
      print_constants();
     return META_COMMAND_SUCCESS;
   } else {
     return META_COMMAND_UNRECOGNIZED_COMMAND;
   }

没啥需要特别说明的,只是支持一个打印常量元数据的命令。

四 树的可视化

为了帮助调试,增加打印树的功能:

void print_leaf_node(void* node) {
  uint32_t num_cells = *leaf_node_num_cells(node);
  printf("leaf (size %d)\n", num_cells);
  for (uint32_t i = 0; i < num_cells; i++) {
    uint32_t key = *leaf_node_key(node, i);
   printf("  - %d : %d\n", i, key);
  }
}

遍历节点,打印cell信息。增加meta命令:

MetaCommandResult do_meta_command(InputBuffer* input_buffer, Table* table) {
   if (strcmp(input_buffer->buffer, ".exit") == 0) {
     db_close(table);
     exit(EXIT_SUCCESS);
  } else if (strcmp(input_buffer->buffer, ".btree") == 0) {
    printf("Tree:\n");
    print_leaf_node(get_page(table->pager, 0));
   return META_COMMAND_SUCCESS;
   } else if (strcmp(input_buffer->buffer, ".constants") == 0) {
     printf("Constants:\n");
     print_constants();
     return META_COMMAND_SUCCESS;
   } else {
     return META_COMMAND_UNRECOGNIZED_COMMAND;
   }

这次最大的改动是文件的存储结构改变,改成B+树方式,但是我们并没有对文件里面的cell按照key进行排序,而且我们只支持一个页面,不过仍然是重大的进步,慢慢来吧。

相关文章

  • 实现自己的数据库四

    一前言 上一篇已经说明了B+树的一些原理,也讲到,我们目前采用的持久化数据的方式,而且我们是单独的插入数据,没有任...

  • JDBC和XML

    JDBC 访问数据库的规范,实现由数据库厂商实现,具体实现对应数据库驱动。 注册驱动 Class.forName(...

  • jdbc基础

    JDBC概念 java连接数据库的一系列接口,接口的实现类交给第三方数据库厂商去实现 四大参数 驱动 com.m...

  • redis(1)简单动态字符串

    本节开始,我们来一起了解一下redis的原理,分为四个部分,数据结构与对象,单机数据库实现,多机数据库实现,独立功...

  • 数据库引擎

    Innodb引擎 Innodb引擎提供了对数据库ACID事务的支持,并且实现了SQL标准的四种隔离级别,关于数据库...

  • 面试合集:数据库+数据结构+JVM+网络+JAVA+分布式+操作

    第一个模块:数据库 1.1 腾讯数据库面试问题 解释ACID四大特性 原子性的底层实现 数据库宕机后恢复的过程 如...

  • AskMe项目数据库建立

    建立数据库 使用的是可视化的Mysql工具Navicat,实现数据库AskMe,四个表的建立,分别为user co...

  • 分布式锁入门

    目前主流的有三种: 基于数据库实现 基于Redis实现 基于ZooKeeper实现 1. 基于数据库实现: 基于数...

  • 实现自己的数据库一

    一 前言 从上篇原创文章到现在又是新的一年,今天是2023年的大年初三,先在这里祝各位亲爱的老铁们新年快乐,身体健...

  • 实现自己的数据库三

    一 前言 上篇实现了数据库的持久化,就是一个质的飞跃,虽然代码不复杂,但是对没有这方面经验者来说,还是意思的,下一...

网友评论

      本文标题:实现自己的数据库四

      本文链接:https://www.haomeiwen.com/subject/equphdtx.html