You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

307 lines
12 KiB

  1. using System;
  2. using System.Collections.Concurrent;
  3. using System.Collections.Generic;
  4. using System.Linq;
  5. using System.Net;
  6. using System.Net.Sockets;
  7. using System.Threading.Tasks;
  8. using JT808.Gateway.Abstractions;
  9. using JT808.Gateway.Abstractions.Enums;
  10. using Microsoft.Extensions.Logging;
  11. namespace JT808.Gateway.Session
  12. {
  13. /// <summary>
  14. ///
  15. /// <remark>不支持变态类型:既发TCP和UDP</remark>
  16. /// </summary>
  17. public class JT808SessionManager
  18. {
  19. private readonly ILogger logger;
  20. private readonly IJT808SessionProducer SessionProducer;
  21. /// <summary>
  22. /// socket连接会话
  23. /// </summary>
  24. public ConcurrentDictionary<string, IJT808Session> Sessions { get; }
  25. /// <summary>
  26. /// socket绑定的终端SIM连接会话
  27. /// </summary>
  28. public ConcurrentDictionary<string, IJT808Session> TerminalPhoneNoSessions { get; }
  29. /// <summary>
  30. ///
  31. /// </summary>
  32. /// <param name="jT808SessionProducer"></param>
  33. /// <param name="loggerFactory"></param>
  34. public JT808SessionManager(
  35. IJT808SessionProducer jT808SessionProducer,
  36. ILoggerFactory loggerFactory
  37. )
  38. {
  39. SessionProducer = jT808SessionProducer;
  40. Sessions = new ConcurrentDictionary<string, IJT808Session>(StringComparer.OrdinalIgnoreCase);
  41. TerminalPhoneNoSessions = new ConcurrentDictionary<string, IJT808Session>(StringComparer.OrdinalIgnoreCase);
  42. logger = loggerFactory.CreateLogger<JT808SessionManager>();
  43. }
  44. /// <summary>
  45. ///
  46. /// </summary>
  47. /// <param name="loggerFactory"></param>
  48. public JT808SessionManager(ILoggerFactory loggerFactory)
  49. {
  50. Sessions = new ConcurrentDictionary<string, IJT808Session>(StringComparer.OrdinalIgnoreCase);
  51. TerminalPhoneNoSessions = new ConcurrentDictionary<string, IJT808Session>(StringComparer.OrdinalIgnoreCase);
  52. logger = loggerFactory.CreateLogger<JT808SessionManager>();
  53. }
  54. /// <summary>
  55. /// 获取会话总数量
  56. /// </summary>
  57. public int TotalSessionCount
  58. {
  59. get
  60. {
  61. return Sessions.Count;
  62. }
  63. }
  64. /// <summary>
  65. /// 获取tcp会话数量
  66. /// </summary>
  67. public int TcpSessionCount
  68. {
  69. get
  70. {
  71. return Sessions.Where(w => w.Value.TransportProtocolType == JT808TransportProtocolType.tcp).Count();
  72. }
  73. }
  74. /// <summary>
  75. /// 获取udp会话数量
  76. /// </summary>
  77. public int UdpSessionCount
  78. {
  79. get
  80. {
  81. return Sessions.Where(w => w.Value.TransportProtocolType == JT808TransportProtocolType.udp).Count();
  82. }
  83. }
  84. /// <summary>
  85. ///
  86. /// </summary>
  87. /// <param name="terminalPhoneNo"></param>
  88. /// <param name="session"></param>
  89. internal void TryLink(string terminalPhoneNo, IJT808Session session)
  90. {
  91. session.TerminalPhoneNo = terminalPhoneNo;
  92. DateTime curretDatetime = DateTime.Now;
  93. if (TerminalPhoneNoSessions.TryGetValue(terminalPhoneNo, out IJT808Session cacheSession))
  94. {
  95. if (session.SessionID != cacheSession.SessionID)
  96. {
  97. //从转发到直连的数据需要更新缓存
  98. session.ActiveTime = curretDatetime;
  99. TerminalPhoneNoSessions.TryUpdate(terminalPhoneNo, session, cacheSession);
  100. cacheSession.Close();
  101. //会话通知
  102. if (SessionProducer != null)
  103. {
  104. SessionProducer.ProduceAsync(JT808GatewayConstants.SessionOnline, terminalPhoneNo);
  105. }
  106. }
  107. else
  108. {
  109. cacheSession.ActiveTime = curretDatetime;
  110. TerminalPhoneNoSessions.TryUpdate(terminalPhoneNo, cacheSession, cacheSession);
  111. }
  112. }
  113. else
  114. {
  115. if (TerminalPhoneNoSessions.TryAdd(terminalPhoneNo, session))
  116. {
  117. //会话通知
  118. if (SessionProducer != null)
  119. {
  120. SessionProducer.ProduceAsync(JT808GatewayConstants.SessionOnline, terminalPhoneNo);
  121. }
  122. }
  123. }
  124. }
  125. /// <summary>
  126. ///
  127. /// </summary>
  128. /// <param name="terminalPhoneNo"></param>
  129. /// <param name="socket"></param>
  130. /// <param name="remoteEndPoint"></param>
  131. /// <returns></returns>
  132. public IJT808Session TryLink(string terminalPhoneNo, Socket socket, EndPoint remoteEndPoint)
  133. {
  134. if (TerminalPhoneNoSessions.TryGetValue(terminalPhoneNo, out IJT808Session currentSession))
  135. {
  136. currentSession.ActiveTime = DateTime.Now;
  137. currentSession.TerminalPhoneNo = terminalPhoneNo;
  138. currentSession.RemoteEndPoint = remoteEndPoint;
  139. TerminalPhoneNoSessions.TryUpdate(terminalPhoneNo, currentSession, currentSession);
  140. }
  141. else
  142. {
  143. JT808UdpSession session = new JT808UdpSession(socket, remoteEndPoint);
  144. session.TerminalPhoneNo = terminalPhoneNo;
  145. Sessions.TryAdd(session.SessionID, session);
  146. TerminalPhoneNoSessions.TryAdd(terminalPhoneNo, session);
  147. currentSession = session;
  148. }
  149. //会话通知
  150. //使用场景:
  151. //部标的超长待机设备,不会像正常的设备一样一直连着,可能10几分钟连上了,然后发完就关闭连接,
  152. //这时候想下发数据需要知道设备什么时候上线,在这边做通知最好不过了。
  153. //有设备关联上来可以进行通知 例如:使用Redis发布订阅
  154. if (SessionProducer != null)
  155. {
  156. SessionProducer.ProduceAsync(JT808GatewayConstants.SessionOnline, terminalPhoneNo);
  157. }
  158. return currentSession;
  159. }
  160. /// <summary>
  161. ///
  162. /// </summary>
  163. /// <param name="session"></param>
  164. /// <returns></returns>
  165. internal bool TryAdd(IJT808Session session)
  166. {
  167. return Sessions.TryAdd(session.SessionID, session);
  168. }
  169. public async ValueTask<bool> TrySendByTerminalPhoneNoAsync(string terminalPhoneNo, byte[] data)
  170. {
  171. if (TerminalPhoneNoSessions.TryGetValue(terminalPhoneNo, out var session))
  172. {
  173. if (session.TransportProtocolType == JT808TransportProtocolType.tcp)
  174. {
  175. await session.Client.SendAsync(data, SocketFlags.None);
  176. }
  177. else
  178. {
  179. await session.Client.SendToAsync(data, SocketFlags.None, session.RemoteEndPoint);
  180. }
  181. return true;
  182. }
  183. else
  184. {
  185. return false;
  186. }
  187. }
  188. public async ValueTask<bool> TrySendBySessionIdAsync(string sessionId, byte[] data)
  189. {
  190. if (Sessions.TryGetValue(sessionId, out var session))
  191. {
  192. if (session.TransportProtocolType == JT808TransportProtocolType.tcp)
  193. {
  194. await session.Client.SendAsync(data, SocketFlags.None);
  195. }
  196. else
  197. {
  198. await session.Client.SendToAsync(data, SocketFlags.None, session.RemoteEndPoint);
  199. }
  200. return true;
  201. }
  202. else
  203. {
  204. return false;
  205. }
  206. }
  207. public void RemoveByTerminalPhoneNo(string terminalPhoneNo)
  208. {
  209. if (TerminalPhoneNoSessions.TryGetValue(terminalPhoneNo, out var removeTerminalPhoneNoSessions))
  210. {
  211. // 处理转发过来的是数据 这时候通道对设备是1对多关系,需要清理垃圾数据
  212. //1.用当前会话的通道Id找出通过转发过来的其他设备的终端号
  213. var terminalPhoneNos = TerminalPhoneNoSessions.Where(w => w.Value.SessionID == removeTerminalPhoneNoSessions.SessionID).Select(s => s.Key).ToList();
  214. //2.存在则一个个移除
  215. string tmpTerminalPhoneNo = terminalPhoneNo;
  216. if (terminalPhoneNos.Count > 0)
  217. {
  218. //3.移除包括当前的设备号
  219. foreach (var item in terminalPhoneNos)
  220. {
  221. TerminalPhoneNoSessions.TryRemove(item, out _);
  222. }
  223. tmpTerminalPhoneNo = string.Join(",", terminalPhoneNos);
  224. }
  225. if (Sessions.TryRemove(removeTerminalPhoneNoSessions.SessionID, out var removeSession))
  226. {
  227. removeSession.Close();
  228. if (logger.IsEnabled(LogLevel.Information))
  229. logger.LogInformation($"[Session Remove]:{terminalPhoneNo}-{tmpTerminalPhoneNo}");
  230. if (SessionProducer != null)
  231. {
  232. SessionProducer.ProduceAsync(JT808GatewayConstants.SessionOffline, tmpTerminalPhoneNo);
  233. }
  234. }
  235. }
  236. }
  237. public void RemoveBySessionId(string sessionId)
  238. {
  239. if (Sessions.TryRemove(sessionId, out var removeSession))
  240. {
  241. var terminalPhoneNos = TerminalPhoneNoSessions.Where(w => w.Value.SessionID == sessionId).Select(s => s.Key).ToList();
  242. if (terminalPhoneNos.Count > 0)
  243. {
  244. foreach (var item in terminalPhoneNos)
  245. {
  246. TerminalPhoneNoSessions.TryRemove(item, out _);
  247. }
  248. var tmpTerminalPhoneNo = string.Join(",", terminalPhoneNos);
  249. if (SessionProducer != null)
  250. {
  251. SessionProducer.ProduceAsync(JT808GatewayConstants.SessionOffline, tmpTerminalPhoneNo);
  252. }
  253. if (logger.IsEnabled(LogLevel.Information))
  254. logger.LogInformation($"[Session Remove]:{tmpTerminalPhoneNo}");
  255. }
  256. removeSession.Close();
  257. }
  258. }
  259. public List<JT808TcpSession> GetTcpAll(Func<IJT808Session, bool> predicate = null)
  260. {
  261. var query = TerminalPhoneNoSessions.Where(w => w.Value.TransportProtocolType == JT808TransportProtocolType.tcp);
  262. if (predicate != null)
  263. {
  264. query = query.Where(s => predicate(s.Value));
  265. }
  266. return query.Select(s => (JT808TcpSession)s.Value).ToList();
  267. }
  268. public IEnumerable<JT808TcpSession> GetTcpByPage(Func<IJT808Session, bool> predicate = null)
  269. {
  270. var query = TerminalPhoneNoSessions.Where(w => w.Value.TransportProtocolType == JT808TransportProtocolType.tcp);
  271. if (predicate != null)
  272. {
  273. query = query.Where(s => predicate(s.Value));
  274. }
  275. return query.Select(s => (JT808TcpSession)s.Value);
  276. }
  277. public List<JT808UdpSession> GetUdpAll(Func<IJT808Session, bool> predicate = null)
  278. {
  279. var query = TerminalPhoneNoSessions.Where(w => w.Value.TransportProtocolType == JT808TransportProtocolType.udp);
  280. if (predicate != null)
  281. {
  282. query = query.Where(s => predicate(s.Value));
  283. }
  284. return query.Select(s => (JT808UdpSession)s.Value).ToList();
  285. }
  286. public IEnumerable<JT808UdpSession> GetUdpByPage(Func<IJT808Session, bool> predicate = null)
  287. {
  288. var query = TerminalPhoneNoSessions.Where(w => w.Value.TransportProtocolType == JT808TransportProtocolType.udp);
  289. if (predicate != null)
  290. {
  291. query = query.Where(s => predicate(s.Value));
  292. }
  293. return query.Select(s => (JT808UdpSession)s.Value);
  294. }
  295. }
  296. }