diff --git a/src/JT808.Protocol.Test/JT808SerializerTest.cs b/src/JT808.Protocol.Test/JT808SerializerTest.cs index 80ba596..57e6421 100644 --- a/src/JT808.Protocol.Test/JT808SerializerTest.cs +++ b/src/JT808.Protocol.Test/JT808SerializerTest.cs @@ -3,6 +3,7 @@ using System.Collections.Generic; using System.IO; using System.Reflection; using System.Text; +using System.Threading; using System.Threading.Tasks; using JT808.Protocol.Enums; using JT808.Protocol.Extensions; @@ -49,30 +50,55 @@ namespace JT808.Protocol.Test [Fact] public void MergerTest() { - var index = 0; - new List + var config = new DefaultGlobalConfig(); + config.EnableAutoMerge = true; + config.AutoMergeTimeoutSecond = 5; + var array = new[] { - //分包数据第二包 - "7E010420D301127540038104E2000200020000007A04000000000000007B0200000000007C1400000000000000000000000000000000000000000000F364381E06000E1007D003020101FFFFFFFFFFFFFFFFFF0000000302000000060302320503021B320503021E320503020A320503020302FFFFFFFF0000F365311E06000E1007D003020101FFFFFFFF00000007012C0078FFFFFF320503023205030232050302320503023205030200FFFF0000FF010537303231380000FF02144B352D50000000000000000000000000000000000000F373040BB80BB80000FF0006FEEA4C0510BD1F7E", //分包数据第一包分包数据第二包 + "7E010420D301127540038104E2000200020000007A04000000000000007B0200000000007C1400000000000000000000000000000000000000000000F364381E06000E1007D003020101FFFFFFFFFFFFFFFFFF0000000302000000060302320503021B320503021E320503020A320503020302FFFFFFFF0000F365311E06000E1007D003020101FFFFFFFF00000007012C0078FFFFFF320503023205030232050302320503023205030200FFFF0000FF010537303231380000FF02144B352D50000000000000000000000000000000000000F373040BB80BB80000FF0006FEEA4C0510BD1F7E", + }; + + //正序 + for (int i = 1; i <= array.Length; i++) + { + var package = config.GetSerializer().Deserialize(array[i - 1].ToHexBytes()); + if (i == array.Length) + { + Assert.NotNull(package.Bodies); + } + else + { + Assert.Null(package.Bodies); + } } - .ForEach(data => + + //倒序 + for (int i = array.Length - 1; i >= 0; i--) { - index++; - var config = new DefaultGlobalConfig(); - config.EnableAutoMerge = true; - var package = config.GetSerializer().Deserialize(data.ToHexBytes()); - if (index == package.Header.PackgeCount) + var package = config.GetSerializer().Deserialize(array[i].ToHexBytes()); + if (i == 0) { Assert.NotNull(package.Bodies); } else { Assert.Null(package.Bodies); - Assert.NotEmpty(package.SubDataBodies); } - }); + } + + //超时 + for (int i = 1; i <= array.Length; i++) + { + var package = config.GetSerializer().Deserialize(array[i - 1].ToHexBytes()); + if (i != array.Length) + { + Thread.Sleep(TimeSpan.FromSeconds(config.AutoMergeTimeoutSecond + 1)); + } + Assert.Null(package.Bodies); + } } } } diff --git a/src/JT808.Protocol/Interfaces/GlobalConfigBase.cs b/src/JT808.Protocol/Interfaces/GlobalConfigBase.cs index 84c8210..e47bd89 100644 --- a/src/JT808.Protocol/Interfaces/GlobalConfigBase.cs +++ b/src/JT808.Protocol/Interfaces/GlobalConfigBase.cs @@ -124,6 +124,10 @@ namespace JT808.Protocol.Interfaces public virtual IJT808_0x8105_Cusotm_Factory JT808_0x8105_Cusotm_Factory { get; set; } /// public virtual bool EnableAutoMerge { get; set; } + /// + public double AutoMergeTimeoutSecond { get; set; } = 300; + /// + public IMerger Jt808PackageMerger { get; set; } = new DefaultMerger(); /// /// 外部扩展程序集注册 diff --git a/src/JT808.Protocol/Interfaces/IJT808Config.cs b/src/JT808.Protocol/Interfaces/IJT808Config.cs index 7ede120..327aa31 100644 --- a/src/JT808.Protocol/Interfaces/IJT808Config.cs +++ b/src/JT808.Protocol/Interfaces/IJT808Config.cs @@ -36,6 +36,10 @@ namespace JT808.Protocol /// IJT808SplitPackageStrategy SplitPackageStrategy { get; set; } /// + /// 808自动合并组包接口 + /// + IMerger Jt808PackageMerger { get; set; } + /// /// 序列化器工厂 /// IJT808FormatterFactory FormatterFactory { get; set; } @@ -111,6 +115,12 @@ namespace JT808.Protocol /// /// 启用该选项存在一定风险,请谨慎使用。 bool EnableAutoMerge { get; set; } + /// + /// 自动合并分包超时时间,收到第一个分包开始计算,单位:秒,默认值300秒 + /// 如该值为30且第一个分包在2011-11-11 11:11:11时收到,则在2011-11-11 11:11:41时认为过期,期间如果未收到所有分包,则自动合并分包将无法完成,并将自动清理相关缓存 + /// + double AutoMergeTimeoutSecond { get; set; } + /// /// 全局注册外部程序集 /// diff --git a/src/JT808.Protocol/Internal/DefaultMerger.cs b/src/JT808.Protocol/Internal/DefaultMerger.cs index 6bf4b03..672161a 100644 --- a/src/JT808.Protocol/Internal/DefaultMerger.cs +++ b/src/JT808.Protocol/Internal/DefaultMerger.cs @@ -7,21 +7,45 @@ namespace JT808.Protocol.Internal /// /// 默认分包合并实现 /// - public class DefaultMerger : IMerger + public class DefaultMerger : IMerger, IDisposable { /// /// 分包数据缓存 /// key为sim卡号,value为字典,key为消息id,value为元组,结构为:(分包索引,分包元数据) /// - private readonly ConcurrentDictionary>> SplitPackages = new(); + private readonly ConcurrentDictionary>> splitPackageDictionary = new(); + private readonly ConcurrentDictionary timeoutDictionary = new(); + private readonly TimeSpan cleanInterval = TimeSpan.FromSeconds(60); + private readonly CancellationTokenSource cts = new(); + private bool disposed; + public DefaultMerger() + { + Task.Run(async () => + { + while (!cts.IsCancellationRequested) + { + timeoutDictionary.ToList().ForEach(x => + { + var (key, datetime) = x; + if (datetime < DateTime.Now && TryParseKey(key, out var phoneNumber, out var messageId) && splitPackageDictionary.TryGetValue(phoneNumber, out var value) && value.TryRemove(messageId, out var caches) && value.Count == 0 && splitPackageDictionary.TryRemove(phoneNumber, out _)) + { + timeoutDictionary.TryRemove(key, out _); + } + }); + await Task.Delay(cleanInterval); + } + }, cts.Token); + } /// public bool TryMerge(JT808Header header, byte[] data, IJT808Config config, out JT808Bodies body) { // TODO: 添加SplitPackages缓存超时,达到阈值时移除该项缓存 body = null; - - if (SplitPackages.TryGetValue(header.TerminalPhoneNo, out var item) && item.TryGetValue(header.MsgId, out var packages)) + var timeoutKey = GenerateKey(header.TerminalPhoneNo, header.MsgId); + if (!CheckTimeout(timeoutKey)) return false; + timeoutDictionary.TryAdd(timeoutKey, DateTime.Now.AddSeconds(config.AutoMergeTimeoutSecond)); + if (splitPackageDictionary.TryGetValue(header.TerminalPhoneNo, out var item) && item.TryGetValue(header.MsgId, out var packages)) { packages.Add((header.PackageIndex, data)); if (packages.Count != header.PackgeCount) @@ -29,7 +53,7 @@ namespace JT808.Protocol.Internal return false; } item.TryRemove(header.MsgId, out _); - SplitPackages.TryRemove(header.TerminalPhoneNo, out _); + splitPackageDictionary.TryRemove(header.TerminalPhoneNo, out _); var mateData = packages.OrderBy(x => x.index).SelectMany(x => x.data).Concat(data).ToArray(); @@ -50,7 +74,7 @@ namespace JT808.Protocol.Internal } else { - SplitPackages.AddOrUpdate(header.TerminalPhoneNo, new ConcurrentDictionary>() { [header.MsgId] = new List<(ushort, byte[])> { (header.PackageIndex, data) } }, (_, value) => + splitPackageDictionary.AddOrUpdate(header.TerminalPhoneNo, new ConcurrentDictionary>() { [header.MsgId] = new List<(ushort, byte[])> { (header.PackageIndex, data) } }, (_, value) => { value.AddOrUpdate(header.MsgId, new List<(ushort, byte[])> { (header.PackageIndex, data) }, (_, item) => { @@ -62,5 +86,47 @@ namespace JT808.Protocol.Internal } return false; } + private bool CheckTimeout(string key) => !timeoutDictionary.TryGetValue(key, out var dateTime) || dateTime >= DateTime.Now; + + private const string keyJoiner = "-"; + + private string GenerateKey(string phoneNumber, ushort messageId) => string.Join(keyJoiner, new[] { phoneNumber, messageId.ToString() }); + + private bool TryParseKey(string key, out string phoneNumber, out ushort messageId) + { + phoneNumber = null; + messageId = 0; + var tmp = key.Split(keyJoiner); + if (tmp.Length == 2 && ushort.TryParse(tmp[1], out messageId)) + { + phoneNumber = tmp[0]; + return true; + } + return false; + } + + protected virtual void Dispose(bool disposing) + { + if (!disposed) + { + if (disposing) + { + cts.Cancel(); + cts.Dispose(); + } + disposed = true; + } + } + + public void Dispose() + { + Dispose(true); + GC.SuppressFinalize(this); + } + + ~DefaultMerger() + { + Dispose(false); + } } } \ No newline at end of file diff --git a/src/JT808.Protocol/JT808.Protocol.xml b/src/JT808.Protocol/JT808.Protocol.xml index 4bd4c81..8b33370 100644 --- a/src/JT808.Protocol/JT808.Protocol.xml +++ b/src/JT808.Protocol/JT808.Protocol.xml @@ -4354,6 +4354,12 @@ + + + + + + 外部扩展程序集注册 @@ -4794,6 +4800,11 @@ 注意:处理808的分包读取完流需要先进行转义在进行分包 + + + 808自动合并组包接口 + + 序列化器工厂 @@ -4887,6 +4898,12 @@ 启用该选项存在一定风险,请谨慎使用。 + + + 自动合并分包超时时间,收到第一个分包开始计算,单位:秒,默认值300秒 + 如该值为30且第一个分包在2011-11-11 11:11:11时收到,则在2011-11-11 11:11:41时认为过期,期间如果未收到所有分包,则自动合并分包将无法完成,并将自动清理相关缓存 + + 全局注册外部程序集 @@ -4936,7 +4953,7 @@ 默认分包合并实现 - + 分包数据缓存 key为sim卡号,value为字典,key为消息id,value为元组,结构为:(分包索引,分包元数据) diff --git a/src/JT808.Protocol/JT808Package.cs b/src/JT808.Protocol/JT808Package.cs index 07b5ef5..b84b5eb 100644 --- a/src/JT808.Protocol/JT808Package.cs +++ b/src/JT808.Protocol/JT808Package.cs @@ -121,16 +121,11 @@ namespace JT808.Protocol //读取分包的数据体 try { - var data = reader.ReadArray(jT808Package.Header.MessageBodyProperty.DataLength).ToArray(); - - if (config.EnableAutoMerge && config.GetSerializer().merger.TryMerge(jT808Package.Header, data, config, out var body)) + jT808Package.SubDataBodies = reader.ReadArray(jT808Package.Header.MessageBodyProperty.DataLength).ToArray(); + if (config.EnableAutoMerge && config.Jt808PackageMerger.TryMerge(jT808Package.Header, jT808Package.SubDataBodies, config, out var body)) { jT808Package.Bodies = body; } - else - { - jT808Package.SubDataBodies = data; - } } catch (Exception ex) { diff --git a/src/JT808.Protocol/JT808Serializer.cs b/src/JT808.Protocol/JT808Serializer.cs index 2312b9e..5b938ae 100644 --- a/src/JT808.Protocol/JT808Serializer.cs +++ b/src/JT808.Protocol/JT808Serializer.cs @@ -17,7 +17,6 @@ namespace JT808.Protocol /// public class JT808Serializer { - internal readonly IMerger merger; private readonly static JT808Package jT808Package = new JT808Package(); private readonly static Type JT808_Header_Package_Type = typeof(JT808HeaderPackage); @@ -41,8 +40,6 @@ namespace JT808.Protocol /// public JT808Serializer(IJT808Config jT808Config) { - if (jT808Config.EnableAutoMerge) - merger = new DefaultMerger(); this.jT808Config = jT808Config; }