ntirpc code

Author: 帆子_8c3a | Published 2019-03-11 21:04

1. ONC RPC - Open Network Computing RPC

Reference documentation
An RPC protocol originally developed by Sun, also known as SUN RPC. The Linux kernel client has its own implementation of it; nfs-ganesha uses ntirpc - Transport-Independent RPC.
ONC RPC interface definitions (such as msg.x) can be compiled with the rpcgen tool, producing a client stub and a server skeleton respectively.
Below is part of the nfsv41.x code from nfs-ganesha:

program NFS4_PROGRAM {
        version NFS_V4 {
                void
                        NFSPROC4_NULL(void) = 0;

                COMPOUND4res
                        NFSPROC4_COMPOUND(COMPOUND4args) = 1;

        } = 4;
} = 100003;

RPC parameters are encoded and decoded with XDR - eXternal Data Representation - before being transmitted over the network.
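As a minimal illustration (a sketch, not taken from the article), XDR encoding can be exercised directly with the classic libtirpc-style memory-stream API, xdrmem_create()/xdr_uint32_t(), which ntirpc also carries:

/* Sketch: encode a uint32_t with a memory XDR stream, then decode it back. */
#include <stdio.h>
#include <stdint.h>
#include <rpc/rpc.h>

int main(void)
{
    char buf[64];
    uint32_t in = 100003, out = 0;   /* the NFS program number, as sample data */
    XDR enc, dec;

    xdrmem_create(&enc, buf, sizeof(buf), XDR_ENCODE);
    if (!xdr_uint32_t(&enc, &in))    /* writes 4 bytes in network byte order */
        return 1;

    xdrmem_create(&dec, buf, sizeof(buf), XDR_DECODE);
    if (!xdr_uint32_t(&dec, &out))
        return 1;

    printf("decoded %u\n", out);     /* prints 100003 */
    return 0;
}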

2. rpcbind service

Reference
An RPC service is identified by the triple <program number, protocol (tcp/udp), version>. Given this triple, rpcbind returns the corresponding port (a programmatic lookup sketch follows the rpcinfo output below).

rpcinfo -p
   program vers proto   port  service
    100000    4   tcp    111  portmapper
    100000    3   tcp    111  portmapper
    100000    2   tcp    111  portmapper
    100000    4   udp    111  portmapper
    100000    3   udp    111  portmapper
    100000    2   udp    111  portmapper
    100003    4   udp   2049  nfs
    100003    4   tcp   2049  nfs
    100011    1   udp    875  rquotad
    100011    1   tcp    875  rquotad
    100011    2   udp    875  rquotad
    100011    2   tcp    875  rquotad
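For services that are registered with rpcbind, the same mapping shown by rpcinfo can be queried programmatically. A minimal sketch (illustration only) using the classic portmapper call pmap_getport() from <rpc/pmap_clnt.h>:

#include <stdio.h>
#include <string.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <rpc/rpc.h>
#include <rpc/pmap_clnt.h>

int main(void)
{
    struct sockaddr_in addr;
    u_short port;

    memset(&addr, 0, sizeof(addr));
    addr.sin_family = AF_INET;
    addr.sin_port = htons(111);                 /* rpcbind/portmapper */
    inet_pton(AF_INET, "127.0.0.1", &addr.sin_addr);

    /* <program 100003 (nfs), version 4, tcp> -> port */
    port = pmap_getport(&addr, 100003, 4, IPPROTO_TCP);
    if (port == 0) {
        fprintf(stderr, "lookup failed\n");
        return 1;
    }
    printf("nfs v4/tcp is on port %u\n", port);
    return 0;
}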

NFSv4 uses the fixed port 2049 and does not need the rpcbind service, which lets NFS clients traverse firewalls more easily.

3. Abbreviations used in the code

  • ioq - io queue
  • svc - service
  • rqst - request
  • xprt - transport
  • clnt - client

4. Network connection code

4.1 Channel

Each kind of connection (TCP, UDP, RDMA) gets its own channel. A channel is represented by struct svc_rqst_rec, and each channel creates its own epoll_fd.

  • svc_rqst_new_evchan //creates a channel
  • svc_rqst_lookup_chan //looks up the struct svc_rqst_rec for a channel id
  • svc_rqst_hook_events //registers epoll events on this channel; when epoll fires, the struct rpc_dplx_rec can be recovered from the event
  • svc_rqst_epoll_events //epoll has fired; processes all pending epoll events
  • svc_rqst_epoll_event //epoll has fired; this function returns the struct rpc_dplx_rec
  • svc_rqst_evchan_reg //associates a channel with an xprt: rec->ev_p = sr_rec (see the wiring sketch after this list)
  • svc_rqst_xprt_register //called from svc_vc_rendezvous; associates newxprt with channel 1 and makes newxprt a child of xprt
  • svc_rqst_xprt_task //processes the requests received on this channel
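A rough wiring sketch (not nfs-ganesha's actual code) of how a channel is created and an xprt attached to it, assuming the svc_rqst_new_evchan()/svc_rqst_evchan_reg() prototypes from ntirpc's rpc/svc_rqst.h of this era; the flag constant is an assumption:

#include <rpc/rpc.h>
#include <rpc/svc_rqst.h>

static uint32_t chan_id;

static void setup_channel(SVCXPRT *xprt)
{
    /* allocate a new event channel (one svc_rqst_rec with its own epoll_fd) */
    svc_rqst_new_evchan(&chan_id, NULL /* u_data */, SVC_RQST_FLAG_NONE);

    /* attach the xprt to that channel: rec->ev_p = sr_rec, and the fd gets
     * added to the channel's epoll set via svc_rqst_hook_events() */
    svc_rqst_evchan_reg(chan_id, xprt, SVC_RQST_FLAG_NONE);
}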

4.2 Number of channels

There are 5 channels:

  • id 4: UDP listen
  • id 3: TCP listen
  • id 2: rpcbind connection
  • id 1: TCP accept
  • id 0: unused

4.3 xprt

An xprt represents one connection (or instance) belonging to a channel and is described by struct svc_xprt. The channel-to-xprt relationship is one-to-many; for a single client, the channel-to-xprt relationship is one-to-one.

  • svc_xprt_lookup //looks up an xprt by fd
  • makefd_xprt //creates an xprt from an fd

4.4 The svc_rqst_rec data structure

The variable is usually named sr_rec; it describes one channel.

struct svc_rqst_rec {
    struct work_pool_entry ev_wpe;
    struct opr_rbtree call_expires;
    mutex_t ev_lock;

    int sv[2];
    uint32_t id_k;      /* chan id */

    /*
     * union of event processor types
     */
    enum svc_event_type ev_type;
    union {
#if defined(TIRPC_EPOLL)
        struct {
            int epoll_fd;
            struct epoll_event ctrl_ev;
            struct epoll_event *events;
            u_int max_events;   /* max epoll events */
        } epoll;
#endif
        struct {
            fd_set set; /* select/fd_set (currently unhooked) */
        } fd;
    } ev_u;

    int32_t ev_refcnt;
    uint16_t ev_flags;
};

4.5 The svc_xprt data structure

The variable is usually named xprt and represents one connection on the server side. makefd_xprt creates a connection; svc_xprt_lookup looks one up.

  • Listening on the socket fd is one kind of connection; its xp_recv is svc_vc_rendezvous
  • Listening on an accepted fd is another kind of connection; its xp_recv is svc_vc_recv

typedef struct svc_xprt SVCXPRT;

/*
 * Server side transport handle
 */
struct svc_xprt {
    struct xp_ops {
        /* receive incoming requests */
        svc_xprt_fun_t xp_recv;

        /* get transport status */
        svc_xprt_fun_t xp_stat;

        /* decode incoming message header (called by request_cb) */
        svc_req_fun_t xp_decode;

        /* send reply */
        svc_req_fun_t xp_reply;

        /* optional checksum (after authentication/decryption) */
        void (*xp_checksum) (struct svc_req *, void *, size_t);

        /* actually destroy after xp_destroy_it and xp_release_it */
        void (*xp_destroy) (SVCXPRT *, u_int, const char *, const int);

        /* catch-all function */
        bool (*xp_control) (SVCXPRT *, const u_int, void *);

        /* free client user data */
        svc_xprt_fun_t xp_free_user_data;
    } *xp_ops;

    /* handle incoming connections (per xp_fd) */
    union {
        svc_req_fun_t process_cb;
        svc_xprt_fun_t rendezvous_cb;
    }  xp_dispatch;
    SVCXPRT *xp_parent;

    char *xp_tp;        /* transport provider device name */
    char *xp_netid;     /* network token */

    void *xp_p1;        /* private: for use by svc ops */
    void *xp_p2;        /* private: for use by svc ops */
    void *xp_p3;        /* private: for use by svc lib */
    void *xp_u1;        /* client user data */
    void *xp_u2;        /* client user data */

    struct rpc_address xp_local;    /* local address, length, port */
    struct rpc_address xp_remote;   /* remote address, length, port */

#if defined(HAVE_BLKIN)
    /* blkin tracing */
    struct {
        char *svc_name;
        struct blkin_endpoint endp;
    } blkin;
#endif
    /* serialize private data */
    mutex_t xp_lock;

    int xp_fd;
    int xp_ifindex;     /* interface index */
    int xp_si_type;     /* si type */
    int xp_type;        /* xprt type */

    int32_t xp_refcnt;  /* handle reference count */
    uint16_t xp_flags;  /* flags */
};

4.6 The rpc_dplx_rec data structure

The rec can be obtained via REC_XPRT; the rec is the parameter carried by the epoll event.

struct rpc_dplx_rec *rec = REC_XPRT(xprt);
struct rpc_dplx_rec {
    struct svc_xprt xprt;
    struct xdr_ioq ioq; //all the xioq on this rec can be found through it
    struct opr_rbtree call_replies;
    struct opr_rbtree_node fd_node;
    struct {
        rpc_dplx_lock_t lock;
        struct timespec ts;
    } recv;

    union {
        struct {
            struct epoll_event event;
        } epoll;
    } ev_u;
    void *ev_p;         /* struct svc_rqst_rec (internal) */

    size_t maxrec;
    long pagesz;
    u_int recvsz;
    u_int sendsz;
    uint32_t call_xid;      /**< current call xid */
    uint32_t ev_count;      /**< atomic count of waiting events */
};
#define REC_XPRT(p) (opr_containerof((p), struct rpc_dplx_rec, xprt))
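REC_XPRT is the usual container-of pattern: given a pointer to the embedded svc_xprt, subtract the member offset to recover the enclosing rpc_dplx_rec. A generic sketch of the same idea (illustration only; ntirpc's opr_containerof does the equivalent):

#include <stddef.h>

/* Generic container_of: recover the outer struct from a pointer to one of
 * its members. */
#define container_of(ptr, type, member) \
    ((type *)((char *)(ptr) - offsetof(type, member)))

struct inner { int fd; };
struct outer { int a; struct inner in; };

static struct outer *outer_of(struct inner *p)
{
    return container_of(p, struct outer, in);
}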

4.7 Network connection operations

  1. Allocate_sockets //creates the TCP and UDP sockets
  2. Bind_sockets //performs bind
  3. Create_tcp //performs listen and calls EPOLL_CTL_ADD for the TCP socket
  4. svc_vc_rendezvous //performs accept to get the connection fd and calls EPOLL_CTL_ADD for it
  5. svc_vc_recv //receives data on a specific connection fd

4.8 Associating epoll events

svc_rqst_hook_events() sets ev->data.ptr = rec.
svc_rqst_epoll_event recovers this rec from the event (see the sketch below).
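A minimal sketch (generic epoll code, not ntirpc's) of the pattern just described: stash the record pointer in ev.data.ptr when registering the fd, and read it back when the event fires.

#include <sys/epoll.h>

struct my_rec { int fd; /* ... */ };

static void hook(int epoll_fd, struct my_rec *rec)
{
    struct epoll_event ev = {0};

    ev.events = EPOLLIN | EPOLLONESHOT;
    ev.data.ptr = rec;                  /* like svc_rqst_hook_events() */
    epoll_ctl(epoll_fd, EPOLL_CTL_ADD, rec->fd, &ev);
}

static struct my_rec *event_rec(struct epoll_event *ev)
{
    return ev->data.ptr;                /* like svc_rqst_epoll_event() */
}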

4.9 svc_rqst_epoll_events receives one event

  1. Call svc_rqst_epoll_event to get the rec, which was set in svc_rqst_hook_events()
  2. In most cases there is only one event, so svc_rqst_xprt_task is called directly
  3. svc_rqst_xprt_task

4.10 svc_rqst_epoll_events receives multiple events

  1. Call svc_rqst_epoll_event to get the rec, which was set in svc_rqst_hook_events()
  2. In the less common case of multiple events, the first one is handled directly via svc_rqst_xprt_task and the remaining ones are submitted to the work pool
  3. svc_rqst_xprt_task
  4. work_pool_submit => svc_rqst_xprt_task

4.11 Typical call stacks

The first contact on a TCP connection is handled by svc_vc_rendezvous, which performs the accept. Traffic on the accepted connection is then handled by svc_vc_recv. svc_vc_rendezvous creates a new xprt; all later traffic on that connection uses the new xprt, whose handler functions are set up in svc_vc_override_ops.

Accepting a new TCP connection:

#0  nfs_rpc_dispatch_tcp_NFS (xprt=0x7fffe0000ca0) at /root/code/nfs-ganesha-2.7/src/MainNFSD/nfs_rpc_dispatcher_thread.c:308
#1  0x00007ffff7bb489a in svc_vc_rendezvous (xprt=0x7f58e0) at /root/code/nfs-ganesha-2.7/src/libntirpc/src/svc_vc.c:507
#2  0x00007ffff7bb1c6b in svc_rqst_xprt_task (wpe=0x7f5af8) at /root/code/nfs-ganesha-2.7/src/libntirpc/src/svc_rqst.c:769
#3  0x00007ffff7bb20ee in svc_rqst_epoll_events (sr_rec=0x7e0680, n_events=1) at /root/code/nfs-ganesha-2.7/src/libntirpc/src/svc_rqst.c:941
#4  0x00007ffff7bb2396 in svc_rqst_epoll_loop (sr_rec=0x7e0680) at /root/code/nfs-ganesha-2.7/src/libntirpc/src/svc_rqst.c:1014
#5  0x00007ffff7bb2460 in svc_rqst_run_task (wpe=0x7e0680) at /root/code/nfs-ganesha-2.7/src/libntirpc/src/svc_rqst.c:1050
#6  0x00007ffff7bbb313 in work_pool_thread (arg=0x7fffd4000b40) at /root/code/nfs-ganesha-2.7/src/libntirpc/src/work_pool.c:181
#7  0x00007ffff6e036ba in start_thread (arg=0x7fffd3dfd700) at pthread_create.c:333
#8  0x00007ffff693141d in clone () at ../sysdeps/unix/sysv/linux/x86_64/clone.S:109

Processing an NFSv4 COMPOUND request on an accepted connection:

#0  nfs4_Compound (arg=0x7f7314001268, req=0x7f7314000b60, res=0x7f7314001bc0) at /root/code/nfs-ganesha-2.7/src/Protocols/NFS/nfs4_Compound.c:618
#1  0x000000000045731a in nfs_rpc_process_request (reqdata=0x7f7314000b60) at /root/code/nfs-ganesha-2.7/src/MainNFSD/nfs_worker_thread.c:1329
#2  0x0000000000457ac1 in nfs_rpc_valid_NFS (req=0x7f7314000b60) at /root/code/nfs-ganesha-2.7/src/MainNFSD/nfs_worker_thread.c:1539
#3  0x00007f73bc7c36f5 in svc_vc_decode (req=0x7f7314000b60) at /root/code/nfs-ganesha-2.7/src/libntirpc/src/svc_vc.c:825
#4  0x000000000044a9b4 in nfs_rpc_decode_request (xprt=0x7f7328000b20, xdrs=0x7f73140008c0) at /root/code/nfs-ganesha-2.7/src/MainNFSD/nfs_rpc_dispatcher_thread.c:1341
#5  0x00007f73bc7c3606 in svc_vc_recv (xprt=0x7f7328000b20) at /root/code/nfs-ganesha-2.7/src/libntirpc/src/svc_vc.c:798
#6  0x00007f73bc7bfb40 in svc_rqst_xprt_task (wpe=0x7f7328000d38) at /root/code/nfs-ganesha-2.7/src/libntirpc/src/svc_rqst.c:767
#7  0x00007f73bc7bffe0 in svc_rqst_epoll_events (sr_rec=0x3e45de0, n_events=1) at /root/code/nfs-ganesha-2.7/src/libntirpc/src/svc_rqst.c:939
#8  0x00007f73bc7c0288 in svc_rqst_epoll_loop (sr_rec=0x3e45de0) at /root/code/nfs-ganesha-2.7/src/libntirpc/src/svc_rqst.c:1012
#9  0x00007f73bc7c0352 in svc_rqst_run_task (wpe=0x3e45de0) at /root/code/nfs-ganesha-2.7/src/libntirpc/src/svc_rqst.c:1048
#10 0x00007f73bc7c91a1 in work_pool_thread (arg=0x7f732c000b20) at /root/code/nfs-ganesha-2.7/src/libntirpc/src/work_pool.c:181

5. Receiving and sending data

  • svc_vc_recv: receives data and wraps what was read into the XDR data structure.
  • svc_vc_decode: decodes the XDR data into a struct rpc_msg, as defined in RFC 5531.
  • nfs_rpc_valid_NFS: routes the request to the appropriate handler based on cb_prog in the rpc_msg (see the dispatch sketch after the struct below).

struct rpc_msg {
    u_int32_t rm_xid;
    enum msg_type rm_direction;
    struct {
        struct call_body RM_cmb;
        struct reply_body RM_rmb;
    } ru;
#define rm_call  ru.RM_cmb
#define rm_reply ru.RM_rmb

    /* New with TI-RPC */
    struct xdrpair rm_xdr;
    uint32_t rm_flags;

    /* Moved in N TI-RPC; used by auth, logging, replies */
    rpcprog_t cb_prog;
    rpcvers_t cb_vers;
    rpcproc_t cb_proc;

    struct opaque_auth cb_cred;
    struct opaque_auth cb_verf; /* protocol specific - provided by client */

    /* avoid separate alloc/free */
    char rq_cred_body[MAX_AUTH_BYTES];  /* size is excessive */
};
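A simplified dispatch sketch based on the fields above; the handler names (handle_null, handle_compound, reject) are placeholders, not the real nfs-ganesha functions, which live in nfs_rpc_valid_NFS and nfs_rpc_process_request:

#include <rpc/rpc.h>

/* placeholder handlers - hypothetical names, not nfs-ganesha functions */
static enum xprt_stat handle_null(struct svc_req *req);
static enum xprt_stat handle_compound(struct svc_req *req);
static enum xprt_stat reject(struct svc_req *req);

/* Hypothetical routing on the decoded rpc_msg carried in the svc_req. */
static enum xprt_stat route_nfs_request(struct svc_req *req)
{
    if (req->rq_msg.cb_prog != 100003)        /* NFS program number */
        return reject(req);                   /* e.g. PROG_UNAVAIL */

    if (req->rq_msg.cb_vers != 4)             /* only NFSv4 handled here */
        return reject(req);                   /* e.g. PROG_MISMATCH */

    switch (req->rq_msg.cb_proc) {
    case 0:  return handle_null(req);         /* NFSPROC4_NULL */
    case 1:  return handle_compound(req);     /* NFSPROC4_COMPOUND */
    default: return reject(req);              /* PROC_UNAVAIL */
    }
}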

6. ioq (incomplete)

  • svc_ioq_flushv
  • svc_ioq_write
  • svc_ioq_write_submit

The XDR data structure

typedef struct rpc_xdr {
    const struct xdr_ops {
        /* get 4 unsigned bytes from underlying stream */
        bool (*x_getunit)(struct rpc_xdr *, uint32_t *);
        /* put 4 unsigned bytes to underlying stream */
        bool (*x_putunit)(struct rpc_xdr *, const uint32_t);
        /* get some bytes from " */
        bool (*x_getbytes)(struct rpc_xdr *, char *, u_int);
        /* put some bytes to " */
        bool (*x_putbytes)(struct rpc_xdr *, const char *, u_int);
        /* returns bytes off from beginning */
        u_int (*x_getpostn)(struct rpc_xdr *);
        /* lets you reposition the stream */
        bool (*x_setpostn)(struct rpc_xdr *, u_int);
        /* free private resources of this xdr_stream */
        void (*x_destroy)(struct rpc_xdr *);
        bool (*x_control)(struct rpc_xdr *, int, void *);
        /* new vector and refcounted interfaces */
        bool (*x_getbufs)(struct rpc_xdr *, xdr_uio *, u_int);
        bool (*x_putbufs)(struct rpc_xdr *, xdr_uio *, u_int);
    } *x_ops;
    void *x_public; /* users' data */
    void *x_private; /* pointer to private data */
    void *x_lib[2]; /* RPC library private */
    uint8_t *x_data; //moves forward one position for every element encoded
    void *x_base;  //base address
    struct xdr_vio x_v; //its buffer is provided by a uv's v member; vio_wrap records the upper bound, and going past it requires an xdr_ioq_putunit operation
    u_int x_handy; /* extra private word */
    u_int x_flags; /* shared flags */
    enum xdr_op x_op;  /* operation; fast additional param */
} XDR;

xdrs->x_v.vio_base is the start of the buffer and xdrs->x_data is the current position, so the difference between the two gives the number of bytes already placed in the buffer.
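As a one-helper sketch of this calculation (the function name is mine, not ntirpc's):

#include <stddef.h>

/* Bytes already produced into the current buffer, per the description above. */
static inline size_t xdr_bytes_used(const XDR *xdrs)
{
    return (size_t)(xdrs->x_data - xdrs->x_v.vio_base);
}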

The xdr_ioq data structure

The variable is usually named xioq.

struct xdr_ioq {
    XDR xdrs[1];
    struct work_pool_entry ioq_wpe; //used when the ioq is recycled
    struct poolq_entry ioq_s;   //used for insertion into rec->ioq.ioq_uv.uvqh.qh
    pthread_cond_t ioq_cond;
    struct poolq_head *ioq_pool; //only used for RDMA, not considered here
    struct xdr_ioq_uv_head ioq_uv;  //only rec->ioq.ioq_uv is the real head; the ioq_uv of every other ioq is meaningless - a rather clumsy design
    uint64_t id;
};

struct xdr_ioq_uv_head {
    struct poolq_head uvqh; //the xioq queue

    struct poolq_entry *(*uvq_fetch)(struct xdr_ioq *xioq,
                     struct poolq_head *ioqh,
                     char *comment, u_int count,
                     u_int ioq_flags);

    size_t min_bsize;   /* multiple of pagesize */
    size_t max_bsize;   /* multiple of min_bsize */
    size_t plength;     /* sub-total of previous lengths, not including
                 * any length in this xdr_ioq_uv */
    u_int pcount;       /* fill index (0..m) in the current stream */
};

The xdr_ioq_uv data structure

The variable is usually named uv or data. u and v are two different representations: v, an iovec-style view, is what is currently used; u appears to be used for RDMA and is not considered here.

struct xdr_ioq_uv
{
    struct poolq_entry uvq;

    /* spliced buffers, if any */
    struct xdr_uio u;

    /* Each xdr_ioq_uv can have a different kind of buffer or data source,
     * as indicated by the uio_flags, needing different release techniques.
     * Note: overloads uio_release with uio_p1 for pool.
     */
    struct xdr_vio v;   /* immediately follows u (uio_vio[0]) */
};

The xdr_vio data structure

See the function xdr_ioq_uv_create.

/* XDR buffer vector descriptors */
typedef struct xdr_vio {
    uint8_t *vio_base;
    uint8_t *vio_head;  /* minimum vio_tail (header offset) */
    uint8_t *vio_tail;
    uint8_t *vio_wrap;  /* maximum vio_tail */
} xdr_vio;

/* vio_wrap >= vio_tail >= vio_head >= vio_base */
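A small sketch of how the four pointers delimit a buffer, following the ordering comment above (the helper names are mine, not ntirpc's):

#include <stddef.h>

/* payload currently held between head and tail */
static inline size_t vio_data_len(const xdr_vio *v)
{
    return (size_t)(v->vio_tail - v->vio_head);
}

/* space still writable before hitting the wrap limit */
static inline size_t vio_room_left(const xdr_vio *v)
{
    return (size_t)(v->vio_wrap - v->vio_tail);
}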

Analysis of svc_vc_recv

  1. Create an xioq and insert it into the queue on the rec
  2. Create a uv and insert it into the xioq's queue
  3. xdr_ioq_reset fills in xioq->xdrs

xdr_ioq_uv //buffer
xdr_ioq //ioq


  • xdr_ioq_uv_create() allocates a buffer and returns a struct xdr_ioq_uv
  • xdr_ioq_reset //fills the ioq
  • xdr_ioq_uv_reset //fills the XDR
  • By the time svc_vc_recv returns, the data has already been read into the uv
  • svc_vc_decode then decodes the XDR data into struct rpc_msg

XDR_FLAG_VIO
xdr_ioq_uv_advance *
xdr_ioq_uv_append
svc_ioq_flushv

svc_req

struct svc_req {
    SVCXPRT *rq_xprt;   /* associated transport */

    /* New with TI-RPC */
    char *rq_clntname;  /* read only client name */
    char *rq_svcname;   /* read only cooked service cred */

    XDR *rq_xdrs;
    void *rq_u1;        /* user data */
    void *rq_u2;        /* user data */
    uint64_t rq_cksum;

    /* Moved in N TI-RPC */
    struct SVCAUTH *rq_auth;    /* auth handle */
    void *rq_ap1;       /* auth private */
    void *rq_ap2;       /* auth private */

    /* avoid separate alloc/free */
    struct rpc_msg rq_msg;

    uint32_t rq_refcnt;
};

Data send and receive

svc_vc_reply

  1. xdr_ioq_create creates an IOQ whose buffer comes from a newly created uv
  2. xdr_reply_encode
  3. SVCAUTH_WRAP, which internally calls ganesha's xdr_COMPOUND4res
  4. svc_ioq_write_now

8. Summary

  1. ntirpc creates channels for TCP listen and TCP accept
  2. xprts are created under a channel
  3. epoll events are set up for the xprts under a channel; each event is associated with a specific xprt
  4. When epoll fires, data is read from the xprt into an XDR
  5. svc_vc_decode decodes the XDR into an rpc_msg
  6. The request is routed to its handler based on cb_prog in the rpc_msg
