You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

136 lines
5.5 KiB

  1. using System.Collections.Concurrent;
  2. using JT808.Protocol.Interfaces;
  3. using JT808.Protocol.MessagePack;
  4. namespace JT808.Protocol.Internal
  5. {
  /// <summary>
  /// Default <see cref="IMerger"/> implementation: buffers the sub-packages of a split message
  /// per terminal phone number and message id, and deserializes the body once the number of
  /// buffered sub-packages equals the package count declared in the header.
  /// </summary>
  /// <remarks>
  /// A background task started by the constructor periodically evicts messages whose deadline
  /// has passed. Call <see cref="Dispose()"/> to stop that task.
  /// </remarks>
  public class DefaultMerger : IMerger, IDisposable
  {
      /// <summary>
      /// Buffered sub-packages, keyed by (terminal phone number, message id).
      /// Each value holds (package index, package bytes) pairs in arrival order.
      /// </summary>
      private readonly ConcurrentDictionary<(string phoneNumber, ushort messageId), List<(ushort index, byte[] data)>> splitPackageDictionary = new();

      /// <summary>
      /// UTC deadline of every message that currently has buffered sub-packages; same key as
      /// <see cref="splitPackageDictionary"/>.
      /// </summary>
      private readonly ConcurrentDictionary<(string phoneNumber, ushort messageId), DateTime> timeoutDictionary = new();

      /// <summary>Pause between two sweeps of the background cleaner.</summary>
      private readonly TimeSpan cleanInterval = TimeSpan.FromSeconds(60);

      private readonly CancellationTokenSource cts = new CancellationTokenSource();
      private bool disposed;

      /// <summary>
      /// Creates the merger and starts the background task that evicts expired messages.
      /// </summary>
      public DefaultMerger()
      {
          // Capture the token once: reading cts.Token after Dispose() would throw.
          var token = cts.Token;
          _ = Task.Run(() => CleanExpiredLoopAsync(token), token);
      }

      /// <summary>
      /// Buffers one sub-package and, when it completes the message, deserializes the merged body.
      /// </summary>
      /// <param name="header">Header of the received sub-package; supplies the terminal phone number,
      /// message id, package index, package count and protocol version. On a successful merge its
      /// <c>MessageBodyProperty.IsMerged</c> flag is set to <c>true</c>.</param>
      /// <param name="data">Bytes of this sub-package.</param>
      /// <param name="config">Supplies <c>AutoMergeTimeoutSecond</c> and the <c>MsgIdFactory</c>
      /// used to find the body type for the message id.</param>
      /// <param name="body">The deserialized body when the method returns <c>true</c>; otherwise <c>null</c>.</param>
      /// <returns>
      /// <c>true</c> when this sub-package completed the message and the body was deserialized;
      /// <c>false</c> when more sub-packages are outstanding, when the message had already timed out
      /// (its buffered state is dropped), or when no body type is registered for the message id.
      /// </returns>
      public bool TryMerge(JT808Header header, byte[] data, IJT808Config config, out JT808Bodies body)
      {
          body = null;
          (string phoneNumber, ushort messageId) key = (header.TerminalPhoneNo, header.MsgId);
          var now = DateTime.UtcNow;

          if (timeoutDictionary.TryGetValue(key, out var deadline) && deadline < now)
          {
              // The late packet is rejected, and the stale state is dropped right away so the
              // next transmission of this message is not blocked until the cleaner runs.
              Evict(key);
              return false;
          }

          // Sliding deadline: every accepted sub-package extends the window.
          timeoutDictionary[key] = now.AddSeconds(config.AutoMergeTimeoutSecond);

          var packages = splitPackageDictionary.GetOrAdd(key, _ => new List<(ushort index, byte[] data)>());
          byte[] mateData;
          // List<T> is not thread-safe; serialize writers that share the same message.
          lock (packages)
          {
              packages.Add((header.PackageIndex, data));
              if (packages.Count != header.PackgeCount)
              {
                  return false;
              }
              // The current sub-package is already in the list, so it must not be appended again.
              mateData = packages.OrderBy(x => x.index).SelectMany(x => x.data).ToArray();
          }

          // Drop only this message's state; other in-flight messages of the terminal are untouched.
          Evict(key);

          var reader = new JT808MessagePackReader(mateData, (Enums.JT808Version)header.ProtocolVersion);
          if (config.MsgIdFactory.TryGetValue(header.MsgId, out var value) && value is JT808Bodies instance)
          {
              body = instance.DeserializeExt<JT808Bodies>(ref reader, config);
              header.MessageBodyProperty.IsMerged = true;
              return true;
          }
          return false;
      }

      /// <summary>
      /// Sweeps expired messages, then waits <see cref="cleanInterval"/>, until cancelled.
      /// </summary>
      private async Task CleanExpiredLoopAsync(CancellationToken token)
      {
          try
          {
              while (!token.IsCancellationRequested)
              {
                  RemoveExpired(DateTime.UtcNow);
                  // Passing the token lets Dispose() end the wait immediately.
                  await Task.Delay(cleanInterval, token).ConfigureAwait(false);
              }
          }
          catch (OperationCanceledException)
          {
              // Expected when the merger is disposed.
          }
      }

      /// <summary>Evicts every message whose deadline is earlier than <paramref name="now"/>.</summary>
      private void RemoveExpired(DateTime now)
      {
          // Enumerating a ConcurrentDictionary while removing from it is safe.
          foreach (var entry in timeoutDictionary)
          {
              if (entry.Value < now)
              {
                  Evict(entry.Key);
              }
          }
      }

      /// <summary>Removes the buffered sub-packages and the deadline of one message.</summary>
      private void Evict((string phoneNumber, ushort messageId) key)
      {
          splitPackageDictionary.TryRemove(key, out _);
          timeoutDictionary.TryRemove(key, out _);
      }

      /// <summary>
      /// Stops the background cleaner when <paramref name="disposing"/> is <c>true</c>.
      /// Safe to call more than once.
      /// </summary>
      /// <param name="disposing"><c>true</c> when called from <see cref="Dispose()"/>;
      /// <c>false</c> when called from the finalizer.</param>
      protected virtual void Dispose(bool disposing)
      {
          if (!disposed)
          {
              if (disposing)
              {
                  cts.Cancel();
                  cts.Dispose();
              }
              disposed = true;
          }
      }

      /// <summary>Stops the background cleaner and releases the cancellation source.</summary>
      public void Dispose()
      {
          Dispose(true);
          GC.SuppressFinalize(this);
      }

      /// <summary>Finalizer; forwards to <see cref="Dispose(bool)"/> with <c>false</c>.</summary>
      ~DefaultMerger()
      {
          Dispose(false);
      }
  }
  126. }