代码详细见 log_writer.h / log_writer.cc。
成员
WritableFile *dest_;
int block_offset_; // Current offset in block
// crc32c values for all supported record types. These are
// pre-computed to reduce the overhead of computing the crc of the
// record type stored in the header.
uint32_t type_crc_[kMaxRecordType + 1];
AddRecord
该方法将记录写入一个Slice结构,调用 AddRecord
就会写入 WALLog 文件,AddRecord 根据写入的记录大小确定是否需要跨块,并据此得出相应的头部类型,然后将记录写入并且刷新到磁盘。
先来几个例子:
假设现在当前 WALLog 文件已经写入了 1000 字节,当前的待写入的 record 记录为500字节,一个块大小为32768字节,当前已经写入1000字节,那么该条500字节的记录可以完整的写入这个块中。
假设现在当前 WALLog 文件已经写入了 1000 字节,当前的待写入的 record 记录为 31755 字节,当前块还剩下:32768 - 1000 = 31768,那么写入该 record + header 则 剩下 32768 - 1000 - 31755 - 7 = 6 字节,而一个 record header 需要7字节,这时,这块剩余的6字节将被填充为 \x00\x00\x00\x00\x00\x00。
image.png假设现在当前 WALLog 文件已经写入了 1000 字节,当前的待写入的 record 记录为 50000 字节,那么最终会为:
image.pngStatus Writer::AddRecord(const Slice &slice)
{
const char *ptr = slice.data();
size_t left = slice.size();
// Fragment the record if necessary and emit it. Note that if slice
// is empty, we still want to iterate once to emit a single
// zero-length record
Status s;
bool begin = true;
do
{
const int leftover = kBlockSize - block_offset_;
assert(leftover >= 0);
if (leftover < kHeaderSize)
{
// Switch to a new block
if (leftover > 0)
{
// Fill the trailer (literal below relies on kHeaderSize being 7)
static_assert(kHeaderSize == 7, "");
dest_->Append(Slice("\x00\x00\x00\x00\x00\x00", leftover));
}
block_offset_ = 0;
}
// Invariant: we never leave < kHeaderSize bytes in a block.
assert(kBlockSize - block_offset_ - kHeaderSize >= 0);
const size_t avail = kBlockSize - block_offset_ - kHeaderSize;
const size_t fragment_length = (left < avail) ? left : avail;
RecordType type;
const bool end = (left == fragment_length);
if (begin && end)
{
type = kFullType;
}
else if (begin)
{
type = kFirstType;
}
else if (end)
{
type = kLastType;
}
else
{
type = kMiddleType;
}
s = EmitPhysicalRecord(type, ptr, fragment_length);
ptr += fragment_length;
left -= fragment_length;
begin = false;
} while (s.ok() && left > 0);
return s;
}
const char *ptr = slice.data();
size_t left = slice.size();
left 表示此次写入数据的长度。随后进入do-while循环。
查看当前 block 剩下的是否 <7,如果 <7 则补位,并重置 block偏移。这样做的目的是保证:header部分不会跨block。
const int leftover = kBlockSize - block_offset_; //
assert(leftover >= 0);
if (leftover < kHeaderSize)
{
// Switch to a new block
if (leftover > 0)
{
// Fill the trailer (literal below relies on kHeaderSize being 7)
static_assert(kHeaderSize == 7, "");
dest_->Append(Slice("\x00\x00\x00\x00\x00\x00", leftover));
}
block_offset_ = 0;
}
计算 block 剩余大小(除开header长度)avail ,以及本次 log record 可写入数据长度 fragment_length 。
const size_t avail = kBlockSize - block_offset_ - kHeaderSize;
const size_t fragment_length = (left < avail) ? left : avail;
下面分为几种情况,决定此次的 RecordType
const bool end = (left == fragment_length);
if (begin && end) { type = kFullType; }
else if (begin) { type = kFirstType; }
else if (end) { type = kLastType; }
else { type = kMiddleType; }
s = EmitPhysicalRecord(type, ptr, fragment_length);
ptr += fragment_length;
left -= fragment_length;
begin = false;
1、假设 left = 36,avail = 18,fragment_length = avail = 18,显然当前的Record不够,需要2个。所以第一次(begin && end) 成立,为
kFirstType,
2、假设left = 18,avail=36,fragment_length = avail = 18,显然,当前的Record就够了,只需要一次,(begin && end)成立,为kFullType
Status Writer::EmitPhysicalRecord(RecordType t, const char *ptr, size_t length)
{
assert(length <= 0xffff); // Must fit in two bytes
assert(block_offset_ + kHeaderSize + length <= kBlockSize);
// Format the header
char buf[kHeaderSize];
buf[4] = static_cast<char>(length & 0xff);
buf[5] = static_cast<char>(length >> 8);
buf[6] = static_cast<char>(t);
// Compute the crc of the record type and the payload.
uint32_t crc = crc32c::Extend(type_crc_[t], ptr, length);
crc = crc32c::Mask(crc); // Adjust for storage
EncodeFixed32(buf, crc);
// Write the header and the payload
Status s = dest_->Append(Slice(buf, kHeaderSize));
if (s.ok())
{
s = dest_->Append(Slice(ptr, length));
if (s.ok())
{
s = dest_->Flush();
}
}
block_offset_ += kHeaderSize + length;
return s;
}
WALLog 文件
LevelDB 每次进行写操作时,都需要调用AddRecord
方法向 WALLog 文件写入此次的增加的键值对,并且根据WriteOptions
来决定是否需要进行刷新磁盘操作。调用AddRecord
的代码位于中的 write 方法
DBImpl::Write(const WriteOptions &options, WriteBatch *updates)
{
...
status = log_->AddRecord(WriteBatchInternal::Contents(write_batch));
bool sync_error = false;
if (status.ok() && options.sync)
{
status = logfile_->Sync(); // 调用刷盘函数
if (!status.ok())
{
sync_error = true;
}
}
...
}
最终插入WALLog文件的数据格式可能如下:
image.png
- 序列号:8个字节,表示表该批次写入的序列号
- 个数:占4个字节,表示该批次有多少key-value对
- 类型:0x0 表示删除该键值对,0x1表示新增该键值对
网友评论