From 7a6017b7aa4a883fd6fba8567bb9b67ac9464358 Mon Sep 17 00:00:00 2001 From: waterliu99 Date: Wed, 1 Jul 2020 14:37:54 +0800 Subject: [PATCH] =?UTF-8?q?=E5=AE=9E=E7=8E=B0udp=E6=9C=8D=E5=8A=A1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../Program.cs | 9 +- src/JT1078.Gateway/JT1078UdpServer.cs | 253 +++++++++++++++++- .../Sessions/JT1078SessionManager.cs | 3 +- .../Sessions/JT1078UdpSession.cs | 3 +- 4 files changed, 259 insertions(+), 9 deletions(-) diff --git a/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Program.cs b/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Program.cs index 954638c..7788f62 100644 --- a/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Program.cs +++ b/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Program.cs @@ -30,10 +30,11 @@ namespace JT1078.Gateway.TestNormalHosting services.AddSingleton(typeof(ILogger<>), typeof(Logger<>)); //使用内存队列实现会话通知 services.AddJT1078Gateway(hostContext.Configuration) - .AddTcp() - .AddNormal() - .AddMsgProducer() - .AddMsgConsumer(); + .AddUdp() + .AddTcp() + .AddNormal() + .AddMsgProducer() + .AddMsgConsumer(); services.AddHostedService(); }); diff --git a/src/JT1078.Gateway/JT1078UdpServer.cs b/src/JT1078.Gateway/JT1078UdpServer.cs index 2057e8d..5de5b9d 100644 --- a/src/JT1078.Gateway/JT1078UdpServer.cs +++ b/src/JT1078.Gateway/JT1078UdpServer.cs @@ -3,6 +3,7 @@ using System.Buffers; using System.Buffers.Binary; using System.Collections.Generic; using System.IO.Pipelines; +using System.Linq; using System.Net; using System.Net.Sockets; using System.Text; @@ -23,16 +24,264 @@ namespace JT1078.Gateway { public class JT1078UdpServer : IHostedService { + private Socket server; + + private readonly ILogger Logger; + + private readonly JT1078Configuration Configuration; + + private readonly JT1078SessionManager SessionManager; + + private readonly IJT1078PackageProducer jT1078PackageProducer; + + private readonly IJT1078MsgProducer jT1078MsgProducer; + + private readonly JT1078UseType jT1078UseType; + + /// + /// 使用正常方式 + /// + /// + /// + /// + /// + public JT1078UdpServer( + IJT1078PackageProducer jT1078PackageProducer, + IOptions jT1078ConfigurationAccessor, + ILoggerFactory loggerFactory, + JT1078SessionManager jT1078SessionManager) + { + SessionManager = jT1078SessionManager; + jT1078UseType = JT1078UseType.Normal; + Logger = loggerFactory.CreateLogger(); + Configuration = jT1078ConfigurationAccessor.Value; + this.jT1078PackageProducer = jT1078PackageProducer; + InitServer(); + } + + /// + /// 使用队列方式 + /// + /// + /// + /// + /// + public JT1078UdpServer( + IJT1078MsgProducer jT1078MsgProducer, + IOptions jT1078ConfigurationAccessor, + ILoggerFactory loggerFactory, + JT1078SessionManager jT1078SessionManager) + { + SessionManager = jT1078SessionManager; + jT1078UseType = JT1078UseType.Queue; + Logger = loggerFactory.CreateLogger(); + Configuration = jT1078ConfigurationAccessor.Value; + this.jT1078MsgProducer = jT1078MsgProducer; + InitServer(); + } + + private void InitServer() + { + var IPEndPoint = new System.Net.IPEndPoint(IPAddress.Any, Configuration.UdpPort); + server = new Socket(IPEndPoint.AddressFamily, SocketType.Dgram, ProtocolType.Udp); + //server.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.NoDelay, true); + //server.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.ReceiveBuffer, Configuration.MiniNumBufferSize); + //server.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.SendBuffer, Configuration.MiniNumBufferSize); + server.Bind(IPEndPoint); + } public Task StartAsync(CancellationToken cancellationToken) { + Logger.LogInformation($"JT1078 Udp Server start at {IPAddress.Any}:{Configuration.UdpPort}."); + Task.Factory.StartNew(async () => + { + while (!cancellationToken.IsCancellationRequested) + { + JT1078UdpSession session = new JT1078UdpSession(server); + var pipe = new Pipe(); + Task writing = FillPipeAsync(pipe.Writer, session); + Task reading = ReadPipeAsync(pipe.Reader, session); + await Task.WhenAll(reading, writing); + } + }, cancellationToken); return Task.CompletedTask; } + private async Task FillPipeAsync(PipeWriter writer, JT1078UdpSession session) + { + while (true) + { + try + { + await Task.Factory.StartNew(() => { + //设备多久没发数据就断开连接 Receive Timeout. + EndPoint remoteIp = new IPEndPoint(IPAddress.Any, 0);//用来保存发送方的ip和端口号; + var buffer = new byte[Configuration.MiniNumBufferSize]; + var bytesRead = server.ReceiveFrom(buffer, ref remoteIp); + session.RemoteEndPoint = remoteIp; + SessionManager.TryAdd(session); + Memory memory = writer.GetMemory(Configuration.MiniNumBufferSize); + buffer.ToArray().CopyTo(memory); + if (bytesRead == 0) + { + return; + } + writer.Advance(bytesRead); + }); + } + catch (AggregateException ex) { + break; + } + catch (OperationCanceledException ex) + { + // Logger.LogError($"[Receive Timeout]:{session.Client.RemoteEndPoint}"); + break; + } + catch (System.Net.Sockets.SocketException ex) + { + // Logger.LogError($"[{ex.SocketErrorCode.ToString()},{ex.Message}]:{session.Client.RemoteEndPoint}"); + break; + } +#pragma warning disable CA1031 // Do not catch general exception types + catch (Exception ex) + { + // Logger.LogError(ex, $"[Receive Error]:{session.Client.RemoteEndPoint}"); + break; + } +#pragma warning restore CA1031 // Do not catch general exception types + FlushResult result = await writer.FlushAsync(); + if (result.IsCompleted) + { + break; + } + } + writer.Complete(); + } + private async Task ReadPipeAsync(PipeReader reader, JT1078UdpSession session) + { + while (true) + { + ReadResult result = await reader.ReadAsync(); + if (result.IsCompleted) + { + break; + } + ReadOnlySequence buffer = result.Buffer; + SequencePosition consumed = buffer.Start; + SequencePosition examined = buffer.End; + try + { + if (result.IsCanceled) break; + if (buffer.Length > 0) + { + ReaderBuffer(ref buffer, session, out consumed, out examined); + } + } +#pragma warning disable CA1031 // Do not catch general exception types + catch (Exception ex) + { + // Logger.LogError(ex, $"[ReadPipe Error]:{session.Client.RemoteEndPoint}"); + break; + } +#pragma warning restore CA1031 // Do not catch general exception types + finally + { + reader.AdvanceTo(consumed, examined); + } + } + reader.Complete(); + } + private void ReaderBuffer(ref ReadOnlySequence buffer, JT1078UdpSession session, out SequencePosition consumed, out SequencePosition examined) + { + consumed = buffer.Start; + examined = buffer.End; + if (buffer.Length < 15) + { + throw new ArgumentException("not JT1078 package"); + } + SequenceReader seqReader = new SequenceReader(buffer); + long totalConsumed = 0; + while (!seqReader.End) + { + if ((seqReader.Length - seqReader.Consumed) < 15) + { + throw new ArgumentException("not JT1078 package"); + } + var header = seqReader.Sequence.Slice(seqReader.Consumed, 4); + var headerValue = BinaryPrimitives.ReadUInt32BigEndian(header.FirstSpan); + if (JT1078Package.FH == headerValue) + { + //sim + var sim = ReadBCD(seqReader.Sequence.Slice(seqReader.Consumed + 8, 6).FirstSpan, 12); + //根据数据类型处理对应的数据长度 + seqReader.Advance(15); + if (seqReader.TryRead(out byte dataType)) + { + JT1078Label3 label3 = new JT1078Label3(dataType); + int bodyLength = 0; + //透传的时候没有该字段 + if (label3.DataType != JT1078DataType.透传数据) + { + //时间戳 + bodyLength += 8; + } + //非视频帧时没有该字段 + if (label3.DataType == JT1078DataType.视频I帧 || + label3.DataType == JT1078DataType.视频P帧 || + label3.DataType == JT1078DataType.视频B帧) + { + //上一个关键帧 + 上一帧 = 2 + 2 + bodyLength += 4; + } + seqReader.Advance(bodyLength); + var bodyLengthFirstSpan = seqReader.Sequence.Slice(seqReader.Consumed, 2).FirstSpan; + //数据体长度 + seqReader.Advance(2); + bodyLength = BinaryPrimitives.ReadUInt16BigEndian(bodyLengthFirstSpan); + //数据体 + seqReader.Advance(bodyLength); + if (string.IsNullOrEmpty(sim)) + { + sim = session.SessionID; + } + SessionManager.TryLink(sim, session); + var package = seqReader.Sequence.Slice(totalConsumed, seqReader.Consumed - totalConsumed); + try + { + if (jT1078UseType == JT1078UseType.Queue) + { + jT1078MsgProducer.ProduceAsync(sim, package.ToArray()); + } + else + { + jT1078PackageProducer.ProduceAsync(sim, JT1078Serializer.Deserialize(package.FirstSpan)); + } + } + catch (Exception ex) + { + Logger.LogError(ex, $"[Parse]:{package.ToArray().ToHexString()}"); + } + totalConsumed += (seqReader.Consumed - totalConsumed); + if (seqReader.End) break; + } + } + } + if (seqReader.Length == totalConsumed) + { + examined = consumed = buffer.End; + } + else + { + consumed = buffer.GetPosition(totalConsumed); + } + } public Task StopAsync(CancellationToken cancellationToken) { + Logger.LogInformation("JT1078 Tcp Server Stop"); + if (server?.Connected ?? false) + server.Shutdown(SocketShutdown.Both); + server?.Close(); return Task.CompletedTask; } - string ReadBCD(ReadOnlySpan readOnlySpan, int len) { int count = len / 2; @@ -44,4 +293,4 @@ namespace JT1078.Gateway return bcdSb.ToString().TrimStart('0'); } } -} +} \ No newline at end of file diff --git a/src/JT1078.Gateway/Sessions/JT1078SessionManager.cs b/src/JT1078.Gateway/Sessions/JT1078SessionManager.cs index 6169d14..7402372 100644 --- a/src/JT1078.Gateway/Sessions/JT1078SessionManager.cs +++ b/src/JT1078.Gateway/Sessions/JT1078SessionManager.cs @@ -97,8 +97,9 @@ namespace JT1078.Gateway.Sessions } else { - JT1078UdpSession session = new JT1078UdpSession(socket, remoteEndPoint); + JT1078UdpSession session = new JT1078UdpSession(socket); session.TerminalPhoneNo = terminalPhoneNo; + session.RemoteEndPoint = remoteEndPoint; Sessions.TryAdd(session.SessionID, session); TerminalPhoneNoSessions.TryAdd(terminalPhoneNo, session); currentSession = session; diff --git a/src/JT1078.Gateway/Sessions/JT1078UdpSession.cs b/src/JT1078.Gateway/Sessions/JT1078UdpSession.cs index c1e8596..856e74d 100644 --- a/src/JT1078.Gateway/Sessions/JT1078UdpSession.cs +++ b/src/JT1078.Gateway/Sessions/JT1078UdpSession.cs @@ -9,13 +9,12 @@ namespace JT1078.Gateway.Sessions { public class JT1078UdpSession: IJT1078Session { - public JT1078UdpSession(Socket socket, EndPoint sender) + public JT1078UdpSession(Socket socket) { ActiveTime = DateTime.Now; StartTime = DateTime.Now; SessionID = Guid.NewGuid().ToString("N"); ReceiveTimeout = new CancellationTokenSource(); - RemoteEndPoint = sender; Client = socket; }