libcurl使用
使用libcurl访问外部http时的一个基本流程:
1. curl_global_init(CURL_GLOBAL_ALL); //此函数只能调用一次
/*
** 以下函数可以保证只调用一次
** long curl_global_flags = CURL_GLOBAL_ALL;
** std::call_once(curl_init_flag, curl_global_init, curl_global_flags);
*/
2. CURLM *curlm = curl_multi_init();
3. 设置easy handle:
CURL *curl1 = NULL;
curl1 = curl_easy_init();
curl_easy_setopt(curl1, CURLOPT_URL, "https://stackoverflow.com/");
curl_easy_setopt(curl1, CURLOPT_WRITEFUNCTION, writeCallback);
4. 将easy handle添加至multi handle:
curl_multi_add_handle(curlm, curl1);
5. 利用select监听读写事件
curl_multi_fdset(curlm, &fdread, &fdwrite, &fdexcep, &maxfd);
select(maxfd+1, &fdread, &fdwrite, &fdexcep, &timeout);
6. 当有状态发生变化时:
int mstatus = curl_multi_perform(curlm, &still_running);
CURLMsg* msg = curl_multi_info_read(curlm, &msgs_left);
/*
** 读取easy handle信息
*/
curl_easy_getinfo();
7. 从multi handle中清理easy handle:
curl_multi_remove_handle(curlm, msg->easy_handle);
8. 清理multi handle及全局信息
curl_multi_cleanup(curlm);
curl_global_cleanup();
解析Ceph中进行libcurl请求的实现:
- class RGWCurlHandle
- class RGWCurlHandles
- struct rgw_http_req_data
- class RGWHTTPClient
- class RGWHTTPManager
class RGWCurlHandle
对CURL*(easy handle)的一个简单封装
定义如下:
/*
 * Thin wrapper around a libcurl easy handle (CURL*) that additionally
 * carries two bookkeeping fields: a use counter and a last-use timestamp.
 */
struct RGWCurlHandle {
  /* Use counter; always starts at zero. */
  int uses = 0;
  /* Time at which the handle was last used. */
  time_point lastuse;
  /* The wrapped libcurl easy handle. */
  CURL* h;

  explicit RGWCurlHandle(CURL* h) : h(h) {}

  /* Dereferencing the wrapper yields the underlying easy handle. */
  CURL* operator*() { return h; }
};
class RGWCurlHandles
RGWCurlHandle pool,管理RGWCurlHandle的生成/释放。
/*
** MAXIDLE: how long an easy handle may stay unused before it is reset.
** The value is passed to std::chrono::seconds in the cleaner loop (see entry()).
*/
#define MAXIDLE 5
/*
** Pool of RGWCurlHandle objects; manages their creation and release.
** Derives from Thread: entry() is the thread function.
*/
class RGWCurlHandles : public Thread {
public:
std::mutex cleaner_lock;
/*
** Stores the pooled RGWCurlHandle objects
*/
std::vector<RGWCurlHandle*> saved_curl;
/*
** Shutdown flag: clean up the easy handles and terminate this thread
*/
int cleaner_shutdown;
std::condition_variable cleaner_cond;
RGWCurlHandles() :
cleaner_shutdown{0} {
}
/*
** Get a curl handle.
** If saved_curl is empty, a new easy handle is created:
** h = curl_easy_init();
** curl = new RGWCurlHandle{h};
*/
RGWCurlHandle* get_curl_handle();
void release_curl_handle_now(RGWCurlHandle* curl);
void release_curl_handle(RGWCurlHandle* curl);
void flush_curl_handles();
/*
** Thread function; runs a loop that waits on:
** cleaner_cond.wait_for(lock, std::chrono::seconds(MAXIDLE));
*/
void* entry();
void stop();
};
struct rgw_http_req_data
http client的数据:
/*
** Per-request state of an HTTP client:
** curl_handle: the easy handle
** curl_slist *h: HTTP headers
** id: id of this client
** done: marks whether this client's request has completed
** client: points to the HTTP client
** control_io_id: io id
** user_info: user info that was set by the caller
** registered: marks whether the request has been registered
** mgr: the RGWHTTPManager
** lock/cond: synchronization on whether the client's IO has completed
** user_ret: status code to return when the client request ends
** wait: the client waits for the request to complete
** finish: called by RGWHTTPManager when the client request completes; sets done and wakes the client, and also cleans up the handle returned by curl_easy_init and the curl_slist headers
*/
struct rgw_http_req_data : public RefCountedObject {
RGWCurlHandle *curl_handle{nullptr};
curl_slist *h{nullptr};
uint64_t id;
int ret{0};
std::atomic<bool> done = { false };
RGWHTTPClient *client{nullptr};
rgw_io_id control_io_id;
void *user_info{nullptr};
bool registered{false};
RGWHTTPManager *mgr{nullptr};
char error_buf[CURL_ERROR_SIZE];
bool write_paused{false};
bool read_paused{false};
optional<int> user_ret;
ceph::mutex lock = ceph::make_mutex("rgw_http_req_data::lock");
ceph::condition_variable cond;
/* Abridged excerpt: blocks on cond until done becomes true. */
int wait(optional_yield y);{
...
std::unique_lock l{lock};
cond.wait(l, [this]{return done==true;});
}
/*
** Under lock: record the result r, release the curl handle and the header
** list, then set done and wake every waiter on cond.
*/
void finish(int r, long http_status = -1) {
std::lock_guard l{lock};
ret = r;
do_curl_easy_cleanup(curl_handle);
curl_slist_free_all(h);
curl_handle = NULL;
h = NULL;
done = true;
cond.notify_all();
}
};
class RGWHTTPClient
对rgw_http_req_data的进一步封装,client的初始化,定义了此client的一系列读写回调函数。
基本的数据结构:
/*
** HTTP client built on top of rgw_http_req_data: holds the request
** parameters (method, url, headers, timeout) and declares the read/write
** callbacks that are handed to libcurl.
*/
class RGWHTTPClient : public RGWIOProvider
{
friend class RGWHTTPManager;
/*
** Buffer holding the data to send
*/
bufferlist send_bl;
bufferlist::iterator send_iter;
bool has_send_len;
long http_status;
bool send_data_hint{false};
/*
** Number of bytes to skip on the next call to receive_data
*/
size_t receive_pause_skip{0};
void *user_info{nullptr};
/*
** Points to the rgw_http_req_data of this request
*/
rgw_http_req_data *req_data;
bool verify_ssl;
std::atomic<unsigned> stopped { 0 };
protected:
CephContext *cct;
/*
** URL and HTTP method of the request,
** HTTP headers, and the timeout setting
*/
string method;
string url;
param_vec_t headers;
long req_timeout{0L};
size_t send_len{0};
/*
** Client setup:
** obtain the CURL* handle: curl_easy_init
** set options on the CURL* handle: curl_easy_setopt
** set up the headers, etc.
*/
int init_request(rgw_http_req_data *req_data);
RGWHTTPManager *get_manager();
/* Default implementations of the data hooks do nothing and return 0. */
virtual int receive_header(void *ptr, size_t len) {
return 0;
}
virtual int receive_data(void *ptr, size_t len, bool *pause) {
return 0;
}
virtual int send_data(void *ptr, size_t len, bool *pause=nullptr) {
return 0;
}
/* Callbacks for libcurl. */
static size_t receive_http_header(void *ptr, size_t size,size_t nmemb,void *_info);
static size_t receive_http_data(void *ptr,size_t size,size_t nmemb,void *_info);
static size_t send_http_data(void *ptr,size_t size,size_t nmemb,void *_info);
ceph::mutex& get_req_lock();
/* needs to be called under req_lock() */
void _set_write_paused(bool pause);
void _set_read_paused(bool pause);
public:
static const long HTTP_STATUS_NOSTATUS = 0;
static const long HTTP_STATUS_UNAUTHORIZED = 401;
static const long HTTP_STATUS_NOTFOUND = 404;
static constexpr int HTTPCLIENT_IO_READ = 0x1;
static constexpr int HTTPCLIENT_IO_WRITE = 0x2;
static constexpr int HTTPCLIENT_IO_CONTROL = 0x4;
virtual ~RGWHTTPClient();
/*
** Stores the method and URL; verify_ssl is taken from the
** rgw_verify_ssl configuration value.
*/
explicit RGWHTTPClient(CephContext *cct,
const string& _method,
const string& _url)
: has_send_len(false),
http_status(HTTP_STATUS_NOSTATUS),
req_data(nullptr),
verify_ssl(cct->_conf->rgw_verify_ssl),
cct(cct),
method(_method),
url(_url) {
}
void append_header(const string& name, const string& val);
void set_send_length(size_t len);
void set_send_data_hint(bool hint);
long get_http_status() const;
void set_http_status(long _http_status);
void set_verify_ssl(bool flag);
// set request timeout in seconds
// zero (default) means that the request will never time out
void set_req_timeout(long timeout);
int process(optional_yield y);
int wait(optional_yield y);
void cancel();
bool is_done();
rgw_http_req_data *get_req_data();
string to_str();
int get_req_retcode();
void set_url(const string& _url);
void set_method(const string& _method);
void set_io_user_info(void *_user_info) override ;
void *get_io_user_info() override ;
};
init_request
client的一些初始化设置:
- do_curl_easy_init: 从RGWCurlHandles中获取CURL*句柄(curl_easy_init初始化的对象)
- curl_slist *h = headers_to_slist(headers): 设置curl的头部信息
- curl_easy_setopt: 设置curl一系列的参数
curl_easy_setopt:
/* Set the HTTP request method */
curl_easy_setopt(easy_handle, CURLOPT_CUSTOMREQUEST, method.c_str());
/* Set the URL to access */
curl_easy_setopt(easy_handle, CURLOPT_URL, url.c_str());
/* Turn off the progress meter */
curl_easy_setopt(easy_handle, CURLOPT_NOPROGRESS, 1L);
/* Keep libcurl from using signals (per the original note, SIGPIPE is not covered). Needed in multi-threaded programs; otherwise signals raised by e.g. the timeout mechanism can crash the process */
curl_easy_setopt(easy_handle, CURLOPT_NOSIGNAL, 1L);
/* Callback invoked for received headers */
curl_easy_setopt(easy_handle, CURLOPT_HEADERFUNCTION, receive_http_header);
/* Value passed as the last argument of the header callback */
curl_easy_setopt(easy_handle, CURLOPT_WRITEHEADER, (void *)req_data);
/* Callback invoked for downloaded data */
curl_easy_setopt(easy_handle, CURLOPT_WRITEFUNCTION, receive_http_data);
/*
** Userdata argument of the write-data callback, used to store the data.
** receive_http_data(void *ptr,size_t size,size_t nmemb,void *_info): ptr points to the received data and _info is req_data. The data size is size * nmemb; the data in ptr is copied into req_data
*/
curl_easy_setopt(easy_handle, CURLOPT_WRITEDATA, (void *)req_data);
/* Buffer that receives error messages */
curl_easy_setopt(easy_handle, CURLOPT_ERRORBUFFER, (void *)req_data->error_buf);
/*
** rgw_curl_low_speed_time: duration of the low-speed window
** rgw_curl_low_speed_limit: transfer speed threshold
** If the transfer speed stays below rgw_curl_low_speed_limit for rgw_curl_low_speed_time, the transfer is abandoned
*/
curl_easy_setopt(easy_handle, CURLOPT_LOW_SPEED_TIME, cct->_conf->rgw_curl_low_speed_time);
curl_easy_setopt(easy_handle, CURLOPT_LOW_SPEED_LIMIT, cct->_conf->rgw_curl_low_speed_limit);
/*
** CURLOPT_READFUNCTION: callback for data upload
** CURLOPT_READDATA: userdata passed to that callback
** send_http_data(void *ptr,size_t size,size_t nmemb,void *_info): _info is req_data; data is copied from req_data into ptr
*/
curl_easy_setopt(easy_handle, CURLOPT_READFUNCTION, send_http_data);
curl_easy_setopt(easy_handle, CURLOPT_READDATA, (void *)req_data);
/*
** Enable data upload
*/
if (send_data_hint || is_upload_request(method)) {
curl_easy_setopt(easy_handle, CURLOPT_UPLOAD, 1L);
}
/*
** CURLOPT_INFILESIZE: size of the file to transfer
** CURLOPT_POSTFIELDSIZE: size of the POST data
** When uploading more than 1024 bytes, libcurl by default adds "Expect: 100-continue" so that it first negotiates with the server whether the upload is allowed. Some servers are buggy or never answer such a request.
** Therefore an empty header is added with curl_slist_append(h, "Expect:") so that libcurl does not add "Expect: 100-continue"
** CURLOPT_HTTPHEADER: the supplied headers replace the internal ones; here "Expect:" replaces the default "Expect: 100-continue"
*/
if (has_send_len) {
// TODO: prevent overflow by using curl_off_t
// and: CURLOPT_INFILESIZE_LARGE, CURLOPT_POSTFIELDSIZE_LARGE
const long size = send_len;
curl_easy_setopt(easy_handle, CURLOPT_INFILESIZE, size);
if (method == "POST") {
curl_easy_setopt(easy_handle, CURLOPT_POSTFIELDSIZE, size);
// TODO: set to size smaller than 1MB should prevent the "Expect" field
// from being sent. So explicit removal is not needed
h = curl_slist_append(h, "Expect:");
}
}
if (h) {
curl_easy_setopt(easy_handle, CURLOPT_HTTPHEADER, (void *)h);
}
/*
** Disable SSL peer and host verification (SSL itself stays in use)
*/
if (!verify_ssl) {
curl_easy_setopt(easy_handle, CURLOPT_SSL_VERIFYPEER, 0L);
curl_easy_setopt(easy_handle, CURLOPT_SSL_VERIFYHOST, 0L);
}
/*
** CURLOPT_PRIVATE takes a void* that stores private data associated with the handle.
** Below, curl_easy_setopt(easy_handle, CURLOPT_PRIVATE, (void *)req_data); associates the client's private data (req_data) with this handle.
** The data structure is later fetched back through the handle with curl_easy_getinfo:
** rgw_http_req_data *req_data;
** curl_easy_getinfo(e, CURLINFO_PRIVATE, (void **)&req_data);
** Note: what is stored is the req_data pointer, and CURLOPT_WRITEDATA/CURLOPT_READDATA are given the same pointer; the callbacks update the contents of req_data, so the req_data obtained via getinfo reflects those updates
** CURLOPT_TIMEOUT: timeout of the request
*/
curl_easy_setopt(easy_handle, CURLOPT_PRIVATE, (void *)req_data);
curl_easy_setopt(easy_handle, CURLOPT_TIMEOUT, req_timeout);
class RGWHTTPManager
RGWCurlHandle、RGWCurlHandles、RGWHTTPClient主要是关于client的管理、定义初始化、读写回调函数,主要是以下工作:
- curl_easy_init
- curl_easy_setopt
对于client的连接、IO等操作,主要是在RGWHTTPManager中:
- curl_multi_init
- curl_multi_add_handle
- curl_multi_fdset、select
- curl_multi_perform
- curl_multi_info_read
- curl_easy_getinfo
- curl_multi_cleanup
主要功能:
- 通过管道进行线程间通信,如注册client
- 独立线程,对所有注册的client,进行Libcurl的perform
线程处理函数, reqs_thread_entry():
/*
** Owns the libcurl multi handle and a dedicated thread that drives all
** registered HTTP clients; other threads talk to that thread through a pipe.
*/
class RGWHTTPManager {
/*
** Pending state change of a client request
*/
struct set_state {
rgw_http_req_data *req;
int bitmask;
set_state(rgw_http_req_data *_req, int _bitmask) : req(_req), bitmask(_bitmask) {}
};
CephContext *cct;
RGWCompletionManager *completion_mgr;
/*
** The CURLM* returned by curl_multi_init()
*/
void *multi_handle;
bool is_started = false;
std::atomic<unsigned> going_down { 0 };
std::atomic<unsigned> is_stopped { 0 };
/*
** reqs_lock: read/write lock guarding the client collections
** reqs: the set of clients that need to perform an HTTP request
** unregistered_reqs: requests queued for unregistration, waiting to be handled by the manager thread
*/
ceph::shared_mutex reqs_lock = ceph::make_shared_mutex("RGWHTTPManager::reqs_lock");
map<uint64_t, rgw_http_req_data *> reqs;
list<rgw_http_req_data *> unregistered_reqs;
list<set_state> reqs_change_state;
map<uint64_t, rgw_http_req_data *> complete_reqs;
int64_t num_reqs = 0;
int64_t max_threaded_req = 0;
/*
** Inter-thread signalling
*/
int thread_pipe[2];
/*
** Register a libcurl client
** 1. std::unique_lock rl{reqs_lock};
** 2. req_data->id = num_reqs;
** 3. req_data->registered = true;
** 4. reqs[num_reqs] = req_data;
** 5. num_reqs++;
*/
void register_request(rgw_http_req_data *req_data);
/*
** Clean up a request
** 1. std::unique_lock rl{reqs_lock};
** 2. _complete_request(req_data);
*/
void complete_request(rgw_http_req_data *req_data);
/*
** Remove req_data from reqs:
** 1. map<uint64_t, rgw_http_req_data *>::iterator iter = reqs.find(req_data->id);
** 2. reqs.erase(iter);
** {
** std::lock_guard l{req_data->lock};
** req_data->mgr = nullptr;
** }
** if (completion_mgr) {
** completion_mgr->complete(NULL, req_data->control_io_id, req_data->user_info);
** }
** 3. drop one reference: req_data->put();
*/
void _complete_request(rgw_http_req_data *req_data);
/*
** Unregister a libcurl client: it is appended to the unregistered_reqs queue, to be handled by the manager thread:
** 1. std::unique_lock rl{reqs_lock};
** 2. if (!req_data->registered) {
** return false;
** }
** 3. req_data->get();
** 4. req_data->registered = false;
** 5. unregistered_reqs.push_back(req_data);
*/
bool unregister_request(rgw_http_req_data *req_data);
/*
** Remove an easy handle from the CURLM*:
** unlink_request:
** 1. std::unique_lock wl{reqs_lock};
** 2. _unlink_request(req_data);
** _unlink_request:
** remove the handle from the CURLM
** 1. curl_multi_remove_handle((CURLM *)multi_handle, req_data->get_easy_handle());
** then check whether the client request has completed:
** - not completed: the return code is set to ECANCELED; _finish_request calls req_data->finish(-ECANCELED) to notify the client
** - completed: no need to call _finish_request again, because a completed request has already gone through finish_request normally
** 2. if (!req_data->is_done())
** _finish_request(req_data, -ECANCELED);
*/
void unlink_request(rgw_http_req_data *req_data);
void _unlink_request(rgw_http_req_data *req_data);
/*
** Finish a client request.
** A client request ends in one of two ways:
** 1. the request completes and finish_request is called
** 2. the client request is cancelled: cancel is called, i.e. client->cancel: httpmanager->unregister_request
** finish_request(req_data,r,http_status): http_status is the HTTP response code, r is the locally defined status corresponding to it
** 1. req_data->finish(r, http_status);
** 2. complete_request(req_data);
** _finish_request(rgw_http_req_data *req_data, int r):
** 1. req_data->finish(r);
** 2. complete_request(req_data);
*/
void finish_request(rgw_http_req_data *req_data, int r, long http_status = -1);
void _finish_request(rgw_http_req_data *req_data, int r);
/*
** Apply a request state change; req->set_state >> CURLcode rc = curl_easy_pause(**curl_handle, bitmask);:
** ss.req->set_state(ss.bitmask)
*/
void _set_req_state(set_state& ss);
/*
** Bind the easy handle to the multi handle:
** 1. CURLMcode mstatus = curl_multi_add_handle((CURLM *)multi_handle, req_data->get_easy_handle());
*/
int link_request(rgw_http_req_data *req_data);
/*
** Process clients sitting in the pending queues, e.g. unregistered_reqs/reqs_change_state; see the function body for details
*/
void manage_pending_requests();
/* Thread object that runs the manager's request loop. */
class ReqsThread : public Thread {
RGWHTTPManager *manager;
public:
explicit ReqsThread(RGWHTTPManager *_m) : manager(_m) {}
void *entry() override;
};
ReqsThread *reqs_thread = nullptr;
/*
** Body of the RGWHTTPManager processing thread; see the function walkthrough for the logic
*/
void *reqs_thread_entry();
/*
** Inter-thread signalling:
** 1. write(thread_pipe[1], (void *)&buf, sizeof(buf));
*/
int signal_thread();
public:
/*
** Constructor:
** 1. initialize the CURLM*
** multi_handle = (void *)curl_multi_init();
*/
RGWHTTPManager(CephContext *_cct, RGWCompletionManager *completion_mgr = NULL);
/*
** Destructor, cleans up the CURLM*:
** 1. stop()
** 2. curl_multi_cleanup((CURLM *)multi_handle)
**
*/
~RGWHTTPManager();
/*
** Create the thread that processes the libcurl clients:
** 1. pipe2(thread_pipe)
** 2. fcntl(thread_pipe[0], F_SETFL, O_NONBLOCK); make the read end non-blocking
** 3. is_started = true;
** reqs_thread = new ReqsThread(this);
** reqs_thread->create("http_manager");
** 4. reqs_thread->create() >> manager->reqs_thread_entry();
*/
int start();
/*
** Shut down the thread that processes the libcurl clients:
** 1. is_stopped = true
** 2. going_down = true
** 3. signal_thread()
** 4. reqs_thread->join()
** 5. delete reqs_thread
** 6. close(thread_pipe[0])
** close(thread_pipe[1])
*/
void stop();
/*
** Register a client:
** 1. rgw_http_req_data *req_data = new rgw_http_req_data;
** 2. client->init_request(req_data)
** 3. register_request(req_data)
** 4. link_request(req_data)
** 5. signal_thread() tells the libcurl thread that the set of clients changed
*/
int add_request(RGWHTTPClient *client);
/*
** Remove a libcurl client:
** 1. rgw_http_req_data *req_data = client->get_req_data()
** 2. unregister_request(req_data)
** 3. signal_thread()
*/
int remove_request(RGWHTTPClient *client);
/*
** Set the state of a client; the change is appended to the reqs_change_state queue, to be applied by the processing thread:
** 1. int bitmask = CURLPAUSE_CONT;
** if (req_data->write_paused) {
** bitmask |= CURLPAUSE_SEND;
** }
** if (req_data->read_paused) {
** bitmask |= CURLPAUSE_RECV;
** }
** 2. reqs_change_state.push_back(set_state(req_data, bitmask))
** 3. signal_thread()
*/
int set_request_state(RGWHTTPClient *client, RGWHTTPRequestSetState state);
};
manage_pending_requests函数:
- 处理add_request情况,reqs无变化,unregistered_reqs/reqs_change_state为空,则无需进一步处理
- 处理unregistered_reqs情况:_unlink_request(req_data),req_data->put()
- 重新链接easy_interface与multi handle,链接失败的添加到remove_reqs中
- 处理set_request_state, 处理状态设置的情形
- 处理remove_reqs,调用_finish_request(req_data,r), r为link_request返回的错误码
代码如下:
void RGWHTTPManager::manage_pending_requests()
{
reqs_lock.lock_shared();
if (max_threaded_req == num_reqs && unregistered_reqs.empty() && reqs_change_state.empty()) {
reqs_lock.unlock_shared();
return;
}
reqs_lock.unlock_shared();
/* 处理unregistered_reqeust */
std::unique_lock wl{reqs_lock};
if (!unregistered_reqs.empty()) {
for (auto& r : unregistered_reqs) {
_unlink_request(r);
r->put();
}
unregistered_reqs.clear();
}
/* 重新链接 */
map<uint64_t, rgw_http_req_data *>::iterator iter = reqs.find(max_threaded_req);
list<std::pair<rgw_http_req_data *, int> > remove_reqs;
for (; iter != reqs.end(); ++iter) {
rgw_http_req_data *req_data = iter->second;
int r = link_request(req_data);
if (r < 0) {
remove_reqs.push_back(std::make_pair(iter->second, r));
} else {
max_threaded_req = iter->first + 1;
}
}
/* 处理设置状态的设置 */
if (!reqs_change_state.empty()) {
for (auto siter : reqs_change_state) {
_set_req_state(siter);
}
reqs_change_state.clear();
}
/* 处理remove_reqs */
for (auto piter : remove_reqs) {
rgw_http_req_data *req_data = piter.first;
int r = piter.second;
_finish_request(req_data, r);
}
}
RGWHTTPManager client处理线程
reqs_thread_entry,主要逻辑:
loop(!going_down){
- do_curl_wait() /* select实现对多个easy_interface及thread_pipe[0]的监听,当有读写发生时返回 */
- manage_pending_requests(): 处理待处理的client队列
- curl_multi_perform: 对所有注册的client,执行回调函数,读取/发送数据等
- curl_multi_info_read: 获取每个client的读取完成情况
}
具体代码如下:
int still_running;
int mstatus;
/* Main loop of the thread */
while (!going_down) {
/* Wait (select) on the fds of the multi handle and on thread_pipe[0] */
int ret = do_curl_wait(cct, (CURLM *)multi_handle, thread_pipe[0]);
if (ret < 0) {
return NULL;
}
/* Process the pending queues */
manage_pending_requests();
/* Perform the reads/writes of the clients attached to the multi handle */
mstatus = curl_multi_perform((CURLM *)multi_handle, &still_running);
switch (mstatus) {
case CURLM_OK:
case CURLM_CALL_MULTI_PERFORM:
break;
default:
dout(10) << "curl_multi_perform returned: " << mstatus << dendl;
break;
}
/* Collect the transfer status of the clients in the multi handle */
int msgs_left;
CURLMsg *msg;
while ((msg = curl_multi_info_read((CURLM *)multi_handle, &msgs_left))) {
/* The transfer is done; look up the corresponding client */
if (msg->msg == CURLMSG_DONE) {
int result = msg->data.result;
CURL *e = msg->easy_handle;
/* Fetch the private data that was attached to the handle: req_data */
rgw_http_req_data *req_data;
curl_easy_getinfo(e, CURLINFO_PRIVATE, (void **)&req_data);
/* Remove the easy handle from the multi handle */
curl_multi_remove_handle((CURLM *)multi_handle, e);
/* req_data->user_ret: set inside the callbacks, e.g. req_data->user_ret = client->send_data(); non-zero means error */
long http_status;
int status;
if (!req_data->user_ret) {
/* Fetch the HTTP response code of the client's request */
curl_easy_getinfo(e, CURLINFO_RESPONSE_CODE, (void **)&http_status);
status = rgw_http_error_to_errno(http_status);
if (result != CURLE_OK && status == 0) {
dout(0) << "ERROR: curl error: " << curl_easy_strerror((CURLcode)result) << ", maybe network unstable" << dendl;
status = -EAGAIN;
}
} else {
/* A callback reported a status: use it and derive http_status from it */
status = *req_data->user_ret;
rgw_err err;
set_req_state_err(err, status, 0);
http_status = err.http_ret;
}
/* Saved before finish_request and used for logging below */
int id = req_data->id;
/* Finish the client's request: req_data->finish() wakes the client, complete_request() removes this client from reqs */
finish_request(req_data, status, http_status);
switch (result) {
case CURLE_OK:
break;
case CURLE_OPERATION_TIMEDOUT:
dout(0) << "WARNING: curl operation timed out, network average transfer speed less than "
<< cct->_conf->rgw_curl_low_speed_limit << " Bytes per second during " << cct->_conf->rgw_curl_low_speed_time << " seconds." << dendl;
/* no break here: control falls through into the default case */
default:
dout(20) << "ERROR: msg->data.result=" << result << " req_data->id=" << id << " http_status=" << http_status << dendl;
dout(20) << "ERROR: curl error: " << curl_easy_strerror((CURLcode)result) << dendl;
break;
}
}
}
}
/* Resource cleanup once the loop has exited: unlink everything still queued or registered */
std::unique_lock rl{reqs_lock};
for (auto r : unregistered_reqs) {
_unlink_request(r);
}
unregistered_reqs.clear();
auto all_reqs = std::move(reqs);
for (auto iter : all_reqs) {
_unlink_request(iter.second);
}
reqs.clear();
if (completion_mgr) {
completion_mgr->go_down();
}
return 0;
do_curl_wait实现
- select 监听multi handle及thread_pipe[0]的文件描述符
/*
 * Wait with select() until one of the multi handle's file descriptors or
 * the signalling fd becomes ready, or until the timeout expires.
 *
 * cct:       used for logging and for the rgw_curl_wait_timeout_ms setting
 * handle:    libcurl multi handle whose fds are collected via curl_multi_fdset
 * signal_fd: fd that is additionally watched for reads; only used when > 0
 *
 * Returns 0 on success (including a plain timeout), -EIO when
 * curl_multi_fdset fails, -errno when select fails, or the negative value
 * returned by clear_signal.
 */
static int do_curl_wait(CephContext *cct, CURLM *handle, int signal_fd)
{
  fd_set fdread;
  fd_set fdwrite;
  fd_set fdexcep;
  int maxfd = -1;

  FD_ZERO(&fdread);
  FD_ZERO(&fdwrite);
  FD_ZERO(&fdexcep);

  /* Collect the file descriptors the multi handle wants to wait on */
  int ret = curl_multi_fdset(handle, &fdread, &fdwrite, &fdexcep, &maxfd);
  if (ret) {
    ldout(cct, 0) << "ERROR: curl_multi_fdset returned " << ret << dendl;
    return -EIO;
  }

  if (signal_fd > 0) {
    FD_SET(signal_fd, &fdread);
    /* maxfd holds the highest fd in any set; select() below adds the +1 */
    if (signal_fd > maxfd) {
      maxfd = signal_fd;
    }
  }

  /* Timeout in milliseconds; fall back to RGW_CURL_TIMEOUT when unset */
  uint64_t to = cct->_conf->rgw_curl_wait_timeout_ms;
#define RGW_CURL_TIMEOUT 1000
  if (!to)
    to = RGW_CURL_TIMEOUT;

  struct timeval timeout;
  timeout.tv_sec = to / 1000;
  /* tv_usec is in microseconds: convert the leftover milliseconds */
  timeout.tv_usec = (to % 1000) * 1000;

  ret = select(maxfd+1, &fdread, &fdwrite, &fdexcep, &timeout);
  if (ret < 0) {
    ret = -errno;
    ldout(cct, 0) << "ERROR: select returned " << ret << dendl;
    return ret;
  }

  /* The signalling fd is readable: another thread sent a notification;
   * clear_signal reads the pending data off signal_fd */
  if (signal_fd > 0 && FD_ISSET(signal_fd, &fdread)) {
    ret = clear_signal(signal_fd);
    if (ret < 0) {
      ldout(cct, 0) << "ERROR: " << __func__ << "(): read() returned " << ret << dendl;
      return ret;
    }
  }

  return 0;
}
网友评论