美文网首页
Ceph RGW: libcurl用法介绍

Ceph RGW: libcurl用法介绍

作者: 圣地亚哥_SVIP | 来源:发表于2020-06-29 19:13 被阅读0次

libcurl使用

使用libcurl访问外部http时的一个基本流程:

1. curl_global_init(CURL_GLOBAL_ALL); //此函数只能调用一次
/*
** 以下函数可以保证只调用一次
** long curl_global_flags = CURL_GLOBAL_ALL;
** std::call_once(curl_init_flag, curl_global_init, curl_global_flags);
*/

2. CURLM *curlm = curl_multi_init();

3. 设置easy handle:
CURL *curl1 = NULL;
curl1 = curl_easy_init();
curl_easy_setopt(curl1, CURLOPT_URL, "https://stackoverflow.com/");
curl_easy_setopt(curl1, CURLOPT_WRITEFUNCTION, writeCallback);

4. 将easy handle添加至multi handle:
curl_multi_add_handle(curlm, curl1);

5. 利用select监听读写事件
curl_multi_fdset(handle, &fdread, &fdwrite, &fdexcep, &maxfd);
select(maxfd+1, &fdread, &fdwrite, &fdexcep, &timeout);

6. 当有状态发生变化时:
int mstatus = curl_multi_perform((CURLM *)multi_handle, &still_running);
CURLMsg* msg = curl_multi_info_read((CURLM *)multi_handle, &msgs_left)
/*
** 读取easy handle信息
*/
curl_easy_getinfo();

7. 从multi handle中清理easy handle:
curl_multi_remove_handle((CURLM *)multi_handle, msg->easy_handle);

8. 清理全局信息
curl_multi_cleanup(multi_handle)
curl_global_cleanup();

解析Ceph中进行libcurl请求的实现:

  • class RGWCurlHandle
  • class RGWCurlHandles
  • struct rgw_http_req_data
  • class RGWHTTPClient
  • class RGWHTTPManager

class RGWCurlHandle

对CURL*(easy handle)的一个简单封装
定义如下:

struct RGWCurlHandle {
  int uses;
  /*
  ** 上次使用的时间
  */
  time_point lastuse;
  /*
  ** easy interface
  */
  CURL* h;

  explicit RGWCurlHandle(CURL* h) : uses(0), h(h) {};
  CURL* operator*() {
    return this->h;
  }
};
class RGWCurlHandles

RGWCurlHandle pool,管理RGWCurlHandle的生成/释放。

/*
** MAXIDLE: 表示一个easy handle多长时间未使用,则重置此easy handle
*/
#define MAXIDLE 5
class RGWCurlHandles : public Thread {
public:
  std::mutex cleaner_lock;
  /*
  ** 存储RGWCCurHandle
  */
  std::vector<RGWCurlHandle*> saved_curl;
  /*
  ** 清理easy interface,终止此线程
  */
  int cleaner_shutdown;
  std::condition_variable cleaner_cond;

  RGWCurlHandles() :
    cleaner_shutdown{0} {
  }

  /*
  ** 获取curl handle
  ** 如果saved_curl为空,则新建一个easy handle:
  ** h = curl_easy_init();
  ** curl = new RGWCurlHandle{h};
  */
  RGWCurlHandle* get_curl_handle();
  void release_curl_handle_now(RGWCurlHandle* curl);
  void release_curl_handle(RGWCurlHandle* curl);
  void flush_curl_handles();
  /*
  ** 线程函数,loop循环:
  **    监听:cleaner_cond.wait_for(lock, std::chrono::seconds(MAXIDLE));
  */
  void* entry();
  void stop();
};

struct rgw_http_req_data
http client的数据:

/*
** curl_handle: easy handle
** curl_slist *h: http headers
** id: 此client的id
** done: 标记此client事件是否完成
** client: 指向httpclient
** control_io_id: io id
** user_info: 设置的用户信息
** registered: 标记是否已注册
** mgr: RGWHTTPManager
** lock/cond: 同步,client IO是否完成
** user_ret: 设置client请求结束返回的状态吗
** wait: client等待请求完成
** finish: RGWHTTPManager在client请求完成时,设置done,唤醒client;同时清理curl_easy_init返回的句柄,及curl_slist头部
*/
struct rgw_http_req_data : public RefCountedObject {
  RGWCurlHandle *curl_handle{nullptr};
  curl_slist *h{nullptr};
  uint64_t id;
  int ret{0};
  std::atomic<bool> done = { false };
  RGWHTTPClient *client{nullptr};
  rgw_io_id control_io_id;
  void *user_info{nullptr};
  bool registered{false};
  RGWHTTPManager *mgr{nullptr};
  char error_buf[CURL_ERROR_SIZE];
  bool write_paused{false};
  bool read_paused{false};

  optional<int> user_ret;

  ceph::mutex lock = ceph::make_mutex("rgw_http_req_data::lock");
  ceph::condition_variable cond;

  int wait(optional_yield y);{
    ...
    std::unique_lock l{lock};
    cond.wait(l, [this]{return done==true;});
  }
  void finish(int r, long http_status = -1) {
    std::lock_guard l{lock};
    ret = r;
    do_curl_easy_cleanup(curl_handle);
    curl_slist_free_all(h);

    curl_handle = NULL;
    h = NULL;
    done = true;
    cond.notify_all();
  }

};
class RGWHTTPClient

对rgw_http_req_data的进一步封装,client的初始化,定义了此client的一系列读写回调函数。

基本的数据结构:

class RGWHTTPClient : public RGWIOProvider
{
    friend class RGWHTTPManager;

    /*
    ** 发送数据的缓存
    */
    bufferlist send_bl;
    bufferlist::iterator send_iter;
    bool has_send_len;
    long http_status;
    bool send_data_hint{false};
    /*
    ** 下次调用receive_data,跳过的字节数
    */
    size_t receive_pause_skip{0}; 
    void *user_info{nullptr};

    /*
    ** 指向rgw_htt[_req_data,具体见上述
    */
    rgw_http_req_data *req_data;

    bool verify_ssl; 
    std::atomic<unsigned> stopped { 0 };


protected:
    CephContext *cct;

    /*
    ** 定义访问的url,http方法
    ** http头部,及超时设置
    */
    string method;
    string url;
    param_vec_t headers;
    long  req_timeout{0L};

    size_t send_len{0};

    /*
    ** client的设置:
    ** 获取CURL* 句柄: curl_easy_init
    ** 设置CURL* 句柄参数:curl_easy_setopt
    ** headers的设置等内容
    */
    int init_request(rgw_http_req_data *req_data);

    RGWHTTPManager *get_manager();

    virtual int receive_header(void *ptr, size_t len) {
      return 0;
    }
    virtual int receive_data(void *ptr, size_t len, bool *pause) {
      return 0;
    }

    virtual int send_data(void *ptr, size_t len, bool *pause=nullptr) {
      return 0;
    }

    /* Callbacks for libcurl. */
    static size_t receive_http_header(void *ptr, size_t size,size_t nmemb,void *_info);

    static size_t receive_http_data(void *ptr,size_t size,size_t nmemb,void *_info);

    static size_t send_http_data(void *ptr,size_t size,size_t nmemb,void *_info);

    ceph::mutex& get_req_lock();

    /* needs to be called under req_lock() */
    void _set_write_paused(bool pause);
    void _set_read_paused(bool pause);
public:
    static const long HTTP_STATUS_NOSTATUS     = 0;
    static const long HTTP_STATUS_UNAUTHORIZED = 401;
    static const long HTTP_STATUS_NOTFOUND     = 404;

    static constexpr int HTTPCLIENT_IO_READ    = 0x1;
    static constexpr int HTTPCLIENT_IO_WRITE   = 0x2;
    static constexpr int HTTPCLIENT_IO_CONTROL = 0x4;

    virtual ~RGWHTTPClient();
    explicit RGWHTTPClient(CephContext *cct,
                           const string& _method,
                           const string& _url)
      : has_send_len(false),
        http_status(HTTP_STATUS_NOSTATUS),
        req_data(nullptr),
        verify_ssl(cct->_conf->rgw_verify_ssl),
        cct(cct),
        method(_method),
        url(_url) {
    }

    void append_header(const string& name, const string& val);
    void set_send_length(size_t len);
    void set_send_data_hint(bool hint);

    long get_http_status() const;

    void set_http_status(long _http_status);

    void set_verify_ssl(bool flag);

    // set request timeout in seconds
    // zero (default) mean that request will never timeout
    void set_req_timeout(long timeout);

    int process(optional_yield y);

    int wait(optional_yield y);
    void cancel();
    bool is_done();

    rgw_http_req_data *get_req_data();

    string to_str();

    int get_req_retcode();

    void set_url(const string& _url);
    void set_method(const string& _method);

    void set_io_user_info(void *_user_info) override ;

    void *get_io_user_info() override ;
};

init_request
client的一些初始化设置:

  • do_curl_easy_init: 从RGWCurlhandlers中获取CURL*句柄(curl_easy_init初始化的对象)
  • curl_slist *h = headers_to_slist(headers): 设置curl的头部信息
  • curl_easy_setopt: 设置curl一系列的参数

curl_easy_setopt:

  /* 设置http请求的method */
  curl_easy_setopt(easy_handle, CURLOPT_CUSTOMREQUEST, method.c_str());
  /* 设置要访问的url */
  curl_easy_setopt(easy_handle, CURLOPT_URL, url.c_str());
  /* 关闭进度条 */
  curl_easy_setopt(easy_handle, CURLOPT_NOPROGRESS, 1L);
  /* 屏蔽信号,不屏蔽SIGPIPE信号。在多线程中需要设置此选项,否则超时机制等产生的信号会导致进程crash */
  curl_easy_setopt(easy_handle, CURLOPT_NOSIGNAL, 1L);
  /* receive headers的回调函数 */
  curl_easy_setopt(easy_handle, CURLOPT_HEADERFUNCTION, receive_http_header);
  /* 设置Header的回调函数参数的最后一个参数 */
  curl_easy_setopt(easy_handle, CURLOPT_WRITEHEADER, (void *)req_data);

  /* 设置下载数据的回调函数 */
  curl_easy_setopt(easy_handle, CURLOPT_WRITEFUNCTION, receive_http_data);
  /* 
  ** 设置write data回调函数参数的userdata,用来存储数据 
  ** receive_http_data(void *ptr,size_t size,size_t nmemb,void *_info): ptr指向收到的数据,_info即为req_data。数据大小:size * nmemb,将ptr中的数据拷贝至req_data
  */
  curl_easy_setopt(easy_handle, CURLOPT_WRITEDATA, (void *)req_data);

  /* 错误信息的buffer */
  curl_easy_setopt(easy_handle, CURLOPT_ERRORBUFFER, (void *)req_data->error_buf);

  /* 
  ** rgw_curl_low_speed_time: 低速传输的时间
  ** rgw_curl_low_speed_limit: 传输带宽限制
  ** 传输速度 < rgw_curl_low_speed_limit,持续rgw_curl_low_speed_time,则放弃传输 
  */
  curl_easy_setopt(easy_handle, CURLOPT_LOW_SPEED_TIME, cct->_conf->rgw_curl_low_speed_time);
  curl_easy_setopt(easy_handle, CURLOPT_LOW_SPEED_LIMIT, cct->_conf->rgw_curl_low_speed_limit);

  /*
  ** CURLOPT_READFUNCTION: data upload的回调函数
  ** CURLOPT_READDATA: 数据读取的缓冲
  ** send_http_data(void *ptr,size_t size,size_t nmemb,void *_info): _info为req_data, 将数据从req_data拷贝至ptr
  */
  curl_easy_setopt(easy_handle, CURLOPT_READFUNCTION, send_http_data);
  curl_easy_setopt(easy_handle, CURLOPT_READDATA, (void *)req_data);

  /*
  ** 使能data upload功能
  */
  if (send_data_hint || is_upload_request(method)) {
    curl_easy_setopt(easy_handle, CURLOPT_UPLOAD, 1L);
  }

  /*
  ** CURLOPT_INFILESIZE: 要传输的文件的大小
  ** CURLOPT_POSTFIELDSIZE: 指向post data的数据大小
  ** 在上传大于1024Bytes数据时,默认设置"Expect: 100-continue" ,使得libcurl传输数据之前预先和服务器协商,是否允许上传。但是在部分服务端会存在bug或不会应答此类包。
  ** 因此设置curl_slist_append(h, "Expect:") 空头,不让libcurl添加:"Expect: 100-continue"
  ** CURLOPT_HTTPHEADER: 利用设置的头部替代内部的头,如上设置了"Expect:" 替代默认的"Expect: 100-continue"
  */
  if (has_send_len) {
    // TODO: prevent overflow by using curl_off_t
    // and: CURLOPT_INFILESIZE_LARGE, CURLOPT_POSTFIELDSIZE_LARGE
    const long size = send_len;
    curl_easy_setopt(easy_handle, CURLOPT_INFILESIZE, size);
    if (method == "POST") {
      curl_easy_setopt(easy_handle, CURLOPT_POSTFIELDSIZE, size); 
      // TODO: set to size smaller than 1MB should prevent the "Expect" field
      // from being sent. So explicit removal is not needed
      h = curl_slist_append(h, "Expect:");
    }
  }
  if (h) {
    curl_easy_setopt(easy_handle, CURLOPT_HTTPHEADER, (void *)h);
  }

  /*
  ** 关闭ssl功能
  */
  if (!verify_ssl) {
    curl_easy_setopt(easy_handle, CURLOPT_SSL_VERIFYPEER, 0L);
    curl_easy_setopt(easy_handle, CURLOPT_SSL_VERIFYHOST, 0L);
  }

  /*
  ** 传递一个void* 的参数,这个参数存储私有的一些数据,关联至handler
  ** 如下,curl_easy_setopt(easy_handle, CURLOPT_PRIVATE, (void *)req_data);客户端关联一个私有数据(req_data)至此handle
  ** 利用curl_easy_getinfo,通过handle,取出此数据结构
  **    rgw_http_req_data *req_data;
  **    curl_easy_getinfo(e, CURLINFO_PRIVATE, (void **)&req_data); 
  ** Note: 设置的是req_data指针,在CURLOPT_WRITEDATA/CURLOPT_READDATA中,传递的也是req_data的指针;在回调函数中,更新req_data内容;因此getinfo取出的req_data也是更新过的
  ** CURLOPT_TIMEOUT: request请求的超时时间设置
  */
  curl_easy_setopt(easy_handle, CURLOPT_PRIVATE, (void *)req_data);
  curl_easy_setopt(easy_handle, CURLOPT_TIMEOUT, req_timeout);

class RGWHTTPManager

RGWCurlHandle、RGWCurlHandles、RGWHTTPClient主要是关于client的管理、定义初始化、读写回调函数,主要是以下工作:

  • curl_easy_init
  • curl_easy_setopt

对于client的连接、IO等操作,主要是在RGWHTTPManager中:

  • curl_multi_init
  • curl_multi_add_handle
  • curl_multi_fdset、select
  • curl_multi_perform
  • curl_multi_info_read
  • curl_easy_getinfo
  • curl_multi_cleanup

主要功能:

  • 通过管道进行线程间通信,如注册client
  • 独立线程,对所有注册的client,进行Libcurl的perform

线程处理函数, reqs_thread_entry():

class RGWHTTPManager {
  /*
  ** client 状态
  */
  struct set_state {
    rgw_http_req_data *req;
    int bitmask;

    set_state(rgw_http_req_data *_req, int _bitmask) : req(_req), bitmask(_bitmask) {}
  };
  CephContext *cct;
  RGWCompletionManager *completion_mgr;

  /*
  ** CURLM* curl_multi_init()
  */
  void *multi_handle;


  bool is_started = false;
  std::atomic<unsigned> going_down { 0 };
  std::atomic<unsigned> is_stopped { 0 };

  /*
  ** 管理client的读写锁
  ** reqs: 需要进行http请求的client集合
  ** unregistered_reqs: 待
  */
  ceph::shared_mutex reqs_lock = ceph::make_shared_mutex("RGWHTTPManager::reqs_lock");
  map<uint64_t, rgw_http_req_data *> reqs;
  list<rgw_http_req_data *> unregistered_reqs;
  list<set_state> reqs_change_state;
  map<uint64_t, rgw_http_req_data *> complete_reqs;
  int64_t num_reqs = 0;
  int64_t max_threaded_req = 0;

  /*
  ** 线程间同步
  */
  int thread_pipe[2];

  /*
  ** 注册一个libcurl client
  ** 1. std::unique_lock rl{reqs_lock};
  ** 2. req_data->id = num_reqs;
  ** 3. req_data->registered = true;
  ** 4. reqs[num_reqs] = req_data;
  ** 5. num_reqs++;
  */
  void register_request(rgw_http_req_data *req_data);

  /*
  ** 清理request的函数
  ** 1. std::unique_lock rl{reqs_lock};
  ** 2. _complete_request(req_data);
  */
  void complete_request(rgw_http_req_data *req_data);

  /*
  ** 清理reqs中的req_data:
  ** 1. map<uint64_t, rgw_http_req_data *>::iterator iter = reqs.find(req_data->id);
  ** 2. reqs.erase(iter);
  **  {
  **    std::lock_guard l{req_data->lock};
  **    req_data->mgr = nullptr;
  **  }
  **  if (completion_mgr) {
  **    completion_mgr->complete(NULL, req_data->control_io_id, req_data->user_info);
  **  }
  ** 3. 释放一个引用计数: req_data->put();
  */
  void _complete_request(rgw_http_req_data *req_data);

  /*
  ** 注销一个libcurl client,加入unregistered_reqs队列中,待主线程处理:
  ** 1. std::unique_lock rl{reqs_lock};
  ** 2. if (!req_data->registered) {
  **      return false;
  **    }
  ** 3. req_data->get();
  ** 4. req_data->registered = false;
  ** 5. unregistered_reqs.push_back(req_data);
  */
  bool unregister_request(rgw_http_req_data *req_data);

  /*
  ** CURLM* 中删除一个easy interface:
  ** unlink:
      1. std::unique_lock wl{reqs_lock};
      2. _unlink_request(req_data);
  ** _unlink_request:
  ** 从CURLM中删除
  **  1. curl_multi_remove_handle((CURLM *)multi_handle, req_data->get_easy_handle());
  ** 判断client请求是否完成:
  ** - 没有完成状态,则设置返回码: ECANCELED; _finish_request中调用req_data->finish(-ECANCELED),通知client
  ** - 如果是完成状态,则无需在调用_finish_request,因为完成状态,说明已经正产调用过finish_request
  **  2. if (!req_data->is_done()) 
  **        _finish_request(req_data, -ECANCELED);
  */
  void unlink_request(rgw_http_req_data *req_data);
  void _unlink_request(rgw_http_req_data *req_data);

  /*
  ** 结束一个client的请求:
  ** 结束一个client请求,两种情形:
  **  1. 请求完成,调用finish_request
  **  2. 取消client请求,调用cancel,调用client->cancel: httpmanager->unregister_request
  ** finish_request(req_data,r,http_status): http_status是http返回码,status是http_status对应的本地定义状态
  **  1. req_data->finish(ret, http_status);
  **  2. complete_request(req_data);
  ** _finish_request(rgw_http_req_data *req_data, int r):
  **  1. req_data->finish(ret);
  **  2. complete_request(req_data);
  */
  void finish_request(rgw_http_req_data *req_data, int r, long http_status = -1);
  void _finish_request(rgw_http_req_data *req_data, int r);

  /*
  ** 设置req状态req->set_state >> CURLcode rc = curl_easy_pause(**curl_handle, bitmask);:
  **  ss.req->set_state(ss.bitmask)
  */
  void _set_req_state(set_state& ss);


  /*
  ** 将easy_interface绑定至multi handle:
  **  1. CURLMcode mstatus = curl_multi_add_handle((CURLM *)multi_handle, req_data->get_easy_handle());
  */
  int link_request(rgw_http_req_data *req_data);


  /*
  ** 处理一些处于待处理队列中的client, 如:unregistered_reqs/reqs_change_state,详见函数细节
  */
  void manage_pending_requests();

  class ReqsThread : public Thread {
    RGWHTTPManager *manager;

  public:
    explicit ReqsThread(RGWHTTPManager *_m) : manager(_m) {}
    void *entry() override;
  };

  ReqsThread *reqs_thread = nullptr;

  /*
  ** RGWHTTPManager处理线程,具体逻辑见函数解析
  */
  void *reqs_thread_entry();

  /*
  ** 线程间同步:
  ** 1. write(thread_pipe[1], (void *)&buf, sizeof(buf));
  */
  int signal_thread();

public:

  /*
  ** 构造函数:
  ** 1. 初始化CURLM*
  **    multi_handle = (void *)curl_multi_init();
  */
  RGWHTTPManager(CephContext *_cct, RGWCompletionManager *completion_mgr = NULL);

  /*
  ** 析构函数,清理CURLM*:
  ** 1. stop()
  ** 2. curl_multi_cleanup((CURLM *)multi_handle)
  **
  */
  ~RGWHTTPManager();


  /*
  ** 创建libcurl client处理线程:
  ** 1. pipe2(thread_pipe)
  ** 2. fcntl(thread_pipe[0], F_SETFL, O_NONBLOCK); 设置非阻塞
  ** 3. is_started = true;
  **    reqs_thread = new ReqsThread(this);
  **    reqs_thread->create("http_manager");
  ** 4. reqs_thread->create() >> manager->reqs_thread_entry();
  */
  int start();

  /*
  ** 关闭libcurl client处理线程:
  ** 1. is_stopped = true
  ** 2. going_down = true
  ** 3. signal_thread()
  ** 4. reqs_thread->join()
  ** 5. delete reqs_thread
  ** 6. close(thread_pipe[0])
  **    close(thread_pipe[1])
  */
  void stop();

  /*
  ** 注册一个client:
  **  1. rgw_http_req_data *req_data = new rgw_http_req_data;
  **  2. client->init_request(req_data)
  **  3. register_request(req_data)
  **  4. link_request(req_data)
  **  5. signal_thread() 通知libcurl的线程有client的改变
  */
  int add_request(RGWHTTPClient *client);

  /*
  ** 删除一个libcurl client:
  **  1. rgw_http_req_data *req_data = client->get_req_data()
  **  2. unregister_request(req_data)
  **  3. signal_thread() 
  */
  int remove_request(RGWHTTPClient *client);

  /*
  ** 设置一个client的状态,加入reqs_change_state队列,待处理线程处理:
  **  1. int bitmask = CURLPAUSE_CONT;
         if (req_data->write_paused) {
            bitmask |= CURLPAUSE_SEND;
         }
         if (req_data->read_paused) {
            bitmask |= CURLPAUSE_RECV;
         }
  ** 2. reqs_change_state.push_back(set_state(req_data, bitmask))
  ** 3. signal_thread()
  */
  int set_request_state(RGWHTTPClient *client, RGWHTTPRequestSetState state);
};

manage_pending_requests函数:

  • 处理add_request情况,reqs无变化,unregistered_reqs/reqs_change_state为空,则无需进一步处理
  • 处理unregistered_reqeust情况,_unlink_request(req_data),req_data.put()
  • 重新链接easy_interface与multi handle,链接失败的添加如remove_reqs中
  • 处理set_request_state, 处理状态设置的情形
  • 处理remove_reqs,调用_finish_request(req_data,r), r为link_request返回的错误码

代码如下:

void RGWHTTPManager::manage_pending_requests()
{
  reqs_lock.lock_shared();
  if (max_threaded_req == num_reqs && unregistered_reqs.empty() && reqs_change_state.empty()) {
    reqs_lock.unlock_shared();
    return;
  }
  reqs_lock.unlock_shared();

  /* 处理unregistered_reqeust */
  std::unique_lock wl{reqs_lock};
  if (!unregistered_reqs.empty()) {
    for (auto& r : unregistered_reqs) {
      _unlink_request(r);
      r->put();
    }

    unregistered_reqs.clear();
  }

  /* 重新链接 */
  map<uint64_t, rgw_http_req_data *>::iterator iter = reqs.find(max_threaded_req);
  list<std::pair<rgw_http_req_data *, int> > remove_reqs;
  for (; iter != reqs.end(); ++iter) {
    rgw_http_req_data *req_data = iter->second;
    int r = link_request(req_data);
    if (r < 0) {
      remove_reqs.push_back(std::make_pair(iter->second, r));
    } else {
      max_threaded_req = iter->first + 1;
    }
  }

  /* 处理设置状态的设置 */
  if (!reqs_change_state.empty()) {
    for (auto siter : reqs_change_state) {
      _set_req_state(siter);
    }
    reqs_change_state.clear();
  }
  /* 处理remove_reqs */
  for (auto piter : remove_reqs) {
    rgw_http_req_data *req_data = piter.first;
    int r = piter.second;
    _finish_request(req_data, r);
  }
}

RGWHTTPManager client处理线程
reqs_thread_entry,主要逻辑:
loop(!going_down){

  • do_curl_wait() /* select实现对多个easy_interface及thread_pipe[0]的监听,当有读写发生时返回 */
  • manage_pending_requests(): 处理待处理的client队列
  • curl_multi_perform: 对所有注册的client,执行回调函数,读取/发送数据等
  • curl_multi_info_read: 获取每个client的读取完成情况
    }

具体代码如下:

int still_running;
int mstatus;

/* 线程loop循环 */
while (!going_down) {
  /* select监听,multi handle对应的fd,以及thread_pipe[0] */
  int ret = do_curl_wait(cct, (CURLM *)multi_handle, thread_pipe[0]);
  if (ret < 0) {
    return NULL;
  }

  /* 处理待处理的队列 */
  manage_pending_requests();

  /* 执行multi_hanle中,client的读写 */
  mstatus = curl_multi_perform((CURLM *)multi_handle, &still_running);
  switch (mstatus) {
    case CURLM_OK:
    case CURLM_CALL_MULTI_PERFORM:
      break;
    default:
      dout(10) << "curl_multi_perform returned: " << mstatus << dendl;
      break;
  }

  /* 获取multi_handle中所有client的读写情况 */
  int msgs_left;
  CURLMsg *msg;
  while ((msg = curl_multi_info_read((CURLM *)multi_handle, &msgs_left))) {
    /* 表示读写完成,获取对应的client */
    if (msg->msg == CURLMSG_DONE) {
      int result = msg->data.result;
      CURL *e = msg->easy_handle;

      /* 获取设置的私有数据: req_data */
      rgw_http_req_data *req_data;
      curl_easy_getinfo(e, CURLINFO_PRIVATE, (void **)&req_data);

      /* 删除multi_handle中的easy_interface */
      curl_multi_remove_handle((CURLM *)multi_handle, e);

      /* req_data->user_ret: 在回调函数中设置,如req_data->user_ret = client->send_data()等,非0表示error */
      long http_status;
      int status;
      if (!req_data->user_ret) {
        /* 获取client请求的http的返回码 */
        curl_easy_getinfo(e, CURLINFO_RESPONSE_CODE, (void **)&http_status);

        status = rgw_http_error_to_errno(http_status);
        if (result != CURLE_OK && status == 0) {
          dout(0) << "ERROR: curl error: " << curl_easy_strerror((CURLcode)result) << ", maybe network unstable" << dendl;
          status = -EAGAIN;
        }
      } else {
        status = *req_data->user_ret;
        rgw_err err;
        set_req_state_err(err, status, 0);
        http_status = err.http_ret;
      }
      int id = req_data->id;

      /* 结束client的请求,调用req_data->finish()唤醒client;调用complete_request(),清理此reqs中的client */
      finish_request(req_data, status, http_status);
      switch (result) {
        case CURLE_OK:
          break;
        case CURLE_OPERATION_TIMEDOUT:
          dout(0) << "WARNING: curl operation timed out, network average transfer speed less than " 
            << cct->_conf->rgw_curl_low_speed_limit << " Bytes per second during " << cct->_conf->rgw_curl_low_speed_time << " seconds." << dendl;
        default:
          dout(20) << "ERROR: msg->data.result=" << result << " req_data->id=" << id << " http_status=" << http_status << dendl;
          dout(20) << "ERROR: curl error: " << curl_easy_strerror((CURLcode)result) << dendl;
    break;
      }
    }
  }
}

/* 资源清理 */
std::unique_lock rl{reqs_lock};
for (auto r : unregistered_reqs) {
  _unlink_request(r);
}
unregistered_reqs.clear();
auto all_reqs = std::move(reqs);
for (auto iter : all_reqs) {
  _unlink_request(iter.second);
}
reqs.clear();
if (completion_mgr) {
  completion_mgr->go_down();
}
return 0;

do_curl_wait实现

  • select 监听multi handle及thead_pipe[0]的文件描述符
static int do_curl_wait(CephContext *cct, CURLM *handle, int signal_fd)
{
  fd_set fdread;
  fd_set fdwrite;
  fd_set fdexcep;
  int maxfd = -1;
 
  FD_ZERO(&fdread);
  FD_ZERO(&fdwrite);
  FD_ZERO(&fdexcep);

  /* 获取multi handle中待处理的文件描述符 */ 
  int ret = curl_multi_fdset(handle, &fdread, &fdwrite, &fdexcep, &maxfd);
  if (ret) {
    ldout(cct, 0) << "ERROR: curl_multi_fdset returned " << ret << dendl;
    return -EIO;
  }

  if (signal_fd > 0) {
    FD_SET(signal_fd, &fdread);
    if (signal_fd >= maxfd) {
      maxfd = signal_fd + 1;
    }
  }

  /* 超时设置 */
  uint64_t to = cct->_conf->rgw_curl_wait_timeout_ms;
  #define RGW_CURL_TIMEOUT 1000
  if (!to)
    to = RGW_CURL_TIMEOUT;
  struct timeval timeout;
  timeout.tv_sec = to / 1000;
  timeout.tv_usec = to % 1000;

  ret = select(maxfd+1, &fdread, &fdwrite, &fdexcep, &timeout);
  if (ret < 0) {
    ret = -errno;
    ldout(cct, 0) << "ERROR: select returned " << ret << dendl;
    return ret;
  }

  /* thread_pipe[0]有读写发生,用于同步信息,clear_signal清理此描述符上的读写通知,clear_signal: {ret = ::read(signal_fd); return ret;} */
  if (signal_fd > 0 && FD_ISSET(signal_fd, &fdread)) {
    ret = clear_signal(signal_fd);
    if (ret < 0) {
      ldout(cct, 0) << "ERROR: " << __func__ << "(): read() returned " << ret << dendl;
      return ret;
    }
  }

  return 0;
}

相关文章

网友评论

      本文标题:Ceph RGW: libcurl用法介绍

      本文链接:https://www.haomeiwen.com/subject/tzeafktx.html