Browse Source

自动组包添加超时策略及定时清理过期缓存

tags/v2.6.2
yedajiang44 2 years ago
parent
commit
1c2c59ae21
7 changed files with 144 additions and 29 deletions
  1. +38
    -12
      src/JT808.Protocol.Test/JT808SerializerTest.cs
  2. +4
    -0
      src/JT808.Protocol/Interfaces/GlobalConfigBase.cs
  3. +10
    -0
      src/JT808.Protocol/Interfaces/IJT808Config.cs
  4. +72
    -6
      src/JT808.Protocol/Internal/DefaultMerger.cs
  5. +18
    -1
      src/JT808.Protocol/JT808.Protocol.xml
  6. +2
    -7
      src/JT808.Protocol/JT808Package.cs
  7. +0
    -3
      src/JT808.Protocol/JT808Serializer.cs

+ 38
- 12
src/JT808.Protocol.Test/JT808SerializerTest.cs View File

@@ -3,6 +3,7 @@ using System.Collections.Generic;
using System.IO; using System.IO;
using System.Reflection; using System.Reflection;
using System.Text; using System.Text;
using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
using JT808.Protocol.Enums; using JT808.Protocol.Enums;
using JT808.Protocol.Extensions; using JT808.Protocol.Extensions;
@@ -49,30 +50,55 @@ namespace JT808.Protocol.Test
[Fact] [Fact]
public void MergerTest() public void MergerTest()
{ {
var index = 0;
new List<string>
var config = new DefaultGlobalConfig();
config.EnableAutoMerge = true;
config.AutoMergeTimeoutSecond = 5;
var array = new[]
{ {
//分包数据第二包
"7E010420D301127540038104E2000200020000007A04000000000000007B0200000000007C1400000000000000000000000000000000000000000000F364381E06000E1007D003020101FFFFFFFFFFFFFFFFFF0000000302000000060302320503021B320503021E320503020A320503020302FFFFFFFF0000F365311E06000E1007D003020101FFFFFFFF00000007012C0078FFFFFF320503023205030232050302320503023205030200FFFF0000FF010537303231380000FF02144B352D50000000000000000000000000000000000000F373040BB80BB80000FF0006FEEA4C0510BD1F7E",
//分包数据第一包 //分包数据第一包

//分包数据第二包
"7E010420D301127540038104E2000200020000007A04000000000000007B0200000000007C1400000000000000000000000000000000000000000000F364381E06000E1007D003020101FFFFFFFFFFFFFFFFFF0000000302000000060302320503021B320503021E320503020A320503020302FFFFFFFF0000F365311E06000E1007D003020101FFFFFFFF00000007012C0078FFFFFF320503023205030232050302320503023205030200FFFF0000FF010537303231380000FF02144B352D50000000000000000000000000000000000000F373040BB80BB80000FF0006FEEA4C0510BD1F7E",
};

//正序
for (int i = 1; i <= array.Length; i++)
{
var package = config.GetSerializer().Deserialize(array[i - 1].ToHexBytes());
if (i == array.Length)
{
Assert.NotNull(package.Bodies);
}
else
{
Assert.Null(package.Bodies);
}
} }
.ForEach(data =>

//倒序
for (int i = array.Length - 1; i >= 0; i--)
{ {
index++;
var config = new DefaultGlobalConfig();
config.EnableAutoMerge = true;
var package = config.GetSerializer().Deserialize(data.ToHexBytes());
if (index == package.Header.PackgeCount)
var package = config.GetSerializer().Deserialize(array[i].ToHexBytes());
if (i == 0)
{ {
Assert.NotNull(package.Bodies); Assert.NotNull(package.Bodies);
} }
else else
{ {
Assert.Null(package.Bodies); Assert.Null(package.Bodies);
Assert.NotEmpty(package.SubDataBodies);
} }
});
}

//超时
for (int i = 1; i <= array.Length; i++)
{
var package = config.GetSerializer().Deserialize(array[i - 1].ToHexBytes());
if (i != array.Length)
{
Thread.Sleep(TimeSpan.FromSeconds(config.AutoMergeTimeoutSecond + 1));
}
Assert.Null(package.Bodies);
}
} }
} }
} }

+ 4
- 0
src/JT808.Protocol/Interfaces/GlobalConfigBase.cs View File

@@ -124,6 +124,10 @@ namespace JT808.Protocol.Interfaces
public virtual IJT808_0x8105_Cusotm_Factory JT808_0x8105_Cusotm_Factory { get; set; } public virtual IJT808_0x8105_Cusotm_Factory JT808_0x8105_Cusotm_Factory { get; set; }
/// <inheritdoc/> /// <inheritdoc/>
public virtual bool EnableAutoMerge { get; set; } public virtual bool EnableAutoMerge { get; set; }
/// <inheritdoc/>
public double AutoMergeTimeoutSecond { get; set; } = 300;
/// <inheritdoc/>
public IMerger Jt808PackageMerger { get; set; } = new DefaultMerger();


/// <summary> /// <summary>
/// 外部扩展程序集注册 /// 外部扩展程序集注册


+ 10
- 0
src/JT808.Protocol/Interfaces/IJT808Config.cs View File

@@ -36,6 +36,10 @@ namespace JT808.Protocol
/// </summary> /// </summary>
IJT808SplitPackageStrategy SplitPackageStrategy { get; set; } IJT808SplitPackageStrategy SplitPackageStrategy { get; set; }
/// <summary> /// <summary>
/// 808自动合并组包接口
/// </summary>
IMerger Jt808PackageMerger { get; set; }
/// <summary>
/// 序列化器工厂 /// 序列化器工厂
/// </summary> /// </summary>
IJT808FormatterFactory FormatterFactory { get; set; } IJT808FormatterFactory FormatterFactory { get; set; }
@@ -111,6 +115,12 @@ namespace JT808.Protocol
/// </summary> /// </summary>
/// <remarks>启用该选项存在一定风险,请谨慎使用。</remarks> /// <remarks>启用该选项存在一定风险,请谨慎使用。</remarks>
bool EnableAutoMerge { get; set; } bool EnableAutoMerge { get; set; }
/// <summary>
/// 自动合并分包超时时间,收到第一个分包开始计算,单位:秒,默认值300秒
/// <para>如该值为30且第一个分包在2011-11-11 11:11:11时收到,则在2011-11-11 11:11:41时认为过期,期间如果未收到所有分包,则自动合并分包将无法完成,并将自动清理相关缓存</para>
/// </summary>
double AutoMergeTimeoutSecond { get; set; }

/// <summary> /// <summary>
/// 全局注册外部程序集 /// 全局注册外部程序集
/// </summary> /// </summary>


+ 72
- 6
src/JT808.Protocol/Internal/DefaultMerger.cs View File

@@ -7,21 +7,45 @@ namespace JT808.Protocol.Internal
/// <summary> /// <summary>
/// 默认分包合并实现 /// 默认分包合并实现
/// </summary> /// </summary>
public class DefaultMerger : IMerger
public class DefaultMerger : IMerger, IDisposable
{ {
/// <summary> /// <summary>
/// 分包数据缓存 /// 分包数据缓存
/// <para>key为sim卡号,value为字典,key为消息id,value为元组,结构为:(分包索引,分包元数据)</para> /// <para>key为sim卡号,value为字典,key为消息id,value为元组,结构为:(分包索引,分包元数据)</para>
/// </summary> /// </summary>
private readonly ConcurrentDictionary<string, ConcurrentDictionary<ushort, List<(ushort index, byte[] data)>>> SplitPackages = new();
private readonly ConcurrentDictionary<string, ConcurrentDictionary<ushort, List<(ushort index, byte[] data)>>> splitPackageDictionary = new();


private readonly ConcurrentDictionary<string, DateTime> timeoutDictionary = new();
private readonly TimeSpan cleanInterval = TimeSpan.FromSeconds(60);
private readonly CancellationTokenSource cts = new();
private bool disposed;
public DefaultMerger()
{
Task.Run(async () =>
{
while (!cts.IsCancellationRequested)
{
timeoutDictionary.ToList().ForEach(x =>
{
var (key, datetime) = x;
if (datetime < DateTime.Now && TryParseKey(key, out var phoneNumber, out var messageId) && splitPackageDictionary.TryGetValue(phoneNumber, out var value) && value.TryRemove(messageId, out var caches) && value.Count == 0 && splitPackageDictionary.TryRemove(phoneNumber, out _))
{
timeoutDictionary.TryRemove(key, out _);
}
});
await Task.Delay(cleanInterval);
}
}, cts.Token);
}
/// <inheritdoc/> /// <inheritdoc/>
public bool TryMerge(JT808Header header, byte[] data, IJT808Config config, out JT808Bodies body) public bool TryMerge(JT808Header header, byte[] data, IJT808Config config, out JT808Bodies body)
{ {
// TODO: 添加SplitPackages缓存超时,达到阈值时移除该项缓存 // TODO: 添加SplitPackages缓存超时,达到阈值时移除该项缓存
body = null; body = null;

if (SplitPackages.TryGetValue(header.TerminalPhoneNo, out var item) && item.TryGetValue(header.MsgId, out var packages))
var timeoutKey = GenerateKey(header.TerminalPhoneNo, header.MsgId);
if (!CheckTimeout(timeoutKey)) return false;
timeoutDictionary.TryAdd(timeoutKey, DateTime.Now.AddSeconds(config.AutoMergeTimeoutSecond));
if (splitPackageDictionary.TryGetValue(header.TerminalPhoneNo, out var item) && item.TryGetValue(header.MsgId, out var packages))
{ {
packages.Add((header.PackageIndex, data)); packages.Add((header.PackageIndex, data));
if (packages.Count != header.PackgeCount) if (packages.Count != header.PackgeCount)
@@ -29,7 +53,7 @@ namespace JT808.Protocol.Internal
return false; return false;
} }
item.TryRemove(header.MsgId, out _); item.TryRemove(header.MsgId, out _);
SplitPackages.TryRemove(header.TerminalPhoneNo, out _);
splitPackageDictionary.TryRemove(header.TerminalPhoneNo, out _);


var mateData = packages.OrderBy(x => x.index).SelectMany(x => x.data).Concat(data).ToArray(); var mateData = packages.OrderBy(x => x.index).SelectMany(x => x.data).Concat(data).ToArray();


@@ -50,7 +74,7 @@ namespace JT808.Protocol.Internal
} }
else else
{ {
SplitPackages.AddOrUpdate(header.TerminalPhoneNo, new ConcurrentDictionary<ushort, List<(ushort, byte[])>>() { [header.MsgId] = new List<(ushort, byte[])> { (header.PackageIndex, data) } }, (_, value) =>
splitPackageDictionary.AddOrUpdate(header.TerminalPhoneNo, new ConcurrentDictionary<ushort, List<(ushort, byte[])>>() { [header.MsgId] = new List<(ushort, byte[])> { (header.PackageIndex, data) } }, (_, value) =>
{ {
value.AddOrUpdate(header.MsgId, new List<(ushort, byte[])> { (header.PackageIndex, data) }, (_, item) => value.AddOrUpdate(header.MsgId, new List<(ushort, byte[])> { (header.PackageIndex, data) }, (_, item) =>
{ {
@@ -62,5 +86,47 @@ namespace JT808.Protocol.Internal
} }
return false; return false;
} }
private bool CheckTimeout(string key) => !timeoutDictionary.TryGetValue(key, out var dateTime) || dateTime >= DateTime.Now;

private const string keyJoiner = "-";

private string GenerateKey(string phoneNumber, ushort messageId) => string.Join(keyJoiner, new[] { phoneNumber, messageId.ToString() });

private bool TryParseKey(string key, out string phoneNumber, out ushort messageId)
{
phoneNumber = null;
messageId = 0;
var tmp = key.Split(keyJoiner);
if (tmp.Length == 2 && ushort.TryParse(tmp[1], out messageId))
{
phoneNumber = tmp[0];
return true;
}
return false;
}

protected virtual void Dispose(bool disposing)
{
if (!disposed)
{
if (disposing)
{
cts.Cancel();
cts.Dispose();
}
disposed = true;
}
}

public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}

~DefaultMerger()
{
Dispose(false);
}
} }
} }

+ 18
- 1
src/JT808.Protocol/JT808.Protocol.xml View File

@@ -4354,6 +4354,12 @@
<member name="P:JT808.Protocol.Interfaces.GlobalConfigBase.EnableAutoMerge"> <member name="P:JT808.Protocol.Interfaces.GlobalConfigBase.EnableAutoMerge">
<inheritdoc/> <inheritdoc/>
</member> </member>
<member name="P:JT808.Protocol.Interfaces.GlobalConfigBase.AutoMergeTimeoutSecond">
<inheritdoc/>
</member>
<member name="P:JT808.Protocol.Interfaces.GlobalConfigBase.Jt808PackageMerger">
<inheritdoc/>
</member>
<member name="M:JT808.Protocol.Interfaces.GlobalConfigBase.Register(System.Reflection.Assembly[])"> <member name="M:JT808.Protocol.Interfaces.GlobalConfigBase.Register(System.Reflection.Assembly[])">
<summary> <summary>
外部扩展程序集注册 外部扩展程序集注册
@@ -4794,6 +4800,11 @@
注意:处理808的分包读取完流需要先进行转义在进行分包 注意:处理808的分包读取完流需要先进行转义在进行分包
</summary> </summary>
</member> </member>
<member name="P:JT808.Protocol.IJT808Config.Jt808PackageMerger">
<summary>
808自动合并组包接口
</summary>
</member>
<member name="P:JT808.Protocol.IJT808Config.FormatterFactory"> <member name="P:JT808.Protocol.IJT808Config.FormatterFactory">
<summary> <summary>
序列化器工厂 序列化器工厂
@@ -4887,6 +4898,12 @@
</summary> </summary>
<remarks>启用该选项存在一定风险,请谨慎使用。</remarks> <remarks>启用该选项存在一定风险,请谨慎使用。</remarks>
</member> </member>
<member name="P:JT808.Protocol.IJT808Config.AutoMergeTimeoutSecond">
<summary>
自动合并分包超时时间,收到第一个分包开始计算,单位:秒,默认值300秒
<para>如该值为30且第一个分包在2011-11-11 11:11:11时收到,则在2011-11-11 11:11:41时认为过期,期间如果未收到所有分包,则自动合并分包将无法完成,并将自动清理相关缓存</para>
</summary>
</member>
<member name="M:JT808.Protocol.IJT808Config.Register(System.Reflection.Assembly[])"> <member name="M:JT808.Protocol.IJT808Config.Register(System.Reflection.Assembly[])">
<summary> <summary>
全局注册外部程序集 全局注册外部程序集
@@ -4936,7 +4953,7 @@
默认分包合并实现 默认分包合并实现
</summary> </summary>
</member> </member>
<member name="F:JT808.Protocol.Internal.DefaultMerger.SplitPackages">
<member name="F:JT808.Protocol.Internal.DefaultMerger.splitPackageDictionary">
<summary> <summary>
分包数据缓存 分包数据缓存
<para>key为sim卡号,value为字典,key为消息id,value为元组,结构为:(分包索引,分包元数据)</para> <para>key为sim卡号,value为字典,key为消息id,value为元组,结构为:(分包索引,分包元数据)</para>


+ 2
- 7
src/JT808.Protocol/JT808Package.cs View File

@@ -121,16 +121,11 @@ namespace JT808.Protocol
//读取分包的数据体 //读取分包的数据体
try try
{ {
var data = reader.ReadArray(jT808Package.Header.MessageBodyProperty.DataLength).ToArray();

if (config.EnableAutoMerge && config.GetSerializer().merger.TryMerge(jT808Package.Header, data, config, out var body))
jT808Package.SubDataBodies = reader.ReadArray(jT808Package.Header.MessageBodyProperty.DataLength).ToArray();
if (config.EnableAutoMerge && config.Jt808PackageMerger.TryMerge(jT808Package.Header, jT808Package.SubDataBodies, config, out var body))
{ {
jT808Package.Bodies = body; jT808Package.Bodies = body;
} }
else
{
jT808Package.SubDataBodies = data;
}
} }
catch (Exception ex) catch (Exception ex)
{ {


+ 0
- 3
src/JT808.Protocol/JT808Serializer.cs View File

@@ -17,7 +17,6 @@ namespace JT808.Protocol
/// </summary> /// </summary>
public class JT808Serializer public class JT808Serializer
{ {
internal readonly IMerger merger;
private readonly static JT808Package jT808Package = new JT808Package(); private readonly static JT808Package jT808Package = new JT808Package();


private readonly static Type JT808_Header_Package_Type = typeof(JT808HeaderPackage); private readonly static Type JT808_Header_Package_Type = typeof(JT808HeaderPackage);
@@ -41,8 +40,6 @@ namespace JT808.Protocol
/// <param name="jT808Config"></param> /// <param name="jT808Config"></param>
public JT808Serializer(IJT808Config jT808Config) public JT808Serializer(IJT808Config jT808Config)
{ {
if (jT808Config.EnableAutoMerge)
merger = new DefaultMerger();
this.jT808Config = jT808Config; this.jT808Config = jT808Config;
} }




Loading…
Cancel
Save