Nevar pievienot vairāk kā 25 tēmas. Tēmai ir jāsākas ar burtu vai ciparu, tā var saturēt domu zīmes ('-') un var būt līdz 35 simboliem gara.
 
 
 

279 rindas
12 KiB

  1. using System;
  2. using System.Buffers;
  3. using System.IO.Pipelines;
  4. using System.Net;
  5. using System.Net.Sockets;
  6. using System.Threading;
  7. using System.Threading.Tasks;
  8. using JT808.Gateway.Abstractions;
  9. using JT808.Gateway.Abstractions.Configurations;
  10. using JT808.Gateway.Session;
  11. using JT808.Protocol;
  12. using JT808.Protocol.Exceptions;
  13. using JT808.Protocol.Extensions;
  14. using Microsoft.Extensions.Hosting;
  15. using Microsoft.Extensions.Logging;
  16. using Microsoft.Extensions.Options;
  17. namespace JT808.Gateway
  18. {
  19. public class JT808TcpServer : IHostedService
  20. {
  21. private Socket server;
  22. private readonly ILogger Logger;
  23. private readonly JT808SessionManager SessionManager;
  24. private readonly JT808Serializer Serializer;
  25. private readonly JT808MessageHandler MessageHandler;
  26. private readonly IJT808MsgProducer MsgProducer;
  27. private readonly IJT808MsgReplyLoggingProducer MsgReplyLoggingProducer;
  28. private readonly IOptionsMonitor<JT808Configuration> ConfigurationMonitor;
  /// <summary>
  /// Creates the TCP server in queue mode: stores the injected collaborators and
  /// then opens the listening socket right away through <see cref="InitServer"/>.
  /// </summary>
  /// <param name="configurationMonitor">Gateway configuration monitor; its current value supplies the port, buffer sizes and backlog.</param>
  /// <param name="msgProducer">Producer that is handed every inbound package.</param>
  /// <param name="msgReplyLoggingProducer">Producer that is handed every non-null reply.</param>
  /// <param name="messageHandler">Handler that turns an inbound package into the reply payload.</param>
  /// <param name="jT808Config">Protocol configuration from which the serializer is obtained.</param>
  /// <param name="loggerFactory">Factory used to create this server's logger.</param>
  /// <param name="jT808SessionManager">Registry of connected sessions.</param>
  public JT808TcpServer(
      IOptionsMonitor<JT808Configuration> configurationMonitor,
      IJT808MsgProducer msgProducer,
      IJT808MsgReplyLoggingProducer msgReplyLoggingProducer,
      JT808MessageHandler messageHandler,
      IJT808Config jT808Config,
      ILoggerFactory loggerFactory,
      JT808SessionManager jT808SessionManager)
  {
      // Plain reference captures first; none of these can fail.
      this.ConfigurationMonitor = configurationMonitor;
      this.SessionManager = jT808SessionManager;
      this.MessageHandler = messageHandler;
      this.MsgProducer = msgProducer;
      this.MsgReplyLoggingProducer = msgReplyLoggingProducer;

      // Derived collaborators.
      this.Logger = loggerFactory.CreateLogger<JT808TcpServer>();
      this.Serializer = jT808Config.GetSerializer();

      // Needs ConfigurationMonitor to be assigned already.
      this.InitServer();
  }
  /// <summary>
  /// Creates the listening TCP socket, applies the configured socket options,
  /// binds it to all local IPv4 interfaces on the configured port and starts listening.
  /// The socket is published to the <c>server</c> field only once it is fully set up.
  /// </summary>
  private void InitServer()
  {
      // Snapshot once so every option comes from the same configuration version.
      var configuration = ConfigurationMonitor.CurrentValue;
      var localEndPoint = new IPEndPoint(IPAddress.Any, configuration.TcpPort);
      var listener = new Socket(localEndPoint.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
      try
      {
          // NoDelay is a TCP-level option. At SocketOptionLevel.Socket the same numeric
          // option name is Debug, so the previous call never disabled Nagle.
          listener.SetSocketOption(SocketOptionLevel.Tcp, SocketOptionName.NoDelay, true);
          listener.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.KeepAlive, true);
          listener.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.ReceiveBuffer, configuration.MiniNumBufferSize);
          listener.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.SendBuffer, configuration.MiniNumBufferSize);
          // Linger enabled with a zero timeout.
          listener.LingerState = new LingerOption(true, 0);
          listener.Bind(localEndPoint);
          listener.Listen(configuration.SoBacklog);
      }
      catch
      {
          // Do not leak the OS handle when option setup, Bind or Listen fails.
          listener.Dispose();
          throw;
      }
      server = listener;
  }
  /// <summary>
  /// Logs the listening address, launches the background accept loop and returns immediately.
  /// </summary>
  /// <param name="cancellationToken">Token checked before each accept; the loop stops once it is cancelled.</param>
  /// <returns>An already completed task; accepting continues in the background.</returns>
  public Task StartAsync(CancellationToken cancellationToken)
  {
      Logger.LogInformation($"JT808 TCP Server start at {IPAddress.Any}:{ConfigurationMonitor.CurrentValue.TcpPort}.");
      // Fire-and-forget: AcceptLoopAsync handles and logs its own failures.
      _ = Task.Factory.StartNew(() => AcceptLoopAsync(cancellationToken), cancellationToken);
      return Task.CompletedTask;
  }

  /// <summary>
  /// Accepts connections until cancellation is requested or the listening socket is closed,
  /// registering a session for each connection and starting its handler.
  /// </summary>
  private async Task AcceptLoopAsync(CancellationToken cancellationToken)
  {
      try
      {
          while (!cancellationToken.IsCancellationRequested)
          {
              Socket socket;
              try
              {
                  socket = await server.AcceptAsync();
              }
              catch (SocketException ex) when (ex.SocketErrorCode == SocketError.ConnectionReset
                                               || ex.SocketErrorCode == SocketError.ConnectionAborted)
              {
                  // The peer gave up before the connection was accepted; keep listening.
                  Logger.LogWarning(ex, "[Accept Error]");
                  continue;
              }
              JT808TcpSession jT808TcpSession = new JT808TcpSession(socket);
              SessionManager.TryAdd(jT808TcpSession);
              // Handle the connection on its own task so the loop can accept the next one.
              _ = Task.Factory.StartNew(state => HandleSessionAsync((JT808TcpSession)state), jT808TcpSession);
          }
      }
      catch (ObjectDisposedException)
      {
          // Expected when the listening socket is closed (see StopAsync): normal shutdown.
      }
      catch (SocketException ex) when (ex.SocketErrorCode == SocketError.OperationAborted)
      {
          // A pending accept was aborted by closing the listening socket: normal shutdown.
      }
  #pragma warning disable CA1031 // Do not catch general exception types
      catch (Exception ex)
      {
          // Previously this failure vanished in an unobserved task and the server
          // silently stopped accepting connections.
          Logger.LogError(ex, "[Accept Error]");
      }
  #pragma warning restore CA1031 // Do not catch general exception types
  }

  /// <summary>
  /// Runs the receive and parse pumps for one connection and removes the session
  /// from the session manager when both have finished, even if one of them failed.
  /// </summary>
  private async Task HandleSessionAsync(JT808TcpSession session)
  {
      try
      {
          if (Logger.IsEnabled(LogLevel.Information))
          {
              Logger.LogInformation($"[Connected]:{session.Client.RemoteEndPoint}");
          }
          var pipe = new Pipe();
          Task writing = FillPipeAsync(session, pipe.Writer);
          Task reading = ReadPipeAsync(session, pipe.Reader);
          await Task.WhenAll(reading, writing);
      }
  #pragma warning disable CA1031 // Do not catch general exception types
      catch (Exception ex)
      {
          // SessionID is used here because reading the socket's endpoint may itself throw.
          Logger.LogError(ex, $"[Session Error]:{session.SessionID}");
      }
  #pragma warning restore CA1031 // Do not catch general exception types
      finally
      {
          SessionManager.RemoveBySessionId(session.SessionID);
      }
  }
  /// <summary>
  /// Receive pump: reads bytes from the session socket into the pipe until the peer
  /// closes the connection, the receive is cancelled, an error occurs or the reader
  /// side of the pipe has completed. The writer is always completed on exit so the
  /// reading side can finish.
  /// </summary>
  /// <param name="session">Session whose socket is read.</param>
  /// <param name="writer">Pipe writer that receives the raw bytes.</param>
  private async Task FillPipeAsync(JT808TcpSession session, PipeWriter writer)
  {
      try
      {
          // Captured once up front: asking a closed socket for its endpoint inside a
          // catch block could throw again and bypass the clean-up below.
          var remoteEndPoint = session.Client.RemoteEndPoint;
          while (true)
          {
              try
              {
                  Memory<byte> memory = writer.GetMemory(ConfigurationMonitor.CurrentValue.MiniNumBufferSize);
                  // Receive timeout: disconnect when the device has sent no data for too long.
                  int bytesRead = await session.Client.ReceiveAsync(memory, SocketFlags.None, session.ReceiveTimeout.Token);
                  if (bytesRead == 0)
                  {
                      // Zero bytes means the peer closed the connection.
                      break;
                  }
                  writer.Advance(bytesRead);
              }
              catch (OperationCanceledException)
              {
                  Logger.LogError($"[Receive Timeout]:{remoteEndPoint},{session.TerminalPhoneNo}");
                  break;
              }
              catch (System.Net.Sockets.SocketException ex)
              {
                  Logger.LogError($"[{ex.SocketErrorCode.ToString()},{ex.Message}]:{remoteEndPoint},{session.TerminalPhoneNo}");
                  break;
              }
  #pragma warning disable CA1031 // Do not catch general exception types
              catch (Exception ex)
              {
                  Logger.LogError(ex, $"[Receive Error]:{remoteEndPoint},{session.TerminalPhoneNo}");
                  break;
              }
  #pragma warning restore CA1031 // Do not catch general exception types
              // Make the received bytes visible to the reader.
              FlushResult result = await writer.FlushAsync();
              if (result.IsCompleted)
              {
                  // The reader has completed; nothing will consume further data.
                  break;
              }
          }
      }
      finally
      {
          // Without this the reader would wait forever if anything above threw.
          writer.Complete();
      }
  }
  /// <summary>
  /// Parse pump: repeatedly reads buffered bytes from the pipe, hands them to
  /// <see cref="ReaderBuffer"/> and reports how much was consumed. It stops when the
  /// read is cancelled, parsing throws, or the writer has completed and the final
  /// buffer has been processed. The reader is always completed on exit.
  /// </summary>
  /// <param name="session">Session the bytes belong to.</param>
  /// <param name="reader">Pipe reader supplying the raw bytes.</param>
  private async Task ReadPipeAsync(JT808TcpSession session, PipeReader reader)
  {
      try
      {
          while (true)
          {
              ReadResult result = await reader.ReadAsync();
              ReadOnlySequence<byte> buffer = result.Buffer;
              // Defaults: nothing consumed, everything examined.
              SequencePosition consumed = buffer.Start;
              SequencePosition examined = buffer.End;
              try
              {
                  if (result.IsCanceled) break;
                  if (buffer.Length > 0)
                  {
                      ReaderBuffer(ref buffer, session, out consumed, out examined);
                  }
              }
  #pragma warning disable CA1031 // Do not catch general exception types
              catch (Exception ex)
              {
                  Logger.LogError(ex, $"[ReadPipe Error]:{session.Client.RemoteEndPoint},{session.TerminalPhoneNo}");
                  break;
              }
  #pragma warning restore CA1031 // Do not catch general exception types
              finally
              {
                  reader.AdvanceTo(consumed, examined);
              }
              // Checked only after processing: the final read can carry data together
              // with the completion flag, and that data must not be dropped.
              if (result.IsCompleted)
              {
                  break;
              }
          }
      }
      finally
      {
          // Always release the reader, even if logging in the catch block throws.
          reader.Complete();
      }
  }
  /// <summary>
  /// Scans the buffered bytes for frames delimited by <c>JT808Package.BeginFlag</c>,
  /// deserializes the header of each complete frame and passes it to <see cref="Processor"/>.
  /// </summary>
  /// <param name="buffer">Bytes currently available from the pipe.</param>
  /// <param name="session">Session the bytes were received on.</param>
  /// <param name="consumed">Position up to which complete frames were handled.</param>
  /// <param name="examined">Position up to which the data was inspected (always the end of the buffer).</param>
  /// <exception cref="ArgumentException">The first buffered byte is not the frame delimiter.</exception>
  private void ReaderBuffer(ref ReadOnlySequence<byte> buffer, JT808TcpSession session, out SequencePosition consumed, out SequencePosition examined)
  {
      consumed = buffer.Start;
      examined = buffer.End;
      SequenceReader<byte> seqReader = new SequenceReader<byte>(buffer);
      if (seqReader.TryPeek(out byte beginMark))
      {
          if (beginMark != JT808Package.BeginFlag) throw new ArgumentException("Not JT808 Packages.");
      }
      // mark == 1 means an opening delimiter has been seen and a frame is in progress.
      byte mark = 0;
      long totalConsumed = 0;
      while (!seqReader.End)
      {
          if (seqReader.IsNext(JT808Package.BeginFlag, advancePast: true))
          {
              if (mark == 1)
              {
                  // Closing delimiter: the frame spans [totalConsumed, seqReader.Consumed).
                  ReadOnlySpan<byte> contentSpan = ReadOnlySpan<byte>.Empty;
                  try
                  {
                      ReadOnlySequence<byte> frame = seqReader.Sequence.Slice(totalConsumed, seqReader.Consumed - totalConsumed);
                      if (frame.IsSingleSegment)
                      {
                          contentSpan = frame.FirstSpan;
                      }
                      else
                      {
                          // FirstSpan alone would truncate a frame that crosses a pipe
                          // segment boundary, so copy such frames into one array.
                          contentSpan = frame.ToArray();
                      }
                      // Filter out anything shorter than a standard 808 package:
                      // (head)1 + (message id)2 + (body attributes)2 + (terminal phone no)6 + (serial no)2 + (checksum)1 + (tail)1
                      if (contentSpan.Length > 14)
                      {
                          var package = Serializer.HeaderDeserialize(contentSpan, minBufferSize: 10240);
                          if (Logger.IsEnabled(LogLevel.Trace)) Logger.LogTrace($"[Accept Hex {session.Client.RemoteEndPoint}-{session.TerminalPhoneNo}]:{package.OriginalData.ToHexString()}");
                          SessionManager.TryLink(package.Header.TerminalPhoneNo, session);
                          Processor(session, package);
                      }
                  }
                  catch (NotImplementedException ex)
                  {
                      // Pass the exception itself; using ex.Message as the format template
                      // discarded the endpoint details that followed it.
                      Logger.LogError(ex, $"[ReaderBuffer NotImplemented]:{session.Client.RemoteEndPoint},{session.TerminalPhoneNo}");
                  }
                  catch (JT808Exception ex)
                  {
                      Logger.LogError($"[HeaderDeserialize ErrorCode]:{ ex.ErrorCode},[ReaderBuffer]:{contentSpan.ToArray().ToHexString()},{session.Client.RemoteEndPoint},{session.TerminalPhoneNo}");
                  }
                  totalConsumed = seqReader.Consumed;
                  if (seqReader.End) break;
                  // Step over the next byte (presumably the next frame's opening delimiter).
                  seqReader.Advance(1);
                  mark = 0;
              }
              mark++;
          }
          else
          {
              seqReader.Advance(1);
          }
      }
      if (seqReader.Length == totalConsumed)
      {
          // Every byte belonged to a handled frame.
          examined = consumed = buffer.End;
      }
      else
      {
          // Keep the trailing partial frame in the pipe until more data arrives.
          consumed = buffer.GetPosition(totalConsumed);
      }
  }
  /// <summary>
  /// Handles one parsed package: forwards the raw data to the message producer, asks the
  /// message handler for a reply, sends that reply to the session unless the message id is
  /// configured to be ignored, and forwards the reply to the reply-logging producer.
  /// Any exception is logged and swallowed so one bad package does not end the session.
  /// </summary>
  /// <param name="session">Session that receives the reply.</param>
  /// <param name="package">Header-deserialized inbound package.</param>
  private void Processor(in IJT808Session session,in JT808HeaderPackage package)
  {
      try
      {
          // Not awaited: the producer call is fire-and-forget here.
          MsgProducer?.ProduceAsync(package.Header.TerminalPhoneNo, package.OriginalData);
          var downData = MessageHandler.Processor(in package);
          if (downData == null)
          {
              // No reply produced: there is nothing to send or to log.
              return;
          }
          // Read the setting once so a configuration reload cannot change it
          // between the null check and the lookup.
          var ignoredMsgIds = ConfigurationMonitor.CurrentValue.IgnoreMsgIdReply;
          bool replySuppressed = ignoredMsgIds != null
              && ignoredMsgIds.Count > 0
              && ignoredMsgIds.Contains(package.Header.MsgId);
          if (!replySuppressed)
          {
              session.SendAsync(downData);
          }
          // The reply is logged whether or not it was actually sent.
          if (MsgReplyLoggingProducer != null)
          {
              MsgReplyLoggingProducer.ProduceAsync(package.Header.TerminalPhoneNo, downData);
          }
      }
      catch (Exception ex)
      {
          Logger.LogError(ex, $"[Processor]:{package.OriginalData.ToHexString()},{session.Client.RemoteEndPoint},{session.TerminalPhoneNo}");
      }
  }
  /// <summary>
  /// Logs the stop, shuts the server socket down in both directions if it reports
  /// itself as connected, and closes it.
  /// </summary>
  /// <param name="cancellationToken">Not used by this implementation.</param>
  /// <returns>An already completed task.</returns>
  public Task StopAsync(CancellationToken cancellationToken)
  {
      Logger.LogInformation("JT808 Tcp Server Stop");

      Socket listener = server;
      if (listener != null)
      {
          if (listener.Connected)
          {
              listener.Shutdown(SocketShutdown.Both);
          }

          listener.Close();
      }

      return Task.CompletedTask;
  }
  266. }
  267. }