美文网首页
Node.js介绍5-libuv的基本概念

Node.js介绍5-libuv的基本概念

作者: 转角遇见一直熊 | 来源:发表于2016-05-14 14:26 被阅读2510次

    Node.js介绍1-事件驱动非阻塞中,我们了解到node中除了v8还有一个底层依赖,那就是libuv。可以看看下图。

    node.js核心

    现在我们就看看libuv是干嘛的,了解libuv对我们了解node很有必要。

    libuv提供了什么

    libuv enforces an asynchronous, event-driven style of programming. Its core job is to provide an event loop and
    callback based notifications of I/O and other activities. libuv offers core utilities like timers, non-blocking networking support, asynchronous file system access, child processes and more.
    libuv使用异步,事件驱动的编程方式,核心是提供i/o的事件循环和异步回调。libuv的API包含有时间,非阻塞的网络,异步文件操作,子进程等等。

    为什么需要libuv

    当程序在等待i/o完成的时候,我们希望cpu不要被这个等待中的程序阻塞,libuv提供的编程方式使我们开发异步程序变得简单。

    while there are still events to process:
        e = get the next event
        if there is a callback associated with e:
            call the callback
    

    上面伪代码的事件(event)可以是:
    • 文件已经准备好写入
    • 套接字已经接受到数据,可以读了
    • 一个计时器已经到期

    libuv基础

    看一个简单的libuv程序.idle这里翻译成空转监视器。

    #include <stdio.h>
    #include <uv.h>
    
    int64_t counter = 0;
    
    void wait_for_a_while(uv_idle_t* handle, int status) {
        counter++;
    
        if (counter >= 10e6)
            uv_idle_stop(handle);
    }
    
    int main() {
        uv_idle_t idler;
    
        uv_idle_init(uv_default_loop(), &idler);
        uv_idle_start(&idler, wait_for_a_while);
    
        printf("Idling...\n");
        uv_run(uv_default_loop(), UV_RUN_DEFAULT);
    
        return 0;
    }
    
    代码 含义
    uv_idle_t 空转监视器
    uv_default_loop 默认循环,node也使用这个循环哦
    uv_idle_init 初始化
    uv_idle_start 设置事件回调函数并监听相应事件
    wait_for_a_while 回调函数
    uv_idle_stop 停止监听
    uv_run 事件循环由 uv_run函数封装, 在使用 libuv 编程时, 该函数通常在最后才被调用.由于设置了监视器, 所以调用 uv_run()时程序会阻塞, 空转监视器将会在计数器达到设定的值时停止(监视), uv_run()会退出。因为此时程序中没有活动的监视器了.

    上面uv_XXX_t是命名规范,我们看看还有哪些这种结构:

    typedef struct uv_loop_s uv_loop_t;
    typedef struct uv_err_s uv_err_t;
    typedef struct uv_handle_s uv_handle_t;
    typedef struct uv_stream_s uv_stream_t;
    typedef struct uv_tcp_s uv_tcp_t;
    typedef struct uv_udp_s uv_udp_t;
    typedef struct uv_pipe_s uv_pipe_t;
    typedef struct uv_tty_s uv_tty_t;
    typedef struct uv_poll_s uv_poll_t;
    typedef struct uv_timer_s uv_timer_t;
    typedef struct uv_prepare_s uv_prepare_t;
    typedef struct uv_check_s uv_check_t;
    typedef struct uv_idle_s uv_idle_t;
    typedef struct uv_async_s uv_async_t;
    typedef struct uv_process_s uv_process_t;
    typedef struct uv_fs_event_s uv_fs_event_t;
    typedef struct uv_fs_poll_s uv_fs_poll_t;
    typedef struct uv_signal_s uv_signal_t;
    

    上面的结构都可以像uv_idle_t这么使用哦,我们只要一个个搞清楚就掌握libuv的使用了。

    文件

    我们看05_fs_readasync_context例子, 代码可以在learnuv下载

    #include "learnuv.h"
    
    #define BUF_SIZE 37
    static const char *filename = __MAGIC_FILE__;
    
    typedef struct context_struct {
      uv_fs_t *open_req;
    } context_t;
    
    void read_cb(uv_fs_t* read_req) {
      int r = 0;
      if (read_req->result < 0) CHECK(read_req->result, "uv_fs_read callback");
    
      /* extracting our context from the read_req */
      context_t* context = read_req->data;
    
      /* 4. Report the contents of the buffer */
      log_report("%s", read_req->bufs->base);
      log_info("%s", read_req->bufs->base);
    
      free(read_req->bufs->base);
    
      /* 5. Close the file descriptor (synchronously) */
      uv_fs_t close_req;
      r = uv_fs_close(uv_default_loop(), &close_req, context->open_req->result, NULL);
      if (r < 0) CHECK(abs(r), "uv_fs_close");
    
      /* cleanup all requests and context */
      uv_fs_req_cleanup(context->open_req);
      uv_fs_req_cleanup(read_req);
      uv_fs_req_cleanup(&close_req);
      free(context);
    }
    
    void init(uv_loop_t *loop) {
      int r = 0;
    
      /* No more globals, we need to malloc each request and pass it around for later cleanup */
      uv_fs_t *open_req = malloc(sizeof(uv_fs_t));
      uv_fs_t *read_req = malloc(sizeof(uv_fs_t));
    
      context_t *context = malloc(sizeof(context_t));
      context->open_req  = open_req;
    
      /* 1. Open file */
      r = uv_fs_open(loop, open_req, filename, O_RDONLY, S_IRUSR, NULL);
      if (r < 0) CHECK(r, "uv_fs_open");
    
      /* 2. Create buffer and initialize it to turn it into a a uv_buf_t */
      size_t buf_len = sizeof(char) * BUF_SIZE;
      char *buf = malloc(buf_len);
      uv_buf_t iov = uv_buf_init(buf, buf_len);
    
      /* allow us to access the context inside read_cb */
      read_req->data = context;
    
      /* 3. Read from the file into the buffer */
      r = uv_fs_read(loop, read_req, open_req->result, &iov, 1, 0, read_cb);
      if (r < 0) CHECK(r, "uv_fs_read");
    }
    
    int main() {
      uv_loop_t *loop = uv_default_loop();
    
      init(loop);
    
      uv_run(loop, UV_RUN_DEFAULT);
    
      return 0;
    }
    
    

    这个例子展示了几个技术:

    • 如何在循环中传递数据
    • 如何异步读文件数据
    • 如何清理内存
      由于代码比较清晰,就不仔细说了。

    网络

    我们看一个回声服务的例子。这里的echo和命令行中的echo功能是一样的。这个例子用c语言写很长,而用JavaScript写则很简洁。可以看出用javascript的优势。

    #include "learnuv.h"
    #include <math.h>
    
    const static char* HOST    = "0.0.0.0"; /* localhost */
    const static int   PORT    = 7000;
    const static int   NBUFS   = 1;         /* number of buffers we write at once */
    
    static uv_tcp_t tcp_server;
    
    typedef struct {
      uv_write_t req;
      uv_buf_t buf;
    } write_req_t;
    
    /* forward declarations */
    static void close_cb(uv_handle_t* client);
    static void server_close_cb(uv_handle_t*);
    static void shutdown_cb(uv_shutdown_t*, int);
    
    static void alloc_cb(uv_handle_t*, size_t, uv_buf_t*);
    static void read_cb(uv_stream_t*, ssize_t, const uv_buf_t*);
    static void write_cb(uv_write_t*, int);
    
    static void close_cb(uv_handle_t* client) {
      free(client);
      log_info("Closed connection");
    }
    
    static void shutdown_cb(uv_shutdown_t* req, int status) {
      uv_close((uv_handle_t*) req->handle, close_cb);
      free(req);
    }
    
    static void onconnection(uv_stream_t *server, int status) {
      CHECK(status, "onconnection");
    
      int r = 0;
      uv_shutdown_t *shutdown_req;
    
      /* 4. Accept client connection */
      log_info("Accepting Connection");
    
      /* 4.1. Init client connection using `server->loop`, passing the client handle */
      uv_tcp_t *client = malloc(sizeof(uv_tcp_t));
      r = uv_tcp_init(server->loop, client);
      CHECK(r, "uv_tcp_init");
    
      /* 4.2. Accept the now initialized client connection */
      r = uv_accept(server, (uv_stream_t*) client);
      if (r) {
        log_error("trying to accept connection %d", r);
    
        shutdown_req = malloc(sizeof(uv_shutdown_t));
        r = uv_shutdown(shutdown_req, (uv_stream_t*) client, shutdown_cb);
        CHECK(r, "uv_shutdown");
      }
    
      /* 5. Start reading data from client */
      r = uv_read_start((uv_stream_t*) client, alloc_cb, read_cb);
      CHECK(r, "uv_read_start");
    }
    
    static void alloc_cb(uv_handle_t *handle, size_t size, uv_buf_t *buf) {
      /* libuv suggests a buffer size but leaves it up to us to create one of any size we see fit */
      buf->base = malloc(size);
      buf->len = size;
      if (buf->base == NULL) log_error("alloc_cb buffer didn't properly initialize");
    }
    
    static void read_cb(uv_stream_t* client, ssize_t nread, const uv_buf_t* buf) {
      int r = 0;
      uv_shutdown_t *shutdown_req;
    
      /* Errors or EOF */
      if (nread < 0) {
        if (nread != UV_EOF) CHECK(nread, "read_cb");
    
        /* Client signaled that all data has been sent, so we can close the connection and are done */
        free(buf->base);
    
        shutdown_req = malloc(sizeof(uv_shutdown_t));
        r = uv_shutdown(shutdown_req, client, shutdown_cb);
        CHECK(r, "uv_shutdown");
        return;
      }
    
      if (nread == 0) {
        /* Everything OK, but nothing read and thus we don't write anything */
        free(buf->base);
        return;
      }
    
      /* Check if we should quit the server which the client signals by sending "QUIT" */
      if (!strncmp("QUIT", buf->base, fmin(nread, 4))) {
        log_info("Closing the server");
        free(buf->base);
        /* Before exiting we need to properly close the server via uv_close */
        /* We can do this synchronously */
        uv_close((uv_handle_t*) &tcp_server, NULL);
        log_info("Closed server, exiting");
        exit(0);
      }
    
      /* 6. Write same data back to client since we are an *echo* server and thus can reuse the buffer used to read*/
      /*    We wrap the write req and buf here in order to be able to clean them both later */
      write_req_t *write_req = malloc(sizeof(write_req_t));
      write_req->buf = uv_buf_init(buf->base, nread);
      r = uv_write(&write_req->req, client, &write_req->buf, NBUFS, write_cb);
      CHECK(r, "uv_write");
    }
    
    static void write_cb(uv_write_t *req, int status) {
      CHECK(status, "write_cb");
    
      log_info("Replied to client");
    
      /* Since the req is the first field inside the wrapper write_req, we can just cast to it */
      /* Basically we are telling C to include a bit more data starting at the same memory location, which in this case is our buf */
      write_req_t *write_req = (write_req_t*) req;
      free(write_req->buf.base);
      free(write_req);
    }
    
    int main() {
      int r = 0;
      uv_loop_t *loop = uv_default_loop();
    
      /* 1. Initialize TCP server */
      r = uv_tcp_init(loop, &tcp_server);
      CHECK(r, "uv_tcp_init");
    
      /* 2. Bind to localhost:7000 */
      struct sockaddr_in addr;
      r = uv_ip4_addr(HOST, PORT, &addr);
      CHECK(r, "uv_ip4_addr");
    
      r = uv_tcp_bind(&tcp_server, (struct sockaddr*) &addr, AF_INET);
      CHECK(r, "uv_tcp_bind");
    
      /* 3. Start listening */
      /* uv_tcp_t inherits uv_stream_t so casting is ok */
      r = uv_listen((uv_stream_t*) &tcp_server, SOMAXCONN, onconnection);
      CHECK(r, "uv_listen");
      log_info("Listening on %s:%d", HOST, PORT);
    
      uv_run(loop, UV_RUN_DEFAULT);
    
      MAKE_VALGRIND_HAPPY();
    
      return 0;
    }
    

    我们看一下这里面的技术:

    • C的"继承"技术 :(uv_stream_t*) &tcp_server
    • 嵌套的回调:onconnection->read_cb->write_cb,还有出错的回调。

    由于使用了比较多的callback,导致不是很清晰,但是c语言实现异步回调就只能这样了。而async和await方式的异步编程,才是最清晰的,这个在C#和最新的ES6中都有。

    线程

    libuv对线程也进行了跨平台包装。线程在libuv中的工作主要在于支持事件循环,因为异步的调用本质上还是需要用线程实现,另外开发者也可以用线程来运行计算密集型的任务;同时,线程之间也可以互相通讯。

    我们看一个线程通信的例子, 为了接受下载情况的数据,我们使用了uv_async_senduv_async_init

    uv_loop_t *loop;
    uv_async_t async;
    
    void fake_download(uv_work_t *req) {
        int size = *((int*) req->data);
        int downloaded = 0;
        double percentage;
        while (downloaded < size) {
            percentage = downloaded*100.0/size;
            async.data = (void*) &percentage;
            uv_async_send(&async);
    
            sleep(1);
            downloaded += (200+random())%1000; // can only download max 1000bytes/sec,
                                               // but at least a 200;
        }
    }
    
    void print_progress(uv_async_t *handle, int status /*UNUSED*/) {
        double percentage = *((double*) handle->data);
        fprintf(stderr, "Downloaded %.2f%%\n", percentage);
    }
    
    void after(uv_work_t *req, int status) {
        fprintf(stderr, "Download complete\n");
        uv_close((uv_handle_t*) &async, NULL);
    }
    
    
    int main() {
        loop = uv_default_loop();
    
        uv_work_t req;
        int size = 10240;
        req.data = (void*) &size;
    
        uv_async_init(loop, &async, print_progress);
        uv_queue_work(loop, &req, fake_download, after);
    
        return uv_run(loop, UV_RUN_DEFAULT);
    }
    

    上面的uv_queue_work很有用,可以让我们把同步的程序放到队列中,在另一个线程执行(线程有线程池管理),实际上是对多线程编程接口的简化。fake_download中执行下载操作不会导致uv_default_loop阻塞,所以主循环还可以接受其他事件(比如处理更多的http请求)。

    上面fake_download使用uv_async_send(&async);async发送消息,在print_progress中处理。print_progress运行在循环线程(uv_default_loop)中。

    如果用线程,不可避免的需要同步变量,libuv 也支锁,信号量 semaphores, 条件变量 condition variables 和屏障 barriers, 相关的 API 也和 pthread 中的相似.

    进程

    libuv对进程进行了仔细的包装,屏蔽了操作系统的差异。允许进程和子进程之间通过streams和命名管道通信。

    uv_loop_t *loop;
    uv_process_t child_req;
    uv_process_options_t options;
    
    void on_exit(uv_process_t *req, int exit_status, int term_signal) {
        fprintf(stderr, "Process exited with status %d, signal %d\n", exit_status, term_signal);
        uv_close((uv_handle_t*) req, NULL);
    }
    
    int main() {
        loop = uv_default_loop();
    
        char* args[3];
        args[0] = "mkdir";
        args[1] = "test-dir";
        args[2] = NULL;
    
        options.exit_cb = on_exit;
        options.file = "mkdir";
        options.args = args;
    
        if (uv_spawn(loop, &child_req, options)) {
            fprintf(stderr, "%s\n", uv_strerror(uv_last_error(loop)));
            return 1;
        }
    
        return uv_run(loop, UV_RUN_DEFAULT);
    }
    ```
    注意看`uv_spawn`和`uv_process_t`。实际上还有管道的api给我们使用,比如`uv_pipe_init`。这个例子比较简单,在网上可以找到利用多进程实现负载均衡的例子,可以进一步研究。
    
    
    ----
    # 总结
    剩下还有很多API没有涉及,本文的目的不是翻译,而是介绍libuv的基本概念,接着在看node的代码的时候就不会觉得很多API非常不熟悉了。
    
    本文主要内容来此于:
    
    [learnuv](https://github.com/thlorenz/learnuv.git),文章中的代码都可以在这里面找到。
    [libuv-dox](https://github.com/thlorenz/libuv-dox)
    [learnuv汉语翻译](http://www.nowx.org/uvbook/index.html)

    相关文章

      网友评论

          本文标题:Node.js介绍5-libuv的基本概念

          本文链接:https://www.haomeiwen.com/subject/qkzyrttx.html