您最多选择25个主题 主题必须以字母或数字开头,可以包含连字符 (-),并且长度不得超过35个字符
 
 

221 行
7.2 KiB

  1. using Microsoft.Extensions.Logging;
  2. using Microsoft.Extensions.Logging.Abstractions;
  3. using System;
  4. using System.Collections.Concurrent;
  5. using System.Linq;
  6. using System.Threading;
  7. using System.Threading.Tasks;
  8. namespace JT809Netty.Core
  9. {
  10. public class SessionManager:IDisposable
  11. {
  12. private readonly ILogger<SessionManager> logger;
  13. private readonly CancellationTokenSource cancellationTokenSource;
  14. #if DEBUG
  15. private const int timeout = 1 * 1000 * 60;
  16. #else
  17. private const int timeout = 5 * 1000 * 60;
  18. #endif
  19. public SessionManager(ILoggerFactory loggerFactory)
  20. {
  21. logger = loggerFactory.CreateLogger<SessionManager>();
  22. cancellationTokenSource = new CancellationTokenSource();
  23. Task.Run(() =>
  24. {
  25. while (!cancellationTokenSource.IsCancellationRequested)
  26. {
  27. logger.LogInformation($"Online Count>>>{SessionCount}");
  28. if (SessionCount > 0)
  29. {
  30. logger.LogInformation($"SessionIds>>>{string.Join(",", SessionIdDict.Select(s => s.Key))}");
  31. logger.LogInformation($"TerminalPhoneNos>>>{string.Join(",", CustomKey_SessionId_Dict.Select(s => $"{s.Key}-{s.Value}"))}");
  32. }
  33. Thread.Sleep(timeout);
  34. }
  35. }, cancellationTokenSource.Token);
  36. }
  37. /// <summary>
  38. /// Netty生成的sessionID和Session的对应关系
  39. /// key = seession id
  40. /// value = Session
  41. /// </summary>
  42. private ConcurrentDictionary<string, JT809Session> SessionIdDict = new ConcurrentDictionary<string, JT809Session>(StringComparer.OrdinalIgnoreCase);
  43. /// <summary>
  44. /// 自定义Key和netty生成的sessionID的对应关系
  45. /// key = 终端手机号
  46. /// value = seession id
  47. /// </summary>
  48. private ConcurrentDictionary<string, string> CustomKey_SessionId_Dict = new ConcurrentDictionary<string, string>(StringComparer.OrdinalIgnoreCase);
  49. public int SessionCount
  50. {
  51. get
  52. {
  53. return SessionIdDict.Count;
  54. }
  55. }
  56. public void RegisterSession(JT809Session appSession)
  57. {
  58. if (CustomKey_SessionId_Dict.ContainsKey(appSession.Key))
  59. {
  60. return;
  61. }
  62. if (SessionIdDict.TryAdd(appSession.SessionID, appSession) &&
  63. CustomKey_SessionId_Dict.TryAdd(appSession.Key, appSession.SessionID))
  64. {
  65. return;
  66. }
  67. }
  68. public JT809Session GetSessionByID(string sessionID)
  69. {
  70. if (string.IsNullOrEmpty(sessionID))
  71. return default;
  72. JT809Session targetSession;
  73. SessionIdDict.TryGetValue(sessionID, out targetSession);
  74. return targetSession;
  75. }
  76. public JT809Session GetSessionByTerminalPhoneNo(string key)
  77. {
  78. try
  79. {
  80. if (string.IsNullOrEmpty(key))
  81. return default;
  82. if (CustomKey_SessionId_Dict.TryGetValue(key, out string sessionId))
  83. {
  84. if (SessionIdDict.TryGetValue(sessionId, out JT809Session targetSession))
  85. {
  86. return targetSession;
  87. }
  88. else
  89. {
  90. return default;
  91. }
  92. }
  93. else
  94. {
  95. return default;
  96. }
  97. }
  98. catch (Exception ex)
  99. {
  100. logger.LogError(ex, key);
  101. return default;
  102. }
  103. }
  104. public void Heartbeat(string key)
  105. {
  106. try
  107. {
  108. if(CustomKey_SessionId_Dict.TryGetValue(key, out string sessionId))
  109. {
  110. if (SessionIdDict.TryGetValue(sessionId, out JT809Session oldjT808Session))
  111. {
  112. if (oldjT808Session.Channel.Active)
  113. {
  114. oldjT808Session.LastActiveTime = DateTime.Now;
  115. if (SessionIdDict.TryUpdate(sessionId, oldjT808Session, oldjT808Session))
  116. {
  117. }
  118. }
  119. }
  120. }
  121. }
  122. catch (Exception ex)
  123. {
  124. logger.LogError(ex, key);
  125. }
  126. }
  127. /// <summary>
  128. /// 通过通道Id和自定义key进行关联
  129. /// </summary>
  130. /// <param name="sessionID"></param>
  131. /// <param name="key"></param>
  132. public void UpdateSessionByID(string sessionID, string key)
  133. {
  134. try
  135. {
  136. if (SessionIdDict.TryGetValue(sessionID, out JT809Session oldjT808Session))
  137. {
  138. oldjT808Session.Key = key;
  139. if (SessionIdDict.TryUpdate(sessionID, oldjT808Session, oldjT808Session))
  140. {
  141. CustomKey_SessionId_Dict.AddOrUpdate(key, sessionID, (tpn, sid) =>
  142. {
  143. return sessionID;
  144. });
  145. }
  146. }
  147. }
  148. catch (Exception ex)
  149. {
  150. logger.LogError(ex, $"{sessionID},{key}");
  151. }
  152. }
  153. public void RemoveSessionByID(string sessionID)
  154. {
  155. if (sessionID == null) return;
  156. try
  157. {
  158. if (SessionIdDict.TryRemove(sessionID, out JT809Session session))
  159. {
  160. if (session.Key != null)
  161. {
  162. if(CustomKey_SessionId_Dict.TryRemove(session.Key, out string sessionid))
  163. {
  164. logger.LogInformation($">>>{sessionID}-{session.Key} Session Remove.");
  165. }
  166. }
  167. else
  168. {
  169. logger.LogInformation($">>>{sessionID} Session Remove.");
  170. }
  171. session.Channel.CloseAsync();
  172. }
  173. }
  174. catch (Exception ex)
  175. {
  176. logger.LogError(ex, $">>>{sessionID} Session Remove Exception");
  177. }
  178. }
  179. public void RemoveSessionByKey(string key)
  180. {
  181. if (key == null) return;
  182. try
  183. {
  184. if (CustomKey_SessionId_Dict.TryRemove(key, out string sessionid))
  185. {
  186. if (SessionIdDict.TryRemove(sessionid, out JT809Session session))
  187. {
  188. logger.LogInformation($">>>{key}-{sessionid} Key Remove.");
  189. }
  190. else
  191. {
  192. logger.LogInformation($">>>{key} Key Remove.");
  193. }
  194. }
  195. }
  196. catch (Exception ex)
  197. {
  198. logger.LogError(ex, $">>>{key} Key Remove Exception.");
  199. }
  200. }
  201. public void Dispose()
  202. {
  203. cancellationTokenSource.Cancel();
  204. }
  205. }
  206. }