From 09414d2f395539ac8965568055085077312824a2 Mon Sep 17 00:00:00 2001 From: "SmallChi(Koike)" <564952747@qq.com> Date: Fri, 6 Nov 2020 18:25:44 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E5=AE=A2=E6=88=B7=E7=AB=AF?= =?UTF-8?q?=E6=96=AD=E7=BA=BF=E9=87=8D=E8=BF=9E=E6=9C=BA=E5=88=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../IJT808MessageProducer.cs | 27 ++++++++ .../Internal/JT808RetryBlockingCollection.cs | 17 +++++ .../JT808.Gateway.Client.csproj | 1 - .../JT808ClientExtensions.cs | 17 +++++ src/JT808.Gateway.Client/JT808DeviceConfig.cs | 5 +- src/JT808.Gateway.Client/JT808TcpClient.cs | 38 ++++++++-- .../Services/JT808RetryClientHostedService.cs | 69 +++++++++++++++++++ src/JT808.Gateway/JT808TcpServer.cs | 6 +- 8 files changed, 168 insertions(+), 12 deletions(-) create mode 100644 src/JT808.Gateway.Client/IJT808MessageProducer.cs create mode 100644 src/JT808.Gateway.Client/Internal/JT808RetryBlockingCollection.cs create mode 100644 src/JT808.Gateway.Client/Services/JT808RetryClientHostedService.cs diff --git a/src/JT808.Gateway.Client/IJT808MessageProducer.cs b/src/JT808.Gateway.Client/IJT808MessageProducer.cs new file mode 100644 index 0000000..89e3c00 --- /dev/null +++ b/src/JT808.Gateway.Client/IJT808MessageProducer.cs @@ -0,0 +1,27 @@ +using JT808.Protocol; +using System; +using System.Threading.Tasks; + +namespace JT808.Gateway.Client +{ + /// + /// 消息数据包 + /// + public interface IJT808MessageProducer : IDisposable + { + ValueTask ProduceAsync(JT808Package package); + } + + internal class JT808MessageProducerEmpty : IJT808MessageProducer + { + public void Dispose() + { + + } + + public ValueTask ProduceAsync(JT808Package package) + { + return default; + } + } +} diff --git a/src/JT808.Gateway.Client/Internal/JT808RetryBlockingCollection.cs b/src/JT808.Gateway.Client/Internal/JT808RetryBlockingCollection.cs new file mode 100644 index 0000000..76e4ccb --- /dev/null +++ b/src/JT808.Gateway.Client/Internal/JT808RetryBlockingCollection.cs @@ -0,0 +1,17 @@ +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Text; + +namespace JT808.Gateway.Client.Internal +{ + public class JT808RetryBlockingCollection + { + public BlockingCollection RetryBlockingCollection { get; } + + public JT808RetryBlockingCollection() + { + RetryBlockingCollection = new BlockingCollection(999999); + } + } +} diff --git a/src/JT808.Gateway.Client/JT808.Gateway.Client.csproj b/src/JT808.Gateway.Client/JT808.Gateway.Client.csproj index d516862..ec88a32 100644 --- a/src/JT808.Gateway.Client/JT808.Gateway.Client.csproj +++ b/src/JT808.Gateway.Client/JT808.Gateway.Client.csproj @@ -23,7 +23,6 @@ - diff --git a/src/JT808.Gateway.Client/JT808ClientExtensions.cs b/src/JT808.Gateway.Client/JT808ClientExtensions.cs index b5a1cb7..2700b00 100644 --- a/src/JT808.Gateway.Client/JT808ClientExtensions.cs +++ b/src/JT808.Gateway.Client/JT808ClientExtensions.cs @@ -5,6 +5,8 @@ using System.Text; using JT808.Protocol; using Microsoft.Extensions.Configuration; using JT808.Gateway.Client.Services; +using Microsoft.Extensions.DependencyInjection.Extensions; +using JT808.Gateway.Client.Internal; namespace JT808.Gateway.Client { @@ -13,12 +15,27 @@ namespace JT808.Gateway.Client public static IJT808ClientBuilder AddClient(this IJT808Builder jT808Builder) { JT808ClientBuilderDefault jT808ClientBuilderDefault = new JT808ClientBuilderDefault(jT808Builder); + jT808ClientBuilderDefault.JT808Builder.Services.AddSingleton(); jT808ClientBuilderDefault.JT808Builder.Services.AddSingleton(); jT808ClientBuilderDefault.JT808Builder.Services.AddSingleton(); jT808ClientBuilderDefault.JT808Builder.Services.AddSingleton(); + jT808ClientBuilderDefault.JT808Builder.Services.AddSingleton(); return jT808ClientBuilderDefault; } + public static IJT808ClientBuilder AddMessageProducer(this IJT808ClientBuilder jT808ClientBuilder) + where TJT808MessageProducer: IJT808MessageProducer + { + jT808ClientBuilder.JT808Builder.Services.Replace(new ServiceDescriptor(typeof(IJT808MessageProducer), typeof(TJT808MessageProducer), ServiceLifetime.Singleton)); + return jT808ClientBuilder; + } + + public static IJT808ClientBuilder AddClientRetry(this IJT808ClientBuilder jT808ClientBuilder) + { + jT808ClientBuilder.JT808Builder.Services.AddHostedService(); + return jT808ClientBuilder; + } + public static IJT808ClientBuilder AddClientReport(this IJT808ClientBuilder jT808ClientBuilder) { jT808ClientBuilder.JT808Builder.Services.Configure((options) => { }); diff --git a/src/JT808.Gateway.Client/JT808DeviceConfig.cs b/src/JT808.Gateway.Client/JT808DeviceConfig.cs index 4b49e11..7169005 100644 --- a/src/JT808.Gateway.Client/JT808DeviceConfig.cs +++ b/src/JT808.Gateway.Client/JT808DeviceConfig.cs @@ -24,7 +24,10 @@ namespace JT808.Gateway.Client /// 心跳时间(秒) /// public int Heartbeat { get; set; } = 30; - + /// + /// 自动重连 默认true + /// + public bool AutoReconnection { get; set; } = true; public IJT808MsgSNDistributed MsgSNDistributed { get; } } } diff --git a/src/JT808.Gateway.Client/JT808TcpClient.cs b/src/JT808.Gateway.Client/JT808TcpClient.cs index 9a10211..17096d4 100644 --- a/src/JT808.Gateway.Client/JT808TcpClient.cs +++ b/src/JT808.Gateway.Client/JT808TcpClient.cs @@ -12,21 +12,24 @@ using JT808.Protocol.Extensions; using JT808.Gateway.Client.Services; using JT808.Gateway.Client.Metadata; using Microsoft.Extensions.DependencyInjection; +using JT808.Gateway.Client.Internal; namespace JT808.Gateway.Client { public class JT808TcpClient:IDisposable { - //todo: 客户端的断线重连 + //todo: 客户端心跳时间 private bool disposed = false; private Socket clientSocket; private readonly ILogger Logger; private readonly JT808Serializer JT808Serializer; private readonly JT808SendAtomicCounterService SendAtomicCounterService; private readonly JT808ReceiveAtomicCounterService ReceiveAtomicCounterService; + private readonly JT808RetryBlockingCollection RetryBlockingCollection; private bool socketState = true; public JT808DeviceConfig DeviceConfig { get; } + private IJT808MessageProducer producer; public JT808TcpClient( JT808DeviceConfig deviceConfig, IServiceProvider serviceProvider) @@ -36,6 +39,8 @@ namespace JT808.Gateway.Client ReceiveAtomicCounterService = serviceProvider.GetRequiredService(); JT808Serializer = serviceProvider.GetRequiredService().GetSerializer(); Logger = serviceProvider.GetRequiredService().CreateLogger(); + producer = serviceProvider.GetRequiredService(); + RetryBlockingCollection = serviceProvider.GetRequiredService(); } public async ValueTask ConnectAsync(EndPoint remoteEndPoint) { @@ -47,6 +52,8 @@ namespace JT808.Gateway.Client } catch (Exception e) { + Logger.LogError(e, "ConnectAsync Error"); + RetryBlockingCollection.RetryBlockingCollection.Add(DeviceConfig); return false; } } @@ -63,6 +70,7 @@ namespace JT808.Gateway.Client Task writing = FillPipeAsync(session, pipe.Writer, cancellationToken); Task reading = ReadPipeAsync(session, pipe.Reader); await Task.WhenAll(reading, writing); + RetryBlockingCollection.RetryBlockingCollection.Add(DeviceConfig); }, clientSocket); } private async Task FillPipeAsync(Socket session, PipeWriter writer, CancellationToken cancellationToken) @@ -79,6 +87,11 @@ namespace JT808.Gateway.Client } writer.Advance(bytesRead); } + catch (OperationCanceledException ex) + { + Logger.LogError($"[Receive Timeout]:{session.RemoteEndPoint}"); + break; + } catch (System.Net.Sockets.SocketException ex) { Logger.LogError($"[{ex.SocketErrorCode.ToString()},{ex.Message}]:{session.RemoteEndPoint}"); @@ -148,10 +161,15 @@ namespace JT808.Gateway.Client { try { - var package = JT808Serializer.HeaderDeserialize(seqReader.Sequence.Slice(totalConsumed, seqReader.Consumed - totalConsumed).ToArray(),minBufferSize:8096); + var data = seqReader.Sequence.Slice(totalConsumed, seqReader.Consumed - totalConsumed).ToArray(); + var package = JT808Serializer.Deserialize(data, minBufferSize: 8096); + if (producer != null) + { + producer.ProduceAsync(package); + } ReceiveAtomicCounterService.MsgSuccessIncrement(); if (Logger.IsEnabled(LogLevel.Debug)) Logger.LogDebug($"[Atomic Success Counter]:{ReceiveAtomicCounterService.MsgSuccessCount}"); - if (Logger.IsEnabled(LogLevel.Trace)) Logger.LogTrace($"[Accept Hex {session.RemoteEndPoint}]:{package.OriginalData.ToArray().ToHexString()}"); + if (Logger.IsEnabled(LogLevel.Trace)) Logger.LogTrace($"[Accept Hex {session.RemoteEndPoint}]:{data.ToHexString()}"); } catch (JT808Exception ex) { @@ -178,9 +196,9 @@ namespace JT808.Gateway.Client consumed = buffer.GetPosition(totalConsumed); } } - public async ValueTask SendAsync(JT808ClientRequest message) + public async ValueTask SendAsync(JT808ClientRequest message) { - if (disposed) return; + if (disposed) return false; if (IsOpen && socketState) { if (message.Package != null) @@ -188,38 +206,44 @@ namespace JT808.Gateway.Client try { var sendData = JT808Serializer.Serialize(message.Package, minBufferSize: message.MinBufferSize); - //clientSocket.Send(sendData); await clientSocket.SendAsync(sendData, SocketFlags.None); SendAtomicCounterService.MsgSuccessIncrement(); + return true; } catch (System.Net.Sockets.SocketException ex) { socketState = false; Logger.LogError($"[{ex.SocketErrorCode.ToString()},{ex.Message},{DeviceConfig.TerminalPhoneNo}]"); + return false; } catch (System.Exception ex) { Logger.LogError(ex.Message); + return false; } } else if (message.HexData != null) { try { - clientSocket.Send(message.HexData); + await clientSocket.SendAsync(message.HexData, SocketFlags.None); SendAtomicCounterService.MsgSuccessIncrement(); + return true; } catch (System.Net.Sockets.SocketException ex) { socketState = false; Logger.LogError($"[{ex.SocketErrorCode.ToString()},{ex.Message},{DeviceConfig.TerminalPhoneNo}]"); + return false; } catch (System.Exception ex) { Logger.LogError(ex.Message); + return false; } } } + return false; } public void Close() diff --git a/src/JT808.Gateway.Client/Services/JT808RetryClientHostedService.cs b/src/JT808.Gateway.Client/Services/JT808RetryClientHostedService.cs new file mode 100644 index 0000000..dba8a83 --- /dev/null +++ b/src/JT808.Gateway.Client/Services/JT808RetryClientHostedService.cs @@ -0,0 +1,69 @@ +using JT808.Gateway.Client.Internal; +using JT808.Gateway.Client.Metadata; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using System; +using System.Collections.Generic; +using System.IO; +using System.Linq; +using System.Text; +using System.Text.Json; +using System.Text.Json.Serialization; +using System.Threading; +using System.Threading.Tasks; + +namespace JT808.Gateway.Client.Services +{ + internal class JT808RetryClientHostedService : BackgroundService + { + private readonly IJT808TcpClientFactory jT808TcpClientFactory; + + private readonly ILogger logger; + + private readonly JT808RetryBlockingCollection RetryBlockingCollection; + + public JT808RetryClientHostedService( + JT808RetryBlockingCollection retryBlockingCollection, + ILoggerFactory loggerFactory, + IJT808TcpClientFactory jT808TcpClientFactory) + { + logger = loggerFactory.CreateLogger(); + this.jT808TcpClientFactory = jT808TcpClientFactory; + RetryBlockingCollection = retryBlockingCollection; + } + + protected override async Task ExecuteAsync(CancellationToken stoppingToken) + { + foreach (var item in RetryBlockingCollection.RetryBlockingCollection.GetConsumingEnumerable(stoppingToken)) + { + try + { + jT808TcpClientFactory.Remove(item); + if (item.AutoReconnection) + { + var result = await jT808TcpClientFactory.Create(item, stoppingToken); + if (result != null) + { + if (logger.IsEnabled(LogLevel.Information)) + { + logger.LogInformation($"Retry Success-{JsonSerializer.Serialize(item)}"); + } + } + else + { + if (logger.IsEnabled(LogLevel.Warning)) + { + logger.LogWarning($"Retry Fail-{JsonSerializer.Serialize(item)}"); + } + } + } + } + catch (Exception ex) + { + logger.LogError(ex, $"Retry Error-{JsonSerializer.Serialize(item)}"); + } + } + } + } +} diff --git a/src/JT808.Gateway/JT808TcpServer.cs b/src/JT808.Gateway/JT808TcpServer.cs index 1b85faf..6dec164 100644 --- a/src/JT808.Gateway/JT808TcpServer.cs +++ b/src/JT808.Gateway/JT808TcpServer.cs @@ -62,7 +62,7 @@ namespace JT808.Gateway server.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.KeepAlive, true); server.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.ReceiveBuffer, Configuration.MiniNumBufferSize); server.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.SendBuffer, Configuration.MiniNumBufferSize); - server.LingerState = new LingerOption(false, 0); + server.LingerState = new LingerOption(true, 0); server.Bind(IPEndPoint); server.Listen(Configuration.SoBacklog); } @@ -201,11 +201,11 @@ namespace JT808.Gateway } catch (NotImplementedException ex) { - Logger.LogError(ex.Message); + Logger.LogError(ex.Message,$"{session.Client.RemoteEndPoint}"); } catch (JT808Exception ex) { - Logger.LogError($"[HeaderDeserialize ErrorCode]:{ ex.ErrorCode},[ReaderBuffer]:{contentSpan.ToArray().ToHexString()}"); + Logger.LogError($"[HeaderDeserialize ErrorCode]:{ ex.ErrorCode},[ReaderBuffer]:{contentSpan.ToArray().ToHexString()},{session.Client.RemoteEndPoint}"); } totalConsumed += (seqReader.Consumed - totalConsumed); if (seqReader.End) break;