美文网首页
EasyDarwin RTSP会话流程解析

EasyDarwin RTSP会话流程解析

作者: 耐寒 | 来源:发表于2017-02-28 17:00 被阅读0次

    在main.cpp的main函数里:

    ...
    //This function starts, runs, and shuts down the server
        if (::StartServer(&theXMLParser, &theMessagesSource, thePort, statsUpdateInterval, theInitialState, dontFork, debugLevel, debugOptions) != qtssFatalErrorState)
        {    
             ::RunServer();
             CleanPid(false);
             exit (EXIT_SUCCESS);
        }
        else
            exit(-1); //Cant start server don't try again
    
    
    QTSS_ServerState StartServer(XMLPrefsParser* inPrefsSource, PrefsSource* inMessagesSource, UInt16 inPortOverride, int statsUpdateInterval, QTSS_ServerState inInitialState, Bool16 inDontFork, UInt32 debugLevel, UInt32 debugOptions)
    {
        QTSS_ServerState theServerState = qtssStartingUpState;
        
        sStatusUpdateInterval = statsUpdateInterval;
        ...
        //start the server
        QTSSDictionaryMap::Initialize();
        QTSServerInterface::Initialize();// this must be called before constructing the server object
        sServer = NEW QTSServer();
        sServer->SetDebugLevel(debugLevel);
        sServer->SetDebugOptions(debugOptions);
        
        // re-parse config file
        inPrefsSource->Parse();
        ...
        sServer->Initialize(inPrefsSource, inMessagesSource, inPortOverride,createListeners);
        ...
        if (sServer->GetServerState() != qtssFatalErrorState)
        {
            IdleTask::Initialize();
            Socket::StartThread();
            OSThread::Sleep(1000);
            sServer->InitModules(inInitialState);
            //开始监听
            sServer->StartTasks();
            sServer->SetupUDPSockets(); // udp sockets are set up after the rtcp task is instantiated
            theServerState = sServer->GetServerState();
        }
        return theServerState;
    }
    

    在StartServer里首先创建QTSServer,然后调用了sServer->Initialize函数,在Initialize调用了CreateListeners函数依据xml的配置文件创建了监听的套接字来监听RTSP的端口,这两个函数的实现如下:

    Bool16 QTSServer::Initialize(XMLPrefsParser* inPrefsSource, PrefsSource* inMessagesSource, UInt16 inPortOverride, Bool16 createListeners)
    {
        ...
        // CREATE GLOBAL OBJECTS
        fSocketPool = new RTPSocketPool();
        fRTPMap = new OSRefTable(kRTPSessionMapSize);
        fHLSMap = new OSRefTable(kHLSSessionMapSize);
        fReflectorSessionMap = new OSRefTable(kReflectorSessionMapSize);
        ...
        // BEGIN LISTENING
        if (createListeners)
        {
            if ( !this->CreateListeners(false, fSrvrPrefs, inPortOverride) )
                QTSSModuleUtils::LogError(qtssWarningVerbosity, qtssMsgSomePortsFailed, 0);
        }
        ...
        fServerState = qtssStartingUpState;
        return true;
    }
    
    Bool16 QTSServer::CreateListeners(Bool16 startListeningNow, QTSServerPrefs* inPrefs, UInt16 inPortOverride)
    {
        // Create any new listeners we need
        for (UInt32 count3 = 0; count3 < theTotalRTSPPortTrackers; count3++)
        {
            if (theRTSPPortTrackers[count3].fNeedsCreating)
            {
                newListenerArray[curPortIndex] = NEW RTSPListenerSocket();
                //在Initialize创建了TCPSocket
                QTSS_Error err = newListenerArray[curPortIndex]->Initialize(theRTSPPortTrackers[count3].fIPAddr, theRTSPPortTrackers[count3].fPort);
                ...
                else
                {
                    //
                    // This listener was successfully created.
                    if (startListeningNow)
                        newListenerArray[curPortIndex]->RequestEvent(EV_RE);
                    curPortIndex++;
                }
            }
        }
    }
    

    然后StartServer里调用sServer->StartTasks()开始监听。StartTasks是如何开始监听的呢?

    void QTSServer::StartTasks()
    {
        fRTCPTask = new RTCPTask();
        fStatsTask = new RTPStatsUpdaterTask();
        //
        // Start listening
        for (UInt32 x = 0; x < fNumListeners; x++)
            fListeners[x]->RequestEvent(EV_RE);
    }
    

    这里的fListeners就是RTSPListenerSocket,其继承关系如下图,RequestEvent就是把创建的fFileDesc和EV_RE(读事件)添加到epoll中.


    123.jpg

    在RequestEvent里有这么两行代码

    fRef.Set(fUniqueIDStr, this); 
    fEventThread->fRefTable.Register(&fRef);
    

    这个fEventThread是在StartServer里被创建且开始运行的,在该线程里epoll会监听网络事件,网络事件来的时候,会调用fRefTable存储的EventContext指针对象的ProcessEvent处理事件;fListeners向fEventThread的fRefTable注册了fRef,也就是等网络事件来的时候调用到fListeners的ProcessEvent处理。
    该线程函数的代码如下:

    void EventThread::Entry()
    {
        struct eventreq theCurrentEvent;
        ::memset( &theCurrentEvent, '\0', sizeof(theCurrentEvent) );
        
        while (true)
        {
            int theErrno = EINTR;
            while (theErrno == EINTR)
            {
    #if MACOSXEVENTQUEUE
                int theReturnValue = waitevent(&theCurrentEvent, NULL);
    #else
                
                #if defined(__linux__)
                int theReturnValue = epoll_waitevent(&theCurrentEvent, NULL);
                #else
                int theReturnValue = select_waitevent(&theCurrentEvent, NULL);            
                #endif
    #endif  
                ...
            }
            
            AssertV(theErrno == 0, theErrno);
            
            //ok, there's data waiting on this socket. Send a wakeup.
            if (theCurrentEvent.er_data != NULL)
            {
                //The cookie in this event is an ObjectID. Resolve that objectID into
                //a pointer.
                StrPtrLen idStr((char*)&theCurrentEvent.er_data, sizeof(theCurrentEvent.er_data));
                OSRef* ref = fRefTable.Resolve(&idStr);
                if (ref != NULL)
                {
                    EventContext* theContext = (EventContext*)ref->GetObject();
    #if DEBUG
                    theContext->fModwatched = false;
    #endif
                    theContext->ProcessEvent(theCurrentEvent.er_eventbits);
                    fRefTable.Release(ref);
                }
            }
        ...
    }
    

    那在ProcessEvent里干了些什么事情呢?

    void TCPListenerSocket::ProcessEvent(int /*eventBits*/)
    {
         ...
        //fSocket data member of TCPSocket.
        int osSocket = accept(fFileDesc, (struct sockaddr*)&addr, &size);
        ...
        
        theTask = this->GetSessionTask(&theSocket);
        if (theTask == NULL)
        {    //this should be a disconnect. do an ioctl call?
            close(osSocket);
            if (theSocket)
                theSocket->fState &= ~kConnected; // turn off connected state
        }
        else
        {   
            Assert(osSocket != EventContext::kInvalidFileDesc);
            
            //set options on the socket
            //we are a server, always disable nagle algorithm
            int one = 1;
            int err = ::setsockopt(osSocket, IPPROTO_TCP, TCP_NODELAY, (char*)&one, sizeof(int));
            AssertV(err == 0, OSThread::GetErrno());
            
            err = ::setsockopt(osSocket, SOL_SOCKET, SO_KEEPALIVE, (char*)&one, sizeof(int));
            AssertV(err == 0, OSThread::GetErrno());
        
            int sndBufSize = 96L * 1024L;
            err = ::setsockopt(osSocket, SOL_SOCKET, SO_SNDBUF, (char*)&sndBufSize, sizeof(int));
            AssertV(err == 0, OSThread::GetErrno());
        
            //setup the socket. When there is data on the socket,
            //theTask will get an kReadEvent event
            theSocket->Set(osSocket, &addr);
            theSocket->InitNonBlocking(osSocket);
            theSocket->SetTask(theTask);
            theSocket->RequestEvent(EV_RE);
            
            theTask->SetThreadPicker(Task::GetBlockingTaskThreadPicker()); //The RTSP Task processing threads
        }
        ...
    }
    

    从TCPListenerSocket::ProcessEvent可知,在该函数里accept RTSP的连接请求,然后调用GetSessionTask创建RTSPSession了,theSocket attach上了osSocket,然后设置osSocket为非阻塞,设置处理osSocket的网络事件为RTSPSession,把osSocket的读事件加入到epoll中。至此我们可以知道该连接的所有网络事件由RTSPSession处理。theSocket是一个TCPSocket指针对象,由RTSPSession创建的,而不再是一个RTSPListenerSocket了。
    RTSPSession是如何获取到网络事件的?从TCPListenerSocket::ProcessEvent中知道,theSocket对象调用了RequestEvent,请求了读事件,也就是在epoll中等待网络读事件,当读到RTSP会话的对端发送了RTSP的信息时(在EventThread线程函数Entry里,如下所示),会调用到EventContext的ProcessEvent函数。

    void EventThread::Entry()
    {
        struct eventreq theCurrentEvent;
        ::memset( &theCurrentEvent, '\0', sizeof(theCurrentEvent) );
        
        while (true)
        {
            int theErrno = EINTR;
            while (theErrno == EINTR)
            {
    #if MACOSXEVENTQUEUE
                int theReturnValue = waitevent(&theCurrentEvent, NULL);
    #else
                
                #if defined(__linux__)
                int theReturnValue = epoll_waitevent(&theCurrentEvent, NULL);
                #else
                int theReturnValue = select_waitevent(&theCurrentEvent, NULL);            
                #endif
    #endif  
                //Sort of a hack. In the POSIX version of the server, waitevent can return
                //an actual POSIX errorcode.
                if (theReturnValue >= 0)
                    theErrno = theReturnValue;
                else
                    theErrno = OSThread::GetErrno();
            }
            
            AssertV(theErrno == 0, theErrno);
            
            //ok, there's data waiting on this socket. Send a wakeup.
            if (theCurrentEvent.er_data != NULL)
            {
                //The cookie in this event is an ObjectID. Resolve that objectID into
                //a pointer.
                StrPtrLen idStr((char*)&theCurrentEvent.er_data, sizeof(theCurrentEvent.er_data));
                OSRef* ref = fRefTable.Resolve(&idStr);
                if (ref != NULL)
                {
                    EventContext* theContext = (EventContext*)ref->GetObject();
    #if DEBUG
                    theContext->fModwatched = false;
    #endif
                    theContext->ProcessEvent(theCurrentEvent.er_eventbits);
                    fRefTable.Release(ref);
                    
                    
                }
            }
    
    #if EVENT_CONTEXT_DEBUG
            SInt64  yieldStart = OS::Milliseconds();
    #endif
    
        #if 0//defined(__linux__)
    
        #else
            this->ThreadYield();
        #endif
        
    #if EVENT_CONTEXT_DEBUG
            SInt64  yieldDur = OS::Milliseconds() - yieldStart;
            static SInt64   numZeroYields;
            
            if ( yieldDur > 1 )
            {
                qtss_printf( "EventThread time in OSTHread::Yield %i, numZeroYields %i\n", (SInt32)yieldDur, (SInt32)numZeroYields );
                numZeroYields = 0;
            }
            else
                numZeroYields++;
    #endif
        }
    }
    

    ProcessEvent会给RTSPSession这个Task发送一个kReadEvent的信号,此处的fTask就是RTSPSession。于是RTSPSession这个task会被加入到线程队列中,等待RTSPSession的run方法被调用去处理网络的读事件,也就是处理RTSP会话。
    RTSPSession::run函数采用状态机处理RTSP会话,其状态机设计如下:


    1234.jpg

    • kReadingFirstRequest,RTSPSession刚被创建时,都是首先进入这个状态,然后读取客户端发送过来的RTSP request。
    • kHTTPFilteringRequest,然后检查该请求是不是以RTSP-OVER-HTTP的方式进行RTSP交互。
    • kHaveNonTunnelMessage,开始加锁,防止在该请求还没有处理完的时候,禁止其他的任何RTSP请求被处理。
    • kFilteringRequest,会给每个订阅了QTSS_RTSPFilter_Role角色的模块调用QTSS_RTSPFilter_Role的回调函数。如果不是OPTION命令或者SET_PARAMETER命令,则会创建RTPSession。如果是RTP的数据包,则会调用QTSS_RTSPIncomingData_Role角色的回调,每个订阅了该角色的模块都可以获取到RTP包的内容。
    • kPostProcessingRequest,如果创建了RTPSession,则会给所有订阅了QTSS_RTSPPostProcessor_Role角色的模块调用QTSS_RTSPPostProcessor_Role回调函数。
    • kCleaningUp,在该状态机里释放fSessionMutex和fReadMutex锁。
    • kReadingRequest,除了RTSP刚被创建时,读取request消息是在kReadingFirstRequest状态机里,其余所有的request消息都从kReadingRequest消息开始处理。
    • kRoutingRequest,调用QTSS_RTSPRoute_Role角色的回调。
    • kPreprocessingRequest,调用QTSS_RTSPPreProcessor_Role角色的回调,如果是TEARDOWN的request,则被调用的模块可能会调用RTPSession的Teardown方法,这样就会给RTPSession这个TASK发送一个kKillEvent的事件,RTPSession收到该事件后,会调用QTSS_ClientSessionClosing_Role角色的回调,关闭该会话。

    技术交流可以入QQ群【554271674】

    相关文章

      网友评论

          本文标题:EasyDarwin RTSP会话流程解析

          本文链接:https://www.haomeiwen.com/subject/gnnxgttx.html