Przeglądaj źródła

增加客户端断线重连机制

tags/pipeline-1.1.0
SmallChi(Koike) 4 lat temu
rodzic
commit
09414d2f39
8 zmienionych plików z 168 dodań i 12 usunięć
  1. +27
    -0
      src/JT808.Gateway.Client/IJT808MessageProducer.cs
  2. +17
    -0
      src/JT808.Gateway.Client/Internal/JT808RetryBlockingCollection.cs
  3. +0
    -1
      src/JT808.Gateway.Client/JT808.Gateway.Client.csproj
  4. +17
    -0
      src/JT808.Gateway.Client/JT808ClientExtensions.cs
  5. +4
    -1
      src/JT808.Gateway.Client/JT808DeviceConfig.cs
  6. +31
    -7
      src/JT808.Gateway.Client/JT808TcpClient.cs
  7. +69
    -0
      src/JT808.Gateway.Client/Services/JT808RetryClientHostedService.cs
  8. +3
    -3
      src/JT808.Gateway/JT808TcpServer.cs

+ 27
- 0
src/JT808.Gateway.Client/IJT808MessageProducer.cs Wyświetl plik

@@ -0,0 +1,27 @@
using JT808.Protocol;
using System;
using System.Threading.Tasks;

namespace JT808.Gateway.Client
{
/// <summary>
/// 消息数据包
/// </summary>
public interface IJT808MessageProducer : IDisposable
{
ValueTask ProduceAsync(JT808Package package);
}

internal class JT808MessageProducerEmpty : IJT808MessageProducer
{
public void Dispose()
{
}

public ValueTask ProduceAsync(JT808Package package)
{
return default;
}
}
}

+ 17
- 0
src/JT808.Gateway.Client/Internal/JT808RetryBlockingCollection.cs Wyświetl plik

@@ -0,0 +1,17 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Text;

namespace JT808.Gateway.Client.Internal
{
public class JT808RetryBlockingCollection
{
public BlockingCollection<JT808DeviceConfig> RetryBlockingCollection { get; }

public JT808RetryBlockingCollection()
{
RetryBlockingCollection = new BlockingCollection<JT808DeviceConfig>(999999);
}
}
}

+ 0
- 1
src/JT808.Gateway.Client/JT808.Gateway.Client.csproj Wyświetl plik

@@ -23,7 +23,6 @@
<ItemGroup>
<PackageReference Include="JT808" Version="2.2.14" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="3.1.9" />
<PackageReference Include="Microsoft.Extensions.Hosting.Abstractions" Version="3.1.9" />
<PackageReference Include="System.IO.Pipelines" Version="4.7.3" />
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="3.1.9" />


+ 17
- 0
src/JT808.Gateway.Client/JT808ClientExtensions.cs Wyświetl plik

@@ -5,6 +5,8 @@ using System.Text;
using JT808.Protocol;
using Microsoft.Extensions.Configuration;
using JT808.Gateway.Client.Services;
using Microsoft.Extensions.DependencyInjection.Extensions;
using JT808.Gateway.Client.Internal;

namespace JT808.Gateway.Client
{
@@ -13,12 +15,27 @@ namespace JT808.Gateway.Client
public static IJT808ClientBuilder AddClient(this IJT808Builder jT808Builder)
{
JT808ClientBuilderDefault jT808ClientBuilderDefault = new JT808ClientBuilderDefault(jT808Builder);
jT808ClientBuilderDefault.JT808Builder.Services.AddSingleton<JT808RetryBlockingCollection>();
jT808ClientBuilderDefault.JT808Builder.Services.AddSingleton<JT808SendAtomicCounterService>();
jT808ClientBuilderDefault.JT808Builder.Services.AddSingleton<JT808ReceiveAtomicCounterService>();
jT808ClientBuilderDefault.JT808Builder.Services.AddSingleton<IJT808TcpClientFactory, JT808TcpClientFactory>();
jT808ClientBuilderDefault.JT808Builder.Services.AddSingleton<IJT808MessageProducer, JT808MessageProducerEmpty>();
return jT808ClientBuilderDefault;
}

public static IJT808ClientBuilder AddMessageProducer<TJT808MessageProducer>(this IJT808ClientBuilder jT808ClientBuilder)
where TJT808MessageProducer: IJT808MessageProducer
{
jT808ClientBuilder.JT808Builder.Services.Replace(new ServiceDescriptor(typeof(IJT808MessageProducer), typeof(TJT808MessageProducer), ServiceLifetime.Singleton));
return jT808ClientBuilder;
}

public static IJT808ClientBuilder AddClientRetry(this IJT808ClientBuilder jT808ClientBuilder)
{
jT808ClientBuilder.JT808Builder.Services.AddHostedService<JT808RetryClientHostedService>();
return jT808ClientBuilder;
}
public static IJT808ClientBuilder AddClientReport(this IJT808ClientBuilder jT808ClientBuilder)
{
jT808ClientBuilder.JT808Builder.Services.Configure<JT808ReportOptions>((options) => { });


+ 4
- 1
src/JT808.Gateway.Client/JT808DeviceConfig.cs Wyświetl plik

@@ -24,7 +24,10 @@ namespace JT808.Gateway.Client
/// 心跳时间(秒)
/// </summary>
public int Heartbeat { get; set; } = 30;

/// <summary>
/// 自动重连 默认true
/// </summary>
public bool AutoReconnection { get; set; } = true;
public IJT808MsgSNDistributed MsgSNDistributed { get; }
}
}

+ 31
- 7
src/JT808.Gateway.Client/JT808TcpClient.cs Wyświetl plik

@@ -12,21 +12,24 @@ using JT808.Protocol.Extensions;
using JT808.Gateway.Client.Services;
using JT808.Gateway.Client.Metadata;
using Microsoft.Extensions.DependencyInjection;
using JT808.Gateway.Client.Internal;

namespace JT808.Gateway.Client
{

public class JT808TcpClient:IDisposable
{
//todo: 客户端的断线重连
//todo: 客户端心跳时间
private bool disposed = false;
private Socket clientSocket;
private readonly ILogger Logger;
private readonly JT808Serializer JT808Serializer;
private readonly JT808SendAtomicCounterService SendAtomicCounterService;
private readonly JT808ReceiveAtomicCounterService ReceiveAtomicCounterService;
private readonly JT808RetryBlockingCollection RetryBlockingCollection;
private bool socketState = true;
public JT808DeviceConfig DeviceConfig { get; }
private IJT808MessageProducer producer;
public JT808TcpClient(
JT808DeviceConfig deviceConfig,
IServiceProvider serviceProvider)
@@ -36,6 +39,8 @@ namespace JT808.Gateway.Client
ReceiveAtomicCounterService = serviceProvider.GetRequiredService<JT808ReceiveAtomicCounterService>();
JT808Serializer = serviceProvider.GetRequiredService<IJT808Config>().GetSerializer();
Logger = serviceProvider.GetRequiredService<ILoggerFactory>().CreateLogger<JT808TcpClient>();
producer = serviceProvider.GetRequiredService<IJT808MessageProducer>();
RetryBlockingCollection = serviceProvider.GetRequiredService<JT808RetryBlockingCollection>();
}
public async ValueTask<bool> ConnectAsync(EndPoint remoteEndPoint)
{
@@ -47,6 +52,8 @@ namespace JT808.Gateway.Client
}
catch (Exception e)
{
Logger.LogError(e, "ConnectAsync Error");
RetryBlockingCollection.RetryBlockingCollection.Add(DeviceConfig);
return false;
}
}
@@ -63,6 +70,7 @@ namespace JT808.Gateway.Client
Task writing = FillPipeAsync(session, pipe.Writer, cancellationToken);
Task reading = ReadPipeAsync(session, pipe.Reader);
await Task.WhenAll(reading, writing);
RetryBlockingCollection.RetryBlockingCollection.Add(DeviceConfig);
}, clientSocket);
}
private async Task FillPipeAsync(Socket session, PipeWriter writer, CancellationToken cancellationToken)
@@ -79,6 +87,11 @@ namespace JT808.Gateway.Client
}
writer.Advance(bytesRead);
}
catch (OperationCanceledException ex)
{
Logger.LogError($"[Receive Timeout]:{session.RemoteEndPoint}");
break;
}
catch (System.Net.Sockets.SocketException ex)
{
Logger.LogError($"[{ex.SocketErrorCode.ToString()},{ex.Message}]:{session.RemoteEndPoint}");
@@ -148,10 +161,15 @@ namespace JT808.Gateway.Client
{
try
{
var package = JT808Serializer.HeaderDeserialize(seqReader.Sequence.Slice(totalConsumed, seqReader.Consumed - totalConsumed).ToArray(),minBufferSize:8096);
var data = seqReader.Sequence.Slice(totalConsumed, seqReader.Consumed - totalConsumed).ToArray();
var package = JT808Serializer.Deserialize(data, minBufferSize: 8096);
if (producer != null)
{
producer.ProduceAsync(package);
}
ReceiveAtomicCounterService.MsgSuccessIncrement();
if (Logger.IsEnabled(LogLevel.Debug)) Logger.LogDebug($"[Atomic Success Counter]:{ReceiveAtomicCounterService.MsgSuccessCount}");
if (Logger.IsEnabled(LogLevel.Trace)) Logger.LogTrace($"[Accept Hex {session.RemoteEndPoint}]:{package.OriginalData.ToArray().ToHexString()}");
if (Logger.IsEnabled(LogLevel.Trace)) Logger.LogTrace($"[Accept Hex {session.RemoteEndPoint}]:{data.ToHexString()}");
}
catch (JT808Exception ex)
{
@@ -178,9 +196,9 @@ namespace JT808.Gateway.Client
consumed = buffer.GetPosition(totalConsumed);
}
}
public async ValueTask SendAsync(JT808ClientRequest message)
public async ValueTask<bool> SendAsync(JT808ClientRequest message)
{
if (disposed) return;
if (disposed) return false;
if (IsOpen && socketState)
{
if (message.Package != null)
@@ -188,38 +206,44 @@ namespace JT808.Gateway.Client
try
{
var sendData = JT808Serializer.Serialize(message.Package, minBufferSize: message.MinBufferSize);
//clientSocket.Send(sendData);
await clientSocket.SendAsync(sendData, SocketFlags.None);
SendAtomicCounterService.MsgSuccessIncrement();
return true;
}
catch (System.Net.Sockets.SocketException ex)
{
socketState = false;
Logger.LogError($"[{ex.SocketErrorCode.ToString()},{ex.Message},{DeviceConfig.TerminalPhoneNo}]");
return false;
}
catch (System.Exception ex)
{
Logger.LogError(ex.Message);
return false;
}
}
else if (message.HexData != null)
{
try
{
clientSocket.Send(message.HexData);
await clientSocket.SendAsync(message.HexData, SocketFlags.None);
SendAtomicCounterService.MsgSuccessIncrement();
return true;
}
catch (System.Net.Sockets.SocketException ex)
{
socketState = false;
Logger.LogError($"[{ex.SocketErrorCode.ToString()},{ex.Message},{DeviceConfig.TerminalPhoneNo}]");
return false;
}
catch (System.Exception ex)
{
Logger.LogError(ex.Message);
return false;
}
}
}
return false;
}

public void Close()


+ 69
- 0
src/JT808.Gateway.Client/Services/JT808RetryClientHostedService.cs Wyświetl plik

@@ -0,0 +1,69 @@
using JT808.Gateway.Client.Internal;
using JT808.Gateway.Client.Metadata;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
using System.Text.Json;
using System.Text.Json.Serialization;
using System.Threading;
using System.Threading.Tasks;

namespace JT808.Gateway.Client.Services
{
internal class JT808RetryClientHostedService : BackgroundService
{
private readonly IJT808TcpClientFactory jT808TcpClientFactory;

private readonly ILogger logger;

private readonly JT808RetryBlockingCollection RetryBlockingCollection;

public JT808RetryClientHostedService(
JT808RetryBlockingCollection retryBlockingCollection,
ILoggerFactory loggerFactory,
IJT808TcpClientFactory jT808TcpClientFactory)
{
logger = loggerFactory.CreateLogger<JT808RetryClientHostedService>();
this.jT808TcpClientFactory = jT808TcpClientFactory;
RetryBlockingCollection = retryBlockingCollection;
}

protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
foreach (var item in RetryBlockingCollection.RetryBlockingCollection.GetConsumingEnumerable(stoppingToken))
{
try
{
jT808TcpClientFactory.Remove(item);
if (item.AutoReconnection)
{
var result = await jT808TcpClientFactory.Create(item, stoppingToken);
if (result != null)
{
if (logger.IsEnabled(LogLevel.Information))
{
logger.LogInformation($"Retry Success-{JsonSerializer.Serialize(item)}");
}
}
else
{
if (logger.IsEnabled(LogLevel.Warning))
{
logger.LogWarning($"Retry Fail-{JsonSerializer.Serialize(item)}");
}
}
}
}
catch (Exception ex)
{
logger.LogError(ex, $"Retry Error-{JsonSerializer.Serialize(item)}");
}
}
}
}
}

+ 3
- 3
src/JT808.Gateway/JT808TcpServer.cs Wyświetl plik

@@ -62,7 +62,7 @@ namespace JT808.Gateway
server.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.KeepAlive, true);
server.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.ReceiveBuffer, Configuration.MiniNumBufferSize);
server.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.SendBuffer, Configuration.MiniNumBufferSize);
server.LingerState = new LingerOption(false, 0);
server.LingerState = new LingerOption(true, 0);
server.Bind(IPEndPoint);
server.Listen(Configuration.SoBacklog);
}
@@ -201,11 +201,11 @@ namespace JT808.Gateway
}
catch (NotImplementedException ex)
{
Logger.LogError(ex.Message);
Logger.LogError(ex.Message,$"{session.Client.RemoteEndPoint}");
}
catch (JT808Exception ex)
{
Logger.LogError($"[HeaderDeserialize ErrorCode]:{ ex.ErrorCode},[ReaderBuffer]:{contentSpan.ToArray().ToHexString()}");
Logger.LogError($"[HeaderDeserialize ErrorCode]:{ ex.ErrorCode},[ReaderBuffer]:{contentSpan.ToArray().ToHexString()},{session.Client.RemoteEndPoint}");
}
totalConsumed += (seqReader.Consumed - totalConsumed);
if (seqReader.End) break;


Ładowanie…
Anuluj
Zapisz