美文网首页
Ceph RGW: libcurl用法介绍

Ceph RGW: libcurl用法介绍

作者: 圣地亚哥_SVIP | 来源:发表于2020-06-29 19:13 被阅读0次

    libcurl使用

    使用libcurl访问外部http时的一个基本流程:

    1. curl_global_init(CURL_GLOBAL_ALL); //此函数只能调用一次
    /*
    ** 以下函数可以保证只调用一次
    ** long curl_global_flags = CURL_GLOBAL_ALL;
    ** std::call_once(curl_init_flag, curl_global_init, curl_global_flags);
    */
    
    2. CURLM *curlm = curl_multi_init();
    
    3. 设置easy handle:
    CURL *curl1 = NULL;
    curl1 = curl_easy_init();
    curl_easy_setopt(curl1, CURLOPT_URL, "https://stackoverflow.com/");
    curl_easy_setopt(curl1, CURLOPT_WRITEFUNCTION, writeCallback);
    
    4. 将easy handle添加至multi handle:
    curl_multi_add_handle(curlm, curl1);
    
    5. 利用select监听读写事件
    curl_multi_fdset(curlm, &fdread, &fdwrite, &fdexcep, &maxfd);
    select(maxfd+1, &fdread, &fdwrite, &fdexcep, &timeout);
    
    6. 当有状态发生变化时:
    int mstatus = curl_multi_perform(curlm, &still_running);
    CURLMsg* msg = curl_multi_info_read(curlm, &msgs_left);
    /*
    ** 读取easy handle信息
    */
    curl_easy_getinfo();
    
    7. 从multi handle中清理easy handle:
    curl_multi_remove_handle(curlm, msg->easy_handle);
    
    8. 清理全局信息
    curl_multi_cleanup(curlm);
    curl_global_cleanup();
    

    解析Ceph中进行libcurl请求的实现:

    • class RGWCurlHandle
    • class RGWCurlHandles
    • struct rgw_http_req_data
    • class RGWHTTPClient
    • class RGWHTTPManager

    class RGWCurlHandle

    对CURL*(easy handle)的一个简单封装
    定义如下:

    struct RGWCurlHandle {
      int uses;
      /*
      ** 上次使用的时间
      */
      time_point lastuse;
      /*
      ** easy interface
      */
      CURL* h;
    
      explicit RGWCurlHandle(CURL* h) : uses(0), h(h) {};
      CURL* operator*() {
        return this->h;
      }
    };
    
    class RGWCurlHandles

    RGWCurlHandle pool,管理RGWCurlHandle的生成/释放。

    /*
    ** MAXIDLE: 表示一个easy handle多长时间未使用,则重置此easy handle
    */
    #define MAXIDLE 5
    class RGWCurlHandles : public Thread {
    public:
      std::mutex cleaner_lock;
      /*
      ** 存储RGWCCurHandle
      */
      std::vector<RGWCurlHandle*> saved_curl;
      /*
      ** 清理easy interface,终止此线程
      */
      int cleaner_shutdown;
      std::condition_variable cleaner_cond;
    
      RGWCurlHandles() :
        cleaner_shutdown{0} {
      }
    
      /*
      ** 获取curl handle
      ** 如果saved_curl为空,则新建一个easy handle:
      ** h = curl_easy_init();
      ** curl = new RGWCurlHandle{h};
      */
      RGWCurlHandle* get_curl_handle();
      void release_curl_handle_now(RGWCurlHandle* curl);
      void release_curl_handle(RGWCurlHandle* curl);
      void flush_curl_handles();
      /*
      ** 线程函数,loop循环:
      **    监听:cleaner_cond.wait_for(lock, std::chrono::seconds(MAXIDLE));
      */
      void* entry();
      void stop();
    };
    

    struct rgw_http_req_data
    http client的数据:

    /*
    ** Per-request data of an HTTP client.
    **
    ** curl_handle:   the (wrapped) easy handle used by this request
    ** h:             curl_slist holding the HTTP headers
    ** id:            id of this client/request
    ** ret:           result code stored by finish() and returned by wait()
    ** done:          set once the request has finished
    ** client:        points to the owning HTTP client
    ** control_io_id: io id
    ** user_info:     user information set by the caller
    ** registered:    whether the request is currently registered
    ** mgr:           the RGWHTTPManager handling the request
    ** lock/cond:     synchronise waiters with completion of the client IO
    ** user_ret:      status code set for the client when its request ends
    ** wait():        the client blocks here until the request completes
    ** finish():      called by RGWHTTPManager when the request completes;
    **                sets done and wakes the client, and releases the easy
    **                handle and the curl_slist header list
    */
    struct rgw_http_req_data : public RefCountedObject {
      RGWCurlHandle *curl_handle{nullptr};
      curl_slist *h{nullptr};
      uint64_t id;
      int ret{0};
      std::atomic<bool> done = { false };
      RGWHTTPClient *client{nullptr};
      rgw_io_id control_io_id;
      void *user_info{nullptr};
      bool registered{false};
      RGWHTTPManager *mgr{nullptr};
      char error_buf[CURL_ERROR_SIZE];
      bool write_paused{false};
      bool read_paused{false};

      optional<int> user_ret;

      ceph::mutex lock = ceph::make_mutex("rgw_http_req_data::lock");
      ceph::condition_variable cond;

      /*
      ** Block until finish() has marked the request done, then return the
      ** result code it stored.
      */
      int wait(optional_yield y) {
        // NOTE(review): the original excerpt elides code at this point
        // ("..."), presumably the path that uses `y` -- confirm against the
        // full source.
        std::unique_lock l{lock};
        cond.wait(l, [this]{ return done == true; });
        return ret;
      }

      /*
      ** Record the result `r`, release the curl resources and wake every
      ** waiter. `http_status` is accepted for interface compatibility but is
      ** not used in this excerpt.
      */
      void finish(int r, long http_status = -1) {
        std::lock_guard l{lock};
        ret = r;
        // Both pointers are reset below, so a repeated finish() would see
        // nullptr here; skip the cleanup calls in that case.
        if (curl_handle) {
          do_curl_easy_cleanup(curl_handle);
        }
        if (h) {
          curl_slist_free_all(h);
        }

        curl_handle = nullptr;
        h = nullptr;
        done = true;
        cond.notify_all();
      }

    };
    
    class RGWHTTPClient

    对rgw_http_req_data的进一步封装,client的初始化,定义了此client的一系列读写回调函数。

    基本的数据结构:

    /*
    ** HTTP client layered on rgw_http_req_data: holds the request parameters
    ** (method, url, headers, timeout) and declares the receive/send callbacks
    ** that are handed to libcurl.
    */
    class RGWHTTPClient : public RGWIOProvider
    {
        friend class RGWHTTPManager;
    
        /*
        ** Buffer holding the data to send
        */
        bufferlist send_bl;
        bufferlist::iterator send_iter;
        bool has_send_len;
        long http_status;
        bool send_data_hint{false};
        /*
        ** Number of bytes to skip on the next call to receive_data
        */
        size_t receive_pause_skip{0}; 
        void *user_info{nullptr};
    
        /*
        ** Points to the rgw_http_req_data of this client
        */
        rgw_http_req_data *req_data;
    
        bool verify_ssl; 
        std::atomic<unsigned> stopped { 0 };
    
    
    protected:
        CephContext *cct;
    
        /*
        ** The url to access and the HTTP method,
        ** the HTTP headers, and the timeout setting
        */
        string method;
        string url;
        param_vec_t headers;
        long  req_timeout{0L};
    
        size_t send_len{0};
    
        /*
        ** Set up the client:
        ** obtain the CURL* handle: curl_easy_init
        ** set the CURL* handle options: curl_easy_setopt
        ** set up the headers, etc.
        */
        int init_request(rgw_http_req_data *req_data);
    
        RGWHTTPManager *get_manager();
    
        /* Hook for received header data; this default does nothing and returns 0. */
        virtual int receive_header(void *ptr, size_t len) {
          return 0;
        }
        /* Hook for received body data; this default does nothing and returns 0. */
        virtual int receive_data(void *ptr, size_t len, bool *pause) {
          return 0;
        }
    
        /* Hook for supplying data to send; this default does nothing and returns 0. */
        virtual int send_data(void *ptr, size_t len, bool *pause=nullptr) {
          return 0;
        }
    
        /* Callbacks for libcurl. */
        static size_t receive_http_header(void *ptr, size_t size,size_t nmemb,void *_info);
    
        static size_t receive_http_data(void *ptr,size_t size,size_t nmemb,void *_info);
    
        static size_t send_http_data(void *ptr,size_t size,size_t nmemb,void *_info);
    
        ceph::mutex& get_req_lock();
    
        /* needs to be called under req_lock() */
        void _set_write_paused(bool pause);
        void _set_read_paused(bool pause);
    public:
        /* HTTP status values; NOSTATUS (0) is the initial value of http_status. */
        static const long HTTP_STATUS_NOSTATUS     = 0;
        static const long HTTP_STATUS_UNAUTHORIZED = 401;
        static const long HTTP_STATUS_NOTFOUND     = 404;
    
        /* Bit flags identifying the read, write and control IO channels. */
        static constexpr int HTTPCLIENT_IO_READ    = 0x1;
        static constexpr int HTTPCLIENT_IO_WRITE   = 0x2;
        static constexpr int HTTPCLIENT_IO_CONTROL = 0x4;
    
        virtual ~RGWHTTPClient();
        /*
        ** Create a client for the given method and url. SSL verification
        ** defaults to the rgw_verify_ssl configuration value.
        */
        explicit RGWHTTPClient(CephContext *cct,
                               const string& _method,
                               const string& _url)
          : has_send_len(false),
            http_status(HTTP_STATUS_NOSTATUS),
            req_data(nullptr),
            verify_ssl(cct->_conf->rgw_verify_ssl),
            cct(cct),
            method(_method),
            url(_url) {
        }
    
        void append_header(const string& name, const string& val);
        void set_send_length(size_t len);
        void set_send_data_hint(bool hint);
    
        long get_http_status() const;
    
        void set_http_status(long _http_status);
    
        void set_verify_ssl(bool flag);
    
        // set request timeout in seconds
        // zero (default) means that the request will never time out
        void set_req_timeout(long timeout);
    
        int process(optional_yield y);
    
        int wait(optional_yield y);
        void cancel();
        bool is_done();
    
        rgw_http_req_data *get_req_data();
    
        string to_str();
    
        int get_req_retcode();
    
        void set_url(const string& _url);
        void set_method(const string& _method);
    
        void set_io_user_info(void *_user_info) override ;
    
        void *get_io_user_info() override ;
    };
    
    

    init_request
    client的一些初始化设置:

    • do_curl_easy_init: 从RGWCurlHandles中获取CURL*句柄(curl_easy_init初始化的对象)
    • curl_slist *h = headers_to_slist(headers): 设置curl的头部信息
    • curl_easy_setopt: 设置curl一系列的参数

    curl_easy_setopt:

      /* Set the HTTP method of the request */
      curl_easy_setopt(easy_handle, CURLOPT_CUSTOMREQUEST, method.c_str());
      /* Set the url to access */
      curl_easy_setopt(easy_handle, CURLOPT_URL, url.c_str());
      /* Turn the progress meter off */
      curl_easy_setopt(easy_handle, CURLOPT_NOPROGRESS, 1L);
      /* Tell libcurl not to use signals (per the original note this does not cover SIGPIPE). Needed in multi-threaded programs; otherwise signals raised by e.g. the timeout mechanism can crash the process */
      curl_easy_setopt(easy_handle, CURLOPT_NOSIGNAL, 1L);
      /* Callback for received headers */
      curl_easy_setopt(easy_handle, CURLOPT_HEADERFUNCTION, receive_http_header);
      /* req_data is passed as the last argument of the header callback */
      curl_easy_setopt(easy_handle, CURLOPT_WRITEHEADER, (void *)req_data);
    
      /* Callback for downloaded data */
      curl_easy_setopt(easy_handle, CURLOPT_WRITEFUNCTION, receive_http_data);
      /* 
      ** Set the userdata argument of the write-data callback, used to store the data.
      ** receive_http_data(void *ptr,size_t size,size_t nmemb,void *_info): ptr points to the received data and _info is req_data. The data size is size * nmemb; per the original note the data in ptr is copied into req_data
      */
      curl_easy_setopt(easy_handle, CURLOPT_WRITEDATA, (void *)req_data);
    
      /* Buffer that receives error messages */
      curl_easy_setopt(easy_handle, CURLOPT_ERRORBUFFER, (void *)req_data->error_buf);
    
      /* 
      ** rgw_curl_low_speed_time: duration of low-speed transfer
      ** rgw_curl_low_speed_limit: transfer speed threshold
      ** If the transfer speed stays below rgw_curl_low_speed_limit for rgw_curl_low_speed_time, the transfer is abandoned
      */
      curl_easy_setopt(easy_handle, CURLOPT_LOW_SPEED_TIME, cct->_conf->rgw_curl_low_speed_time);
      curl_easy_setopt(easy_handle, CURLOPT_LOW_SPEED_LIMIT, cct->_conf->rgw_curl_low_speed_limit);
    
      /*
      ** CURLOPT_READFUNCTION: callback for data upload
      ** CURLOPT_READDATA: userdata pointer passed to that callback
      ** send_http_data(void *ptr,size_t size,size_t nmemb,void *_info): _info is req_data; per the original note data is copied from req_data into ptr
      */
      curl_easy_setopt(easy_handle, CURLOPT_READFUNCTION, send_http_data);
      curl_easy_setopt(easy_handle, CURLOPT_READDATA, (void *)req_data);
    
      /*
      ** Enable data upload
      */
      if (send_data_hint || is_upload_request(method)) {
        curl_easy_setopt(easy_handle, CURLOPT_UPLOAD, 1L);
      }
    
      /*
      ** CURLOPT_INFILESIZE: size of the file to transfer
      ** CURLOPT_POSTFIELDSIZE: size of the post data
      ** Per the original note: when uploading more than 1024 bytes, libcurl by default sets "Expect: 100-continue" so that it negotiates with the server before sending the data. Some servers are buggy or never answer such a request.
      ** Therefore an empty header is added with curl_slist_append(h, "Expect:") so that libcurl does not add "Expect: 100-continue"
      ** CURLOPT_HTTPHEADER: the supplied headers replace the internal ones; here the "Expect:" set above replaces the default "Expect: 100-continue"
      */
      if (has_send_len) {
        // TODO: prevent overflow by using curl_off_t
        // and: CURLOPT_INFILESIZE_LARGE, CURLOPT_POSTFIELDSIZE_LARGE
        const long size = send_len;
        curl_easy_setopt(easy_handle, CURLOPT_INFILESIZE, size);
        if (method == "POST") {
          curl_easy_setopt(easy_handle, CURLOPT_POSTFIELDSIZE, size); 
          // TODO: set to size smaller than 1MB should prevent the "Expect" field
          // from being sent. So explicit removal is not needed
          h = curl_slist_append(h, "Expect:");
        }
      }
      if (h) {
        curl_easy_setopt(easy_handle, CURLOPT_HTTPHEADER, (void *)h);
      }
    
      /*
      ** Disable TLS certificate verification (peer and host checks) when verify_ssl is false
      */
      if (!verify_ssl) {
        curl_easy_setopt(easy_handle, CURLOPT_SSL_VERIFYPEER, 0L);
        curl_easy_setopt(easy_handle, CURLOPT_SSL_VERIFYHOST, 0L);
      }
    
      /*
      ** CURLOPT_PRIVATE takes a void* that stores private data associated with the handle.
      ** Below, curl_easy_setopt(easy_handle, CURLOPT_PRIVATE, (void *)req_data); associates the client's private data (req_data) with this handle.
      ** The data structure is retrieved from the handle with curl_easy_getinfo:
      **    rgw_http_req_data *req_data;
      **    curl_easy_getinfo(e, CURLINFO_PRIVATE, (void **)&req_data); 
      ** Note: what is stored is the req_data pointer, and the same pointer is passed for CURLOPT_WRITEDATA/CURLOPT_READDATA; the callbacks update the contents of req_data, so the req_data obtained through getinfo reflects those updates
      ** CURLOPT_TIMEOUT: timeout of the request
      */
      curl_easy_setopt(easy_handle, CURLOPT_PRIVATE, (void *)req_data);
      curl_easy_setopt(easy_handle, CURLOPT_TIMEOUT, req_timeout);
    
    
    class RGWHTTPManager

    RGWCurlHandle、RGWCurlHandles、RGWHTTPClient主要是关于client的管理、定义初始化、读写回调函数,主要是以下工作:

    • curl_easy_init
    • curl_easy_setopt

    对于client的连接、IO等操作,主要是在RGWHTTPManager中:

    • curl_multi_init
    • curl_multi_add_handle
    • curl_multi_fdset、select
    • curl_multi_perform
    • curl_multi_info_read
    • curl_easy_getinfo
    • curl_multi_cleanup

    主要功能:

    • 通过管道进行线程间通信,如注册client
    • 独立线程,对所有注册的client,进行Libcurl的perform

    线程处理函数, reqs_thread_entry():

    /*
    ** Manages the libcurl multi handle: clients are registered with it,
    ** linked to the multi handle and processed by a dedicated thread that is
    ** notified through a pipe.
    */
    class RGWHTTPManager {
      /*
      ** Pending state change for a client request
      */
      struct set_state {
        rgw_http_req_data *req;
        int bitmask;
    
        set_state(rgw_http_req_data *_req, int _bitmask) : req(_req), bitmask(_bitmask) {}
      };
      CephContext *cct;
      RGWCompletionManager *completion_mgr;
    
      /*
      ** The CURLM* returned by curl_multi_init(), stored as void*
      */
      void *multi_handle;
    
    
      bool is_started = false;
      std::atomic<unsigned> going_down { 0 };
      std::atomic<unsigned> is_stopped { 0 };
    
      /*
      ** reqs_lock: reader/writer lock protecting the client bookkeeping below
      ** reqs: the set of clients that have HTTP requests to perform
      ** unregistered_reqs: requests queued by unregister_request(), waiting
      **                    to be unlinked by the processing thread
      */
      ceph::shared_mutex reqs_lock = ceph::make_shared_mutex("RGWHTTPManager::reqs_lock");
      map<uint64_t, rgw_http_req_data *> reqs;
      list<rgw_http_req_data *> unregistered_reqs;
      list<set_state> reqs_change_state;
      map<uint64_t, rgw_http_req_data *> complete_reqs;
      int64_t num_reqs = 0;
      int64_t max_threaded_req = 0;
    
      /*
      ** Pipe used for inter-thread synchronisation
      */
      int thread_pipe[2];
    
      /*
      ** Register a libcurl client
      ** 1. std::unique_lock rl{reqs_lock};
      ** 2. req_data->id = num_reqs;
      ** 3. req_data->registered = true;
      ** 4. reqs[num_reqs] = req_data;
      ** 5. num_reqs++;
      */
      void register_request(rgw_http_req_data *req_data);
    
      /*
      ** Clean up a request
      ** 1. std::unique_lock rl{reqs_lock};
      ** 2. _complete_request(req_data);
      */
      void complete_request(rgw_http_req_data *req_data);
    
      /*
      ** Remove req_data from reqs:
      ** 1. map<uint64_t, rgw_http_req_data *>::iterator iter = reqs.find(req_data->id);
      ** 2. reqs.erase(iter);
      **  {
      **    std::lock_guard l{req_data->lock};
      **    req_data->mgr = nullptr;
      **  }
      **  if (completion_mgr) {
      **    completion_mgr->complete(NULL, req_data->control_io_id, req_data->user_info);
      **  }
      ** 3. Drop one reference: req_data->put();
      */
      void _complete_request(rgw_http_req_data *req_data);
    
      /*
      ** Unregister a libcurl client: it is appended to the unregistered_reqs
      ** queue to be handled later by the processing thread:
      ** 1. std::unique_lock rl{reqs_lock};
      ** 2. if (!req_data->registered) {
      **      return false;
      **    }
      ** 3. req_data->get();
      ** 4. req_data->registered = false;
      ** 5. unregistered_reqs.push_back(req_data);
      */
      bool unregister_request(rgw_http_req_data *req_data);
    
      /*
      ** Remove an easy handle from the CURLM*:
      ** unlink_request:
      **  1. std::unique_lock wl{reqs_lock};
      **  2. _unlink_request(req_data);
      ** _unlink_request:
      ** Remove it from the CURLM:
      **  1. curl_multi_remove_handle((CURLM *)multi_handle, req_data->get_easy_handle());
      ** Then check whether the client request has completed:
      ** - not completed: use the return code ECANCELED; _finish_request calls req_data->finish(-ECANCELED) to notify the client
      ** - already completed: no need to call _finish_request again, since the completed state means finish_request has already run normally
      **  2. if (!req_data->is_done()) 
      **        _finish_request(req_data, -ECANCELED);
      */
      void unlink_request(rgw_http_req_data *req_data);
      void _unlink_request(rgw_http_req_data *req_data);
    
      /*
      ** Finish a client request.
      ** A client request ends in one of two ways:
      **  1. the request completes and finish_request is called
      **  2. the client request is cancelled: cancel calls client->cancel: httpmanager->unregister_request
      ** finish_request(req_data,r,http_status): http_status is the HTTP response code, r is the locally defined status corresponding to http_status
      **  1. req_data->finish(ret, http_status);
      **  2. complete_request(req_data);
      ** _finish_request(rgw_http_req_data *req_data, int r):
      **  1. req_data->finish(ret);
      **  2. complete_request(req_data);
      */
      void finish_request(rgw_http_req_data *req_data, int r, long http_status = -1);
      void _finish_request(rgw_http_req_data *req_data, int r);
    
      /*
      ** Apply a state change to a request; req->set_state ends up in CURLcode rc = curl_easy_pause(**curl_handle, bitmask);:
      **  ss.req->set_state(ss.bitmask)
      */
      void _set_req_state(set_state& ss);
    
    
      /*
      ** Attach the easy handle to the multi handle:
      **  1. CURLMcode mstatus = curl_multi_add_handle((CURLM *)multi_handle, req_data->get_easy_handle());
      */
      int link_request(rgw_http_req_data *req_data);
    
    
      /*
      ** Handle the clients sitting in the pending queues, e.g. unregistered_reqs/reqs_change_state; see the function body for details
      */
      void manage_pending_requests();
    
      /* Thread object whose entry() is expected to run manager->reqs_thread_entry() (see the note on start()). */
      class ReqsThread : public Thread {
        RGWHTTPManager *manager;
    
      public:
        explicit ReqsThread(RGWHTTPManager *_m) : manager(_m) {}
        void *entry() override;
      };
    
      ReqsThread *reqs_thread = nullptr;
    
      /*
      ** Body of the RGWHTTPManager processing thread
      */
      void *reqs_thread_entry();
    
      /*
      ** Inter-thread notification:
      ** 1. write(thread_pipe[1], (void *)&buf, sizeof(buf));
      */
      int signal_thread();
    
    public:
    
      /*
      ** Constructor:
      ** 1. Initialise the CURLM*
      **    multi_handle = (void *)curl_multi_init();
      */
      RGWHTTPManager(CephContext *_cct, RGWCompletionManager *completion_mgr = NULL);
    
      /*
      ** Destructor, cleans up the CURLM*:
      ** 1. stop()
      ** 2. curl_multi_cleanup((CURLM *)multi_handle)
      **
      */
      ~RGWHTTPManager();
    
    
      /*
      ** Create the libcurl client processing thread:
      ** 1. pipe2(thread_pipe)
      ** 2. fcntl(thread_pipe[0], F_SETFL, O_NONBLOCK); make the read end non-blocking
      ** 3. is_started = true;
      **    reqs_thread = new ReqsThread(this);
      **    reqs_thread->create("http_manager");
      ** 4. reqs_thread->create() >> manager->reqs_thread_entry();
      */
      int start();
    
      /*
      ** Shut down the libcurl client processing thread:
      ** 1. is_stopped = true
      ** 2. going_down = true
      ** 3. signal_thread()
      ** 4. reqs_thread->join()
      ** 5. delete reqs_thread
      ** 6. close(thread_pipe[0])
      **    close(thread_pipe[1])
      */
      void stop();
    
      /*
      ** Register a client:
      **  1. rgw_http_req_data *req_data = new rgw_http_req_data;
      **  2. client->init_request(req_data)
      **  3. register_request(req_data)
      **  4. link_request(req_data)
      **  5. signal_thread() notifies the libcurl thread that the set of clients changed
      */
      int add_request(RGWHTTPClient *client);
    
      /*
      ** Remove a libcurl client:
      **  1. rgw_http_req_data *req_data = client->get_req_data()
      **  2. unregister_request(req_data)
      **  3. signal_thread() 
      */
      int remove_request(RGWHTTPClient *client);
    
      /*
      ** Set the state of a client; the change is appended to the reqs_change_state queue for the processing thread:
      **  1. int bitmask = CURLPAUSE_CONT;
      **     if (req_data->write_paused) {
      **        bitmask |= CURLPAUSE_SEND;
      **     }
      **     if (req_data->read_paused) {
      **        bitmask |= CURLPAUSE_RECV;
      **     }
      ** 2. reqs_change_state.push_back(set_state(req_data, bitmask))
      ** 3. signal_thread()
      */
      int set_request_state(RGWHTTPClient *client, RGWHTTPRequestSetState state);
    };
    

    manage_pending_requests函数:

    • 处理add_request情况,reqs无变化,unregistered_reqs/reqs_change_state为空,则无需进一步处理
    • 处理unregistered_reqs情况,_unlink_request(req_data),req_data->put()
    • 重新链接easy_interface与multi handle,链接失败的添加到remove_reqs中
    • 处理set_request_state, 处理状态设置的情形
    • 处理remove_reqs,调用_finish_request(req_data,r), r为link_request返回的错误码

    代码如下:

    /*
    ** Process the work queued for the processing thread:
    **  1. unlink every request in unregistered_reqs and drop its reference
    **  2. link every request that has not been attached to the multi handle yet
    **  3. apply the queued state changes in reqs_change_state
    **  4. finish the requests whose link attempt failed, passing the error
    **     code returned by link_request
    */
    void RGWHTTPManager::manage_pending_requests()
    {
      /* Cheap check under the shared lock: return early when nothing is pending. */
      reqs_lock.lock_shared();
      const bool nothing_pending = max_threaded_req == num_reqs &&
                                   unregistered_reqs.empty() &&
                                   reqs_change_state.empty();
      reqs_lock.unlock_shared();
      if (nothing_pending) {
        return;
      }

      std::unique_lock wl{reqs_lock};

      /* Handle unregistered requests */
      for (auto* req : unregistered_reqs) {
        _unlink_request(req);
        req->put();
      }
      unregistered_reqs.clear();

      /*
      ** Link the requests that are not attached yet. lower_bound() is used
      ** instead of find(): if the request with id == max_threaded_req is no
      ** longer in reqs (for instance its link attempt failed earlier and it
      ** was finished), find() would return end() and every later request
      ** would never be linked. Entries with id >= max_threaded_req have not
      ** been linked successfully, so starting at the first of them is safe.
      */
      list<std::pair<rgw_http_req_data *, int> > remove_reqs;
      for (auto iter = reqs.lower_bound(max_threaded_req); iter != reqs.end(); ++iter) {
        rgw_http_req_data *req_data = iter->second;
        const int r = link_request(req_data);
        if (r < 0) {
          /* Defer finishing so that reqs is not modified while iterating. */
          remove_reqs.push_back(std::make_pair(req_data, r));
        } else {
          max_threaded_req = iter->first + 1;
        }
      }

      /* Apply the queued state changes */
      for (auto& change : reqs_change_state) {
        _set_req_state(change);
      }
      reqs_change_state.clear();

      /* Finish the requests that could not be linked */
      for (const auto& [failed_req, err] : remove_reqs) {
        _finish_request(failed_req, err);
      }
    }
    

    RGWHTTPManager client处理线程
    reqs_thread_entry,主要逻辑:
    loop(!going_down){

    • do_curl_wait() /* select实现对多个easy_interface及thread_pipe[0]的监听,当有读写发生时返回 */
    • manage_pending_requests(): 处理待处理的client队列
    • curl_multi_perform: 对所有注册的client,执行回调函数,读取/发送数据等
    • curl_multi_info_read: 获取每个client的读取完成情况
      }

    具体代码如下:

    int still_running;
    int mstatus;
    
    /* Main loop of the thread; runs until going_down is set */
    while (!going_down) {
      /* Wait (select) on the fds of the multi handle and on thread_pipe[0] */
      int ret = do_curl_wait(cct, (CURLM *)multi_handle, thread_pipe[0]);
      if (ret < 0) {
        return NULL;
      }
    
      /* Handle the pending queues */
      manage_pending_requests();
    
      /* Drive the transfers of the clients attached to the multi handle */
      mstatus = curl_multi_perform((CURLM *)multi_handle, &still_running);
      switch (mstatus) {
        case CURLM_OK:
        case CURLM_CALL_MULTI_PERFORM:
          break;
        default:
          dout(10) << "curl_multi_perform returned: " << mstatus << dendl;
          break;
      }
    
      /* Read the per-client transfer messages from the multi handle */
      int msgs_left;
      CURLMsg *msg;
      while ((msg = curl_multi_info_read((CURLM *)multi_handle, &msgs_left))) {
        /* CURLMSG_DONE: the transfer finished; look up the corresponding client */
        if (msg->msg == CURLMSG_DONE) {
          int result = msg->data.result;
          CURL *e = msg->easy_handle;
    
          /* Fetch the private data stored on the handle: req_data */
          rgw_http_req_data *req_data;
          curl_easy_getinfo(e, CURLINFO_PRIVATE, (void **)&req_data);
    
          /* Remove the easy handle from the multi handle */
          curl_multi_remove_handle((CURLM *)multi_handle, e);
    
          /* req_data->user_ret is set in the callbacks (per the original note e.g. req_data->user_ret = client->send_data(), signalling an error). When it is unset, the status is derived from the HTTP response code */
          long http_status;
          int status;
          if (!req_data->user_ret) {
            /* Get the HTTP response code of the client request */
            curl_easy_getinfo(e, CURLINFO_RESPONSE_CODE, (void **)&http_status);
    
            status = rgw_http_error_to_errno(http_status);
            if (result != CURLE_OK && status == 0) {
              dout(0) << "ERROR: curl error: " << curl_easy_strerror((CURLcode)result) << ", maybe network unstable" << dendl;
              status = -EAGAIN;
            }
          } else {
            status = *req_data->user_ret;
            rgw_err err;
            set_req_state_err(err, status, 0);
            http_status = err.http_ret;
          }
          /* Saved before finish_request(), since it is still logged afterwards */
          int id = req_data->id;
    
          /* Finish the client request: req_data->finish() wakes the client and complete_request() removes the client from reqs */
          finish_request(req_data, status, http_status);
          switch (result) {
            case CURLE_OK:
              break;
            case CURLE_OPERATION_TIMEDOUT:
              dout(0) << "WARNING: curl operation timed out, network average transfer speed less than " 
                << cct->_conf->rgw_curl_low_speed_limit << " Bytes per second during " << cct->_conf->rgw_curl_low_speed_time << " seconds." << dendl;
              /* NOTE(review): no break here, so this case also runs the default branch -- confirm this is intended */
            default:
              dout(20) << "ERROR: msg->data.result=" << result << " req_data->id=" << id << " http_status=" << http_status << dendl;
              dout(20) << "ERROR: curl error: " << curl_easy_strerror((CURLcode)result) << dendl;
        break;
          }
        }
      }
    }
    
    /* Resource cleanup once the loop has ended: unlink everything that is still queued or registered */
    std::unique_lock rl{reqs_lock};
    for (auto r : unregistered_reqs) {
      _unlink_request(r);
    }
    unregistered_reqs.clear();
    /* reqs is moved into a local map before iterating */
    auto all_reqs = std::move(reqs);
    for (auto iter : all_reqs) {
      _unlink_request(iter.second);
    }
    reqs.clear();
    if (completion_mgr) {
      completion_mgr->go_down();
    }
    return 0;
    

    do_curl_wait实现

    • select 监听multi handle及thread_pipe[0]的文件描述符
    /*
    ** Wait with select() until a descriptor of the multi handle or the
    ** signalling descriptor becomes ready, or until the timeout expires.
    **
    ** cct:       used for the rgw_curl_wait_timeout_ms setting and for logging
    ** handle:    libcurl multi handle whose descriptors are watched
    ** signal_fd: descriptor used to wake this thread up; only watched when > 0
    **
    ** Returns 0 on success (a select() timeout included), -EIO when
    ** curl_multi_fdset fails, -errno when select() fails, or the negative
    ** value returned by clear_signal().
    */
    static int do_curl_wait(CephContext *cct, CURLM *handle, int signal_fd)
    {
      fd_set fdread;
      fd_set fdwrite;
      fd_set fdexcep;
      int maxfd = -1;

      FD_ZERO(&fdread);
      FD_ZERO(&fdwrite);
      FD_ZERO(&fdexcep);

      /* Collect the descriptors the multi handle wants to be watched */
      int ret = curl_multi_fdset(handle, &fdread, &fdwrite, &fdexcep, &maxfd);
      if (ret) {
        ldout(cct, 0) << "ERROR: curl_multi_fdset returned " << ret << dendl;
        return -EIO;
      }

      if (signal_fd > 0) {
        FD_SET(signal_fd, &fdread);
        /* maxfd holds the highest descriptor; select() below adds the +1. */
        if (signal_fd > maxfd) {
          maxfd = signal_fd;
        }
      }

      /* Timeout: the configured value in milliseconds, or RGW_CURL_TIMEOUT when it is 0 */
      uint64_t to = cct->_conf->rgw_curl_wait_timeout_ms;
      #define RGW_CURL_TIMEOUT 1000
      if (!to)
        to = RGW_CURL_TIMEOUT;
      struct timeval timeout;
      timeout.tv_sec = to / 1000;
      /* tv_usec is in microseconds, so the millisecond remainder is scaled by 1000 */
      timeout.tv_usec = (to % 1000) * 1000;

      ret = select(maxfd+1, &fdread, &fdwrite, &fdexcep, &timeout);
      if (ret < 0) {
        ret = -errno;
        ldout(cct, 0) << "ERROR: select returned " << ret << dendl;
        return ret;
      }

      /* The signalling descriptor is readable: clear_signal() is called to consume the notification (per the original note it reads from signal_fd and returns the result) */
      if (signal_fd > 0 && FD_ISSET(signal_fd, &fdread)) {
        ret = clear_signal(signal_fd);
        if (ret < 0) {
          ldout(cct, 0) << "ERROR: " << __func__ << "(): read() returned " << ret << dendl;
          return ret;
        }
      }

      return 0;
    }
    

    相关文章

      网友评论

          本文标题:Ceph RGW: libcurl用法介绍

          本文链接:https://www.haomeiwen.com/subject/tzeafktx.html