diff --git a/src/JT1078.Protocol.Test/Extensions/JT1078PackageExtensionsTest.cs b/src/JT1078.Protocol.Test/Extensions/JT1078PackageExtensionsTest.cs index 0033236..0290a71 100644 --- a/src/JT1078.Protocol.Test/Extensions/JT1078PackageExtensionsTest.cs +++ b/src/JT1078.Protocol.Test/Extensions/JT1078PackageExtensionsTest.cs @@ -1,4 +1,5 @@ -using JT1078.Protocol.Extensions; +using JT1078.Protocol.Enums; +using JT1078.Protocol.Extensions; using System; using System.Collections.Generic; using System.IO; @@ -23,7 +24,7 @@ namespace JT1078.Protocol.Test.Extensions var bytes = data[6].ToHexBytes(); JT1078Package package = JT1078Serializer.Deserialize(bytes); mergeBodyLength += package.DataBodyLength; - merge = JT1078Serializer.Merge(package); + merge = JT1078Serializer.Merge(package,JT808ChannelType.Live); } var packages = merge.Bodies.ConvertVideo(merge.SIM, merge.LogicChannelNumber, merge.Label2.PT, merge.Label3.DataType, merge.Timestamp, merge.LastFrameInterval, merge.LastFrameInterval); diff --git a/src/JT1078.Protocol.Test/H264/H264DecoderTest.cs b/src/JT1078.Protocol.Test/H264/H264DecoderTest.cs index 00aa8ee..d4da921 100644 --- a/src/JT1078.Protocol.Test/H264/H264DecoderTest.cs +++ b/src/JT1078.Protocol.Test/H264/H264DecoderTest.cs @@ -9,6 +9,7 @@ using Newtonsoft.Json; using JT1078.Protocol.H264; using JT808.Protocol.Extensions; using JT1078.Protocol.MessagePack; +using JT1078.Protocol.Enums; namespace JT1078.Protocol.Test.H264 { @@ -41,7 +42,7 @@ namespace JT1078.Protocol.Test.H264 var bytes = data[6].ToHexBytes(); JT1078Package package = JT1078Serializer.Deserialize(bytes); mergeBodyLength += package.DataBodyLength; - Package = JT1078Serializer.Merge(package); + Package = JT1078Serializer.Merge(package,JT808ChannelType.Live); } H264Decoder decoder = new H264Decoder(); var nalus = decoder.ParseNALU(Package); @@ -128,7 +129,7 @@ namespace JT1078.Protocol.Test.H264 { var bytes = line.ToHexBytes(); JT1078Package package = JT1078Serializer.Deserialize(bytes); - var packageMerge = JT1078Serializer.Merge(package); + var packageMerge = JT1078Serializer.Merge(package,JT808ChannelType.Live); if (packageMerge != null) { var nalus = decoder.ParseNALU(packageMerge); diff --git a/src/JT1078.Protocol.Test/JT1078SerializerTest.cs b/src/JT1078.Protocol.Test/JT1078SerializerTest.cs index eda06ee..30c7e5d 100644 --- a/src/JT1078.Protocol.Test/JT1078SerializerTest.cs +++ b/src/JT1078.Protocol.Test/JT1078SerializerTest.cs @@ -345,7 +345,7 @@ namespace JT1078.Protocol.Test var bytes = data[6].ToHexBytes(); JT1078Package package = JT1078Serializer.Deserialize(bytes); mergeBodyLength += package.DataBodyLength; - merge = JT1078Serializer.Merge(package); + merge = JT1078Serializer.Merge(package,JT808ChannelType.Live); } Assert.NotNull(merge); Assert.Equal(mergeBodyLength, merge.Bodies.Length); diff --git a/src/JT1078.Protocol/Enums/JT808ChannelType.cs b/src/JT1078.Protocol/Enums/JT808ChannelType.cs new file mode 100644 index 0000000..e3c2772 --- /dev/null +++ b/src/JT1078.Protocol/Enums/JT808ChannelType.cs @@ -0,0 +1,17 @@ +namespace JT1078.Protocol.Enums +{ + /// + /// data channel type + /// + public enum JT808ChannelType + { + /// + /// live channel + /// + Live, + /// + /// history channel + /// + History + } +} \ No newline at end of file diff --git a/src/JT1078.Protocol/JT1078Serializer.cs b/src/JT1078.Protocol/JT1078Serializer.cs index 819d727..18f6753 100644 --- a/src/JT1078.Protocol/JT1078Serializer.cs +++ b/src/JT1078.Protocol/JT1078Serializer.cs @@ -1,19 +1,19 @@ -using JT1078.Protocol.Enums; -using JT1078.Protocol.Extensions; -using JT1078.Protocol.MessagePack; -using System; +using System; using System.Collections.Concurrent; -using System.Collections.Generic; using System.IO; using System.Linq; using System.Text; using System.Text.Json; +using JT1078.Protocol.Enums; +using JT1078.Protocol.Extensions; +using JT1078.Protocol.MessagePack; namespace JT1078.Protocol { public static class JT1078Serializer { - private readonly static ConcurrentDictionary JT1078PackageGroupDict = new ConcurrentDictionary(StringComparer.OrdinalIgnoreCase); + private readonly static ConcurrentDictionary livePackageGroup = new(StringComparer.OrdinalIgnoreCase); + private readonly static ConcurrentDictionary historyPackageGroup = new(StringComparer.OrdinalIgnoreCase); public static byte[] Serialize(JT1078Package package, int minBufferSize = 4096) { byte[] buffer = JT1078ArrayPool.Rent(minBufferSize); @@ -72,18 +72,33 @@ namespace JT1078.Protocol jT1078Package.Bodies = jT1078MessagePackReader.ReadRemainArray().ToArray(); return jT1078Package; } - public static JT1078Package Merge(JT1078Package jT1078Package) + + /// + /// merge package + /// due to the agreement cannot distinguish the Stream type, so need clear Stream type at the time of merger, avoid confusion + /// + /// package of jt1078 + /// package type is live or history + /// + /// + public static JT1078Package Merge(JT1078Package jT1078Package,JT808ChannelType channelType) { string cacheKey = jT1078Package.GetKey(); + var packageGroup = channelType switch + { + JT808ChannelType.Live=>livePackageGroup, + JT808ChannelType.History=>historyPackageGroup, + _=>throw new Exception("channel type error") + }; if (jT1078Package.Label3.SubpackageType == JT1078SubPackageType.分包处理时的第一个包) { - JT1078PackageGroupDict.TryRemove(cacheKey, out _); - JT1078PackageGroupDict.TryAdd(cacheKey, jT1078Package); + packageGroup.TryRemove(cacheKey, out _); + packageGroup.TryAdd(cacheKey, jT1078Package); return default; } else if (jT1078Package.Label3.SubpackageType == JT1078SubPackageType.分包处理时的中间包) { - if (JT1078PackageGroupDict.TryGetValue(cacheKey, out var tmpPackage)) + if (packageGroup.TryGetValue(cacheKey, out var tmpPackage)) { var totalLength = tmpPackage.Bodies.Length + jT1078Package.Bodies.Length; byte[] poolBytes = JT1078ArrayPool.Rent(totalLength); @@ -92,13 +107,13 @@ namespace JT1078.Protocol jT1078Package.Bodies.CopyTo(tmpSpan.Slice(tmpPackage.Bodies.Length)); tmpPackage.Bodies = tmpSpan.Slice(0, totalLength).ToArray(); JT1078ArrayPool.Return(poolBytes); - JT1078PackageGroupDict[cacheKey] = tmpPackage; + packageGroup[cacheKey] = tmpPackage; } return default; } else if (jT1078Package.Label3.SubpackageType == JT1078SubPackageType.分包处理时的最后一个包) { - if (JT1078PackageGroupDict.TryRemove(cacheKey, out var tmpPackage)) + if (packageGroup.TryRemove(cacheKey, out var tmpPackage)) { var totalLength = tmpPackage.Bodies.Length + jT1078Package.Bodies.Length; byte[] poolBytes = JT1078ArrayPool.Rent(totalLength);