美文网首页程序员C++
leveldb源码学习--log

leveldb源码学习--log

作者: icecity96 | 来源:发表于2017-02-16 21:49 被阅读0次

    所有的写操作在写入memtable之前都必须先成功写入log文件中,主要两点好处:

    1. 可以将随机的写IO变成append,极大的提高写磁盘速度;
    2. 防止在节点宕机导致内存数据丢失,造成数据丢失。

    格式

    The log file contents are a sequence of 32KB blocks.  The only
    exception is that the tail of the file may contain a partial block.
    
    Each block consists of a sequence of records:
       block := record* trailer?
       record :=
        checksum: uint32    // crc32c of type and data[] ; little-endian
        length: uint16      // little-endian
        type: uint8     // One of FULL, FIRST, MIDDLE, LAST
        data: uint8[length]
    
    A record never starts within the last six bytes of a block (since it
    won't fit).  Any leftover bytes here form the trailer, which must
    consist entirely of zero bytes and must be skipped by readers.
    

    也就是说,日志文件由连续的大小为32KB的block组成,block又由连续的record组成,record的格式为
    | CRC(4 byte) | Length(2 byte) | type(1 byte) | data |

    Type有四种类型

    • FULL: 说明该log record包含一个完整的user record;
    • FIRST,说明是user record的第一条log record
    • MIDDLE,说明是user record中间的log record
    • LAST,说明是user record最后的一条log record

    看文档上给出的一个例子:

    Example: consider a sequence of user records:
       A: length 1000
       B: length 97270
       C: length 8000
    A will be stored as a FULL record in the first block.
    
    B will be split into three fragments: first fragment occupies the rest
    of the first block, second fragment occupies the entirety of the
    second block, and the third fragment occupies a prefix of the third
    block.  This will leave six bytes free in the third block, which will
    be left empty as the trailer.
    
    C will be stored as a FULL record in the fourth block.
    
    示意图

    Note: 由于一条logrecord长度最短为7,如果一个block的剩余空间小于7byte,那么将被填充为空字符串,另外长度为7的logrecord是不包括任何用户数据的。

    log_format.h

    namespace leveldb {
    namespace log {
    enum RecordType {
      // Zero is reserved for preallocated files
      kZeroType = 0,
      kFullType = 1,
      // For fragments
      kFirstType = 2,
      kMiddleType = 3,
      kLastType = 4
    };
    static const int kMaxRecordType = kLastType;
    static const int kBlockSize = 32768;
    // Header is checksum (4 bytes), length (2 bytes), type (1 byte).
    static const int kHeaderSize = 4 + 2 + 1;
    }  // namespace log
    }  // namespace leveldb
    

    Writer

    先看下Writer类:

    class Writer {
     public:
      // Create a writer that will append data to "*dest".
      // "*dest" must be initially empty.
      // "*dest" must remain live while this Writer is in use.
      explicit Writer(WritableFile* dest);
    
      // Create a writer that will append data to "*dest".
      // "*dest" must have initial length "dest_length".
      // "*dest" must remain live while this Writer is in use.
      Writer(WritableFile* dest, uint64_t dest_length);
    
      ~Writer();
    
      Status AddRecord(const Slice& slice);
    
     private:
      WritableFile* dest_;
      int block_offset_;       // Current offset in block
    
      // crc32c values for all supported record types.  These are
      // pre-computed to reduce the overhead of computing the crc of the
      // record type stored in the header.
      uint32_t type_crc_[kMaxRecordType + 1];
    
      Status EmitPhysicalRecord(RecordType type, const char* ptr, size_t length);
    
      // No copying allowed
      Writer(const Writer&);
      void operator=(const Writer&);
    };
    

    类的结构比较简单,公开借口只有一个传入Slice参数的AddRecord,由于RecordType是固定的几种,所以为了效率,类的成员type_crc_数组,这里存放的为RecordType预先计算的CRC32值。
    下面分析AddRecord的实现

    AddRecord

    1.首先取出Slice的字符串指针和长度,初始化begin=true,表明这是一条record的开始

    const char* ptr = slice.data();
    size_t left = slice.size();
    bool begin = true;
    

    2.然后进入一个do{}while循环,直到写入出错,或者成功写入全部数据

    • 检查当前block的大小是否小于kHeaderSize,如果小于则先将,剩余的部份补0,然后重制块位移
    const int leftover = kBlockSize - block_offset_;
        assert(leftover >= 0);
        if (leftover < kHeaderSize) {
          // Switch to a new block
          if (leftover > 0) {
            // Fill the trailer (literal below relies on kHeaderSize being 7)
            assert(kHeaderSize == 7);
            dest_->Append(Slice("\x00\x00\x00\x00\x00\x00", leftover));
          }
          block_offset_ = 0;
        }
    
    • 计算block剩余大小,本次可写入长度
    const size_t avail = kBlockSize - block_offset_ - kHeaderSize;
    const size_t fragment_length = (left < avail) ? left : avail;
    
    • 判断logType
    RecordType type;
        const bool end = (left == fragment_length);
        if (begin && end) {
          type = kFullType;
        } else if (begin) {
          type = kFirstType;
        } else if (end) {
          type = kLastType;
        } else {
          type = kMiddleType;
        }
    
    • 调用EmitPhysicalRecord函数,append日志;并更新指针、剩余长度和begin标记。
    s = EmitPhysicalRecord(type, ptr, fragment_length);
    ptr += fragment_length;
    left -= fragment_length;
    begin = false;
    

    EmitPhysicalRecord

    这是实际写入的地方

    • 首先计算head,并append到log中
      // Format the header
      char buf[kHeaderSize];
      buf[4] = static_cast<char>(n & 0xff);
      buf[5] = static_cast<char>(n >> 8);
      buf[6] = static_cast<char>(t);
    
      // Compute the crc of the record type and the payload.
      uint32_t crc = crc32c::Extend(type_crc_[t], ptr, n);
      crc = crc32c::Mask(crc);                 // Adjust for storage
      EncodeFixed32(buf, crc);
      // Write the header and the payload
      Status s = dest_->Append(Slice(buf, kHeaderSize));
    
    • 写入,并Flush,更新block的当前偏移
    if (s.ok()) {
        s = dest_->Append(Slice(ptr, n));
        if (s.ok()) {
          s = dest_->Flush();
        }
      }
      block_offset_ += kHeaderSize + n;
    

    Reader

    先看下Reader类的成员变量(基本都有注释,无需详解)

      SequentialFile* const file_;
      Reporter* const reporter_;
      bool const checksum_;
      char* const backing_store_;
      Slice buffer_;
      bool eof_;   // Last Read() indicated EOF by returning < kBlockSize
    
      // Offset of the last record returned by ReadRecord.
      uint64_t last_record_offset_;
      // Offset of the first location past the end of buffer_.
      uint64_t end_of_buffer_offset_;
    
      // Offset at which to start looking for the first record to return
      uint64_t const initial_offset_;
    
      // True if we are resynchronizing after a seek (initial_offset_ > 0). In
      // particular, a run of kMiddleType and kLastType records can be silently
      // skipped in this mode
      bool resyncing_;
      // Extend record types with the following special values
      enum {
        kEof = kMaxRecordType + 1,
        // Returned whenever we find an invalid physical record.
        // Currently there are three situations in which this happens:
        // * The record has an invalid CRC (ReadPhysicalRecord reports a drop)
        // * The record is a 0-length record (No drop is reported)
        // * The record is below constructor's initial_offset (No drop is reported)
        kBadRecord = kMaxRecordType + 2
      };
    

    需要注意的两个类

    Reporter : 用来记录错误的产生
    SequentialFile: 用来从log文件中读取数据

    Reader类公开了两个接口ReadRecordLastRecordOffset,比较重要的也是ReadRecord,下面重点分析一下这个函数:

    • 根据initialoffset跳转到调用者指定的位置,开始读取日志文件。跳转就是直接调用SequentialFile的Seek接口。另外,需要先调整调用者传入的initialoffset参数,调整和跳转逻辑在SkipToInitialBlock函数中。
     if (last_record_offset_ < initial_offset_) {
        if (!SkipToInitialBlock()) {
          return false;
        }
      }
    

    SkipToInitialBlock的函数逻辑如下

    bool Reader::SkipToInitialBlock() {
      // 计算在block内的偏移位置,并圆整到开始读取block的起始位置
      size_t offset_in_block = initial_offset_ % kBlockSize;
      uint64_t block_start_location = initial_offset_ - offset_in_block;
      // 如果偏移在最后的6byte里,肯定不是一条完整的记录,跳到下一个block
      if (offset_in_block > kBlockSize - 6) {
        offset_in_block = 0;
        block_start_location += kBlockSize;
      }
     // 设置读取偏移
      end_of_buffer_offset_ = block_start_location;
      // Skip to start of first block that can contain the initial record
      if (block_start_location > 0) {
        Status skip_status = file_->Skip(block_start_location);
        if (!skip_status.ok()) {
          ReportDrop(block_start_location, skip_status);
          return false;
        }
      }
      return true;
    }
    
    • 进入while循环之前进行一些标记
      bool in_fragmented_record = false;// 是否遇到FIRST类型的type
      // Record offset of the logical record that we're reading
      // 0 is a dummy value to make compilers happy
      uint64_t prospective_record_offset = 0;//正在读取的逻辑record的偏移
    
    • 进入到while(true)循环,直到读取到KLastType或者KFullType的record,或者到了文件结尾。读取出现错误时,并不会退出循环,而是汇报错误,继续执行,直到成功读取一条user record,或者遇到文件结尾。
      • 首先读取一个record采用的是ReadPhysicalRecord函数
    unsigned int Reader::ReadPhysicalRecord(Slice* result) {
      while (true) {
        if (buffer_.size() < kHeaderSize) {
          // 如果未到达文件结尾,清空buffer,读取数据
          if (!eof_) {
            buffer_.clear();
            Status status = file_->Read(kBlockSize, &buffer_, backing_store_);
            end_of_buffer_offset_ += buffer_.size();
            if (!status.ok()) {
              buffer_.clear();
              ReportDrop(kBlockSize, status);
              eof_ = true;
              return kEof;
            } else if (buffer_.size() < kBlockSize) {  // 实际读取字节<指定(Block Size),表明到了文件结尾
              eof_ = true;
            }
            continue;
          } else {
            buffer_.clear();
            return kEof;
          }
        }
        // 解析record头
        const char* header = buffer_.data();
        const uint32_t a = static_cast<uint32_t>(header[4]) & 0xff;
        const uint32_t b = static_cast<uint32_t>(header[5]) & 0xff;
        const unsigned int type = header[6];
        const uint32_t length = a | (b << 8);
        // 长度超出,汇报错误
        if (kHeaderSize + length > buffer_.size()) {
          size_t drop_size = buffer_.size();
          buffer_.clear();
          if (!eof_) {
            ReportCorruption(drop_size, "bad record length");
            return kBadRecord;
          }
          return kEof;
        }
        
        if (type == kZeroType && length == 0) { // 对于Zero Type类型处理
          buffer_.clear();
          return kBadRecord;
        }
    
        // 检查crc
        if (checksum_) {
          uint32_t expected_crc = crc32c::Unmask(DecodeFixed32(header));
          uint32_t actual_crc = crc32c::Value(header + 6, 1 + length);
          if (actual_crc != expected_crc) {  // 如果出错,汇报错误
            size_t drop_size = buffer_.size();
            buffer_.clear();
            ReportCorruption(drop_size, "checksum mismatch");
            return kBadRecord;
          }
        }
    
        buffer_.remove_prefix(kHeaderSize + length);
    
        if (end_of_buffer_offset_ - buffer_.size() - kHeaderSize - length <
            initial_offset_) {
          result->clear();
          return kBadRecord;
        }
        *result = Slice(header + kHeaderSize, length);
        return type;
      }
    
    • 接下来,对所读取的record进行判断
      • 如果一开始就读到kMiddleType, kLastType的record显然是不正确(完整)的,所以简单的抛弃,继续读后面的。
    if (resyncing_) {
          if (record_type == kMiddleType) {
            continue;
          } else if (record_type == kLastType) {
            resyncing_ = false;
            continue;
          } else {
            resyncing_ = false;
          }
        }
    
    • 根据类型采取相应动作(逻辑非常简单)
    switch (record_type) {
          case kFullType:
            if (in_fragmented_record) {
              if (scratch->empty()) {
                in_fragmented_record = false;
              } else {
                ReportCorruption(scratch->size(), "partial record without end(1)");
              }
            }
            prospective_record_offset = physical_record_offset;
            scratch->clear();
            *record = fragment;
            last_record_offset_ = prospective_record_offset;
            return true;
    
          case kFirstType:
            if (in_fragmented_record) {
              if (scratch->empty()) {
                in_fragmented_record = false;
              } else {
                ReportCorruption(scratch->size(), "partial record without end(2)");
              }
            }
            prospective_record_offset = physical_record_offset;
            scratch->assign(fragment.data(), fragment.size());
            in_fragmented_record = true;
            break;
    
          case kMiddleType:
            if (!in_fragmented_record) {
              ReportCorruption(fragment.size(),
                               "missing start of fragmented record(1)");
            } else {
              scratch->append(fragment.data(), fragment.size());
            }
            break;
    
          case kLastType:
            if (!in_fragmented_record) {
              ReportCorruption(fragment.size(),
                               "missing start of fragmented record(2)");
            } else {
              scratch->append(fragment.data(), fragment.size());
              *record = Slice(*scratch);
              last_record_offset_ = prospective_record_offset;
              return true;
            }
            break;
    
          case kEof:
            if (in_fragmented_record) {
              scratch->clear();
            }
            return false;
    
          case kBadRecord:
            if (in_fragmented_record) {
              ReportCorruption(scratch->size(), "error in middle of record");
              in_fragmented_record = false;
              scratch->clear();
            }
            break;
    
          default: {
            char buf[40];
            snprintf(buf, sizeof(buf), "unknown record type %u", record_type);
            ReportCorruption(
                (fragment.size() + (in_fragmented_record ? scratch->size() : 0)),
                buf);
            in_fragmented_record = false;
            scratch->clear();
            break;
          }
        }
    

    相关文章

      网友评论

        本文标题:leveldb源码学习--log

        本文链接:https://www.haomeiwen.com/subject/uzvcwttx.html