Explorar el Código

pipeline-1.1.0-preview4

1.调整消息处理程序
2.调整kafka异步消费模式
3.修改简单接入方式
4.调整http应答方式
tags/pipeline-1.1.0
SmallChi(Koike) hace 4 años
padre
commit
d287a85b32
Se han modificado 37 ficheros con 285 adiciones y 193 borrados
  1. +3
    -3
      simples/JT808.Gateway.SimpleClient/JT808.Gateway.SimpleClient.csproj
  2. +2
    -1
      simples/JT808.Gateway.SimpleClient/Program.cs
  3. +1
    -1
      simples/JT808.Gateway.SimpleQueueNotification/JT808.Gateway.SimpleQueueNotification.csproj
  4. +1
    -0
      simples/JT808.Gateway.SimpleQueueNotification/Startup.cs
  5. +2
    -2
      simples/JT808.Gateway.SimpleQueueServer/JT808.Gateway.SimpleQueueServer.csproj
  6. +3
    -1
      simples/JT808.Gateway.SimpleQueueServer/appsettings.json
  7. +6
    -6
      simples/JT808.Gateway.SimpleQueueService/JT808.Gateway.SimpleQueueService.csproj
  8. +6
    -7
      simples/JT808.Gateway.SimpleServer/Impl/JT808MessageHandlerImpl.cs
  9. +2
    -1
      simples/JT808.Gateway.SimpleServer/Impl/JT808MsgIdHandler.cs
  10. +1
    -1
      simples/JT808.Gateway.SimpleServer/Impl/JT808SessionProducer.cs
  11. +5
    -5
      simples/JT808.Gateway.SimpleServer/JT808.Gateway.SimpleServer.csproj
  12. +1
    -1
      simples/JT808.Gateway.SimpleServer/appsettings.json
  13. +1
    -1
      src/JT808.Gateway.Abstractions/IJT808MsgProducer.cs
  14. +1
    -1
      src/JT808.Gateway.Abstractions/IJT808MsgReplyLoggingProducer.cs
  15. +1
    -1
      src/JT808.Gateway.Abstractions/IJT808SessionProducer.cs
  16. +1
    -1
      src/JT808.Gateway.Abstractions/JT808.Gateway.Abstractions.csproj
  17. +1
    -1
      src/JT808.Gateway.Abstractions/JT808.Gateway.Abstractions.xml
  18. +4
    -32
      src/JT808.Gateway.Abstractions/JT808MessageHandler.cs
  19. +2
    -6
      src/JT808.Gateway.Benchmark/JT808.Gateway.CleintBenchmark/JT808.Gateway.CleintBenchmark.csproj
  20. +36
    -34
      src/JT808.Gateway.Benchmark/JT808.Gateway.CleintBenchmark/Services/CleintBenchmarkHostedService.cs
  21. +2
    -2
      src/JT808.Gateway.Benchmark/JT808.Gateway.CleintBenchmark/appsettings.json
  22. +1
    -0
      src/JT808.Gateway.Benchmark/JT808.Gateway.CleintBenchmark/wwwroot/signalr.min.js.map
  23. +18
    -5
      src/JT808.Gateway.Kafka/JT808MsgProducer.cs
  24. +20
    -7
      src/JT808.Gateway.Kafka/JT808MsgReplyLoggingProducer.cs
  25. +18
    -5
      src/JT808.Gateway.Kafka/JT808SessionProducer.cs
  26. +2
    -11
      src/JT808.Gateway.Tests/JT808.Gateway.NormalHosting/Impl/JT808CustomMessageHandlerImpl.cs
  27. +1
    -1
      src/JT808.Gateway.Tests/JT808.Gateway.NormalHosting/Impl/JT808SessionProducer.cs
  28. +2
    -9
      src/JT808.Gateway.Tests/JT808.Gateway.QueueHosting/Impl/JT808CustomMessageHandlerImpl.cs
  29. +3
    -3
      src/JT808.Gateway/Extensions/JT808HttpContextExtensions.cs
  30. +2
    -2
      src/JT808.Gateway/Internal/JT808MsgProducer_Empty.cs
  31. +2
    -2
      src/JT808.Gateway/Internal/JT808MsgReplyLoggingProducer_Empty.cs
  32. +4
    -2
      src/JT808.Gateway/JT808.Gateway.xml
  33. +8
    -3
      src/JT808.Gateway/JT808HttpServer.cs
  34. +57
    -20
      src/JT808.Gateway/JT808TcpServer.cs
  35. +44
    -9
      src/JT808.Gateway/JT808UdpServer.cs
  36. +20
    -5
      src/JT808.Gateway/Session/JT808SessionManager.cs
  37. +1
    -1
      src/Version.props

+ 3
- 3
simples/JT808.Gateway.SimpleClient/JT808.Gateway.SimpleClient.csproj Ver fichero

@@ -6,9 +6,9 @@
</PropertyGroup> </PropertyGroup>


<ItemGroup> <ItemGroup>
<PackageReference Include="JT808.Gateway.Abstractions" Version="1.1.0-preview1" />
<PackageReference Include="JT808.Gateway.Client" Version="1.1.0-preview1" />
<PackageReference Include="JT808.Gateway.WebApiClientTool" Version="1.1.0-preview1" />
<PackageReference Include="JT808.Gateway.Abstractions" Version="1.1.0-preview4" />
<PackageReference Include="JT808.Gateway.Client" Version="1.1.0-preview4" />
<PackageReference Include="JT808.Gateway.WebApiClientTool" Version="1.1.0-preview4" />
<PackageReference Include="Microsoft.Extensions.Configuration.FileExtensions" Version="5.0.0" /> <PackageReference Include="Microsoft.Extensions.Configuration.FileExtensions" Version="5.0.0" />
<PackageReference Include="Microsoft.Extensions.Configuration.Json" Version="5.0.0" /> <PackageReference Include="Microsoft.Extensions.Configuration.Json" Version="5.0.0" />
<PackageReference Include="Microsoft.Extensions.Hosting" Version="5.0.0" /> <PackageReference Include="Microsoft.Extensions.Hosting" Version="5.0.0" />


+ 2
- 1
simples/JT808.Gateway.SimpleClient/Program.cs Ver fichero

@@ -30,8 +30,9 @@ namespace JT808.Gateway.SimpleClient
services.AddSingleton(typeof(ILogger<>), typeof(Logger<>)); services.AddSingleton(typeof(ILogger<>), typeof(Logger<>));
services.AddJT808Configure() services.AddJT808Configure()
.AddClient() .AddClient()
.Builder()
.AddWebApiClientTool(new Uri("http://127.0.0.1:828/"), "123456")
; ;
services.AddJT808WebApiClientTool(new Uri("http://127.0.0.1:828/"),"12346");
services.AddHostedService<UpService>(); services.AddHostedService<UpService>();
services.AddHostedService<Up2019Service>(); services.AddHostedService<Up2019Service>();
services.AddHostedService<CallHttpClientJob>(); services.AddHostedService<CallHttpClientJob>();


+ 1
- 1
simples/JT808.Gateway.SimpleQueueNotification/JT808.Gateway.SimpleQueueNotification.csproj Ver fichero

@@ -27,7 +27,7 @@
</ItemGroup> </ItemGroup>


<ItemGroup> <ItemGroup>
<PackageReference Include="JT808.Gateway.Kafka" Version="1.1.0-preview1" />
<PackageReference Include="JT808.Gateway.Kafka" Version="1.1.0-preview4" />
<PackageReference Include="JT808.Gateway.MsgIdHandler" Version="1.0.2-preview1" /> <PackageReference Include="JT808.Gateway.MsgIdHandler" Version="1.0.2-preview1" />
</ItemGroup> </ItemGroup>
</Project> </Project>

+ 1
- 0
simples/JT808.Gateway.SimpleQueueNotification/Startup.cs Ver fichero

@@ -67,6 +67,7 @@ namespace JT808.Gateway.SimpleQueueNotification
{ {
app.UseDefaultFiles(); app.UseDefaultFiles();
app.UseStaticFiles(); app.UseStaticFiles();
app.UseFileServer();
app.UseRouting(); app.UseRouting();
app.UseJT808JwtVerify(); app.UseJT808JwtVerify();
app.UseCors("CorsPolicy"); app.UseCors("CorsPolicy");


+ 2
- 2
simples/JT808.Gateway.SimpleQueueServer/JT808.Gateway.SimpleQueueServer.csproj Ver fichero

@@ -6,8 +6,8 @@
</PropertyGroup> </PropertyGroup>


<ItemGroup> <ItemGroup>
<PackageReference Include="JT808.Gateway" Version="1.1.0-preview1" />
<PackageReference Include="JT808.Gateway.Kafka" Version="1.1.0-preview1" />
<PackageReference Include="JT808.Gateway" Version="1.1.0-preview4" />
<PackageReference Include="JT808.Gateway.Kafka" Version="1.1.0-preview4" />
<PackageReference Include="Microsoft.Extensions.Configuration.FileExtensions" Version="5.0.0" /> <PackageReference Include="Microsoft.Extensions.Configuration.FileExtensions" Version="5.0.0" />
<PackageReference Include="Microsoft.Extensions.Configuration.Json" Version="5.0.0" /> <PackageReference Include="Microsoft.Extensions.Configuration.Json" Version="5.0.0" />
<PackageReference Include="Microsoft.Extensions.Hosting" Version="5.0.0" /> <PackageReference Include="Microsoft.Extensions.Hosting" Version="5.0.0" />


+ 3
- 1
simples/JT808.Gateway.SimpleQueueServer/appsettings.json Ver fichero

@@ -16,7 +16,9 @@
"TcpPort": 808, "TcpPort": 808,
"UdpPort": 808, "UdpPort": 808,
"WebApiPort": 828, "WebApiPort": 828,
"WebApiToken": "smallchi518"
"WebApiToken": "123456",
//"IgnoreMsgIdReply":[512]
"IgnoreMsgIdReply": []
}, },
"JT808MsgProducerConfig": { "JT808MsgProducerConfig": {
"TopicName": "JT808Msg", "TopicName": "JT808Msg",


+ 6
- 6
simples/JT808.Gateway.SimpleQueueService/JT808.Gateway.SimpleQueueService.csproj Ver fichero

@@ -6,14 +6,14 @@
</PropertyGroup> </PropertyGroup>


<ItemGroup> <ItemGroup>
<PackageReference Include="JT808.Gateway" Version="1.1.0-preview1" />
<PackageReference Include="JT808.Gateway.Kafka" Version="1.1.0-preview1" />
<PackageReference Include="JT808.Gateway" Version="1.1.0-preview4" />
<PackageReference Include="JT808.Gateway.Kafka" Version="1.1.0-preview4" />
<PackageReference Include="JT808.Gateway.MsgIdHandler" Version="1.0.2-preview1" /> <PackageReference Include="JT808.Gateway.MsgIdHandler" Version="1.0.2-preview1" />
<PackageReference Include="JT808.Gateway.MsgLogging" Version="1.1.0-preview1" />
<PackageReference Include="JT808.Gateway.ReplyMessage" Version="1.1.0-preview1" />
<PackageReference Include="JT808.Gateway.SessionNotice" Version="1.1.0-preview1" />
<PackageReference Include="JT808.Gateway.MsgLogging" Version="1.1.0-preview4" />
<PackageReference Include="JT808.Gateway.ReplyMessage" Version="1.1.0-preview4" />
<PackageReference Include="JT808.Gateway.SessionNotice" Version="1.1.0-preview4" />
<PackageReference Include="JT808.Gateway.Traffic" Version="1.0.2-preview1" /> <PackageReference Include="JT808.Gateway.Traffic" Version="1.0.2-preview1" />
<PackageReference Include="JT808.Gateway.Transmit" Version="1.1.0-preview1" />
<PackageReference Include="JT808.Gateway.Transmit" Version="1.1.0-preview4" />
<PackageReference Include="Microsoft.Extensions.Configuration.FileExtensions" Version="5.0.0" /> <PackageReference Include="Microsoft.Extensions.Configuration.FileExtensions" Version="5.0.0" />
<PackageReference Include="Microsoft.Extensions.Configuration.Json" Version="5.0.0" /> <PackageReference Include="Microsoft.Extensions.Configuration.Json" Version="5.0.0" />
<PackageReference Include="Microsoft.Extensions.Hosting" Version="5.0.0" /> <PackageReference Include="Microsoft.Extensions.Hosting" Version="5.0.0" />


+ 6
- 7
simples/JT808.Gateway.SimpleServer/Impl/JT808MessageHandlerImpl.cs Ver fichero

@@ -23,7 +23,7 @@ namespace JT808.Gateway.SimpleServer.Impl
JT808TransmitService jT808TransmitService, JT808TransmitService jT808TransmitService,
IJT808MsgLogging jT808MsgLogging, IJT808MsgLogging jT808MsgLogging,
IOptionsMonitor<JT808Configuration> jT808ConfigurationOptionsMonitor, IJT808MsgProducer msgProducer, IJT808MsgReplyLoggingProducer msgReplyLoggingProducer, IJT808Config jT808Config) IOptionsMonitor<JT808Configuration> jT808ConfigurationOptionsMonitor, IJT808MsgProducer msgProducer, IJT808MsgReplyLoggingProducer msgReplyLoggingProducer, IJT808Config jT808Config)
: base(jT808ConfigurationOptionsMonitor, msgProducer, msgReplyLoggingProducer, jT808Config)
: base(jT808Config)
{ {
this.jT808TransmitService = jT808TransmitService; this.jT808TransmitService = jT808TransmitService;
this.jT808MsgLogging = jT808MsgLogging; this.jT808MsgLogging = jT808MsgLogging;
@@ -37,7 +37,7 @@ namespace JT808.Gateway.SimpleServer.Impl
/// </summary> /// </summary>
/// <param name="request"></param> /// <param name="request"></param>
/// <param name="session"></param> /// <param name="session"></param>
public override byte[] Processor(JT808HeaderPackage request, IJT808Session session)
public override byte[] Processor(in JT808HeaderPackage request)
{ {
//AOP 可以自定义添加一些东西:上下行日志、数据转发 //AOP 可以自定义添加一些东西:上下行日志、数据转发
logger.LogDebug("可以自定义添加一些东西:上下行日志、数据转发"); logger.LogDebug("可以自定义添加一些东西:上下行日志、数据转发");
@@ -54,7 +54,7 @@ namespace JT808.Gateway.SimpleServer.Impl
//上行日志(可同步也可以使用队列进行异步) //上行日志(可同步也可以使用队列进行异步)
jT808MsgLogging.Processor(parameter, JT808MsgLoggingType.up); jT808MsgLogging.Processor(parameter, JT808MsgLoggingType.up);
//处理上行消息 //处理上行消息
var down= base.Processor(request, session);
var down= base.Processor(request);
//下行日志(可同步也可以使用队列进行异步) //下行日志(可同步也可以使用队列进行异步)
jT808MsgLogging.Processor((request.Header.TerminalPhoneNo, down), JT808MsgLoggingType.down); jT808MsgLogging.Processor((request.Header.TerminalPhoneNo, down), JT808MsgLoggingType.down);
return down; return down;
@@ -64,12 +64,11 @@ namespace JT808.Gateway.SimpleServer.Impl
/// 重写自带的消息 /// 重写自带的消息
/// </summary> /// </summary>
/// <param name="request"></param> /// <param name="request"></param>
/// <param name="session"></param>
public override byte[] Msg0x0200(JT808HeaderPackage request, IJT808Session session)
public override byte[] Msg0x0200(JT808HeaderPackage request)
{ {
//logger.LogDebug("重写自带Msg0x0200的消息"); //logger.LogDebug("重写自带Msg0x0200的消息");
logger.LogDebug($"重写自带Msg0x0200的消息{request.Header.TerminalPhoneNo}-{request.OriginalData.ToArray().ToHexString()}"); logger.LogDebug($"重写自带Msg0x0200的消息{request.Header.TerminalPhoneNo}-{request.OriginalData.ToArray().ToHexString()}");
return base.Msg0x0200(request, session);
return base.Msg0x0200(request);
} }


/// <summary> /// <summary>
@@ -77,7 +76,7 @@ namespace JT808.Gateway.SimpleServer.Impl
/// </summary> /// </summary>
/// <param name="request"></param> /// <param name="request"></param>
/// <returns></returns> /// <returns></returns>
public byte[] Msg0x9999(JT808HeaderPackage request, IJT808Session session)
public byte[] Msg0x9999(JT808HeaderPackage request)
{ {
logger.LogDebug("自定义消息"); logger.LogDebug("自定义消息");
return default; return default;


+ 2
- 1
simples/JT808.Gateway.SimpleServer/Impl/JT808MsgIdHandler.cs Ver fichero

@@ -16,7 +16,8 @@ namespace JT808.Gateway.SimpleServer.Impl


public JT808MsgIdHandler( public JT808MsgIdHandler(
ILoggerFactory loggerFactory, ILoggerFactory loggerFactory,
IOptionsMonitor<JT808Configuration> jT808ConfigurationOptionsMonitor, IJT808MsgProducer msgProducer, IJT808MsgReplyLoggingProducer msgReplyLoggingProducer, IJT808Config jT808Config) : base(jT808ConfigurationOptionsMonitor, msgProducer, msgReplyLoggingProducer, jT808Config)
IJT808Config jT808Config)
: base(jT808Config)
{ {
Logger = loggerFactory.CreateLogger<JT808MsgIdHandler>(); Logger = loggerFactory.CreateLogger<JT808MsgIdHandler>();
} }


+ 1
- 1
simples/JT808.Gateway.SimpleServer/Impl/JT808SessionProducer.cs Ver fichero

@@ -18,7 +18,7 @@ namespace JT808.Gateway.SimpleServer.Impl
JT808SessionService = jT808SessionService; JT808SessionService = jT808SessionService;
} }


public async ValueTask ProduceAsync(string notice,string terminalNo)
public async void ProduceAsync(string notice,string terminalNo)
{ {
await JT808SessionService.WriteAsync(notice, terminalNo); await JT808SessionService.WriteAsync(notice, terminalNo);
} }


+ 5
- 5
simples/JT808.Gateway.SimpleServer/JT808.Gateway.SimpleServer.csproj Ver fichero

@@ -5,11 +5,11 @@
<TargetFramework>net5.0</TargetFramework> <TargetFramework>net5.0</TargetFramework>
</PropertyGroup> </PropertyGroup>
<ItemGroup> <ItemGroup>
<PackageReference Include="JT808.Gateway" Version="1.1.0-preview1" />
<PackageReference Include="JT808.Gateway.MsgLogging" Version="1.1.0-preview1" />
<PackageReference Include="JT808.Gateway.ReplyMessage" Version="1.1.0-preview1" />
<PackageReference Include="JT808.Gateway.SessionNotice" Version="1.1.0-preview1" />
<PackageReference Include="JT808.Gateway.Transmit" Version="1.1.0-preview1" />
<PackageReference Include="JT808.Gateway" Version="1.1.0-preview4" />
<PackageReference Include="JT808.Gateway.MsgLogging" Version="1.1.0-preview4" />
<PackageReference Include="JT808.Gateway.ReplyMessage" Version="1.1.0-preview4" />
<PackageReference Include="JT808.Gateway.SessionNotice" Version="1.1.0-preview4" />
<PackageReference Include="JT808.Gateway.Transmit" Version="1.1.0-preview4" />
<PackageReference Include="Microsoft.Extensions.Configuration.FileExtensions" Version="5.0.0" /> <PackageReference Include="Microsoft.Extensions.Configuration.FileExtensions" Version="5.0.0" />
<PackageReference Include="Microsoft.Extensions.Configuration.Json" Version="5.0.0" /> <PackageReference Include="Microsoft.Extensions.Configuration.Json" Version="5.0.0" />
<PackageReference Include="Microsoft.Extensions.Hosting" Version="5.0.0" /> <PackageReference Include="Microsoft.Extensions.Hosting" Version="5.0.0" />


+ 1
- 1
simples/JT808.Gateway.SimpleServer/appsettings.json Ver fichero

@@ -16,7 +16,7 @@
"TcpPort": 808, "TcpPort": 808,
"UdpPort": 808, "UdpPort": 808,
"WebApiPort": 828, "WebApiPort": 828,
"WebApiToken": "smallchi518"
"WebApiToken": "123456"
}, },
"RemoteServerOptions": { "RemoteServerOptions": {
"DataTransfer": [ "DataTransfer": [


+ 1
- 1
src/JT808.Gateway.Abstractions/IJT808MsgProducer.cs Ver fichero

@@ -12,6 +12,6 @@ namespace JT808.Gateway.Abstractions
/// </summary> /// </summary>
/// <param name="terminalNo">设备终端号</param> /// <param name="terminalNo">设备终端号</param>
/// <param name="data">808 hex data</param> /// <param name="data">808 hex data</param>
ValueTask ProduceAsync(string terminalNo, byte[] data);
void ProduceAsync(string terminalNo, byte[] data);
} }
} }

+ 1
- 1
src/JT808.Gateway.Abstractions/IJT808MsgReplyLoggingProducer.cs Ver fichero

@@ -12,6 +12,6 @@ namespace JT808.Gateway.Abstractions
/// </summary> /// </summary>
/// <param name="terminalNo">设备终端号</param> /// <param name="terminalNo">设备终端号</param>
/// <param name="data">808 hex data</param> /// <param name="data">808 hex data</param>
ValueTask ProduceAsync(string terminalNo, byte[] data);
void ProduceAsync(string terminalNo, byte[] data);
} }
} }

+ 1
- 1
src/JT808.Gateway.Abstractions/IJT808SessionProducer.cs Ver fichero

@@ -8,6 +8,6 @@ namespace JT808.Gateway.Abstractions
/// </summary> /// </summary>
public interface IJT808SessionProducer : IJT808PubSub, IDisposable public interface IJT808SessionProducer : IJT808PubSub, IDisposable
{ {
ValueTask ProduceAsync(string notice,string terminalNo);
void ProduceAsync(string notice,string terminalNo);
} }
} }

+ 1
- 1
src/JT808.Gateway.Abstractions/JT808.Gateway.Abstractions.csproj Ver fichero

@@ -28,7 +28,7 @@
<Compile Remove="JT808QueueReplyMessageHandler.cs" /> <Compile Remove="JT808QueueReplyMessageHandler.cs" />
</ItemGroup> </ItemGroup>
<ItemGroup> <ItemGroup>
<PackageReference Include="JT808" Version="2.3.0" />
<PackageReference Include="JT808" Version="2.3.1" />
<PackageReference Include="Microsoft.Extensions.Hosting.Abstractions" Version="5.0.0" /> <PackageReference Include="Microsoft.Extensions.Hosting.Abstractions" Version="5.0.0" />
<PackageReference Include="Microsoft.Extensions.Options" Version="5.0.0" /> <PackageReference Include="Microsoft.Extensions.Options" Version="5.0.0" />
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="5.0.0" /> <PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="5.0.0" />


+ 1
- 1
src/JT808.Gateway.Abstractions/JT808.Gateway.Abstractions.xml Ver fichero

@@ -169,7 +169,7 @@
通用消息处理程序 通用消息处理程序
</summary> </summary>
</member> </member>
<member name="M:JT808.Gateway.Abstractions.JT808MessageHandler.Processor(JT808.Protocol.JT808HeaderPackage)">
<member name="M:JT808.Gateway.Abstractions.JT808MessageHandler.Processor(JT808.Protocol.JT808HeaderPackage@)">
<summary> <summary>
消息处理 消息处理
</summary> </summary>


+ 4
- 32
src/JT808.Gateway.Abstractions/JT808MessageHandler.cs Ver fichero

@@ -21,24 +21,12 @@ namespace JT808.Gateway.Abstractions
protected delegate byte[] MsgIdMethodDelegate(JT808HeaderPackage package); protected delegate byte[] MsgIdMethodDelegate(JT808HeaderPackage package);
protected JT808Serializer JT808Serializer { get; } protected JT808Serializer JT808Serializer { get; }


protected IJT808MsgProducer MsgProducer;

protected IJT808MsgReplyLoggingProducer MsgReplyLoggingProducer;

protected IOptionsMonitor<JT808Configuration> JT808ConfigurationOptionsMonitor;

protected IJT808Config JT808Config; protected IJT808Config JT808Config;


public JT808MessageHandler(
IOptionsMonitor<JT808Configuration> jT808ConfigurationOptionsMonitor,
IJT808MsgProducer msgProducer,
IJT808MsgReplyLoggingProducer msgReplyLoggingProducer,
IJT808Config jT808Config)
public JT808MessageHandler(IJT808Config jT808Config)
{ {
this.JT808Config = jT808Config;
this.JT808Serializer = jT808Config.GetSerializer(); this.JT808Serializer = jT808Config.GetSerializer();
this.MsgProducer = msgProducer;
this.MsgReplyLoggingProducer = msgReplyLoggingProducer;
this.JT808ConfigurationOptionsMonitor = jT808ConfigurationOptionsMonitor;
HandlerDict = new Dictionary<ushort, MsgIdMethodDelegate> { HandlerDict = new Dictionary<ushort, MsgIdMethodDelegate> {
{JT808MsgId.终端通用应答.ToUInt16Value(), Msg0x0001}, {JT808MsgId.终端通用应答.ToUInt16Value(), Msg0x0001},
{JT808MsgId.终端鉴权.ToUInt16Value(), Msg0x0102}, {JT808MsgId.终端鉴权.ToUInt16Value(), Msg0x0102},
@@ -67,27 +55,11 @@ namespace JT808.Gateway.Abstractions
/// </summary> /// </summary>
/// <param name="request">请求数据</param> /// <param name="request">请求数据</param>
/// <returns>应答消息数据</returns> /// <returns>应答消息数据</returns>
public virtual byte[] Processor(JT808HeaderPackage request)
public virtual byte[] Processor(in JT808HeaderPackage request)
{ {
if (MsgProducer != null)
{
MsgProducer.ProduceAsync(request.Header.TerminalPhoneNo, request.OriginalData.ToArray());
}
if (HandlerDict.TryGetValue(request.Header.MsgId, out var func)) if (HandlerDict.TryGetValue(request.Header.MsgId, out var func))
{ {
var data = func(request);
if (MsgReplyLoggingProducer != null)
{
MsgReplyLoggingProducer.ProduceAsync(request.Header.TerminalPhoneNo, data);
}
if (JT808ConfigurationOptionsMonitor.CurrentValue.IgnoreMsgIdReply != null && JT808ConfigurationOptionsMonitor.CurrentValue.IgnoreMsgIdReply.Count > 0)
{
if (JT808ConfigurationOptionsMonitor.CurrentValue.IgnoreMsgIdReply.Contains(request.Header.MsgId))
{
return default;
}
}
return data;
return func(request);
} }
else else
{ {


+ 2
- 6
src/JT808.Gateway.Benchmark/JT808.Gateway.CleintBenchmark/JT808.Gateway.CleintBenchmark.csproj Ver fichero

@@ -7,6 +7,7 @@
</PropertyGroup> </PropertyGroup>


<ItemGroup> <ItemGroup>
<Content Remove="appsettings.json" />
<Content Remove="wwwroot\axios.min.js" /> <Content Remove="wwwroot\axios.min.js" />
<Content Remove="wwwroot\day.js" /> <Content Remove="wwwroot\day.js" />
<Content Remove="wwwroot\echarts.min.js" /> <Content Remove="wwwroot\echarts.min.js" />
@@ -27,6 +28,7 @@
</ItemGroup> </ItemGroup>
<ItemGroup> <ItemGroup>
<None Include="..\..\..\LICENSE" Pack="true" PackagePath="" /> <None Include="..\..\..\LICENSE" Pack="true" PackagePath="" />
<None Include="appsettings.json" />
<None Include="wwwroot\axios.min.js"> <None Include="wwwroot\axios.min.js">
<CopyToOutputDirectory>Always</CopyToOutputDirectory> <CopyToOutputDirectory>Always</CopyToOutputDirectory>
</None> </None>
@@ -46,12 +48,6 @@
</None> </None>
</ItemGroup> </ItemGroup>


<ItemGroup>
<Content Update="appsettings.json">
<CopyToOutputDirectory>Always</CopyToOutputDirectory>
</Content>
</ItemGroup>

<ItemGroup> <ItemGroup>
<None Update="appsettings.json"> <None Update="appsettings.json">
<CopyToOutputDirectory>Always</CopyToOutputDirectory> <CopyToOutputDirectory>Always</CopyToOutputDirectory>


+ 36
- 34
src/JT808.Gateway.Benchmark/JT808.Gateway.CleintBenchmark/Services/CleintBenchmarkHostedService.cs Ver fichero

@@ -45,45 +45,47 @@ namespace JT808.Gateway.CleintBenchmark.Services
logger.LogInformation($"GetMinThreads:{minWorkerThreads}-{minCompletionPortThreads}"); logger.LogInformation($"GetMinThreads:{minWorkerThreads}-{minCompletionPortThreads}");
logger.LogInformation($"GetMaxThreads:{maxWorkerThreads}-{maxCompletionPortThreads}"); logger.LogInformation($"GetMaxThreads:{maxWorkerThreads}-{maxCompletionPortThreads}");
taskFactory = new TaskFactory(cancellationToken); taskFactory = new TaskFactory(cancellationToken);
for (int i=0;i< clientBenchmarkOptions.DeviceCount; i++)
{
taskFactory.StartNew(async (state) => {
string deviceNo = ((int)state + 1 + clientBenchmarkOptions.DeviceTemplate).ToString();
var client = await jT808TcpClientFactory.Create(new JT808DeviceConfig(deviceNo,
clientBenchmarkOptions.IP,
clientBenchmarkOptions.Port,
clientBenchmarkOptions.LocalIPAddress,
clientBenchmarkOptions.LocalPort + (int)state + 1), cancellationToken);
while (!cancellationToken.IsCancellationRequested)
{
try
Task.Run(() => {
for (int i = 0; i < clientBenchmarkOptions.DeviceCount; i++)
{
taskFactory.StartNew(async (state) => {
string deviceNo = ((int)state + 1 + clientBenchmarkOptions.DeviceTemplate).ToString();
var client = await jT808TcpClientFactory.Create(new JT808DeviceConfig(deviceNo,
clientBenchmarkOptions.IP,
clientBenchmarkOptions.Port,
clientBenchmarkOptions.LocalIPAddress,
clientBenchmarkOptions.LocalPort + (int)state + 1), cancellationToken);
while (!cancellationToken.IsCancellationRequested)
{ {
int lat = new Random(1000).Next(100000, 180000);
int Lng = new Random(1000).Next(100000, 180000);
if (client != null)
try
{ {
await client.SendAsync(JT808MsgId.位置信息汇报.Create(client.DeviceConfig.TerminalPhoneNo, new JT808_0x0200()
int lat = new Random(1000).Next(100000, 180000);
int Lng = new Random(1000).Next(100000, 180000);
if (client != null)
{ {
Lat = lat,
Lng = Lng,
GPSTime = DateTime.Now,
Speed = 50,
Direction = 30,
AlarmFlag = 5,
Altitude = 50,
StatusFlag = 10
}));
await client.SendAsync(JT808MsgId.位置信息汇报.Create(client.DeviceConfig.TerminalPhoneNo, new JT808_0x0200()
{
Lat = lat,
Lng = Lng,
GPSTime = DateTime.Now,
Speed = 50,
Direction = 30,
AlarmFlag = 5,
Altitude = 50,
StatusFlag = 10
}));
}
} }
catch (Exception ex)
{
logger.LogError(ex.Message);
}
await Task.Delay(clientBenchmarkOptions.Interval);
} }
catch (Exception ex)
{
logger.LogError(ex.Message);
}
await Task.Delay(clientBenchmarkOptions.Interval);
}
}, i);
Thread.Sleep(500);
}
}, i);
Thread.Sleep(300);
}
});
return Task.CompletedTask; return Task.CompletedTask;
} }




+ 2
- 2
src/JT808.Gateway.Benchmark/JT808.Gateway.CleintBenchmark/appsettings.json Ver fichero

@@ -15,9 +15,9 @@
"AllowedHosts": "*", "AllowedHosts": "*",
"urls": "http://*:5000;", "urls": "http://*:5000;",
"ClientBenchmarkOptions": { "ClientBenchmarkOptions": {
"IP": "172.18.0.7",
"IP": "127.0.0.1",
"Port": 808, "Port": 808,
"DeviceCount": 1,
"DeviceCount": 100,
"Interval": 1000, "Interval": 1000,
"DeviceTemplate": 100000 //需要多台机器同时访问,那么可以根据这个避开重复终端号 100000-200000-300000 "DeviceTemplate": 100000 //需要多台机器同时访问,那么可以根据这个避开重复终端号 100000-200000-300000
} }


+ 1
- 0
src/JT808.Gateway.Benchmark/JT808.Gateway.CleintBenchmark/wwwroot/signalr.min.js.map
La diferencia del archivo ha sido suprimido porque es demasiado grande
Ver fichero


+ 18
- 5
src/JT808.Gateway.Kafka/JT808MsgProducer.cs Ver fichero

@@ -6,6 +6,7 @@ using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Text; using System.Text;
using System.Threading.Tasks; using System.Threading.Tasks;
using Microsoft.Extensions.Logging;


namespace JT808.Gateway.Kafka namespace JT808.Gateway.Kafka
{ {
@@ -15,21 +16,33 @@ namespace JT808.Gateway.Kafka
public string TopicName { get; } public string TopicName { get; }


private readonly IProducer<string, byte[]> producer; private readonly IProducer<string, byte[]> producer;

private readonly ILogger logger;

public JT808MsgProducer( public JT808MsgProducer(
ILoggerFactory loggerFactory,
IOptions<JT808MsgProducerConfig> producerConfigAccessor) IOptions<JT808MsgProducerConfig> producerConfigAccessor)
{ {
logger = loggerFactory.CreateLogger<JT808MsgProducer>();
producer = new ProducerBuilder<string, byte[]>(producerConfigAccessor.Value).Build(); producer = new ProducerBuilder<string, byte[]>(producerConfigAccessor.Value).Build();
TopicName = producerConfigAccessor.Value.TopicName; TopicName = producerConfigAccessor.Value.TopicName;
} }


public async ValueTask ProduceAsync(string terminalNo, byte[] data)
public async void ProduceAsync(string terminalNo, byte[] data)
{ {
if (disposed) return; if (disposed) return;
await producer.ProduceAsync(TopicName, new Message<string, byte[]>
try
{ {
Key = terminalNo,
Value = data
});
await producer.ProduceAsync(TopicName, new Message<string, byte[]>
{
Key = terminalNo,
Value = data
});
}
catch (AggregateException ex)
{
logger.LogError(ex, "kafka error");
}
} }
private void Dispose(bool disposing) private void Dispose(bool disposing)
{ {


+ 20
- 7
src/JT808.Gateway.Kafka/JT808MsgReplyLoggingProducer.cs Ver fichero

@@ -6,6 +6,7 @@ using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Text; using System.Text;
using System.Threading.Tasks; using System.Threading.Tasks;
using Microsoft.Extensions.Logging;


namespace JT808.Gateway.Kafka namespace JT808.Gateway.Kafka
{ {
@@ -14,22 +15,34 @@ namespace JT808.Gateway.Kafka
private bool disposed = false; private bool disposed = false;
public string TopicName { get;} public string TopicName { get;}


private IProducer<string, byte[]> producer;
private readonly IProducer<string, byte[]> producer;

private readonly ILogger logger;

public JT808MsgReplyLoggingProducer( public JT808MsgReplyLoggingProducer(
IOptions<JT808MsgReplyLoggingProducerConfig> producerConfigAccessor)
ILoggerFactory loggerFactory,
IOptions<JT808MsgReplyLoggingProducerConfig> producerConfigAccessor)
{ {
producer = new ProducerBuilder<string, byte[]>(producerConfigAccessor.Value).Build(); producer = new ProducerBuilder<string, byte[]>(producerConfigAccessor.Value).Build();
TopicName = producerConfigAccessor.Value.TopicName; TopicName = producerConfigAccessor.Value.TopicName;
logger = loggerFactory.CreateLogger<JT808MsgReplyLoggingProducer>();
} }


public async ValueTask ProduceAsync(string terminalNo, byte[] data)
public async void ProduceAsync(string terminalNo, byte[] data)
{ {
if (disposed) return; if (disposed) return;
await producer.ProduceAsync(TopicName, new Message<string, byte[]>
try
{ {
Key = terminalNo,
Value = data
});
await producer.ProduceAsync(TopicName, new Message<string, byte[]>
{
Key = terminalNo,
Value = data
});
}
catch (AggregateException ex)
{
logger.LogError(ex, "kafka error");
}
} }


private void Dispose(bool disposing) private void Dispose(bool disposing)


+ 18
- 5
src/JT808.Gateway.Kafka/JT808SessionProducer.cs Ver fichero

@@ -6,6 +6,7 @@ using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Text; using System.Text;
using System.Threading.Tasks; using System.Threading.Tasks;
using Microsoft.Extensions.Logging;


namespace JT808.Gateway.Kafka namespace JT808.Gateway.Kafka
{ {
@@ -15,21 +16,33 @@ namespace JT808.Gateway.Kafka
public string TopicName { get; } public string TopicName { get; }


private readonly IProducer<string, string> producer; private readonly IProducer<string, string> producer;

private readonly ILogger logger;

public JT808SessionProducer( public JT808SessionProducer(
ILoggerFactory loggerFactory,
IOptions<JT808SessionProducerConfig> producerConfigAccessor) IOptions<JT808SessionProducerConfig> producerConfigAccessor)
{ {
producer = new ProducerBuilder<string, string>(producerConfigAccessor.Value).Build(); producer = new ProducerBuilder<string, string>(producerConfigAccessor.Value).Build();
TopicName = producerConfigAccessor.Value.TopicName; TopicName = producerConfigAccessor.Value.TopicName;
logger = loggerFactory.CreateLogger<JT808SessionProducer>();
} }


public async ValueTask ProduceAsync(string notice,string terminalNo)
public async void ProduceAsync(string notice,string terminalNo)
{ {
if (disposed) return; if (disposed) return;
await producer.ProduceAsync(TopicName, new Message<string, string>
try
{ {
Key = notice,
Value = terminalNo
});
await producer.ProduceAsync(TopicName, new Message<string, string>
{
Key = notice,
Value = terminalNo
});
}
catch (AggregateException ex)
{
logger.LogError(ex, "kafka error");
}
} }


private void Dispose(bool disposing) private void Dispose(bool disposing)


+ 2
- 11
src/JT808.Gateway.Tests/JT808.Gateway.NormalHosting/Impl/JT808CustomMessageHandlerImpl.cs Ver fichero

@@ -16,22 +16,13 @@ namespace JT808.Gateway.NormalHosting.Impl
private readonly ILogger logger; private readonly ILogger logger;
private readonly JT808TransmitService jT808TransmitService; private readonly JT808TransmitService jT808TransmitService;
private readonly IJT808MsgLogging jT808MsgLogging; private readonly IJT808MsgLogging jT808MsgLogging;
private readonly IJT808MsgReplyProducer MsgReplyProducer;


public JT808CustomMessageHandlerImpl( public JT808CustomMessageHandlerImpl(
ILoggerFactory loggerFactory, ILoggerFactory loggerFactory,
IJT808MsgReplyProducer msgReplyProducer,
IJT808MsgLogging jT808MsgLogging, IJT808MsgLogging jT808MsgLogging,
IOptionsMonitor<JT808Configuration> jT808ConfigurationOptionsMonitor,
IJT808MsgProducer msgProducer,
IJT808MsgReplyLoggingProducer msgReplyLoggingProducer,
JT808TransmitService jT808TransmitService, JT808TransmitService jT808TransmitService,
IJT808Config jT808Config) : base(jT808ConfigurationOptionsMonitor,
msgProducer,
msgReplyLoggingProducer,
jT808Config)
IJT808Config jT808Config) : base(jT808Config)
{ {
MsgReplyProducer = msgReplyProducer;
this.jT808TransmitService = jT808TransmitService; this.jT808TransmitService = jT808TransmitService;
this.jT808MsgLogging = jT808MsgLogging; this.jT808MsgLogging = jT808MsgLogging;
logger = loggerFactory.CreateLogger<JT808CustomMessageHandlerImpl>(); logger = loggerFactory.CreateLogger<JT808CustomMessageHandlerImpl>();
@@ -46,7 +37,7 @@ namespace JT808.Gateway.NormalHosting.Impl
/// </summary> /// </summary>
/// <param name="request"></param> /// <param name="request"></param>
/// <param name="session"></param> /// <param name="session"></param>
public override byte[] Processor(JT808HeaderPackage request)
public override byte[] Processor(in JT808HeaderPackage request)
{ {
//处理上行消息 //处理上行消息
var down = base.Processor(request); var down = base.Processor(request);


+ 1
- 1
src/JT808.Gateway.Tests/JT808.Gateway.NormalHosting/Impl/JT808SessionProducer.cs Ver fichero

@@ -18,7 +18,7 @@ namespace JT808.Gateway.NormalHosting.Impl
JT808SessionService = jT808SessionService; JT808SessionService = jT808SessionService;
} }


public async ValueTask ProduceAsync(string notice,string terminalNo)
public async void ProduceAsync(string notice,string terminalNo)
{ {
await JT808SessionService.WriteAsync(notice, terminalNo); await JT808SessionService.WriteAsync(notice, terminalNo);
} }


+ 2
- 9
src/JT808.Gateway.Tests/JT808.Gateway.QueueHosting/Impl/JT808CustomMessageHandlerImpl.cs Ver fichero

@@ -16,17 +16,10 @@ namespace JT808.Gateway.QueueHosting.Impl
private readonly ILogger logger; private readonly ILogger logger;
public JT808CustomMessageHandlerImpl( public JT808CustomMessageHandlerImpl(
ILoggerFactory loggerFactory, ILoggerFactory loggerFactory,
IOptionsMonitor<JT808Configuration> jT808ConfigurationOptionsMonitor,
IJT808MsgProducer msgProducer,
IJT808MsgReplyLoggingProducer msgReplyLoggingProducer,
IJT808Config jT808Config) : base(jT808ConfigurationOptionsMonitor,
msgProducer,
msgReplyLoggingProducer,
IJT808Config jT808Config) : base(
jT808Config) jT808Config)
{ {
logger = loggerFactory.CreateLogger<JT808CustomMessageHandlerImpl>(); logger = loggerFactory.CreateLogger<JT808CustomMessageHandlerImpl>();
//过滤掉0x0200消息,通过服务服务进行下发应答,可以通过配置文件的方式进行增加修改(支持热更新)
//jT808ConfigurationOptionsMonitor.CurrentValue.IgnoreMsgIdReply.Add(0x0200);
//添加自定义消息 //添加自定义消息
HandlerDict.Add(0x9999, Msg0x9999); HandlerDict.Add(0x9999, Msg0x9999);
} }
@@ -37,7 +30,7 @@ namespace JT808.Gateway.QueueHosting.Impl
/// </summary> /// </summary>
/// <param name="request"></param> /// <param name="request"></param>
/// <param name="session"></param> /// <param name="session"></param>
public override byte[] Processor(JT808HeaderPackage request)
public override byte[] Processor(in JT808HeaderPackage request)
{ {
try try
{ {


+ 3
- 3
src/JT808.Gateway/Extensions/JT808HttpContextExtensions.cs Ver fichero

@@ -19,7 +19,7 @@ namespace JT808.Gateway.Extensions
context.Response.KeepAlive = false; context.Response.KeepAlive = false;
context.Response.ContentLength64 = b.Length; context.Response.ContentLength64 = b.Length;
var output = context.Response.OutputStream; var output = context.Response.OutputStream;
await output.WriteAsync(b, 0, b.Length);
await output.WriteAsync(b, CancellationToken.None);
context.Response.OutputStream.Close(); context.Response.OutputStream.Close();
context.Response.Close(); context.Response.Close();
} }
@@ -32,7 +32,7 @@ namespace JT808.Gateway.Extensions
context.Response.ContentType = jsonType; context.Response.ContentType = jsonType;
context.Response.ContentLength64 = b.Length; context.Response.ContentLength64 = b.Length;
var output = context.Response.OutputStream; var output = context.Response.OutputStream;
await output.WriteAsync(b, 0, b.Length);
await output.WriteAsync(b, CancellationToken.None);
context.Response.OutputStream.Close(); context.Response.OutputStream.Close();
context.Response.Close(); context.Response.Close();
} }
@@ -63,7 +63,7 @@ namespace JT808.Gateway.Extensions
context.Response.ContentType = jsonType; context.Response.ContentType = jsonType;
context.Response.ContentLength64 = b.Length; context.Response.ContentLength64 = b.Length;
var output = context.Response.OutputStream; var output = context.Response.OutputStream;
await output.WriteAsync(b, 0, b.Length);
await output.WriteAsync(b,CancellationToken.None);
context.Response.OutputStream.Close(); context.Response.OutputStream.Close();
context.Response.Close(); context.Response.Close();
} }


+ 2
- 2
src/JT808.Gateway/Internal/JT808MsgProducer_Empty.cs Ver fichero

@@ -15,9 +15,9 @@ namespace JT808.Gateway.Internal


} }


public ValueTask ProduceAsync(string terminalNo, byte[] data)
public void ProduceAsync(string terminalNo, byte[] data)
{ {
return default;
} }
} }
} }

+ 2
- 2
src/JT808.Gateway/Internal/JT808MsgReplyLoggingProducer_Empty.cs Ver fichero

@@ -15,9 +15,9 @@ namespace JT808.Gateway.Internal


} }


public ValueTask ProduceAsync(string terminalNo, byte[] data)
public void ProduceAsync(string terminalNo, byte[] data)
{ {
return default;
} }
} }
} }

+ 4
- 2
src/JT808.Gateway/JT808.Gateway.xml Ver fichero

@@ -150,12 +150,14 @@
<param name="config"></param> <param name="config"></param>
<returns></returns> <returns></returns>
</member> </member>
<member name="M:JT808.Gateway.JT808TcpServer.#ctor(JT808.Gateway.Abstractions.JT808MessageHandler,Microsoft.Extensions.Options.IOptions{JT808.Gateway.Abstractions.Configurations.JT808Configuration},JT808.Protocol.IJT808Config,Microsoft.Extensions.Logging.ILoggerFactory,JT808.Gateway.Session.JT808SessionManager)">
<member name="M:JT808.Gateway.JT808TcpServer.#ctor(Microsoft.Extensions.Options.IOptionsMonitor{JT808.Gateway.Abstractions.Configurations.JT808Configuration},JT808.Gateway.Abstractions.IJT808MsgProducer,JT808.Gateway.Abstractions.IJT808MsgReplyLoggingProducer,JT808.Gateway.Abstractions.JT808MessageHandler,JT808.Protocol.IJT808Config,Microsoft.Extensions.Logging.ILoggerFactory,JT808.Gateway.Session.JT808SessionManager)">
<summary> <summary>
使用队列方式 使用队列方式
</summary> </summary>
<param name="configurationMonitor"></param>
<param name="msgProducer"></param>
<param name="msgReplyLoggingProducer"></param>
<param name="messageHandler"></param> <param name="messageHandler"></param>
<param name="jT808ConfigurationAccessor"></param>
<param name="jT808Config"></param> <param name="jT808Config"></param>
<param name="loggerFactory"></param> <param name="loggerFactory"></param>
<param name="jT808SessionManager"></param> <param name="jT808SessionManager"></param>


+ 8
- 3
src/JT808.Gateway/JT808HttpServer.cs Ver fichero

@@ -85,6 +85,11 @@ namespace JT808.Gateway
await context.Http401(); await context.Http401();
} }
} }
catch (AggregateException ex)
{
await context.Http500();
Logger.LogError(ex, ex.StackTrace);
}
catch (Exception ex) catch (Exception ex)
{ {
await context.Http500(); await context.Http500();
@@ -108,7 +113,7 @@ namespace JT808.Gateway
var index = context.Request.Url.AbsoluteUri.IndexOf('?'); var index = context.Request.Url.AbsoluteUri.IndexOf('?');
if (index > 0) if (index > 0)
{ {
var uriParamStr = context.Request.Url.AbsoluteUri.Substring(index + 1).ToString();
var uriParamStr = context.Request.Url.AbsoluteUri[(index + 1)..].ToString();
await context.HttpSend(router.Value(uriParamStr)); await context.HttpSend(router.Value(uriParamStr));
} }
else else
@@ -139,11 +144,11 @@ namespace JT808.Gateway
} }
catch (System.ObjectDisposedException ex) catch (System.ObjectDisposedException ex)
{ {
Logger.LogError(ex, "");
} }
catch (Exception ex) catch (Exception ex)
{ {
Logger.LogError(ex, "");
} }
return Task.CompletedTask; return Task.CompletedTask;
} }


+ 57
- 20
src/JT808.Gateway/JT808TcpServer.cs Ver fichero

@@ -27,49 +27,59 @@ namespace JT808.Gateway


private readonly JT808Serializer Serializer; private readonly JT808Serializer Serializer;


private readonly JT808Configuration Configuration;

private readonly JT808MessageHandler MessageHandler; private readonly JT808MessageHandler MessageHandler;


private readonly IJT808MsgProducer MsgProducer;

private readonly IJT808MsgReplyLoggingProducer MsgReplyLoggingProducer;

private readonly IOptionsMonitor<JT808Configuration> ConfigurationMonitor;

/// <summary> /// <summary>
/// 使用队列方式 /// 使用队列方式
/// </summary> /// </summary>
/// <param name="configurationMonitor"></param>
/// <param name="msgProducer"></param>
/// <param name="msgReplyLoggingProducer"></param>
/// <param name="messageHandler"></param> /// <param name="messageHandler"></param>
/// <param name="jT808ConfigurationAccessor"></param>
/// <param name="jT808Config"></param> /// <param name="jT808Config"></param>
/// <param name="loggerFactory"></param> /// <param name="loggerFactory"></param>
/// <param name="jT808SessionManager"></param> /// <param name="jT808SessionManager"></param>
public JT808TcpServer( public JT808TcpServer(
IOptionsMonitor<JT808Configuration> configurationMonitor,
IJT808MsgProducer msgProducer,
IJT808MsgReplyLoggingProducer msgReplyLoggingProducer,
JT808MessageHandler messageHandler, JT808MessageHandler messageHandler,
IOptions<JT808Configuration> jT808ConfigurationAccessor,
IJT808Config jT808Config, IJT808Config jT808Config,
ILoggerFactory loggerFactory, ILoggerFactory loggerFactory,
JT808SessionManager jT808SessionManager) JT808SessionManager jT808SessionManager)
{ {
MessageHandler = messageHandler; MessageHandler = messageHandler;
MsgProducer = msgProducer;
MsgReplyLoggingProducer = msgReplyLoggingProducer;
ConfigurationMonitor = configurationMonitor;
SessionManager = jT808SessionManager; SessionManager = jT808SessionManager;
Logger = loggerFactory.CreateLogger<JT808TcpServer>(); Logger = loggerFactory.CreateLogger<JT808TcpServer>();
Serializer = jT808Config.GetSerializer(); Serializer = jT808Config.GetSerializer();
Configuration = jT808ConfigurationAccessor.Value;
InitServer(); InitServer();
} }


private void InitServer() private void InitServer()
{ {
var IPEndPoint = new System.Net.IPEndPoint(IPAddress.Any, Configuration.TcpPort);
var IPEndPoint = new System.Net.IPEndPoint(IPAddress.Any, ConfigurationMonitor.CurrentValue.TcpPort);
server = new Socket(IPEndPoint.AddressFamily, SocketType.Stream, ProtocolType.Tcp); server = new Socket(IPEndPoint.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
server.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.NoDelay, true); server.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.NoDelay, true);
server.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.KeepAlive, true); server.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.KeepAlive, true);
server.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.ReceiveBuffer, Configuration.MiniNumBufferSize);
server.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.SendBuffer, Configuration.MiniNumBufferSize);
server.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.ReceiveBuffer, ConfigurationMonitor.CurrentValue.MiniNumBufferSize);
server.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.SendBuffer, ConfigurationMonitor.CurrentValue.MiniNumBufferSize);
server.LingerState = new LingerOption(true, 0); server.LingerState = new LingerOption(true, 0);
server.Bind(IPEndPoint); server.Bind(IPEndPoint);
server.Listen(Configuration.SoBacklog);
server.Listen(ConfigurationMonitor.CurrentValue.SoBacklog);
} }


public Task StartAsync(CancellationToken cancellationToken) public Task StartAsync(CancellationToken cancellationToken)
{ {
Logger.LogInformation($"JT808 TCP Server start at {IPAddress.Any}:{Configuration.TcpPort}.");
Logger.LogInformation($"JT808 TCP Server start at {IPAddress.Any}:{ConfigurationMonitor.CurrentValue.TcpPort}.");
Task.Factory.StartNew(async () => Task.Factory.StartNew(async () =>
{ {
while (!cancellationToken.IsCancellationRequested) while (!cancellationToken.IsCancellationRequested)
@@ -100,7 +110,7 @@ namespace JT808.Gateway
{ {
try try
{ {
Memory<byte> memory = writer.GetMemory(Configuration.MiniNumBufferSize);
Memory<byte> memory = writer.GetMemory(ConfigurationMonitor.CurrentValue.MiniNumBufferSize);
//设备多久没发数据就断开连接 Receive Timeout. //设备多久没发数据就断开连接 Receive Timeout.
int bytesRead = await session.Client.ReceiveAsync(memory, SocketFlags.None, session.ReceiveTimeout.Token); int bytesRead = await session.Client.ReceiveAsync(memory, SocketFlags.None, session.ReceiveTimeout.Token);
if (bytesRead == 0) if (bytesRead == 0)
@@ -111,18 +121,18 @@ namespace JT808.Gateway
} }
catch (OperationCanceledException ex) catch (OperationCanceledException ex)
{ {
Logger.LogError($"[Receive Timeout]:{session.Client.RemoteEndPoint}");
Logger.LogError($"[Receive Timeout]:{session.Client.RemoteEndPoint},{session.TerminalPhoneNo}");
break; break;
} }
catch (System.Net.Sockets.SocketException ex) catch (System.Net.Sockets.SocketException ex)
{ {
Logger.LogError($"[{ex.SocketErrorCode.ToString()},{ex.Message}]:{session.Client.RemoteEndPoint}");
Logger.LogError($"[{ex.SocketErrorCode.ToString()},{ex.Message}]:{session.Client.RemoteEndPoint},{session.TerminalPhoneNo}");
break; break;
} }
#pragma warning disable CA1031 // Do not catch general exception types #pragma warning disable CA1031 // Do not catch general exception types
catch (Exception ex) catch (Exception ex)
{ {
Logger.LogError(ex, $"[Receive Error]:{session.Client.RemoteEndPoint}");
Logger.LogError(ex, $"[Receive Error]:{session.Client.RemoteEndPoint},{session.TerminalPhoneNo}");
break; break;
} }
#pragma warning restore CA1031 // Do not catch general exception types #pragma warning restore CA1031 // Do not catch general exception types
@@ -157,7 +167,7 @@ namespace JT808.Gateway
#pragma warning disable CA1031 // Do not catch general exception types #pragma warning disable CA1031 // Do not catch general exception types
catch (Exception ex) catch (Exception ex)
{ {
Logger.LogError(ex, $"[ReadPipe Error]:{session.Client.RemoteEndPoint}");
Logger.LogError(ex, $"[ReadPipe Error]:{session.Client.RemoteEndPoint},{session.TerminalPhoneNo}");
break; break;
} }
#pragma warning restore CA1031 // Do not catch general exception types #pragma warning restore CA1031 // Do not catch general exception types
@@ -194,19 +204,18 @@ namespace JT808.Gateway
if (contentSpan.Length > 14) if (contentSpan.Length > 14)
{ {
var package = Serializer.HeaderDeserialize(contentSpan, minBufferSize: 10240); var package = Serializer.HeaderDeserialize(contentSpan, minBufferSize: 10240);
if (Logger.IsEnabled(LogLevel.Trace)) Logger.LogTrace($"[Accept Hex {session.Client.RemoteEndPoint}]:{package.OriginalData.ToArray().ToHexString()}");
if (Logger.IsEnabled(LogLevel.Trace)) Logger.LogTrace($"[Accept Hex {session.Client.RemoteEndPoint}-{session.TerminalPhoneNo}]:{package.OriginalData.ToArray().ToHexString()}");
SessionManager.TryLink(package.Header.TerminalPhoneNo, session); SessionManager.TryLink(package.Header.TerminalPhoneNo, session);
var downData = MessageHandler.Processor(package);
session.SendAsync(downData);
Processor(session, package);
} }
} }
catch (NotImplementedException ex) catch (NotImplementedException ex)
{ {
Logger.LogError(ex.Message,$"{session.Client.RemoteEndPoint}");
Logger.LogError(ex.Message,$"{session.Client.RemoteEndPoint},{session.TerminalPhoneNo}");
} }
catch (JT808Exception ex) catch (JT808Exception ex)
{ {
Logger.LogError($"[HeaderDeserialize ErrorCode]:{ ex.ErrorCode},[ReaderBuffer]:{contentSpan.ToArray().ToHexString()},{session.Client.RemoteEndPoint}");
Logger.LogError($"[HeaderDeserialize ErrorCode]:{ ex.ErrorCode},[ReaderBuffer]:{contentSpan.ToArray().ToHexString()},{session.Client.RemoteEndPoint},{session.TerminalPhoneNo}");
} }
totalConsumed += (seqReader.Consumed - totalConsumed); totalConsumed += (seqReader.Consumed - totalConsumed);
if (seqReader.End) break; if (seqReader.End) break;
@@ -229,6 +238,34 @@ namespace JT808.Gateway
consumed = buffer.GetPosition(totalConsumed); consumed = buffer.GetPosition(totalConsumed);
} }
} }
private void Processor(in IJT808Session session,in JT808HeaderPackage package)
{
try
{
MsgProducer?.ProduceAsync(package.Header.TerminalPhoneNo, package.OriginalData.ToArray());
var downData = MessageHandler.Processor(in package);
if (ConfigurationMonitor.CurrentValue.IgnoreMsgIdReply != null && ConfigurationMonitor.CurrentValue.IgnoreMsgIdReply.Count > 0)
{
if (!ConfigurationMonitor.CurrentValue.IgnoreMsgIdReply.Contains(package.Header.MsgId))
{
session.SendAsync(downData);
}
}
else
{
session.SendAsync(downData);
}
if (MsgReplyLoggingProducer != null)
{
if (downData != null)
MsgReplyLoggingProducer.ProduceAsync(package.Header.TerminalPhoneNo, downData);
}
}
catch (Exception ex)
{
Logger.LogError(ex, $"[Processor]:{package.OriginalData.ToArray().ToHexString()},{session.Client.RemoteEndPoint},{session.TerminalPhoneNo}");
}
}
public Task StopAsync(CancellationToken cancellationToken) public Task StopAsync(CancellationToken cancellationToken)
{ {
Logger.LogInformation("JT808 Tcp Server Stop"); Logger.LogInformation("JT808 Tcp Server Stop");


+ 44
- 9
src/JT808.Gateway/JT808UdpServer.cs Ver fichero

@@ -29,36 +29,43 @@ namespace JT808.Gateway


private readonly JT808Serializer Serializer; private readonly JT808Serializer Serializer;


private readonly JT808Configuration Configuration;

private readonly IPEndPoint LocalIPEndPoint; private readonly IPEndPoint LocalIPEndPoint;


private readonly JT808MessageHandler MessageHandler; private readonly JT808MessageHandler MessageHandler;


private readonly IJT808MsgProducer MsgProducer;

private readonly IJT808MsgReplyLoggingProducer MsgReplyLoggingProducer;

private readonly IOptionsMonitor<JT808Configuration> ConfigurationMonitor;
public JT808UdpServer( public JT808UdpServer(
IOptions<JT808Configuration> jT808ConfigurationAccessor,
IOptionsMonitor<JT808Configuration> configurationMonitor,
IJT808MsgProducer msgProducer,
IJT808MsgReplyLoggingProducer msgReplyLoggingProducer,
IJT808Config jT808Config, IJT808Config jT808Config,
ILoggerFactory loggerFactory, ILoggerFactory loggerFactory,
JT808SessionManager jT808SessionManager, JT808SessionManager jT808SessionManager,
JT808MessageHandler messageHandler) JT808MessageHandler messageHandler)
{ {
SessionManager = jT808SessionManager; SessionManager = jT808SessionManager;
MsgProducer = msgProducer;
ConfigurationMonitor = configurationMonitor;
MsgReplyLoggingProducer = msgReplyLoggingProducer;
Logger = loggerFactory.CreateLogger<JT808UdpServer>(); Logger = loggerFactory.CreateLogger<JT808UdpServer>();
Serializer = jT808Config.GetSerializer(); Serializer = jT808Config.GetSerializer();
Configuration = jT808ConfigurationAccessor.Value;
MessageHandler = messageHandler; MessageHandler = messageHandler;
LocalIPEndPoint = new System.Net.IPEndPoint(IPAddress.Any, Configuration.UdpPort);
LocalIPEndPoint = new System.Net.IPEndPoint(IPAddress.Any, ConfigurationMonitor.CurrentValue.UdpPort);
server = new Socket(LocalIPEndPoint.AddressFamily, SocketType.Dgram, ProtocolType.Udp); server = new Socket(LocalIPEndPoint.AddressFamily, SocketType.Dgram, ProtocolType.Udp);
server.Bind(LocalIPEndPoint); server.Bind(LocalIPEndPoint);
} }


public Task StartAsync(CancellationToken cancellationToken) public Task StartAsync(CancellationToken cancellationToken)
{ {
Logger.LogInformation($"JT808 Udp Server start at {IPAddress.Any}:{Configuration.UdpPort}.");
Logger.LogInformation($"JT808 Udp Server start at {IPAddress.Any}:{ConfigurationMonitor.CurrentValue.UdpPort}.");
Task.Run(async() => { Task.Run(async() => {
while (!cancellationToken.IsCancellationRequested) while (!cancellationToken.IsCancellationRequested)
{ {
var buffer = ArrayPool<byte>.Shared.Rent(Configuration.MiniNumBufferSize);
var buffer = ArrayPool<byte>.Shared.Rent(ConfigurationMonitor.CurrentValue.MiniNumBufferSize);
try try
{ {
var segment = new ArraySegment<byte>(buffer); var segment = new ArraySegment<byte>(buffer);
@@ -98,8 +105,7 @@ namespace JT808.Gateway
{ {
Logger.LogInformation($"[Connected]:{receiveMessageFromResult.RemoteEndPoint}"); Logger.LogInformation($"[Connected]:{receiveMessageFromResult.RemoteEndPoint}");
} }
var downData = MessageHandler.Processor(package);
session.SendAsync(downData);
Processor(session, package);
} }
catch (NotImplementedException ex) catch (NotImplementedException ex)
{ {
@@ -116,6 +122,35 @@ namespace JT808.Gateway
} }
#pragma warning restore CA1031 // Do not catch general exception types #pragma warning restore CA1031 // Do not catch general exception types
} }

private void Processor(in IJT808Session session, in JT808HeaderPackage package)
{
try
{
MsgProducer?.ProduceAsync(package.Header.TerminalPhoneNo, package.OriginalData.ToArray());
var downData = MessageHandler.Processor(package);
if (ConfigurationMonitor.CurrentValue.IgnoreMsgIdReply != null && ConfigurationMonitor.CurrentValue.IgnoreMsgIdReply.Count > 0)
{
if (!ConfigurationMonitor.CurrentValue.IgnoreMsgIdReply.Contains(package.Header.MsgId))
{
session.SendAsync(downData);
}
}
else
{
session.SendAsync(downData);
}
if (MsgReplyLoggingProducer != null)
{
if (downData != null)
MsgReplyLoggingProducer.ProduceAsync(package.Header.TerminalPhoneNo, downData);
}
}
catch (Exception ex)
{
Logger.LogError(ex, $"[Processor]:{package.OriginalData.ToArray().ToHexString()},{session.Client.RemoteEndPoint},{session.TerminalPhoneNo}");
}
}
public Task StopAsync(CancellationToken cancellationToken) public Task StopAsync(CancellationToken cancellationToken)
{ {
Logger.LogInformation("JT808 Udp Server Stop"); Logger.LogInformation("JT808 Udp Server Stop");


+ 20
- 5
src/JT808.Gateway/Session/JT808SessionManager.cs Ver fichero

@@ -74,7 +74,10 @@ namespace JT808.Gateway.Session
session.ActiveTime = curretDatetime; session.ActiveTime = curretDatetime;
TerminalPhoneNoSessions.TryUpdate(terminalPhoneNo, session, cacheSession); TerminalPhoneNoSessions.TryUpdate(terminalPhoneNo, session, cacheSession);
//会话通知 //会话通知
JT808SessionProducer?.ProduceAsync(JT808GatewayConstants.SessionOnline, terminalPhoneNo);
if(JT808SessionProducer != null)
{
JT808SessionProducer.ProduceAsync(JT808GatewayConstants.SessionOnline, terminalPhoneNo);
}
} }
else else
{ {
@@ -88,7 +91,10 @@ namespace JT808.Gateway.Session
if (TerminalPhoneNoSessions.TryAdd(terminalPhoneNo, session)) if (TerminalPhoneNoSessions.TryAdd(terminalPhoneNo, session))
{ {
//会话通知 //会话通知
JT808SessionProducer?.ProduceAsync(JT808GatewayConstants.SessionOnline, terminalPhoneNo);
if (JT808SessionProducer != null)
{
JT808SessionProducer.ProduceAsync(JT808GatewayConstants.SessionOnline, terminalPhoneNo);
}
} }
} }
} }
@@ -115,7 +121,10 @@ namespace JT808.Gateway.Session
//部标的超长待机设备,不会像正常的设备一样一直连着,可能10几分钟连上了,然后发完就关闭连接, //部标的超长待机设备,不会像正常的设备一样一直连着,可能10几分钟连上了,然后发完就关闭连接,
//这时候想下发数据需要知道设备什么时候上线,在这边做通知最好不过了。 //这时候想下发数据需要知道设备什么时候上线,在这边做通知最好不过了。
//有设备关联上来可以进行通知 例如:使用Redis发布订阅 //有设备关联上来可以进行通知 例如:使用Redis发布订阅
JT808SessionProducer?.ProduceAsync(JT808GatewayConstants.SessionOnline, terminalPhoneNo);
if (JT808SessionProducer != null)
{
JT808SessionProducer.ProduceAsync(JT808GatewayConstants.SessionOnline, terminalPhoneNo);
}
return currentSession; return currentSession;
} }


@@ -187,7 +196,10 @@ namespace JT808.Gateway.Session
removeSession.Close(); removeSession.Close();
if (logger.IsEnabled(LogLevel.Information)) if (logger.IsEnabled(LogLevel.Information))
logger.LogInformation($"[Session Remove]:{terminalPhoneNo}-{tmpTerminalPhoneNo}"); logger.LogInformation($"[Session Remove]:{terminalPhoneNo}-{tmpTerminalPhoneNo}");
JT808SessionProducer?.ProduceAsync(JT808GatewayConstants.SessionOffline, tmpTerminalPhoneNo);
if (JT808SessionProducer != null)
{
JT808SessionProducer.ProduceAsync(JT808GatewayConstants.SessionOffline, tmpTerminalPhoneNo);
}
} }
} }
} }
@@ -204,7 +216,10 @@ namespace JT808.Gateway.Session
TerminalPhoneNoSessions.TryRemove(item, out _); TerminalPhoneNoSessions.TryRemove(item, out _);
} }
var tmpTerminalPhoneNo = string.Join(",", terminalPhoneNos); var tmpTerminalPhoneNo = string.Join(",", terminalPhoneNos);
JT808SessionProducer?.ProduceAsync(JT808GatewayConstants.SessionOffline, tmpTerminalPhoneNo);
if (JT808SessionProducer != null)
{
JT808SessionProducer.ProduceAsync(JT808GatewayConstants.SessionOffline, tmpTerminalPhoneNo);
}
if (logger.IsEnabled(LogLevel.Information)) if (logger.IsEnabled(LogLevel.Information))
logger.LogInformation($"[Session Remove]:{tmpTerminalPhoneNo}"); logger.LogInformation($"[Session Remove]:{tmpTerminalPhoneNo}");
} }


+ 1
- 1
src/Version.props Ver fichero

@@ -1,6 +1,6 @@
<Project> <Project>
<PropertyGroup> <PropertyGroup>
<JT808DotNettyPackageVersion>2.3.2</JT808DotNettyPackageVersion> <JT808DotNettyPackageVersion>2.3.2</JT808DotNettyPackageVersion>
<JT808GatewayPackageVersion>1.1.0-preview2</JT808GatewayPackageVersion>
<JT808GatewayPackageVersion>1.1.0-preview4</JT808GatewayPackageVersion>
</PropertyGroup> </PropertyGroup>
</Project> </Project>

Cargando…
Cancelar
Guardar