美文网首页通往成功之路
C# 实现的多线程异步Socket数据包接收qi框架

C# 实现的多线程异步Socket数据包接收qi框架

作者: 此十八 | 来源:发表于2018-08-13 09:09 被阅读7次

    几天前在博问中看到一个C# Socket问题,就想到笔者2004年做的一个省级交通流量接收服务器项目,当时的基本求如下:

    接收自动观测设备通过无线网卡、Internet和Socket上报的交通量数据包

    全年365*24运行的自动观测设备5分钟上报一次观测数据,每笔记录约2K大小

    规划全省将有100个左右的自动观测设备(截止2008年10月还只有30个)

          当时,VS2003才发布年多,笔者也是接触C#不久。于是Google了国内国外网,希望找点应用C#解决Socket通信问题的思路和代码。最后,找到了两篇帮助最大的文章:一篇是国人写的Socket接收器框架,应用了独立的客户端Socket会话(Session)概念,给笔者提供了一个接收服务器的总体框架思路;另一篇是美国人写的,提出了多线程、分段接收数据包的技术方案,描述了多线程、异步Socket的许多实现细节,该文坚定了笔者采用多线程和异步方式处理Socket接收器的技术路线。

         具体实现和测试时笔者还发现,在Internet环境下的Socket应用中,需要系统有极强的容错能力:没有办法控制异常,就必须允许它们存在(附加源代码中可以看到,try{}catch{}语句较多)。对此,笔者设计了一个专门的检查和清理线程,完成无效或超时会话的清除和资源释放工作。

         依稀记得,国内框架作者的名称空间有ibm,认为是IBM公司职员,通过邮件后才知道其人在深圳。笔者向他请教了几个问题,相互探讨了几个技术关键点。可惜,现在再去找,已经查不到原文和邮件了。只好借此机会,将本文献给这两个素未谋面的技术高人和同行,也盼望拙文或源码能给读者一点有用的启发和帮助。

    1、主要技术思路

         整个系统由三个核心线程组成,并由.NET线程池统一管理:

    监视客户端连接请求线程:ListenClientRequest(),循环监视客户端连接请求。如果有,检测该客户端IP,看是否是同一观测设备,然后建立一个客户端TSession对象,并通过Socket异步调用方法BeginReceive()接收数据包、EndReceive()处理数据包

    数据包处理线程:HandleDatagrams(),循环检测数据包队列_datagramQueue,完成数据包解析、判断类型、存储等工作

    客户端状态检测线程:CheckClientState(),循环检查客户端会话表_sessionTable,判断会话对象是否有效,设置超时会话关闭标志,清楚无效会话对象及释放其资源

    2、主要类简介

         系统主要由3个类组成:

    TDatagramReceiver(数据包接收服务器):系统的核心进程类,建立Socket连接、处理与存储数据包、清理系统资源,该类提供全部的public属性和方法

    TSession(客户端会话):由每个客户端的Socket对象组成,有自己的数据缓冲区,清理线程根据该对象的最近会话时间判断是否超时

    TDatagram(数据包类):判断数据包类别、解析数据包

    3、关键函数和代码

         下面简介核心类TDatagramReceiver的关键实现代码。

    3.1  系统启动

          系统启动方法StartReceiver()首先清理资源、创建数据库连接、初始化若干计数值,然后创建服务器端侦听Socket对象,最后调用静态方法ThreadPool.QueueUserWorkItem()在线程池中创建3个核心处理线程。

    /// ///启动接收器

    /// public boolStartReceiver()

    {

    try

    {

    _stopReceiver

    = true;

    this.Close();

    if (!this.ConnectDatabase()) return false;

    _clientCount

    = 0;

    _datagramQueueCount

    = 0;

    _datagramCount

    = 0;

    _errorDatagramCount

    = 0;

    _exceptionCount

    = 0;

    _sessionTable

    = newHashtable(_maxAllowClientCount);

    _datagramQueue

    = new Queue(_maxAllowDatagramQueueCount);

    _stopReceiver

    = false;  // 循环中均要该标志if (!this.CreateReceiverSocket())  //建立服务器端 Socket 对象{

    return false;

    }

    // 侦听客户端连接请求线程, 使用委托推断, 不建 CallBack 对象        if (!ThreadPool.QueueUserWorkItem(ListenClientRequest))

    {

    return false;

    }

    // 处理数据包队列线程        if (!ThreadPool.QueueUserWorkItem(HandleDatagrams))

    {

    return false;

    }

    // 检查客户会话状态, 长时间未通信则清除该对象        if (!ThreadPool.QueueUserWorkItem(CheckClientState))

    {

    return false;

    }

    _stopConnectRequest

    = false;  // 启动接收器,则自动允许连接}

    catch

    {

    this.OnReceiverException();

    _stopReceiver

    = true;

    }

    return !_stopReceiver;

    }

          下面是创建侦听Socket对象的方法代码。

    /// ///创建接收服务器的 Socket, 并侦听客户端连接请求

    /// private boolCreateReceiverSocket()

    {

    try

    {

    _receiverSocket

    = newSocket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);

    _receiverSocket.Bind(

    new IPEndPoint(IPAddress.Any, _tcpSocketPort));  // 绑定端口        _receiverSocket.Listen(_maxAllowListenQueueLength);  // 开始监听return true;

    }

    catch

    {

    this.OnReceiverException();

    return false;

    }

    }

    3.2  侦听客户端连接请求

          服务器端循环等待客户端连接请求。一旦有请求,先判断客户端连接数是否超限,接着检测该客户端IP地址,一切正常后建立TSession对象,并调用异步方法接收客户端Socket数据包。

          代码中,Socket读到数据时的回调AsyncCallback委托方法EndReceiveData()完成数据接收工作,正常情况下启动另一个异步BeginReceive()调用。

          .NET中,每个异步方法都有自己的独立线程,异步处理其实也基于多线程机制的。下面代码中的异步套异步调用,既占用较大的系统资源,也给处理带来意想不到的结果,更是出现异常时难以控制和处理的关键所在。

    /// ///循环侦听客户端请求,由于要用线程池,故带一个参数

    /// private void ListenClientRequest(objectstate)

    {

    Socket client

    = null;

    while (!_stopReceiver)

    {

    if (_stopConnectRequest)  //  停止客户端连接请求{

    if (_receiverSocket != null)

    {

    try

    {

    _receiverSocket.Close();

    // 强制关闭接收器}

    catch

    {

    this.OnReceiverException();

    }

    finally

    {

    // 必须为 null,否则 disposed 对象仍然存在,将引发下面的错误                    _receiverSocket = null;

    }

    }

    continue;

    }

    else

    {

    if (_receiverSocket == null)

    {

    if (!this.CreateReceiverSocket())

    {

    continue;

    }

    }

    }

    try

    {

    if(_receiverSocket.Poll(_loopWaitTime, SelectMode.SelectRead))

    {

    // 频繁关闭、启动时,这里容易产生错误(提示套接字只能有一个)                client =_receiverSocket.Accept();

    if (client != null &&client.Connected)

    {

    if (this._clientCount >= this._maxAllowClientCount)

    {

    this.OnReceiverException();

    try

    {

    client.Shutdown(SocketShutdown.Both);

    client.Close();

    }

    catch{ }

    }

    else if (CheckSameClientIP(client))  // 已存在该 IP 地址{

    try

    {

    client.Shutdown(SocketShutdown.Both);

    client.Close();

    }

    catch{ }

    }

    else

    {

    TSession session

    = newTSession(client);

    session.LoginTime

    =DateTime.Now;

    lock(_sessionTable)

    {

    int preSessionID =session.ID;

    while (true)

    {

    if (_sessionTable.ContainsKey(session.ID))  // 有可能重复该编号{

    session.ID

    = 100000 +preSessionID;

    }

    else

    {

    break;

    }

    }

    _sessionTable.Add(session.ID, session);

    // 登记该会话客户端                            Interlocked.Increment(ref_clientCount);

    }

    this.OnClientRequest();

    try  // 客户端连续连接或连接后立即断开,易在该处产生错误,系统忽略之{

    // 开始接受来自该客户端的数据                            session.ClientSocket.BeginReceive(session.ReceiveBuffer, 0,

    session.ReceiveBufferLength, SocketFlags.None, EndReceiveData, session);

    }

    catch

    {

    session.DisconnectType

    =TDisconnectType.Exception;

    session.State

    =TSessionState.NoReply;

    }

    }

    }

    else if (client != null)  // 非空,但没有连接(connected is false){

    try

    {

    client.Shutdown(SocketShutdown.Both);

    client.Close();

    }

    catch{ }

    }

    }

    }

    catch

    {

    this.OnReceiverException();

    if (client != null)

    {

    try

    {

    client.Shutdown(SocketShutdown.Both);

    client.Close();

    }

    catch{ }

    }

    }

    // 该处可以适当暂停若干毫秒}

    // 该处可以适当暂停若干毫秒}

    3.3  处理数据包

          该线程循环查看数据包队列,完成数据包的解析与存储等工作。具体实现时,如果队列中没有数据包,可以考虑等待若干毫秒,提高CPU利用率。

    private void HandleDatagrams(objectstate)

    {

    while (!_stopReceiver)

    {

    this.HandleOneDatagram();  // 处理一个数据包if (!_stopReceiver)

    {

    // 如果连接关闭,则重新建立,可容许几个连接错误出现            if (_sqlConnection.State ==ConnectionState.Closed)

    {

    this.OnReceiverWork();

    try

    {

    _sqlConnection.Open();

    }

    catch

    {

    this.OnReceiverException();

    }

    }

    }

    }

    }

    /// ///处理一个包数据,包括:验证、存储

    /// private voidHandleOneDatagram()

    {

    TDatagram datagram

    = null;

    lock(_datagramQueue)

    {

    if (_datagramQueue.Count > 0)

    {

    datagram

    = _datagramQueue.Dequeue();  // 取队列数据            Interlocked.Decrement(ref_datagramQueueCount);

    }

    }

    if (datagram == null) return;

    datagram.Clear();

    datagram

    = null;  // 释放对象}

    3.4  检查与清理会话

          本线程负责处理建立连接后的客户端会话TSession或Socket对象的关闭与资源清理工作,其它方法中出现异常等情况,尽可能标记相关TSession对象的属性NoReply=true,表示该会话已经无效、需要清理。 

           检查会话队列并清理资源分3步:第一步,Shutdown()客户端Socket,此时可能立即触发某些Socket的异步方法EndReceive();第二步,Close()客户端Socket,释放占用资源;第三步,从会话表中清除该会话对象。其中,第一步完成后,某个TSession也许不会立即到第二步,因为可能需要处理其异步结束方法。

          需要指出, 由于涉及多线程处理,需要频繁加解锁操作,清理工作前先建立一个会话队列列副本sessionTable2,检查与清理该队副本列列的TSession对象。

    /// ///检查客户端状态(扫描方式,若长时间无数据,则断开)

    /// private void CheckClientState(objectstate)

    {

    while (!_stopReceiver)

    {

    DateTime thisTime

    =DateTime.Now;

    // 建立一个副本 ,然后对副本进行操作        Hashtable sessionTable2 = newHashtable();

    lock(_sessionTable)

    {

    foreach (TSession session in_sessionTable.Values)

    {

    if (session != null)

    {

    sessionTable2.Add(session.ID, session);

    }

    }

    }

    foreach (TSession session in sessionTable2.Values)  // 对副本进行操作{

    Monitor.Enter(session);

    try

    {

    if (session.State == TSessionState.NoReply)  // 分三步清除一个 Session{

    session.State

    =TSessionState.Closing;

    if (session.ClientSocket != null)

    {

    try

    {

    // 第一步:shutdownsession.ClientSocket.Shutdown(SocketShutdown.Both);

    }

    catch{ }

    }

    }

    else if (session.State ==TSessionState.Closing)

    {

    session.State

    =TSessionState.Closed;

    if (session.ClientSocket != null)

    {

    try

    {

    // 第二步: Closesession.ClientSocket.Close();

    }

    catch{ }

    }

    }

    else if (session.State ==TSessionState.Closed)

    {

    lock(_sessionTable)

    {

    // 第三步:remove from table_sessionTable.Remove(session.ID);

    Interlocked.Decrement(

    ref_clientCount);

    }

    this.OnClientRequest();

    session.Clear();

    // 清空缓冲区}

    else if (session.State == TSessionState.Normal)  // 正常的会话 {

    TimeSpan ts

    =thisTime.Subtract(session.LastDataReceivedTime);

    if (Math.Abs(ts.TotalSeconds) > _maxSocketDataTimeout)  // 超时,则准备断开连接{

    session.DisconnectType

    =TDisconnectType.Timeout;

    session.State

    = TSessionState.NoReply;  // 标记为将关闭、准备断开}

    }

    }

    finally

    {

    Monitor.Exit(session);

    }

    }

    // end foreach

    sessionTable2.Clear();

    }

    // end while}

    4 、结语

         基于多线程处理的系统代价是比较大的,需要经常调用加/解锁方法lock()或Monitor.Enter(),需要经常创建处理线程等。从实际运行效果看,笔者的实现方案有较好的稳定性:2005年4月到5月间,在一个普通PC机器上连续运行30多天不出一点故障。同时,笔者采用了时序区间判重等算法,有效地提高了系统处理与响应速度。测试表明,在普通的PC机器(P4 2.0)上,可以做到0.5秒处理一个数据包,如果优化代码和服务器,还有较大的性能提升空间。

         上面的代码是笔者实现的省级公路交通流量数据服务中心(DSC)项目中的接收服务器框架部分,整个系统还包括:数据转发交通部的转发服务器、数据远程查询客户端、综合报表数据处理系统、数据在线发布系统、系统运行监控系统等。

         实际的接收服务器类及其辅助类超过3K行,整个系统则超过了60K。因为是早期实现的程序,难免有代码粗糙、方法欠妥的感觉,只有留待下个版本完善扩充了。由于与甲方有保密合同和版权保护等,不可能公开全部源代码,删减也有不当之处,读者发现时请不吝指正。下面是带详细注释的代码下载URL。

    下载框架源码

    附注:笔者补充了有关数据包界限、间断、重叠等内容,请参考指正。

    Tag标签:技术

    C# 实现的多线程异步Socket数据包接收器框架(补记)

    国庆假日的最后一天,用近9个小时写完了C# 实现的多线程异步Socket数据包接收器框架(包括删减代码的时间)。饭后散步回来再看,好家伙,有300多个Page Views了,超过笔者在codeproject上首日前几个小时的PV速度了。呵呵,如果发表在笔者原博客网上,估计就是自己反反复复修改记录的数十个PV了!终究是彼网牛人高手太多。

         散步时仔细想想该文,发觉有三个Socket通信中关键与著名的问题没有讲到或没有讲清楚:

    数据包界限符问题。根据原项目中交通部标准,在连续观测站中数据包中,使用<>两个字符表示有效数据包开始和结束。实际项目有各自的具体技术规范

    数据包不连续问题。在TCP/IP等通信中,由于时延等原因,一个数据包被Socket做两次或多次接收,此时在接收第一个包后,必须保存到TSession的DatagramBuffer中,在以后一并处理

    包并发与重叠问题。由于客户端发送过快或设备故障等原因,一次接收到一个半、两个或多个包文。此时,也需要处理、一个半、两个或多个包

         先补充异步BeginReceive()回调函数EndReceiveData()中的数据包分合函数ResolveBuffer()。

    /// /// 1) 报文界限字符为<>,其它为合法字符,

    ///2) 按报文头、界限标志抽取报文,可能合并包文

    ///3) 如果一次收完数据,此时 DatagramBuffer 为空

    ///4) 否则转存到包文缓冲区 session.DatagramBuffer

    /// private void ResolveBuffer(TSession session, intreceivedSize)

    {

    // 上次留下的报文缓冲区非空(注意:必然含有开始字符 <,空时不含 <)    bool hasBeginChar = (session.DatagramBufferLength > 0);

    int packPos = 0;  // ReceiveBuffer 缓冲区中包的开始位置    int packLen = 0;  // 已经解析的接收缓冲区大小byte dataByte = 0;  // 缓冲区字节    int subIndex = 0;   // 缓冲区下标while (subIndex 

    {

    // 接收缓冲区数据,要与报文缓冲区 session.DatagramBuffer 同时考虑        dataByte =session.ReceiveBuffer[subIndex];

    if (dataByte == TDatagram.BeginChar) // 是数据包的开始字符<,则前面的包文均要放弃{

    // <前面有非空串(包括报文缓冲区),则前面是错包文,防止 AAA 两个报文一次读现象            if (packLen > 0)

    {

    Interlocked.Increment(

    ref _datagramCount);       // 前面有非空字符                Interlocked.Increment(ref _errorDatagramCount);  // 一个错误包                this.OnDatagramError();

    }

    session.ClearDatagramBuffer();

    // 清空会话缓冲区,开始一个新包

    packPos

    = subIndex;   // 新包起点,即<所在位置            packLen = 1;          // 新包的长度(即<)            hasBeginChar = true;  // 新包有开始字符}

    else if (dataByte == TDatagram.EndChar)  // 数据包的结束字符 >{

    if (hasBeginChar)  // 两个缓冲区中有开始字符<{

    ++packLen;  //长度包括结束字符>

    // >前面的为正确格式的包,则分析该包,并准备加入包队列AnalyzeOneDatagram(session, packPos, packLen);

    packPos

    = subIndex + 1;  // 新包起点。注意:subIndex 在循环最后处 + 1                packLen = 0;             // 新包长度}

    else  // >前面没有开始字符,则认为结束字符>为一般字符,待后续的错误包处理{

    ++packLen;  //  hasBeginChar = false;}

    }

    else  // 非界限字符<>,就是是一般字符,长度 + 1,待解析包处理{

    ++packLen;

    }

    ++subIndex;  // 增加下标号    }  // end whileif (packLen > 0)  // 剩下的待处理串,分两种情况{

    // 剩下包文,已经包含首字符且不超长,转存到包文缓冲区中,待下次处理        if (hasBeginChar && packLen + session.DatagramBufferLength <=_maxDatagramSize)

    {

    session.CopyToDatagramBuffer(packPos, packLen);

    }

    else  // 不含首字符,或超长{

    Interlocked.Increment(

    ref_datagramCount);

    Interlocked.Increment(

    ref_errorDatagramCount);

    this.OnDatagramError();

    session.ClearDatagramBuffer();

    // 丢弃全部数据}

    }

    }

         分析包文AnalyzeOneDatagram()函数代码补充如下:     

    /// /// 具有<

    >格式的数据包加入到队列中

    /// private void AnalyzeOneDatagram(TSession session, int packPos, intpackLen)

    {

    if (packLen + session.DatagramBufferLength > _maxDatagramSize)  // 超过长度限制{

    Interlocked.Increment(

    ref_datagramCount);

    Interlocked.Increment(

    ref_errorDatagramCount);

    this.OnDatagramError();

    }

    else // 一个首尾字符相符的包,此时需要判断其类型{

    Interlocked.Increment(

    ref_datagramCount);

    TDatagram datagram

    = newTDatagram();

    if (!datagram.CheckDatagramKind())  // 包格式错误(只能是短期BG、或长期SG包){

    Interlocked.Increment(

    ref_datagramCount);

    Interlocked.Increment(

    ref_errorDatagramCount);

    this.OnDatagramError();

    datagram

    = null;  // 丢弃当前包}

    else  // 实时包、定期包,先解析数据,判断正误,并发回确认包{

    datagram.ResolveDatagram();

    if (true)  // 正确的包才入包队列{

    Interlocked.Increment(

    ref_datagramQueueCount);

    lock(_datagramQueue)

    {

    _datagramQueue.Enqueue(datagram);

    // 数据包入队列}

    }

    else

    {

    Interlocked.Increment(

    ref_errorDatagramCount);

    this.OnDatagramError();

    }

    }

    }

    session.ClearDatagramBuffer();

    // 清包文缓冲区}

         TSession的拷贝转存数据包文的方法CopyToDatagramBuffer()代码如下:

    /// ///拷贝接收缓冲区的数据到数据缓冲区(即多次读一个包文)

    /// public void CopyToDatagramBuffer(int startPos, intpackLen)

    {

    int datagramLen = 0;

    if (DatagramBuffer != null) datagramLen =DatagramBuffer.Length;

    // 调整长度(DataBuffer 为 null 不会出错)    Array.Resize(ref DatagramBuffer, datagramLen +packLen);

    // 拷贝到数据就缓冲区Array.Copy(ReceiveBuffer, startPos, DatagramBuffer, datagramLen, packLen);

    }

         代码中注释比较详细了,下面指出其思路:

    使用TSession会话对象的字节数组ReceiveBuffer保存BeginReceiver()接收到的数据,使用字节数组DatagramBuffer保存一次接收后分解或合并的剩下的包文。本项目中,由于是5分钟一个包,正常情况下不需要用到DatagramBuffer数组

    处理ReceiveBuffer中的字节数据包时,先考虑DatagramBuffer是否有开始字符<。如果有,则当前包文是前个包文的补充,否则前个包文是错误的。正确的包文可能存在于两个缓冲区中,见分析函数AnalyzeOneDatagram()

    分析完接收数据包后,剩下的转存到DatagramBuffer中,见函数CopyToDatagramBuffer()

         设计时考虑的另一个重要问题就是处理速度。如果自动观测站达到100个,此时5*60=300秒钟就有100个包,即每3秒种一个包,不存在处理速度慢问题。但是,真正耗时的是判断包是否重复!特别地,当设备故障时存在混乱上传数据包现象,此时将存在大量的重复包。笔者采用了所谓的区间判重算法,较好地解决了判重速度问题,使得系统具有很好的可伸缩性(分析算法的论文被EI核心版收录,呵呵,意外收获)。事实上,前年的交通部接收服务器还不具备该项功能,可能是太费时间了。

         还有,就是在.NET Framework的托管CLR下,系统本身的响应速度如何?当时的确没有把握,认为只要稳定性和速度满足要求就行了。三年半运行情况表明,系统有良好的处理速度、很好的稳定性、满足了部省要求。

    相关文章

      网友评论

        本文标题:C# 实现的多线程异步Socket数据包接收qi框架

        本文链接:https://www.haomeiwen.com/subject/rmnkbftx.html