using System.Collections.Concurrent;
using JT808.Protocol.Interfaces;
using JT808.Protocol.MessagePack;
namespace JT808.Protocol.Internal
{
/// <summary>
/// Default implementation of sub-package merging.
/// </summary>
public class DefaultMerger : IMerger
{
    /// <summary>
    /// Cache of the sub-packages received so far.
    /// <para>
    /// Outer key: SIM card number (<c>header.TerminalPhoneNo</c>); inner key: message id;
    /// value: list of tuples shaped as (sub-package index, sub-package payload).
    /// </para>
    /// </summary>
    private readonly ConcurrentDictionary<string, ConcurrentDictionary<ushort, List<(ushort index, byte[] data)>>> SplitPackages = new();

    /// <summary>
    /// Caches a sub-package or, when the last sub-package arrives, merges all cached
    /// payloads for the same terminal and message id and deserializes the message body.
    /// </summary>
    /// <param name="header">Header of the received sub-package; supplies the terminal number, message id, package index and package count.</param>
    /// <param name="data">Payload of the received sub-package.</param>
    /// <param name="config">Configuration whose <c>MsgIdFactory</c> is used to look up the body type for the message id.</param>
    /// <param name="body">The deserialized body when the method returns <c>true</c>; otherwise <c>null</c>.</param>
    /// <returns>
    /// <c>true</c> when this was the last sub-package, earlier sub-packages were cached, and a
    /// <see cref="JT808Bodies"/> instance was registered for the message id; otherwise <c>false</c>.
    /// </returns>
    public bool TryMerge(JT808Header header, byte[] data, IJT808Config config, out JT808Bodies body)
    {
        body = null;

        if (header.PackageIndex != header.PackgeCount)
        {
            // Not the last sub-package: just remember it and wait for the rest.
            CachePackage(header, data);
            return false;
        }

        // NOTE(review): merging is triggered solely by the arrival of the sub-package whose index
        // equals the package count; nothing here verifies that every earlier sub-package arrived.
        if (!SplitPackages.TryGetValue(header.TerminalPhoneNo, out var terminalPackages)
            || !terminalPackages.TryRemove(header.MsgId, out var packages))
        {
            return false;
        }

        // Drop the per-terminal entry only when nothing else is cached under it, so sub-packages
        // of other message ids that are still being collected for this terminal are not lost.
        // NOTE(review): a concurrent insert between the IsEmpty check and the removal could still
        // be dropped; confirm whether packets of one terminal can be processed in parallel.
        if (terminalPackages.IsEmpty)
        {
            SplitPackages.TryRemove(header.TerminalPhoneNo, out _);
        }

        // Cached payloads in index order, followed by the payload of the final sub-package.
        var mergedData = packages.OrderBy(x => x.index).SelectMany(x => x.data).Concat(data).ToArray();

        if (config.MsgIdFactory.TryGetValue(header.MsgId, out var value) && value is JT808Bodies instance)
        {
            var reader = new JT808MessagePackReader(mergedData, (Enums.JT808Version)header.ProtocolVersion);
            body = instance.DeserializeExt(ref reader, config);
            return true;
        }

        return false;
    }

    /// <summary>
    /// Appends a sub-package to the cache entry for its terminal number and message id,
    /// creating the entry when it does not exist yet.
    /// </summary>
    private void CachePackage(JT808Header header, byte[] data)
    {
        // Factories are used so the containers are only allocated when an entry is actually added.
        List<(ushort index, byte[] data)> NewPackageList() => new() { (header.PackageIndex, data) };

        SplitPackages.AddOrUpdate(
            header.TerminalPhoneNo,
            _ => new ConcurrentDictionary<ushort, List<(ushort index, byte[] data)>>
            {
                [header.MsgId] = NewPackageList()
            },
            (_, messages) =>
            {
                messages.AddOrUpdate(
                    header.MsgId,
                    _ => NewPackageList(),
                    (_, cached) =>
                    {
                        // NOTE(review): List<T>.Add is not synchronized; this relies on sub-packages
                        // of one message not being added concurrently — verify against the caller.
                        cached.Add((header.PackageIndex, data));
                        return cached;
                    });
                return messages;
            });
    }
}
}