From 52bd0f5a343112c9a3c800b02b4389e4f529ef3e Mon Sep 17 00:00:00 2001 From: "SmallChi(Koike)" <564952747@qq.com> Date: Sun, 13 Sep 2020 00:25:44 +0800 Subject: [PATCH] =?UTF-8?q?=E5=AE=8C=E5=96=84tcp=E6=8E=A5=E6=94=B6?= =?UTF-8?q?=E6=95=B0=E6=8D=AE=E5=8C=85=EF=BC=88=E5=BE=85=E6=B5=8B=E8=AF=95?= =?UTF-8?q?=EF=BC=89?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../JT1078.Gateway.Test/PipeTest.cs | 139 ++++++++++++++++++ .../JT1078.Gateway.TestNormalHosting.csproj | 5 +- .../Program.cs | 21 ++- ....cs => JT1078FlvNormalMsgHostedService.cs} | 4 +- src/JT1078.Gateway/JT1078TcpServer.cs | 122 ++++++++++----- 5 files changed, 245 insertions(+), 46 deletions(-) rename src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Services/{JT1078NormalMsgHostedService.cs => JT1078FlvNormalMsgHostedService.cs} (94%) diff --git a/src/JT1078.Gateway.Tests/JT1078.Gateway.Test/PipeTest.cs b/src/JT1078.Gateway.Tests/JT1078.Gateway.Test/PipeTest.cs index 9be4e3b..e4ba470 100644 --- a/src/JT1078.Gateway.Tests/JT1078.Gateway.Test/PipeTest.cs +++ b/src/JT1078.Gateway.Tests/JT1078.Gateway.Test/PipeTest.cs @@ -73,6 +73,139 @@ namespace JT1078.Gateway.Test Assert.Single(packages); } + [Fact] + public void Test1_1() + { + var reader = new ReadOnlySequence("303163648162000000190130503701010000016f95973f840000000003b600000001674d001495a85825900000000168ee3c800000000106e5010d800000000165b80000090350bfdfe840b5b35c676079446e3ffe2f5240e25cc6b35cebac31720656a853ba1b571246fb858eaa5d8266acb95b92705fd187f1fd20ff0ca6b62c4cbcb3b662f5d61c016928ca82b411acdc4df6edb2034624b992eee9b6c241e1903bf9477c6e4293b65ba75e98d5a2566da6f71c85e1052a9d5ed35c393b1a73b181598749f3d26f6fbf48f0be61c673fcb9f2b0d305794bed03af5e3cedff7768bed3120261d6f3547a6d519943c2afcb80e423c9e6db088a06200dbfaa81edc5bc0de67957e791f67bf040ef944f7d62983d32517b2fb2d9572a71340c225617231bc0d98e66d19fe81a19b44280860b273f700bf3f3444a928e93fefc716e2af46995fbb658d0580a49e42f6835270c8c154abe28a17f76550b1b1fafe62945f80490b3f780fe9bb4d4b4107eac3d50b8c99d1a191f6754992096683fb0f599846bae759b06222079f5404be39e4416136c7c42255b0e7ca42d86fc2227892406d61f9816bc125d017989a671f13f2f4052e018b1fb02460802029a049a23d2ffeea6ac552109d35aa8731483fb2cae963987156056cafb32436a23a0dc918fb2440b14c9e6124441e7bb3b08706066d1ddab512267767b6e522f80732e67046ff5ad4d8193bf5cc5c05ccceb73a36b6c3ea39fa91bb308c8bb7bf88515d9c52409128e8b94e33e48a5396c35c20bd83b7c0e6d3d4a24bc14e84810066c686c6c04e687c41123fe87c89d5fa07b0095e7f82d3b07e72570163c47444bdde16ae9bfacd540df047e8ee34e98ff33178da5c7e6be9272e6dcfbb6db7e678a6d1d3832226c9bf85afa14feac15a270d5d3724a121b8fc9b40f0d37bb7f432de5421d286a65313a6efd251f7ed75b4ef6557975af5da5df2b87a0bbc1cb58183c4c1e24fdc4eb016777af1a6fa4a29d3eed7c4463482e591a6dc20540cabb6d7dd29cbb8ffdacafdaac2dd36db70fefe14fdeec85ef5fe01bb104d2d6439dbd7ceefc87007ce07b8409751dd7c21aa9a537f5fdefdef7d6ceba8d5ae876522f75dedd472e4dde1284e71380ee75ed313b2b9b9a94a56ebd03ae36b64a3b35abbdc7ba380016218201d156658ed9b5632f80f921879063e9037cd3509d01a2e91c17e03d892e2bc381ac723eba266497a1fbb0dc77ab3f4a9a981f95977b025b005a0e09b1add481888333927963fc5e5bf376655cb00e4ca8841fa450c8653f91cf2f3fb0247dbcace5dfde3af4a854f9fa2aaaa33706a78321332273ab4ee837ff4f8eba08676e7f889464a842b8e3e4a579d2".ToHexBytes()); + SequenceReader seqReader = new SequenceReader(reader); + long totalConsumed = 0; + List packages = new List(); + FixedHeaderInfo fixedHeaderInfo = new FixedHeaderInfo(); + while (!seqReader.End) + { + if (!fixedHeaderInfo.FoundHeader) + { + var header = seqReader.Sequence.Slice(seqReader.Consumed, 4); + var headerValue = BinaryPrimitives.ReadUInt32BigEndian(header.FirstSpan); + Assert.Equal(JT1078Package.FH, headerValue); + if (JT1078Package.FH == headerValue) + { + //sim + var sim = ReadBCD(seqReader.Sequence.Slice(seqReader.Consumed + 8, 6).FirstSpan, 12); + Assert.Equal("1901305037", sim); + //根据数据类型处理对应的数据长度 + fixedHeaderInfo.TotalSize += 15; + var dataType = seqReader.Sequence.Slice(seqReader.Consumed+fixedHeaderInfo.TotalSize, 1).FirstSpan[0]; + fixedHeaderInfo.TotalSize += 1; + JT1078Label3 label3 = new JT1078Label3(dataType); + Assert.Equal(JT1078DataType.视频I帧, label3.DataType); + int bodyLength = 0; + //透传的时候没有该字段 + if (label3.DataType != JT1078DataType.透传数据) + { + //时间戳 + bodyLength += 8; + } + //非视频帧时没有该字段 + if (label3.DataType == JT1078DataType.视频I帧 || + label3.DataType == JT1078DataType.视频P帧 || + label3.DataType == JT1078DataType.视频B帧) + { + //上一个关键帧 + 上一帧 = 2 + 2 + bodyLength += 4; + } + fixedHeaderInfo.TotalSize += bodyLength; + var bodyLengthFirstSpan = seqReader.Sequence.Slice(seqReader.Consumed+ fixedHeaderInfo.TotalSize, 2).FirstSpan; + //数据体长度 + fixedHeaderInfo.TotalSize += 2; + bodyLength = BinaryPrimitives.ReadUInt16BigEndian(bodyLengthFirstSpan); + Assert.Equal(950, bodyLength); + //数据体 + fixedHeaderInfo.TotalSize += bodyLength; + fixedHeaderInfo.FoundHeader = true; + } + } + if (reader.Length < fixedHeaderInfo.TotalSize) break; + seqReader.Advance(fixedHeaderInfo.TotalSize); + var package = seqReader.Sequence.Slice(totalConsumed, seqReader.Consumed - totalConsumed).ToArray(); + packages.Add(package); + totalConsumed += (seqReader.Consumed - totalConsumed); + if (seqReader.End) break; + } + Assert.Single(packages); + } + + [Fact] + public void Test1_2() + { + var reader = new ReadOnlySequence("303163648162000000190130503701010000016f95973f84000000000000".ToHexBytes()); + SequenceReader seqReader = new SequenceReader(reader); + long totalConsumed = 0; + List packages = new List(); + FixedHeaderInfo fixedHeaderInfo = new FixedHeaderInfo(); + while (!seqReader.End) + { + if (!fixedHeaderInfo.FoundHeader) + { + var header = seqReader.Sequence.Slice(seqReader.Consumed, 4); + var headerValue = BinaryPrimitives.ReadUInt32BigEndian(header.FirstSpan); + Assert.Equal(JT1078Package.FH, headerValue); + if (JT1078Package.FH == headerValue) + { + //sim + var sim = ReadBCD(seqReader.Sequence.Slice(seqReader.Consumed + 8, 6).FirstSpan, 12); + Assert.Equal("1901305037", sim); + //根据数据类型处理对应的数据长度 + fixedHeaderInfo.TotalSize += 15; + var dataType = seqReader.Sequence.Slice(seqReader.Consumed + fixedHeaderInfo.TotalSize, 1).FirstSpan[0]; + fixedHeaderInfo.TotalSize += 1; + JT1078Label3 label3 = new JT1078Label3(dataType); + Assert.Equal(JT1078DataType.视频I帧, label3.DataType); + int bodyLength = 0; + //透传的时候没有该字段 + if (label3.DataType != JT1078DataType.透传数据) + { + //时间戳 + bodyLength += 8; + } + //非视频帧时没有该字段 + if (label3.DataType == JT1078DataType.视频I帧 || + label3.DataType == JT1078DataType.视频P帧 || + label3.DataType == JT1078DataType.视频B帧) + { + //上一个关键帧 + 上一帧 = 2 + 2 + bodyLength += 4; + } + fixedHeaderInfo.TotalSize += bodyLength; + var bodyLengthFirstSpan = seqReader.Sequence.Slice(seqReader.Consumed + fixedHeaderInfo.TotalSize, 2).FirstSpan; + //数据体长度 + fixedHeaderInfo.TotalSize += 2; + bodyLength = BinaryPrimitives.ReadUInt16BigEndian(bodyLengthFirstSpan); + Assert.Equal(0, bodyLength); + if (bodyLength == 0) + { + try + { + seqReader.Advance(fixedHeaderInfo.TotalSize); + var package1 = seqReader.Sequence.Slice(totalConsumed, seqReader.Consumed - totalConsumed).ToArray(); + packages.Add(package1); + } + finally + { + totalConsumed += (seqReader.Consumed - totalConsumed); + } + continue; + } + //数据体 + fixedHeaderInfo.TotalSize += bodyLength; + fixedHeaderInfo.FoundHeader = true; + } + } + if (seqReader.End) break; + } + Assert.Single(packages); + } + [Fact] public void Test2() { @@ -251,5 +384,11 @@ namespace JT1078.Gateway.Test var extension = Path.GetExtension(filename); var queryParams = uri.Query.Substring(1, uri.Query.Length - 1).Split('&'); } + + class FixedHeaderInfo + { + public bool FoundHeader { get; set; } + public int TotalSize { get; set; } + } } } diff --git a/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/JT1078.Gateway.TestNormalHosting.csproj b/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/JT1078.Gateway.TestNormalHosting.csproj index 613c7e1..b7c0065 100644 --- a/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/JT1078.Gateway.TestNormalHosting.csproj +++ b/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/JT1078.Gateway.TestNormalHosting.csproj @@ -6,8 +6,9 @@ - - + + + diff --git a/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Program.cs b/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Program.cs index 9616f49..2210512 100644 --- a/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Program.cs +++ b/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Program.cs @@ -1,6 +1,8 @@ 锘縰sing JT1078.Flv; using JT1078.Gateway.InMemoryMQ; using JT1078.Gateway.TestNormalHosting.Services; +using JT1078.Hls; +using JT1078.Hls.Options; using Microsoft.Extensions.Configuration; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; @@ -27,9 +29,16 @@ namespace JT1078.Gateway.TestNormalHosting }) .ConfigureServices((hostContext, services) => { + services.AddMemoryCache(); services.AddSingleton(); services.AddSingleton(typeof(ILogger<>), typeof(Logger<>)); services.AddSingleton(); + services.AddSingleton(); + services.AddSingleton(new M3U8Option + { + + }); + services.AddSingleton(); //浣跨敤鍐呭瓨闃熷垪瀹炵幇浼氳瘽閫氱煡 services.AddJT1078Gateway(hostContext.Configuration) .AddTcp() @@ -39,9 +48,17 @@ namespace JT1078.Gateway.TestNormalHosting .AddNormal() .AddMsgProducer() .AddMsgConsumer(); - services.AddHostedService(); + services.AddHostedService(); + //services.AddHostedService(); }); - + //娴嬭瘯1: + //鍙戦佸畬鏁村寘 + //303163648162000000190130503701010000016f95973f840000000003b600000001674d001495a85825900000000168ee3c800000000106e5010d800000000165b80000090350bfdfe840b5b35c676079446e3ffe2f5240e25cc6b35cebac31720656a853ba1b571246fb858eaa5d8266acb95b92705fd187f1fd20ff0ca6b62c4cbcb3b662f5d61c016928ca82b411acdc4df6edb2034624b992eee9b6c241e1903bf9477c6e4293b65ba75e98d5a2566da6f71c85e1052a9d5ed35c393b1a73b181598749f3d26f6fbf48f0be61c673fcb9f2b0d305794bed03af5e3cedff7768bed3120261d6f3547a6d519943c2afcb80e423c9e6db088a06200dbfaa81edc5bc0de67957e791f67bf040ef944f7d62983d32517b2fb2d9572a71340c225617231bc0d98e66d19fe81a19b44280860b273f700bf3f3444a928e93fefc716e2af46995fbb658d0580a49e42f6835270c8c154abe28a17f76550b1b1fafe62945f80490b3f780fe9bb4d4b4107eac3d50b8c99d1a191f6754992096683fb0f599846bae759b06222079f5404be39e4416136c7c42255b0e7ca42d86fc2227892406d61f9816bc125d017989a671f13f2f4052e018b1fb02460802029a049a23d2ffeea6ac552109d35aa8731483fb2cae963987156056cafb32436a23a0dc918fb2440b14c9e6124441e7bb3b08706066d1ddab512267767b6e522f80732e67046ff5ad4d8193bf5cc5c05ccceb73a36b6c3ea39fa91bb308c8bb7bf88515d9c52409128e8b94e33e48a5396c35c20bd83b7c0e6d3d4a24bc14e84810066c686c6c04e687c41123fe87c89d5fa07b0095e7f82d3b07e72570163c47444bdde16ae9bfacd540df047e8ee34e98ff33178da5c7e6be9272e6dcfbb6db7e678a6d1d3832226c9bf85afa14feac15a270d5d3724a121b8fc9b40f0d37bb7f432de5421d286a65313a6efd251f7ed75b4ef6557975af5da5df2b87a0bbc1cb58183c4c1e24fdc4eb016777af1a6fa4a29d3eed7c4463482e591a6dc20540cabb6d7dd29cbb8ffdacafdaac2dd36db70fefe14fdeec85ef5fe01bb104d2d6439dbd7ceefc87007ce07b8409751dd7c21aa9a537f5fdefdef7d6ceba8d5ae876522f75dedd472e4dde1284e71380ee75ed313b2b9b9a94a56ebd03ae36b64a3b35abbdc7ba380016218201d156658ed9b5632f80f921879063e9037cd3509d01a2e91c17e03d892e2bc381ac723eba266497a1fbb0dc77ab3f4a9a981f95977b025b005a0e09b1add481888333927963fc5e5bf376655cb00e4ca8841fa450c8653f91cf2f3fb0247dbcace5dfde3af4a854f9fa2aaaa33706a78321332273ab4ee837ff4f8eba08676e7f889464a842b8e3e4a579d2 + //娴嬭瘯2锛 + //鍙戦佸ご閮ㄥ寘 + //303163648162000000190130503701010000016f95973f840000000003b6 + //鍙戦佹暟鎹綋 + //00000001674d001495a85825900000000168ee3c800000000106e5010d800000000165b80000090350bfdfe840b5b35c676079446e3ffe2f5240e25cc6b35cebac31720656a853ba1b571246fb858eaa5d8266acb95b92705fd187f1fd20ff0ca6b62c4cbcb3b662f5d61c016928ca82b411acdc4df6edb2034624b992eee9b6c241e1903bf9477c6e4293b65ba75e98d5a2566da6f71c85e1052a9d5ed35c393b1a73b181598749f3d26f6fbf48f0be61c673fcb9f2b0d305794bed03af5e3cedff7768bed3120261d6f3547a6d519943c2afcb80e423c9e6db088a06200dbfaa81edc5bc0de67957e791f67bf040ef944f7d62983d32517b2fb2d9572a71340c225617231bc0d98e66d19fe81a19b44280860b273f700bf3f3444a928e93fefc716e2af46995fbb658d0580a49e42f6835270c8c154abe28a17f76550b1b1fafe62945f80490b3f780fe9bb4d4b4107eac3d50b8c99d1a191f6754992096683fb0f599846bae759b06222079f5404be39e4416136c7c42255b0e7ca42d86fc2227892406d61f9816bc125d017989a671f13f2f4052e018b1fb02460802029a049a23d2ffeea6ac552109d35aa8731483fb2cae963987156056cafb32436a23a0dc918fb2440b14c9e6124441e7bb3b08706066d1ddab512267767b6e522f80732e67046ff5ad4d8193bf5cc5c05ccceb73a36b6c3ea39fa91bb308c8bb7bf88515d9c52409128e8b94e33e48a5396c35c20bd83b7c0e6d3d4a24bc14e84810066c686c6c04e687c41123fe87c89d5fa07b0095e7f82d3b07e72570163c47444bdde16ae9bfacd540df047e8ee34e98ff33178da5c7e6be9272e6dcfbb6db7e678a6d1d3832226c9bf85afa14feac15a270d5d3724a121b8fc9b40f0d37bb7f432de5421d286a65313a6efd251f7ed75b4ef6557975af5da5df2b87a0bbc1cb58183c4c1e24fdc4eb016777af1a6fa4a29d3eed7c4463482e591a6dc20540cabb6d7dd29cbb8ffdacafdaac2dd36db70fefe14fdeec85ef5fe01bb104d2d6439dbd7ceefc87007ce07b8409751dd7c21aa9a537f5fdefdef7d6ceba8d5ae876522f75dedd472e4dde1284e71380ee75ed313b2b9b9a94a56ebd03ae36b64a3b35abbdc7ba380016218201d156658ed9b5632f80f921879063e9037cd3509d01a2e91c17e03d892e2bc381ac723eba266497a1fbb0dc77ab3f4a9a981f95977b025b005a0e09b1add481888333927963fc5e5bf376655cb00e4ca8841fa450c8653f91cf2f3fb0247dbcace5dfde3af4a854f9fa2aaaa33706a78321332273ab4ee837ff4f8eba08676e7f889464a842b8e3e4a579d2 await serverHostBuilder.RunConsoleAsync(); } } diff --git a/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Services/JT1078NormalMsgHostedService.cs b/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Services/JT1078FlvNormalMsgHostedService.cs similarity index 94% rename from src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Services/JT1078NormalMsgHostedService.cs rename to src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Services/JT1078FlvNormalMsgHostedService.cs index b915eee..59036dd 100644 --- a/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Services/JT1078NormalMsgHostedService.cs +++ b/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Services/JT1078FlvNormalMsgHostedService.cs @@ -11,12 +11,12 @@ using System.Linq; namespace JT1078.Gateway.TestNormalHosting.Services { - public class JT1078NormalMsgHostedService : BackgroundService + public class JT1078FlvNormalMsgHostedService : BackgroundService { private IJT1078PackageConsumer PackageConsumer; private JT1078HttpSessionManager HttpSessionManager; private FlvEncoder FlvEncoder; - public JT1078NormalMsgHostedService( + public JT1078FlvNormalMsgHostedService( FlvEncoder flvEncoder, JT1078HttpSessionManager httpSessionManager, IJT1078PackageConsumer packageConsumer) diff --git a/src/JT1078.Gateway/JT1078TcpServer.cs b/src/JT1078.Gateway/JT1078TcpServer.cs index dc1950c..e46a428 100644 --- a/src/JT1078.Gateway/JT1078TcpServer.cs +++ b/src/JT1078.Gateway/JT1078TcpServer.cs @@ -164,6 +164,7 @@ namespace JT1078.Gateway } private async Task ReadPipeAsync(JT1078TcpSession session, PipeReader reader) { + FixedHeaderInfo fixedHeaderInfo = new FixedHeaderInfo(); while (true) { ReadResult result = await reader.ReadAsync(); @@ -179,7 +180,7 @@ namespace JT1078.Gateway if (result.IsCanceled) break; if (buffer.Length > 0) { - ReaderBuffer(ref buffer, session, out consumed, out examined); + ReaderBuffer(ref buffer, fixedHeaderInfo, session, out consumed, out examined); } } #pragma warning disable CA1031 // Do not catch general exception types @@ -196,32 +197,32 @@ namespace JT1078.Gateway } reader.Complete(); } - private void ReaderBuffer(ref ReadOnlySequence buffer, JT1078TcpSession session, out SequencePosition consumed, out SequencePosition examined) + private void ReaderBuffer(ref ReadOnlySequence buffer, FixedHeaderInfo fixedHeaderInfo, JT1078TcpSession session, out SequencePosition consumed, out SequencePosition examined) { consumed = buffer.Start; examined = buffer.End; - if (buffer.Length < 15) - { - throw new ArgumentException("not JT1078 package"); - } SequenceReader seqReader = new SequenceReader(buffer); long totalConsumed = 0; while (!seqReader.End) { - if ((seqReader.Length - seqReader.Consumed) < 15) - { - throw new ArgumentException("not JT1078 package"); - } - var header = seqReader.Sequence.Slice(seqReader.Consumed, 4); - var headerValue = BinaryPrimitives.ReadUInt32BigEndian(header.FirstSpan); - if (JT1078Package.FH == headerValue) + if (!fixedHeaderInfo.FoundHeader) { - //sim - var sim = ReadBCD(seqReader.Sequence.Slice(seqReader.Consumed + 8, 6).FirstSpan, 12); - //鏍规嵁鏁版嵁绫诲瀷澶勭悊瀵瑰簲鐨勬暟鎹暱搴 - seqReader.Advance(15); - if (seqReader.TryRead(out byte dataType)) + if (seqReader.Length < 4) + throw new ArgumentException("not JT1078 package"); + var header = seqReader.Sequence.Slice(seqReader.Consumed, 4); + var headerValue = BinaryPrimitives.ReadUInt32BigEndian(header.FirstSpan); + if (JT1078Package.FH == headerValue) { + //sim + fixedHeaderInfo.SIM = ReadBCD(seqReader.Sequence.Slice(seqReader.Consumed + 8, 6).FirstSpan, 12); + if (string.IsNullOrEmpty(fixedHeaderInfo.SIM)) + { + fixedHeaderInfo.SIM = session.SessionID; + } + //鏍规嵁鏁版嵁绫诲瀷澶勭悊瀵瑰簲鐨勬暟鎹暱搴 + fixedHeaderInfo.TotalSize += 15; + var dataType = seqReader.Sequence.Slice(seqReader.Consumed+fixedHeaderInfo.TotalSize, 1).FirstSpan[0]; + fixedHeaderInfo.TotalSize += 1; JT1078Label3 label3 = new JT1078Label3(dataType); int bodyLength = 0; //閫忎紶鐨勬椂鍊欐病鏈夎瀛楁 @@ -238,38 +239,67 @@ namespace JT1078.Gateway //涓婁竴涓叧閿抚 + 涓婁竴甯 = 2 + 2 bodyLength += 4; } - seqReader.Advance(bodyLength); - var bodyLengthFirstSpan = seqReader.Sequence.Slice(seqReader.Consumed, 2).FirstSpan; + fixedHeaderInfo.TotalSize += bodyLength; + var bodyLengthFirstSpan = seqReader.Sequence.Slice(seqReader.Consumed + fixedHeaderInfo.TotalSize, 2).FirstSpan; //鏁版嵁浣撻暱搴 - seqReader.Advance(2); + fixedHeaderInfo.TotalSize += 2; bodyLength = BinaryPrimitives.ReadUInt16BigEndian(bodyLengthFirstSpan); - //鏁版嵁浣 - seqReader.Advance(bodyLength); - if (string.IsNullOrEmpty(sim)) - { - sim = session.SessionID; - } - SessionManager.TryLink(sim, session); - var package = seqReader.Sequence.Slice(totalConsumed, seqReader.Consumed - totalConsumed); - try + if (bodyLength == 0)//鏁版嵁浣撻暱搴︿负0 { - if (jT1078UseType== JT1078UseType.Queue) + seqReader.Advance(fixedHeaderInfo.TotalSize); + var package1 = seqReader.Sequence.Slice(totalConsumed, seqReader.Consumed - totalConsumed); + try { - jT1078MsgProducer.ProduceAsync(sim, package.ToArray()); + SessionManager.TryLink(fixedHeaderInfo.SIM, session); + if (jT1078UseType == JT1078UseType.Queue) + { + jT1078MsgProducer.ProduceAsync(fixedHeaderInfo.SIM, package1.FirstSpan.ToArray()); + } + else + { + jT1078PackageProducer.ProduceAsync(fixedHeaderInfo.SIM, JT1078Serializer.Deserialize(package1.FirstSpan)); + } } - else + catch (Exception ex) { - jT1078PackageProducer.ProduceAsync(sim, JT1078Serializer.Deserialize(package.FirstSpan)); + Logger.LogError(ex, $"[Parse]:{package1.ToArray().ToHexString()}"); } + finally + { + totalConsumed += (seqReader.Consumed - totalConsumed); + fixedHeaderInfo.Reset(); + } + continue; } - catch (Exception ex) - { - Logger.LogError(ex, $"[Parse]:{package.ToArray().ToHexString()}"); - } - totalConsumed += (seqReader.Consumed - totalConsumed); - if (seqReader.End) break; + //鏁版嵁浣 + fixedHeaderInfo.TotalSize += bodyLength; + fixedHeaderInfo.FoundHeader = true; + } + } + if (seqReader.Length < fixedHeaderInfo.TotalSize) break; + seqReader.Advance(fixedHeaderInfo.TotalSize); + var package = seqReader.Sequence.Slice(totalConsumed, seqReader.Consumed - totalConsumed); + try + { + SessionManager.TryLink(fixedHeaderInfo.SIM, session); + if (jT1078UseType == JT1078UseType.Queue) + { + jT1078MsgProducer.ProduceAsync(fixedHeaderInfo.SIM, package.ToArray()); + } + else + { + jT1078PackageProducer.ProduceAsync(fixedHeaderInfo.SIM, JT1078Serializer.Deserialize(package.FirstSpan)); } } + catch (Exception ex) + { + Logger.LogError(ex, $"[Parse]:{package.ToArray().ToHexString()}"); + } + finally + { + totalConsumed += (seqReader.Consumed - totalConsumed); + fixedHeaderInfo.Reset(); + } } if (seqReader.Length == totalConsumed) { @@ -298,5 +328,17 @@ namespace JT1078.Gateway } return bcdSb.ToString().TrimStart('0'); } + + class FixedHeaderInfo + { + public bool FoundHeader { get; set; } + public int TotalSize { get; set; } + public string SIM { get; set; } + public void Reset() + { + FoundHeader = false; + TotalSize = 0; + } + } } }