Du kannst nicht mehr als 25 Themen auswählen Themen müssen entweder mit einem Buchstaben oder einer Ziffer beginnen. Sie können Bindestriche („-“) enthalten und bis zu 35 Zeichen lang sein.

134 Zeilen
5.4 KiB

  1. using System.Collections.Concurrent;
  2. using JT808.Protocol.Interfaces;
  3. using JT808.Protocol.MessagePack;
  4. namespace JT808.Protocol.Internal
  5. {
  6. /// <summary>
  7. /// 默认分包合并实现
  8. /// </summary>
  9. public class DefaultMerger : IMerger, IDisposable
  10. {
  11. /// <summary>
  12. /// 分包数据缓存
  13. /// <para>key为sim卡号,value为字典,key为消息id,value为元组,结构为:(分包索引,分包元数据)</para>
  14. /// </summary>
  15. private readonly ConcurrentDictionary<string, ConcurrentDictionary<ushort, List<(ushort index, byte[] data)>>> splitPackageDictionary = new();
  16. private readonly ConcurrentDictionary<string, DateTime> timeoutDictionary = new ConcurrentDictionary<string, DateTime>();
  17. private readonly TimeSpan cleanInterval = TimeSpan.FromSeconds(60);
  18. private readonly CancellationTokenSource cts = new CancellationTokenSource();
  19. private bool disposed;
  20. public DefaultMerger()
  21. {
  22. Task.Run(async () =>
  23. {
  24. while (!cts.IsCancellationRequested)
  25. {
  26. timeoutDictionary.ToList().ForEach(x =>
  27. {
  28. var key = x.Key;
  29. var datetime = x.Value;
  30. if (datetime < DateTime.Now && TryParseKey(key, out var phoneNumber, out var messageId) && splitPackageDictionary.TryGetValue(phoneNumber, out var value) && value.TryRemove(messageId, out var caches) && value.Count == 0 && splitPackageDictionary.TryRemove(phoneNumber, out _))
  31. {
  32. timeoutDictionary.TryRemove(key, out _);
  33. }
  34. });
  35. await Task.Delay(cleanInterval);
  36. }
  37. }, cts.Token);
  38. }
  39. /// <inheritdoc/>
  40. public bool TryMerge(JT808Header header, byte[] data, IJT808Config config, out JT808Bodies body)
  41. {
  42. body = null;
  43. var timeoutKey = GenerateKey(header.TerminalPhoneNo, header.MsgId);
  44. if (!CheckTimeout(timeoutKey)) return false;
  45. timeoutDictionary.TryAdd(timeoutKey, DateTime.Now.AddSeconds(config.AutoMergeTimeoutSecond));
  46. if (splitPackageDictionary.TryGetValue(header.TerminalPhoneNo, out var item) && item.TryGetValue(header.MsgId, out var packages))
  47. {
  48. packages.Add((header.PackageIndex, data));
  49. if (packages.Count != header.PackgeCount)
  50. {
  51. return false;
  52. }
  53. item.TryRemove(header.MsgId, out _);
  54. splitPackageDictionary.TryRemove(header.TerminalPhoneNo, out _);
  55. var mateData = packages.OrderBy(x => x.index).SelectMany(x => x.data).Concat(data).ToArray();
  56. byte[] buffer = JT808ArrayPool.Rent(mateData.Length);
  57. try
  58. {
  59. var reader = new JT808MessagePackReader(mateData, (Enums.JT808Version)header.ProtocolVersion);
  60. if (config.MsgIdFactory.TryGetValue(header.MsgId, out var value) && value is JT808Bodies instance)
  61. {
  62. body = instance.DeserializeExt<JT808Bodies>(ref reader, config);
  63. header.MessageBodyProperty.IsMerged = true;
  64. return true;
  65. }
  66. }
  67. finally
  68. {
  69. JT808ArrayPool.Return(buffer);
  70. }
  71. }
  72. else
  73. {
  74. splitPackageDictionary.AddOrUpdate(header.TerminalPhoneNo, new ConcurrentDictionary<ushort, List<(ushort, byte[])>>() { [header.MsgId] = new List<(ushort, byte[])> { (header.PackageIndex, data) } }, (_, value) =>
  75. {
  76. value.AddOrUpdate(header.MsgId, new List<(ushort, byte[])> { (header.PackageIndex, data) }, (_, item) =>
  77. {
  78. item.Add((header.PackageIndex, data));
  79. return item;
  80. });
  81. return value;
  82. });
  83. }
  84. return false;
  85. }
  86. private bool CheckTimeout(string key) => !timeoutDictionary.TryGetValue(key, out var dateTime) || dateTime >= DateTime.Now;
  87. private const char keyJoiner = '-';
  88. private const string keyJoinerNET7 = "-";
  89. private string GenerateKey(string phoneNumber, ushort messageId) => string.Join(keyJoinerNET7, new[] { phoneNumber, messageId.ToString() });
  90. private bool TryParseKey(string key, out string phoneNumber, out ushort messageId)
  91. {
  92. phoneNumber = null;
  93. messageId = 0;
  94. var tmp = key.Split(keyJoiner);
  95. if (tmp.Length == 2 && ushort.TryParse(tmp[1], out messageId))
  96. {
  97. phoneNumber = tmp[0];
  98. return true;
  99. }
  100. return false;
  101. }
  102. protected virtual void Dispose(bool disposing)
  103. {
  104. if (!disposed)
  105. {
  106. if (disposing)
  107. {
  108. cts.Cancel();
  109. cts.Dispose();
  110. }
  111. disposed = true;
  112. }
  113. }
  114. public void Dispose()
  115. {
  116. Dispose(true);
  117. GC.SuppressFinalize(this);
  118. }
  119. ~DefaultMerger()
  120. {
  121. Dispose(false);
  122. }
  123. }
  124. }