在main.cpp的main函数里:
...
//This function starts, runs, and shuts down the server
if (::StartServer(&theXMLParser, &theMessagesSource, thePort, statsUpdateInterval, theInitialState, dontFork, debugLevel, debugOptions) != qtssFatalErrorState)
{
::RunServer();
CleanPid(false);
exit (EXIT_SUCCESS);
}
else
exit(-1); //Cant start server don't try again
QTSS_ServerState StartServer(XMLPrefsParser* inPrefsSource, PrefsSource* inMessagesSource, UInt16 inPortOverride, int statsUpdateInterval, QTSS_ServerState inInitialState, Bool16 inDontFork, UInt32 debugLevel, UInt32 debugOptions)
{
QTSS_ServerState theServerState = qtssStartingUpState;
sStatusUpdateInterval = statsUpdateInterval;
...
//start the server
QTSSDictionaryMap::Initialize();
QTSServerInterface::Initialize();// this must be called before constructing the server object
sServer = NEW QTSServer();
sServer->SetDebugLevel(debugLevel);
sServer->SetDebugOptions(debugOptions);
// re-parse config file
inPrefsSource->Parse();
...
sServer->Initialize(inPrefsSource, inMessagesSource, inPortOverride,createListeners);
...
if (sServer->GetServerState() != qtssFatalErrorState)
{
IdleTask::Initialize();
Socket::StartThread();
OSThread::Sleep(1000);
sServer->InitModules(inInitialState);
//开始监听
sServer->StartTasks();
sServer->SetupUDPSockets(); // udp sockets are set up after the rtcp task is instantiated
theServerState = sServer->GetServerState();
}
return theServerState;
}
在StartServer里首先创建QTSServer,然后调用了sServer->Initialize函数,在Initialize调用了CreateListeners函数依据xml的配置文件创建了监听的套接字来监听RTSP的端口,这两个函数的实现如下:
Bool16 QTSServer::Initialize(XMLPrefsParser* inPrefsSource, PrefsSource* inMessagesSource, UInt16 inPortOverride, Bool16 createListeners)
{
...
// CREATE GLOBAL OBJECTS
fSocketPool = new RTPSocketPool();
fRTPMap = new OSRefTable(kRTPSessionMapSize);
fHLSMap = new OSRefTable(kHLSSessionMapSize);
fReflectorSessionMap = new OSRefTable(kReflectorSessionMapSize);
...
// BEGIN LISTENING
if (createListeners)
{
if ( !this->CreateListeners(false, fSrvrPrefs, inPortOverride) )
QTSSModuleUtils::LogError(qtssWarningVerbosity, qtssMsgSomePortsFailed, 0);
}
...
fServerState = qtssStartingUpState;
return true;
}
Bool16 QTSServer::CreateListeners(Bool16 startListeningNow, QTSServerPrefs* inPrefs, UInt16 inPortOverride)
{
// Create any new listeners we need
for (UInt32 count3 = 0; count3 < theTotalRTSPPortTrackers; count3++)
{
if (theRTSPPortTrackers[count3].fNeedsCreating)
{
newListenerArray[curPortIndex] = NEW RTSPListenerSocket();
//在Initialize创建了TCPSocket
QTSS_Error err = newListenerArray[curPortIndex]->Initialize(theRTSPPortTrackers[count3].fIPAddr, theRTSPPortTrackers[count3].fPort);
...
else
{
//
// This listener was successfully created.
if (startListeningNow)
newListenerArray[curPortIndex]->RequestEvent(EV_RE);
curPortIndex++;
}
}
}
}
然后StartServer里调用sServer->StartTasks()开始监听。StartTasks是如何开始监听的呢?
void QTSServer::StartTasks()
{
fRTCPTask = new RTCPTask();
fStatsTask = new RTPStatsUpdaterTask();
//
// Start listening
for (UInt32 x = 0; x < fNumListeners; x++)
fListeners[x]->RequestEvent(EV_RE);
}
这里的fListeners就是RTSPListenerSocket,其继承关系如下图,RequestEvent就是把创建的fFileDesc和EV_RE(读事件)添加到epoll中.
123.jpg
在RequestEvent里有这么两行代码
fRef.Set(fUniqueIDStr, this);
fEventThread->fRefTable.Register(&fRef);
这个fEventThread是在StartServer里被创建且开始运行的,在该线程里epoll会监听网络事件,网络事件来的时候,会调用fRefTable存储的EventContext指针对象的ProcessEvent处理事件;fListeners向fEventThread的fRefTable注册了fRef,也就是等网络事件来的时候调用到fListeners的ProcessEvent处理。
该线程函数的代码如下:
void EventThread::Entry()
{
struct eventreq theCurrentEvent;
::memset( &theCurrentEvent, '\0', sizeof(theCurrentEvent) );
while (true)
{
int theErrno = EINTR;
while (theErrno == EINTR)
{
#if MACOSXEVENTQUEUE
int theReturnValue = waitevent(&theCurrentEvent, NULL);
#else
#if defined(__linux__)
int theReturnValue = epoll_waitevent(&theCurrentEvent, NULL);
#else
int theReturnValue = select_waitevent(&theCurrentEvent, NULL);
#endif
#endif
...
}
AssertV(theErrno == 0, theErrno);
//ok, there's data waiting on this socket. Send a wakeup.
if (theCurrentEvent.er_data != NULL)
{
//The cookie in this event is an ObjectID. Resolve that objectID into
//a pointer.
StrPtrLen idStr((char*)&theCurrentEvent.er_data, sizeof(theCurrentEvent.er_data));
OSRef* ref = fRefTable.Resolve(&idStr);
if (ref != NULL)
{
EventContext* theContext = (EventContext*)ref->GetObject();
#if DEBUG
theContext->fModwatched = false;
#endif
theContext->ProcessEvent(theCurrentEvent.er_eventbits);
fRefTable.Release(ref);
}
}
...
}
那在ProcessEvent里干了些什么事情呢?
void TCPListenerSocket::ProcessEvent(int /*eventBits*/)
{
...
//fSocket data member of TCPSocket.
int osSocket = accept(fFileDesc, (struct sockaddr*)&addr, &size);
...
theTask = this->GetSessionTask(&theSocket);
if (theTask == NULL)
{ //this should be a disconnect. do an ioctl call?
close(osSocket);
if (theSocket)
theSocket->fState &= ~kConnected; // turn off connected state
}
else
{
Assert(osSocket != EventContext::kInvalidFileDesc);
//set options on the socket
//we are a server, always disable nagle algorithm
int one = 1;
int err = ::setsockopt(osSocket, IPPROTO_TCP, TCP_NODELAY, (char*)&one, sizeof(int));
AssertV(err == 0, OSThread::GetErrno());
err = ::setsockopt(osSocket, SOL_SOCKET, SO_KEEPALIVE, (char*)&one, sizeof(int));
AssertV(err == 0, OSThread::GetErrno());
int sndBufSize = 96L * 1024L;
err = ::setsockopt(osSocket, SOL_SOCKET, SO_SNDBUF, (char*)&sndBufSize, sizeof(int));
AssertV(err == 0, OSThread::GetErrno());
//setup the socket. When there is data on the socket,
//theTask will get an kReadEvent event
theSocket->Set(osSocket, &addr);
theSocket->InitNonBlocking(osSocket);
theSocket->SetTask(theTask);
theSocket->RequestEvent(EV_RE);
theTask->SetThreadPicker(Task::GetBlockingTaskThreadPicker()); //The RTSP Task processing threads
}
...
}
从TCPListenerSocket::ProcessEvent可知,在该函数里accept RTSP的连接请求,然后调用GetSessionTask创建RTSPSession了,theSocket attach上了osSocket,然后设置osSocket为非阻塞,设置处理osSocket的网络事件为RTSPSession,把osSocket的读事件加入到epoll中。至此我们可以知道该连接的所有网络事件由RTSPSession处理。theSocket是一个TCPSocket指针对象,由RTSPSession创建的,而不再是一个RTSPListenerSocket了。
RTSPSession是如何获取到网络事件的?从TCPListenerSocket::ProcessEvent中知道,theSocket对象调用了RequestEvent,请求了读事件,也就是在epoll中等待网络读事件,当读到RTSP会话的对端发送了RTSP的信息时(在EventThread线程函数Entry里,如下所示),会调用到EventContext的ProcessEvent函数。
void EventThread::Entry()
{
struct eventreq theCurrentEvent;
::memset( &theCurrentEvent, '\0', sizeof(theCurrentEvent) );
while (true)
{
int theErrno = EINTR;
while (theErrno == EINTR)
{
#if MACOSXEVENTQUEUE
int theReturnValue = waitevent(&theCurrentEvent, NULL);
#else
#if defined(__linux__)
int theReturnValue = epoll_waitevent(&theCurrentEvent, NULL);
#else
int theReturnValue = select_waitevent(&theCurrentEvent, NULL);
#endif
#endif
//Sort of a hack. In the POSIX version of the server, waitevent can return
//an actual POSIX errorcode.
if (theReturnValue >= 0)
theErrno = theReturnValue;
else
theErrno = OSThread::GetErrno();
}
AssertV(theErrno == 0, theErrno);
//ok, there's data waiting on this socket. Send a wakeup.
if (theCurrentEvent.er_data != NULL)
{
//The cookie in this event is an ObjectID. Resolve that objectID into
//a pointer.
StrPtrLen idStr((char*)&theCurrentEvent.er_data, sizeof(theCurrentEvent.er_data));
OSRef* ref = fRefTable.Resolve(&idStr);
if (ref != NULL)
{
EventContext* theContext = (EventContext*)ref->GetObject();
#if DEBUG
theContext->fModwatched = false;
#endif
theContext->ProcessEvent(theCurrentEvent.er_eventbits);
fRefTable.Release(ref);
}
}
#if EVENT_CONTEXT_DEBUG
SInt64 yieldStart = OS::Milliseconds();
#endif
#if 0//defined(__linux__)
#else
this->ThreadYield();
#endif
#if EVENT_CONTEXT_DEBUG
SInt64 yieldDur = OS::Milliseconds() - yieldStart;
static SInt64 numZeroYields;
if ( yieldDur > 1 )
{
qtss_printf( "EventThread time in OSTHread::Yield %i, numZeroYields %i\n", (SInt32)yieldDur, (SInt32)numZeroYields );
numZeroYields = 0;
}
else
numZeroYields++;
#endif
}
}
ProcessEvent会给RTSPSession这个Task发送一个kReadEvent的信号,此处的fTask就是RTSPSession。于是RTSPSession这个task会被加入到线程队列中,等待RTSPSession的run方法被调用去处理网络的读事件,也就是处理RTSP会话。
RTSPSession::run函数采用状态机处理RTSP会话,其状态机设计如下:
1234.jpg
• kReadingFirstRequest,RTSPSession刚被创建时,都是首先进入这个状态,然后读取客户端发送过来的RTSP request。
• kHTTPFilteringRequest,然后检查该请求是不是以RTSP-OVER-HTTP的方式进行RTSP交互。
• kHaveNonTunnelMessage,开始加锁,防止在该请求还没有处理完的时候,禁止其他的任何RTSP请求被处理。
• kFilteringRequest,会给每个订阅了QTSS_RTSPFilter_Role角色的模块调用QTSS_RTSPFilter_Role的回调函数。如果不是OPTION命令或者SET_PARAMETER命令,则会创建RTPSession。如果是RTP的数据包,则会调用QTSS_RTSPIncomingData_Role角色的回调,每个订阅了该角色的模块都可以获取到RTP包的内容。
• kPostProcessingRequest,如果创建了RTPSession,则会给所有订阅了QTSS_RTSPPostProcessor_Role角色的模块调用QTSS_RTSPPostProcessor_Role回调函数。
• kCleaningUp,在该状态机里释放fSessionMutex和fReadMutex锁。
• kReadingRequest,除了RTSP刚被创建时,读取request消息是在kReadingFirstRequest状态机里,其余所有的request消息都从kReadingRequest消息开始处理。
• kRoutingRequest,调用QTSS_RTSPRoute_Role角色的回调。
• kPreprocessingRequest,调用QTSS_RTSPPreProcessor_Role角色的回调,如果是TEARDOWN的request,则被调用的模块可能会调用RTPSession的Teardown方法,这样就会给RTPSession这个TASK发送一个kKillEvent的事件,RTPSession收到该事件后,会调用QTSS_ClientSessionClosing_Role角色的回调,关闭该会话。
技术交流可以入QQ群【554271674】
网友评论