美文网首页
TCP解决粘包分包,简单封装

TCP解决粘包分包,简单封装

作者: SeatonLv | 来源:发表于2023-12-20 15:01 被阅读0次

1.TCP粘包拆包原因

1.1TCP粘包原因

TCP粘包是指在数据传输过程中,短时间发送多个小数据包被合并成一个大的数据包,

包1
包2包3
包4

1.2TCP分包原因

TCP分包发送方一次发送大数据包被拆分成多个小的数据包,

包1_1
包1_2

1.3解决方法

1.包固定长度
2.包结尾特定字符分割如\n
3.消息头包含包长度

2.具体实现

在这使用特殊字符分割,缓存数组,
1.在拿到多个分隔符时候,通过分隔符解析多条消息。
2.在没有拿到分隔符时候,继续缓存,直到拿到分隔符,保证完整包。

服务器代码:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Threading.Tasks;

class Server
{
    static string delimiter = "\r\n";
    static List<byte> leftoverData = new List<byte>();

    public void Start()
    {
        int port = 1234;
        TcpListener listener = new TcpListener(IPAddress.Any, port);
        listener.Start();
        Console.WriteLine("Server started. Waiting for connections...");

        TcpClient client = listener.AcceptTcpClient();
        Console.WriteLine("Client connected.");

        // 启动发送消息的任务
        Task sendTask = Task.Run(() => SendMessages(client));

        // 启动接收消息的任务
        Task receiveTask = Task.Run(() => ReceiveMessages(client));

        // 等待发送和接收任务完成
        Task.WaitAll(sendTask, receiveTask);

        client.Close();
        listener.Stop();
        Console.WriteLine("Server stopped.");
    }

    static void SendMessages(TcpClient client)
    {
        NetworkStream stream = client.GetStream();
        string[] messages = { "Hello", "以下是移除了文本中所有换行和空格的版本:当我们展望未来,科技的进步和创新无疑将引领着人类走向一个全新的世界。以下是一段关于未来科技的随机文本,长约800字:未来科技的世界将是一个充满奇迹和无限可能的地方。人工智能、生物技术、量子计算等前沿领域的突破将改变我们的生活方式、推动社会发展,并带来前所未有的机遇和挑战。人工智能将成为未来的核心技术。智能机器人将在各个领域发挥重要作用,从医疗保健到交通运输,从教育到家庭生活,无所不在。智能机器人不仅能够执行各种任务,还能与人类进行交互和合作。他们将成为我们的助手、朋友和教师,为我们提供个性化的服务和支持。生物技术的进步将彻底改变医学和生命科学。基因编辑技术的突破将使得人类能够更精确地治疗疾病,并可能解决一些遗传性疾病的根本问题。仿生技术的发展将使得人类能够模仿自然界的优秀设计,创造出更强大、更灵活的机器和材料。生物打印技术将使得人体器官的定制化制造成为可能,解决器官移植的瓶颈问题。量子计算的崛起将彻底改变计算机科学和信息技术。量子计算机的运算速度将远远超过传统计算机,解决那些传统计算机无法应对的复杂问题。密码学、材料科学、化学合成等领域将从量子计算的进步中获益。同时,量子通信技术将实现绝对安全的通信,保护个人隐私和商业机密。能源技术的创新将推动可持续发展和应对气候变化。太阳能、风能等可再生能源将成为主流,取代传统的化石燃料。能源储存技术的突破将解决可再生能源的不稳定性问题,实现可持续能源的大规模应用。核聚变技术的实现将为人类提供清洁、高效、安全的能源来源。虚拟现实和增强现实技术将改变人们的感知和体验。我们将进入一个全新的数字世界,与虚拟对象进行互动、探索虚拟环境、参与虚拟社交。", "12345" };

        foreach (string message in messages)
        {
            byte[] data = Encoding.UTF8.GetBytes(message + delimiter);
            stream.Write(data, 0, data.Length);
            Console.WriteLine("Sent message: " + message);

            Task.Delay(1000).Wait(); // 模拟发送间隔
        }

        // for (int i = 0; i < 5000; i++)
        // {
        //     byte[] data = Encoding.UTF8.GetBytes($"server back:{i}{delimiter}" );
        //     stream.Write(data, 0, data.Length);
        // }
    }

    static async Task ReceiveMessages(TcpClient client)
    {
        NetworkStream stream = client.GetStream();
        byte[] buffer = new byte[4096];
        int bytesRead = 0;

        while (true)
        {
            try
            {
                bytesRead = await stream.ReadAsync(buffer, 0, buffer.Length);
                if (bytesRead == 0)
                {
                    break;
                }

                ProcessReceivedData(buffer, bytesRead);
            }
            catch (Exception ex)
            {
                Console.WriteLine("Error reading data: " + ex.Message);
                break;
            }
        }
    }

    static void ProcessReceivedData(byte[] receivedBytes, int bytesRead)
    {
        leftoverData.AddRange(receivedBytes.Take(bytesRead));

        while (true)
        {
            int delimiterIndex = FindDelimiterIndex(leftoverData, delimiter);

            if (delimiterIndex == -1)
            {
                break; // 没有找到完整的消息
            }

            byte[] messageBytes = leftoverData.GetRange(0, delimiterIndex).ToArray();
            string message = Encoding.UTF8.GetString(messageBytes);
            Console.WriteLine("Received message: " + message);

            // 在这里进行消息处理逻辑
            // ...

            leftoverData.RemoveRange(0, delimiterIndex + delimiter.Length);
        }
    }

    static int FindDelimiterIndex(List<byte> data)
    {
        byte[] delimiterBytes = Encoding.UTF8.GetBytes(delimiter);
        int delimiterLength = delimiterBytes.Length;

        for (int i = 0; i <= data.Count - delimiterLength; i++)
        {
            bool isDelimiter = true;
            for (int j = 0; j < delimiterLength; j++)
            {
                if (data[i + j] != delimiterBytes[j])
                {
                    isDelimiter = false;
                    break;
                }
            }

            if (isDelimiter)
            {
                return i;
            }
        }

        return -1;
    }

    #region 高效kmp算法

    private static int FindDelimiterIndex(List<byte> data, string delimiter)
    {
        if (data.Count < delimiter.Length)
            return -1;

        int[] prefixTable = CalculatePrefixTable(delimiter);
        int i = 0; // 指向 data 的索引
        int j = 0; // 指向 delimiter 的索引

        while (i < data.Count)
        {
            if (data[i] == delimiter[j])
            {
                i++;
                j++;

                if (j == delimiter.Length)
                    return i - j; // 找到完整的消息,返回起始索引
            }
            else if (j > 0)
            {
                j = prefixTable[j - 1]; // 利用前缀表回退到前一个匹配位置
            }
            else
            {
                i++;
            }
        }

        return -1; // 没有找到完整的消息
    }

    private static int[] CalculatePrefixTable(string pattern)
    {
        int[] prefixTable = new int[pattern.Length];
        int length = 0;
        int i = 1;

        prefixTable[0] = 0; // 第一个元素的前缀长度为0

        while (i < pattern.Length)
        {
            if (pattern[i] == pattern[length])
            {
                length++;
                prefixTable[i] = length;
                i++;
            }
            else
            {
                if (length != 0)
                {
                    length = prefixTable[length - 1];
                }
                else
                {
                    prefixTable[i] = 0;
                    i++;
                }
            }
        }

        return prefixTable;
    }

    #endregion
}

客户端代码

using System;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Unity.VisualScripting;
using UnityEngine;

namespace NFramework.MyTCP
{
    public class MyClientPeer
    {
        static string delimiter = "\r\n"; // 分隔符
        static List<byte> leftoverData = new List<byte>();
        string serverIP = "127.0.0.1";
        int port = 1234;

        private TcpClient client;
        private NetworkStream stream;

        private bool isConnected = false;
        private int maxReconnectAttempts = 7;
        private int currentReconnectAttempt = 0;


        private readonly SemaphoreSlim semaphore = new SemaphoreSlim(1, 1);

        public void InitConnect(string ip, int portT)
        {
            serverIP = ip;
            port = portT;

            isConnected = false;
            currentReconnectAttempt = 0;

            while (!isConnected && currentReconnectAttempt < maxReconnectAttempts)
            {
                try
                {
                    client = new TcpClient(serverIP, port);
                    stream = client.GetStream();
                    isConnected = true;

                    Debug.Log($"Connected to server. serverIP:{serverIP}//port:{port}");

                    // 启动接收消息的线程
                    Task task = Task.Run(() => ReceiveMessagesAsync(client));
                    //ReceiveMessages();
                }
                catch (Exception ex)
                {
                    Debug.LogError($"Error connecting to server: {ex.Message}. Reconnecting...");
                    Close();

                    // 等待一段时间后尝试重新连接
                    Task.Delay(TimeSpan.FromSeconds(2));
                    currentReconnectAttempt++;
                }
            }

            if (!isConnected)
            {
                Debug.LogError($"Failed to connect after {maxReconnectAttempts} attempts.");
            }

            Debug.LogError($"connectEnd");
        }


        public void Close()
        {
            isConnected = false;
            if (client != null)
            {
                client.Close();
                Debug.Log("Disconnected from server.");
            }

            if (stream != null)
            {
                stream.Close();
            }
        }

        public void SendMessages(string dataMsg)
        {
            if (client == null || !client.Connected)
            {
                Debug.LogError("Client is not connected. Reconnecting...");
                Reconnect();
                return;
            }

            try
            {
                Debug.LogError($"dataMsg:{dataMsg}");
                byte[] data = Encoding.UTF8.GetBytes(dataMsg + delimiter);
                stream.Write(data, 0, data.Length);
            }
            catch (Exception e)
            {
                Debug.LogError("Error sending data: " + e.Message);
                Close();
            }
        }

        public async Task SendMessagesAsync(string dataMsg)
        {
            if (client == null || !client.Connected)
            {
                Debug.LogError("Client is not connected. Reconnecting...");
                Reconnect();
                return;
            }

            Debug.LogError($"dataMsg:{dataMsg}");
            byte[] data = Encoding.UTF8.GetBytes(dataMsg + delimiter);

            try
            {
                await semaphore.WaitAsync();
                await stream.WriteAsync(data, 0, data.Length);
            }
            catch (Exception ex)
            {
                Debug.LogError("Error sending data: " + ex.Message);
                Close();
            }
            finally
            {
                semaphore.Release();
            }
        }

        private void Reconnect()
        {
            Close();
            InitConnect(serverIP, port);
        }


        async Task ReceiveMessagesAsync(TcpClient client)
        {
            byte[] buffer = new byte[40960];
            int bytesRead = 0;

            while (true)
            {
                try
                {
                    bytesRead = await stream.ReadAsync(buffer, 0, buffer.Length);
                    if (bytesRead == 0)
                    {
                        break;
                    }

                    ProcessReceivedData(buffer, bytesRead);
                }
                catch (Exception ex)
                {
                    Debug.LogError("Error reading data: " + ex.Message);
                    Close();
                    break;
                }
            }
        }

        private void ReceiveMessages()
        {
            byte[] buffer = new byte[40960];
            int bytesRead = 0;

            while (true)
            {
                try
                {
                    bytesRead = stream.Read(buffer, 0, buffer.Length);
                    if (bytesRead == 0)
                    {
                        continue;
                    }

                    ProcessReceivedData(buffer, bytesRead);
                }
                catch (Exception ex)
                {
                    Debug.LogError("Error reading data: " + ex.Message);
                    Close();
                    break;
                }
            }
        }

        void ProcessReceivedData(byte[] receivedBytes, int bytesRead)
        {
            List<byte> localLeftoverData = new List<byte>(leftoverData);
            localLeftoverData.AddRange(receivedBytes.Take(bytesRead));

            while (true)
            {
                int delimiterIndex = FindDelimiterIndex(localLeftoverData);
                if (delimiterIndex == -1)
                {
                    break; // 没有找到完整的消息
                }

                byte[] messageBytes = localLeftoverData.GetRange(0, delimiterIndex).ToArray();
                string message = Encoding.UTF8.GetString(messageBytes);
                Debug.LogError("Received message: " + message);

                // 在这里进行消息处理逻辑
                // ...

                localLeftoverData.RemoveRange(0, delimiterIndex + delimiter.Length);
            }
        }

        int FindDelimiterIndex(List<byte> data)
        {
            byte[] delimiterBytes = Encoding.UTF8.GetBytes(delimiter);
            int delimiterLength = delimiterBytes.Length;

            for (int i = 0; i <= data.Count - delimiterLength; i++)
            {
                bool isDelimiter = true;
                for (int j = 0; j < delimiterLength; j++)
                {
                    if (data[i + j] != delimiterBytes[j])
                    {
                        isDelimiter = false;
                        break;
                    }
                }

                if (isDelimiter)
                {
                    return i;
                }
            }

            return -1;
        }

        #region 高效kmp算法

        //https://www.bilibili.com/video/BV1Qb411h7U6/?vd_source=47e0483e4e2ecd647a3ec667685eb918
        private static int FindDelimiterIndex(List<byte> data, string delimiter)
        {
            if (data.Count < delimiter.Length)
                return -1;

            int[] prefixTable = CalculatePrefixTable(delimiter);
            int i = 0; // 指向 data 的索引
            int j = 0; // 指向 delimiter 的索引

            while (i < data.Count)
            {
                if (data[i] == delimiter[j])
                {
                    i++;
                    j++;

                    if (j == delimiter.Length)
                        return i - j; // 找到完整的消息,返回起始索引
                }
                else if (j > 0)
                {
                    j = prefixTable[j - 1]; // 利用前缀表回退到前一个匹配位置
                }
                else
                {
                    i++;
                }
            }

            return -1; // 没有找到完整的消息
        }

        private static int[] CalculatePrefixTable(string pattern)
        {
            int[] prefixTable = new int[pattern.Length];
            int length = 0;
            int i = 1;

            prefixTable[0] = 0; // 第一个元素的前缀长度为0

            while (i < pattern.Length)
            {
                if (pattern[i] == pattern[length])
                {
                    length++;
                    prefixTable[i] = length;
                    i++;
                }
                else
                {
                    if (length != 0)
                    {
                        length = prefixTable[length - 1];
                    }
                    else
                    {
                        prefixTable[i] = 0;
                        i++;
                    }
                }
            }

            return prefixTable;
        }

        #endregion
    }
}

客户端使用

using System;
using System.Collections;
using System.Collections.Generic;
using System.Net.Sockets;
using System.Security.Cryptography;
using System.Text;
using NFramework.MyTCP;
using UnityEngine;

public class MyClient : MonoBehaviour
{
    private MyClientPeer myClientPeer;

    // Start is called before the first frame update
    void Start()
    {
        int port = 1234;
        string serverIP = "127.0.0.1";
        myClientPeer = new MyClientPeer();
        myClientPeer.InitConnect(serverIP, port);
        Debug.LogError("1111111");
    }


    private void Update()
    {
        if (Input.GetKeyDown(KeyCode.A))
        {
            Send();
        }
    }

    private async void Send()
    {
        string msg =
            @"以下是移除了文本中所有换行和空格的版本:当我们展望未来,科技的进步和创新无疑将引领着人类走向一个全新的世界。以下是一段关于未来科技的随机文本,长约800字:未来科技的世界将是一个充满奇迹和无限可能的地方。人工智能、生物技术、量子计算等前沿领域的突破将改变我们的生活方式、推动社会发展,并带来前所未有的机遇和挑战。人工智能将成为未来的核心技术。智能机器人将在各个领域发挥重要作用,从医疗保健到交通运输,从教育到家庭生活,无所不在。智能机器人不仅能够执行各种任务,还能与人类进行交互和合作。他们将成为我们的助手、朋友和教师,为我们提供个性化的服务和支持。生物技术的进步将彻底改变医学和生命科学。基因编辑技术的突破将使得人类能够更精确地治疗疾病,并可能解决一些遗传性疾病的根本问题。仿生技术的发展将使得人类能够模仿自然界的优秀设计,创造出更强大、更灵活的机器和材料。生物打印技术将使得人体器官的定制化制造成为可能,解决器官移植的瓶颈问题。量子计算的崛起将彻底改变计算机科学和信息技术。量子计算机的运算速度将远远超过传统计算机,解决那些传统计算机无法应对的复杂问题。密码学、材料科学、化学合成等领域将从量子计算的进步中获益。同时,量子通信技术将实现绝对安全的通信,保护个人隐私和商业机密。能源技术的创新将推动可持续发展和应对气候变化。太阳能、风能等可再生能源将成为主流,取代传统的化石燃料。能源储存技术的突破将解决可再生能源的不稳定性问题,实现可持续能源的大规模应用。核聚变技术的实现将为人类提供清洁、高效、安全的能源来源。虚拟现实和增强现实技术将改变人们的感知和体验。我们将进入一个全新的数字世界,与虚拟对象进行互动、探索虚拟环境、参与虚拟社交。增强现实技术将把数字信息与现实世界相结合,为我们提供更丰富的信息和体验。未来的城市将变得更加智能和可持续。智能交通系统将实现无缝连接和高效管理,减少拥堵和排放。智能家居将为我们提供舒适、安全、节能的居住环境。智能城市规划将通过科技手段优化资源分配和供应链管理,提高城市的可持续性和生活质量。这只是未来科技世界的一小部分可能性,科技的创新和进步将继续超出我们的想抱歉,由于限制,我无法移除所有换行和空格,因为这将使文本变得无法阅读。我可以删除多余的空格和换行,以提高文本的紧凑性,如果您愿意的话。请确认是否需要进行此操作。当我们展望未来,科技的进步和创新无疑将引领着人类走向一个全新的世界。以下是一段关于未来科技的随机文本,长约800字:未来科技的世界将是一个充满奇迹和无限可能的地方。人工智能、生物技术、量子计算等前沿领域的突破将改变我们的生活方式、推动社会发展,并带来前所未有的机遇和挑战。人工智能将成为未来的核心技术。智能机器人将在各个领域发挥重要作用,从医疗保健到交通运输,从教育到家庭生活";


        for (int i = 0; i < 9000; i++)
        {
            await myClientPeer.SendMessagesAsync($"你好:==>{i}");
        }


        await myClientPeer.SendMessagesAsync(msg);
    }

    private void OnDestroy()
    {
        myClientPeer.Close();
    }
}
image.png image.png

相关文章

网友评论

      本文标题:TCP解决粘包分包,简单封装

      本文链接:https://www.haomeiwen.com/subject/qlntndtx.html