diff --git a/src/JT1078.Gateway/JT1078GatewayExtensions.cs b/src/JT1078.Gateway/JT1078GatewayExtensions.cs index 9a4e369..9c5e755 100644 --- a/src/JT1078.Gateway/JT1078GatewayExtensions.cs +++ b/src/JT1078.Gateway/JT1078GatewayExtensions.cs @@ -39,6 +39,7 @@ namespace JT1078.Gateway public static IJT1078GatewayBuilder AddTcp(this IJT1078GatewayBuilder builder) { builder.JT1078Builder.Services.AddHostedService(); + builder.JT1078Builder.Services.AddHostedService(); builder.JT1078Builder.Services.AddHostedService(); return builder; } @@ -57,6 +58,7 @@ namespace JT1078.Gateway public static IJT1078QueueGatewayBuilder AddQueue(this IJT1078GatewayBuilder builder) { builder.JT1078Builder.Services.AddHostedService(); + builder.JT1078Builder.Services.AddHostedService(); return new JT1078QueueGatewayBuilderDefault(builder.JT1078Builder); } diff --git a/src/JT1078.Gateway/JT1078UdpServer.cs b/src/JT1078.Gateway/JT1078UdpServer.cs index 5de5b9d..957557b 100644 --- a/src/JT1078.Gateway/JT1078UdpServer.cs +++ b/src/JT1078.Gateway/JT1078UdpServer.cs @@ -84,9 +84,6 @@ namespace JT1078.Gateway { var IPEndPoint = new System.Net.IPEndPoint(IPAddress.Any, Configuration.UdpPort); server = new Socket(IPEndPoint.AddressFamily, SocketType.Dgram, ProtocolType.Udp); - //server.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.NoDelay, true); - //server.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.ReceiveBuffer, Configuration.MiniNumBufferSize); - //server.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.SendBuffer, Configuration.MiniNumBufferSize); server.Bind(IPEndPoint); } public Task StartAsync(CancellationToken cancellationToken) @@ -96,201 +93,69 @@ namespace JT1078.Gateway { while (!cancellationToken.IsCancellationRequested) { - JT1078UdpSession session = new JT1078UdpSession(server); - var pipe = new Pipe(); - Task writing = FillPipeAsync(pipe.Writer, session); - Task reading = ReadPipeAsync(pipe.Reader, session); - await Task.WhenAll(reading, writing); - } - }, cancellationToken); - return Task.CompletedTask; - } - private async Task FillPipeAsync(PipeWriter writer, JT1078UdpSession session) - { - while (true) - { - try - { - await Task.Factory.StartNew(() => { - //设备多久没发数据就断开连接 Receive Timeout. - EndPoint remoteIp = new IPEndPoint(IPAddress.Any, 0);//用来保存发送方的ip和端口号; - var buffer = new byte[Configuration.MiniNumBufferSize]; - var bytesRead = server.ReceiveFrom(buffer, ref remoteIp); - session.RemoteEndPoint = remoteIp; - SessionManager.TryAdd(session); - Memory memory = writer.GetMemory(Configuration.MiniNumBufferSize); - buffer.ToArray().CopyTo(memory); - if (bytesRead == 0) - { - return; - } - writer.Advance(bytesRead); - }); - } - catch (AggregateException ex) { - - break; - } - catch (OperationCanceledException ex) - { - // Logger.LogError($"[Receive Timeout]:{session.Client.RemoteEndPoint}"); - break; - } - catch (System.Net.Sockets.SocketException ex) - { - // Logger.LogError($"[{ex.SocketErrorCode.ToString()},{ex.Message}]:{session.Client.RemoteEndPoint}"); - break; - } -#pragma warning disable CA1031 // Do not catch general exception types - catch (Exception ex) - { - // Logger.LogError(ex, $"[Receive Error]:{session.Client.RemoteEndPoint}"); - break; - } -#pragma warning restore CA1031 // Do not catch general exception types - FlushResult result = await writer.FlushAsync(); - if (result.IsCompleted) - { - break; - } - } - writer.Complete(); - } - private async Task ReadPipeAsync(PipeReader reader, JT1078UdpSession session) - { - while (true) - { - ReadResult result = await reader.ReadAsync(); - if (result.IsCompleted) - { - break; - } - ReadOnlySequence buffer = result.Buffer; - SequencePosition consumed = buffer.Start; - SequencePosition examined = buffer.End; - try - { - if (result.IsCanceled) break; - if (buffer.Length > 0) + var buffer = ArrayPool.Shared.Rent(Configuration.MiniNumBufferSize); + try { - ReaderBuffer(ref buffer, session, out consumed, out examined); + var segment = new ArraySegment(buffer); + var result = await server.ReceiveMessageFromAsync(segment, SocketFlags.None, server.LocalEndPoint); + ReaderBuffer(buffer.AsSpan(0, result.ReceivedBytes), server, result); + } + catch (AggregateException ex) + { + Logger.LogError(ex, "Receive MessageFrom Async"); } - } #pragma warning disable CA1031 // Do not catch general exception types - catch (Exception ex) - { - // Logger.LogError(ex, $"[ReadPipe Error]:{session.Client.RemoteEndPoint}"); - break; - } + catch (Exception ex) + { + Logger.LogError(ex, $"Received Bytes"); + } #pragma warning restore CA1031 // Do not catch general exception types - finally - { - reader.AdvanceTo(consumed, examined); + finally + { + ArrayPool.Shared.Return(buffer); + } } - } - reader.Complete(); + }, cancellationToken); + return Task.CompletedTask; } - private void ReaderBuffer(ref ReadOnlySequence buffer, JT1078UdpSession session, out SequencePosition consumed, out SequencePosition examined) + private void ReaderBuffer(ReadOnlySpan buffer, Socket socket, SocketReceiveMessageFromResult receiveMessageFromResult) { - consumed = buffer.Start; - examined = buffer.End; - if (buffer.Length < 15) - { - throw new ArgumentException("not JT1078 package"); - } - SequenceReader seqReader = new SequenceReader(buffer); - long totalConsumed = 0; - while (!seqReader.End) + try { - if ((seqReader.Length - seqReader.Consumed) < 15) + var package = JT1078Serializer.Deserialize(buffer); + if (Logger.IsEnabled(LogLevel.Trace)) Logger.LogTrace($"[Accept Hex {receiveMessageFromResult.RemoteEndPoint}]:{buffer.ToArray().ToHexString()}"); + var session = SessionManager.TryLink(package.SIM, socket, receiveMessageFromResult.RemoteEndPoint); + if (Logger.IsEnabled(LogLevel.Information)) { - throw new ArgumentException("not JT1078 package"); + Logger.LogInformation($"[Connected]:{receiveMessageFromResult.RemoteEndPoint}"); } - var header = seqReader.Sequence.Slice(seqReader.Consumed, 4); - var headerValue = BinaryPrimitives.ReadUInt32BigEndian(header.FirstSpan); - if (JT1078Package.FH == headerValue) + if (jT1078UseType == JT1078UseType.Queue) { - //sim - var sim = ReadBCD(seqReader.Sequence.Slice(seqReader.Consumed + 8, 6).FirstSpan, 12); - //根据数据类型处理对应的数据长度 - seqReader.Advance(15); - if (seqReader.TryRead(out byte dataType)) - { - JT1078Label3 label3 = new JT1078Label3(dataType); - int bodyLength = 0; - //透传的时候没有该字段 - if (label3.DataType != JT1078DataType.透传数据) - { - //时间戳 - bodyLength += 8; - } - //非视频帧时没有该字段 - if (label3.DataType == JT1078DataType.视频I帧 || - label3.DataType == JT1078DataType.视频P帧 || - label3.DataType == JT1078DataType.视频B帧) - { - //上一个关键帧 + 上一帧 = 2 + 2 - bodyLength += 4; - } - seqReader.Advance(bodyLength); - var bodyLengthFirstSpan = seqReader.Sequence.Slice(seqReader.Consumed, 2).FirstSpan; - //数据体长度 - seqReader.Advance(2); - bodyLength = BinaryPrimitives.ReadUInt16BigEndian(bodyLengthFirstSpan); - //数据体 - seqReader.Advance(bodyLength); - if (string.IsNullOrEmpty(sim)) - { - sim = session.SessionID; - } - SessionManager.TryLink(sim, session); - var package = seqReader.Sequence.Slice(totalConsumed, seqReader.Consumed - totalConsumed); - try - { - if (jT1078UseType == JT1078UseType.Queue) - { - jT1078MsgProducer.ProduceAsync(sim, package.ToArray()); - } - else - { - jT1078PackageProducer.ProduceAsync(sim, JT1078Serializer.Deserialize(package.FirstSpan)); - } - } - catch (Exception ex) - { - Logger.LogError(ex, $"[Parse]:{package.ToArray().ToHexString()}"); - } - totalConsumed += (seqReader.Consumed - totalConsumed); - if (seqReader.End) break; - } + jT1078MsgProducer.ProduceAsync(package.SIM, buffer.ToArray()); + } + else + { + jT1078PackageProducer.ProduceAsync(package.SIM, package); } } - if (seqReader.Length == totalConsumed) + catch (NotImplementedException ex) { - examined = consumed = buffer.End; + Logger.LogError(ex.Message); } - else +#pragma warning disable CA1031 // Do not catch general exception types + catch (Exception ex) { - consumed = buffer.GetPosition(totalConsumed); + Logger.LogError(ex, $"[ReaderBuffer]:{ buffer.ToArray().ToHexString()}"); } +#pragma warning restore CA1031 // Do not catch general exception types } public Task StopAsync(CancellationToken cancellationToken) { - Logger.LogInformation("JT1078 Tcp Server Stop"); + Logger.LogInformation("JT1078 Udp Server Stop"); if (server?.Connected ?? false) server.Shutdown(SocketShutdown.Both); server?.Close(); return Task.CompletedTask; } - string ReadBCD(ReadOnlySpan readOnlySpan, int len) - { - int count = len / 2; - StringBuilder bcdSb = new StringBuilder(count); - for (int i = 0; i < count; i++) - { - bcdSb.Append(readOnlySpan[i].ToString("X2")); - } - return bcdSb.ToString().TrimStart('0'); - } } } \ No newline at end of file diff --git a/src/JT1078.Gateway/Jobs/JT1078UdpReceiveTimeoutJob.cs b/src/JT1078.Gateway/Jobs/JT1078UdpReceiveTimeoutJob.cs new file mode 100644 index 0000000..f2ee348 --- /dev/null +++ b/src/JT1078.Gateway/Jobs/JT1078UdpReceiveTimeoutJob.cs @@ -0,0 +1,64 @@ +using JT1078.Gateway.Configurations; +using JT1078.Gateway.Sessions; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading; +using System.Threading.Tasks; + +namespace JT1078.Gateway.Jobs +{ + internal class JT1078UdpReceiveTimeoutJob : BackgroundService + { + private readonly ILogger Logger; + + private readonly JT1078SessionManager SessionManager; + + private readonly IOptionsMonitor Configuration; + public JT1078UdpReceiveTimeoutJob( + IOptionsMonitor jT1078ConfigurationAccessor, + ILoggerFactory loggerFactory, + JT1078SessionManager jT1078SessionManager + ) + { + SessionManager = jT1078SessionManager; + Logger = loggerFactory.CreateLogger(); + Configuration = jT1078ConfigurationAccessor; + } + + protected override async Task ExecuteAsync(CancellationToken stoppingToken) + { + while (!stoppingToken.IsCancellationRequested) + { + try + { + List sessionIds = new List(); + foreach (var item in SessionManager.GetUdpAll()) + { + if (item.ActiveTime.AddSeconds(Configuration.CurrentValue.UdpReaderIdleTimeSeconds) < DateTime.Now) + { + sessionIds.Add(item.SessionID); + } + } + foreach (var item in sessionIds) + { + SessionManager.RemoveBySessionId(item); + } + Logger.LogInformation($"[Check Receive Timeout]"); + Logger.LogInformation($"[Session Online Count]:{SessionManager.UdpSessionCount}"); + } + catch (Exception ex) + { + Logger.LogError(ex, $"[Receive Timeout]"); + } + finally + { + await Task.Delay(TimeSpan.FromSeconds(Configuration.CurrentValue.UdpReceiveTimeoutCheckTimeSeconds), stoppingToken); + } + } + } + } +}