Ceph Pacific v16.2.13
Rgw 所有的请求都会交由 process_request
来处理,主要分为三个步骤,源文件 src/rgw/rgw_process.cc
-
获取 rgw handler 并初始化
-
根据请求获取对应的 Op
-
Op 处理
int process_request(rgw::sal::RGWRadosStore* const store,
RGWREST* const rest,
RGWRequest* const req,
const std::string& frontend_prefix,
const rgw_auth_registry_t& auth_registry,
RGWRestfulIO* const client_io,
OpsLogSink* const olog,
optional_yield yield,
rgw::dmclock::Scheduler *scheduler,
string* user,
ceph::coarse_real_clock::duration* latency,
int* http_ret)
{
int ret = client_io->init(g_ceph_context); cct:
dout(1) << "====== starting new request req=" << hex << req << dec
<< " =====" << dendl;
......
s->req_id = store->svc()->zone_utils->unique_id(req->id);
s->trans_id = store->svc()->zone_utils->unique_trans_id(req->id);
s->host_id = store->getRados()->host_id;
s->yield = yield;
ldpp_dout(s, 2) << "initializing for trans_id = " << s->trans_id << dendl;
RGWOp* op = nullptr;
int init_error = 0;
bool should_log = false;
RGWRESTMgr *mgr;
RGWHandler_REST *handler = rest->get_handler(store, s, // 这里根据 URL 里面是否包含 bucket、Object 字段会进一步获取到对应的 handler 类型
auth_registry,
frontend_prefix,
client_io, &mgr, &init_error);
......
ldpp_dout(s, 2) << "getting op " << s->op << dendl;
op = handler->get_op(); // 获取 Op,源文件 src/rgw/rgw_rest_s3.cc 以 s3cmd get 为例,对应的是 RGWGetObj_ObjStore_S3
if (!op) {
abort_early(s, NULL, -ERR_METHOD_NOT_ALLOWED, handler, yield);
goto done;
}
.......
req->op = op;
ldpp_dout(op, 10) << "op=" << typeid(*op).name() << dendl;
......
try {
.....
ret = rgw_process_authenticated(handler, op, req, s, yield); // op 处理
if (ret < 0) {
abort_early(s, op, ret, handler, yield);
goto done;
}
.....
} catch (const ceph::crypto::DigestException& e) {
}
.....
try {
client_io->complete_request();
} catch (rgw::io::Exception& e) {
dout(0) << "ERROR: client_io->complete_request() returned "
<< e.what() << dendl;
}
......
if (op) {
op_ret = op->get_ret();
ldpp_dout(op, 2) << "op status=" << op_ret << dendl;
ldpp_dout(op, 2) << "http status=" << s->err.http_ret << dendl;
} else {
ldpp_dout(s, 2) << "http status=" << s->err.http_ret << dendl;
}
......
dout(1) << "====== req done req=" << hex << req << dec
<< " op status=" << op_ret
<< " http_status=" << s->err.http_ret
<< " latency=" << lat
<< " ======"
<< dendl;
return (ret < 0 ? ret : s->err.ret);
} /* process_request */
rgw handler
获取 rgw handler
以 s3cmd get
为例,返回实例化的 RGWHandler_REST_Obj_S3
~/go/src/ceph (test ✗) s3cmd get s3://test/4M
download: 's3://test/4M' -> './4M' [1 of 1]
4194304 of 4194304 100% in 0s 6.51 MB/s done
(gdb) p s->init_state
$18 = {
url_bucket = "test",
src_bucket = ""
}
(gdb) p *s
$25 = (req_state) {
<DoutPrefixProvider> = {
_vptr.DoutPrefixProvider = 0x7fb52a464418 <vtable for req_state+16>
},
members of req_state:
cct = 0x559e0ab90000,
cio = 0x7fb52a713730,
op = OP_GET,
op_type = RGW_OP_UNKNOWN,
content_started = false,
format = 0,
formatter = 0x0,
decoded_uri = "/test/4M",
relative_uri = "/test/4M", // URL 中会包含对象名
length = 0x559e0bca0290 "0",
content_length = 0,
......
RGWHandler_REST* RGWRESTMgr_S3::get_handler(rgw::sal::RGWRadosStore *store,
struct req_state* const s,
const rgw::auth::StrategyRegistry& auth_registry,
const std::string& frontend_prefix)
{
bool is_s3website = enable_s3website && (s->prot_flags & RGW_REST_WEBSITE);
int ret =
RGWHandler_REST_S3::init_from_header(store, s,
is_s3website ? RGW_FORMAT_HTML :
RGW_FORMAT_XML, true);
if (ret < 0)
return NULL;
RGWHandler_REST* handler;
// TODO: Make this more readable
if (is_s3website) {
if (s->init_state.url_bucket.empty()) {
handler = new RGWHandler_REST_Service_S3Website(auth_registry);
} else if (rgw::sal::RGWObject::empty(s->object.get())) { o:
handler = new RGWHandler_REST_Bucket_S3Website(auth_registry);
} else {
handler = new RGWHandler_REST_Obj_S3Website(auth_registry);
}
} else {
if (s->init_state.url_bucket.empty()) { // bucket 为空
handler = new RGWHandler_REST_Service_S3(auth_registry, enable_sts, enable_iam, enable_pubsub); isSTSEnabled: isIAMEnabled: isPSEnabl
} else if (rgw::sal::RGWObject::empty(s->object.get())) {
handler = new RGWHandler_REST_Bucket_S3(auth_registry, enable_pubsub); // obj为空
} else {
handler = new RGWHandler_REST_Obj_S3(auth_registry); // bucket 和 object 都不为空, s->object 在上面 init_from_header 中被赋值,返回实例化的 RGWHandler_REST_Obj_S3
}
}
ldpp_dout(s, 20) << __func__ << " handler=" << typeid(*handler).name()
<< dendl;
return handler;
}
init_from_header
会解析 url 并为 s->object 赋值
(gdb) p s->bucket
$30 = std::unique_ptr<rgw::sal::RGWBucket> = {
get() = 0x0
}
(gdb) p encoded_obj_str
$31 = "4M"
// src/rgw/rgw_sal_rados.cc
std::unique_ptr<RGWObject> RGWRadosStore::get_object(const rgw_obj_key& k)
{
return std::unique_ptr<RGWObject>(new RGWRadosObject(this, k));
}
rgw handler 初始化
int RGWHandler_REST_S3::init(rgw::sal::RGWRadosStore *store, struct req_state *s,
rgw::io::BasicClient *cio)
{
int ret;
s->dialect = "s3"; // 设置 req_state 方言为 s3
ret = rgw_validate_tenant_name(s->bucket_tenant);
if (ret)
return ret;
if (!s->bucket_name.empty()) { // 注意,此时 s->bucket_name 为空
ret = validate_object_name(s->object->get_name()); // 如果 bucket name 不为空的话,校验对象名
if (ret)
return ret;
}
......
const char *sc = s->info.env->get("HTTP_X_AMZ_STORAGE_CLASS");
if (sc) { // 设置 storage class
s->info.storage_class = sc;
}
return RGWHandler_REST::init(store, s, cio);
}
Op
获取 Op
RGWOp* RGWHandler_REST::get_op(void)
{
RGWOp *op;
switch (s->op) {
case OP_GET:
op = op_get();
break;
case OP_PUT:
op = op_put();
break;
case OP_DELETE:
op = op_delete();
break;
case OP_HEAD:
op = op_head();
break;
case OP_POST:
op = op_post();
break;
case OP_COPY:
op = op_copy();
break;
case OP_OPTIONS:
op = op_options();
break;
default:
return NULL;
}
if (op) {
op->init(store, s, this);
}
return op;
} /* get_op */
根据 req_state.http_op
来生成不同的 rgw op
enum http_op {
OP_GET,
OP_PUT,
OP_DELETE,
OP_HEAD,
OP_POST,
OP_COPY,
OP_OPTIONS,
OP_UNKNOWN,
}
对于 get object 请求,其对应的 http op 为 OP_GET
请求,源文件 src/rgw/rgw_rest_s3.cc
RGWOp *RGWHandler_REST_Obj_S3::op_get()
{
......
return get_obj_op(true);
}
由于需要读取数据,因此 get_data
参数为 true,如果是 head
请求则此参数为 false。
RGWOp *RGWHandler_REST_Obj_S3::get_obj_op(bool get_data) const
{
RGWGetObj_ObjStore_S3 *get_obj_op = new RGWGetObj_ObjStore_S3;
get_obj_op->set_get_data(get_data);
return get_obj_op;
}
返回的 Op 类型实例为 RGWGetObj_ObjStore_S3
。 接着会进行 Op 初始化操作,会将 sent_header 设置为 false。
void init(rgw::sal::RGWRadosStore *store, struct req_state *s, RGWHandler *h) override {
RGWGetObj::init(store, s, h); dialect_handler:
sent_header = false; // sent_header 设置为 false
}
virtual void init(rgw::sal::RGWRadosStore *store, struct req_state *s, RGWHandler *dialect_handler) {
this->store = store;
this->s = s;
this->dialect_handler = dialect_handler;
}
处理 Op
Op 处理分为三个步骤
int rgw_process_authenticated(RGWHandler_REST * const handler,
RGWOp *& op,
RGWRequest * const req,
req_state * const s,
optional_yield y,
const bool skip_retarget)
{
......
ldpp_dout(op, 2) << "verifying op params" << dendl;
ret = op->verify_params();
if (ret < 0) {
return ret;
}
ldpp_dout(op, 2) << "pre-executing" << dendl;
op->pre_exec();
ldpp_dout(op, 2) << "executing" << dendl;
op->execute(y);
ldpp_dout(op, 2) << "completing" << dendl;
op->complete();
return 0;
}
Op 预处理
是否在 response header 中返回 bucket 名,参数 rgw_expose_bucket
默认为 false,因此 dump_bucket 不做任何操作。源文件 src/rgw/rgw_op.cc
void RGWGetObj::pre_exec()
{
rgw_bucket_object_pre_exec(s);
}
void rgw_bucket_object_pre_exec(struct req_state *s)
{
if (s->expect_cont)
dump_continue(s);
dump_bucket_from_state(s);
}
Op 处理
void RGWGetObj::execute(optional_yield y)
{
......
int64_t ofs_x, end_x;
RGWGetObj_CB cb(this); // get obj 回调函数,flush 的时候会调用,即为 send_response_data
RGWGetObj_Filter* filter = (RGWGetObj_Filter *)&cb;
......
std::unique_ptr<rgw::sal::RGWObject::ReadOp> read_op(s->object->get_read_op(s->obj_ctx));
op_ret = get_params(y); // 设置参数
op_ret = init_common();
......
op_ret = read_op->prepare(s->yield, this);
......
op_ret = s->object->range_to_ofs(s->obj_size, ofs, end);
if (need_object_expiration() && s->object->is_expired()) { // 对象是否过期
op_ret = -ENOENT;
goto done_err;
}
......
ofs_x = ofs;
end_x = end;
filter->fixup_range(ofs_x, end_x);
op_ret = read_op->iterate(this, ofs_x, end_x, filter, s->yield); // 读取数据 src/rgw/rgw_rados.cc
if (op_ret >= 0)
op_ret = filter->flush();
perfcounter->tinc(l_rgw_get_lat, s->time_elapsed());
if (op_ret < 0) {
goto done_err;
}
op_ret = send_response_data(bl, 0, 0);
if (op_ret < 0) {
goto done_err;
}
return;
}
- 设置回调函数
class RGWGetObj_CB : public RGWGetObj_Filter
{
RGWGetObj *op;
public:
explicit RGWGetObj_CB(RGWGetObj *_op) : op(_op) {}
~RGWGetObj_CB() override {}
int handle_data(bufferlist& bl, off_t bl_ofs, off_t bl_len) override {
return op->get_data_cb(bl, bl_ofs, bl_len);
}
};
int RGWGetObj::get_data_cb(bufferlist& bl, off_t bl_ofs, off_t bl_len)
{
/* garbage collection related handling:
* defer_gc disabled for https://tracker.ceph.com/issues/47866 */
return send_response_data(bl, bl_ofs, bl_len);
}
- 获取 read_op,源文件
src/rgw/rgw_sal_rados.cc
std::unique_ptr<RGWObject::ReadOp> RGWRadosObject::get_read_op(RGWObjectCtx *ctx)
{
return std::unique_ptr<RGWObject::ReadOp>(new RGWRadosObject::RadosReadOp(this, ctx));
}
RGWRadosObject::RadosReadOp::RadosReadOp(RGWRadosObject *_source, RGWObjectCtx *_rctx) :
source(_source),
rctx(_rctx),
op_target(_source->store->getRados(), // 设置 RGWRados::Object
_source->get_bucket()->get_info(),
*static_cast<RGWObjectCtx *>(rctx),
_source->get_obj()),
parent_op(&op_target) // 设置 RGWRados::Object::Read 的 source
{ }
RadosReadOp prepare 操作会调用 Read 的 prepare 操作源文件 src/rgw/rgw_sal_rados.cc
int RGWRadosObject::RadosReadOp::prepare(optional_yield y, const DoutPrefixProvider *dpp)
{
uint64_t obj_size;
......
int ret = parent_op.prepare(y, dpp); // read prepare
if (ret < 0)
return ret;
source->set_key(parent_op.state.obj.key);
source->set_obj_size(obj_size);
result.head_obj = parent_op.state.head_obj; // 设置 head obj
return ret;
}
(gdb) p parent_op.state.obj.key
$50 = {
name = "4M",
instance = "",
ns = ""
}
(gdb) p parent_op.state.head_obj
$49 = {
pool = {
name = "default.rgw.buckets.data",
ns = ""
},
oid = "e0347465-84b6-45cf-b17e-bff817389164.4342.1_4M",
loc = ""
}
Read::prepare
int RGWRados::Object::Read::prepare(optional_yield y, const DoutPrefixProvider *dpp)
{
RGWRados *store = source->get_store();
CephContext *cct = store->ctx();
bufferlist etag;
map<string, bufferlist>::iterator iter;
RGWObjState *astate;
int r = source->get_state(dpp, &astate, true, y); // 获取 obj 对应的 state
if (r < 0)
return r;
if (!astate->exists) {
return -ENOENT;
}
const RGWBucketInfo& bucket_info = source->get_bucket_info();
state.obj = astate->obj;
store->obj_to_raw(bucket_info.placement_rule, state.obj, &state.head_obj); // 设置 oid, 数据池等等
state.cur_pool = state.head_obj.pool;
state.cur_ioctx = &state.io_ctxs[state.cur_pool];
r = store->get_obj_head_ioctx(dpp, bucket_info, state.obj, state.cur_ioctx); // 打开数据池
if (r < 0) {
return r;
}
......
}
获取对象所在的数据池以及 head obj
(gdb) p state.obj
$40 = {
bucket = {
tenant = "",
name = "test",
marker = "e0347465-84b6-45cf-b17e-bff817389164.4342.1",
bucket_id = "e0347465-84b6-45cf-b17e-bff817389164.4342.1",
explicit_placement = {
data_pool = {
name = "",
ns = ""
},
data_extra_pool = {
name = "",
ns = ""
},
index_pool = {
name = "",
ns = ""
}
}
},
key = {
name = "4M",
instance = "",
ns = ""
},
in_extra_data = false,
index_hash_source = ""
}
(gdb) p state.head_obj
$41 = {
pool = {
name = "default.rgw.buckets.data",
ns = ""
},
oid = "e0347465-84b6-45cf-b17e-bff817389164.4342.1_4M",
loc = ""
}
查看 manifest 信息
~/go/src/ceph/build (test ✗) rados -p default.rgw.buckets.data listxattr e0347465-84b6-45cf-b17e-bff817389164.4342.1_4M
user.rgw.acl
user.rgw.content_type
user.rgw.etag
user.rgw.idtag
user.rgw.manifest
user.rgw.pg_ver
user.rgw.source_zone
user.rgw.storage_class
user.rgw.tail_tag
user.rgw.x-amz-content-sha256
user.rgw.x-amz-date
user.rgw.x-amz-meta-s3cmd-attrs
~/go/src/ceph/build (test ✗) rados -p default.rgw.buckets.data getxattr e0347465-84b6-45cf-b17e-bff817389164.4342.1_4M user.rgw.manifest > manifest.dump
~/go/src/ceph/build (test ✗) ceph-dencoder type RGWObjManifest import manifest.dump decode dump_json
{
"objs": [],
"obj_size": 4194304,
"explicit_objs": "false",
"head_size": 4194304,
"max_head_size": 4194304,
"prefix": ".z5mhYla6z6OVJdNS68EGQo0q9BkA-0O_",
"rules": [
{
"key": 0,
"val": {
"start_part_num": 0,
"start_ofs": 4194304,
"part_size": 0,
"stripe_max_size": 4194304,
"override_prefix": ""
}
}
],
"tail_instance": "",
"tail_placement": {
"bucket": {
"name": "test",
"marker": "e0347465-84b6-45cf-b17e-bff817389164.4342.1",
"bucket_id": "e0347465-84b6-45cf-b17e-bff817389164.4342.1",
"tenant": "",
"explicit_placement": {
"data_pool": "",
"data_extra_pool": "",
"index_pool": ""
}
},
"placement_rule": "default-placement"
},
"begin_iter": {
"part_ofs": 0,
"stripe_ofs": 0,
"ofs": 0,
"stripe_size": 4194304,
"cur_part_id": 0,
"cur_stripe": 0,
"cur_override_prefix": "",
"location": {
"placement_rule": "default-placement",
"obj": {
"bucket": {
"name": "test",
"marker": "e0347465-84b6-45cf-b17e-bff817389164.4342.1",
"bucket_id": "e0347465-84b6-45cf-b17e-bff817389164.4342.1",
"tenant": "",
"explicit_placement": {
"data_pool": "",
"data_extra_pool": "",
"index_pool": ""
}
},
"key": {
"name": "4M",
"instance": "",
"ns": ""
}
},
"raw_obj": {
"pool": "",
"oid": "",
"loc": ""
},
"is_raw": false
}
},
"end_iter": {
"part_ofs": 4194304,
"stripe_ofs": 4194304,
"ofs": 4194304,
"stripe_size": 0,
"cur_part_id": 0,
"cur_stripe": 1,
"cur_override_prefix": "",
"location": {
"placement_rule": "default-placement",
"obj": {
"bucket": {
"name": "test",
"marker": "e0347465-84b6-45cf-b17e-bff817389164.4342.1",
"bucket_id": "e0347465-84b6-45cf-b17e-bff817389164.4342.1",
"tenant": "",
"explicit_placement": {
"data_pool": "",
"data_extra_pool": "",
"index_pool": ""
}
},
"key": {
"name": ".z5mhYla6z6OVJdNS68EGQo0q9BkA-0O_1",
"instance": "",
"ns": "shadow"
}
},
"raw_obj": {
"pool": "",
"oid": "",
"loc": ""
},
"is_raw": false
}
}
}
读取数据
int RGWRados::Object::Read::iterate(const DoutPrefixProvider *dpp, int64_t ofs, int64_t end, RGWGetDataCB *cb,
optional_yield y)
{
RGWRados *store = source->get_store();
CephContext *cct = store->ctx();
RGWObjectCtx& obj_ctx = source->get_ctx();
const uint64_t chunk_size = cct->_conf->rgw_get_obj_max_req_size; // 默认 4M
const uint64_t window_size = cct->_conf->rgw_get_obj_window_size; // 默认 16M
auto aio = rgw::make_throttle(window_size, y);
get_obj_data data(store, cb, &*aio, ofs, y); // get_obj_data 的 flush 操作会调用客户端回调函数,将数据发送回客户端
// 从 rados 中遍历 obj,并调用回调函数 _get_obj_iterate_cb
int r = store->iterate_obj(dpp, obj_ctx, source->get_bucket_info(), state.obj,
ofs, end, chunk_size, _get_obj_iterate_cb, &data, y);
if (r < 0) {
ldpp_dout(dpp, 0) << "iterate_obj() failed with " << r << dendl;
data.cancel(); // drain completions without writing back to client
return r;
}
return data.drain();
}
-
RGWGetObj::execute
src/rgw/rgw_op.cc
-
read_op->iterate
-
RGWRados::Object::Read::iterate 其中 iterate 中会传入客户端回调函数 _get_obj_iterate_cb,本质上是调用 RGWRados::get_obj_iterate_cb,最后实际上是调用客户端回调函数 send_response_data 将数据发送到 client。
src/rgw/rgw_rados.cc
-
RGWRados::iterate_obj
-
obj_to_raw 获取 head obj 信息
-
get_obj_state 获取 obj state 信息
-
_get_obj_iterate_cb
-
RGWRados::get_obj_iterate_cb
-
RGWGetObj_ObjStore_S3::send_response_data
src/rgw/rgw_rest_s3.cc
-
dump_header
-
dump_body
-
-
-
-
-
-
-
int RGWRados::iterate_obj(const DoutPrefixProvider *dpp, RGWObjectCtx& obj_ctx,
const RGWBucketInfo& bucket_info, const rgw_obj& obj,
off_t ofs, off_t end, uint64_t max_chunk_size,
iterate_obj_cb cb, void *arg, optional_yield y)
{
rgw_raw_obj head_obj;
rgw_raw_obj read_obj;
uint64_t read_ofs = ofs;
uint64_t len;
bool reading_from_head = true;
RGWObjState *astate = NULL;
obj_to_raw(bucket_info.placement_rule, obj, &head_obj); // 获取 head obj 信息
int r = get_obj_state(dpp, &obj_ctx, bucket_info, obj, &astate, false, y); // 获取 obj state 信息
if (r < 0) {
return r;
}
if (end < 0)
len = 0;
else
len = end - ofs + 1;
if (astate->manifest) { // 如果 manifest 不为空,manifest 数据通过 head obj 的 attrs 进行获取
/* now get the relevant object stripe */
RGWObjManifest::obj_iterator iter = astate->manifest->obj_find(dpp, ofs);
RGWObjManifest::obj_iterator obj_end = astate->manifest->obj_end(dpp);
for (; iter != obj_end && ofs <= end; ++iter) {
off_t stripe_ofs = iter.get_stripe_ofs();
off_t next_stripe_ofs = stripe_ofs + iter.get_stripe_size();
while (ofs < next_stripe_ofs && ofs <= end) {
read_obj = iter.get_location().get_raw_obj(store); // 设置对象或分段对象信息
uint64_t read_len = std::min(len, iter.get_stripe_size() - (ofs - stripe_ofs));
read_ofs = iter.location_ofs() + (ofs - stripe_ofs);
if (read_len > max_chunk_size) {
read_len = max_chunk_size;
}
reading_from_head = (read_obj == head_obj); // 头对象
// 根据分段信息从 rados 读取对象,并调用回调函数将结果返回给客户端
r = cb(dpp, read_obj, ofs, read_ofs, read_len, reading_from_head, astate, arg); // 读取分段信息并调用回调函数
if (r < 0) {
return r;
}
len -= read_len;
ofs += read_len;
}
}
} else {
while (ofs <= end) {
read_obj = head_obj;
uint64_t read_len = std::min(len, max_chunk_size);
r = cb(dpp, read_obj, ofs, ofs, read_len, reading_from_head, astate, arg);
if (r < 0) {
return r;
}
len -= read_len;
ofs += read_len;
}
}
return 0;
}
其中 head obj 信息:
(gdb) p head_obj
$2 = {
pool = {
name = "default.rgw.buckets.data",
ns = ""
},
oid = "e0347465-84b6-45cf-b17e-bff817389164.4342.1_4M",
loc = ""
}
obj state manifest 信息:
(gdb) p astate->manifest
$7 = std::optional<RGWObjManifest> = {
[contained value] = {
explicit_objs = false,
objs = std::map with 0 elements,
obj_size = 4194304,
obj = {
bucket = {
tenant = "",
name = "test",
marker = "e0347465-84b6-45cf-b17e-bff817389164.4342.1",
bucket_id = "e0347465-84b6-45cf-b17e-bff817389164.4342.1",
explicit_placement = {
data_pool = {
name = "",
ns = ""
},
data_extra_pool = {
name = "",
ns = ""
},
index_pool = {
name = "",
ns = ""
}
}
},
key = {
name = "4M",
instance = "",
ns = ""
},
in_extra_data = false,
index_hash_source = ""
},
head_size = 0,
head_placement_rule = {
name = "default-placement",
storage_class = ""
},
max_head_size = 0,
prefix = ".z5mhYla6z6OVJdNS68EGQo0q9BkA-0O_",
tail_placement = {
placement_rule = {
name = "default-placement",
storage_class = ""
},
bucket = {
tenant = "",
name = "test",
marker = "e0347465-84b6-45cf-b17e-bff817389164.4342.1",
bucket_id = "e0347465-84b6-45cf-b17e-bff817389164.4342.1",
explicit_placement = {
data_pool = {
name = "",
ns = ""
},
data_extra_pool = {
name = "",
ns = ""
},
index_pool = {
name = "",
ns = ""
}
}
}
},
rules = std::map with 2 elements = {
[0] = {
start_part_num = 0,
start_ofs = 4194304,
part_size = 0,
stripe_max_size = 4194304,
override_prefix = ""
}
},
tail_instance = ""
}
}
其中一个 read_obj 信息:
(gdb) p read_obj
$3 = {
pool = {
name = "default.rgw.buckets.data",
ns = ""
},
oid = "e0347465-84b6-45cf-b17e-bff817389164.4342.1_4M",
loc = ""
}
对应的回调函数为 RGWRados::get_obj_iterate_cb
static int _get_obj_iterate_cb(const DoutPrefixProvider *dpp,
const rgw_raw_obj& read_obj, off_t obj_ofs,
off_t read_ofs, off_t len, bool is_head_obj,
RGWObjState *astate, void *arg)
{
struct get_obj_data *d = (struct get_obj_data *)arg;
return d->store->get_obj_iterate_cb(dpp, read_obj, obj_ofs, read_ofs, len,
is_head_obj, astate, arg);
}
int RGWRados::get_obj_iterate_cb(const DoutPrefixProvider *dpp, // get object 回调函数
const rgw_raw_obj& read_obj, off_t obj_ofs,
off_t read_ofs, off_t len, bool is_head_obj,
RGWObjState *astate, void *arg)
{
ObjectReadOperation op;
struct get_obj_data *d = (struct get_obj_data *)arg;
string oid, key;
if (is_head_obj) { // 如果是头对象
/* only when reading from the head object do we need to do the atomic test */
int r = append_atomic_test(dpp, astate, op);
if (r < 0)
return r;
if (astate &&
obj_ofs < astate->data.length()) {
unsigned chunk_len = std::min((uint64_t)astate->data.length() - obj_ofs, (uint64_t)len);
r = d->client_cb->handle_data(astate->data, obj_ofs, chunk_len);
if (r < 0)
return r;
len -= chunk_len;
d->offset += chunk_len;
read_ofs += chunk_len;
obj_ofs += chunk_len;
if (!len)
return 0;
}
}
auto obj = d->store->svc.rados->obj(read_obj); // 从 rados 中读取分段对象
int r = obj.open(dpp);
if (r < 0) {
ldpp_dout(dpp, 4) << "failed to open rados context for " << read_obj << dendl;
return r;
}
ldpp_dout(dpp, 20) << "rados->get_obj_iterate_cb oid=" << read_obj.oid << " obj-ofs=" << obj_ofs << " read_ofs=" << read_ofs << " len=" << len << dendl;
op.read(read_ofs, len, nullptr, nullptr);
const uint64_t cost = len;
const uint64_t id = obj_ofs; // use logical object offset for sorting replies
auto completed = d->aio->get(obj, rgw::Aio::librados_op(std::move(op), d->yield), cost, id);
return d->flush(std::move(completed));
}
每个stripe
读取完成后,会触发一次调用flush()
,该函数会调用client_cb->handle_data()
把http body
数据发回客户端。客户端回调函数为 RGWGetObj_ObjStore_S3::send_response_data
int flush(rgw::AioResultList&& results) {
int r = rgw::check_for_errors(results);
if (r < 0) {
return r;
}
auto cmp = [](const auto& lhs, const auto& rhs) { return lhs.id < rhs.id; };
results.sort(cmp); // merge() requires results to be sorted first p:
completed.merge(results, cmp); // merge results in sorted order
while (!completed.empty() && completed.front().id == offset) {
auto bl = std::move(completed.front().data);
completed.pop_front_and_dispose(std::default_delete<rgw::AioResultEntry>{});
offset += bl.length();
int r = client_cb->handle_data(bl, 0, bl.length()); // 客户端回调函数, RGWGetObj_CB, 即为 send_response_data
if (r < 0) {
return r;
}
}
return 0;
}
int RGWGetObj_ObjStore_S3::send_response_data(bufferlist& bl, off_t bl_ofs,
off_t bl_len)
{
....
done:
for (riter = response_attrs.begin(); riter != response_attrs.end();
++riter) {
dump_header(s, riter->first, riter->second); name: val:
}
if (op_ret == -ERR_NOT_MODIFIED) {
end_header(s, this); op:
} else {
if (!content_type)
content_type = "binary/octet-stream";
end_header(s, this, content_type); // 发送 header 信息
}
if (metadata_bl.length()) {
dump_body(s, metadata_bl); &bl:
}
sent_header = true;
send_data:
if (get_data && !op_ret) {
int r = dump_body(s, bl.c_str() + bl_ofs, bl_len); // 发送 body
if (r < 0)
return r;
}
return 0;
}
manifest 获取流程
上面提到的流程 iterate_obj
时会判断 manifest 是否为空,但是 manifest 的数据是在哪里获取并赋值的呢,代码很多,我们使用 gdb 进行调试,当时翻了一下代码感觉是在 RGWRados::get_obj_state_impl
这里进行读取并赋值的,因此加断点进行验证。
(gdb) info b
Num Type Disp Enb Address What
1 breakpoint keep y <MULTIPLE>
breakpoint already hit 13 times
1.1 y 0x00007f3d7f909af0 <RGWRados::get_obj_state_impl(DoutPrefixProvider const*, RGWObjectCtx*, RGWBucketInfo co
nst&, rgw_obj const&, RGWObjState**, bool, optional_yield, bool)@plt>
1.2 y 0x00007f3d8014891c in RGWRados::get_obj_state_impl(DoutPrefixProvider const*, RGWObjectCtx*, RGWBucketInfo
const&, rgw_obj const&, RGWObjState**, bool, optional_yield, bool) at /root/go/src/ceph/src/rgw/rgw_rados.cc:5354
2 breakpoint keep y 0x00007f3d80148d6d in RGWRados::get_obj_state_impl(DoutPrefixProvider const*, RGWObjectCtx*, RGWBucketInfo co
nst&, rgw_obj const&, RGWObjState**, bool, optional_yield, bool) at /root/go/src/ceph/src/rgw/rgw_rados.cc:5382
breakpoint already hit 1 time
对应的栈信息如下所示
(gdb) bt
#0 RGWRados::get_obj_state_impl (this=0x55acfb5aa000, dpp=0x55acfbf73c00, rctx=0x7f48fe3679f0, bucket_info=..., obj=..., state=0x7f48fe366b08, follow_olh=true, y=..., assume_noent=false) at /root/go/src/ceph/src/rgw/rgw_rados.cc:5371
#1 0x00007f48fd25c38d in RGWRados::get_obj_state (this=0x55acfb5aa000, dpp=0x55acfbf73c00, rctx=0x7f48fe3679f0, bucket_info=..., obj=..., state=0x7f48fe366b08, follow_olh=true, y=..., assume_noent=false) at /root/go/src/ceph/src/rgw/rgw_rados.cc:5537
#2 0x00007f48fd25d575 in RGWRados::Object::get_state (this=0x55acfbdd61d8, dpp=0x55acfbf73c00, pstate=0x7f48fe366b08, follow_olh=true, y=..., assume_noent=false) at /root/go/src/ceph/src/rgw/rgw_rados.cc:5682
#3 0x00007f48fd25c4ac in RGWRados::Object::Read::get_attr (this=0x55acfbdd6aa0, dpp=0x55acfbf73c00, name=0x7f48fd8af81d "user.rgw.acl", dest=..., y=...) at /root/go/src/ceph/src/rgw/rgw_rados.cc:5559
#4 0x00007f48fd3c5eb4 in rgw::sal::RGWRadosObject::RadosReadOp::get_attr (this=0x55acfbdd6100, dpp=0x55acfbf73c00, name=0x7f48fd8af81d "user.rgw.acl", dest=..., y=...) at /root/go/src/ceph/src/rgw/rgw_sal_rados.cc:701
#5 0x00007f48fd187774 in get_obj_policy_from_attr (dpp=0x55acfbf73c00, cct=0x55acfa9a6000, store=0x55acfa98e4c0, obj_ctx=..., bucket_info=..., bucket_attrs=std::map with 1 element = {...}, policy=0x55acfb9adb80, storage_class=0x0, obj=0x55acfcbc58c0, y=...) at /root/go/src/ceph/src/rgw/rgw_op.cc:259
#6 0x00007f48fd189336 in read_obj_policy (dpp=0x55acfbf73c00, store=0x55acfa98e4c0, s=0x7f48fe367ab0, bucket_info=..., bucket_attrs=std::map with 1 element = {...}, acl=0x55acfb9adb80, storage_class=0x0, policy=..., bucket=0x55acfbf6e900, object=0x55acfcbc58c0, y=..., copy_src=false) at /root/go/src/ceph/src/rgw/rgw_op.cc:483
#7 0x00007f48fd18c1dc in rgw_build_object_policies (dpp=0x55acfbf73c00, store=0x55acfa98e4c0, s=0x7f48fe367ab0, prefetch_data=false, y=...) at /root/go/src/ceph/src/rgw/rgw_op.cc:752
#8 0x00007f48fd1d41cc in RGWHandler::do_read_permissions (this=0x55acfcc01ae0, op=0x55acfbf73c00, only_bucket=false, y=...) at /root/go/src/ceph/src/rgw/rgw_op.cc:8279
#9 0x00007f48fd2cab61 in RGWHandler_REST::read_permissions (this=0x55acfcc01ae0, op_obj=0x55acfbf73c00, y=...) at /root/go/src/ceph/src/rgw/rgw_rest.cc:1910
#10 0x00007f48fcc39479 in rgw_process_authenticated (handler=0x55acfcc01ae0, op=@0x7f48fe367660: 0x55acfbf73c00, req=0x7f48fe368680, s=0x7f48fe367ab0, y=..., skip_retarget=false) at /root/go/src/ceph/src/rgw/rgw_process.cc:120
#11 0x00007f48fcc3ca1c in process_request (store=0x55acfa98e4c0, rest=0x7ffcb491e2c0, req=0x7f48fe368680, frontend_prefix="", auth_registry=..., client_io=0x7f48fe368730, olog=0x55acfb7212c0, yield=..., scheduler=0x55acfb81a368, user=0x7f48fe368850, latency=0x7f48fe368550, http_ret=0x7f48fe368544) at /root/go/src/ceph/src/rgw/rgw_process.cc:302
#12 0x00007f48fcb15e08 in (anonymous namespace)::handle_connection<boost::asio::basic_stream_socket<boost::asio::ip::tcp, boost::asio::io_context::executor_type> > (context=..., env=..., stream=..., timeout=..., buffer=..., is_ssl=false, pause_mutex=..., scheduler=0x55acfb81a368, ec=..., yield=...) at /root/go/src/ceph/src/rgw/rgw_asio_frontend.cc:276
#13 0x00007f48fcb10415 in (anonymous namespace)::AsioFrontend::<lambda(yield_context)>::operator()(yield_context) (__closure=0x55acfcb80db8, yield=...) at /root/go/src/ceph/src/rgw/rgw_asio_frontend.cc:1025
int RGWRados::get_obj_state_impl(const DoutPrefixProvider *dpp, RGWObjectCtx *rctx, const RGWBucketInfo& bucket_info, const rgw_obj& obj,
RGWObjState **state, bool follow_olh, optional_yield y, bool assume_noent)
{
if (obj.empty()) {
return -EINVAL;
}
bool need_follow_olh = follow_olh && obj.key.instance.empty();
RGWObjState *s = rctx->get_state(obj); // 这里会判断 obj 对应的 state 是否存在, 相当于缓存
ldpp_dout(dpp, 20) << "get_obj_state: rctx=" << (void *)rctx << " obj=" << obj << " state=" << (void *)s << " s->prefetch_data=" << s->prefetch_data << dendl;
*state = s;
if (s->has_attrs) { // 如果已经获取到 attrs 了会直接返回
if (s->is_olh && need_follow_olh) {
return get_olh_target_state(dpp, *rctx, bucket_info, obj, s, state, y);
}
return 0;
}
s->obj = obj;
rgw_raw_obj raw_obj;
obj_to_raw(bucket_info.placement_rule, obj, &raw_obj);
int r = -ENOENT;
if (!assume_noent) { // 注意这里会读取 head obj 的所有 attrs,然后赋值给 s->attrset
r = RGWRados::raw_obj_stat(dpp, raw_obj, &s->size, &s->mtime, &s->epoch, &s->attrset, (s->prefetch_data ? &s->data : NULL), NULL, y);
}
......
s->exists = true;
s->has_attrs = true;
s->accounted_size = s->size;
......
bufferlist manifest_bl = s->attrset[RGW_ATTR_MANIFEST];
if (manifest_bl.length()) {
auto miter = manifest_bl.cbegin(); : const_iterator
try {
s->manifest.emplace();
decode(*s->manifest, miter); // 设置 manifest
s->manifest->set_head(bucket_info.placement_rule, obj, s->size); /* patch manifest to reflect the head we just read, some manifests might be placement_rule: o: s:
broken due to old bugs */
s->size = s->manifest->get_obj_size();
if (!compressed)
s->accounted_size = s->size;
} catch (buffer::error& err) {
ldpp_dout(dpp, 0) << "ERROR: couldn't decode manifest" << dendl;
return -EIO;
}
ldpp_dout(dpp, 10) << "manifest: total_size = " << s->manifest->get_obj_size() << dendl;
if (cct->_conf->subsys.should_gather<ceph_subsys_rgw, 20>() && \
s->manifest->has_explicit_objs()) {
RGWObjManifest::obj_iterator mi;
for (mi = s->manifest->obj_begin(dpp); mi != s->manifest->obj_end(dpp); ++mi) {
ldpp_dout(dpp, 20) << "manifest: ofs=" << mi.get_ofs() << " loc=" << mi.get_location().get_raw_obj(store) << dendl;
}
}
......
}
调用栈
完整的调用栈如下所示:
-> process_request(store, rest, req, ...) // src/rgw/rgw_process.cc
-> RGWREST::get_handler(...) // 获取 rgw handler src/rgw/rgw_rest.cc
-> RGWRESTMgr_S3::get_handler(...) // src/rgw/rgw_rest_s3.cc
-> RGWHandler_REST_S3::init(...) // src/rgw/rgw_rest_s3.cc
-> RGWHandler_REST::get_op() // src/rgw/rgw_rest.cc
-> RGWHandler_REST_Obj_S3::op_get() // src/rgw/rgw_rest_s3.cc
-> RGWHandler_REST_Obj_S3::get_obj_op(true) // src/rgw/rgw_rest_s3.cc 返回一个 RGWListBucket_ObjStore_S3 对象。设置 get_data 为 true
-> RGWGetObj_ObjStore_S3::init(rgw::sal::RGWRadosStore *store, struct req_state *s, RGWHandler *h) // src/rgw/rgw_rest.h
-> RGWHandler_REST_S3::postauth_init() // src/rgw/rgw_rest_s3.cc
-> rgw_parse_url_bucket() // 根据 URL 解析设置 bucket 名
-> rgw_process_authenticated(handler, op, req, s)
-> RGWHandler_REST::init_permissions(op) // src/rgw/rgw_rest.cc
-> RGWHandler::do_init_permissions()
-> rgw_build_bucket_policies(store, s) // src/rgw/rgw_op.cc
-> RGWRadosStore::get_bucket(bucket) // 从 rados 中读取 bucket,并设置 bucket
-> RGWHandler_REST::read_permissions()
-> RGWHandler::do_read_permissions()
-> rgw_build_object_policies() // src/rgw/rgw_op.cc
-> RGWRadosObject::set_atomic() // src/rgw/rgw_sal_rados.cc
-> read_obj_policy(store, req_state *s, bucket_info, bucket_attrs, ..., bucket, object) // src/rgw/rgw_op.cc
-> get_obj_policy_from_attr(store, bucket_ifno, bucket_attrs, ..., obj)
-> RGWRadosObject::RadosReadOp::get_attr()
-> RGWRados::Object::Read::get_attr()
-> RGWRados::Object::get_state(pstate)
-> RGWRados::get_obj_state(bucket_info, obj, state)
-> RGWRados::get_obj_state_impl(bucket_info, obj, state) // src/rgw/rgw_rados.cc
-> RGWRados::raw_obj_stat(obj, &s->attrset) // 从 head obj 读取 attrs 并为 attrs 赋值
-> decode(s->manifest, s->attrset[RGW_ATTR_MANIFEST]) // 为 s->manifest 赋值
-> RGWGetObj::pre_exec()
-> RGWGetObj::execute() // src/rgw/rgw_op.cc
-> RGWGetObj_ObjStore::get_params() // src/rgw/rgw_rest.cc
-> RGWGetObj::init_common()
-> RGWRadosObject::RadosReadOp::prepare() // src/rgw/rgw_sal_rados.cc
-> RGWRados::Object::Read::prepare()
-> RGWRados::Object::get_state()
-> RGWRados::get_obj_state() // src/rgw/rgw_rados.cc
-> RGWRados::get_obj_state_impl()
-> RGWRados::obj_to_raw()
-> get_obj_bucket_and_oid_loc()
-> prepend_bucket_marker()
-> RGWRados::get_obj_data_pool()
-> RGWRados::get_obj_head_ioctx()
-> get_obj_bucket_and_oid_loc()
-> prepend_bucket_marker()
-> RGWRados::get_obj_data_pool()
-> RGWRados::open_pool_ctx() // src/rgw/rgw_rados.cc
-> RGWObject::range_to_ofs()
-> RGWRadosObject::RadosReadOp::iterate() // src/rgw/rgw_sal_rados.cc
-> RGWRados::Object::Read::iterate()
-> RGWRados::iterate_obj()
-> RGWRados::obj_to_raw()
-> get_obj_bucket_and_oid_loc()
-> prepend_bucket_marker()
-> RGWRados::get_obj_data_pool()
-> RGWRados::get_obj_state() // src/rgw/rgw_rados.cc
-> RGWRados::get_obj_state_impl()
-> rgw_obj_select::get_raw_obj()
-> RGWRadosStore::get_raw_obj() // src/rgw/rgw_sal_rados.cc
-> RGWRados::obj_to_raw()
-> get_obj_bucket_and_oid_loc()
-> prepend_bucket_marker()
-> RGWRados::get_obj_data_pool()
-> get_obj_data::flush() // src/rgw/rgw_rados.cc
-> RGWGetObj_ObjStore_S3::send_response_data() // src/rgw/rgw_rest_s3.cc
-> RGWOp::complete() // src/rgw/rgw_op.h
Log
13509 2023-09-21T16:25:44.021+0800 7f484ff8b700 2 req 1021457579059305375 0.011000000s s3:get_obj verifying op params
13510 2023-09-21T16:25:44.021+0800 7f484ff8b700 2 req 1021457579059305375 0.011000000s s3:get_obj pre-executing
13511 2023-09-21T16:25:44.021+0800 7f484ff8b700 2 req 1021457579059305375 0.011000000s s3:get_obj executing
13512 2023-09-21T16:25:44.021+0800 7f484ff8b700 20 req 1021457579059305375 0.011000000s s3:get_obj get_obj_state: rctx=0x7f48fe3679f0 obj=test:4M state=0x55acfbdcebe8 s->prefetch_data=1
13513 2023-09-21T16:25:44.021+0800 7f484ff8b700 20 req 1021457579059305375 0.011000000s s3:get_obj Read xattr rgw_rados: user.rgw.acl
13514 2023-09-21T16:25:44.021+0800 7f484ff8b700 20 req 1021457579059305375 0.011000000s s3:get_obj Read xattr rgw_rados: user.rgw.content_type
13515 2023-09-21T16:25:44.021+0800 7f484ff8b700 20 req 1021457579059305375 0.011000000s s3:get_obj Read xattr rgw_rados: user.rgw.etag
13516 2023-09-21T16:25:44.021+0800 7f484ff8b700 20 req 1021457579059305375 0.011000000s s3:get_obj Read xattr rgw_rados: user.rgw.idtag
13517 2023-09-21T16:25:44.021+0800 7f484ff8b700 20 req 1021457579059305375 0.011000000s s3:get_obj Read xattr rgw_rados: user.rgw.manifest
13518 2023-09-21T16:25:44.021+0800 7f484ff8b700 20 req 1021457579059305375 0.011000000s s3:get_obj Read xattr rgw_rados: user.rgw.pg_ver
13519 2023-09-21T16:25:44.021+0800 7f484ff8b700 20 req 1021457579059305375 0.011000000s s3:get_obj Read xattr rgw_rados: user.rgw.source_zone
13520 2023-09-21T16:25:44.021+0800 7f484ff8b700 20 req 1021457579059305375 0.011000000s s3:get_obj Read xattr rgw_rados: user.rgw.storage_class
13521 2023-09-21T16:25:44.021+0800 7f484ff8b700 20 req 1021457579059305375 0.011000000s s3:get_obj Read xattr rgw_rados: user.rgw.tail_tag
13522 2023-09-21T16:25:44.021+0800 7f484ff8b700 20 req 1021457579059305375 0.011000000s s3:get_obj Read xattr rgw_rados: user.rgw.x-amz-content-sha256
13523 2023-09-21T16:25:44.021+0800 7f484ff8b700 20 req 1021457579059305375 0.011000000s s3:get_obj Read xattr rgw_rados: user.rgw.x-amz-date
13524 2023-09-21T16:25:44.021+0800 7f484ff8b700 20 req 1021457579059305375 0.011000000s s3:get_obj Read xattr rgw_rados: user.rgw.x-amz-meta-s3cmd-attrs
13525 2023-09-21T16:25:44.021+0800 7f484ff8b700 15 req 1021457579059305375 0.011000000s Encryption mode:
网友评论