美文网首页QNX操作系统
QNX之编写资源管理器(一)

QNX之编写资源管理器(一)

作者: Loyen | 来源:发表于2019-01-08 14:50 被阅读0次

    QNX相关历史文章:

    The Bones of a Resource Manager

    这篇文章将从服务器端和客户端两侧来描述大体的框架和分层,并会给出实例。

    1. Under the covers

    1.1 Under the client's covers

    当一个客户端调用需要路径名解析的函数时(比如open()/rename()/stat()/unlink()),它会同时向进程管理器和对应的资源管理器发送消息,进而去获取文件描述符,并通过这个文件描述符向与路径名关联的设备发送消息。

    /*
     * In this stage, the client talks 
     * to the process manager and the resource manager.
     */
    fd = open("/dev/ser1", O_RDWR);
    
    /*
     * In this stage, the client talks directly to the
     * resource manager.
     */
    for (packet = 0; packet < npackets; packet++)
    {
        write(fd, packets[packet], PACKET_SIZE);
    }
    close(fd);
    

    上边的这个代码,背后的逻辑是怎么实现的呢?我们假设这是名为devc-ser8250资源管理器管理的串口设备,注册的路径名为dev/ser1

    Under-the-cover communication between the client, the process manager, and the resource manager
    1. 客户端发送query消息,open()函数会向进程管理器发送消息,请求查找名字,比如/dev/ser1

    2. 进程管理器会返回与路径名关联的nd/pid/child/handle。当devc-ser8250资源管理器使用/dev/ser1注册时,进程管理器会维护一个类似于下边的条目信息:
      0, 47167, 1, 0, 0, /dev/ser1
      其中条目的各项分别代表:

    • 0, Node descriptor(nd),用于描述在网络中的节点;
    • 47167, Process ID(pid),资源管理器的进程ID号;
    • 1, Channel ID(chid),资源管理器用于接收消息的通道;
    • 0, Handle的索引号,由资源管理器注册的Handle;
    • 0, 在名称注册期间传递的打开类型;
    • /dev/ser1,注册的路径名;
      在进程管理器中会维护一个条目表,用于记录各个资源管理器的信息。在名字匹配的时候会选择最长匹配。
    1. 客户端需要向资源管理器发送一个connect消息,首先它需要创建一个连接的通道:
    fd = ConnectAttach(nd, pid, chid, 0, 0);
    

    客户端也是使用这个fd向资源管理器发送连接消息,请求打开/dev/ser1。当资源管理器收到连接消息后,会进行权限检查。

    1. 资源管理器通常会回应通过或失败。

    2. 当客户端获取到文件描述符后,可以直接通过它向设备发送消息。示例代码看起来像是客户端直接向设备写入,实际上在调用write()时,会先发送一个_IO_WRITE消息给资源管理器,请求数据写入,客户端调用close()时,会发送_IO_CLOSE_DUP消息给资源管理器,完成最后资源的回收清理工作。

    1.2 Under the resource manager's covers

    资源管理器就是一个服务器,通过send/receive/reply消息协议来接收和回复消息,伪代码如下:

    initialize the resource manager
    register the name with the process manager
    DO forever
        receive a message
        SWITCH on the type of message
            CASE _IO_CONNECT:
                call io_open handler
                ENDCASE
            CASE _IO_READ:
                call io_read handler
                ENDCASE
            CASE _IO_WRITE:
                call io_write handler
                ENDCASE
            .   /* etc. handle all other messages */
            .   /* that may occur, performing     */
            .   /* processing as appropriate      */
        ENDSWITCH
    ENDDO
    

    事实上,上述代码中的很多细节在实现一个资源管理器中可能用不到,因为有一些库进行了封装,但是自己还是需要去实现对消息的回复。

    2. Layers in a resource manager

    资源管理器由四层组成,从下到上分别为:

    1. iofunc layer
    2. resmgr layer
    3. dispatch layer
    4. thread pool layer

    2.1 iofunc layer

    这一层的主要功能是提供POSIX特性,用户编写资源管理器时不需要太关心向外界提供POSIX文件系统所涉及的细节。
    这一层由一组函数iofunc_*组成,包含了默认处理程序,如果没有提供自己的处理程序的话,就会使用默认提供的程序。比如,如果没有提供io_open处理程序的话,就会调用iofunc_open_default()。同时也包含了默认处理程序调用的帮助函数,当自己实现处理程序时,也可以调用这些帮助函数。

    2.2 resmgr layer

    这一层负责管理资源管理器库的大部分细节:

    • 检查传入消息
    • 调用合适的处理程序来处理消息
      如果不使用这一层的话,则需要用户自己解析消息,大部分资源管理器都会用到这一层。


      The resmgr layer to handle _IO_* message

    2.3 dispatch layer

    这一层充当多种事件类型的阻塞点,使用这一层,可以处理:

    • `IO* 消息, 使用resmgr层;
    • select(),注册一个处理函数,当数据包到达时调用,这里的函数是select_*()函数;
    • Pulses,注册一个处理函数,当pulses来的时候调用,这里的函数是pulse_*()函数;
    • 其他消息,可以提供自己创建的一系列消息类型和处理函数,当消息到达时调用对应的处理函数,这里的函数是message_*()函数;
      use the dispatch layer to handle _IO_* messages, select, pulses, and other messages
      消息进行分发时,会进行查找和匹配,然后会调用到匹配的Handler函数,如果没有找到匹配的,则返回MsgError

    2.4 thread pool layer

    这一层允许用户创建单线程或多线程资源管理器,这意味着可以一个线程读,一个线程写。需要为线程提供阻塞函数,以及阻塞函数返回时需要调用的处理函数。大多数情况下,可以将dispatch layer的任务分配到线程上来,当然,也可以是resmgr layer任务或自己实现的功能。

    3. Simple examples of device resource managers

    3.1 Single-threaded device resource managers

    先来看一份完整的单线程设备资源管理器代码吧:

    #include <errno.h>
    #include <stdio.h>
    #include <stddef.h>
    #include <stdlib.h>
    #include <unistd.h>
    #include <sys/iofunc.h>
    #include <sys/dispatch.h>
    
    static resmgr_connect_funcs_t    connect_funcs;
    static resmgr_io_funcs_t         io_funcs;
    static iofunc_attr_t             attr;
    
    main(int argc, char **argv)
    {
        /* declare variables we'll be using */
        resmgr_attr_t        resmgr_attr;
        dispatch_t           *dpp;
        dispatch_context_t   *ctp;
        int                  id;
    
        /* initialize dispatch interface */
        if((dpp = dispatch_create()) == NULL) {
            fprintf(stderr,
                    "%s: Unable to allocate dispatch handle.\n",
                    argv[0]);
            return EXIT_FAILURE;
        }
    
        /* initialize resource manager attributes */
        memset(&resmgr_attr, 0, sizeof resmgr_attr);
        resmgr_attr.nparts_max = 1;
        resmgr_attr.msg_max_size = 2048;
    
        /* initialize functions for handling messages */
        iofunc_func_init(_RESMGR_CONNECT_NFUNCS, &connect_funcs, 
                         _RESMGR_IO_NFUNCS, &io_funcs);
    
        /* initialize attribute structure used by the device */
        iofunc_attr_init(&attr, S_IFNAM | 0666, 0, 0);
    
        /* attach our device name */
        id = resmgr_attach(
                dpp,            /* dispatch handle        */
                &resmgr_attr,   /* resource manager attrs */
                "/dev/sample",  /* device name            */
                _FTYPE_ANY,     /* open type              */
                0,              /* flags                  */
                &connect_funcs, /* connect routines       */
                &io_funcs,      /* I/O routines           */
                &attr);         /* handle                 */
        if(id == -1) {
            fprintf(stderr, "%s: Unable to attach name.\n", argv[0]);
            return EXIT_FAILURE;
        }
    
        /* allocate a context structure */
        ctp = dispatch_context_alloc(dpp);
    
        /* start the resource manager message loop */
        while(1) {
            if((ctp = dispatch_block(ctp)) == NULL) {
                fprintf(stderr, "block error\n");
                return EXIT_FAILURE;
            }
            dispatch_handler(ctp);
        }
    }
    

    上述代码完成以下功能:

    1. 初始化dispatch接口
      通过调用dispatch_create()接口创建dispatch_t结构,这个结构中包含了channel ID,但channel ID实际的创建是在附加其他内容的时候,比如调用resmgr_attach()/message_attach()/pulse_attach()等。

    2. 初始化资源管理器属性
      当调用resmgr_attach()时会传入resmgr_attr_t结构,在这个示例中主要配置:

    • nparts_max,可供服务器回复的IOV的个数;
    • msg_max_size,最大接收缓冲大小;
    1. 初始化消息处理函数
      在这个例子中提供了两张表,用于指定在特定消息到达时调用哪个函数:
    • 连接函数表;
    • I/O函数表;
      可以调用iofunc_func_init()接口来配置默认操作函数。
    1. 初始化设备使用的属性结构
      调用iofunc_attr_init()接口来设置,属性结构中至少应该包括以下信息:
    • 权限和设备类型
    • 组ID和属主ID
    1. 在名字空间中注册一个名字
      调用resmgr_attach()来注册资源管理器的路径。在资源管理器能接收到其他程序的消息之前,需要通过进程管理器通知其他程序它与路径名的绑定关系。

    2. 分配context结构体
      调用dispatch_context_alloc()接口来分配,context结构体包含了用于接收消息的缓冲,也包含了可用于回复消息的IOVs缓冲。

    3. 开始资源管理器消息循环
      当进入循环后,资源管理器便在dispatch_block()中接收消息,并调用dispatch_handler()进行分发处理,选择连接函数表和I/O函数表中的合适函数进行执行。当执行完毕后,会再进入dispatch_block()来等待接收其他消息。

    3.2 Multi-threaded device resource managers

    下边是多线程设备资源管理器示例代码:

    #include <errno.h>
    #include <stdio.h>
    #include <stddef.h>
    #include <stdlib.h>
    #include <unistd.h>
    
    /*
     * Define THREAD_POOL_PARAM_T such that we can avoid a compiler
     * warning when we use the dispatch_*() functions below
     */
    #define THREAD_POOL_PARAM_T dispatch_context_t
    
    #include <sys/iofunc.h>
    #include <sys/dispatch.h>
    
    static resmgr_connect_funcs_t    connect_funcs;
    static resmgr_io_funcs_t         io_funcs;
    static iofunc_attr_t             attr;
    
    main(int argc, char **argv)
    {
        /* declare variables we'll be using */
        thread_pool_attr_t   pool_attr;
        resmgr_attr_t        resmgr_attr;
        dispatch_t           *dpp;
        thread_pool_t        *tpp;
        dispatch_context_t   *ctp;
        int                  id;
    
        /* initialize dispatch interface */
        if((dpp = dispatch_create()) == NULL) {
            fprintf(stderr, "%s: Unable to allocate dispatch handle.\n",
                    argv[0]);
            return EXIT_FAILURE;
        }
    
        /* initialize resource manager attributes */
        memset(&resmgr_attr, 0, sizeof resmgr_attr);
        resmgr_attr.nparts_max = 1;
        resmgr_attr.msg_max_size = 2048;
    
        /* initialize functions for handling messages */
        iofunc_func_init(_RESMGR_CONNECT_NFUNCS, &connect_funcs, 
                         _RESMGR_IO_NFUNCS, &io_funcs);
    
        /* initialize attribute structure used by the device */
        iofunc_attr_init(&attr, S_IFNAM | 0666, 0, 0);
    
        /* attach our device name */
        id = resmgr_attach(dpp,            /* dispatch handle        */
                           &resmgr_attr,   /* resource manager attrs */
                           "/dev/sample",  /* device name            */
                           _FTYPE_ANY,     /* open type              */
                           0,              /* flags                  */
                           &connect_funcs, /* connect routines       */
                           &io_funcs,      /* I/O routines           */
                           &attr);         /* handle                 */
        if(id == -1) {
            fprintf(stderr, "%s: Unable to attach name.\n", argv[0]);
            return EXIT_FAILURE;
        }
    
        /* initialize thread pool attributes */
        memset(&pool_attr, 0, sizeof pool_attr);
        pool_attr.handle = dpp;
        pool_attr.context_alloc = dispatch_context_alloc;
        pool_attr.block_func = dispatch_block; 
        pool_attr.unblock_func = dispatch_unblock;
        pool_attr.handler_func = dispatch_handler;
        pool_attr.context_free = dispatch_context_free;
        pool_attr.lo_water = 2;
        pool_attr.hi_water = 4;
        pool_attr.increment = 1;
        pool_attr.maximum = 50;
    
        /* allocate a thread pool handle */
        if((tpp = thread_pool_create(&pool_attr, 
                                     POOL_FLAG_EXIT_SELF)) == NULL) {
            fprintf(stderr, "%s: Unable to initialize thread pool.\n",
                    argv[0]);
            return EXIT_FAILURE;
        }
    
        /* start the threads; will not return */
        thread_pool_start(tpp);
    }
    

    从代码中可以看出,大部分代码与单线程示例一样。在这个代码中,线程在阻塞循环中用到了dispatch_*()函数(dispatch layer)。

    1. 初始化线程池的属性
      pool_attr的各个字段赋值,用于告知线程在阻塞循环时调用哪些函数,以及线程池需要维护多少线程。

    2. 分配一个线程池的handle
      调用thread_pool_create()接口来分配,这个handle用于控制整个线程池。

    3. 开启线程
      调用thread_pool_start()接口,启动线程池。每个新创建的线程都会使用在属性结构中给出的context_alloc()函数来分配THREAD_POOL_PARAM_T定义类型的context结构。

    3.3 Using MsgSend() and MsgReply()

    可以不使用read()write()接口来与资源管理器交互,可以调用MsgSend()来发送消息,下边的示例将分两部分:服务器和客户端。服务器端需要使能PROCMGR_AID_PATHSPACE,用于确保能调用resmgr_attach()函数。

    • 服务器代码:
    /*
     * ResMgr and Message Server Process
     */
    
    #include <stdio.h>
    #include <unistd.h>
    #include <stdlib.h>
    #include <errno.h>
    #include <string.h>
    #include <sys/neutrino.h>
    #include <sys/iofunc.h>
    #include <sys/dispatch.h>
    
    resmgr_connect_funcs_t  ConnectFuncs;
    resmgr_io_funcs_t       IoFuncs;
    iofunc_attr_t           IoFuncAttr;
    
    
    typedef struct
    {
        uint16_t msg_no;
        char     msg_data[255];
    } server_msg_t;
    
    
    int message_callback( message_context_t *ctp, int type, unsigned flags, 
                          void *handle )
    {
        server_msg_t *msg;
        int num;
        char msg_reply[255];
    
        /* Cast a pointer to the message data */
        msg = (server_msg_t *)ctp->msg;
    
        /* Print some useful information about the message */
        printf( "\n\nServer Got Message:\n" );
        printf( "  type: %d\n" , type );
        printf( "  data: %s\n\n", msg->msg_data );
    
        /* Build the reply message */
        num = type - _IO_MAX;
        snprintf( msg_reply, 254, "Server Got Message Code: _IO_MAX + %d", num );
       
        /* Send a reply to the waiting (blocked) client */ 
        MsgReply( ctp->rcvid, EOK, msg_reply, strlen( msg_reply ) + 1 );
    
        return 0;
    }
    
    
    
    int main( int argc, char **argv )
    {
        resmgr_attr_t        resmgr_attr;
        message_attr_t       message_attr;
        dispatch_t           *dpp;
        dispatch_context_t   *ctp, *ctp_ret;
        int                  resmgr_id, message_id;
    
        /* Create the dispatch interface */
        dpp = dispatch_create();
        if( dpp == NULL )
        {
            fprintf( stderr, "dispatch_create() failed: %s\n", 
                     strerror( errno ) );
            return EXIT_FAILURE;
        }
    
        memset( &resmgr_attr, 0, sizeof( resmgr_attr ) );
        resmgr_attr.nparts_max = 1;
        resmgr_attr.msg_max_size = 2048;
    
        /* Setup the default I/O functions to handle open/read/write/... */
        iofunc_func_init( _RESMGR_CONNECT_NFUNCS, &ConnectFuncs,
                          _RESMGR_IO_NFUNCS, &IoFuncs );
    
        /* Setup the attribute for the entry in the filesystem */
        iofunc_attr_init( &IoFuncAttr, S_IFNAM | 0666, 0, 0 );
    
        resmgr_id = resmgr_attach( dpp, &resmgr_attr, "serv", _FTYPE_ANY, 
                                   0, &ConnectFuncs, &IoFuncs, &IoFuncAttr );
        if( resmgr_id == -1 )
        {
            fprintf( stderr, "resmgr_attach() failed: %s\n", strerror( errno ) );
            return EXIT_FAILURE;
        }
    
        /* Setup our message callback */
        memset( &message_attr, 0, sizeof( message_attr ) );
        message_attr.nparts_max = 1;
        message_attr.msg_max_size = 4096;
    
        /* Attach a callback (handler) for two message types */
        message_id = message_attach( dpp, &message_attr, _IO_MAX + 1,
                                     _IO_MAX + 2, message_callback, NULL );
        if( message_id == -1 )
        {
            fprintf( stderr, "message_attach() failed: %s\n", strerror( errno ) );
            return EXIT_FAILURE;
        }
    
        /* Setup a context for the dispatch layer to use */
        ctp = dispatch_context_alloc( dpp );
        if( ctp == NULL )
        {
            fprintf( stderr, "dispatch_context_alloc() failed: %s\n", 
                     strerror( errno ) );
            return EXIT_FAILURE;
        }
    
    
        /* The "Data Pump" - get and process messages */
        while( 1 )
        {
            ctp_ret = dispatch_block( ctp );
            if( ctp_ret )
            {
                dispatch_handler( ctp );
            }
            else
            {
                fprintf( stderr, "dispatch_block() failed: %s\n", 
                         strerror( errno ) );
                return EXIT_FAILURE;
            }
        }
    
        return EXIT_SUCCESS;
    }
    
    1. 首先是调用dispatch_create()接口来创建dpp,通过这个handle将接收到的消息转发到合适的层(resmgr, message, pulse);
    2. 设置调用resmgr_attach()所需要的变量;
    3. 调用iofunc_func_init()来设置默认处理函数,调用iofunc_attr_init()来设置属性结构;
    4. 调用resmgr_attach(),注意这时候注册的路径不是绝对路径,因此默认在它的执行路径下。.
    5. 告诉dispatch layer,除了由resmgr layer处理的标准I/O和连接消息之外,还需要处理自己的消息。
    6. 调用message_attach()接口来注册自己的消息处理函数。
    7. 当调用dispatch_block()接收消息后,调用dispatch_handler()来处理,实际上会在dispatch_handler()中调用message_callback()函数。当消息类型为_IO_MAX + 1_IO_MAX + 2时,就会回调函数。
    • 客户端代码:
    /* 
     * Message Client Process 
     */ 
    
    #include <stdio.h> 
    #include <unistd.h> 
    #include <stdlib.h> 
    #include <fcntl.h> 
    #include <errno.h> 
    #include <string.h> 
    #include <sys/neutrino.h> 
    #include <sys/iofunc.h> 
    #include <sys/dispatch.h> 
    
    typedef struct 
    { 
        uint16_t msg_no; 
        char msg_data[255]; 
    } client_msg_t; 
    
    int main( int argc, char **argv ) 
    { 
        int fd; 
        int c; 
        client_msg_t msg; 
        int ret; 
        int num; 
        char msg_reply[255]; 
        
        num = 1; 
        
        /* Process any command line arguments */ 
        while( ( c = getopt( argc, argv, "n:" ) ) != -1 ) 
        { 
            if( c == 'n' ) 
            { 
                num = strtol( optarg, 0, 0 ); 
            } 
        } 
        /* Open a connection to the server (fd == coid) */ 
        fd = open( "serv", O_RDWR ); 
        if( fd == -1 ) 
        { 
            fprintf( stderr, "Unable to open server connection: %s\n", 
                strerror( errno ) ); 
            return EXIT_FAILURE; 
        } 
        
        /* Clear the memory for the msg and the reply */ 
        memset( &msg, 0, sizeof( msg ) ); 
        memset( &msg_reply, 0, sizeof( msg_reply ) ); 
        
        /* Set up the message data to send to the server */ 
        msg.msg_no = _IO_MAX + num; 
        snprintf( msg.msg_data, 254, "client %d requesting reply.", getpid() ); 
        
        printf( "client: msg_no: _IO_MAX + %d\n", num ); 
        fflush( stdout ); 
        
        /* Send the data to the server and get a reply */ 
        ret = MsgSend( fd, &msg, sizeof( msg ), msg_reply, 255 ); 
        if( ret == -1 ) 
        { 
            fprintf( stderr, "Unable to MsgSend() to server: %s\n", strerror( errno ) ); 
            return EXIT_FAILURE; 
        } 
        
        /* Print out the reply data */ 
        printf( "client: server replied: %s\n", msg_reply ); 
        
        close( fd ); 
        
        return EXIT_SUCCESS; 
    } 
    

    客户端通过open()接口获取connection id,调用MsgSend()接口去往服务器上发送消息,并且等待回复。

    上述服务器和客户端的例子非常简单,但覆盖了大部分的内容,基于这个基础的框架,还可以做其他的事情:

    • 基于不同的消息类型,注册不同的消息回调函数;
    • 除了消息之外,使用pulse_attach()来接收pulse
    • 重写默认的I/O消息处理函数以便客户端也能使用read()write()来与服务器交互;
    • 使用线程池来编写多线程服务器;

    相关文章

      网友评论

        本文标题:QNX之编写资源管理器(一)

        本文链接:https://www.haomeiwen.com/subject/lwdurqtx.html