美文网首页Nginx
Nginx stream(UDP)模块分析

Nginx stream(UDP)模块分析

作者: lo踏风 | 来源:发表于2018-02-28 14:45 被阅读1836次

    Nginx stream(UDP)模块分析


    ngx_stream_handler.c

    <i class="icon-file"></i> ngx_stream_init_connection函数

    代码解读:

    • 在ngx_stream_optimize_servers里设置有连接发生时的回调函数ngx_stream_init_connection.
    • 创建一个处理tcp的会话对象.
    • 创建ctx数组,用于存储模块的ctx数据,调用handler,处理tcp数据,收发等等,读事件处理函数,执行处理引擎.
    • 按阶段执行处理引擎ngx_stream_core_run_phases,调用各个模块的handler.

    ngx_stream_proxy_module.c

    <i class="icon-file"></i> ngx_stream_proxy_pass函数

    代码解读:

    • 解析proxy_pass指令,设置处理handler=ngx_stream_proxy_handler,在init建立连接之后会调用.
    • 获取一个upstream{}块的配置信息.

    <i class="icon-file"></i> ngx_stream_proxy_handler函数

    核心代码解读:

    • ngx_stream_init_connection->ngx_stream_init_session之后调用,处理请求.
    static void
    ngx_stream_proxy_handler(ngx_stream_session_t *s)
    {
        u_char                           *p;
        ngx_str_t                        *host;
        ngx_uint_t                        i;
        ngx_connection_t                 *c;
        ngx_resolver_ctx_t               *ctx, temp;
        ngx_stream_upstream_t            *u;
        ngx_stream_core_srv_conf_t       *cscf;
        ngx_stream_proxy_srv_conf_t      *pscf;
        ngx_stream_upstream_srv_conf_t   *uscf, **uscfp;
        ngx_stream_upstream_main_conf_t  *umcf;
    
        // 获取连接对象
        c = s->connection;
    
        pscf = ngx_stream_get_module_srv_conf(s, ngx_stream_proxy_module);
    
        ngx_log_debug0(NGX_LOG_DEBUG_STREAM, c->log, 0,
                       "proxy connection handler");
    
        // 创建连接上游的结构体
        // 里面有如何获取负载均衡server、上下游buf等
        u = ngx_pcalloc(c->pool, sizeof(ngx_stream_upstream_t));
        if (u == NULL) {
            ngx_stream_proxy_finalize(s, NGX_STREAM_INTERNAL_SERVER_ERROR);
            return;
        }
    
        // 关联到会话对象
        s->upstream = u;
    
        s->log_handler = ngx_stream_proxy_log_error;
    
        u->peer.log = c->log;
        u->peer.log_error = NGX_ERROR_ERR;
    
        if (ngx_stream_proxy_set_local(s, u, pscf->local) != NGX_OK) {
            ngx_stream_proxy_finalize(s, NGX_STREAM_INTERNAL_SERVER_ERROR);
            return;
        }
    
        // 连接的类型,tcp or udp
        u->peer.type = c->type;
    
        // 开始连接后端的时间
        // 准备开始连接,设置开始时间,秒数,没有毫秒
        u->start_sec = ngx_time();
    
        // 连接的读写事件都设置为ngx_stream_proxy_downstream_handler
        // 注意这个连接是客户端发起的连接,即下游
        // 当客户端连接可读或可写时就会调用ngx_stream_proxy_downstream_handler
        c->write->handler = ngx_stream_proxy_downstream_handler;
        c->read->handler = ngx_stream_proxy_downstream_handler;
    
        // 使用数组,可能会连接多个上游服务器
        s->upstream_states = ngx_array_create(c->pool, 1,
                                              sizeof(ngx_stream_upstream_state_t));
        if (s->upstream_states == NULL) {
            ngx_stream_proxy_finalize(s, NGX_STREAM_INTERNAL_SERVER_ERROR);
            return;
        }
    
        // 如果是tcp连接,那么创建一个缓冲区,用来接收数据
        if (c->type == SOCK_STREAM) {
            p = ngx_pnalloc(c->pool, pscf->buffer_size);
            if (p == NULL) {
                ngx_stream_proxy_finalize(s, NGX_STREAM_INTERNAL_SERVER_ERROR);
                return;
            }
    
            // 注意是给下游使用的缓冲区
            u->downstream_buf.start = p;
            u->downstream_buf.end = p + pscf->buffer_size;
            u->downstream_buf.pos = p;
            u->downstream_buf.last = p;
    
            // 连接可读,表示客户端有数据发过来
            // 加入到&ngx_posted_events
            // 稍后由ngx_stream_proxy_downstream_handler来处理
            if (c->read->ready) {
                ngx_post_event(c->read, &ngx_posted_events);
            }
        }
    
        // udp不需要,始终用一个固定大小的数组接收数据
    
        // proxy_pass支持复杂变量
        // 如果使用了"proxy_pass $xxx",那么就要解析复杂变量
        if (pscf->upstream_value) {
            if (ngx_stream_proxy_eval(s, pscf) != NGX_OK) {
                ngx_stream_proxy_finalize(s, NGX_STREAM_INTERNAL_SERVER_ERROR);
                return;
            }
        }
    
        // 检查proxy_pass的目标地址
        if (u->resolved == NULL) {
    
            uscf = pscf->upstream;
    
        } else {
    
    #if (NGX_STREAM_SSL)
            u->ssl_name = u->resolved->host;
    #endif
    
            host = &u->resolved->host;
    
            // 获取上游的配置结构体
            // 在ngx_stream_proxy_pass里设置的
            //uscf = pscf->upstream;
    
            umcf = ngx_stream_get_module_main_conf(s, ngx_stream_upstream_module);
    
            uscfp = umcf->upstreams.elts;
    
            for (i = 0; i < umcf->upstreams.nelts; i++) {
    
                uscf = uscfp[i];
    
                if (uscf->host.len == host->len
                    && ((uscf->port == 0 && u->resolved->no_port)
                         || uscf->port == u->resolved->port)
                    && ngx_strncasecmp(uscf->host.data, host->data, host->len) == 0)
                {
                    goto found;
                }
            }
    
            if (u->resolved->sockaddr) {
    
                if (u->resolved->port == 0
                    && u->resolved->sockaddr->sa_family != AF_UNIX)
                {
                    ngx_log_error(NGX_LOG_ERR, c->log, 0,
                                  "no port in upstream \"%V\"", host);
                    ngx_stream_proxy_finalize(s, NGX_STREAM_INTERNAL_SERVER_ERROR);
                    return;
                }
    
                if (ngx_stream_upstream_create_round_robin_peer(s, u->resolved)
                    != NGX_OK)
                {
                    ngx_stream_proxy_finalize(s, NGX_STREAM_INTERNAL_SERVER_ERROR);
                    return;
                }
    
                ngx_stream_proxy_connect(s);
    
                return;
            }
    
            if (u->resolved->port == 0) {
                ngx_log_error(NGX_LOG_ERR, c->log, 0,
                              "no port in upstream \"%V\"", host);
                ngx_stream_proxy_finalize(s, NGX_STREAM_INTERNAL_SERVER_ERROR);
                return;
            }
    
            temp.name = *host;
    
            cscf = ngx_stream_get_module_srv_conf(s, ngx_stream_core_module);
    
            ctx = ngx_resolve_start(cscf->resolver, &temp);
            if (ctx == NULL) {
                ngx_stream_proxy_finalize(s, NGX_STREAM_INTERNAL_SERVER_ERROR);
                return;
            }
    
            if (ctx == NGX_NO_RESOLVER) {
                ngx_log_error(NGX_LOG_ERR, c->log, 0,
                              "no resolver defined to resolve %V", host);
                ngx_stream_proxy_finalize(s, NGX_STREAM_INTERNAL_SERVER_ERROR);
                return;
            }
    
            ctx->name = *host;
            ctx->handler = ngx_stream_proxy_resolve_handler;
            ctx->data = s;
            ctx->timeout = cscf->resolver_timeout;
    
            u->resolved->ctx = ctx;
    
            if (ngx_resolve_name(ctx) != NGX_OK) {
                u->resolved->ctx = NULL;
                ngx_stream_proxy_finalize(s, NGX_STREAM_INTERNAL_SERVER_ERROR);
                return;
            }
    
            return;
        }
    
    found:
    
        if (uscf == NULL) {
            ngx_log_error(NGX_LOG_ALERT, c->log, 0, "no upstream configuration");
            ngx_stream_proxy_finalize(s, NGX_STREAM_INTERNAL_SERVER_ERROR);
            return;
        }
    
        u->upstream = uscf;
    
    #if (NGX_STREAM_SSL)
        u->ssl_name = uscf->host;
    #endif
    
        // 负载均衡算法初始化
        if (uscf->peer.init(s, uscf) != NGX_OK) {
            ngx_stream_proxy_finalize(s, NGX_STREAM_INTERNAL_SERVER_ERROR);
            return;
        }
    
        // 准备开始连接,设置开始时间,毫秒
        u->peer.start_time = ngx_current_msec;
    
        // 设置负载均衡的重试次数
        if (pscf->next_upstream_tries
            && u->peer.tries > pscf->next_upstream_tries)
        {
            u->peer.tries = pscf->next_upstream_tries;
        }
    
        //u->proxy_protocol = pscf->proxy_protocol;
    
        // 最后启动连接
        // 使用ngx_peer_connection_t连接上游服务器
        // 连接失败,需要尝试下一个上游server
        // 连接成功要调用init初始化上游
        ngx_stream_proxy_connect(s);
    }
    
    // 连接上游
    // 使用ngx_peer_connection_t连接上游服务器
    // 连接失败,需要尝试下一个上游server
    // 连接成功要调用init初始化上游
    static void
    ngx_stream_proxy_connect(ngx_stream_session_t *s)
    {
        ngx_int_t                     rc;
        ngx_connection_t             *c, *pc;
        ngx_stream_upstream_t        *u;
        ngx_stream_proxy_srv_conf_t  *pscf;
    
        // 获取连接对象
        c = s->connection;
    
        c->log->action = "connecting to upstream";
    
        pscf = ngx_stream_get_module_srv_conf(s, ngx_stream_proxy_module);
    
        // 连接上游的结构体
        // 里面有如何获取负载均衡server、上下游buf等
        u = s->upstream;
    
        u->connected = 0;
        u->proxy_protocol = pscf->proxy_protocol;
    
        // 何时会执行这个?
        if (u->state) {
            u->state->response_time = ngx_current_msec - u->state->response_time;
        }
    
        // 把一个上游的状态添加进会话的数组
        u->state = ngx_array_push(s->upstream_states);
        if (u->state == NULL) {
            ngx_stream_proxy_finalize(s, NGX_STREAM_INTERNAL_SERVER_ERROR);
            return;
        }
    
        ngx_memzero(u->state, sizeof(ngx_stream_upstream_state_t));
    
        // 这两个值置为-1,表示未初始化
        u->state->connect_time = (ngx_msec_t) -1;
        u->state->first_byte_time = (ngx_msec_t) -1;
    
        // 用来计算响应时间,保存当前的毫秒值
        // 之后连接成功后就会两者相减
        u->state->response_time = ngx_current_msec;
    
        // 连接上游
        // 使用ngx_peer_connection_t连接上游服务器
        // 从upstream{}里获取一个上游server地址
        // 从cycle的连接池获取一个空闲连接
        // 设置连接的数据收发接口函数
        // 向epoll添加连接,即同时添加读写事件
        // 当与上游服务器有任何数据收发时都会触发epoll
        // socket api调用连接上游服务器
        // 写事件ready,即可以立即向上游发送数据
        rc = ngx_event_connect_peer(&u->peer);
    
        ngx_log_debug1(NGX_LOG_DEBUG_STREAM, c->log, 0, "proxy connect: %i", rc);
    
        if (rc == NGX_ERROR) {
            ngx_stream_proxy_finalize(s, NGX_STREAM_INTERNAL_SERVER_ERROR);
            return;
        }
    
        u->state->peer = u->peer.name;
    
        // 所有上游都busy
        if (rc == NGX_BUSY) {
            ngx_log_error(NGX_LOG_ERR, c->log, 0, "no live upstreams");
            ngx_stream_proxy_finalize(s, NGX_STREAM_BAD_GATEWAY);
            return;
        }
    
        // 连接失败,需要尝试下一个上游server
        if (rc == NGX_DECLINED) {
            ngx_stream_proxy_next_upstream(s);
            return;
        }
    
        /* rc == NGX_OK || rc == NGX_AGAIN || rc == NGX_DONE */
    
        // 连接“成功”,again/done表示正在连接过程中
    
        pc = u->peer.connection;
    
        pc->data = s;
        pc->log = c->log;
        pc->pool = c->pool;
        pc->read->log = c->log;
        pc->write->log = c->log;
    
        // 连接成功
        // 分配供上游读取数据的缓冲区
        // 修改上游读写事件,不再测试连接,改为ngx_stream_proxy_upstream_handler
        // 实际是ngx_stream_proxy_process_connection(ev, !ev->write);
        if (rc != NGX_AGAIN) {
            ngx_stream_proxy_init_upstream(s);
            return;
        }
    
        // again,要再次尝试连接
    
        // 设置上游的读写事件处理函数是ngx_stream_proxy_connect_handler
        // 第一次连接上游不成功后的handler
        // 当上游连接再次有读写事件发生时测试连接
        // 测试连接是否成功,失败就再试下一个上游
        // 最后还是要调用init初始化上游
        pc->read->handler = ngx_stream_proxy_connect_handler;
        pc->write->handler = ngx_stream_proxy_connect_handler;
    
        // 连接上游先写,所以设置写事件的超时时间
        ngx_add_timer(pc->write, pscf->connect_timeout);
    }
    
    // 分配供上游读取数据的缓冲区
    // 进入此函数,肯定已经成功连接了上游服务器
    // 修改上游读写事件,不再测试连接,改为ngx_stream_proxy_upstream_handler
    // 实际是ngx_stream_proxy_process_connection(ev, !ev->write);
    static void
    ngx_stream_proxy_init_upstream(ngx_stream_session_t *s)
    {
        int                           tcp_nodelay;
        u_char                       *p;
        ngx_chain_t                  *cl;
        ngx_connection_t             *c, *pc;
        ngx_log_handler_pt            handler;
        ngx_stream_upstream_t        *u;
        ngx_stream_core_srv_conf_t   *cscf;
        ngx_stream_proxy_srv_conf_t  *pscf;
    
        // u保存了上游相关的信息
        u = s->upstream;
    
        // pc是上游的连接对象
        pc = u->peer.connection;
    
        cscf = ngx_stream_get_module_srv_conf(s, ngx_stream_core_module);
    
        if (pc->type == SOCK_STREAM
            && cscf->tcp_nodelay
            && pc->tcp_nodelay == NGX_TCP_NODELAY_UNSET)
        {
            ngx_log_debug0(NGX_LOG_DEBUG_STREAM, pc->log, 0, "tcp_nodelay");
    
            tcp_nodelay = 1;
    
            if (setsockopt(pc->fd, IPPROTO_TCP, TCP_NODELAY,
                           (const void *) &tcp_nodelay, sizeof(int)) == -1)
            {
                ngx_connection_error(pc, ngx_socket_errno,
                                     "setsockopt(TCP_NODELAY) failed");
                ngx_stream_proxy_next_upstream(s);
                return;
            }
    
            pc->tcp_nodelay = NGX_TCP_NODELAY_SET;
        }
    
        pscf = ngx_stream_get_module_srv_conf(s, ngx_stream_proxy_module);
    
    #if (NGX_STREAM_SSL)
    
        if (pc->type == SOCK_STREAM && pscf->ssl) {
    
            if (u->proxy_protocol) {
                if (ngx_stream_proxy_send_proxy_protocol(s) != NGX_OK) {
                    return;
                }
    
                u->proxy_protocol = 0;
            }
    
            if (pc->ssl == NULL) {
                ngx_stream_proxy_ssl_init_connection(s);
                return;
            }
        }
    
    #endif
    
        // c是到客户端即下游的连接对象
        c = s->connection;
    
        if (c->log->log_level >= NGX_LOG_INFO) {
            ngx_str_t  str;
            u_char     addr[NGX_SOCKADDR_STRLEN];
    
            str.len = NGX_SOCKADDR_STRLEN;
            str.data = addr;
    
            if (ngx_connection_local_sockaddr(pc, &str, 1) == NGX_OK) {
                handler = c->log->handler;
                c->log->handler = NULL;
    
                ngx_log_error(NGX_LOG_INFO, c->log, 0,
                              "%sproxy %V connected to %V",
                              pc->type == SOCK_DGRAM ? "udp " : "",
                              &str, u->peer.name);
    
                c->log->handler = handler;
            }
        }
    
        // 计算连接使用的时间,毫秒值
        u->state->connect_time = ngx_current_msec - u->state->response_time;
    
        if (u->peer.notify) {
            u->peer.notify(&u->peer, u->peer.data,
                           NGX_STREAM_UPSTREAM_NOTIFY_CONNECT);
        }
    
        c->log->action = "proxying connection";
    
        // 检查给上游使用的缓冲区
        if (u->upstream_buf.start == NULL) {
    
            // 分配供上游读取数据的缓冲区
            p = ngx_pnalloc(c->pool, pscf->buffer_size);
            if (p == NULL) {
                ngx_stream_proxy_finalize(s, NGX_STREAM_INTERNAL_SERVER_ERROR);
                return;
            }
    
            u->upstream_buf.start = p;
            u->upstream_buf.end = p + pscf->buffer_size;
            u->upstream_buf.pos = p;
            u->upstream_buf.last = p;
        }
    
        // 此时u里面上下游都有缓冲区了
    
        // udp处理
        // if (c->type == SOCK_DGRAM) {
        //     // 使用客户端连接的buffer计算收到的字节数
        //     s->received = c->buffer->last - c->buffer->pos;
    
        //     // nginx 1.11.x删除了此行代码!!
        //     // downstream_buf直接就是客户端连接的buffer
        //     u->downstream_buf = *c->buffer;
    
    
        // 客户端里已经发来了一些数据
        if (c->buffer && c->buffer->pos < c->buffer->last) {
            ngx_log_debug1(NGX_LOG_DEBUG_STREAM, c->log, 0,
                           "stream proxy add preread buffer: %uz",
                           c->buffer->last - c->buffer->pos);
    
            // 拿一个链表节点
            cl = ngx_chain_get_free_buf(c->pool, &u->free);
            if (cl == NULL) {
                ngx_stream_proxy_finalize(s, NGX_STREAM_INTERNAL_SERVER_ERROR);
                return;
            }
    
            // 把连接的缓冲区关联到链表节点里
            *cl->buf = *c->buffer;
    
            cl->buf->tag = (ngx_buf_tag_t) &ngx_stream_proxy_module;
            cl->buf->flush = 1;
    
            // udp特殊处理,直接是最后一块数据,所以Nginx暂不支持udp会话
            cl->buf->last_buf = (c->type == SOCK_DGRAM);
    
            // 把数据挂到upstream_out里,要发给上游
            cl->next = u->upstream_out;
            u->upstream_out = cl;
        }
    
        if (u->proxy_protocol) {
            ngx_log_debug0(NGX_LOG_DEBUG_STREAM, c->log, 0,
                           "stream proxy add PROXY protocol header");
    
            cl = ngx_chain_get_free_buf(c->pool, &u->free);
            if (cl == NULL) {
                ngx_stream_proxy_finalize(s, NGX_STREAM_INTERNAL_SERVER_ERROR);
                return;
            }
    
            p = ngx_pnalloc(c->pool, NGX_PROXY_PROTOCOL_MAX_HEADER);
            if (p == NULL) {
                ngx_stream_proxy_finalize(s, NGX_STREAM_INTERNAL_SERVER_ERROR);
                return;
            }
    
            cl->buf->pos = p;
    
            p = ngx_proxy_protocol_write(c, p, p + NGX_PROXY_PROTOCOL_MAX_HEADER);
            if (p == NULL) {
                ngx_stream_proxy_finalize(s, NGX_STREAM_INTERNAL_SERVER_ERROR);
                return;
            }
    
            cl->buf->last = p;
            cl->buf->temporary = 1;
            cl->buf->flush = 0;
            cl->buf->last_buf = 0;
            cl->buf->tag = (ngx_buf_tag_t) &ngx_stream_proxy_module;
    
            cl->next = u->upstream_out;
            u->upstream_out = cl;
    
            u->proxy_protocol = 0;
        }
    
        if (c->type == SOCK_DGRAM && pscf->responses == 0) {
            pc->read->ready = 0;
            pc->read->eof = 1;
        }
    
        // 进入此函数,肯定已经成功连接了上游服务器
        u->connected = 1;
    
        // 修改上游读写事件,不再测试连接,改为ngx_stream_proxy_upstream_handler
        // 实际是ngx_stream_proxy_process_connection(ev, !ev->write);
        pc->read->handler = ngx_stream_proxy_upstream_handler;
        pc->write->handler = ngx_stream_proxy_upstream_handler;
    
        if (pc->read->ready || pc->read->eof) {
            ngx_post_event(pc->read, &ngx_posted_events);
        }
    
        // 参数表示上游连接,上游可写
        ngx_stream_proxy_process(s, 0, 1);
    }
    
    // 处理上下游的数据收发
    // from_upstream参数标记是否是上游,使用的是ev->write
    // 上游下游的可读可写回调函数都调用了该函数
    // 下游可写事件,from_ups =1 表示从上游读写到下游
    // 下游可读事件,from_ups =0 表示从下游读写到上游
    // 上游可写事件,from_ups =0 表示从下游读写到上游
    // 上游可读事件,from_ups =1  表示从上游读写到下游
    // 这个ev 其实是不一样的。分表代表了上游和下游 
    static void
    ngx_stream_proxy_process_connection(ngx_event_t *ev, ngx_uint_t from_upstream)
    {
        ngx_connection_t             *c, *pc;
        ngx_stream_session_t         *s;
        ngx_stream_upstream_t        *u;
        ngx_stream_proxy_srv_conf_t  *pscf;
    
        // 连接、会话、上游
        c = ev->data;
        s = c->data;
        u = s->upstream;
    
        // c是下游连接,pc是上游连接
        c = s->connection;
        pc = u->peer.connection;
    
        pscf = ngx_stream_get_module_srv_conf(s, ngx_stream_proxy_module);
    
        // 超时处理,没有delay则失败
        if (ev->timedout) {
            ev->timedout = 0;
    
            if (ev->delayed) {
                ev->delayed = 0;
    
                if (!ev->ready) {
                    if (ngx_handle_read_event(ev, 0) != NGX_OK) {
                        ngx_stream_proxy_finalize(s,
                                                  NGX_STREAM_INTERNAL_SERVER_ERROR);
                        return;
                    }
    
                    if (u->connected && !c->read->delayed && !pc->read->delayed) {
                        ngx_add_timer(c->write, pscf->timeout);
                    }
    
                    return;
                }
    
            } else {
                if (s->connection->type == SOCK_DGRAM) {
                    if (pscf->responses == NGX_MAX_INT32_VALUE) {
    
                        /*
                         * successfully terminate timed out UDP session
                         * with unspecified number of responses
                         */
    
                        pc->read->ready = 0;
                        pc->read->eof = 1;
    
                        ngx_stream_proxy_process(s, 1, 0);
                        return;
                    }
    
                    if (u->received == 0) {
                        ngx_stream_proxy_next_upstream(s);
                        return;
                    }
                }
    
                ngx_connection_error(c, NGX_ETIMEDOUT, "connection timed out");
                ngx_stream_proxy_finalize(s, NGX_STREAM_OK);
                return;
            }
    
        } else if (ev->delayed) {
    
            ngx_log_debug0(NGX_LOG_DEBUG_STREAM, c->log, 0,
                           "stream connection delayed");
    
            if (ngx_handle_read_event(ev, 0) != NGX_OK) {
                ngx_stream_proxy_finalize(s, NGX_STREAM_INTERNAL_SERVER_ERROR);
            }
    
            return;
        }
    
        if (from_upstream && !u->connected) {
            return;
        }
    
        // 核心处理函数,处理两个连接的数据收发
        ngx_stream_proxy_process(s, from_upstream, ev->write);
    }
    
    
    // 核心处理函数,处理两个连接的数据收发
    // 可以处理上下游的数据收发
    // 参数标记是否是上游、是否写数据
    // 最终都会调用到这个处理函数
    // 无论是上游可读可写事件还是下游可读可写事件都会调用该函数
    // from_ups == 1 可能是上游的可读事件(从上游读内容),也可能是下游的可写事件(写的内容来自上游)
    static void
    ngx_stream_proxy_process(ngx_stream_session_t *s, ngx_uint_t from_upstream,
        ngx_uint_t do_write)
    {
        off_t                        *received, limit;
        size_t                        size, limit_rate;
        ssize_t                       n;
        ngx_buf_t                    *b;
        ngx_int_t                     rc;
        ngx_uint_t                    flags;
        ngx_msec_t                    delay;
        ngx_chain_t                  *cl, **ll, **out, **busy;
        ngx_connection_t             *c, *pc, *src, *dst;
        ngx_log_handler_pt            handler;
        ngx_stream_upstream_t        *u;
        ngx_stream_proxy_srv_conf_t  *pscf;
    
        // u是上游结构体
        u = s->upstream;
    
        // c是下游的连接
        c = s->connection;
    
        // pc是上游的连接,如果连接失败就是nullptr
        pc = u->connected ? u->peer.connection : NULL;
    
        // nginx处于即将停止状态,连接是udp
        // 使用连接的log记录日志
        if (c->type == SOCK_DGRAM && (ngx_terminate || ngx_exiting)) {
    
            /* socket is already closed on worker shutdown */
    
            handler = c->log->handler;
            c->log->handler = NULL;
    
            ngx_log_error(NGX_LOG_INFO, c->log, 0, "disconnected on shutdown");
    
            c->log->handler = handler;
    
            ngx_stream_proxy_finalize(s, NGX_STREAM_OK);
            return;
        }
    
        // 取proxy模块的配置
        pscf = ngx_stream_get_module_srv_conf(s, ngx_stream_proxy_module);
    
        // 根据上下游状态决定来源和目标
        // 以及缓冲区、限速等
        // 注意使用的缓冲区指针
        if (from_upstream) {
            // 数据下行
            src = pc;
            dst = c;
    
            // 缓冲区是upstream_buf,即上游来的数据
            b = &u->upstream_buf;
    
            limit_rate = pscf->download_rate;
            received = &u->received;
            out = &u->downstream_out;
            busy = &u->downstream_busy;
    
        } else {
            // 数据上行
            src = c;
            dst = pc;
    
            // 缓冲区是downstream_buf,即下游来的数据
            // 早期downstream_buf直接就是客户端连接的buffer
            // 现在是一个正常分配的buffer
            b = &u->downstream_buf;
    
            limit_rate = pscf->upload_rate;
            received = &s->received;
            out = &u->upstream_out;
            busy = &u->upstream_busy;
        }
    
        // b指向当前需要操作的缓冲区
        // 死循环操作,直到出错或者again
        for ( ;; ) {
    
            // 如果要求写,那么把缓冲区里的数据发到dst
            if (do_write && dst) {
    
                // 条件是有数据,且dst连接是可写的
                if (*out || *busy || dst->buffered) {
    
                    // 调用filter过滤链表,过滤数据最后发出去
                    rc = ngx_stream_top_filter(s, *out, from_upstream);
    
                    if (rc == NGX_ERROR) {
                        if (c->type == SOCK_DGRAM && !from_upstream) {
                            ngx_stream_proxy_next_upstream(s);
                            return;
                        }
    
                        ngx_stream_proxy_finalize(s, NGX_STREAM_OK);
                        return;
                    }
    
                    // 调整缓冲区链表,节约内存使用
                    ngx_chain_update_chains(c->pool, &u->free, busy, out,
                                          (ngx_buf_tag_t) &ngx_stream_proxy_module);
    
                    if (*busy == NULL) {
                        b->pos = b->start;
                        b->last = b->start;
                    }
    
                    // n = ngx_again,需要等待可写才能再次发送
                }
            }
    
            // size是缓冲区的剩余可用空间
            size = b->end - b->last;
    
            // 如果缓冲区有剩余,且src还可以读数据
            if (size && src->read->ready && !src->read->delayed
                && !src->read->error)
            {
    
                // 限速处理
                if (limit_rate) {
                    limit = (off_t) limit_rate * (ngx_time() - u->start_sec + 1)
                            - *received;
    
                    if (limit <= 0) {
                        src->read->delayed = 1;
                        delay = (ngx_msec_t) (- limit * 1000 / limit_rate + 1);
                        ngx_add_timer(src->read, delay);
                        break;
                    }
    
                    if ((off_t) size > limit) {
                        size = (size_t) limit;
                    }
                }
    
                // 尽量读满缓冲区
                n = src->recv(src, b->last, size);
    
                // nginx 1.11.x代码不同,只判断NGX_AGAIN
    
                // 如果不可读,或者已经读完
                // break结束for循环
                if (n == NGX_AGAIN) {
                    break;
                }
    
                // 出错,标记为eof
                if (n == NGX_ERROR) {
                    if (c->type == SOCK_DGRAM && u->received == 0) {
                        ngx_stream_proxy_next_upstream(s);
                        return;
                    }
    
                    src->read->eof = 1;
                    n = 0;
                }
    
                // nginx 1.11.x代码不同,判断n >= 0
                // 读取了n字节的数据
                if (n >= 0) {
    
                    // 限速
                    if (limit_rate) {
                        delay = (ngx_msec_t) (n * 1000 / limit_rate);
    
                        if (delay > 0) {
                            src->read->delayed = 1;
                            ngx_add_timer(src->read, delay);
                        }
                    }
    
                    if (from_upstream) {
                        if (u->state->first_byte_time == (ngx_msec_t) -1) {
                            u->state->first_byte_time = ngx_current_msec
                                                        - u->state->response_time;
                        }
                    }
    
                    // udp处理
                    if (c->type == SOCK_DGRAM && ++u->responses == pscf->responses)
                    {
                        src->read->ready = 0;
                        src->read->eof = 1;
                    }
    
                    // 找到链表末尾
                    for (ll = out; *ll; ll = &(*ll)->next) { /* void */ }
    
                    // 把读到的数据挂到链表末尾
                    cl = ngx_chain_get_free_buf(c->pool, &u->free);
                    if (cl == NULL) {
                        ngx_stream_proxy_finalize(s,
                                                  NGX_STREAM_INTERNAL_SERVER_ERROR);
                        return;
                    }
    
                    *ll = cl;
    
                    cl->buf->pos = b->last;
                    cl->buf->last = b->last + n;
                    cl->buf->tag = (ngx_buf_tag_t) &ngx_stream_proxy_module;
    
                    cl->buf->temporary = (n ? 1 : 0);
                    cl->buf->last_buf = src->read->eof;
                    cl->buf->flush = 1;
    
                    // 增加接收的数据字节数
                    *received += n;
    
                    // 缓冲区的末尾指针移动,表示收到了n字节新数据
                    b->last += n;
    
                    // 有数据,那么就可以继续向dst发送
                    do_write = 1;
    
                    // 回到for循环开头,继续发送数据
                    continue;
                }
            }   // 读数据部分结束
    
            break;
        }   // for循环结束
    
        // 这时应该是src已经读完,数据也发送完
        // 读取出错也会有eof标志
        if (src->read->eof && dst && (dst->read->eof || !dst->buffered)) {
            handler = c->log->handler;
            c->log->handler = NULL;
    
            ngx_log_error(NGX_LOG_INFO, c->log, 0,
                          "%s%s disconnected"
                          ", bytes from/to client:%O/%O"
                          ", bytes from/to upstream:%O/%O",
                          src->type == SOCK_DGRAM ? "udp " : "",
                          from_upstream ? "upstream" : "client",
                          s->received, c->sent, u->received, pc ? pc->sent : 0);
    
            c->log->handler = handler;
    
            // 在这里记录访问日志
            ngx_stream_proxy_finalize(s, NGX_STREAM_OK);
    
            return;
        }
    
        // 如果eof就要关闭读事件
        flags = src->read->eof ? NGX_CLOSE_EVENT : 0;
    
        if (!src->shared && ngx_handle_read_event(src->read, flags) != NGX_OK) {
            ngx_stream_proxy_finalize(s, NGX_STREAM_INTERNAL_SERVER_ERROR);
            return;
        }
    
        if (dst) {
            if (!dst->shared && ngx_handle_write_event(dst->write, 0) != NGX_OK) {
                ngx_stream_proxy_finalize(s, NGX_STREAM_INTERNAL_SERVER_ERROR);
                return;
            }
    
            if (!c->read->delayed && !pc->read->delayed) {
                ngx_add_timer(c->write, pscf->timeout);
    
            } else if (c->write->timer_set) {
                ngx_del_timer(c->write);
            }
        }
    }
    
    

    相关文章

      网友评论

        本文标题:Nginx stream(UDP)模块分析

        本文链接:https://www.haomeiwen.com/subject/duobxftx.html