美文网首页
跨全平台高性能HttpClient尝试用OpenSocket开发

跨全平台高性能HttpClient尝试用OpenSocket开发

作者: openlinyou | 来源:发表于2023-03-08 23:07 被阅读0次

    OpenSocket是一个跨全平台的高性能网络并发库。

    它使用了高性能IO,Linux和安卓用epoll,Win32用IOCP,iOS和Mac用kqueue,其他系统使用select。

    本文用这种高性能socket库,设计开发一个HttpClient。

    为了开发方便,我们使用OpenThread作为线程库。OpenThread可以实现多线程的三大设计模式,本文开发这个HttpClient时使用其中的Worker模式。

    设计思路如下:

    1. 每个HttpClient是一条线程OpenThreader。一个HttpClient对象,可以处理任意个Http请求。它由Factory类管理和创建。

    2. 一次Http请求就是一个task:从Factory挑选一个HttpClient对象,向该对象发送task,然后阻塞等待结果。

    3. HttpClient对象收到task消息。把IP和端口做参数,调用OpenSocket的connect。
      connect有两个作用:一是执行网络连接并产生一个fd;二是把这个fd加入到poll,并将fd与HttpClient对象的线程id进行绑定。
      fd成功加入poll以后,该socket的任何消息都可以通过线程id发到对应的HttpClient。

    4. HttpClient对象接收到socket的open消息后,向服务器发送http报文。

    5. 因为connect把fd和线程id进行绑定。所以,HttpClient会收到服务器返回的Http报文。

    6. HttpClient接收完Http报文,就唤醒请求线程。请求线程被唤醒,拿到Http请求数据。

    测试例子是获取交易所的最新龙虎榜数据。

    具体源码如下:

    #include <assert.h>
    #include <math.h>
    #include <time.h>
    #include <cctype>
    #include <cstdio>
    #include <cstdlib>
    #include <cstring>
    #include <map>
    #include <memory>
    #include <string>
    #include <vector>
    #include "open/openthread.h"
    #include "opensocket.h"
    using namespace open;
    
    // HTTP request object. It carries the parsed target (host, ip, port, path),
    // the method, headers and body to send, and the response object that is
    // filled in while the request is being served.
    class HttpRequest
    {
        std::string url_;
    public:
        int port_;
        // Authority part of the URL, including ":port" when one was given.
        std::string host_;
        // Result of resolving the host name.
        std::string ip_;
        // Request target; "/" when the URL carries no path.
        std::string path_;
        std::string method_;
        std::string body_;
        // Request headers.
        std::map<std::string, std::string> headers_;
        HttpRequest() :port_(80) {}
        // Read/write access to a request header by name.
        std::string& operator[](const std::string& key) { return headers_[key]; }

        // Splits `url` into host_, path_ and port_, then resolves the host name
        // into ip_. An empty `url` is ignored.
        //
        // An optional "http://" or "https://" prefix is skipped. The port
        // defaults to 80 and is only overridden by a valid ":port" suffix
        // (1..65535). The path defaults to "/" so that the request line built
        // from it is always well formed.
        // NOTE(review): the first ':' in the authority is taken as the port
        // separator, so bracketed IPv6 literals are not supported.
        void setUrl(const std::string& url)
        {
            if (url.empty()) return;
            url_ = url;

            // Skip the scheme prefix, if any.
            std::string::size_type begin = 0;
            if (url.compare(0, 7, "http://") == 0) begin = 7;
            else if (url.compare(0, 8, "https://") == 0) begin = 8;

            // Everything up to the first '/' is the authority, the rest the path.
            const std::string::size_type slash = url.find('/', begin);
            if (slash == std::string::npos)
            {
                host_ = url.substr(begin);
                path_ = "/";
            }
            else
            {
                host_ = url.substr(begin, slash - begin);
                path_ = url.substr(slash);
            }

            port_ = 80;
            std::string name = host_;
            const std::string::size_type colon = host_.find(':');
            if (colon != std::string::npos)
            {
                name = host_.substr(0, colon);
                const int port = std::atoi(host_.c_str() + colon + 1);
                // Keep the default when the suffix is missing or out of range.
                if (port > 0 && port <= 65535) port_ = port;
            }

            // Resolve the host name to an address. The result could be cached
            // to save repeated lookups. An empty name leaves ip_ empty.
            ip_.clear();
            if (!name.empty()) ip_ = OpenSocket::DomainNameToIp(name);
        }
        // Assigning a string is shorthand for setUrl().
        inline void operator=(const std::string& url) { setUrl(url); }

        // HTTP response: raw header block, body, and the parsed header fields.
        struct HttpResponse
        {
            int code_;
            int clen_;
            std::string head_;
            std::string body_;
            //std::multimap<std::string, std::string> headers_;
            // Parsed header fields, keyed by lower-cased name.
            std::map<std::string, std::string> headers_;
            std::string& operator[](const std::string& key) { return headers_[key]; }
            HttpResponse():code_(0), clen_(0) {}

            // Parses head_ (status line plus CRLF-terminated header lines) into
            // code_, headers_ and clen_.
            //
            // Does nothing when headers_ is already populated or head_ is too
            // short to hold a status line. Parsing stops after the status line
            // when no positive status code is found. Header names are
            // lower-cased; names and values are stripped of surrounding spaces
            // and tabs. A repeated header overwrites the earlier value. clen_
            // is taken from "content-length" when that header is present.
            void parseHeader()
            {
                if (!headers_.empty() || head_.size() < 12) return;
                const std::string::size_type npos = std::string::npos;
                const std::string::size_type eol = head_.find("\r\n");
                if (eol == npos) return;
                code_ = 0;
                clen_ = 0;

                // Status line: the code is the token after the first run of spaces.
                const std::string::size_type space = head_.find(' ');
                if (space != npos && space < eol)
                {
                    const std::string::size_type digits = head_.find_first_not_of(' ', space);
                    if (digits != npos && digits < eol)
                        code_ = std::atoi(head_.c_str() + digits);
                }
                if (code_ <= 0) return;

                // Header lines: "Name: value\r\n". A trailing fragment without
                // CRLF is ignored.
                std::string::size_type pos = eol + 2;
                while (pos < head_.size())
                {
                    const std::string::size_type end = head_.find("\r\n", pos);
                    if (end == npos) break;
                    const std::string::size_type colon = head_.find(':', pos);
                    if (colon != npos && colon > pos && colon < end)
                    {
                        std::string key = trimmed(head_.substr(pos, colon - pos));
                        for (size_t x = 0; x < key.size(); x++)
                        {
                            // tolower requires a value representable as unsigned char.
                            key[x] = static_cast<char>(std::tolower(static_cast<unsigned char>(key[x])));
                        }
                        if (!key.empty())
                            headers_[key] = trimmed(head_.substr(colon + 1, end - colon - 1));
                    }
                    pos = end + 2;
                }

                // Look the header up without inserting an empty entry.
                const std::map<std::string, std::string>::const_iterator it = headers_.find("content-length");
                if (it != headers_.end())
                    clen_ = std::atoi(it->second.c_str());
            }
        private:
            // Returns `text` without leading and trailing spaces and tabs.
            static std::string trimmed(const std::string& text)
            {
                const std::string::size_type first = text.find_first_not_of(" \t");
                if (first == std::string::npos) return std::string();
                const std::string::size_type last = text.find_last_not_of(" \t");
                return text.substr(first, last - first + 1);
            }
        };
        HttpResponse response_;
        // Lets the calling thread block until the HTTP response has arrived.
        OpenSync openSync_;
    };
    
    // Base of the messages exchanged between OpenThread threads. isSocket_
    // tells the receiver whether the message carries a socket event (true) or
    // an HTTP request task (false).
    struct BaseProto
    {
        // Initialized so a freshly created message never carries an
        // indeterminate flag.
        bool isSocket_ = false;
    };
    // Message that carries an OpenSocket event; senders set isSocket_ to true.
    struct SocketProto : BaseProto
    {
        // The socket message being delivered, held by shared ownership.
        std::shared_ptr<OpenSocketMsg> data_;
    };
    // Message that carries an HTTP request task; senders set isSocket_ to false.
    struct TaskProto : public BaseProto
    {
        // Socket descriptor serving this task; -1 until a connection is made.
        int fd_ = -1;
        // Used to block the requesting thread until the task has finished.
        OpenSync openSync_;
        // The request being executed; its response_ receives the result.
        std::shared_ptr<HttpRequest> request_;
    };
    
    //应用程序单利,封装OpenSocket,一个进程只有一个对象。
    class App
    {
        //OpenSocketMsg需要手动释放,放到智能指针,由智能指针释放
        static void SocketFunc(const OpenSocketMsg* msg)
        {
            if (!msg) return;
            //msg需要手动delete,把它托管给智能指针
            auto proto = std::shared_ptr<SocketProto>(new SocketProto);
            //OpenThread的线程id >= 0,所以只处理非负数的条件
            if (msg->uid_ >= 0)
            {
                proto->isSocket_ = true;
                proto->data_ = std::shared_ptr<OpenSocketMsg>((OpenSocketMsg*)msg);
                //msg->uid_是绑定的线程id,向该线程派发socket消息
                if (!OpenThread::Send((int)msg->uid_, proto))
                    printf("SocketFunc dispatch faild pid = %lld\n", msg->uid_);
            }
        }
    public:
        static App Instance_;
        //OpenSocket对象,可以设计成单利
        OpenSocket openSocket_;
        //App构造的时候,启动OpenSocket。
        App() {  openSocket_.run(App::SocketFunc); }
    };
    App App::Instance_;
    
    //HttpClient线程类,Factory管理一组线程。
    class HttpClient : public OpenThreader
    {
        //Factory
        class Factory
        {
            const std::vector<HttpClient*> vectWorker_;
        public:
            Factory()
                :vectWorker_({
                    new HttpClient("HttpClient1"),
                    new HttpClient("HttpClient2"),
                    new HttpClient("HttpClient3"),
                    new HttpClient("HttpClient4"),
                    }) {}
                //采用随机方式,提供一个线程
            HttpClient* getWorker()
            {
                if (vectWorker_.empty()) return 0;
                return vectWorker_[std::rand() % vectWorker_.size()];
            }
        };
        static Factory Instance_;
    
        // name是线程名,必须制定。在Linux上,top -Hp可以看到这个线程名。
        HttpClient(const std::string& name)
            :OpenThreader(name)
        {
            start();
        }
        ~HttpClient()
        {
            //销毁之前,尽可能唤醒请求线程,防止请求线程阻塞
            for (auto iter = mapFdToTask_.begin(); iter != mapFdToTask_.end(); iter++)
                iter->second.openSync_.wakeup();
        }
        //处理请求http线程发过来的消息
        void onHttp(TaskProto& proto)
        {
            auto& request = proto.request_;
            //连接Http服务器,并把fd与当前线程绑定。该socket的全部消息,都发到此线程
            proto.fd_ = App::Instance_.openSocket_.connect(pid(), request->ip_, request->port_);
            request->response_.code_ = -1;
            request->response_.head_.clear();
            request->response_.body_.clear();
            //fd与任务绑定到任务列表
            mapFdToTask_[proto.fd_] = proto;
        }
        //与http服务器连接成功以后,发送http请求报文
        void onSend(const std::shared_ptr<OpenSocketMsg>& data)
        {
            //需要判断fd绑定的task是否存在,否则关闭与Http服务器的连接
            auto iter = mapFdToTask_.find(data->fd_);
            if (iter == mapFdToTask_.end())
            {
                App::Instance_.openSocket_.close(pid(), data->fd_);
                return;
            }
            auto& task = iter->second;
            auto& request = task.request_;
            std::string buffer = request->method_ + " " + request->path_ + " HTTP/1.1 \r\n";
            auto iter1 = request->headers_.begin();
            for (; iter1 != request->headers_.end(); iter1++)
            {
                buffer.append(iter1->first + ": " + iter1->second + "\r\n");
            }
            if (!request->body_.empty())
            {
                buffer.append("Content-Length:" + std::to_string(request->body_.size()) + "\r\n\r\n");
                buffer.append(request->body_);
                buffer.append("\r\n");
            }
            else
            {
                buffer.append("\r\n");
            }
            //制作好Http请求报文,发送给服务器。
            App::Instance_.openSocket_.send(task.fd_, buffer.data(), (int)buffer.size());
        }
        //处理Http服务器发送过了socket数据流,拼成完整的Http返回报文
        void onRead(const std::shared_ptr<OpenSocketMsg>& data)
        {
            //Http任务列表没有绑定fd的任务,就对该fd关闭。
            auto iter = mapFdToTask_.find(data->fd_);
            if (iter == mapFdToTask_.end())
            {
                App::Instance_.openSocket_.close(pid(), data->fd_);
                return;
            }
            auto& task = iter->second;
            auto& response = task.request_->response_;
            //处理返回http头
            if (response.code_ == -1)
            {
                response.head_.append(data->data(), data->size());
                const char* ptr = strstr(response.head_.data(), "\r\n\r\n");
                if (!ptr) return;
                response.code_ = 0;
                response.body_.append(ptr + 4);
                response.head_.resize(ptr - response.head_.data() + 2);
                response.parseHeader();
            }
            //处理返回的http的body
            else
            {
                response.body_.append(data->data(), data->size());
            }
            if (response.clen_ > 0)
            {
                if (response.clen_ >= response.body_.size())
                    response.body_.resize(response.clen_);
                App::Instance_.openSocket_.close(pid(), data->fd_);
            }
            else if (response.body_.size() > 2)
            {
                if (response.body_[response.body_.size() - 2] == '\r' && response.body_.back() == '\n')
                {
                    response.body_.pop_back();
                    response.body_.pop_back();
                    App::Instance_.openSocket_.close(pid(), data->fd_);
                }
            }
        }
        //与Http服务器关闭的消息,唤醒请求线程,并对fd绑定的任务,移出任务列表
        void onClose(const std::shared_ptr<OpenSocketMsg>& data)
        {
            auto iter = mapFdToTask_.find(data->fd_);
            if (iter != mapFdToTask_.end())
            {
                iter->second.openSync_.wakeup();
                mapFdToTask_.erase(iter);
            }
        }
        //接收绑定此线程的socket消息。
        void onSocket(const SocketProto& proto)
        {
            const auto& msg = proto.data_;
            switch (msg->type_)
            {
            case OpenSocket::ESocketData:
                onRead(msg);
                break;
            case OpenSocket::ESocketClose:
                onClose(msg);
                break;
            case OpenSocket::ESocketError:
                printf("[%s]ESocketError:%s\n", ThreadName((int)msg->uid_).c_str(), msg->info());
                onClose(msg);
                break;
            case OpenSocket::ESocketWarning:
                printf("[%s]ESocketWarning:%s\n", ThreadName((int)msg->uid_).c_str(), msg->info());
                break;
            case OpenSocket::ESocketOpen:
                onSend(msg);
                break;
            case OpenSocket::ESocketAccept:
            case OpenSocket::ESocketUdp:
                assert(false);
                break;
            default:
                break;
            }
        }
        //处理static bool Http(std::shared_ptr<HttpRequest>& request)发过来的消息
        virtual void onMsg(OpenThreadMsg& msg)
        {
            const BaseProto* data = msg.data<BaseProto>();
            if (!data) return;
            if (!data->isSocket_)
            {
                TaskProto* proto = msg.edit<TaskProto>();
                if (proto) onHttp(*proto);
            }
            else
            {
                const SocketProto* proto = msg.data<SocketProto>();
                if (proto) onSocket(*proto);
            }
        }
        std::map<int, TaskProto> mapFdToTask_;
    public:
        static bool Http(std::shared_ptr<HttpRequest>& request)
        {
            if (request->ip_.empty())
            {
                assert(false);
                return false;
            }
            //类型线程池中,选择一个。
            auto worker = Instance_.getWorker();
            if (!worker)  return false;
            auto proto = std::shared_ptr<TaskProto>(new TaskProto);
            proto->request_ = request;
            proto->isSocket_ = false;
            //接收消息地方:virtual void onMsg(OpenThreadMsg& msg)
            bool ret = OpenThread::Send(worker->pid(), proto);
            assert(ret);
            //阻塞,等待http请求完成唤醒
            proto->openSync_.await();
            return ret;
        }
    };
    HttpClient::Factory HttpClient::Instance_;
    
    
    int main()
    {
        auto request = std::shared_ptr<HttpRequest>(new HttpRequest);
        //请求交易所的最新龙虎数据
        request->setUrl("http://reportdocs.static.szse.cn/files/text/jy/jy230308.txt");
        request->method_ = "GET";
    
        //自定义Http请求头
        (*request)["Host"] = "reportdocs.static.szse.cn";
        (*request)["Accept"] = "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7";
        (*request)["Accept-Encoding"] = "gzip,deflate";
        (*request)["Accept-Language"] = "zh-CN,zh;q=0.9";
        (*request)["Cache-Control"] = "max-age=0";
        (*request)["User-Agent"] = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36(KHTML, like Gecko) Chrome/110.0.0.0 Safari/537.36";
        (*request)["Upgrade-Insecure-Requests"] = "1";
    
        //发送http请求
        HttpClient::Http(request);
        //返回http请求
        auto& response = request->response_;
        printf("code:%d, header:%s\n", response.code_, response.head_.c_str());
        return getchar();
    }
    

    编译和执行

    请安装cmake工具,用cmake可以构建出VS或者XCode工程,就可以在vs或者xcode上编译运行。
    源代码:https://github.com/openlinyou/opensocket
    https://gitee.com/linyouhappy/opensocket

    # Clone the project
    git clone https://github.com/openlinyou/opensocket
    cd ./opensocket
    # Create the build directory
    mkdir build
    cd build
    cmake ..
    # On win32 an opensocket.sln appears in this directory; open it to start Visual Studio for coding and debugging
    make
    ./httpclient
    

    相关文章

      网友评论

          本文标题:跨全平台高性能HttpClient尝试用OpenSocket开发

          本文链接:https://www.haomeiwen.com/subject/nwfaldtx.html