No puede seleccionar más de 25 temas Los temas deben comenzar con una letra o número, pueden incluir guiones ('-') y pueden tener hasta 35 caracteres de largo.
 
 
 

319 líneas
14 KiB

  1. using System;
  2. using System.Buffers;
  3. using System.IO.Pipelines;
  4. using System.Net;
  5. using System.Net.Sockets;
  6. using System.Threading;
  7. using System.Threading.Tasks;
  8. using JT808.Gateway.Abstractions;
  9. using JT808.Gateway.Abstractions.Configurations;
  10. using JT808.Gateway.Services;
  11. using JT808.Gateway.Session;
  12. using JT808.Protocol;
  13. using JT808.Protocol.Exceptions;
  14. using JT808.Protocol.Extensions;
  15. using Microsoft.Extensions.Hosting;
  16. using Microsoft.Extensions.Logging;
  17. using Microsoft.Extensions.Options;
  18. namespace JT808.Gateway
  19. {
  20. /// <summary>
  21. /// 808 tcp服务器
  22. /// </summary>
  23. public class JT808TcpServer : IHostedService
  24. {
  25. private Socket server;
  26. private readonly ILogger Logger;
  27. private readonly JT808SessionManager SessionManager;
  28. private readonly JT808BlacklistManager BlacklistManager;
  29. private readonly JT808Serializer Serializer;
  30. private readonly JT808MessageHandler MessageHandler;
  31. private readonly IJT808MsgProducer MsgProducer;
  32. private readonly IJT808MsgReplyLoggingProducer MsgReplyLoggingProducer;
  33. private readonly IOptionsMonitor<JT808Configuration> ConfigurationMonitor;
  34. private long MessageReceiveCounter = 0;
  35. /// <summary>
  36. /// 初始化服务注册
  37. /// </summary>
  38. /// <param name="configurationMonitor"></param>
  39. /// <param name="msgProducer"></param>
  40. /// <param name="msgReplyLoggingProducer"></param>
  41. /// <param name="messageHandler"></param>
  42. /// <param name="jT808Config"></param>
  43. /// <param name="loggerFactory"></param>
  44. /// <param name="jT808SessionManager"></param>
  45. /// <param name="jT808BlacklistManager"></param>
  46. public JT808TcpServer(
  47. IOptionsMonitor<JT808Configuration> configurationMonitor,
  48. IJT808MsgProducer msgProducer,
  49. IJT808MsgReplyLoggingProducer msgReplyLoggingProducer,
  50. JT808MessageHandler messageHandler,
  51. IJT808Config jT808Config,
  52. ILoggerFactory loggerFactory,
  53. JT808SessionManager jT808SessionManager,
  54. JT808BlacklistManager jT808BlacklistManager)
  55. {
  56. MessageHandler = messageHandler;
  57. MsgProducer = msgProducer;
  58. MsgReplyLoggingProducer = msgReplyLoggingProducer;
  59. ConfigurationMonitor = configurationMonitor;
  60. SessionManager = jT808SessionManager;
  61. BlacklistManager = jT808BlacklistManager;
  62. Logger = loggerFactory.CreateLogger<JT808TcpServer>();
  63. Serializer = jT808Config.GetSerializer();
  64. InitServer();
  65. }
  66. private void InitServer()
  67. {
  68. var IPEndPoint = new System.Net.IPEndPoint(IPAddress.Any, ConfigurationMonitor.CurrentValue.TcpPort);
  69. server = new Socket(IPEndPoint.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
  70. server.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.NoDelay, true);
  71. server.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.KeepAlive, true);
  72. server.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.ReceiveBuffer, ConfigurationMonitor.CurrentValue.MiniNumBufferSize);
  73. server.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.SendBuffer, ConfigurationMonitor.CurrentValue.MiniNumBufferSize);
  74. server.LingerState = new LingerOption(true, 0);
  75. server.Bind(IPEndPoint);
  76. server.Listen(ConfigurationMonitor.CurrentValue.SoBacklog);
  77. }
  78. /// <summary>
  79. ///
  80. /// </summary>
  81. /// <param name="cancellationToken"></param>
  82. /// <returns></returns>
  83. public Task StartAsync(CancellationToken cancellationToken)
  84. {
  85. Logger.LogInformation($"JT808 TCP Server start at {IPAddress.Any}:{ConfigurationMonitor.CurrentValue.TcpPort}.");
  86. Task.Run(async () =>
  87. {
  88. while (!cancellationToken.IsCancellationRequested)
  89. {
  90. try
  91. {
  92. var socket = await server.AcceptAsync(cancellationToken);
  93. JT808TcpSession jT808TcpSession = new JT808TcpSession(socket);
  94. SessionManager.TryAdd(jT808TcpSession);
  95. await Task.Factory.StartNew(async (state) =>
  96. {
  97. var session = (JT808TcpSession)state;
  98. if (Logger.IsEnabled(LogLevel.Information))
  99. {
  100. Logger.LogInformation($"[Connected]:{session.Client.RemoteEndPoint}");
  101. }
  102. var pipe = new Pipe();
  103. Task writing = FillPipeAsync(session, pipe.Writer);
  104. Task reading = ReadPipeAsync(session, pipe.Reader);
  105. await Task.WhenAny(reading, writing);
  106. SessionManager.RemoveBySessionId(session.SessionID);
  107. }, jT808TcpSession);
  108. }
  109. catch (Exception)
  110. {
  111. break;
  112. }
  113. }
  114. });
  115. return Task.CompletedTask;
  116. }
  117. private async Task FillPipeAsync(JT808TcpSession session, PipeWriter writer)
  118. {
  119. while (true)
  120. {
  121. try
  122. {
  123. Memory<byte> memory = writer.GetMemory(ConfigurationMonitor.CurrentValue.MiniNumBufferSize);
  124. //设备多久没发数据就断开连接 Receive Timeout.
  125. int bytesRead = await session.Client.ReceiveAsync(memory, SocketFlags.None, session.ReceiveTimeout.Token);
  126. if (bytesRead == 0)
  127. {
  128. break;
  129. }
  130. writer.Advance(bytesRead);
  131. FlushResult result = await writer.FlushAsync(session.ReceiveTimeout.Token);
  132. if (result.IsCompleted)
  133. {
  134. break;
  135. }
  136. }
  137. catch (OperationCanceledException)
  138. {
  139. Logger.LogError($"[Receive Timeout Or Operation Canceled]:{session.Client.RemoteEndPoint},{session.TerminalPhoneNo}");
  140. break;
  141. }
  142. catch (System.Net.Sockets.SocketException ex)
  143. {
  144. Logger.LogError($"[{ex.SocketErrorCode},{ex.Message}]:{session.Client.RemoteEndPoint},{session.TerminalPhoneNo}");
  145. break;
  146. }
  147. catch (Exception ex)
  148. {
  149. Logger.LogError(ex, $"[Receive Error]:{session.Client.RemoteEndPoint},{session.TerminalPhoneNo}");
  150. break;
  151. }
  152. }
  153. writer.Complete();
  154. }
  155. private async Task ReadPipeAsync(JT808TcpSession session, PipeReader reader)
  156. {
  157. while (true)
  158. {
  159. ReadResult result = await reader.ReadAsync(session.ReceiveTimeout.Token);
  160. if (result.IsCompleted)
  161. {
  162. break;
  163. }
  164. ReadOnlySequence<byte> buffer = result.Buffer;
  165. SequencePosition consumed = buffer.Start;
  166. SequencePosition examined = buffer.End;
  167. try
  168. {
  169. if (result.IsCanceled) break;
  170. if (buffer.Length > 0)
  171. {
  172. ReaderBuffer(ref buffer, session, out consumed);
  173. }
  174. }
  175. catch (Exception ex)
  176. {
  177. Logger.LogError(ex, $"[ReadPipe Error]:{session.Client.RemoteEndPoint},{session.TerminalPhoneNo}");
  178. break;
  179. }
  180. finally
  181. {
  182. reader.AdvanceTo(consumed, examined);
  183. }
  184. }
  185. reader.Complete();
  186. }
  187. private void ReaderBuffer(ref ReadOnlySequence<byte> buffer, JT808TcpSession session, out SequencePosition consumed)
  188. {
  189. SequenceReader<byte> seqReader = new SequenceReader<byte>(buffer);
  190. if (seqReader.TryPeek(out byte beginMark))
  191. {
  192. if (beginMark != JT808Package.BeginFlag) throw new ArgumentException("Not JT808 Packages.");
  193. }
  194. byte mark = 0;
  195. long totalConsumed = 0;
  196. while (!seqReader.End)
  197. {
  198. if (seqReader.IsNext(JT808Package.BeginFlag, advancePast: true))
  199. {
  200. if (mark == 1)
  201. {
  202. byte[] data = null;
  203. try
  204. {
  205. data = seqReader.Sequence.Slice(totalConsumed, seqReader.Consumed - totalConsumed).ToArray();
  206. //过滤掉不是808标准包(14)
  207. //(头)1+(消息 ID )2+(消息体属性)2+(终端手机号)6+(消息流水号)2+(检验码 )1+(尾)1
  208. if (data != null && data.Length > 14)
  209. {
  210. var package = Serializer.HeaderDeserialize(data);
  211. if (BlacklistManager.Contains(package.Header.TerminalPhoneNo))
  212. {
  213. if (Logger.IsEnabled(LogLevel.Warning))
  214. Logger.LogWarning($"[Blacklist {session.Client.RemoteEndPoint}-{session.TerminalPhoneNo}]:{package.OriginalData.ToHexString()}");
  215. session.ReceiveTimeout.Cancel();
  216. break;
  217. }
  218. # if DEBUG
  219. Interlocked.Increment(ref MessageReceiveCounter);
  220. if (Logger.IsEnabled(LogLevel.Trace))
  221. Logger.LogTrace($"[Accept Hex {session.Client.RemoteEndPoint}-{package.Header.TerminalPhoneNo}]:{package.OriginalData.ToHexString()},Counter:{MessageReceiveCounter}");
  222. #else
  223. if (Logger.IsEnabled(LogLevel.Trace))
  224. Logger.LogTrace($"[Accept Hex {session.Client.RemoteEndPoint}-{session.TerminalPhoneNo}]:{package.OriginalData.ToHexString()}");
  225. #endif
  226. SessionManager.TryLink(package.Header.TerminalPhoneNo, session);
  227. Processor(session, package);
  228. }
  229. }
  230. catch (NotImplementedException ex)
  231. {
  232. Logger.LogError(ex.Message, $"[ReaderBuffer]:{data?.ToHexString()},{session.Client.RemoteEndPoint},{session.TerminalPhoneNo}");
  233. }
  234. catch (JT808Exception ex)
  235. {
  236. Logger.LogError($"[HeaderDeserialize ErrorCode]:{ex.ErrorCode},[ReaderBuffer]:{data?.ToHexString()},{session.Client.RemoteEndPoint},{session.TerminalPhoneNo}");
  237. }
  238. totalConsumed += seqReader.Consumed - totalConsumed;
  239. if (seqReader.End) break;
  240. seqReader.Advance(1);
  241. mark = 0;
  242. }
  243. mark++;
  244. }
  245. else
  246. {
  247. seqReader.Advance(1);
  248. }
  249. }
  250. if (seqReader.Length == totalConsumed)
  251. {
  252. consumed = buffer.End;
  253. }
  254. else
  255. {
  256. consumed = buffer.GetPosition(totalConsumed);
  257. }
  258. }
  259. private void Processor(in IJT808Session session, in JT808HeaderPackage package)
  260. {
  261. try
  262. {
  263. MsgProducer?.ProduceAsync(package.Header.TerminalPhoneNo, package.OriginalData);
  264. var downData = MessageHandler.Processor(in package);
  265. if (ConfigurationMonitor.CurrentValue.IgnoreMsgIdReply != null && ConfigurationMonitor.CurrentValue.IgnoreMsgIdReply.Count > 0)
  266. {
  267. if (!ConfigurationMonitor.CurrentValue.IgnoreMsgIdReply.Contains(package.Header.MsgId))
  268. {
  269. session.SendAsync(downData);
  270. }
  271. }
  272. else
  273. {
  274. session.SendAsync(downData);
  275. }
  276. if (MsgReplyLoggingProducer != null)
  277. {
  278. if (downData != null)
  279. MsgReplyLoggingProducer.ProduceAsync(package.Header.TerminalPhoneNo, downData);
  280. }
  281. }
  282. catch (Exception ex)
  283. {
  284. Logger.LogError(ex, $"[Processor]:{package.OriginalData.ToHexString()},{session.Client.RemoteEndPoint},{session.TerminalPhoneNo}");
  285. }
  286. }
  287. /// <summary>
  288. ///
  289. /// </summary>
  290. /// <param name="cancellationToken"></param>
  291. /// <returns></returns>
  292. public Task StopAsync(CancellationToken cancellationToken)
  293. {
  294. Logger.LogInformation("JT808 Tcp Server Stop");
  295. foreach (var item in SessionManager.Sessions)
  296. {
  297. item.Value.Client.Close();
  298. }
  299. if (server?.Connected ?? false)
  300. server.Shutdown(SocketShutdown.Both);
  301. server?.Close();
  302. return Task.CompletedTask;
  303. }
  304. }
  305. }