diff --git a/src/JT808.Gateway.Client/IJT808MessageProducer.cs b/src/JT808.Gateway.Client/IJT808MessageProducer.cs
new file mode 100644
index 0000000..89e3c00
--- /dev/null
+++ b/src/JT808.Gateway.Client/IJT808MessageProducer.cs
@@ -0,0 +1,27 @@
+using JT808.Protocol;
+using System;
+using System.Threading.Tasks;
+
+namespace JT808.Gateway.Client
+{
+ ///
+ /// 消息数据包
+ ///
+ public interface IJT808MessageProducer : IDisposable
+ {
+ ValueTask ProduceAsync(JT808Package package);
+ }
+
+ internal class JT808MessageProducerEmpty : IJT808MessageProducer
+ {
+ public void Dispose()
+ {
+
+ }
+
+ public ValueTask ProduceAsync(JT808Package package)
+ {
+ return default;
+ }
+ }
+}
diff --git a/src/JT808.Gateway.Client/Internal/JT808RetryBlockingCollection.cs b/src/JT808.Gateway.Client/Internal/JT808RetryBlockingCollection.cs
new file mode 100644
index 0000000..76e4ccb
--- /dev/null
+++ b/src/JT808.Gateway.Client/Internal/JT808RetryBlockingCollection.cs
@@ -0,0 +1,17 @@
+using System;
+using System.Collections.Concurrent;
+using System.Collections.Generic;
+using System.Text;
+
+namespace JT808.Gateway.Client.Internal
+{
+ public class JT808RetryBlockingCollection
+ {
+ public BlockingCollection RetryBlockingCollection { get; }
+
+ public JT808RetryBlockingCollection()
+ {
+ RetryBlockingCollection = new BlockingCollection(999999);
+ }
+ }
+}
diff --git a/src/JT808.Gateway.Client/JT808.Gateway.Client.csproj b/src/JT808.Gateway.Client/JT808.Gateway.Client.csproj
index d516862..ec88a32 100644
--- a/src/JT808.Gateway.Client/JT808.Gateway.Client.csproj
+++ b/src/JT808.Gateway.Client/JT808.Gateway.Client.csproj
@@ -23,7 +23,6 @@
-
diff --git a/src/JT808.Gateway.Client/JT808ClientExtensions.cs b/src/JT808.Gateway.Client/JT808ClientExtensions.cs
index b5a1cb7..2700b00 100644
--- a/src/JT808.Gateway.Client/JT808ClientExtensions.cs
+++ b/src/JT808.Gateway.Client/JT808ClientExtensions.cs
@@ -5,6 +5,8 @@ using System.Text;
using JT808.Protocol;
using Microsoft.Extensions.Configuration;
using JT808.Gateway.Client.Services;
+using Microsoft.Extensions.DependencyInjection.Extensions;
+using JT808.Gateway.Client.Internal;
namespace JT808.Gateway.Client
{
@@ -13,12 +15,27 @@ namespace JT808.Gateway.Client
public static IJT808ClientBuilder AddClient(this IJT808Builder jT808Builder)
{
JT808ClientBuilderDefault jT808ClientBuilderDefault = new JT808ClientBuilderDefault(jT808Builder);
+ jT808ClientBuilderDefault.JT808Builder.Services.AddSingleton();
jT808ClientBuilderDefault.JT808Builder.Services.AddSingleton();
jT808ClientBuilderDefault.JT808Builder.Services.AddSingleton();
jT808ClientBuilderDefault.JT808Builder.Services.AddSingleton();
+ jT808ClientBuilderDefault.JT808Builder.Services.AddSingleton();
return jT808ClientBuilderDefault;
}
+ public static IJT808ClientBuilder AddMessageProducer(this IJT808ClientBuilder jT808ClientBuilder)
+ where TJT808MessageProducer: IJT808MessageProducer
+ {
+ jT808ClientBuilder.JT808Builder.Services.Replace(new ServiceDescriptor(typeof(IJT808MessageProducer), typeof(TJT808MessageProducer), ServiceLifetime.Singleton));
+ return jT808ClientBuilder;
+ }
+
+ public static IJT808ClientBuilder AddClientRetry(this IJT808ClientBuilder jT808ClientBuilder)
+ {
+ jT808ClientBuilder.JT808Builder.Services.AddHostedService();
+ return jT808ClientBuilder;
+ }
+
public static IJT808ClientBuilder AddClientReport(this IJT808ClientBuilder jT808ClientBuilder)
{
jT808ClientBuilder.JT808Builder.Services.Configure((options) => { });
diff --git a/src/JT808.Gateway.Client/JT808DeviceConfig.cs b/src/JT808.Gateway.Client/JT808DeviceConfig.cs
index 4b49e11..7169005 100644
--- a/src/JT808.Gateway.Client/JT808DeviceConfig.cs
+++ b/src/JT808.Gateway.Client/JT808DeviceConfig.cs
@@ -24,7 +24,10 @@ namespace JT808.Gateway.Client
/// 心跳时间(秒)
///
public int Heartbeat { get; set; } = 30;
-
+ ///
+ /// 自动重连 默认true
+ ///
+ public bool AutoReconnection { get; set; } = true;
public IJT808MsgSNDistributed MsgSNDistributed { get; }
}
}
diff --git a/src/JT808.Gateway.Client/JT808TcpClient.cs b/src/JT808.Gateway.Client/JT808TcpClient.cs
index 9a10211..17096d4 100644
--- a/src/JT808.Gateway.Client/JT808TcpClient.cs
+++ b/src/JT808.Gateway.Client/JT808TcpClient.cs
@@ -12,21 +12,24 @@ using JT808.Protocol.Extensions;
using JT808.Gateway.Client.Services;
using JT808.Gateway.Client.Metadata;
using Microsoft.Extensions.DependencyInjection;
+using JT808.Gateway.Client.Internal;
namespace JT808.Gateway.Client
{
public class JT808TcpClient:IDisposable
{
- //todo: 客户端的断线重连
+ //todo: 客户端心跳时间
private bool disposed = false;
private Socket clientSocket;
private readonly ILogger Logger;
private readonly JT808Serializer JT808Serializer;
private readonly JT808SendAtomicCounterService SendAtomicCounterService;
private readonly JT808ReceiveAtomicCounterService ReceiveAtomicCounterService;
+ private readonly JT808RetryBlockingCollection RetryBlockingCollection;
private bool socketState = true;
public JT808DeviceConfig DeviceConfig { get; }
+ private IJT808MessageProducer producer;
public JT808TcpClient(
JT808DeviceConfig deviceConfig,
IServiceProvider serviceProvider)
@@ -36,6 +39,8 @@ namespace JT808.Gateway.Client
ReceiveAtomicCounterService = serviceProvider.GetRequiredService();
JT808Serializer = serviceProvider.GetRequiredService().GetSerializer();
Logger = serviceProvider.GetRequiredService().CreateLogger();
+ producer = serviceProvider.GetRequiredService();
+ RetryBlockingCollection = serviceProvider.GetRequiredService();
}
public async ValueTask ConnectAsync(EndPoint remoteEndPoint)
{
@@ -47,6 +52,8 @@ namespace JT808.Gateway.Client
}
catch (Exception e)
{
+ Logger.LogError(e, "ConnectAsync Error");
+ RetryBlockingCollection.RetryBlockingCollection.Add(DeviceConfig);
return false;
}
}
@@ -63,6 +70,7 @@ namespace JT808.Gateway.Client
Task writing = FillPipeAsync(session, pipe.Writer, cancellationToken);
Task reading = ReadPipeAsync(session, pipe.Reader);
await Task.WhenAll(reading, writing);
+ RetryBlockingCollection.RetryBlockingCollection.Add(DeviceConfig);
}, clientSocket);
}
private async Task FillPipeAsync(Socket session, PipeWriter writer, CancellationToken cancellationToken)
@@ -79,6 +87,11 @@ namespace JT808.Gateway.Client
}
writer.Advance(bytesRead);
}
+ catch (OperationCanceledException ex)
+ {
+ Logger.LogError($"[Receive Timeout]:{session.RemoteEndPoint}");
+ break;
+ }
catch (System.Net.Sockets.SocketException ex)
{
Logger.LogError($"[{ex.SocketErrorCode.ToString()},{ex.Message}]:{session.RemoteEndPoint}");
@@ -148,10 +161,15 @@ namespace JT808.Gateway.Client
{
try
{
- var package = JT808Serializer.HeaderDeserialize(seqReader.Sequence.Slice(totalConsumed, seqReader.Consumed - totalConsumed).ToArray(),minBufferSize:8096);
+ var data = seqReader.Sequence.Slice(totalConsumed, seqReader.Consumed - totalConsumed).ToArray();
+ var package = JT808Serializer.Deserialize(data, minBufferSize: 8096);
+ if (producer != null)
+ {
+ producer.ProduceAsync(package);
+ }
ReceiveAtomicCounterService.MsgSuccessIncrement();
if (Logger.IsEnabled(LogLevel.Debug)) Logger.LogDebug($"[Atomic Success Counter]:{ReceiveAtomicCounterService.MsgSuccessCount}");
- if (Logger.IsEnabled(LogLevel.Trace)) Logger.LogTrace($"[Accept Hex {session.RemoteEndPoint}]:{package.OriginalData.ToArray().ToHexString()}");
+ if (Logger.IsEnabled(LogLevel.Trace)) Logger.LogTrace($"[Accept Hex {session.RemoteEndPoint}]:{data.ToHexString()}");
}
catch (JT808Exception ex)
{
@@ -178,9 +196,9 @@ namespace JT808.Gateway.Client
consumed = buffer.GetPosition(totalConsumed);
}
}
- public async ValueTask SendAsync(JT808ClientRequest message)
+ public async ValueTask SendAsync(JT808ClientRequest message)
{
- if (disposed) return;
+ if (disposed) return false;
if (IsOpen && socketState)
{
if (message.Package != null)
@@ -188,38 +206,44 @@ namespace JT808.Gateway.Client
try
{
var sendData = JT808Serializer.Serialize(message.Package, minBufferSize: message.MinBufferSize);
- //clientSocket.Send(sendData);
await clientSocket.SendAsync(sendData, SocketFlags.None);
SendAtomicCounterService.MsgSuccessIncrement();
+ return true;
}
catch (System.Net.Sockets.SocketException ex)
{
socketState = false;
Logger.LogError($"[{ex.SocketErrorCode.ToString()},{ex.Message},{DeviceConfig.TerminalPhoneNo}]");
+ return false;
}
catch (System.Exception ex)
{
Logger.LogError(ex.Message);
+ return false;
}
}
else if (message.HexData != null)
{
try
{
- clientSocket.Send(message.HexData);
+ await clientSocket.SendAsync(message.HexData, SocketFlags.None);
SendAtomicCounterService.MsgSuccessIncrement();
+ return true;
}
catch (System.Net.Sockets.SocketException ex)
{
socketState = false;
Logger.LogError($"[{ex.SocketErrorCode.ToString()},{ex.Message},{DeviceConfig.TerminalPhoneNo}]");
+ return false;
}
catch (System.Exception ex)
{
Logger.LogError(ex.Message);
+ return false;
}
}
}
+ return false;
}
public void Close()
diff --git a/src/JT808.Gateway.Client/Services/JT808RetryClientHostedService.cs b/src/JT808.Gateway.Client/Services/JT808RetryClientHostedService.cs
new file mode 100644
index 0000000..dba8a83
--- /dev/null
+++ b/src/JT808.Gateway.Client/Services/JT808RetryClientHostedService.cs
@@ -0,0 +1,69 @@
+using JT808.Gateway.Client.Internal;
+using JT808.Gateway.Client.Metadata;
+using Microsoft.Extensions.Hosting;
+using Microsoft.Extensions.Logging;
+using Microsoft.Extensions.Options;
+using System;
+using System.Collections.Generic;
+using System.IO;
+using System.Linq;
+using System.Text;
+using System.Text.Json;
+using System.Text.Json.Serialization;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace JT808.Gateway.Client.Services
+{
+ internal class JT808RetryClientHostedService : BackgroundService
+ {
+ private readonly IJT808TcpClientFactory jT808TcpClientFactory;
+
+ private readonly ILogger logger;
+
+ private readonly JT808RetryBlockingCollection RetryBlockingCollection;
+
+ public JT808RetryClientHostedService(
+ JT808RetryBlockingCollection retryBlockingCollection,
+ ILoggerFactory loggerFactory,
+ IJT808TcpClientFactory jT808TcpClientFactory)
+ {
+ logger = loggerFactory.CreateLogger();
+ this.jT808TcpClientFactory = jT808TcpClientFactory;
+ RetryBlockingCollection = retryBlockingCollection;
+ }
+
+ protected override async Task ExecuteAsync(CancellationToken stoppingToken)
+ {
+ foreach (var item in RetryBlockingCollection.RetryBlockingCollection.GetConsumingEnumerable(stoppingToken))
+ {
+ try
+ {
+ jT808TcpClientFactory.Remove(item);
+ if (item.AutoReconnection)
+ {
+ var result = await jT808TcpClientFactory.Create(item, stoppingToken);
+ if (result != null)
+ {
+ if (logger.IsEnabled(LogLevel.Information))
+ {
+ logger.LogInformation($"Retry Success-{JsonSerializer.Serialize(item)}");
+ }
+ }
+ else
+ {
+ if (logger.IsEnabled(LogLevel.Warning))
+ {
+ logger.LogWarning($"Retry Fail-{JsonSerializer.Serialize(item)}");
+ }
+ }
+ }
+ }
+ catch (Exception ex)
+ {
+ logger.LogError(ex, $"Retry Error-{JsonSerializer.Serialize(item)}");
+ }
+ }
+ }
+ }
+}
diff --git a/src/JT808.Gateway/JT808TcpServer.cs b/src/JT808.Gateway/JT808TcpServer.cs
index 1b85faf..6dec164 100644
--- a/src/JT808.Gateway/JT808TcpServer.cs
+++ b/src/JT808.Gateway/JT808TcpServer.cs
@@ -62,7 +62,7 @@ namespace JT808.Gateway
server.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.KeepAlive, true);
server.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.ReceiveBuffer, Configuration.MiniNumBufferSize);
server.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.SendBuffer, Configuration.MiniNumBufferSize);
- server.LingerState = new LingerOption(false, 0);
+ server.LingerState = new LingerOption(true, 0);
server.Bind(IPEndPoint);
server.Listen(Configuration.SoBacklog);
}
@@ -201,11 +201,11 @@ namespace JT808.Gateway
}
catch (NotImplementedException ex)
{
- Logger.LogError(ex.Message);
+ Logger.LogError(ex.Message,$"{session.Client.RemoteEndPoint}");
}
catch (JT808Exception ex)
{
- Logger.LogError($"[HeaderDeserialize ErrorCode]:{ ex.ErrorCode},[ReaderBuffer]:{contentSpan.ToArray().ToHexString()}");
+ Logger.LogError($"[HeaderDeserialize ErrorCode]:{ ex.ErrorCode},[ReaderBuffer]:{contentSpan.ToArray().ToHexString()},{session.Client.RemoteEndPoint}");
}
totalConsumed += (seqReader.Consumed - totalConsumed);
if (seqReader.End) break;