瀏覽代碼

优化Udp协议

master
waterliu99 4 年之前
父節點
當前提交
d740159c25
共有 3 個檔案被更改,包括 105 行新增174 行删除
  1. +2
    -0
      src/JT1078.Gateway/JT1078GatewayExtensions.cs
  2. +39
    -174
      src/JT1078.Gateway/JT1078UdpServer.cs
  3. +64
    -0
      src/JT1078.Gateway/Jobs/JT1078UdpReceiveTimeoutJob.cs

+ 2
- 0
src/JT1078.Gateway/JT1078GatewayExtensions.cs 查看文件

@@ -39,6 +39,7 @@ namespace JT1078.Gateway
public static IJT1078GatewayBuilder AddTcp(this IJT1078GatewayBuilder builder)
{
builder.JT1078Builder.Services.AddHostedService<JT1078TcpReceiveTimeoutJob>();
builder.JT1078Builder.Services.AddHostedService<JT1078UdpReceiveTimeoutJob>();
builder.JT1078Builder.Services.AddHostedService<JT1078TcpServer>();
return builder;
}
@@ -57,6 +58,7 @@ namespace JT1078.Gateway
public static IJT1078QueueGatewayBuilder AddQueue(this IJT1078GatewayBuilder builder)
{
builder.JT1078Builder.Services.AddHostedService<JT1078TcpReceiveTimeoutJob>();
builder.JT1078Builder.Services.AddHostedService<JT1078UdpReceiveTimeoutJob>();
return new JT1078QueueGatewayBuilderDefault(builder.JT1078Builder);
}



+ 39
- 174
src/JT1078.Gateway/JT1078UdpServer.cs 查看文件

@@ -84,9 +84,6 @@ namespace JT1078.Gateway
{
var IPEndPoint = new System.Net.IPEndPoint(IPAddress.Any, Configuration.UdpPort);
server = new Socket(IPEndPoint.AddressFamily, SocketType.Dgram, ProtocolType.Udp);
//server.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.NoDelay, true);
//server.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.ReceiveBuffer, Configuration.MiniNumBufferSize);
//server.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.SendBuffer, Configuration.MiniNumBufferSize);
server.Bind(IPEndPoint);
}
public Task StartAsync(CancellationToken cancellationToken)
@@ -96,201 +93,69 @@ namespace JT1078.Gateway
{
while (!cancellationToken.IsCancellationRequested)
{
JT1078UdpSession session = new JT1078UdpSession(server);
var pipe = new Pipe();
Task writing = FillPipeAsync(pipe.Writer, session);
Task reading = ReadPipeAsync(pipe.Reader, session);
await Task.WhenAll(reading, writing);
}
}, cancellationToken);
return Task.CompletedTask;
}
private async Task FillPipeAsync(PipeWriter writer, JT1078UdpSession session)
{
while (true)
{
try
{
await Task.Factory.StartNew(() => {
//设备多久没发数据就断开连接 Receive Timeout.
EndPoint remoteIp = new IPEndPoint(IPAddress.Any, 0);//用来保存发送方的ip和端口号;
var buffer = new byte[Configuration.MiniNumBufferSize];
var bytesRead = server.ReceiveFrom(buffer, ref remoteIp);
session.RemoteEndPoint = remoteIp;
SessionManager.TryAdd(session);
Memory<byte> memory = writer.GetMemory(Configuration.MiniNumBufferSize);
buffer.ToArray().CopyTo(memory);
if (bytesRead == 0)
{
return;
}
writer.Advance(bytesRead);
});
}
catch (AggregateException ex) {

break;
}
catch (OperationCanceledException ex)
{
// Logger.LogError($"[Receive Timeout]:{session.Client.RemoteEndPoint}");
break;
}
catch (System.Net.Sockets.SocketException ex)
{
// Logger.LogError($"[{ex.SocketErrorCode.ToString()},{ex.Message}]:{session.Client.RemoteEndPoint}");
break;
}
#pragma warning disable CA1031 // Do not catch general exception types
catch (Exception ex)
{
// Logger.LogError(ex, $"[Receive Error]:{session.Client.RemoteEndPoint}");
break;
}
#pragma warning restore CA1031 // Do not catch general exception types
FlushResult result = await writer.FlushAsync();
if (result.IsCompleted)
{
break;
}
}
writer.Complete();
}
private async Task ReadPipeAsync(PipeReader reader, JT1078UdpSession session)
{
while (true)
{
ReadResult result = await reader.ReadAsync();
if (result.IsCompleted)
{
break;
}
ReadOnlySequence<byte> buffer = result.Buffer;
SequencePosition consumed = buffer.Start;
SequencePosition examined = buffer.End;
try
{
if (result.IsCanceled) break;
if (buffer.Length > 0)
var buffer = ArrayPool<byte>.Shared.Rent(Configuration.MiniNumBufferSize);
try
{
ReaderBuffer(ref buffer, session, out consumed, out examined);
var segment = new ArraySegment<byte>(buffer);
var result = await server.ReceiveMessageFromAsync(segment, SocketFlags.None, server.LocalEndPoint);
ReaderBuffer(buffer.AsSpan(0, result.ReceivedBytes), server, result);
}
catch (AggregateException ex)
{
Logger.LogError(ex, "Receive MessageFrom Async");
}
}
#pragma warning disable CA1031 // Do not catch general exception types
catch (Exception ex)
{
// Logger.LogError(ex, $"[ReadPipe Error]:{session.Client.RemoteEndPoint}");
break;
}
catch (Exception ex)
{
Logger.LogError(ex, $"Received Bytes");
}
#pragma warning restore CA1031 // Do not catch general exception types
finally
{
reader.AdvanceTo(consumed, examined);
finally
{
ArrayPool<byte>.Shared.Return(buffer);
}
}
}
reader.Complete();
}, cancellationToken);
return Task.CompletedTask;
}
private void ReaderBuffer(ref ReadOnlySequence<byte> buffer, JT1078UdpSession session, out SequencePosition consumed, out SequencePosition examined)
private void ReaderBuffer(ReadOnlySpan<byte> buffer, Socket socket, SocketReceiveMessageFromResult receiveMessageFromResult)
{
consumed = buffer.Start;
examined = buffer.End;
if (buffer.Length < 15)
{
throw new ArgumentException("not JT1078 package");
}
SequenceReader<byte> seqReader = new SequenceReader<byte>(buffer);
long totalConsumed = 0;
while (!seqReader.End)
try
{
if ((seqReader.Length - seqReader.Consumed) < 15)
var package = JT1078Serializer.Deserialize(buffer);
if (Logger.IsEnabled(LogLevel.Trace)) Logger.LogTrace($"[Accept Hex {receiveMessageFromResult.RemoteEndPoint}]:{buffer.ToArray().ToHexString()}");
var session = SessionManager.TryLink(package.SIM, socket, receiveMessageFromResult.RemoteEndPoint);
if (Logger.IsEnabled(LogLevel.Information))
{
throw new ArgumentException("not JT1078 package");
Logger.LogInformation($"[Connected]:{receiveMessageFromResult.RemoteEndPoint}");
}
var header = seqReader.Sequence.Slice(seqReader.Consumed, 4);
var headerValue = BinaryPrimitives.ReadUInt32BigEndian(header.FirstSpan);
if (JT1078Package.FH == headerValue)
if (jT1078UseType == JT1078UseType.Queue)
{
//sim
var sim = ReadBCD(seqReader.Sequence.Slice(seqReader.Consumed + 8, 6).FirstSpan, 12);
//根据数据类型处理对应的数据长度
seqReader.Advance(15);
if (seqReader.TryRead(out byte dataType))
{
JT1078Label3 label3 = new JT1078Label3(dataType);
int bodyLength = 0;
//透传的时候没有该字段
if (label3.DataType != JT1078DataType.透传数据)
{
//时间戳
bodyLength += 8;
}
//非视频帧时没有该字段
if (label3.DataType == JT1078DataType.视频I帧 ||
label3.DataType == JT1078DataType.视频P帧 ||
label3.DataType == JT1078DataType.视频B帧)
{
//上一个关键帧 + 上一帧 = 2 + 2
bodyLength += 4;
}
seqReader.Advance(bodyLength);
var bodyLengthFirstSpan = seqReader.Sequence.Slice(seqReader.Consumed, 2).FirstSpan;
//数据体长度
seqReader.Advance(2);
bodyLength = BinaryPrimitives.ReadUInt16BigEndian(bodyLengthFirstSpan);
//数据体
seqReader.Advance(bodyLength);
if (string.IsNullOrEmpty(sim))
{
sim = session.SessionID;
}
SessionManager.TryLink(sim, session);
var package = seqReader.Sequence.Slice(totalConsumed, seqReader.Consumed - totalConsumed);
try
{
if (jT1078UseType == JT1078UseType.Queue)
{
jT1078MsgProducer.ProduceAsync(sim, package.ToArray());
}
else
{
jT1078PackageProducer.ProduceAsync(sim, JT1078Serializer.Deserialize(package.FirstSpan));
}
}
catch (Exception ex)
{
Logger.LogError(ex, $"[Parse]:{package.ToArray().ToHexString()}");
}
totalConsumed += (seqReader.Consumed - totalConsumed);
if (seqReader.End) break;
}
jT1078MsgProducer.ProduceAsync(package.SIM, buffer.ToArray());
}
else
{
jT1078PackageProducer.ProduceAsync(package.SIM, package);
}
}
if (seqReader.Length == totalConsumed)
catch (NotImplementedException ex)
{
examined = consumed = buffer.End;
Logger.LogError(ex.Message);
}
else
#pragma warning disable CA1031 // Do not catch general exception types
catch (Exception ex)
{
consumed = buffer.GetPosition(totalConsumed);
Logger.LogError(ex, $"[ReaderBuffer]:{ buffer.ToArray().ToHexString()}");
}
#pragma warning restore CA1031 // Do not catch general exception types
}
public Task StopAsync(CancellationToken cancellationToken)
{
Logger.LogInformation("JT1078 Tcp Server Stop");
Logger.LogInformation("JT1078 Udp Server Stop");
if (server?.Connected ?? false)
server.Shutdown(SocketShutdown.Both);
server?.Close();
return Task.CompletedTask;
}
string ReadBCD(ReadOnlySpan<byte> readOnlySpan, int len)
{
int count = len / 2;
StringBuilder bcdSb = new StringBuilder(count);
for (int i = 0; i < count; i++)
{
bcdSb.Append(readOnlySpan[i].ToString("X2"));
}
return bcdSb.ToString().TrimStart('0');
}
}
}

+ 64
- 0
src/JT1078.Gateway/Jobs/JT1078UdpReceiveTimeoutJob.cs 查看文件

@@ -0,0 +1,64 @@
using JT1078.Gateway.Configurations;
using JT1078.Gateway.Sessions;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace JT1078.Gateway.Jobs
{
internal class JT1078UdpReceiveTimeoutJob : BackgroundService
{
private readonly ILogger Logger;

private readonly JT1078SessionManager SessionManager;

private readonly IOptionsMonitor<JT1078Configuration> Configuration;
public JT1078UdpReceiveTimeoutJob(
IOptionsMonitor<JT1078Configuration> jT1078ConfigurationAccessor,
ILoggerFactory loggerFactory,
JT1078SessionManager jT1078SessionManager
)
{
SessionManager = jT1078SessionManager;
Logger = loggerFactory.CreateLogger<JT1078UdpReceiveTimeoutJob>();
Configuration = jT1078ConfigurationAccessor;
}

protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
while (!stoppingToken.IsCancellationRequested)
{
try
{
List<string> sessionIds = new List<string>();
foreach (var item in SessionManager.GetUdpAll())
{
if (item.ActiveTime.AddSeconds(Configuration.CurrentValue.UdpReaderIdleTimeSeconds) < DateTime.Now)
{
sessionIds.Add(item.SessionID);
}
}
foreach (var item in sessionIds)
{
SessionManager.RemoveBySessionId(item);
}
Logger.LogInformation($"[Check Receive Timeout]");
Logger.LogInformation($"[Session Online Count]:{SessionManager.UdpSessionCount}");
}
catch (Exception ex)
{
Logger.LogError(ex, $"[Receive Timeout]");
}
finally
{
await Task.Delay(TimeSpan.FromSeconds(Configuration.CurrentValue.UdpReceiveTimeoutCheckTimeSeconds), stoppingToken);
}
}
}
}
}

Loading…
取消
儲存