Explorar el Código

1.完善客户端工具注册方式

2.增加客户端工厂移出功能
tags/pipeline-1.1.0
SmallChi(Koike) hace 4 años
padre
commit
46191e3e1b
Se han modificado 7 ficheros con 85 adiciones y 25 borrados
  1. +2
    -1
      src/JT808.Gateway.Benchmark/JT808.Gateway.CleintBenchmark/Program.cs
  2. +14
    -0
      src/JT808.Gateway.Client/IJT808ClientBuilder.cs
  3. +24
    -0
      src/JT808.Gateway.Client/Internal/JT808ClientBuilderDefault.cs
  4. +22
    -21
      src/JT808.Gateway.Client/JT808ClientExtensions.cs
  5. +2
    -3
      src/JT808.Gateway.Client/JT808TcpClient.cs
  6. +19
    -0
      src/JT808.Gateway.Client/JT808TcpClientFactory.cs
  7. +2
    -0
      src/JT808.Gateway.Tests/JT808.Gateway.QueueHosting/Program.cs

+ 2
- 1
src/JT808.Gateway.Benchmark/JT808.Gateway.CleintBenchmark/Program.cs Ver fichero

@@ -68,7 +68,8 @@ namespace JT808.Gateway.CleintBenchmark
services.AddSingleton<ILoggerFactory, LoggerFactory>();
services.AddSingleton(typeof(ILogger<>), typeof(Logger<>));
services.AddJT808Configure()
.AddClient();
.AddClient()
.AddClientReport();
services.AddHostedService<CleintBenchmarkHostedService>();
});
await serverHostBuilder.RunConsoleAsync();


+ 14
- 0
src/JT808.Gateway.Client/IJT808ClientBuilder.cs Ver fichero

@@ -0,0 +1,14 @@
using JT808.Protocol;
using Microsoft.Extensions.DependencyInjection;
using System;
using System.Collections.Generic;
using System.Text;

namespace JT808.Gateway.Client
{
public interface IJT808ClientBuilder
{
IJT808Builder JT808Builder { get; }
IJT808Builder Builder();
}
}

+ 24
- 0
src/JT808.Gateway.Client/Internal/JT808ClientBuilderDefault.cs Ver fichero

@@ -0,0 +1,24 @@
using JT808.Protocol;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection.Extensions;
using System;
using System.Collections.Generic;
using System.Text;

namespace JT808.Gateway.Client
{
internal class JT808ClientBuilderDefault : IJT808ClientBuilder
{
public IJT808Builder JT808Builder { get; }

public JT808ClientBuilderDefault(IJT808Builder builder)
{
JT808Builder = builder;
}

public IJT808Builder Builder()
{
return JT808Builder;
}
}
}

+ 22
- 21
src/JT808.Gateway.Client/JT808ClientExtensions.cs Ver fichero

@@ -10,33 +10,34 @@ namespace JT808.Gateway.Client
{
public static class JT808ClientExtensions
{
public static IJT808Builder AddClient(this IJT808Builder jT808Builder)
public static IJT808ClientBuilder AddClient(this IJT808Builder jT808Builder)
{
jT808Builder.Services.AddSingleton<JT808SendAtomicCounterService>();
jT808Builder.Services.AddSingleton<JT808ReceiveAtomicCounterService>();
jT808Builder.Services.AddSingleton<IJT808TcpClientFactory, JT808TcpClientFactory>();
jT808Builder.Services.Configure<JT808ReportOptions>((options)=> { });
jT808Builder.Services.AddHostedService<JT808ReportHostedService>();
return jT808Builder;
JT808ClientBuilderDefault jT808ClientBuilderDefault = new JT808ClientBuilderDefault(jT808Builder);
jT808ClientBuilderDefault.JT808Builder.Services.AddSingleton<JT808SendAtomicCounterService>();
jT808ClientBuilderDefault.JT808Builder.Services.AddSingleton<JT808ReceiveAtomicCounterService>();
jT808ClientBuilderDefault.JT808Builder.Services.AddSingleton<IJT808TcpClientFactory, JT808TcpClientFactory>();
return jT808ClientBuilderDefault;
}
public static IJT808Builder AddClient(this IJT808Builder jT808Builder, IConfiguration Configuration)

public static IJT808ClientBuilder AddClientReport(this IJT808ClientBuilder jT808ClientBuilder)
{
jT808ClientBuilder.JT808Builder.Services.Configure<JT808ReportOptions>((options) => { });
jT808ClientBuilder.JT808Builder.Services.AddHostedService<JT808ReportHostedService>();
return jT808ClientBuilder;
}

public static IJT808ClientBuilder AddClientReport(this IJT808ClientBuilder jT808ClientBuilder, IConfiguration Configuration)
{
jT808Builder.Services.AddSingleton<JT808SendAtomicCounterService>();
jT808Builder.Services.AddSingleton<JT808ReceiveAtomicCounterService>();
jT808Builder.Services.AddSingleton<IJT808TcpClientFactory, JT808TcpClientFactory>();
jT808Builder.Services.Configure<JT808ReportOptions>(Configuration.GetSection("JT808ReportOptions"));
jT808Builder.Services.AddHostedService<JT808ReportHostedService>();
return jT808Builder;
jT808ClientBuilder.JT808Builder.Services.Configure<JT808ReportOptions>(Configuration.GetSection("JT808ReportOptions"));
jT808ClientBuilder.JT808Builder.Services.AddHostedService<JT808ReportHostedService>();
return jT808ClientBuilder;
}

public static IJT808Builder AddClient(this IJT808Builder jT808Builder, Action<JT808ReportOptions> reportOptions)
public static IJT808ClientBuilder AddClientReport(this IJT808ClientBuilder jT808ClientBuilder, Action<JT808ReportOptions> reportOptions)
{
jT808Builder.Services.AddSingleton<JT808SendAtomicCounterService>();
jT808Builder.Services.AddSingleton<JT808ReceiveAtomicCounterService>();
jT808Builder.Services.AddSingleton<IJT808TcpClientFactory, JT808TcpClientFactory>();
jT808Builder.Services.Configure(reportOptions);
jT808Builder.Services.AddHostedService<JT808ReportHostedService>();
return jT808Builder;
jT808ClientBuilder.JT808Builder.Services.Configure(reportOptions);
jT808ClientBuilder.JT808Builder.Services.AddHostedService<JT808ReportHostedService>();
return jT808ClientBuilder;
}
}
}

+ 2
- 3
src/JT808.Gateway.Client/JT808TcpClient.cs Ver fichero

@@ -19,7 +19,6 @@ namespace JT808.Gateway.Client
public class JT808TcpClient:IDisposable
{
//todo: 客户端的断线重连
//todo: 客户端的消息处理handler
private bool disposed = false;
private Socket clientSocket;
private readonly ILogger Logger;
@@ -72,7 +71,7 @@ namespace JT808.Gateway.Client
{
try
{
Memory<byte> memory = writer.GetMemory(80960);
Memory<byte> memory = writer.GetMemory(8096);
int bytesRead = await session.ReceiveAsync(memory, SocketFlags.None, cancellationToken);
if (bytesRead == 0)
{
@@ -149,7 +148,7 @@ namespace JT808.Gateway.Client
{
try
{
var package = JT808Serializer.HeaderDeserialize(seqReader.Sequence.Slice(totalConsumed, seqReader.Consumed - totalConsumed).FirstSpan,minBufferSize:10240);
var package = JT808Serializer.HeaderDeserialize(seqReader.Sequence.Slice(totalConsumed, seqReader.Consumed - totalConsumed).ToArray(),minBufferSize:8096);
ReceiveAtomicCounterService.MsgSuccessIncrement();
if (Logger.IsEnabled(LogLevel.Debug)) Logger.LogDebug($"[Atomic Success Counter]:{ReceiveAtomicCounterService.MsgSuccessCount}");
if (Logger.IsEnabled(LogLevel.Trace)) Logger.LogTrace($"[Accept Hex {session.RemoteEndPoint}]:{package.OriginalData.ToArray().ToHexString()}");


+ 19
- 0
src/JT808.Gateway.Client/JT808TcpClientFactory.cs Ver fichero

@@ -14,6 +14,8 @@ namespace JT808.Gateway.Client
{
ValueTask<JT808TcpClient> Create(JT808DeviceConfig deviceConfig, CancellationToken cancellationToken);

void Remove(JT808DeviceConfig deviceConfign);

List<JT808TcpClient> GetAll();
}

@@ -55,6 +57,7 @@ namespace JT808.Gateway.Client
{
try
{
client.Value.Close();
client.Value.Dispose();
}
catch
@@ -67,5 +70,21 @@ namespace JT808.Gateway.Client
{
return dict.Values.ToList();
}

public void Remove(JT808DeviceConfig deviceConfig)
{
if(dict.TryRemove(deviceConfig.TerminalPhoneNo,out var client))
{
try
{
client.Close();
client.Dispose();
}
catch (Exception)
{
}
}
}
}
}

+ 2
- 0
src/JT808.Gateway.Tests/JT808.Gateway.QueueHosting/Program.cs Ver fichero

@@ -54,6 +54,8 @@ namespace JT808.Gateway.QueueHosting
.Builder()
//添加客户端工具
.AddClient()
.AddClientReport()
.Builder()
//添加客户端服务
.AddClientKafka()
.AddMsgConsumer(hostContext.Configuration)


Cargando…
Cancelar
Guardar