C#处理TCP数据的方法详解

来自:网络
时间:2024-09-10
阅读:

前言

Tcp是一个面向连接的流数据传输协议,用人话说就是传输是一个已经建立好连接的管道,数据都在管道里像流水一样流淌到对端。

那么数据必然存在几个问题,比如数据如何持续的读取,数据包的边界等。

Nagle's算法

Nagle 算法的核心思想是,在一个 TCP 连接上,最多只能有一个未被确认的小数据包(小于 MSS,即最大报文段大小)

优势

减少网络拥塞:通过合并小数据包,减少了网络中的数据包数量,降低了拥塞的可能性。

提高网络效率:在低速网络中,Nagle 算法可以显著提高传输效率。

劣势

增加延迟:在交互式应用中,Nagle 算法可能导致显著的延迟,因为它等待 ACK 或合并数据包。

C#中如何配置?

var _socket = new Socket(IPAddress.Any.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
_serverSocket.NoDelay = _options.NoDelay;

连接超时

在调用客户端Socket连接服务器的时候,可以设置连接超时机制,具体可以传入一个任务的取消令牌,并且设置超时时间。

CancellationTokenSource connectTokenSource = new CancellationTokenSource();
connectTokenSource.CancelAfter(3000); //3秒
await _socket.ConnectAsync(RemoteEndPoint, connectTokenSource.Token);

SSL加密传输

TCP使用SSL加密传输,通过非对称加密的方式,利用证书,保证双方使用了安全的密钥加密了报文。在C#中如何配置?

服务端配置

//创建证书对象
var _certificate  = _certificate = new X509Certificate2(_options.PfxCertFilename, _options.PfxPassword);
 
//与客户端进行验证
if (allowingUntrustedSSLCertificate) //是否允许不受信任的证书
{
    SslStream = new SslStream(NetworkStream, false,
        (obj, certificate, chain, error) => true);
}
else
{
    SslStream = new SslStream(NetworkStream, false);
}
 
try
{
    //serverCertificate:用于对服务器进行身份验证的 X509Certificate
    //clientCertificateRequired:一个 Boolean 值,指定客户端是否必须为身份验证提供证书
    //checkCertificateRevocation:一个 Boolean 值,指定在身份验证过程中是否检查证书吊销列表
    await SslStream.AuthenticateAsServerAsync(new SslServerAuthenticationOptions()
    {
        ServerCertificate = x509Certificate,
        ClientCertificateRequired = mutuallyAuthenticate,
        CertificateRevocationCheckMode = checkCertificateRevocation ? X509RevocationMode.Online : X509RevocationMode.NoCheck
    }, cancellationToken).ConfigureAwait(false);
 
    if (!SslStream.IsEncrypted || !SslStream.IsAuthenticated)
    {
        return false;
    }
 
    if (mutuallyAuthenticate && !SslStream.IsMutuallyAuthenticated)
    {
        return false;
    }
}
catch (Exception)
{
    throw;
}
 
//完成验证后,通过SslStream传输数据
int readCount = await SslStream.ReadAsync(buffer, _lifecycleTokenSource.Token)
    .ConfigureAwait(false);

客户端配置

var _certificate = new X509Certificate2(_options.PfxCertFilename, _options.PfxPassword);
 
if (_options.IsSsl) //如果使用ssl加密传输
{
    if (_options.AllowingUntrustedSSLCertificate)//是否允许不受信任的证书
    {
        _sslStream = new SslStream(_networkStream, false,
                (obj, certificate, chain, error) => true);
    }
    else
    {
        _sslStream = new SslStream(_networkStream, false);
    }
 
    _sslStream.ReadTimeout = _options.ReadTimeout;
    _sslStream.WriteTimeout = _options.WriteTimeout;
    await _sslStream.AuthenticateAsClientAsync(new SslClientAuthenticationOptions()
    {
        TargetHost = RemoteEndPoint.Address.ToString(),
        EnabledSslProtocols = System.Security.Authentication.SslProtocols.Tls12,
        CertificateRevocationCheckMode = _options.CheckCertificateRevocation ? X509RevocationMode.Online : X509RevocationMode.NoCheck,
        ClientCertificates = new X509CertificateCollection() { _certificate }
    }, connectTokenSource.Token).ConfigureAwait(false);
 
    if (!_sslStream.IsEncrypted || !_sslStream.IsAuthenticated ||
        (_options.MutuallyAuthenticate && !_sslStream.IsMutuallyAuthenticated))
    {
        throw new InvalidOperationException("SSL authenticated faild!");
    }
}

KeepAlive

keepAlive不是TCP协议中的,而是各个操作系统本身实现的功能,主要是防止一些Socket突然断开后没有被感知到,导致一直浪费资源的情况。

/// <summary>
/// 开启Socket的KeepAlive
/// 设置tcp协议的一些KeepAlive参数
/// </summary>
/// <param name="socket"></param>
/// <param name="tcpKeepAliveInterval">没有接收到对方确认,继续发送KeepAlive的发送频率</param>
/// <param name="tcpKeepAliveTime">KeepAlive的空闲时长,或者说每次正常发送心跳的周期</param>
/// <param name="tcpKeepAliveRetryCount">KeepAlive之后设置最大允许发送保活探测包的次数,到达此次数后直接放弃尝试,并关闭连接</param>
internal static void SetKeepAlive(this Socket socket, int tcpKeepAliveInterval, int tcpKeepAliveTime, int tcpKeepAliveRetryCount)
{
    socket.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.KeepAlive, true);
    socket.SetSocketOption(SocketOptionLevel.Tcp, SocketOptionName.TcpKeepAliveInterval, tcpKeepAliveInterval);
    socket.SetSocketOption(SocketOptionLevel.Tcp, SocketOptionName.TcpKeepAliveTime, tcpKeepAliveTime);
    socket.SetSocketOption(SocketOptionLevel.Tcp, SocketOptionName.TcpKeepAliveRetryCount, tcpKeepAliveRetryCount);
}

具体的开启,还需要看操作系统的版本以及不同操作系统的支持。

粘包断包处理

Pipe & ReadOnlySequence

C#处理TCP数据的方法详解

TCP面向应用是流式数据传输,所以接收端接到的数据是像流水一样从管道中传来,每次取到的数据取决于应用设置的缓冲区大小,以及套接字本身缓冲区待读取字节数

C#中提供的Pipe就如上图一样,是一个管道Pipe有两个对象成员,一个是PipeWriter,一个是PipeReader,可以理解为一个是生产者,专门往管道里灌输数据流,即字节流,一个是消费者,专门从管道里获取字节流进行处理。

可以看到Pipe中的数据包是用链表关联的,但是这个数据包是从Socke缓冲区每次取到的数据包,它不一定是一个完整的数据包,所以这些数据包连接起来后形成了一个C#提供的另外一个抽象的对象ReadOnlySequence。

但是这里还是没有提供太好的处理断包和粘包的办法,因为断包粘包的处理需要两方面

1、业务数据包的定义

2、数据流切割出一个个完整的数据包

假设业务已经定义好了数据包,那么我们如何从Pipe中这些数据包根据业务定义来从不同的数据包中切割出一个完整的包,那么就需要ReadOnlySequence,它提供的操作方法,非常方便我们去切割数据,主要是头尾数据包的切割。

假设我们业务层定义了一个数据包结构,数据包是不定长的,包体长度每次都写在包头里,我们来实现一个数据包过滤器。

//收到消息
 while (!_receiveDataTokenSource.Token.IsCancellationRequested)
 {
     try
     {
        //从pipe中获取缓冲区
         Memory<byte> buffer = _pipeWriter.GetMemory(_options.BufferSize);
         int readCount = 0;
         readCount = await _sslStream.ReadAsync(buffer, _lifecycleTokenSource.Token).ConfigureAwait(false);
 
         if (readCount > 0)
         {
 
             var data = buffer.Slice(0, readCount);
             //告知消费者,往Pipe的管道中写入了多少字节数据
             _pipeWriter.Advance(readCount);
         }
         else
         {
             if (IsDisconnect())
             {
                 await DisConnectAsync();
             }
 
             throw new SocketException();
         }
 
         FlushResult result = await _pipeWriter.FlushAsync().ConfigureAwait(false);
         if (result.IsCompleted)
         {
             break;
         }
     }
     catch (IOException)
     {
         //TODO log
         break;
     }
     catch (SocketException)
     {
         //TODO log
         break;
     }
     catch (TaskCanceledException)
     {
         //TODO log
         break;
     }
 }
 
 _pipeWriter.Complete();
//消费者处理数据
 while (!_lifecycleTokenSource.Token.IsCancellationRequested)
 {
     ReadResult result = await _pipeReader.ReadAsync();
     ReadOnlySequence<byte> buffer = result.Buffer;
     ReadOnlySequence<byte> data;
     do
     {
        //通过过滤器得到一个完整的包
         data = _receivePackageFilter.ResolvePackage(ref buffer);
 
         if (!data.IsEmpty)
         {
             OnReceivedData?.Invoke(this, new ClientDataReceiveEventArgs(data.ToArray()));
         }
     }
     while (!data.IsEmpty && buffer.Length > 0);
     _pipeReader.AdvanceTo(buffer.Start);
 }
 
 _pipeReader.Complete();
/// <summary>
/// 解析数据包
/// 固定报文头解析协议
/// </summary>
/// <param name="headerSize">数据报文头的大小</param>
/// <param name="bodyLengthIndex">数据包大小在报文头中的位置</param>
/// <param name="bodyLengthBytes">数据包大小在报文头中的长度</param>
/// <param name="IsLittleEndian">数据报文大小端。windows中通常是小端,unix通常是大端模式</param>
/// </summary>
/// <param name="sequence">一个完整的业务数据包</param>
public override ReadOnlySequence<byte> ResolvePackage(ref ReadOnlySequence<byte> sequence)
{
    var len = sequence.Length;
    if (len < _bodyLengthIndex) return default;
    var bodyLengthSequence = sequence.Slice(_bodyLengthIndex, _bodyLengthBytes);
    byte[] bodyLengthBytes = ArrayPool<byte>.Shared.Rent(_bodyLengthBytes);
    try
    {
        int index = 0;
        foreach (var item in bodyLengthSequence)
        {
            Array.Copy(item.ToArray(), 0, bodyLengthBytes, index, item.Length);
            index += item.Length;
        }
 
        long bodyLength = 0;
        int offset = 0;
        if (!_isLittleEndian)
        {
            offset = bodyLengthBytes.Length - 1;
            foreach (var bytes in bodyLengthBytes)
            {
                bodyLength += bytes << (offset * 8);
                offset--;
            }
        }
        else
        {
 
            foreach (var bytes in bodyLengthBytes)
            {
                bodyLength += bytes << (offset * 8);
                offset++;
            }
        }
 
        if (sequence.Length < _headerSize + bodyLength)
            return default;
 
        var endPosition = sequence.GetPosition(_headerSize + bodyLength);
        var data = sequence.Slice(0, endPosition);//得到完整数据包
        sequence = sequence.Slice(endPosition);//缓冲区中去除取到的完整包
 
        return data;
    }
    finally
    {
        ArrayPool<byte>.Shared.Return(bodyLengthBytes);
    }
}

以上就是实现了固定数据包头实现粘包断包处理的部分代码。

关于TCP的连接还有一些,比如客户端连接限制,空闲连接关闭等。

返回顶部
顶部