美文网首页
libuv学习笔记6----利用PiPe管道实现进程通信

libuv学习笔记6----利用PiPe管道实现进程通信

作者: _李恒 | 来源:发表于2021-01-27 11:53 被阅读0次

    服务端:

    LocalServer.h

    #pragma once
    #include <iostream>
    #include <string>
    #include <stdio.h>
    #include <stdlib.h>
    #include <string.h>
    #include <unistd.h>
    #include <sys/types.h>
    #include <sys/socket.h>
    
    using namespace std;
    #define PIPENAME "/data/pipe123.sock"
    extern "C"
    {
        #include "uv.h"
        #include "uv-errno.h"
    }
     
    class LocalServer;
     
    typedef struct{
        uv_pipe_t server_client;
        LocalServer *ts;
    }UvPipe;
     
    class LocalServer
    {
    public:
        LocalServer();
        ~LocalServer();
    
        int initLocalServer(uv_loop_t* loop);
        static void remove_sock(int sig);
        static void onPipeConnectionCb(uv_stream_t* server,int status);
        void onPipeConnection(uv_stream_t* server,int status);
        static void onPipeReadCb(uv_stream_t* client,ssize_t nread,const uv_buf_t* buf);
        void onPipeRead(uv_stream_t* client,ssize_t nread,const uv_buf_t* buf);
        static void allocPipeBufferCb(uv_handle_t* handle,size_t suggested_size,uv_buf_t* buf);
        void allocPipeBuffer(uv_handle_t* handle,size_t suggested_size,uv_buf_t* buf);
     
    private:
        uv_loop_t* mLoop;
        UvPipe pipeServer;
    };
    
    

    LocalServer.cpp

    #include "LocalServer.h"
    #include "types.h"
    #include <time.h>
    
    LocalServer::LocalServer()
    {
    }
     
    LocalServer::~LocalServer()
    {
    }
     
    
    int LocalServer::initLocalServer(uv_loop_t* loop)
    {
        if(loop==NULL)
        {
            LOG_I("mllop is null   ");
        }
        mLoop=loop;
     
        //若sock文件PIPENAME存在则删除
        uv_fs_t req;
        uv_fs_unlink(mLoop,&req,PIPENAME,NULL);
        uv_pipe_init(mLoop,(uv_pipe_t*)&pipeServer,0);
        //程序终止时删除sock文件PIPENAME
        signal(SIGINT,remove_sock);
        
        pipeServer.ts=this;
     
        int ret=uv_pipe_bind((uv_pipe_t*)&pipeServer,PIPENAME);
        if(ret)
        {
            LOG_I("Bind error ");
            return -1;
        }
     
        //typedef void (*fun)(uv_stream_t*,int);
        //fun on_pipe_connection=(fun)&LocalServer::onPipeConnection;
        //ret=uv_listen((uv_stream_t*)&pipeServer,128,on_pipe_connection);
        ret=uv_listen((uv_stream_t*)&pipeServer,128,onPipeConnectionCb);
        if(ret)
        {
            LOG_I("Listen error ");
            return -1;
        } 
        else
        {
            LOG_I("-------22222  Listen success -----");     
        }
        
        return 0;
    }
    
    void LocalServer::remove_sock(int sig)
    {
        uv_fs_t req;
        uv_loop_t* Loop=uv_default_loop();
        uv_fs_unlink(Loop,&req,PIPENAME,NULL);
        exit(0);
    }
     
    void LocalServer::onPipeConnection(uv_stream_t* server,int status)
    {
        if(status < 0)
        {
            LOG_I("New pipe connection error...");        
            return;
        }
        uv_pipe_t* pipeClient=(uv_pipe_t*)malloc(sizeof(uv_pipe_t));
        if(mLoop == NULL || pipeClient == NULL)
        {
            if(mLoop==NULL)
            {
                LOG_I("1111111111111");        
            }
            else if(pipeClient==NULL)
            {
                LOG_I("22222222222222");        
            }
            return;
        }
        uv_pipe_init(mLoop,pipeClient,0);
        int ret = uv_accept(server, (uv_stream_t*)pipeClient);
        if(ret == 0)
        {
            LOG_I("Accept success...");
            LOG_I("-------22222  Accept success... -----");     
            uv_read_start((uv_stream_t*)pipeClient, allocPipeBufferCb, onPipeReadCb);
        }
    }
    
    void LocalServer::onPipeRead(uv_stream_t* client, ssize_t nread, const uv_buf_t* buf)
    {
        if(nread < 0)
        {
            if(nread != UV_EOF)
            {
                LOG_I("Read error...");        
            }
            uv_close((uv_handle_t*)client, NULL);
        }
        else if(nread > 0)
        {
            struct timeval tv;
            gettimeofday(&tv, NULL);
            int32_t time = (uint32_t)(tv.tv_sec * 1000000 + tv.tv_usec);
    
            LOG_I("---------------recieve-%ld------------time = %ld ", nread, time);
            free(buf->base);
        }
    }
    
    void LocalServer::allocPipeBuffer(uv_handle_t* handle,size_t suggested_size,uv_buf_t* buf)
    {
        buf->base=(char*)malloc(suggested_size);
        buf->len=suggested_size;
    }
     
    void LocalServer::onPipeConnectionCb(uv_stream_t* server,int status)
    {
        UvPipe *pipe=(UvPipe*)server;
        LocalServer *ts=pipe->ts;
        ts->onPipeConnection(server,status);
    }
     
    void LocalServer::onPipeReadCb(uv_stream_t* client,ssize_t nread,const uv_buf_t* buf)
    {
        UvPipe *handle=(UvPipe*)client;
        LocalServer *ts=handle->ts;
        ts->onPipeRead(client,nread,buf);
            
    }
     
    void LocalServer::allocPipeBufferCb(uv_handle_t* handle,size_t suggested_size,uv_buf_t* buf)
    {
        UvPipe *pipe=(UvPipe*)handle;
        LocalServer *ts=pipe->ts;
        ts->allocPipeBuffer(handle,suggested_size,buf);
    }
    

    main.cpp

    uv_loop_t mloop;
    uv_loop_init(&mloop);
    LocalServer *localServer = new LocalServer();
    localServer->initLocalServer(&mloop);
    
    uv_run(&mloop,UV_RUN_DEFAULT);
    

    客户端:

    LocalClient .h

    #pragma once
    #include <iostream>
    #include <string>
    #include <stdio.h>
    #include <stdlib.h>
    #include <string.h>
    #include <unistd.h>
    #include <sys/types.h>
    #include <sys/socket.h>
     
    using namespace std;
    #define PIPENAME "/data/pipe123.sock"
    extern "C"
    {
        #include "uv.h"
        #include "uv-errno.h"
    }
     
     
    class LocalClient;
     
    typedef struct{
        uv_pipe_t server_client;
        LocalClient *ts;
    }UvPipe;
     
    class LocalClient
    {
    public:
        LocalClient();
        ~LocalClient();
     
        void setStr(const char *str);
        int initPipeClient();
        
        static void on_connect_cb(uv_connect_t* req,int status);
        void on_connect(uv_connect_t* req,int status);
     
        static void write_server_cb(uv_write_t* req,int status);
        void write_server(uv_write_t* req,int status);
        void send_data(char *data, int length);
    
    private:
        uv_loop_t *loop;
        UvPipe pipeClient;
        uv_connect_t *pipeConnect;
     
        uv_connect_t* reqConnect;
    
        char cmd[20];
    };
    

    LocalClient.cpp

    #include "LocalClient.h"
    #include "types.h"
    #include <time.h>
    
    LocalClient::LocalClient()
    {
    }
     
    LocalClient::~LocalClient()
    {
    }
     
    void LocalClient::setStr(const char *str)
    {
        memset(cmd,0,20);
        strcpy(cmd,str);
    }
     
    int LocalClient::initPipeClient()
    {
        reqConnect = nullptr;
        memset(&pipeClient, 0, sizeof(pipeClient));
    
        loop=uv_default_loop();
        pipeClient.ts=this;
        uv_pipe_init(loop,(uv_pipe_t*)&pipeClient,0);
        pipeConnect=(uv_connect_t*)malloc(sizeof(uv_connect_t));
        uv_pipe_connect(pipeConnect,(uv_pipe_t*)&pipeClient,PIPENAME,on_connect_cb);
     
        uv_run(loop,UV_RUN_DEFAULT);
        return 0;
    }
    
    void LocalClient::on_connect_cb(uv_connect_t* req,int status)
    {
        UvPipe* pipe = (UvPipe*)req->handle;
        LocalClient* ts = pipe->ts;
        ts->on_connect(req, status);
    
    
        LOG_D("=======================on_connect_cb ====1111====\n");
    }
    
    void LocalClient::on_connect(uv_connect_t* req,int status)
    {
        if(status<0)
        {
            LOG_D("------------------------- New conect error...");        
        }
        else
        {
            LOG_D("----------------------- New conect sucess...");      
            reqConnect = req;
        }
    }
    
    void LocalClient::send_data(char *data, int length)
    {
        if (!reqConnect)
        {   
            //uv_stop(loop);
            initPipeClient();
            return;
        }
        uv_write_t* wr = (uv_write_t*)malloc(sizeof(uv_write_t));
    
        if(!data || length <= 0)
        {
            LOG_D("---------------------Write data is NULL ----------------"); 
            return;
        }               
        else
        {
            struct timeval tv;
            gettimeofday(&tv, NULL);
            int32_t time = (uint32_t)(tv.tv_sec * 1000000 + tv.tv_usec);
    
            LOG_D("---------------------Write start...length = %ld, time = %ld", length, time); 
            uv_buf_t buf = uv_buf_init(data, length);
            int ret = uv_write(wr, (uv_stream_t*)reqConnect->handle, &buf, 1, write_server_cb);
            if(ret < 0)
            {
                LOG_D("---------------------Write error...");        
            } else
            {
                LOG_D("---------------------Write sucess...");     
            }
        }
    }
    
    void LocalClient::write_server_cb(uv_write_t* req,int status)
    {
        UvPipe* pipe=(UvPipe*)req;
        LocalClient* ts=pipe->ts;
        ts->write_server(req,status);
    }
    
    void LocalClient::write_server(uv_write_t* req,int status)
    {  
        if(status < 0)
        {
            LOG_D("--------------------Write error...");        
        } 
        else
        {
            LOG_D("--------------------Write success...");     
        }
    }
    
    

    main.cpp

    LocalClient *localClient = new LocalClient();
    localClient->initPipeClient();
    

    相关文章

      网友评论

          本文标题:libuv学习笔记6----利用PiPe管道实现进程通信

          本文链接:https://www.haomeiwen.com/subject/avaozktx.html