using System.Collections.Concurrent;
using JT808.Protocol.Interfaces;
using JT808.Protocol.MessagePack;
namespace JT808.Protocol.Internal
{
///
/// 默认分包合并实现
///
public class DefaultMerger : IMerger, IDisposable
{
///
/// 分包数据缓存
/// key为sim卡号,value为字典,key为消息id,value为元组,结构为:(分包索引,分包元数据)
///
private readonly ConcurrentDictionary>> splitPackageDictionary = new();
private readonly ConcurrentDictionary timeoutDictionary = new ConcurrentDictionary();
private readonly TimeSpan cleanInterval = TimeSpan.FromSeconds(60);
private readonly CancellationTokenSource cts = new CancellationTokenSource();
private bool disposed;
public DefaultMerger()
{
Task.Run(async () =>
{
while (!cts.IsCancellationRequested)
{
timeoutDictionary.ToList().ForEach(x =>
{
var key = x.Key;
var datetime = x.Value;
if (datetime < DateTime.Now && TryParseKey(key, out var phoneNumber, out var messageId) && splitPackageDictionary.TryGetValue(phoneNumber, out var value) && value.TryRemove(messageId, out var caches) && value.Count == 0 && splitPackageDictionary.TryRemove(phoneNumber, out _))
{
timeoutDictionary.TryRemove(key, out _);
}
});
await Task.Delay(cleanInterval);
}
}, cts.Token);
}
///
public bool TryMerge(JT808Header header, byte[] data, IJT808Config config, out JT808Bodies body)
{
body = null;
var timeoutKey = GenerateKey(header.TerminalPhoneNo, header.MsgId);
if (!CheckTimeout(timeoutKey)) return false;
var timeout = DateTime.Now.AddSeconds(config.AutoMergeTimeoutSecond);
if (timeoutDictionary.TryAdd(timeoutKey, timeout))
timeoutDictionary.TryUpdate(timeoutKey, timeout, timeout);
if (splitPackageDictionary.TryGetValue(header.TerminalPhoneNo, out var item) && item.TryGetValue(header.MsgId, out var packages))
{
packages.Add((header.PackageIndex, data));
if (packages.Count != header.PackgeCount)
{
return false;
}
item.TryRemove(header.MsgId, out _);
splitPackageDictionary.TryRemove(header.TerminalPhoneNo, out _);
var mateData = packages.OrderBy(x => x.index).SelectMany(x => x.data).Concat(data).ToArray();
byte[] buffer = JT808ArrayPool.Rent(mateData.Length);
try
{
var reader = new JT808MessagePackReader(mateData, (Enums.JT808Version)header.ProtocolVersion);
if (config.MsgIdFactory.TryGetValue(header.MsgId, out var value) && value is JT808Bodies instance)
{
body = instance.DeserializeExt(ref reader, config);
header.MessageBodyProperty.IsMerged = true;
return true;
}
}
finally
{
JT808ArrayPool.Return(buffer);
}
}
else
{
splitPackageDictionary.AddOrUpdate(header.TerminalPhoneNo, new ConcurrentDictionary>() { [header.MsgId] = new List<(ushort, byte[])> { (header.PackageIndex, data) } }, (_, value) =>
{
value.AddOrUpdate(header.MsgId, new List<(ushort, byte[])> { (header.PackageIndex, data) }, (_, item) =>
{
item.Add((header.PackageIndex, data));
return item;
});
return value;
});
}
return false;
}
private bool CheckTimeout(string key) => !timeoutDictionary.TryGetValue(key, out var dateTime) || dateTime >= DateTime.Now;
private const char keyJoiner = '-';
private const string keyJoinerNET7 = "-";
private string GenerateKey(string phoneNumber, ushort messageId) => string.Join(keyJoinerNET7, new[] { phoneNumber, messageId.ToString() });
private bool TryParseKey(string key, out string phoneNumber, out ushort messageId)
{
phoneNumber = null;
messageId = 0;
var tmp = key.Split(keyJoiner);
if (tmp.Length == 2 && ushort.TryParse(tmp[1], out messageId))
{
phoneNumber = tmp[0];
return true;
}
return false;
}
protected virtual void Dispose(bool disposing)
{
if (!disposed)
{
if (disposing)
{
cts.Cancel();
cts.Dispose();
}
disposed = true;
}
}
public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}
~DefaultMerger()
{
Dispose(false);
}
}
}