目的
- 提供read-modify-write语义操作,保证原子性。
- Merge-Operator
- 头文件: rocksdb/merge_operator.h
使用场景
-
需求先读取旧的值才能确定新值
-
数据的递增操作
-
比如:
-
list 的append操作
-
string的拼接操作
-
也可以将多个merge操作合并成一个merge操作( 叫做Partial merge或 Associative merge)
如何使用
- 如果client要使用rocksdb的mege操作,首先要实现
AssociativeMergeOperator
或MergeOperator
接口实现满足业务逻辑的mege操作,然后在参数Options.merge_operator
(shared_ptr类型)设置Merge实例对象。否则打开的Db默认不支持merge操作,调用时会抱Status::NotSupport
.
class DB {
// rocksdb提供的mege操作
virtual Status Merge(const WriteOptions& options, const Slice& key, const Slice& value) = 0;
};
Struct Options {
....
// 用户自定义的meger操作类,需要传进来
const std::shared_ptr<MergeOperator> merge_operator;
...
};
- AssociativeMergeOperator 接口
class AssociativeMergeOperator : public MergeOperator {
// Gives the client a way to express the read -> modify -> write semantics
// key: (IN) The key that's associated with this merge operation.,可以根据key来识别key的业务类型,做不同的merge操作。
// existing_value:(IN) null indicates the key does not exist before this op
// value: (IN) the value to update/merge the existing_value with 对已有值进行update得值,比如对已有值累计1,那么该value应该是1。RocksDb的mege方法中的value值会被传到这里
// new_value: (OUT) Client is responsible for filling the merge result here
// logger: (IN) Client could use this to log errors during merge.
//
// Return true on success. Return false failure / error / corruption.
virtual bool Merge(const Slice& key,
const Slice* existing_value,
const Slice& value,
std::string* new_value,
Logger* logger) const = 0;
// The name of the MergeOperator. Used to check for MergeOperator
// mismatches (i.e., a DB created with one MergeOperator is
// accessed using a different MergeOperator)
virtual const char* Name() const = 0;
...
};
- MergeOperator接口,AssociativeMergeOperator 是MergeOperator的子类
class MergeOperator {
// Gives the client a way to express the read -> modify -> write semantics
// key: (IN) The key that's associated with this merge operation.
// existing: (IN) null indicates that the key does not exist before this op
// operand_list:(IN) the sequence of merge operations to apply, front() first.
// new_value: (OUT) Client is responsible for filling the merge result here
// logger: (IN) Client could use this to log errors during merge.
//
// Return true on success. Return false failure / error / corruption.
// 对key上的积攒的一批merge操作(operand_list)进行merge
virtual bool FullMerge(const Slice& key,
const Slice* existing_value,
const std::deque<std::string>& operand_list,
std::string* new_value,
Logger* logger) const = 0;
};
// This function performs merge(left_op, right_op)
// when both the operands are themselves merge operation types.
// Save the result in *new_value and return true. If it is impossible
// or infeasible to combine the two operations, return false instead.
// 如果两个merge操作可以合并成一个,则调用该方法。
virtual bool PartialMerge(const Slice& key,
const Slice& left_operand,
const Slice& right_operand,
std::string* new_value,
Logger* logger) const = 0;
MergeOperator和AssociativeMergeOperator的选用
-
使用AssociativeMergeOperator的场景
- merge操作数的值的格式和Put的值的格式相同
- 把多个操作数合并进一个也是ok的
-
MergeOperator的场景
- 如果AssociativeMergeOperator的使用条件不满足,则使用MergeOperator。当然MergeOperator可以实现AssociativeMergeOperator的功能。
- 如果有时候可以把多个merge操作合并成一个(允许出现不可以合并的情况)可以使用MergeOperator,并实现其PartialMerge()方法,返回true表示两个操作被合并成了一个。
- 案例:如果value是json,要对json中元素同merge进行修改,这时该操作不满足AssociativeMergeOperator的使用条件。因为其初始值可后续的merge操作值不能同等对待。具体看案例。
案例
- 实现累加器,方法
void Add(const string& key, unit64_t value)
,对key中的已有值累加value,如果key不存在则默认其初始值为0。
1. 先实现AssociativeMergeOperator接口类
// A 'model' merge operator with uint64 addition semantics
class UInt64AddOperator : public AssociativeMergeOperator {
public:
virtual bool Merge(
const Slice& key,
const Slice* existing_value,
const Slice& value,
std::string* new_value,
Logger* logger) const override {
// assuming 0 if no existing value
uint64_t existing = 0;
if (existing_value) {
if (!Deserialize(*existing_value, &existing)) {
// if existing_value is corrupted, treat it as 0
Log(logger, "existing value corruption");
existing = 0;
}
}
uint64_t oper;
if (!Deserialize(value, &oper)) {
// if operand is corrupted, treat it as 0
Log(logger, "operand value corruption");
oper = 0;
}
auto new = existing + oper;
*new_value = Serialize(new);
return true; // always return true for this, since we treat all errors as "zero".
}
virtual const char* Name() const override {
return "UInt64AddOperator";
}
};
- 实现Add方法
// Implement 'add' directly with the new Merge operation
class MergeBasedCounters : public RocksCounters {
public:
MergeBasedCounters(std::shared_ptr<DB> db);
// mapped to a leveldb Merge operation
virtual void Add(const string& key, uint64_t value) override {
string serialized = Serialize(value);
db_->Merge(merge_option_, key, serialized);
}
};
- 打开DB设置merge_operator,调用Add
DB* dbp;
Options options;
options.merge_operator.reset(new UInt64AddOperator);
DB::Open(options, "/tmp/db", &dbp);
std::shared_ptr<DB> db(dbp);
MergeBasedCounters counters(db);
counters.Add("a", 1);
...
uint64_t v;
counters.Get("a", &v);
注意
- rocksdb在执行的get,iteration,compaction等操作时,与merge operator的操作行为是有关联的。
- 如果在没有设置merge operator的Db上调用merge操作会返回
Status::NotSupport
- 同一个Db要打开时要使用相同的merge operator。
- 用户通过实现Merge operator接口来定义满足业务要求的merge语义,所以不同的实现往往有不同的业务含义。比如,累加器的mege是对原来的值加1,而json的mege操作是修改原有json中字段的值,list是mege操作是追加内容等等。
- rocksDB 的put和merge操作在实际中不会被立即执行,可能在下一次Get时候或者compaction时候才执行。How do these methods work?
链接
- Merge+Compaction Implementation Details: For RocksDB engineers who want to know how MergeOperator affects their code.
网友评论