美文网首页
c#Socket服务端异步高性能并发 解决 半包 粘包问题 封装

c#Socket服务端异步高性能并发 解决 半包 粘包问题 封装

作者: DF_Sky | 来源:发表于2018-08-12 19:11 被阅读0次
/// <summary>
/// Socket连接,双向通信
/// </summary>
public class SocketConnection
{
    #region 构造函数

    public SocketConnection(Socket socket, SocketServer server)
    {
        _socket = socket;
        _server = server;
    }

    #endregion

    public Socket ClientSocket { get { return _socket; } }
    public bool IsCheck { get; set; }//该客户是否通过首次验证  否则无法进行后续操作
    #region 私有成员

    private readonly Socket _socket;
    private bool _isRec = true;
    private SocketServer _server = null;
    private byte[] Tmp_byteArr = new byte[0];//缓存 多余 或者 不完整 的封包数据
    private bool IsSocketConnected()
    {
        bool part1 = _socket.Poll(1000, SelectMode.SelectRead);
        bool part2 = (_socket.Available == 0);
        if (part1 && part2)
            return false;
        else
            return true;
    }

    #endregion

    #region 外部接口

    /// <summary>
    /// 开始接受客户端消息
    /// </summary>
    public void StartRecMsg()
    {
        try
        {
            byte[] container = new byte[1024 * 1024 * 2];
            _socket.BeginReceive(container, 0, container.Length, SocketFlags.None, asyncResult =>
            {
                try
                {
                    int length = _socket.EndReceive(asyncResult);

                    //马上进行下一轮接受,增加吞吐量
                    if (length > 0 && _isRec && IsSocketConnected())
                        StartRecMsg();

                    if (length > 0)
                    {
                        byte[] recBytes = new byte[length];
                        Array.Copy(container, 0, recBytes, 0, length);
                        ParMsg(recBytes);

                        ////处理消息 这里我把他取消了 放到了下面消息解析里面触发 当解析出完整数据时再触发 这样订阅事件的地方不用在做处理
                        //HandleRecMsg?.Invoke(recBytes, this, _server);
                    }
                    else
                        Close();
                }
                catch (Exception ex)
                {
                    HandleException?.Invoke(ex);
                    Close();
                }
            }, null);
        }
        catch (Exception ex)
        {
            HandleException?.Invoke(ex);
            Close();
        }
    }

    void ParMsg(byte[] arr)
    {
        List<byte> result = new List<byte>();//之后所有的操作数据都保存在这  list方便点
        if (Tmp_byteArr.Length > 0)//判断之前是否有保存多余数据
        {
            result.AddRange(Tmp_byteArr);
        }
        result.AddRange(arr);
        if (result.Count <= 4)//前4个字节代表包长 如果不够则表示包不完整 保存起来
        {
            Tmp_byteArr = result.ToArray();
            return;
        }
        int DataLength = BitConverter.ToInt32(Common.SubByte(result.ToArray(), 0, 4), 0);//包长 但不含这个包长本身
        if (DataLength == result.Count - 4)//如果相等 则刚好是一个完整的包
        {
            MsgBody msg = new MsgBody(result.ToArray());
            //处理消息 
            HandleRecMsg?.Invoke(msg, this, _server);
            return;
        }
        if (DataLength > result.Count - 4)//如果包长 大于实际消息长度 说明包不完整 则保存至下次使用
        {
            Tmp_byteArr = result.ToArray();
            return;
        }
        if (DataLength < result.Count - 4)//如果包长 小于实际消息长度 说明存在了粘包 要把多余的内容递归使用
        {
            MsgBody msg = new MsgBody(result.Take(DataLength).ToArray());
            //处理消息 
            HandleRecMsg?.Invoke(msg, this, _server);
            ParMsg(result.Skip(DataLength).ToArray());
        }

    }

    /// <summary>
    /// 发送数据
    /// </summary>
    /// <param name="bytes">数据字节</param>
    public void Send(byte[] bytes)
    {
        try
        {
            _socket.BeginSend(bytes, 0, bytes.Length, SocketFlags.None, asyncResult =>
            {
                try
                {
                    int length = _socket.EndSend(asyncResult);
                    HandleSendMsg?.Invoke(bytes, this, _server);
                }
                catch (Exception ex)
                {
                    HandleException?.Invoke(ex);
                }
            }, null);
        }
        catch (Exception ex)
        {
            HandleException?.Invoke(ex);
        }
    }

    /// <summary>
    /// 发送字符串(默认使用UTF-8编码)
    /// </summary>
    /// <param name="msgStr">字符串</param>
    public void Send(string msgStr)
    {
        Send(Encoding.UTF8.GetBytes(msgStr));
    }

    /// <summary>
    /// 发送字符串(使用自定义编码)
    /// </summary>
    /// <param name="msgStr">字符串消息</param>
    /// <param name="encoding">使用的编码</param>
    public void Send(string msgStr, Encoding encoding)
    {
        Send(encoding.GetBytes(msgStr));
    }

    /// <summary>
    /// 传入自定义属性
    /// </summary>
    public object Property { get; set; }

    /// <summary>
    /// 关闭当前连接
    /// </summary>
    public void Close()
    {
        try
        {
            _isRec = false;
            _socket.Disconnect(false);
            _server.ClientList.Remove(this);
            HandleClientClose?.Invoke(this, _server);
            _socket.Close();
            _socket.Dispose();
            GC.Collect();
        }
        catch (Exception ex)
        {
            HandleException?.Invoke(ex);
        }
    }

    #endregion

    #region 事件处理

    /// <summary>
    /// 客户端连接接受新的消息后调用
    /// </summary>
    public Action<MsgBody, SocketConnection, SocketServer> HandleRecMsg { get; set; }

    /// <summary>
    /// 客户端连接发送消息后回调
    /// </summary>
    public Action<byte[], SocketConnection, SocketServer> HandleSendMsg { get; set; }

    /// <summary>
    /// 客户端连接关闭后回调
    /// </summary>
    public Action<SocketConnection, SocketServer> HandleClientClose { get; set; }

    /// <summary>
    /// 异常处理程序
    /// </summary>
    public Action<Exception> HandleException { get; set; }

    #endregion
}


/// <summary>
/// Socket服务端
/// </summary>
public class SocketServer
{
    #region 构造函数

    /// <summary>
    /// 构造函数
    /// </summary>
    /// <param name="ip">监听的IP地址</param>
    /// <param name="port">监听的端口</param>
    public SocketServer(string ip, int port)
    {
        _ip = ip;
        _port = port;
    }

    /// <summary>
    /// 构造函数,监听IP地址默认为本机0.0.0.0
    /// </summary>
    /// <param name="port">监听的端口</param>
    public SocketServer(int port)
    {
        _ip = "0.0.0.0";
        _port = port;
    }

    #endregion

    #region 内部成员

    private Socket _socket = null;
    private string _ip = "";
    private int _port = 0;
    private bool _isListen = true;
    private void StartListen()
    {
        try
        {
            _socket.BeginAccept(asyncResult =>
            {
                try
                {
                    Socket newSocket = _socket.EndAccept(asyncResult);

                    //马上进行下一轮监听,增加吞吐量
                    if (_isListen)
                        StartListen();

                    SocketConnection newClient = new SocketConnection(newSocket, this)
                    {
                        HandleRecMsg = HandleRecMsg == null ? null : new Action<MsgBody, SocketConnection, SocketServer>(HandleRecMsg),
                        HandleClientClose = HandleClientClose == null ? null : new Action<SocketConnection, SocketServer>(HandleClientClose),
                        HandleSendMsg = HandleSendMsg == null ? null : new Action<byte[], SocketConnection, SocketServer>(HandleSendMsg),
                        HandleException = HandleException == null ? null : new Action<Exception>(HandleException)
                    };

                    newClient.StartRecMsg();
                    ClientList.AddLast(newClient);

                    HandleNewClientConnected?.Invoke(this, newClient);
                }
                catch (Exception ex)
                {
                    HandleException?.Invoke(ex);
                }
            }, null);
        }
        catch (Exception ex)
        {
            HandleException?.Invoke(ex);
        }
    }

    #endregion

    #region 外部接口

    /// <summary>
    /// 开始服务,监听客户端
    /// </summary>
    public void StartServer()
    {
        try
        {
            //实例化套接字(ip4寻址协议,流式传输,TCP协议)
            _socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
            //创建ip对象
            IPAddress address = IPAddress.Parse(_ip);
            //创建网络节点对象包含ip和port
            IPEndPoint endpoint = new IPEndPoint(address, _port);
            //将 监听套接字绑定到 对应的IP和端口
            _socket.Bind(endpoint);
            //设置监听队列长度为Int32最大值(同时能够处理连接请求数量)
            _socket.Listen(int.MaxValue);
            //开始监听客户端
            StartListen();
            HandleServerStarted?.Invoke(this);
        }
        catch (Exception ex)
        {
            HandleException?.Invoke(ex);
        }
    }

    /// <summary>
    /// 所有连接的客户端列表
    /// </summary>
    public LinkedList<SocketConnection> ClientList { get; set; } = new LinkedList<SocketConnection>();

    /// <summary>
    /// 关闭指定客户端连接
    /// </summary>
    /// <param name="theClient">指定的客户端连接</param>
    public void CloseClient(SocketConnection theClient)
    {
        theClient.Close();
    }

    #endregion

    #region 公共事件

    /// <summary>
    /// 异常处理程序
    /// </summary>
    public Action<Exception> HandleException { get; set; }

    #endregion

    #region 服务端事件

    /// <summary>
    /// 服务启动后执行
    /// </summary>
    public Action<SocketServer> HandleServerStarted { get; set; }

    /// <summary>
    /// 当新客户端连接后执行
    /// </summary>
    public Action<SocketServer, SocketConnection> HandleNewClientConnected { get; set; }

    /// <summary>
    /// 服务端关闭客户端后执行
    /// </summary>
    public Action<SocketServer, SocketConnection> HandleCloseClient { get; set; }

    #endregion

    #region 客户端连接事件

    /// <summary>
    /// 客户端连接接受新的消息后调用
    /// </summary>
    public Action<MsgBody, SocketConnection, SocketServer> HandleRecMsg { get; set; }

    /// <summary>
    /// 客户端连接发送消息后回调
    /// </summary>
    public Action<byte[], SocketConnection, SocketServer> HandleSendMsg { get; set; }

    /// <summary>
    /// 客户端连接关闭后回调
    /// </summary>
    public Action<SocketConnection, SocketServer> HandleClientClose { get; set; }

    #endregion
}

public class MsgBody
{
public MsgBody()
{

    }
    public MsgBody(string str)
    {
        this.BodyData = str;
    }
    public MsgBody(byte[] b)
    {
        this.Source = b;
        SetValues();
    }

    /// <summary>
    /// 包长
    /// </summary>
    public int DataLength;
    public byte[] DataLength_Byte;

    /// <summary>
    /// 包长
    /// </summary>
    public int DataLength_2;
    public byte[] DataLength_2_Byte;

    /// <summary>
    /// 命令字 用来标记消息类型
    /// </summary>
    public int Cmd;
    public byte[] Cmd_Byte;

    /// <summary>
    /// 发生的文本内容
    /// </summary>
    public string BodyData;
    public byte[] BodyData_Byte;

    public byte End = 0;

    public byte[] Source;

    /// <summary>
    /// 数据组合为byte数组
    /// </summary>
    /// <returns></returns>
    public byte[] ToByteArray()
    {
        BodyData_Byte = Encoding.UTF8.GetBytes(BodyData);
        Cmd_Byte = BitConverter.GetBytes(Cmd);
        DataLength = DataLength_2 = BodyData_Byte.Length + Cmd_Byte.Length + 4 + 1;
        DataLength_Byte = BitConverter.GetBytes(DataLength);
        DataLength_2_Byte = BitConverter.GetBytes(DataLength_2);

        byte[] result = new byte[DataLength + 4];
        Array.Copy(DataLength_Byte, 0, result, 0, 4);
        Array.Copy(DataLength_2_Byte, 0, result, 4, 4);
        Array.Copy(Cmd_Byte, 0, result, 8, 4);
        Array.Copy(BodyData_Byte, 0, result, 12, BodyData_Byte.Length);
        Source = new byte[result.Length];
        Array.Copy(result, 0, Source, 0, result.Length);
        return result;
    }

    /// <summary>
    /// 数据解析
    /// </summary>
    public void SetValues()
    {
        try
        {
            DataLength_Byte = Common.SubByte(Source, 0, 4);
            DataLength_2_Byte = Common.SubByte(Source, 4, 4);
            Cmd_Byte = Common.SubByte(Source, 8, 4);
            BodyData_Byte = Common.SubByte(Source, 12, Source.Length - 13);
            End = Common.SubByte(Source, Source.Length - 2, 1)[0];

            DataLength = BitConverter.ToInt32(DataLength_Byte, 0);
            DataLength_2 = BitConverter.ToInt32(DataLength_2_Byte, 0);
            Cmd = BitConverter.ToInt32(Cmd_Byte, 0);
            BodyData = Encoding.UTF8.GetString(BodyData_Byte);
        }
        catch (Exception ex)
        {

            throw;
        }
    }
}

相关文章

  • c#Socket服务端异步高性能并发 解决 半包 粘包问题 封装

    public class MsgBody{public MsgBody(){

  • 基于netty的RPC实现

    这里解决了三个问题 协议定义,解决 粘包/拆包 问题 单客户端并发发送/消息维护问题 服务端并发提供服务问题 三个...

  • netty的编解码

    什么是拆包/粘包 TCP 粘包/拆包 半包:读取的数据不是一个数据包粘包:读取的数据超过一个数据包 粘包问题的解决...

  • 2.Netty基本--TCP粘包半包如何解决,协议和序列化到底啥

    [toc] 1.什么是TCP粘包和半包?为什么会出现粘包和半包? 粘包: 粘包半包 混合双打: 首先TCP是可靠有...

  • 企业IT架构笔记7 异步化

    核心设计思路:串行、同步 --> 异步、并行!解决大容量高并发、及高性能问题。 业务处理异步化:通过业务处理流程异...

  • Netty处理半包粘包的解码思路

    Netty处理半包粘包的解码思路 前言 写netty通讯的都不免遇到半包粘包情况。那么半包粘包是什么意思呢?我们可...

  • Go Call

    实现支持异步和并发的高性能客户端 Call 使用Call封装通道来实现异步通知 封装结构体Call用于承载一次客户...

  • Netty 半包,粘包处理

    基于TCP协议处理网络数据经常面对半包和粘包问题,那么什么是半包问题,什么是粘包问题呢?应用层消息在被发送到网络之...

  • Netty粘包/半包问题

    在netty中经常会出现粘包/半包问题? 1.应用程序写入的数据大于套接字缓冲区大小, 这将导致半包现象2.应用程...

  • TCP 半包粘包问题

    什么是粘包现象 TCP 粘包是指发送方发送的若干包数据到接收方接收时粘成一包,从接收缓冲区看,后一包数据的头紧接着...

网友评论

      本文标题:c#Socket服务端异步高性能并发 解决 半包 粘包问题 封装

      本文链接:https://www.haomeiwen.com/subject/refebftx.html