You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

132 line
5.2 KiB

  1. using System.Collections.Concurrent;
  2. using JT808.Protocol.Interfaces;
  3. using JT808.Protocol.MessagePack;
  4. namespace JT808.Protocol.Internal
  5. {
  6. /// <summary>
  7. /// 默认分包合并实现
  8. /// </summary>
  9. public class DefaultMerger : IMerger, IDisposable
  10. {
  11. /// <summary>
  12. /// 分包数据缓存
  13. /// <para>key为sim卡号,value为字典,key为消息id,value为元组,结构为:(分包索引,分包元数据)</para>
  14. /// </summary>
  15. private readonly ConcurrentDictionary<string, ConcurrentDictionary<ushort, List<(ushort index, byte[] data)>>> splitPackageDictionary = new();
  16. private readonly ConcurrentDictionary<string, DateTime> timeoutDictionary = new();
  17. private readonly TimeSpan cleanInterval = TimeSpan.FromSeconds(60);
  18. private readonly CancellationTokenSource cts = new();
  19. private bool disposed;
  20. public DefaultMerger()
  21. {
  22. Task.Run(async () =>
  23. {
  24. while (!cts.IsCancellationRequested)
  25. {
  26. timeoutDictionary.ToList().ForEach(x =>
  27. {
  28. var (key, datetime) = x;
  29. if (datetime < DateTime.Now && TryParseKey(key, out var phoneNumber, out var messageId) && splitPackageDictionary.TryGetValue(phoneNumber, out var value) && value.TryRemove(messageId, out var caches) && value.Count == 0 && splitPackageDictionary.TryRemove(phoneNumber, out _))
  30. {
  31. timeoutDictionary.TryRemove(key, out _);
  32. }
  33. });
  34. await Task.Delay(cleanInterval);
  35. }
  36. }, cts.Token);
  37. }
  38. /// <inheritdoc/>
  39. public bool TryMerge(JT808Header header, byte[] data, IJT808Config config, out JT808Bodies body)
  40. {
  41. // TODO: 添加SplitPackages缓存超时,达到阈值时移除该项缓存
  42. body = null;
  43. var timeoutKey = GenerateKey(header.TerminalPhoneNo, header.MsgId);
  44. if (!CheckTimeout(timeoutKey)) return false;
  45. timeoutDictionary.TryAdd(timeoutKey, DateTime.Now.AddSeconds(config.AutoMergeTimeoutSecond));
  46. if (splitPackageDictionary.TryGetValue(header.TerminalPhoneNo, out var item) && item.TryGetValue(header.MsgId, out var packages))
  47. {
  48. packages.Add((header.PackageIndex, data));
  49. if (packages.Count != header.PackgeCount)
  50. {
  51. return false;
  52. }
  53. item.TryRemove(header.MsgId, out _);
  54. splitPackageDictionary.TryRemove(header.TerminalPhoneNo, out _);
  55. var mateData = packages.OrderBy(x => x.index).SelectMany(x => x.data).Concat(data).ToArray();
  56. byte[] buffer = JT808ArrayPool.Rent(mateData.Length);
  57. try
  58. {
  59. var reader = new JT808MessagePackReader(mateData, (Enums.JT808Version)header.ProtocolVersion);
  60. if (config.MsgIdFactory.TryGetValue(header.MsgId, out var value) && value is JT808Bodies instance)
  61. {
  62. body = instance.DeserializeExt<JT808Bodies>(ref reader, config);
  63. return true;
  64. }
  65. }
  66. finally
  67. {
  68. JT808ArrayPool.Return(buffer);
  69. }
  70. }
  71. else
  72. {
  73. splitPackageDictionary.AddOrUpdate(header.TerminalPhoneNo, new ConcurrentDictionary<ushort, List<(ushort, byte[])>>() { [header.MsgId] = new List<(ushort, byte[])> { (header.PackageIndex, data) } }, (_, value) =>
  74. {
  75. value.AddOrUpdate(header.MsgId, new List<(ushort, byte[])> { (header.PackageIndex, data) }, (_, item) =>
  76. {
  77. item.Add((header.PackageIndex, data));
  78. return item;
  79. });
  80. return value;
  81. });
  82. }
  83. return false;
  84. }
  85. private bool CheckTimeout(string key) => !timeoutDictionary.TryGetValue(key, out var dateTime) || dateTime >= DateTime.Now;
  86. private const string keyJoiner = "-";
  87. private string GenerateKey(string phoneNumber, ushort messageId) => string.Join(keyJoiner, new[] { phoneNumber, messageId.ToString() });
  88. private bool TryParseKey(string key, out string phoneNumber, out ushort messageId)
  89. {
  90. phoneNumber = null;
  91. messageId = 0;
  92. var tmp = key.Split(keyJoiner);
  93. if (tmp.Length == 2 && ushort.TryParse(tmp[1], out messageId))
  94. {
  95. phoneNumber = tmp[0];
  96. return true;
  97. }
  98. return false;
  99. }
  100. protected virtual void Dispose(bool disposing)
  101. {
  102. if (!disposed)
  103. {
  104. if (disposing)
  105. {
  106. cts.Cancel();
  107. cts.Dispose();
  108. }
  109. disposed = true;
  110. }
  111. }
  112. public void Dispose()
  113. {
  114. Dispose(true);
  115. GC.SuppressFinalize(this);
  116. }
  117. ~DefaultMerger()
  118. {
  119. Dispose(false);
  120. }
  121. }
  122. }