Vous ne pouvez pas sélectionner plus de 25 sujets Les noms de sujets doivent commencer par une lettre ou un nombre, peuvent contenir des tirets ('-') et peuvent comporter jusqu'à 35 caractères.
 
 
 

319 lignes
14 KiB

  1. using System;
  2. using System.Buffers;
  3. using System.IO.Pipelines;
  4. using System.Net;
  5. using System.Net.Sockets;
  6. using System.Threading;
  7. using System.Threading.Tasks;
  8. using JT808.Gateway.Abstractions;
  9. using JT808.Gateway.Abstractions.Configurations;
  10. using JT808.Gateway.Services;
  11. using JT808.Gateway.Session;
  12. using JT808.Protocol;
  13. using JT808.Protocol.Exceptions;
  14. using JT808.Protocol.Extensions;
  15. using Microsoft.Extensions.Hosting;
  16. using Microsoft.Extensions.Logging;
  17. using Microsoft.Extensions.Options;
  18. namespace JT808.Gateway
  19. {
  20. /// <summary>
  21. /// 808 tcp服务器
  22. /// </summary>
  23. public class JT808TcpServer : IHostedService
  24. {
  25. private Socket server;
  26. private readonly ILogger Logger;
  27. private readonly JT808SessionManager SessionManager;
  28. private readonly JT808BlacklistManager BlacklistManager;
  29. private readonly JT808Serializer Serializer;
  30. private readonly JT808MessageHandler MessageHandler;
  31. private readonly IJT808MsgProducer MsgProducer;
  32. private readonly IJT808MsgReplyLoggingProducer MsgReplyLoggingProducer;
  33. private readonly IOptionsMonitor<JT808Configuration> ConfigurationMonitor;
  34. private long MessageReceiveCounter = 0;
  /// <summary>
  /// Stores the injected collaborators, resolves the logger and the serializer, and then
  /// calls <see cref="InitServer"/>, so the listening socket is created, bound and put
  /// into listening state as part of construction.
  /// </summary>
  /// <param name="configurationMonitor">Monitor giving access to the current gateway configuration.</param>
  /// <param name="msgProducer">Producer that is handed each received package's terminal phone number and original data.</param>
  /// <param name="msgReplyLoggingProducer">Producer that is handed the reply data generated for a package.</param>
  /// <param name="messageHandler">Handler that turns a received header package into reply data.</param>
  /// <param name="jT808Config">Protocol configuration from which the serializer is obtained.</param>
  /// <param name="loggerFactory">Factory used to create this server's logger.</param>
  /// <param name="jT808SessionManager">Manager the server registers, links and removes sessions with.</param>
  /// <param name="jT808BlacklistManager">Manager consulted for blacklisted terminal phone numbers.</param>
  public JT808TcpServer(
      IOptionsMonitor<JT808Configuration> configurationMonitor,
      IJT808MsgProducer msgProducer,
      IJT808MsgReplyLoggingProducer msgReplyLoggingProducer,
      JT808MessageHandler messageHandler,
      IJT808Config jT808Config,
      ILoggerFactory loggerFactory,
      JT808SessionManager jT808SessionManager,
      JT808BlacklistManager jT808BlacklistManager)
  {
      // Derived services first (same relative order as before: logger, then serializer).
      this.Logger = loggerFactory.CreateLogger<JT808TcpServer>();
      this.Serializer = jT808Config.GetSerializer();

      // Configuration and state managers.
      this.ConfigurationMonitor = configurationMonitor;
      this.SessionManager = jT808SessionManager;
      this.BlacklistManager = jT808BlacklistManager;

      // Message pipeline.
      this.MessageHandler = messageHandler;
      this.MsgProducer = msgProducer;
      this.MsgReplyLoggingProducer = msgReplyLoggingProducer;

      // Must run last: it reads ConfigurationMonitor.
      InitServer();
  }
  /// <summary>
  /// Creates the listening socket on <see cref="IPAddress.Any"/> and the configured TCP port,
  /// applies the socket options, binds it and starts listening with the configured backlog.
  /// </summary>
  /// <remarks>
  /// The configuration is read once so that every option comes from the same snapshot.
  /// If any step fails the half-initialised socket is disposed before the exception is rethrown.
  /// </remarks>
  private void InitServer()
  {
      var config = ConfigurationMonitor.CurrentValue;
      var endPoint = new System.Net.IPEndPoint(IPAddress.Any, config.TcpPort);
      var listener = new Socket(endPoint.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
      try
      {
          // NoDelay is a TCP-level option. At SocketOptionLevel.Socket the same numeric value
          // means Debug, so the previous call never disabled Nagle's algorithm.
          listener.SetSocketOption(SocketOptionLevel.Tcp, SocketOptionName.NoDelay, true);
          listener.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.KeepAlive, true);
          listener.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.ReceiveBuffer, config.MiniNumBufferSize);
          listener.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.SendBuffer, config.MiniNumBufferSize);
          // Linger enabled with a zero timeout.
          listener.LingerState = new LingerOption(true, 0);
          listener.Bind(endPoint);
          listener.Listen(config.SoBacklog);
      }
      catch
      {
          // Do not leak the OS handle when option setup, bind or listen fails.
          listener.Dispose();
          throw;
      }
      server = listener;
  }
  /// <summary>
  /// Logs the listening endpoint and launches the background accept loop.
  /// The returned task is already completed; accepting continues in the background.
  /// </summary>
  /// <param name="cancellationToken">Token checked by the accept loop and passed to <c>AcceptAsync</c>.</param>
  /// <returns>A completed task.</returns>
  public Task StartAsync(CancellationToken cancellationToken)
  {
      Logger.LogInformation($"JT808 TCP Server start at {IPAddress.Any}:{ConfigurationMonitor.CurrentValue.TcpPort}.");
      Task.Run(async () =>
      {
          while (!cancellationToken.IsCancellationRequested)
          {
              try
              {
                  var socket = await server.AcceptAsync(cancellationToken);
                  var session = new JT808TcpSession(socket);
                  SessionManager.TryAdd(session);
                  // Fire-and-forget: the session runs on its own task so the accept loop
                  // is never delayed. HandleSessionAsync observes and logs its own failures.
                  _ = Task.Run(() => HandleSessionAsync(session));
              }
              catch (OperationCanceledException)
              {
                  // The token passed to AcceptAsync was cancelled.
                  break;
              }
              catch (ObjectDisposedException)
              {
                  // The listening socket was closed (StopAsync). Every further accept would
                  // fail the same way, so leave the loop instead of spinning.
                  break;
              }
              catch (SocketException ex) when (ex.SocketErrorCode == SocketError.OperationAborted)
              {
                  // A pending accept was aborted because the listening socket was closed.
                  break;
              }
              catch (Exception ex)
              {
                  // Keep accepting after an unexpected failure, but make it visible.
                  Logger.LogError(ex, "[Accept Error]");
              }
          }
      });
      return Task.CompletedTask;
  }

  /// <summary>
  /// Runs one connection: pumps socket data into a pipe and parses it until both sides finish,
  /// then removes the session from the session manager, even when the pumps fail.
  /// </summary>
  /// <param name="session">The session created for the accepted socket.</param>
  private async Task HandleSessionAsync(JT808TcpSession session)
  {
      try
      {
          if (Logger.IsEnabled(LogLevel.Information))
          {
              Logger.LogInformation($"[Connected]:{session.Client.RemoteEndPoint}");
          }
          var pipe = new Pipe();
          Task writing = FillPipeAsync(session, pipe.Writer);
          Task reading = ReadPipeAsync(session, pipe.Reader);
          await Task.WhenAll(reading, writing);
      }
      catch (Exception ex)
      {
          // Nothing awaits this task, so an escaping exception would otherwise go unobserved.
          Logger.LogError(ex, $"[Session Error]:{session.SessionID}");
      }
      finally
      {
          SessionManager.RemoveBySessionId(session.SessionID);
      }
  }
  /// <summary>
  /// Receives bytes from the session's socket and writes them into the pipe until the peer
  /// closes the connection (zero-byte read), the reader side completes, the session's
  /// receive-timeout token is cancelled, or an error occurs.
  /// </summary>
  /// <param name="session">Session whose socket is read and whose receive-timeout token is observed.</param>
  /// <param name="writer">Write side of the pipe; always completed before this method returns or throws.</param>
  private async Task FillPipeAsync(JT808TcpSession session, PipeWriter writer)
  {
      try
      {
          while (true)
          {
              try
              {
                  Memory<byte> memory = writer.GetMemory(ConfigurationMonitor.CurrentValue.MiniNumBufferSize);
                  // Receive timeout: per the original author's note, the connection is dropped
                  // when the device has not sent data for the configured period.
                  int bytesRead = await session.Client.ReceiveAsync(memory, SocketFlags.None, session.ReceiveTimeout.Token);
                  if (bytesRead == 0)
                  {
                      // Zero bytes: the remote side closed the connection.
                      break;
                  }
                  writer.Advance(bytesRead);
                  FlushResult result = await writer.FlushAsync(session.ReceiveTimeout.Token);
                  if (result.IsCompleted)
                  {
                      // The reader completed; nobody will consume further data.
                      break;
                  }
              }
              catch (OperationCanceledException)
              {
                  Logger.LogError($"[Receive Timeout Or Operation Canceled]:{session.Client.RemoteEndPoint},{session.TerminalPhoneNo}");
                  break;
              }
              catch (System.Net.Sockets.SocketException ex)
              {
                  Logger.LogError($"[{ex.SocketErrorCode},{ex.Message}]:{session.Client.RemoteEndPoint},{session.TerminalPhoneNo}");
                  break;
              }
  #pragma warning disable CA1031 // Do not catch general exception types
              catch (Exception ex)
              {
                  Logger.LogError(ex, $"[Receive Error]:{session.Client.RemoteEndPoint},{session.TerminalPhoneNo}");
                  break;
              }
  #pragma warning restore CA1031 // Do not catch general exception types
          }
      }
      finally
      {
          // Always signal end-of-data. The logging above can itself throw (for example
          // RemoteEndPoint on a socket that was already disposed); without this the
          // reading side would wait on the pipe forever.
          writer.Complete();
      }
  }
  /// <summary>
  /// Reads buffered data from the pipe and hands it to <see cref="ReaderBuffer"/> until the
  /// writer completes, the read is cancelled, or parsing fails.
  /// </summary>
  /// <remarks>
  /// The buffer of every read is processed before <c>IsCompleted</c> is evaluated, so bytes
  /// that arrive together with the completion signal are still parsed instead of being dropped.
  /// </remarks>
  /// <param name="session">Session the data belongs to; used for parsing and for log context.</param>
  /// <param name="reader">Read side of the pipe; always completed before this method returns or throws.</param>
  private async Task ReadPipeAsync(JT808TcpSession session, PipeReader reader)
  {
      try
      {
          while (true)
          {
              ReadResult result = await reader.ReadAsync();
              ReadOnlySequence<byte> buffer = result.Buffer;
              // Defaults: consume nothing, but mark everything as examined so the next
              // read only returns once more data (or completion) is available.
              SequencePosition consumed = buffer.Start;
              SequencePosition examined = buffer.End;
              try
              {
                  if (result.IsCanceled) break;
                  if (buffer.Length > 0)
                  {
                      ReaderBuffer(ref buffer, session, out consumed);
                  }
              }
              catch (Exception ex)
              {
                  Logger.LogError(ex, $"[ReadPipe Error]:{session.Client.RemoteEndPoint},{session.TerminalPhoneNo}");
                  break;
              }
              finally
              {
                  reader.AdvanceTo(consumed, examined);
              }
              // Checked only after the final buffer has been processed.
              if (result.IsCompleted)
              {
                  break;
              }
          }
      }
      finally
      {
          // Complete even if logging in the catch block throws, so the writer is not left
          // flushing into a pipe nobody reads.
          reader.Complete();
      }
  }
  /// <summary>
  /// Scans the buffer for packages delimited by <c>JT808Package.BeginFlag</c>, deserializes the
  /// header of each complete package, and passes non-blacklisted packages to <see cref="Processor"/>.
  /// </summary>
  /// <param name="buffer">Bytes currently available from the pipe.</param>
  /// <param name="session">Session the bytes were received on.</param>
  /// <param name="consumed">
  /// On return, the position up to which complete packages were handled; bytes after it
  /// (an incomplete package) stay in the pipe for the next read.
  /// </param>
  /// <exception cref="ArgumentException">The first byte of the buffer is not the begin flag.</exception>
  private void ReaderBuffer(ref ReadOnlySequence<byte> buffer, JT808TcpSession session, out SequencePosition consumed)
  {
      SequenceReader<byte> seqReader = new SequenceReader<byte>(buffer);
      if (seqReader.TryPeek(out byte beginMark))
      {
          if (beginMark != JT808Package.BeginFlag) throw new ArgumentException("Not JT808 Packages.");
      }
      // mark == 0: no opening flag seen yet; mark == 1: inside a package, the next flag closes it.
      byte mark = 0;
      // Number of bytes belonging to packages that were fully handled.
      long totalConsumed = 0;
      while (!seqReader.End)
      {
          if (seqReader.IsNext(JT808Package.BeginFlag, advancePast: true))
          {
              if (mark == 1)
              {
                  byte[] data = null;
                  try
                  {
                      // Bytes from the opening flag up to and including the closing flag.
                      data = seqReader.Sequence.Slice(totalConsumed, seqReader.Consumed - totalConsumed).ToArray();
                      // Filter out anything that is not a standard 808 package (14):
                      // (begin flag)1 + (message ID)2 + (message body attributes)2 + (terminal phone number)6
                      // + (message serial number)2 + (checksum)1 + (end flag)1
                      if (data != null && data.Length > 14)
                      {
                          var package = Serializer.HeaderDeserialize(data);
                          if (BlacklistManager.Contains(package.Header.TerminalPhoneNo))
                          {
                              if (Logger.IsEnabled(LogLevel.Warning))
                                  Logger.LogWarning($"[Blacklist {session.Client.RemoteEndPoint}-{session.TerminalPhoneNo}]:{package.OriginalData.ToHexString()}");
                              // Cancel the receive-timeout token (the receive loop observes it)
                              // and stop parsing this buffer.
                              session.ReceiveTimeout.Cancel();
                              break;
                          }
  #if DEBUG
                          // Use the value returned by the atomic increment rather than re-reading the field.
                          long received = Interlocked.Increment(ref MessageReceiveCounter);
                          if (Logger.IsEnabled(LogLevel.Trace))
                              Logger.LogTrace($"[Accept Hex {session.Client.RemoteEndPoint}-{session.TerminalPhoneNo}]:{package.OriginalData.ToHexString()},Counter:{received}");
  #else
                          if (Logger.IsEnabled(LogLevel.Trace))
                              Logger.LogTrace($"[Accept Hex {session.Client.RemoteEndPoint}-{session.TerminalPhoneNo}]:{package.OriginalData.ToHexString()}");
  #endif
                          SessionManager.TryLink(package.Header.TerminalPhoneNo, session);
                          Processor(session, package);
                      }
                  }
                  catch (NotImplementedException ex)
                  {
                      // Pass the exception itself: the previous call used ex.Message as the message
                      // template, which turned the context string into an unused format argument.
                      Logger.LogError(ex, $"[ReaderBuffer]:{data?.ToHexString()},{session.Client.RemoteEndPoint},{session.TerminalPhoneNo}");
                  }
                  catch (JT808Exception ex)
                  {
                      Logger.LogError($"[HeaderDeserialize ErrorCode]:{ ex.ErrorCode},[ReaderBuffer]:{data?.ToHexString()},{session.Client.RemoteEndPoint},{session.TerminalPhoneNo}");
                  }
                  // Everything up to the closing flag has been handled (or skipped on error).
                  totalConsumed = seqReader.Consumed;
                  if (seqReader.End) break;
                  // Step over the next byte (treated as the next package's opening flag);
                  // mark is reset here and incremented to 1 again just below.
                  seqReader.Advance(1);
                  mark = 0;
              }
              mark++;
          }
          else
          {
              seqReader.Advance(1);
          }
      }
      if (seqReader.Length == totalConsumed)
      {
          consumed = buffer.End;
      }
      else
      {
          consumed = buffer.GetPosition(totalConsumed);
      }
  }
  /// <summary>
  /// Publishes the received package to the message producer, asks the message handler for the
  /// reply, sends the reply to the session unless the message ID is configured to be ignored,
  /// and forwards the reply to the reply-logging producer.
  /// </summary>
  /// <remarks>
  /// Producer and send calls are not awaited. Any exception thrown synchronously is logged
  /// and swallowed so that one bad package does not stop the read loop.
  /// </remarks>
  /// <param name="session">Session the package was received on and the reply is sent to.</param>
  /// <param name="package">Header-deserialized package to process.</param>
  private void Processor(in IJT808Session session, in JT808HeaderPackage package)
  {
      try
      {
          MsgProducer?.ProduceAsync(package.Header.TerminalPhoneNo, package.OriginalData);
          var downData = MessageHandler.Processor(in package);
          if (downData == null)
          {
              // No reply was produced: nothing to send and nothing to log.
              // (The reply-logging step already treated null as "no reply".)
              return;
          }

          // Read the monitored option once so a configuration reload between the null check
          // and the lookup cannot produce an inconsistent view.
          var ignoredMsgIds = ConfigurationMonitor.CurrentValue.IgnoreMsgIdReply;
          bool replySuppressed = ignoredMsgIds != null
              && ignoredMsgIds.Count > 0
              && ignoredMsgIds.Contains(package.Header.MsgId);
          if (!replySuppressed)
          {
              session.SendAsync(downData);
          }

          // The reply is logged even when sending it was suppressed, as before.
          if (MsgReplyLoggingProducer != null)
          {
              MsgReplyLoggingProducer.ProduceAsync(package.Header.TerminalPhoneNo, downData);
          }
      }
      catch (Exception ex)
      {
          Logger.LogError(ex, $"[Processor]:{package.OriginalData.ToHexString()},{session.Client.RemoteEndPoint},{session.TerminalPhoneNo}");
      }
  }
  /// <summary>
  /// Logs the stop event and closes the server socket, shutting it down first
  /// when it reports itself as connected.
  /// </summary>
  /// <param name="cancellationToken">Not used by this implementation.</param>
  /// <returns>A completed task.</returns>
  public Task StopAsync(CancellationToken cancellationToken)
  {
      Logger.LogInformation("JT808 Tcp Server Stop");

      Socket listener = server;
      if (listener != null)
      {
          if (listener.Connected)
          {
              listener.Shutdown(SocketShutdown.Both);
          }
          listener.Close();
      }

      return Task.CompletedTask;
  }
  304. }
  305. }