@@ -73,6 +73,139 @@ namespace JT1078.Gateway.Test | |||
Assert.Single(packages); | |||
} | |||
[Fact] | |||
public void Test1_1() | |||
{ | |||
var reader = new ReadOnlySequence<byte>("303163648162000000190130503701010000016f95973f840000000003b600000001674d001495a85825900000000168ee3c800000000106e5010d800000000165b80000090350bfdfe840b5b35c676079446e3ffe2f5240e25cc6b35cebac31720656a853ba1b571246fb858eaa5d8266acb95b92705fd187f1fd20ff0ca6b62c4cbcb3b662f5d61c016928ca82b411acdc4df6edb2034624b992eee9b6c241e1903bf9477c6e4293b65ba75e98d5a2566da6f71c85e1052a9d5ed35c393b1a73b181598749f3d26f6fbf48f0be61c673fcb9f2b0d305794bed03af5e3cedff7768bed3120261d6f3547a6d519943c2afcb80e423c9e6db088a06200dbfaa81edc5bc0de67957e791f67bf040ef944f7d62983d32517b2fb2d9572a71340c225617231bc0d98e66d19fe81a19b44280860b273f700bf3f3444a928e93fefc716e2af46995fbb658d0580a49e42f6835270c8c154abe28a17f76550b1b1fafe62945f80490b3f780fe9bb4d4b4107eac3d50b8c99d1a191f6754992096683fb0f599846bae759b06222079f5404be39e4416136c7c42255b0e7ca42d86fc2227892406d61f9816bc125d017989a671f13f2f4052e018b1fb02460802029a049a23d2ffeea6ac552109d35aa8731483fb2cae963987156056cafb32436a23a0dc918fb2440b14c9e6124441e7bb3b08706066d1ddab512267767b6e522f80732e67046ff5ad4d8193bf5cc5c05ccceb73a36b6c3ea39fa91bb308c8bb7bf88515d9c52409128e8b94e33e48a5396c35c20bd83b7c0e6d3d4a24bc14e84810066c686c6c04e687c41123fe87c89d5fa07b0095e7f82d3b07e72570163c47444bdde16ae9bfacd540df047e8ee34e98ff33178da5c7e6be9272e6dcfbb6db7e678a6d1d3832226c9bf85afa14feac15a270d5d3724a121b8fc9b40f0d37bb7f432de5421d286a65313a6efd251f7ed75b4ef6557975af5da5df2b87a0bbc1cb58183c4c1e24fdc4eb016777af1a6fa4a29d3eed7c4463482e591a6dc20540cabb6d7dd29cbb8ffdacafdaac2dd36db70fefe14fdeec85ef5fe01bb104d2d6439dbd7ceefc87007ce07b8409751dd7c21aa9a537f5fdefdef7d6ceba8d5ae876522f75dedd472e4dde1284e71380ee75ed313b2b9b9a94a56ebd03ae36b64a3b35abbdc7ba380016218201d156658ed9b5632f80f921879063e9037cd3509d01a2e91c17e03d892e2bc381ac723eba266497a1fbb0dc77ab3f4a9a981f95977b025b005a0e09b1add481888333927963fc5e5bf376655cb00e4ca8841fa450c8653f91cf2f3fb0247dbcace5dfde3af4a854f9fa2aaaa33706a78321332273ab4ee837ff4f8eba08676e7f889464a842b8e3e4a579d2".ToHexBytes()); | |||
SequenceReader<byte> seqReader = new SequenceReader<byte>(reader); | |||
long totalConsumed = 0; | |||
List<byte[]> packages = new List<byte[]>(); | |||
FixedHeaderInfo fixedHeaderInfo = new FixedHeaderInfo(); | |||
while (!seqReader.End) | |||
{ | |||
if (!fixedHeaderInfo.FoundHeader) | |||
{ | |||
var header = seqReader.Sequence.Slice(seqReader.Consumed, 4); | |||
var headerValue = BinaryPrimitives.ReadUInt32BigEndian(header.FirstSpan); | |||
Assert.Equal(JT1078Package.FH, headerValue); | |||
if (JT1078Package.FH == headerValue) | |||
{ | |||
//sim | |||
var sim = ReadBCD(seqReader.Sequence.Slice(seqReader.Consumed + 8, 6).FirstSpan, 12); | |||
Assert.Equal("1901305037", sim); | |||
//根据数据类型处理对应的数据长度 | |||
fixedHeaderInfo.TotalSize += 15; | |||
var dataType = seqReader.Sequence.Slice(seqReader.Consumed+fixedHeaderInfo.TotalSize, 1).FirstSpan[0]; | |||
fixedHeaderInfo.TotalSize += 1; | |||
JT1078Label3 label3 = new JT1078Label3(dataType); | |||
Assert.Equal(JT1078DataType.视频I帧, label3.DataType); | |||
int bodyLength = 0; | |||
//透传的时候没有该字段 | |||
if (label3.DataType != JT1078DataType.透传数据) | |||
{ | |||
//时间戳 | |||
bodyLength += 8; | |||
} | |||
//非视频帧时没有该字段 | |||
if (label3.DataType == JT1078DataType.视频I帧 || | |||
label3.DataType == JT1078DataType.视频P帧 || | |||
label3.DataType == JT1078DataType.视频B帧) | |||
{ | |||
//上一个关键帧 + 上一帧 = 2 + 2 | |||
bodyLength += 4; | |||
} | |||
fixedHeaderInfo.TotalSize += bodyLength; | |||
var bodyLengthFirstSpan = seqReader.Sequence.Slice(seqReader.Consumed+ fixedHeaderInfo.TotalSize, 2).FirstSpan; | |||
//数据体长度 | |||
fixedHeaderInfo.TotalSize += 2; | |||
bodyLength = BinaryPrimitives.ReadUInt16BigEndian(bodyLengthFirstSpan); | |||
Assert.Equal(950, bodyLength); | |||
//数据体 | |||
fixedHeaderInfo.TotalSize += bodyLength; | |||
fixedHeaderInfo.FoundHeader = true; | |||
} | |||
} | |||
if (reader.Length < fixedHeaderInfo.TotalSize) break; | |||
seqReader.Advance(fixedHeaderInfo.TotalSize); | |||
var package = seqReader.Sequence.Slice(totalConsumed, seqReader.Consumed - totalConsumed).ToArray(); | |||
packages.Add(package); | |||
totalConsumed += (seqReader.Consumed - totalConsumed); | |||
if (seqReader.End) break; | |||
} | |||
Assert.Single(packages); | |||
} | |||
[Fact] | |||
public void Test1_2() | |||
{ | |||
var reader = new ReadOnlySequence<byte>("303163648162000000190130503701010000016f95973f84000000000000".ToHexBytes()); | |||
SequenceReader<byte> seqReader = new SequenceReader<byte>(reader); | |||
long totalConsumed = 0; | |||
List<byte[]> packages = new List<byte[]>(); | |||
FixedHeaderInfo fixedHeaderInfo = new FixedHeaderInfo(); | |||
while (!seqReader.End) | |||
{ | |||
if (!fixedHeaderInfo.FoundHeader) | |||
{ | |||
var header = seqReader.Sequence.Slice(seqReader.Consumed, 4); | |||
var headerValue = BinaryPrimitives.ReadUInt32BigEndian(header.FirstSpan); | |||
Assert.Equal(JT1078Package.FH, headerValue); | |||
if (JT1078Package.FH == headerValue) | |||
{ | |||
//sim | |||
var sim = ReadBCD(seqReader.Sequence.Slice(seqReader.Consumed + 8, 6).FirstSpan, 12); | |||
Assert.Equal("1901305037", sim); | |||
//根据数据类型处理对应的数据长度 | |||
fixedHeaderInfo.TotalSize += 15; | |||
var dataType = seqReader.Sequence.Slice(seqReader.Consumed + fixedHeaderInfo.TotalSize, 1).FirstSpan[0]; | |||
fixedHeaderInfo.TotalSize += 1; | |||
JT1078Label3 label3 = new JT1078Label3(dataType); | |||
Assert.Equal(JT1078DataType.视频I帧, label3.DataType); | |||
int bodyLength = 0; | |||
//透传的时候没有该字段 | |||
if (label3.DataType != JT1078DataType.透传数据) | |||
{ | |||
//时间戳 | |||
bodyLength += 8; | |||
} | |||
//非视频帧时没有该字段 | |||
if (label3.DataType == JT1078DataType.视频I帧 || | |||
label3.DataType == JT1078DataType.视频P帧 || | |||
label3.DataType == JT1078DataType.视频B帧) | |||
{ | |||
//上一个关键帧 + 上一帧 = 2 + 2 | |||
bodyLength += 4; | |||
} | |||
fixedHeaderInfo.TotalSize += bodyLength; | |||
var bodyLengthFirstSpan = seqReader.Sequence.Slice(seqReader.Consumed + fixedHeaderInfo.TotalSize, 2).FirstSpan; | |||
//数据体长度 | |||
fixedHeaderInfo.TotalSize += 2; | |||
bodyLength = BinaryPrimitives.ReadUInt16BigEndian(bodyLengthFirstSpan); | |||
Assert.Equal(0, bodyLength); | |||
if (bodyLength == 0) | |||
{ | |||
try | |||
{ | |||
seqReader.Advance(fixedHeaderInfo.TotalSize); | |||
var package1 = seqReader.Sequence.Slice(totalConsumed, seqReader.Consumed - totalConsumed).ToArray(); | |||
packages.Add(package1); | |||
} | |||
finally | |||
{ | |||
totalConsumed += (seqReader.Consumed - totalConsumed); | |||
} | |||
continue; | |||
} | |||
//数据体 | |||
fixedHeaderInfo.TotalSize += bodyLength; | |||
fixedHeaderInfo.FoundHeader = true; | |||
} | |||
} | |||
if (seqReader.End) break; | |||
} | |||
Assert.Single(packages); | |||
} | |||
[Fact] | |||
public void Test2() | |||
{ | |||
@@ -251,5 +384,11 @@ namespace JT1078.Gateway.Test | |||
var extension = Path.GetExtension(filename); | |||
var queryParams = uri.Query.Substring(1, uri.Query.Length - 1).Split('&'); | |||
} | |||
class FixedHeaderInfo | |||
{ | |||
public bool FoundHeader { get; set; } | |||
public int TotalSize { get; set; } | |||
} | |||
} | |||
} |
@@ -6,8 +6,9 @@ | |||
</PropertyGroup> | |||
<ItemGroup> | |||
<PackageReference Include="JT1078.Flv" Version="1.0.0-preview6" /> | |||
<PackageReference Include="JT1078.Hls" Version="1.0.0-preview1" /> | |||
<PackageReference Include="JT1078.Flv" Version="1.0.0-preview8" /> | |||
<PackageReference Include="JT1078.Hls" Version="1.0.0-preview2" /> | |||
<PackageReference Include="Microsoft.Extensions.Caching.Memory" Version="3.1.7" /> | |||
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="3.1.7" /> | |||
<PackageReference Include="Microsoft.Extensions.Hosting" Version="3.1.7" /> | |||
<PackageReference Include="Microsoft.Extensions.Logging.Console" Version="3.1.7" /> | |||
@@ -1,6 +1,8 @@ | |||
using JT1078.Flv; | |||
using JT1078.Gateway.InMemoryMQ; | |||
using JT1078.Gateway.TestNormalHosting.Services; | |||
using JT1078.Hls; | |||
using JT1078.Hls.Options; | |||
using Microsoft.Extensions.Configuration; | |||
using Microsoft.Extensions.DependencyInjection; | |||
using Microsoft.Extensions.Hosting; | |||
@@ -27,9 +29,16 @@ namespace JT1078.Gateway.TestNormalHosting | |||
}) | |||
.ConfigureServices((hostContext, services) => | |||
{ | |||
services.AddMemoryCache(); | |||
services.AddSingleton<ILoggerFactory, LoggerFactory>(); | |||
services.AddSingleton(typeof(ILogger<>), typeof(Logger<>)); | |||
services.AddSingleton<FlvEncoder>(); | |||
services.AddSingleton<TSEncoder>(); | |||
services.AddSingleton(new M3U8Option | |||
{ | |||
}); | |||
services.AddSingleton<M3U8FileManage>(); | |||
//使用内存队列实现会话通知 | |||
services.AddJT1078Gateway(hostContext.Configuration) | |||
.AddTcp() | |||
@@ -39,9 +48,17 @@ namespace JT1078.Gateway.TestNormalHosting | |||
.AddNormal() | |||
.AddMsgProducer() | |||
.AddMsgConsumer(); | |||
services.AddHostedService<JT1078NormalMsgHostedService>(); | |||
services.AddHostedService<JT1078FlvNormalMsgHostedService>(); | |||
//services.AddHostedService<JT1078HlsNormalMsgHostedService>(); | |||
}); | |||
//测试1: | |||
//发送完整包 | |||
//303163648162000000190130503701010000016f95973f840000000003b600000001674d001495a85825900000000168ee3c800000000106e5010d800000000165b80000090350bfdfe840b5b35c676079446e3ffe2f5240e25cc6b35cebac31720656a853ba1b571246fb858eaa5d8266acb95b92705fd187f1fd20ff0ca6b62c4cbcb3b662f5d61c016928ca82b411acdc4df6edb2034624b992eee9b6c241e1903bf9477c6e4293b65ba75e98d5a2566da6f71c85e1052a9d5ed35c393b1a73b181598749f3d26f6fbf48f0be61c673fcb9f2b0d305794bed03af5e3cedff7768bed3120261d6f3547a6d519943c2afcb80e423c9e6db088a06200dbfaa81edc5bc0de67957e791f67bf040ef944f7d62983d32517b2fb2d9572a71340c225617231bc0d98e66d19fe81a19b44280860b273f700bf3f3444a928e93fefc716e2af46995fbb658d0580a49e42f6835270c8c154abe28a17f76550b1b1fafe62945f80490b3f780fe9bb4d4b4107eac3d50b8c99d1a191f6754992096683fb0f599846bae759b06222079f5404be39e4416136c7c42255b0e7ca42d86fc2227892406d61f9816bc125d017989a671f13f2f4052e018b1fb02460802029a049a23d2ffeea6ac552109d35aa8731483fb2cae963987156056cafb32436a23a0dc918fb2440b14c9e6124441e7bb3b08706066d1ddab512267767b6e522f80732e67046ff5ad4d8193bf5cc5c05ccceb73a36b6c3ea39fa91bb308c8bb7bf88515d9c52409128e8b94e33e48a5396c35c20bd83b7c0e6d3d4a24bc14e84810066c686c6c04e687c41123fe87c89d5fa07b0095e7f82d3b07e72570163c47444bdde16ae9bfacd540df047e8ee34e98ff33178da5c7e6be9272e6dcfbb6db7e678a6d1d3832226c9bf85afa14feac15a270d5d3724a121b8fc9b40f0d37bb7f432de5421d286a65313a6efd251f7ed75b4ef6557975af5da5df2b87a0bbc1cb58183c4c1e24fdc4eb016777af1a6fa4a29d3eed7c4463482e591a6dc20540cabb6d7dd29cbb8ffdacafdaac2dd36db70fefe14fdeec85ef5fe01bb104d2d6439dbd7ceefc87007ce07b8409751dd7c21aa9a537f5fdefdef7d6ceba8d5ae876522f75dedd472e4dde1284e71380ee75ed313b2b9b9a94a56ebd03ae36b64a3b35abbdc7ba380016218201d156658ed9b5632f80f921879063e9037cd3509d01a2e91c17e03d892e2bc381ac723eba266497a1fbb0dc77ab3f4a9a981f95977b025b005a0e09b1add481888333927963fc5e5bf376655cb00e4ca8841fa450c8653f91cf2f3fb0247dbcace5dfde3af4a854f9fa2aaaa33706a78321332273ab4ee837ff4f8eba08676e7f889464a842b8e3e4a579d2 | |||
//测试2: | |||
//发送头部包 | |||
//303163648162000000190130503701010000016f95973f840000000003b6 | |||
//发送数据体 | |||
//00000001674d001495a85825900000000168ee3c800000000106e5010d800000000165b80000090350bfdfe840b5b35c676079446e3ffe2f5240e25cc6b35cebac31720656a853ba1b571246fb858eaa5d8266acb95b92705fd187f1fd20ff0ca6b62c4cbcb3b662f5d61c016928ca82b411acdc4df6edb2034624b992eee9b6c241e1903bf9477c6e4293b65ba75e98d5a2566da6f71c85e1052a9d5ed35c393b1a73b181598749f3d26f6fbf48f0be61c673fcb9f2b0d305794bed03af5e3cedff7768bed3120261d6f3547a6d519943c2afcb80e423c9e6db088a06200dbfaa81edc5bc0de67957e791f67bf040ef944f7d62983d32517b2fb2d9572a71340c225617231bc0d98e66d19fe81a19b44280860b273f700bf3f3444a928e93fefc716e2af46995fbb658d0580a49e42f6835270c8c154abe28a17f76550b1b1fafe62945f80490b3f780fe9bb4d4b4107eac3d50b8c99d1a191f6754992096683fb0f599846bae759b06222079f5404be39e4416136c7c42255b0e7ca42d86fc2227892406d61f9816bc125d017989a671f13f2f4052e018b1fb02460802029a049a23d2ffeea6ac552109d35aa8731483fb2cae963987156056cafb32436a23a0dc918fb2440b14c9e6124441e7bb3b08706066d1ddab512267767b6e522f80732e67046ff5ad4d8193bf5cc5c05ccceb73a36b6c3ea39fa91bb308c8bb7bf88515d9c52409128e8b94e33e48a5396c35c20bd83b7c0e6d3d4a24bc14e84810066c686c6c04e687c41123fe87c89d5fa07b0095e7f82d3b07e72570163c47444bdde16ae9bfacd540df047e8ee34e98ff33178da5c7e6be9272e6dcfbb6db7e678a6d1d3832226c9bf85afa14feac15a270d5d3724a121b8fc9b40f0d37bb7f432de5421d286a65313a6efd251f7ed75b4ef6557975af5da5df2b87a0bbc1cb58183c4c1e24fdc4eb016777af1a6fa4a29d3eed7c4463482e591a6dc20540cabb6d7dd29cbb8ffdacafdaac2dd36db70fefe14fdeec85ef5fe01bb104d2d6439dbd7ceefc87007ce07b8409751dd7c21aa9a537f5fdefdef7d6ceba8d5ae876522f75dedd472e4dde1284e71380ee75ed313b2b9b9a94a56ebd03ae36b64a3b35abbdc7ba380016218201d156658ed9b5632f80f921879063e9037cd3509d01a2e91c17e03d892e2bc381ac723eba266497a1fbb0dc77ab3f4a9a981f95977b025b005a0e09b1add481888333927963fc5e5bf376655cb00e4ca8841fa450c8653f91cf2f3fb0247dbcace5dfde3af4a854f9fa2aaaa33706a78321332273ab4ee837ff4f8eba08676e7f889464a842b8e3e4a579d2 | |||
await serverHostBuilder.RunConsoleAsync(); | |||
} | |||
} | |||
@@ -11,12 +11,12 @@ using System.Linq; | |||
namespace JT1078.Gateway.TestNormalHosting.Services | |||
{ | |||
public class JT1078NormalMsgHostedService : BackgroundService | |||
public class JT1078FlvNormalMsgHostedService : BackgroundService | |||
{ | |||
private IJT1078PackageConsumer PackageConsumer; | |||
private JT1078HttpSessionManager HttpSessionManager; | |||
private FlvEncoder FlvEncoder; | |||
public JT1078NormalMsgHostedService( | |||
public JT1078FlvNormalMsgHostedService( | |||
FlvEncoder flvEncoder, | |||
JT1078HttpSessionManager httpSessionManager, | |||
IJT1078PackageConsumer packageConsumer) |
@@ -164,6 +164,7 @@ namespace JT1078.Gateway | |||
} | |||
private async Task ReadPipeAsync(JT1078TcpSession session, PipeReader reader) | |||
{ | |||
FixedHeaderInfo fixedHeaderInfo = new FixedHeaderInfo(); | |||
while (true) | |||
{ | |||
ReadResult result = await reader.ReadAsync(); | |||
@@ -179,7 +180,7 @@ namespace JT1078.Gateway | |||
if (result.IsCanceled) break; | |||
if (buffer.Length > 0) | |||
{ | |||
ReaderBuffer(ref buffer, session, out consumed, out examined); | |||
ReaderBuffer(ref buffer, fixedHeaderInfo, session, out consumed, out examined); | |||
} | |||
} | |||
#pragma warning disable CA1031 // Do not catch general exception types | |||
@@ -196,32 +197,32 @@ namespace JT1078.Gateway | |||
} | |||
reader.Complete(); | |||
} | |||
private void ReaderBuffer(ref ReadOnlySequence<byte> buffer, JT1078TcpSession session, out SequencePosition consumed, out SequencePosition examined) | |||
private void ReaderBuffer(ref ReadOnlySequence<byte> buffer, FixedHeaderInfo fixedHeaderInfo, JT1078TcpSession session, out SequencePosition consumed, out SequencePosition examined) | |||
{ | |||
consumed = buffer.Start; | |||
examined = buffer.End; | |||
if (buffer.Length < 15) | |||
{ | |||
throw new ArgumentException("not JT1078 package"); | |||
} | |||
SequenceReader<byte> seqReader = new SequenceReader<byte>(buffer); | |||
long totalConsumed = 0; | |||
while (!seqReader.End) | |||
{ | |||
if ((seqReader.Length - seqReader.Consumed) < 15) | |||
{ | |||
throw new ArgumentException("not JT1078 package"); | |||
} | |||
var header = seqReader.Sequence.Slice(seqReader.Consumed, 4); | |||
var headerValue = BinaryPrimitives.ReadUInt32BigEndian(header.FirstSpan); | |||
if (JT1078Package.FH == headerValue) | |||
if (!fixedHeaderInfo.FoundHeader) | |||
{ | |||
//sim | |||
var sim = ReadBCD(seqReader.Sequence.Slice(seqReader.Consumed + 8, 6).FirstSpan, 12); | |||
//根据数据类型处理对应的数据长度 | |||
seqReader.Advance(15); | |||
if (seqReader.TryRead(out byte dataType)) | |||
if (seqReader.Length < 4) | |||
throw new ArgumentException("not JT1078 package"); | |||
var header = seqReader.Sequence.Slice(seqReader.Consumed, 4); | |||
var headerValue = BinaryPrimitives.ReadUInt32BigEndian(header.FirstSpan); | |||
if (JT1078Package.FH == headerValue) | |||
{ | |||
//sim | |||
fixedHeaderInfo.SIM = ReadBCD(seqReader.Sequence.Slice(seqReader.Consumed + 8, 6).FirstSpan, 12); | |||
if (string.IsNullOrEmpty(fixedHeaderInfo.SIM)) | |||
{ | |||
fixedHeaderInfo.SIM = session.SessionID; | |||
} | |||
//根据数据类型处理对应的数据长度 | |||
fixedHeaderInfo.TotalSize += 15; | |||
var dataType = seqReader.Sequence.Slice(seqReader.Consumed+fixedHeaderInfo.TotalSize, 1).FirstSpan[0]; | |||
fixedHeaderInfo.TotalSize += 1; | |||
JT1078Label3 label3 = new JT1078Label3(dataType); | |||
int bodyLength = 0; | |||
//透传的时候没有该字段 | |||
@@ -238,38 +239,67 @@ namespace JT1078.Gateway | |||
//上一个关键帧 + 上一帧 = 2 + 2 | |||
bodyLength += 4; | |||
} | |||
seqReader.Advance(bodyLength); | |||
var bodyLengthFirstSpan = seqReader.Sequence.Slice(seqReader.Consumed, 2).FirstSpan; | |||
fixedHeaderInfo.TotalSize += bodyLength; | |||
var bodyLengthFirstSpan = seqReader.Sequence.Slice(seqReader.Consumed + fixedHeaderInfo.TotalSize, 2).FirstSpan; | |||
//数据体长度 | |||
seqReader.Advance(2); | |||
fixedHeaderInfo.TotalSize += 2; | |||
bodyLength = BinaryPrimitives.ReadUInt16BigEndian(bodyLengthFirstSpan); | |||
//数据体 | |||
seqReader.Advance(bodyLength); | |||
if (string.IsNullOrEmpty(sim)) | |||
{ | |||
sim = session.SessionID; | |||
} | |||
SessionManager.TryLink(sim, session); | |||
var package = seqReader.Sequence.Slice(totalConsumed, seqReader.Consumed - totalConsumed); | |||
try | |||
if (bodyLength == 0)//数据体长度为0 | |||
{ | |||
if (jT1078UseType== JT1078UseType.Queue) | |||
seqReader.Advance(fixedHeaderInfo.TotalSize); | |||
var package1 = seqReader.Sequence.Slice(totalConsumed, seqReader.Consumed - totalConsumed); | |||
try | |||
{ | |||
jT1078MsgProducer.ProduceAsync(sim, package.ToArray()); | |||
SessionManager.TryLink(fixedHeaderInfo.SIM, session); | |||
if (jT1078UseType == JT1078UseType.Queue) | |||
{ | |||
jT1078MsgProducer.ProduceAsync(fixedHeaderInfo.SIM, package1.FirstSpan.ToArray()); | |||
} | |||
else | |||
{ | |||
jT1078PackageProducer.ProduceAsync(fixedHeaderInfo.SIM, JT1078Serializer.Deserialize(package1.FirstSpan)); | |||
} | |||
} | |||
else | |||
catch (Exception ex) | |||
{ | |||
jT1078PackageProducer.ProduceAsync(sim, JT1078Serializer.Deserialize(package.FirstSpan)); | |||
Logger.LogError(ex, $"[Parse]:{package1.ToArray().ToHexString()}"); | |||
} | |||
finally | |||
{ | |||
totalConsumed += (seqReader.Consumed - totalConsumed); | |||
fixedHeaderInfo.Reset(); | |||
} | |||
continue; | |||
} | |||
catch (Exception ex) | |||
{ | |||
Logger.LogError(ex, $"[Parse]:{package.ToArray().ToHexString()}"); | |||
} | |||
totalConsumed += (seqReader.Consumed - totalConsumed); | |||
if (seqReader.End) break; | |||
//数据体 | |||
fixedHeaderInfo.TotalSize += bodyLength; | |||
fixedHeaderInfo.FoundHeader = true; | |||
} | |||
} | |||
if (seqReader.Length < fixedHeaderInfo.TotalSize) break; | |||
seqReader.Advance(fixedHeaderInfo.TotalSize); | |||
var package = seqReader.Sequence.Slice(totalConsumed, seqReader.Consumed - totalConsumed); | |||
try | |||
{ | |||
SessionManager.TryLink(fixedHeaderInfo.SIM, session); | |||
if (jT1078UseType == JT1078UseType.Queue) | |||
{ | |||
jT1078MsgProducer.ProduceAsync(fixedHeaderInfo.SIM, package.ToArray()); | |||
} | |||
else | |||
{ | |||
jT1078PackageProducer.ProduceAsync(fixedHeaderInfo.SIM, JT1078Serializer.Deserialize(package.FirstSpan)); | |||
} | |||
} | |||
catch (Exception ex) | |||
{ | |||
Logger.LogError(ex, $"[Parse]:{package.ToArray().ToHexString()}"); | |||
} | |||
finally | |||
{ | |||
totalConsumed += (seqReader.Consumed - totalConsumed); | |||
fixedHeaderInfo.Reset(); | |||
} | |||
} | |||
if (seqReader.Length == totalConsumed) | |||
{ | |||
@@ -298,5 +328,17 @@ namespace JT1078.Gateway | |||
} | |||
return bcdSb.ToString().TrimStart('0'); | |||
} | |||
class FixedHeaderInfo | |||
{ | |||
public bool FoundHeader { get; set; } | |||
public int TotalSize { get; set; } | |||
public string SIM { get; set; } | |||
public void Reset() | |||
{ | |||
FoundHeader = false; | |||
TotalSize = 0; | |||
} | |||
} | |||
} | |||
} |