Browse Source

将现有的项目整合到一起

tags/v2.2.1
smallchi 5 years ago
parent
commit
0bc93cafde
91 changed files with 7054 additions and 0 deletions
  1. +18
    -0
      src/JT808.Gateway.SimpleClient/JT808.Gateway.SimpleClient.csproj
  2. +46
    -0
      src/JT808.Gateway.SimpleClient/Program.cs
  3. +68
    -0
      src/JT808.Gateway.SimpleClient/Services/GrpcClientService.cs
  4. +68
    -0
      src/JT808.Gateway.SimpleClient/Services/UpService.cs
  5. +3106
    -0
      src/JT808.Gateway.SimpleServer/Configs/NLog.xsd
  6. +36
    -0
      src/JT808.Gateway.SimpleServer/Configs/nlog.Unix.config
  7. +36
    -0
      src/JT808.Gateway.SimpleServer/Configs/nlog.Win32NT.config
  8. +36
    -0
      src/JT808.Gateway.SimpleServer/JT808.Gateway.SimpleServer.csproj
  9. +70
    -0
      src/JT808.Gateway.SimpleServer/Program.cs
  10. +37
    -0
      src/JT808.Gateway.sln
  11. +28
    -0
      src/JT808.Gateway/Client/DeviceConfig.cs
  12. +23
    -0
      src/JT808.Gateway/Client/JT808ClientDotnettyExtensions.cs
  13. +16
    -0
      src/JT808.Gateway/Client/JT808ClientMsgSNDistributedImpl.cs
  14. +112
    -0
      src/JT808.Gateway/Client/JT808TcpClient.cs
  15. +103
    -0
      src/JT808.Gateway/Client/JT808TcpClientExtensions.cs
  16. +62
    -0
      src/JT808.Gateway/Client/JT808TcpClientFactory.cs
  17. +20
    -0
      src/JT808.Gateway/Codecs/JT808ClientTcpDecoder.cs
  18. +52
    -0
      src/JT808.Gateway/Codecs/JT808ClientTcpEncoder.cs
  19. +32
    -0
      src/JT808.Gateway/Codecs/JT808TcpDecoder.cs
  20. +52
    -0
      src/JT808.Gateway/Codecs/JT808TcpEncoder.cs
  21. +33
    -0
      src/JT808.Gateway/Codecs/JT808UdpDecoder.cs
  22. +31
    -0
      src/JT808.Gateway/Configurations/JT808ClientConfiguration.cs
  23. +41
    -0
      src/JT808.Gateway/Configurations/JT808Configuration.cs
  24. +29
    -0
      src/JT808.Gateway/Converters/JsonByteArrayHexConverter.cs
  25. +26
    -0
      src/JT808.Gateway/Converters/JsonIPAddressConverter.cs
  26. +32
    -0
      src/JT808.Gateway/Converters/JsonIPEndPointConverter.cs
  27. +12
    -0
      src/JT808.Gateway/Dtos/JT808AtomicCounterDto.cs
  28. +15
    -0
      src/JT808.Gateway/Dtos/JT808DefaultResultDto.cs
  29. +37
    -0
      src/JT808.Gateway/Dtos/JT808IPAddressDto.cs
  30. +29
    -0
      src/JT808.Gateway/Dtos/JT808ResultDto.cs
  31. +24
    -0
      src/JT808.Gateway/Dtos/JT808TcpSessionInfoDto.cs
  32. +24
    -0
      src/JT808.Gateway/Dtos/JT808UdpSessionInfoDto.cs
  33. +11
    -0
      src/JT808.Gateway/Dtos/JT808UnificationSendRequestDto.cs
  34. +15
    -0
      src/JT808.Gateway/Enums/JT808TransportProtocolType.cs
  35. +91
    -0
      src/JT808.Gateway/Handlers/JT808MsgIdHttpHandlerBase.cs
  36. +96
    -0
      src/JT808.Gateway/Handlers/JT808TcpClientConnectionHandler.cs
  37. +31
    -0
      src/JT808.Gateway/Handlers/JT808TcpClientHandler.cs
  38. +104
    -0
      src/JT808.Gateway/Handlers/JT808TcpConnectionHandler.cs
  39. +77
    -0
      src/JT808.Gateway/Handlers/JT808TcpServerHandler.cs
  40. +79
    -0
      src/JT808.Gateway/Handlers/JT808UdpServerHandler.cs
  41. +14
    -0
      src/JT808.Gateway/IJT808ClientBuilder.cs
  42. +14
    -0
      src/JT808.Gateway/IJT808GatewayBuilder.cs
  43. +18
    -0
      src/JT808.Gateway/Impls/JT808DatagramPacketImpl.cs
  44. +25
    -0
      src/JT808.Gateway/Impls/JT808GatewayBuilderDefault.cs
  45. +30
    -0
      src/JT808.Gateway/Impls/JT808MsgProducerDefaultImpl.cs
  46. +193
    -0
      src/JT808.Gateway/Impls/JT808MsgReplyConsumerDefaultImpl.cs
  47. +32
    -0
      src/JT808.Gateway/Impls/JT808SessionProducerDefaultImpl.cs
  48. +13
    -0
      src/JT808.Gateway/Interfaces/IJT808DatagramPacket.cs
  49. +17
    -0
      src/JT808.Gateway/Interfaces/IJT808Reply.cs
  50. +20
    -0
      src/JT808.Gateway/Interfaces/IJT808Session.cs
  51. +30
    -0
      src/JT808.Gateway/Interfaces/IJT808SessionService.cs
  52. +12
    -0
      src/JT808.Gateway/Interfaces/IJT808UnificationSendService.cs
  53. +42
    -0
      src/JT808.Gateway/JT808.Gateway.csproj
  54. +48
    -0
      src/JT808.Gateway/JT808GatewayConstants.cs
  55. +96
    -0
      src/JT808.Gateway/JT808GatewayExtensions.cs
  56. +49
    -0
      src/JT808.Gateway/Metadata/JT808AtomicCounter.cs
  57. +16
    -0
      src/JT808.Gateway/Metadata/JT808ClientReport.cs
  58. +30
    -0
      src/JT808.Gateway/Metadata/JT808ClientRequest.cs
  59. +22
    -0
      src/JT808.Gateway/Metadata/JT808HttpRequest.cs
  60. +22
    -0
      src/JT808.Gateway/Metadata/JT808HttpResponse.cs
  61. +23
    -0
      src/JT808.Gateway/Metadata/JT808Request.cs
  62. +31
    -0
      src/JT808.Gateway/Metadata/JT808Response.cs
  63. +32
    -0
      src/JT808.Gateway/Metadata/JT808TcpSession.cs
  64. +20
    -0
      src/JT808.Gateway/Metadata/JT808UdpPackage.cs
  65. +38
    -0
      src/JT808.Gateway/Metadata/JT808UdpSession.cs
  66. +63
    -0
      src/JT808.Gateway/Protos/JT808Gateway.proto
  67. +15
    -0
      src/JT808.Gateway/PubSub/IJT808MsgConsumer.cs
  68. +17
    -0
      src/JT808.Gateway/PubSub/IJT808MsgProducer.cs
  69. +15
    -0
      src/JT808.Gateway/PubSub/IJT808MsgReplyConsumer.cs
  70. +17
    -0
      src/JT808.Gateway/PubSub/IJT808MsgReplyProducer.cs
  71. +11
    -0
      src/JT808.Gateway/PubSub/IJT808PubSub.cs
  72. +18
    -0
      src/JT808.Gateway/PubSub/IJT808SessionConsumer.cs
  73. +13
    -0
      src/JT808.Gateway/PubSub/IJT808SessionProducer.cs
  74. +52
    -0
      src/JT808.Gateway/Services/JT808AtomicCounterService.cs
  75. +30
    -0
      src/JT808.Gateway/Services/JT808AtomicCounterServiceFactory.cs
  76. +35
    -0
      src/JT808.Gateway/Services/JT808ClientReceiveAtomicCounterService.cs
  77. +37
    -0
      src/JT808.Gateway/Services/JT808ClientReportHostedService.cs
  78. +43
    -0
      src/JT808.Gateway/Services/JT808ClientReportService.cs
  79. +35
    -0
      src/JT808.Gateway/Services/JT808ClientSendAtomicCounterService.cs
  80. +96
    -0
      src/JT808.Gateway/Services/JT808GatewayService.cs
  81. +42
    -0
      src/JT808.Gateway/Services/JT808MsgReplyHostedService.cs
  82. +11
    -0
      src/JT808.Gateway/Services/JT808MsgService.cs
  83. +98
    -0
      src/JT808.Gateway/Services/JT808SessionService.cs
  84. +45
    -0
      src/JT808.Gateway/Services/JT808UnificationSendService.cs
  85. +304
    -0
      src/JT808.Gateway/Session/JT808SessionManager.cs
  86. +50
    -0
      src/JT808.Gateway/Simples/JT808SimpleTcpClient.cs
  87. +48
    -0
      src/JT808.Gateway/Simples/JT808SimpleUdpClient.cs
  88. +21
    -0
      src/JT808.Gateway/Tcp/JT808TcpDotnettyExtensions.cs
  89. +95
    -0
      src/JT808.Gateway/Tcp/JT808TcpServerHost.cs
  90. +22
    -0
      src/JT808.Gateway/Udp/JT808UdpDotnettyExtensions.cs
  91. +76
    -0
      src/JT808.Gateway/Udp/JT808UdpServerHost.cs

+ 18
- 0
src/JT808.Gateway.SimpleClient/JT808.Gateway.SimpleClient.csproj View File

@@ -0,0 +1,18 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>netcoreapp3.0</TargetFramework>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Grpc.Net.ClientFactory" Version="2.24.0" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="3.0.0" />
<PackageReference Include="Microsoft.Extensions.Hosting" Version="3.0.0" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\JT808.Gateway\JT808.Gateway.csproj" />
</ItemGroup>

</Project>

+ 46
- 0
src/JT808.Gateway.SimpleClient/Program.cs View File

@@ -0,0 +1,46 @@
using JT808.Gateway.Client;
using JT808.Gateway.SimpleClient.Services;
using JT808.Protocol;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using System;
using System.Threading.Tasks;
using Grpc.Net.Client;
using JT808.Gateway.GrpcService;

namespace JT808.Gateway.SimpleClient
{
class Program
{
static async Task Main(string[] args)
{
//ref https://docs.microsoft.com/zh-cn/aspnet/core/grpc/troubleshoot?view=aspnetcore-3.0#call-insecure-grpc-services-with-net-core-client
AppContext.SetSwitch("System.Net.Http.SocketsHttpHandler.Http2UnencryptedSupport", true);
var serverHostBuilder = new HostBuilder()
.ConfigureLogging((context, logging) =>
{
logging.AddConsole();
logging.SetMinimumLevel(LogLevel.Trace);
})
.ConfigureServices((hostContext, services) =>
{
services.AddGrpcClient<JT808Gateway.JT808GatewayClient>(o =>
{
o.Address = new Uri("https://127.0.0.1:5001");
});
services.AddSingleton<ILoggerFactory, LoggerFactory>();
services.AddSingleton(typeof(ILogger<>), typeof(Logger<>));
services.AddLogging(options => {
options.AddConsole();
options.SetMinimumLevel(LogLevel.Trace);
});
services.AddJT808Configure()
.AddJT808Client();
services.AddHostedService<UpService>();
services.AddHostedService<GrpcClientService>();
});
await serverHostBuilder.RunConsoleAsync();
}
}
}

+ 68
- 0
src/JT808.Gateway.SimpleClient/Services/GrpcClientService.cs View File

@@ -0,0 +1,68 @@
using JT808.Gateway.Client;
using JT808.Protocol.MessageBody;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.DependencyInjection;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using JT808.Gateway.GrpcService;
using static JT808.Gateway.GrpcService.JT808Gateway;
using Google.Protobuf;
using System.Text.Json;
using JT808.Protocol.Extensions;

namespace JT808.Gateway.SimpleClient.Services
{
public class GrpcClientService : IHostedService
{
private readonly ILogger logger;
private readonly JT808GatewayClient client;

public GrpcClientService(
ILoggerFactory loggerFactory,
JT808GatewayClient jT808GatewayClient)
{
this.client = jT808GatewayClient;
logger = loggerFactory.CreateLogger("GrpcClientService");
}

public Task StartAsync(CancellationToken cancellationToken)
{
Task.Run(() => {
while (!cancellationToken.IsCancellationRequested)
{
Thread.Sleep(1000 * 10);
var result1 = client.GetTcpAtomicCounter(new Empty());
var result2 = client.GetUdpAtomicCounter(new Empty());
var result3 = client.GetTcpSessionAll(new Empty());
var result4 = client.GetUdpSessionAll(new Empty());
var result5 = client.UnificationSend(new UnificationSendRequest()
{
TerminalPhoneNo= "12345678910",
Data= ByteString.CopyFrom("7E 02 00 00 26 12 34 56 78 90 12 00 7D 02 00 00 00 01 00 00 00 02 00 BA 7F 0E 07 E4 F1 1C 00 28 00 3C 00 00 18 10 15 10 10 10 01 04 00 00 00 64 02 02 00 7D 01 13 7E".ToHexBytes())
});
var result6 = client.RemoveSessionByTerminalPhoneNo(new SessionRemoveRequest()
{
TerminalPhoneNo= "12345678910"
});

logger.LogDebug(JsonSerializer.Serialize(result1));
logger.LogDebug(JsonSerializer.Serialize(result2));
logger.LogDebug(JsonSerializer.Serialize(result3));
logger.LogDebug(JsonSerializer.Serialize(result4));
logger.LogDebug(JsonSerializer.Serialize(result5));
logger.LogDebug(JsonSerializer.Serialize(result6));
}
}, cancellationToken);
return Task.CompletedTask;
}

public Task StopAsync(CancellationToken cancellationToken)
{
return Task.CompletedTask;
}
}
}

+ 68
- 0
src/JT808.Gateway.SimpleClient/Services/UpService.cs View File

@@ -0,0 +1,68 @@
using JT808.Gateway.Client;
using JT808.Protocol.MessageBody;
using Microsoft.Extensions.Hosting;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace JT808.Gateway.SimpleClient.Services
{
public class UpService : IHostedService
{
private readonly IJT808TcpClientFactory jT808TcpClientFactory;

public UpService(IJT808TcpClientFactory jT808TcpClientFactory)
{
this.jT808TcpClientFactory = jT808TcpClientFactory;
}

public Task StartAsync(CancellationToken cancellationToken)
{
JT808TcpClient client1 = jT808TcpClientFactory.Create(new DeviceConfig("12345678910", "127.0.0.1", 808));
//1.终端注册
client1.Send(new JT808_0x0100()
{
PlateNo = "粤A12345",
PlateColor = 2,
AreaID = 0,
CityOrCountyId = 0,
MakerId = "Koike001",
TerminalId = "Koike001",
TerminalModel = "Koike001"
});
//2.终端鉴权
client1.Send(new JT808_0x0102()
{
Code = "1234"
});
Task.Run(() => {
while (true)
{
var i = 0;
//3.每5000秒发一次
client1.Send(new JT808_0x0200()
{
Lat = 110000 + i,
Lng = 100000 + i,
GPSTime = DateTime.Now,
Speed = 50,
Direction = 30,
AlarmFlag = 5,
Altitude = 50,
StatusFlag = 10
});
i++;
Thread.Sleep(5000);
}
});
return Task.CompletedTask;
}

public Task StopAsync(CancellationToken cancellationToken)
{
return Task.CompletedTask;
}
}
}

+ 3106
- 0
src/JT808.Gateway.SimpleServer/Configs/NLog.xsd
File diff suppressed because it is too large
View File


+ 36
- 0
src/JT808.Gateway.SimpleServer/Configs/nlog.Unix.config View File

@@ -0,0 +1,36 @@
<?xml version="1.0" encoding="utf-8" ?>
<!--
参考:http://www.cnblogs.com/fuchongjundream/p/3936431.html
autoReload:自动再配置
internalLogFile:可以让NLog把内部的调试和异常信息都写入指定文件里程序没问题了,日志却出了问题。这个该怎么办,到底是哪里不正确了?假如日志本身除了bug该如何解决?这就需要日志排错。把日志的错误信息写入日志。
<nlog throwExceptions="true" />
<nlog internalLogFile="file.txt" />- 设置internalLogFile属性可以让NLog把内部的调试和异常信息都写入指定文件里。
<nlog internalLogLevel="Trace|Debug|Info|Warn|Error|Fatal" /> - 决定内部日志的级别,级别越高,输出的日志信息越简洁。
<nlog internalLogToConsole="false|true" /> - 是否把内部日志输出到标准控制台。
<nlog internalLogToConsoleError="false|true" /> - 是否把内部日志输出到标准错误控制台 (stderr)。
设置throwExceptions属性为“true”可以让NLog不再阻挡这类异常,而是把它们抛给调用者。在部署是这样做可以帮我们快速定位问题。一旦应用程序已经正确配置了,我们建议把throwExceptions的值设为“false”,这样由于日志引发的问题不至于导致应用程序的崩溃。
-->
<nlog xmlns="http://www.nlog-project.org/schemas/NLog.xsd" xsi:schemaLocation="NLog NLog.xsd"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
autoReload="true"
internalLogFile="/data/gps_services/serviceslogs/JT808.Gateway.SimpleServer/internalLog.txt"
internalLogLevel="Debug" >
<variable name="Directory" value="/data/gps_services/serviceslogs/JT808.Gateway.SimpleServer"/>
<targets>
<target name="SimpleServer" xsi:type="File"
fileName="${Directory}/SimpleServer.${shortdate}.log"
layout="${date:format=yyyyMMddHHmmss} ${callsite} ${level}:${message} ${onexception:${exception:format=tostring} ${newline} ${stacktrace} ${newline}"/>
<target name="console" xsi:type="ColoredConsole"
useDefaultRowHighlightingRules="false"
layout="${date:format=yyyyMMddHHmmss} ${callsite} ${level} ${message} ${onexception:${exception:format=tostring} ${newline} ${stacktrace} ${newline}">
<highlight-row condition="level == LogLevel.Debug" foregroundColor="DarkGray" />
<highlight-row condition="level == LogLevel.Info" foregroundColor="Gray" />
<highlight-row condition="level == LogLevel.Warn" foregroundColor="Yellow" />
<highlight-row condition="level == LogLevel.Error" foregroundColor="Red" />
<highlight-row condition="level == LogLevel.Fatal" foregroundColor="Red" backgroundColor="White" />
</target>
</targets>
<rules>
<logger name="*" minlevel="Debug" maxlevel="Fatal" writeTo="SimpleServer"/>
</rules>
</nlog>

+ 36
- 0
src/JT808.Gateway.SimpleServer/Configs/nlog.Win32NT.config View File

@@ -0,0 +1,36 @@
<?xml version="1.0" encoding="utf-8" ?>
<!--
参考:http://www.cnblogs.com/fuchongjundream/p/3936431.html
autoReload:自动再配置
internalLogFile:可以让NLog把内部的调试和异常信息都写入指定文件里程序没问题了,日志却出了问题。这个该怎么办,到底是哪里不正确了?假如日志本身除了bug该如何解决?这就需要日志排错。把日志的错误信息写入日志。
<nlog throwExceptions="true" />
<nlog internalLogFile="file.txt" />- 设置internalLogFile属性可以让NLog把内部的调试和异常信息都写入指定文件里。
<nlog internalLogLevel="Trace|Debug|Info|Warn|Error|Fatal" /> - 决定内部日志的级别,级别越高,输出的日志信息越简洁。
<nlog internalLogToConsole="false|true" /> - 是否把内部日志输出到标准控制台。
<nlog internalLogToConsoleError="false|true" /> - 是否把内部日志输出到标准错误控制台 (stderr)。
设置throwExceptions属性为“true”可以让NLog不再阻挡这类异常,而是把它们抛给调用者。在部署是这样做可以帮我们快速定位问题。一旦应用程序已经正确配置了,我们建议把throwExceptions的值设为“false”,这样由于日志引发的问题不至于导致应用程序的崩溃。
-->
<nlog xmlns="http://www.nlog-project.org/schemas/NLog.xsd" xsi:schemaLocation="NLog NLog.xsd"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
autoReload="true"
internalLogFile="wwwroot/logs/JT808.Gateway.SimpleServer/internalLog.txt"
internalLogLevel="Debug" >
<variable name="Directory" value="/wwwroot/logs/JT808.Gateway.SimpleServer"/>
<targets>
<target name="SimpleServer" xsi:type="File"
fileName="${Directory}/SimpleServer.${shortdate}.log"
layout="${date:format=yyyyMMddHHmmss} ${callsite} ${level}:${message} ${onexception:${exception:format=tostring} ${newline} ${stacktrace} ${newline}"/>
<target name="console" xsi:type="ColoredConsole"
useDefaultRowHighlightingRules="false"
layout="${date:format=yyyyMMddHHmmss} ${callsite} ${level} ${message} ${onexception:${exception:format=tostring} ${newline} ${stacktrace} ${newline}">
<highlight-row condition="level == LogLevel.Debug" foregroundColor="DarkGray" />
<highlight-row condition="level == LogLevel.Info" foregroundColor="Gray" />
<highlight-row condition="level == LogLevel.Warn" foregroundColor="Yellow" />
<highlight-row condition="level == LogLevel.Error" foregroundColor="Red" />
<highlight-row condition="level == LogLevel.Fatal" foregroundColor="Red" backgroundColor="White" />
</target>
</targets>
<rules>
<logger name="*" minlevel="Trace" maxlevel="Fatal" writeTo="SimpleServer,console"/>
</rules>
</nlog>

+ 36
- 0
src/JT808.Gateway.SimpleServer/JT808.Gateway.SimpleServer.csproj View File

@@ -0,0 +1,36 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>netcoreapp3.0</TargetFramework>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Grpc.AspNetCore" Version="2.24.0" />
<PackageReference Include="NLog.Extensions.Logging" Version="1.6.1" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\JT808.Gateway\JT808.Gateway.csproj" />
</ItemGroup>

<ItemGroup>
<None Update="appsettings.Development.json">
<CopyToOutputDirectory>Always</CopyToOutputDirectory>
</None>
<None Update="appsettings.json">
<CopyToOutputDirectory>Always</CopyToOutputDirectory>
</None>
<None Update="Configs\nlog.Unix.config">
<CopyToOutputDirectory>Always</CopyToOutputDirectory>
</None>
<None Update="Configs\nlog.Win32NT.config">
<CopyToOutputDirectory>Always</CopyToOutputDirectory>
</None>
<None Update="Configs\NLog.xsd">
<CopyToOutputDirectory>Always</CopyToOutputDirectory>
</None>
</ItemGroup>

</Project>


+ 70
- 0
src/JT808.Gateway.SimpleServer/Program.cs View File

@@ -0,0 +1,70 @@
using JT808.Gateway.Services;
using JT808.Gateway.Tcp;
using JT808.Gateway.Udp;
using JT808.Protocol;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.AspNetCore.Server.Kestrel.Core;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using NLog.Extensions.Logging;
using System;
using System.Net;

namespace JT808.Gateway.SimpleServer
{
class Program
{
static void Main(string[] args)
{
Host.CreateDefaultBuilder(args)
.ConfigureAppConfiguration((hostingContext, config) =>
{
config.SetBasePath(AppDomain.CurrentDomain.BaseDirectory);
config.AddJsonFile("appsettings.json", optional: false, reloadOnChange: true);
})
.ConfigureLogging((hostContext, configLogging) => {
Console.WriteLine($"Environment.OSVersion.Platform:{Environment.OSVersion.Platform.ToString()}");
NLog.LogManager.LoadConfiguration($"Configs/nlog.{Environment.OSVersion.Platform.ToString()}.config");
configLogging.AddNLog(new NLogProviderOptions { CaptureMessageTemplates = true, CaptureMessageProperties = true });
configLogging.SetMinimumLevel(LogLevel.Trace);
})

.ConfigureWebHostDefaults(webBuilder =>
{
webBuilder
//.ConfigureKestrel(options =>
//{
// options.Listen(IPAddress.Any, 5001, listenOptions =>
// {
// listenOptions.Protocols = HttpProtocols.Http2;
// });
//})
.Configure(app =>
{
app.UseRouting();
app.UseEndpoints(endpoints =>
{
endpoints.MapGrpcService<JT808GatewayService>();
});
});
})
.ConfigureServices((hostContext,services) =>
{
services.Configure<KestrelServerOptions>(hostContext.Configuration.GetSection("Kestrel"));
services.AddGrpc();
services.AddSingleton<ILoggerFactory, LoggerFactory>();
services.AddSingleton(typeof(ILogger<>), typeof(Logger<>));
services.AddJT808Configure()
.AddJT808Gateway(hostContext.Configuration)
.AddJT808GatewayTcpHost()
.AddJT808GatewayUdpHost()
.Builder();
})
.Build()
.Run();
}
}
}

+ 37
- 0
src/JT808.Gateway.sln View File

@@ -0,0 +1,37 @@

Microsoft Visual Studio Solution File, Format Version 12.00
# Visual Studio Version 16
VisualStudioVersion = 16.0.29411.108
MinimumVisualStudioVersion = 10.0.40219.1
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JT808.Gateway", "JT808.Gateway\JT808.Gateway.csproj", "{A42A396F-D32B-4AC2-B554-735AA7E2DBA8}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "JT808.Gateway.SimpleServer", "JT808.Gateway.SimpleServer\JT808.Gateway.SimpleServer.csproj", "{F9ABFDDB-84A2-44C8-A162-A1FE4EA4D332}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "JT808.Gateway.SimpleClient", "JT808.Gateway.SimpleClient\JT808.Gateway.SimpleClient.csproj", "{886D4937-7265-40DC-87CC-85CE35553214}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Release|Any CPU = Release|Any CPU
EndGlobalSection
GlobalSection(ProjectConfigurationPlatforms) = postSolution
{A42A396F-D32B-4AC2-B554-735AA7E2DBA8}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{A42A396F-D32B-4AC2-B554-735AA7E2DBA8}.Debug|Any CPU.Build.0 = Debug|Any CPU
{A42A396F-D32B-4AC2-B554-735AA7E2DBA8}.Release|Any CPU.ActiveCfg = Release|Any CPU
{A42A396F-D32B-4AC2-B554-735AA7E2DBA8}.Release|Any CPU.Build.0 = Release|Any CPU
{F9ABFDDB-84A2-44C8-A162-A1FE4EA4D332}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{F9ABFDDB-84A2-44C8-A162-A1FE4EA4D332}.Debug|Any CPU.Build.0 = Debug|Any CPU
{F9ABFDDB-84A2-44C8-A162-A1FE4EA4D332}.Release|Any CPU.ActiveCfg = Release|Any CPU
{F9ABFDDB-84A2-44C8-A162-A1FE4EA4D332}.Release|Any CPU.Build.0 = Release|Any CPU
{886D4937-7265-40DC-87CC-85CE35553214}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{886D4937-7265-40DC-87CC-85CE35553214}.Debug|Any CPU.Build.0 = Debug|Any CPU
{886D4937-7265-40DC-87CC-85CE35553214}.Release|Any CPU.ActiveCfg = Release|Any CPU
{886D4937-7265-40DC-87CC-85CE35553214}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {F5671BD2-B44A-4A7C-80EA-E060A512992D}
EndGlobalSection
EndGlobal

+ 28
- 0
src/JT808.Gateway/Client/DeviceConfig.cs View File

@@ -0,0 +1,28 @@
using JT808.Protocol;
using JT808.Protocol.Interfaces;
using System;
using System.Collections.Generic;
using System.Text;

namespace JT808.Gateway.Client
{
public class DeviceConfig
{
public DeviceConfig(string terminalPhoneNo, string tcpHost,int tcpPort)
{
TerminalPhoneNo = terminalPhoneNo;
TcpHost = tcpHost;
TcpPort = tcpPort;
MsgSNDistributed = new JT808ClientMsgSNDistributedImpl();
}
public string TerminalPhoneNo { get; private set; }
public string TcpHost { get; private set; }
public int TcpPort { get; private set; }
/// <summary>
/// 心跳时间(秒)
/// </summary>
public int Heartbeat { get; set; } = 30;

public IJT808MsgSNDistributed MsgSNDistributed { get; }
}
}

+ 23
- 0
src/JT808.Gateway/Client/JT808ClientDotnettyExtensions.cs View File

@@ -0,0 +1,23 @@

using Microsoft.Extensions.DependencyInjection;
using System;
using System.Collections.Generic;
using System.Text;
using JT808.Protocol;
using JT808.Gateway.Services;

namespace JT808.Gateway.Client
{
public static class JT808ClientDotnettyExtensions
{
public static IJT808Builder AddJT808Client(this IJT808Builder jT808Builder)
{
jT808Builder.Services.AddSingleton<JT808ClientSendAtomicCounterService>();
jT808Builder.Services.AddSingleton<JT808ClientReceiveAtomicCounterService>();
jT808Builder.Services.AddSingleton<IJT808TcpClientFactory, JT808TcpClientFactory>();
jT808Builder.Services.AddSingleton<JT808ClientReportService>();
jT808Builder.Services.AddHostedService<JT808ClientReportHostedService>();
return jT808Builder;
}
}
}

+ 16
- 0
src/JT808.Gateway/Client/JT808ClientMsgSNDistributedImpl.cs View File

@@ -0,0 +1,16 @@
using JT808.Protocol;
using JT808.Protocol.Interfaces;
using System.Threading;

namespace JT808.Gateway.Client
{
internal class JT808ClientMsgSNDistributedImpl : IJT808MsgSNDistributed
{
int _counter = 0;

public ushort Increment()
{
return (ushort)Interlocked.Increment(ref _counter);
}
}
}

+ 112
- 0
src/JT808.Gateway/Client/JT808TcpClient.cs View File

@@ -0,0 +1,112 @@
using DotNetty.Buffers;
using DotNetty.Codecs;
using DotNetty.Handlers.Timeout;
using DotNetty.Transport.Bootstrapping;
using DotNetty.Transport.Channels;
using DotNetty.Transport.Channels.Sockets;
using Microsoft.Extensions.Logging;
using System;
using System.Runtime.InteropServices;
using Microsoft.Extensions.DependencyInjection;
using System.Net;
using JT808.Protocol;
using JT808.Gateway.Services;
using JT808.Gateway.Codecs;
using JT808.Gateway.Handlers;
using JT808.Gateway.Metadata;

namespace JT808.Gateway.Client
{
public sealed class JT808TcpClient : IDisposable
{
private MultithreadEventLoopGroup group;

private IChannel clientChannel;

private bool disposed = false;

public DeviceConfig DeviceConfig { get; private set; }

public ILoggerFactory LoggerFactory { get; private set; }

public JT808TcpClient(DeviceConfig deviceConfig, IServiceProvider serviceProvider)
{
DeviceConfig = deviceConfig;
LoggerFactory = serviceProvider.GetRequiredService<ILoggerFactory>();
JT808ClientSendAtomicCounterService jT808SendAtomicCounterService = serviceProvider.GetRequiredService<JT808ClientSendAtomicCounterService>();
JT808ClientReceiveAtomicCounterService jT808ReceiveAtomicCounterService = serviceProvider.GetRequiredService<JT808ClientReceiveAtomicCounterService>();
IJT808Config jT808Config = serviceProvider.GetRequiredService<IJT808Config>();
group = new MultithreadEventLoopGroup(1);
Bootstrap bootstrap = new Bootstrap();
bootstrap.Group(group);
bootstrap.Channel<TcpSocketChannel>();
if (RuntimeInformation.IsOSPlatform(OSPlatform.Linux) || RuntimeInformation.IsOSPlatform(OSPlatform.OSX))
{
bootstrap.Option(ChannelOption.SoReuseport, true);
}
bootstrap
.Option(ChannelOption.SoBacklog, 8192)
.Handler(new ActionChannelInitializer<IChannel>(channel =>
{
channel.Pipeline.AddLast("jt808TcpBuffer", new DelimiterBasedFrameDecoder(int.MaxValue,
Unpooled.CopiedBuffer(new byte[] { JT808.Protocol.JT808Package.BeginFlag }),
Unpooled.CopiedBuffer(new byte[] { JT808.Protocol.JT808Package.EndFlag })));
channel.Pipeline.AddLast("systemIdleState", new IdleStateHandler(60, deviceConfig.Heartbeat, 3600));
channel.Pipeline.AddLast("jt808TcpDecode", new JT808ClientTcpDecoder());
channel.Pipeline.AddLast("jt808TcpEncode", new JT808ClientTcpEncoder(jT808Config,jT808SendAtomicCounterService, LoggerFactory));
channel.Pipeline.AddLast("jt808TcpClientConnection", new JT808TcpClientConnectionHandler(this));
channel.Pipeline.AddLast("jt808TcpService", new JT808TcpClientHandler(jT808ReceiveAtomicCounterService,this));
}));
clientChannel = bootstrap.ConnectAsync(IPAddress.Parse(DeviceConfig.TcpHost), DeviceConfig.TcpPort).Result;
}

public async void Send(JT808ClientRequest request)
{
if (disposed) return;
if (clientChannel == null) throw new NullReferenceException("Channel is empty.");
if (request == null) throw new ArgumentNullException("JT808ClientRequest Parameter is empty.");
if (clientChannel.Active && clientChannel.Open)
{
await clientChannel.WriteAndFlushAsync(request);
}
}

public bool IsOpen
{
get
{
if (clientChannel == null) return false;
return clientChannel.Active && clientChannel.Open;
}
}

private void Dispose(bool disposing)
{
if (disposed)
{
return;
}
if (disposing)
{
// 清理托管资源
group.ShutdownGracefullyAsync(TimeSpan.FromMilliseconds(100), TimeSpan.FromSeconds(1));
}
disposed = true;
}

~JT808TcpClient()
{
//必须为false
//这表明,隐式清理时,只要处理非托管资源就可以了。
Dispose(false);
}

public void Dispose()
{
//必须为true
Dispose(true);
//通知垃圾回收机制不再调用终结器(析构器)
GC.SuppressFinalize(this);
}
}
}

+ 103
- 0
src/JT808.Gateway/Client/JT808TcpClientExtensions.cs View File

@@ -0,0 +1,103 @@
using JT808.Protocol;
using JT808.Protocol.MessageBody;
using System;
using System.Collections.Generic;
using System.Text;
using JT808.Protocol.Enums;
using JT808.Protocol.Extensions;
using JT808.Gateway.Metadata;

namespace JT808.Gateway.Client
{
public static class JT808TcpClientExtensions
{
public static void Send(this JT808TcpClient client, JT808Header header, JT808Bodies bodies, int minBufferSize = 1024)
{
JT808Package package = new JT808Package();
package.Header = header;
package.Bodies = bodies;
package.Header.TerminalPhoneNo = client.DeviceConfig.TerminalPhoneNo;
package.Header.MsgNum = client.DeviceConfig.MsgSNDistributed.Increment();
JT808ClientRequest request = new JT808ClientRequest(package, minBufferSize);
client.Send(request);
}

/// <summary>
/// 终端通用应答
/// </summary>
/// <param name="client"></param>
/// <param name="bodies"></param>
/// <param name="minBufferSize"></param>
public static void Send(this JT808TcpClient client, JT808_0x0001 bodies, int minBufferSize = 100)
{
JT808Header header = new JT808Header();
header.MsgId = JT808MsgId.终端通用应答.ToUInt16Value();
client.Send(header, bodies, minBufferSize);
}

/// <summary>
/// 终端心跳
/// </summary>
/// <param name="client"></param>
/// <param name="bodies"></param>
/// <param name="minBufferSize"></param>
public static void Send(this JT808TcpClient client, JT808_0x0002 bodies, int minBufferSize = 100)
{
JT808Header header = new JT808Header();
header.MsgId = JT808MsgId.终端心跳.ToUInt16Value();
client.Send(header, bodies, minBufferSize);
}

/// <summary>
/// 终端注销
/// </summary>
/// <param name="client"></param>
/// <param name="bodies"></param>
/// <param name="minBufferSize"></param>
public static void Send(this JT808TcpClient client, JT808_0x0003 bodies, int minBufferSize = 100)
{
JT808Header header = new JT808Header();
header.MsgId = JT808MsgId.终端注销.ToUInt16Value();
client.Send(header, bodies, minBufferSize);
}

/// <summary>
/// 终端鉴权
/// </summary>
/// <param name="client"></param>
/// <param name="bodies"></param>
/// <param name="minBufferSize"></param>
public static void Send(this JT808TcpClient client, JT808_0x0102 bodies, int minBufferSize = 100)
{
JT808Header header = new JT808Header();
header.MsgId = JT808MsgId.终端鉴权.ToUInt16Value();
client.Send(header, bodies, minBufferSize);
}

/// <summary>
/// 终端注册
/// </summary>
/// <param name="client"></param>
/// <param name="bodies"></param>
/// <param name="minBufferSize"></param>
public static void Send(this JT808TcpClient client, JT808_0x0100 bodies, int minBufferSize = 100)
{
JT808Header header = new JT808Header();
header.MsgId = JT808MsgId.终端注册.ToUInt16Value();
client.Send(header, bodies, minBufferSize);
}

/// <summary>
/// 位置信息汇报
/// </summary>
/// <param name="client"></param>
/// <param name="bodies"></param>
/// <param name="minBufferSize"></param>
public static void Send(this JT808TcpClient client, JT808_0x0200 bodies, int minBufferSize = 200)
{
JT808Header header = new JT808Header();
header.MsgId = JT808MsgId.位置信息汇报.ToUInt16Value();
client.Send(header, bodies, minBufferSize);
}
}
}

+ 62
- 0
src/JT808.Gateway/Client/JT808TcpClientFactory.cs View File

@@ -0,0 +1,62 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;

namespace JT808.Gateway.Client
{
public interface IJT808TcpClientFactory : IDisposable
{
JT808TcpClient Create(DeviceConfig deviceConfig);

List<JT808TcpClient> GetAll();
}

public class JT808TcpClientFactory: IJT808TcpClientFactory
{
private readonly ConcurrentDictionary<string, JT808TcpClient> dict;

private readonly IServiceProvider serviceProvider;

public JT808TcpClientFactory(IServiceProvider serviceProvider)
{
dict = new ConcurrentDictionary<string, JT808TcpClient>(StringComparer.OrdinalIgnoreCase);
this.serviceProvider = serviceProvider;
}

public JT808TcpClient Create(DeviceConfig deviceConfig)
{
if(dict.TryGetValue(deviceConfig.TerminalPhoneNo,out var client))
{
return client;
}
else
{
JT808TcpClient jT808TcpClient = new JT808TcpClient(deviceConfig, serviceProvider);
dict.TryAdd(deviceConfig.TerminalPhoneNo, jT808TcpClient);
return jT808TcpClient;
}
}

public void Dispose()
{
foreach(var client in dict)
{
try
{
client.Value.Dispose();
}
catch
{
}
}
}

public List<JT808TcpClient> GetAll()
{
return dict.Values.ToList();
}
}
}

+ 20
- 0
src/JT808.Gateway/Codecs/JT808ClientTcpDecoder.cs View File

@@ -0,0 +1,20 @@
using DotNetty.Buffers;
using DotNetty.Codecs;
using System.Collections.Generic;
using JT808.Protocol;
using DotNetty.Transport.Channels;

namespace JT808.Gateway.Codecs
{
public class JT808ClientTcpDecoder : ByteToMessageDecoder
{
protected override void Decode(IChannelHandlerContext context, IByteBuffer input, List<object> output)
{
byte[] buffer = new byte[input.Capacity + 2];
input.ReadBytes(buffer, 1, input.Capacity);
buffer[0] = JT808Package.BeginFlag;
buffer[input.Capacity + 1] = JT808Package.EndFlag;
output.Add(buffer);
}
}
}

+ 52
- 0
src/JT808.Gateway/Codecs/JT808ClientTcpEncoder.cs View File

@@ -0,0 +1,52 @@
using DotNetty.Buffers;
using DotNetty.Codecs;
using JT808.Protocol;
using DotNetty.Transport.Channels;
using Microsoft.Extensions.Logging;
using JT808.Gateway.Metadata;
using JT808.Gateway.Services;

namespace JT808.Gateway.Codecs
{
public class JT808ClientTcpEncoder : MessageToByteEncoder<JT808ClientRequest>
{
private readonly ILogger<JT808ClientTcpEncoder> logger;
private readonly JT808ClientSendAtomicCounterService jT808SendAtomicCounterService;
private readonly JT808Serializer JT808Serializer;

public JT808ClientTcpEncoder(
IJT808Config jT808Config,
JT808ClientSendAtomicCounterService jT808SendAtomicCounterService,ILoggerFactory loggerFactory)
{
logger=loggerFactory.CreateLogger<JT808ClientTcpEncoder>();
this.jT808SendAtomicCounterService = jT808SendAtomicCounterService;
JT808Serializer = jT808Config.GetSerializer();
}

protected override void Encode(IChannelHandlerContext context, JT808ClientRequest message, IByteBuffer output)
{
if (message.Package != null)
{
try
{
var sendData = JT808Serializer.Serialize(message.Package, message.MinBufferSize);
output.WriteBytes(sendData);
jT808SendAtomicCounterService.MsgSuccessIncrement();
}
catch (JT808.Protocol.Exceptions.JT808Exception ex)
{
logger.LogError(ex, context.Channel.Id.AsShortText());
}
catch (System.Exception ex)
{
logger.LogError(ex, context.Channel.Id.AsShortText());
}
}
else if (message.HexData != null)
{
output.WriteBytes(message.HexData);
jT808SendAtomicCounterService.MsgSuccessIncrement();
}
}
}
}

+ 32
- 0
src/JT808.Gateway/Codecs/JT808TcpDecoder.cs View File

@@ -0,0 +1,32 @@
using DotNetty.Buffers;
using DotNetty.Codecs;
using System.Collections.Generic;
using JT808.Protocol;
using DotNetty.Transport.Channels;

namespace JT808.Gateway.Codecs
{
public class JT808TcpDecoder : ByteToMessageDecoder
{
protected override void Decode(IChannelHandlerContext context, IByteBuffer input, List<object> output)
{
//过滤掉不是808标准包
//不包括头尾标识
//(消息 ID )2+(消息体属性)2+(终端手机号)6+(消息流水号)2+(检验码 )1
if (input.Capacity < 12)
{
byte[] buffer = new byte[input.Capacity];
input.ReadBytes(buffer, 0, input.Capacity);
return;
}
else
{
byte[] buffer = new byte[input.Capacity + 2];
input.ReadBytes(buffer, 1, input.Capacity);
buffer[0] = JT808Package.BeginFlag;
buffer[input.Capacity + 1] = JT808Package.EndFlag;
output.Add(buffer);
}
}
}
}

+ 52
- 0
src/JT808.Gateway/Codecs/JT808TcpEncoder.cs View File

@@ -0,0 +1,52 @@
using DotNetty.Buffers;
using DotNetty.Codecs;
using JT808.Protocol;
using DotNetty.Transport.Channels;
using Microsoft.Extensions.Logging;
using JT808.Protocol.Interfaces;
using JT808.Gateway.Interfaces;

namespace JT808.Gateway.Codecs
{
/// <summary>
/// tcp统一下发出口
/// </summary>
public class JT808TcpEncoder : MessageToByteEncoder<IJT808Reply>
{
private readonly ILogger<JT808TcpEncoder> logger;

private readonly JT808Serializer JT808Serializer;

public JT808TcpEncoder(
IJT808Config jT808Config,
ILoggerFactory loggerFactory)
{
logger = loggerFactory.CreateLogger<JT808TcpEncoder>();
this.JT808Serializer = jT808Config.GetSerializer();
}

protected override void Encode(IChannelHandlerContext context, IJT808Reply message, IByteBuffer output)
{
if (message.Package != null)
{
try
{
var sendData = JT808Serializer.Serialize(message.Package, message.MinBufferSize);
output.WriteBytes(Unpooled.WrappedBuffer(sendData));
}
catch (JT808.Protocol.Exceptions.JT808Exception ex)
{
logger.LogError(ex, context.Channel.Id.AsShortText());
}
catch (System.Exception ex)
{
logger.LogError(ex, context.Channel.Id.AsShortText());
}
}
else if (message.HexData != null)
{
output.WriteBytes(Unpooled.WrappedBuffer(message.HexData));
}
}
}
}

+ 33
- 0
src/JT808.Gateway/Codecs/JT808UdpDecoder.cs View File

@@ -0,0 +1,33 @@
using DotNetty.Buffers;
using DotNetty.Codecs;
using DotNetty.Transport.Channels;
using System.Collections.Generic;
using DotNetty.Transport.Channels.Sockets;
using JT808.Gateway.Metadata;

namespace JT808.Gateway.Codecs
{
public class JT808UdpDecoder : MessageToMessageDecoder<DatagramPacket>
{
protected override void Decode(IChannelHandlerContext context, DatagramPacket message, List<object> output)
{
if (!message.Content.IsReadable()) return;
IByteBuffer byteBuffer = message.Content;
//过滤掉非808标准包
//不包括头尾标识
//(消息 ID )2+(消息体属性)2+(终端手机号)6+(消息流水号)2+(检验码 )1
if (byteBuffer.ReadableBytes < 12)
{
byte[] buffer = new byte[byteBuffer.ReadableBytes];
byteBuffer.ReadBytes(buffer, 0, byteBuffer.ReadableBytes);
return;
}
else
{
byte[] buffer = new byte[byteBuffer.ReadableBytes];
byteBuffer.ReadBytes(buffer);
output.Add(new JT808UdpPackage(buffer, message.Sender));
}
}
}
}

+ 31
- 0
src/JT808.Gateway/Configurations/JT808ClientConfiguration.cs View File

@@ -0,0 +1,31 @@
using System;
using System.Collections.Generic;
using System.Net;
using System.Text;

namespace JT808.Gateway.Configurations
{
public class JT808ClientConfiguration
{
public string Host { get; set; }

public int Port { get; set; }

private EndPoint endPoint;

public EndPoint EndPoint
{
get
{
if (endPoint == null)
{
if (IPAddress.TryParse(Host, out IPAddress ip))
{
endPoint = new IPEndPoint(ip, Port);
}
}
return endPoint;
}
}
}
}

+ 41
- 0
src/JT808.Gateway/Configurations/JT808Configuration.cs View File

@@ -0,0 +1,41 @@
using System;
using System.Collections.Generic;
using System.Text;

namespace JT808.Gateway.Configurations
{
public class JT808Configuration
{
public int TcpPort { get; set; } = 808;

public int UdpPort { get; set; } = 808;

public int QuietPeriodSeconds { get; set; } = 1;

public TimeSpan QuietPeriodTimeSpan => TimeSpan.FromSeconds(QuietPeriodSeconds);

public int ShutdownTimeoutSeconds { get; set; } = 3;

public TimeSpan ShutdownTimeoutTimeSpan => TimeSpan.FromSeconds(ShutdownTimeoutSeconds);

public int SoBacklog { get; set; } = 8192;

public int EventLoopCount { get; set; } = Environment.ProcessorCount;

public int ReaderIdleTimeSeconds { get; set; } = 3600;

public int WriterIdleTimeSeconds { get; set; } = 3600;

public int AllIdleTimeSeconds { get; set; } = 3600;

/// <summary>
/// 转发远程地址 (可选项)知道转发的地址有利于提升性能
/// 按照808的消息,有些请求必须要应答,但是转发可以不需要有应答可以节省部分资源包括:
// 1.消息的序列化
// 2.消息的下发
// 都有一定的性能损耗,那么不需要判断写超时 IdleState.WriterIdle
// 就跟神兽貔貅一样。。。
/// </summary>
public List<string> ForwardingRemoteIPAddress { get; set; }
}
}

+ 29
- 0
src/JT808.Gateway/Converters/JsonByteArrayHexConverter.cs View File

@@ -0,0 +1,29 @@
using Newtonsoft.Json;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace JT808.Gateway.Converters
{
class ByteArrayHexConverter : JsonConverter
{
public override bool CanConvert(Type objectType) => objectType == typeof(byte[]);

public override bool CanRead => false;
public override bool CanWrite => true;

public override object ReadJson(JsonReader reader, Type objectType, object existingValue, JsonSerializer serializer) => throw new NotImplementedException();

private readonly string _separator;

public ByteArrayHexConverter(string separator = " ") => _separator = separator;

public override void WriteJson(JsonWriter writer, object value, JsonSerializer serializer)
{
var hexString = string.Join(_separator, ((byte[])value).Select(p => p.ToString("X2")));
writer.WriteValue(hexString);
}
}
}

+ 26
- 0
src/JT808.Gateway/Converters/JsonIPAddressConverter.cs View File

@@ -0,0 +1,26 @@
using Newtonsoft.Json;
using System;
using System.Collections.Generic;
using System.Net;
using System.Text;

namespace JT808.Gateway.Converters
{
public class JsonIPAddressConverter : JsonConverter
{
public override bool CanConvert(Type objectType)
{
return (objectType == typeof(IPAddress));
}

public override void WriteJson(JsonWriter writer, object value, JsonSerializer serializer)
{
writer.WriteValue(value.ToString());
}

public override object ReadJson(JsonReader reader, Type objectType, object existingValue, JsonSerializer serializer)
{
return IPAddress.Parse((string)reader.Value);
}
}
}

+ 32
- 0
src/JT808.Gateway/Converters/JsonIPEndPointConverter.cs View File

@@ -0,0 +1,32 @@
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using System;
using System.Net;

namespace JT808.Gateway.Converters
{
public class JsonIPEndPointConverter: JsonConverter
{
public override bool CanConvert(Type objectType)
{
return (objectType == typeof(IPEndPoint));
}

public override void WriteJson(JsonWriter writer, object value, JsonSerializer serializer)
{
IPEndPoint ep = (IPEndPoint)value;
JObject jo = new JObject();
jo.Add("Host", JToken.FromObject(ep.Address, serializer));
jo.Add("Port", ep.Port);
jo.WriteTo(writer);
}

public override object ReadJson(JsonReader reader, Type objectType, object existingValue, JsonSerializer serializer)
{
JObject jo = JObject.Load(reader);
IPAddress address = jo["Host"].ToObject<IPAddress>(serializer);
int port = (int)jo["Port"];
return new IPEndPoint(address, port);
}
}
}

+ 12
- 0
src/JT808.Gateway/Dtos/JT808AtomicCounterDto.cs View File

@@ -0,0 +1,12 @@
namespace JT808.Gateway.Dtos
{
/// <summary>
/// 包计数器服务
/// </summary>
public class JT808AtomicCounterDto
{
public long MsgSuccessCount { get; set; }

public long MsgFailCount { get; set; }
}
}

+ 15
- 0
src/JT808.Gateway/Dtos/JT808DefaultResultDto.cs View File

@@ -0,0 +1,15 @@
using System;
using System.Collections.Generic;
using System.Text;

namespace JT808.Gateway.Dtos
{
public class JT808DefaultResultDto: JT808ResultDto<string>
{
public JT808DefaultResultDto()
{
Data = "Hello,JT808 WebAPI";
Code = JT808ResultCode.Ok;
}
}
}

+ 37
- 0
src/JT808.Gateway/Dtos/JT808IPAddressDto.cs View File

@@ -0,0 +1,37 @@
using System;
using System.Collections.Generic;
using System.Net;
using System.Runtime.Serialization;
using System.Text;

namespace JT808.Gateway.Dtos
{
public class JT808IPAddressDto
{
public string Host { get; set; }

public int Port { get; set; }

public EndPoint endPoint;

public EndPoint EndPoint
{
get
{
if (endPoint == null)
{
if (IPAddress.TryParse(Host, out IPAddress ip))
{
endPoint = new IPEndPoint(ip, Port);
}
else
{
endPoint = new DnsEndPoint(Host, Port);
}
}
return endPoint;
}
set { }
}
}
}

+ 29
- 0
src/JT808.Gateway/Dtos/JT808ResultDto.cs View File

@@ -0,0 +1,29 @@
using System;
using System.Collections.Generic;
using System.Text;

namespace JT808.Gateway.Dtos
{
public class JT808ResultDto<T>
{
public JT808ResultDto()
{
Code = JT808ResultCode.Ok;
}

public string Message { get; set; }

public int Code { get; set; }

public T Data { get; set; }
}

public class JT808ResultCode
{
public const int Ok = 200;
public const int Empty = 201;
public const int NotFound = 404;
public const int Fail = 400;
public const int Error = 500;
}
}

+ 24
- 0
src/JT808.Gateway/Dtos/JT808TcpSessionInfoDto.cs View File

@@ -0,0 +1,24 @@
using System;

namespace JT808.Gateway.Dtos
{
public class JT808TcpSessionInfoDto
{
/// <summary>
/// 最后上线时间
/// </summary>
public DateTime LastActiveTime { get; set; }
/// <summary>
/// 上线时间
/// </summary>
public DateTime StartTime { get; set; }
/// <summary>
/// 终端手机号
/// </summary>
public string TerminalPhoneNo { get; set; }
/// <summary>
/// 远程ip地址
/// </summary>
public string RemoteAddressIP { get; set; }
}
}

+ 24
- 0
src/JT808.Gateway/Dtos/JT808UdpSessionInfoDto.cs View File

@@ -0,0 +1,24 @@
using System;

namespace JT808.Gateway.Dtos
{
public class JT808UdpSessionInfoDto
{
/// <summary>
/// 最后上线时间
/// </summary>
public DateTime LastActiveTime { get; set; }
/// <summary>
/// 上线时间
/// </summary>
public DateTime StartTime { get; set; }
/// <summary>
/// 终端手机号
/// </summary>
public string TerminalPhoneNo { get; set; }
/// <summary>
/// 远程ip地址
/// </summary>
public string RemoteAddressIP { get; set; }
}
}

+ 11
- 0
src/JT808.Gateway/Dtos/JT808UnificationSendRequestDto.cs View File

@@ -0,0 +1,11 @@
namespace JT808.Gateway.Dtos
{
/// <summary>
/// 统一下发请求参数
/// </summary>
public class JT808UnificationSendRequestDto
{
public string TerminalPhoneNo { get; set; }
public byte[] Data { get; set; }
}
}

+ 15
- 0
src/JT808.Gateway/Enums/JT808TransportProtocolType.cs View File

@@ -0,0 +1,15 @@
using System;
using System.Collections.Generic;
using System.Text;

namespace JT808.Gateway.Enums
{
/// <summary>
/// 传输协议类型
/// </summary>
public enum JT808TransportProtocolType
{
tcp=1,
udp = 2
}
}

+ 91
- 0
src/JT808.Gateway/Handlers/JT808MsgIdHttpHandlerBase.cs View File

@@ -0,0 +1,91 @@
using System;
using System.Collections.Generic;
using System.Text;
using JT808.Gateway.Dtos;
using JT808.Gateway.Metadata;
using Newtonsoft.Json;

namespace JT808.Gateway.Handlers
{
/// <summary>
/// 基于webapi http模式抽象消息处理业务
/// 自定义消息处理业务
/// 注意:
/// 1.ConfigureServices:
/// services.Replace(new ServiceDescriptor(typeof(JT808MsgIdHttpHandlerBase),typeof(JT808MsgIdCustomHttpHandlerImpl),ServiceLifetime.Singleton));
/// 2.解析具体的消息体,具体消息调用具体的JT808Serializer.Deserialize<T>
/// </summary>
public abstract class JT808MsgIdHttpHandlerBase
{
/// <summary>
/// 初始化消息处理业务
/// </summary>
protected JT808MsgIdHttpHandlerBase()
{
HandlerDict = new Dictionary<string, Func<JT808HttpRequest, JT808HttpResponse>>();
}

protected void CreateRoute(string url, Func<JT808HttpRequest, JT808HttpResponse> func)
{
if (!HandlerDict.ContainsKey(url))
{
HandlerDict.Add(url, func);
}
else
{
// 替换
HandlerDict[url] = func;
}
}

public Dictionary<string, Func<JT808HttpRequest, JT808HttpResponse>> HandlerDict { get; }

protected JT808HttpResponse CreateJT808HttpResponse(dynamic dynamicObject)
{
byte[] data = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(dynamicObject));
return new JT808HttpResponse()
{
Data = data
};
}

public JT808HttpResponse DefaultHttpResponse()
{
byte[] json = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(new JT808DefaultResultDto()));
return new JT808HttpResponse(json);
}

public JT808HttpResponse EmptyHttpResponse()
{
byte[] json = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(new JT808ResultDto<string>()
{
Code = JT808ResultCode.Empty,
Message = "内容为空",
Data = "Content Empty"
}));
return new JT808HttpResponse(json);
}

public JT808HttpResponse NotFoundHttpResponse()
{
byte[] json = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(new JT808ResultDto<string>()
{
Code = JT808ResultCode.NotFound,
Message = "没有该服务",
Data = "没有该服务"
}));
return new JT808HttpResponse(json);
}

public JT808HttpResponse ErrorHttpResponse(Exception ex)
{
byte[] json = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(new JT808ResultDto<string>()
{
Code = JT808ResultCode.Error,
Message = JsonConvert.SerializeObject(ex),
Data = ex.Message
}));
return new JT808HttpResponse(json);
}
}
}

+ 96
- 0
src/JT808.Gateway/Handlers/JT808TcpClientConnectionHandler.cs View File

@@ -0,0 +1,96 @@
using DotNetty.Handlers.Timeout;
using DotNetty.Transport.Channels;
using JT808.Gateway.Client;
using JT808.Protocol.MessageBody;
using Microsoft.Extensions.Logging;
using System;
using System.Threading.Tasks;

namespace JT808.Gateway.Handlers
{
/// <summary>
/// JT808客户端连接通道处理程序
/// </summary>
public class JT808TcpClientConnectionHandler : ChannelHandlerAdapter
{
private readonly ILogger<JT808TcpClientConnectionHandler> logger;
private readonly JT808TcpClient jT808TcpClient;

public JT808TcpClientConnectionHandler(
JT808TcpClient jT808TcpClient)
{
logger = jT808TcpClient.LoggerFactory.CreateLogger<JT808TcpClientConnectionHandler>();
this.jT808TcpClient = jT808TcpClient;
}

/// <summary>
/// 通道激活
/// </summary>
/// <param name="context"></param>
public override void ChannelActive(IChannelHandlerContext context)
{
string channelId = context.Channel.Id.AsShortText();
if (logger.IsEnabled(LogLevel.Debug))
logger.LogDebug($"<<<{ channelId } Successful client connection to server.");
base.ChannelActive(context);
}

/// <summary>
/// 设备主动断开
/// </summary>
/// <param name="context"></param>
public override void ChannelInactive(IChannelHandlerContext context)
{
string channelId = context.Channel.Id.AsShortText();
if (logger.IsEnabled(LogLevel.Debug))
logger.LogDebug($">>>{ channelId } The client disconnects from the server.");
base.ChannelInactive(context);
}

/// <summary>
/// 服务器主动断开
/// </summary>
/// <param name="context"></param>
/// <returns></returns>
public override Task CloseAsync(IChannelHandlerContext context)
{
string channelId = context.Channel.Id.AsShortText();
if (logger.IsEnabled(LogLevel.Debug))
logger.LogDebug($"<<<{ channelId } The server disconnects from the client.");

return base.CloseAsync(context);
}

public override void ChannelReadComplete(IChannelHandlerContext context)=> context.Flush();

/// <summary>
/// 超时策略
/// </summary>
/// <param name="context"></param>
/// <param name="evt"></param>
public override void UserEventTriggered(IChannelHandlerContext context, object evt)
{
IdleStateEvent idleStateEvent = evt as IdleStateEvent;
if (idleStateEvent != null)
{
if(idleStateEvent.State== IdleState.WriterIdle)
{
string channelId = context.Channel.Id.AsShortText();
logger.LogInformation($"{idleStateEvent.State.ToString()}>>>{channelId}");
jT808TcpClient.Send(new JT808_0x0002());
}
}
base.UserEventTriggered(context, evt);
}

public override void ExceptionCaught(IChannelHandlerContext context, Exception exception)
{
string channelId = context.Channel.Id.AsShortText();
logger.LogError(exception,$"{channelId} {exception.Message}" );

context.CloseAsync();
}
}
}


+ 31
- 0
src/JT808.Gateway/Handlers/JT808TcpClientHandler.cs View File

@@ -0,0 +1,31 @@
using DotNetty.Buffers;
using DotNetty.Transport.Channels;
using System;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.DependencyInjection;
using JT808.Gateway.Services;
using JT808.Gateway.Client;

namespace JT808.Gateway.Handlers
{
/// <summary>
/// JT808客户端处理程序
/// </summary>
internal class JT808TcpClientHandler : SimpleChannelInboundHandler<byte[]>
{
private readonly ILogger<JT808TcpClientHandler> logger;
private readonly JT808ClientReceiveAtomicCounterService jT808ReceiveAtomicCounterService;
public JT808TcpClientHandler(JT808ClientReceiveAtomicCounterService jT808ReceiveAtomicCounterService,JT808TcpClient jT808TcpClient)
{
logger = jT808TcpClient.LoggerFactory.CreateLogger<JT808TcpClientHandler>();
this.jT808ReceiveAtomicCounterService= jT808ReceiveAtomicCounterService;
}

protected override void ChannelRead0(IChannelHandlerContext ctx, byte[] msg)
{
if(logger.IsEnabled(LogLevel.Trace))
logger.LogTrace("accept msg<<<" + ByteBufferUtil.HexDump(msg));
jT808ReceiveAtomicCounterService.MsgSuccessIncrement();
}
}
}

+ 104
- 0
src/JT808.Gateway/Handlers/JT808TcpConnectionHandler.cs View File

@@ -0,0 +1,104 @@
using DotNetty.Handlers.Timeout;
using DotNetty.Transport.Channels;
using JT808.Gateway.Session;
using Microsoft.Extensions.Logging;
using System;
using System.Threading.Tasks;

namespace JT808.Gateway.Handlers
{
/// <summary>
/// JT808服务通道处理程序
/// </summary>
internal class JT808TcpConnectionHandler : ChannelHandlerAdapter
{
private readonly ILogger<JT808TcpConnectionHandler> logger;

private readonly JT808SessionManager jT808SessionManager;

public JT808TcpConnectionHandler(
JT808SessionManager jT808SessionManager,
ILoggerFactory loggerFactory)
{
this.jT808SessionManager = jT808SessionManager;
logger = loggerFactory.CreateLogger<JT808TcpConnectionHandler>();
}

/// <summary>
/// 通道激活
/// </summary>
/// <param name="context"></param>
public override void ChannelActive(IChannelHandlerContext context)
{
string channelId = context.Channel.Id.AsShortText();
if (logger.IsEnabled(LogLevel.Debug))
logger.LogDebug($"<<<{ channelId } Successful client connection to server.");
base.ChannelActive(context);
}

/// <summary>
/// 设备主动断开
/// </summary>
/// <param name="context"></param>
public override void ChannelInactive(IChannelHandlerContext context)
{
string channelId = context.Channel.Id.AsShortText();
if (logger.IsEnabled(LogLevel.Debug))
logger.LogDebug($">>>{ channelId } The client disconnects from the server.");
jT808SessionManager.RemoveSessionByChannel(context.Channel);
base.ChannelInactive(context);
}

/// <summary>
/// 服务器主动断开
/// </summary>
/// <param name="context"></param>
/// <returns></returns>
public override Task CloseAsync(IChannelHandlerContext context)
{
string channelId = context.Channel.Id.AsShortText();
if (logger.IsEnabled(LogLevel.Debug))
logger.LogDebug($"<<<{ channelId } The server disconnects from the client.");
jT808SessionManager.RemoveSessionByChannel(context.Channel);
return base.CloseAsync(context);
}

public override void ChannelReadComplete(IChannelHandlerContext context)=> context.Flush();

/// <summary>
/// 超时策略
/// </summary>
/// <param name="context"></param>
/// <param name="evt"></param>
public override void UserEventTriggered(IChannelHandlerContext context, object evt)
{
IdleStateEvent idleStateEvent = evt as IdleStateEvent;
if (idleStateEvent != null)
{
if(idleStateEvent.State== IdleState.ReaderIdle)
{
string channelId = context.Channel.Id.AsShortText();
logger.LogInformation($"{idleStateEvent.State.ToString()}>>>{channelId}");
// 由于808是设备发心跳,如果很久没有上报数据,那么就由服务器主动关闭连接。
jT808SessionManager.RemoveSessionByChannel(context.Channel);
context.CloseAsync();
}
// 按照808的消息,有些请求必须要应答,但是转发可以不需要有应答可以节省部分资源包括:
// 1.消息的序列化
// 2.消息的下发
// 都有一定的性能损耗,那么不需要判断写超时 IdleState.WriterIdle
// 就跟神兽貔貅一样。。。
}
base.UserEventTriggered(context, evt);
}

public override void ExceptionCaught(IChannelHandlerContext context, Exception exception)
{
string channelId = context.Channel.Id.AsShortText();
logger.LogError(exception,$"{channelId} {exception.Message}" );
jT808SessionManager.RemoveSessionByChannel(context.Channel);
context.CloseAsync();
}
}
}


+ 77
- 0
src/JT808.Gateway/Handlers/JT808TcpServerHandler.cs View File

@@ -0,0 +1,77 @@
using DotNetty.Buffers;
using DotNetty.Transport.Channels;
using JT808.Protocol;
using System;
using Microsoft.Extensions.Logging;
using JT808.Protocol.Exceptions;
using JT808.Gateway.Session;
using JT808.Gateway.Services;
using JT808.Gateway.PubSub;
using JT808.Gateway.Enums;

namespace JT808.Gateway.Handlers
{
/// <summary>
/// JT808服务端处理程序
/// </summary>
internal class JT808TcpServerHandler : SimpleChannelInboundHandler<byte[]>
{
private readonly JT808SessionManager jT808SessionManager;

private readonly JT808AtomicCounterService jT808AtomicCounterService;

private readonly ILogger<JT808TcpServerHandler> logger;

private readonly JT808Serializer JT808Serializer;

private readonly IJT808MsgProducer JT808MsgProducer;

public JT808TcpServerHandler(
IJT808MsgProducer jT808MsgProducer,
IJT808Config jT808Config,
ILoggerFactory loggerFactory,
JT808AtomicCounterServiceFactory jT808AtomicCounterServiceFactory,
JT808SessionManager jT808SessionManager)
{
this.jT808SessionManager = jT808SessionManager;
this.JT808MsgProducer = jT808MsgProducer;
this.jT808AtomicCounterService = jT808AtomicCounterServiceFactory.Create(JT808TransportProtocolType.tcp);
this.JT808Serializer = jT808Config.GetSerializer();
logger = loggerFactory.CreateLogger<JT808TcpServerHandler>();
}

protected override void ChannelRead0(IChannelHandlerContext ctx, byte[] msg)
{
try
{
//解析到头部,然后根据具体的消息Id通过队列去进行消费
//要是一定要解析到数据体可以在JT808MsgIdHandlerBase类中根据具体的消息,
//解析具体的消息体,具体调用JT808Serializer.Deserialize<T>
JT808HeaderPackage jT808HeaderPackage = JT808Serializer.Deserialize<JT808HeaderPackage>(msg);
if (logger.IsEnabled(LogLevel.Trace))
{
logger.LogTrace($"accept package success count=>{jT808AtomicCounterService.MsgSuccessCount.ToString()},accept msg=>{ByteBufferUtil.HexDump(msg)}");
}
jT808AtomicCounterService.MsgSuccessIncrement();
jT808SessionManager.TryAdd(jT808HeaderPackage.Header.TerminalPhoneNo,ctx.Channel);
JT808MsgProducer.ProduceAsync(jT808HeaderPackage.Header.TerminalPhoneNo, msg);
}
catch (JT808Exception ex)
{
jT808AtomicCounterService.MsgFailIncrement();
if (logger.IsEnabled(LogLevel.Error))
{
logger.LogError(ex,$"accept package fail count=>{jT808AtomicCounterService.MsgFailCount.ToString()},accept msg=>{ByteBufferUtil.HexDump(msg)}");
}
}
catch (Exception ex)
{
jT808AtomicCounterService.MsgFailIncrement();
if (logger.IsEnabled(LogLevel.Error))
{
logger.LogError(ex, $"accept package fail count=>{jT808AtomicCounterService.MsgFailCount.ToString()},accept msg=>{ByteBufferUtil.HexDump(msg)}");
}
}
}
}
}

+ 79
- 0
src/JT808.Gateway/Handlers/JT808UdpServerHandler.cs View File

@@ -0,0 +1,79 @@
using DotNetty.Buffers;
using DotNetty.Transport.Channels;
using JT808.Protocol;
using System;
using Microsoft.Extensions.Logging;
using JT808.Gateway.Services;
using JT808.Gateway.Session;
using JT808.Gateway.PubSub;
using JT808.Gateway.Metadata;
using JT808.Gateway.Enums;

namespace JT808.Gateway.Handlers
{
/// <summary>
/// JT808 Udp服务端处理程序
/// </summary>
internal class JT808UdpServerHandler : SimpleChannelInboundHandler<JT808UdpPackage>
{
private readonly JT808AtomicCounterService jT808AtomicCounterService;

private readonly ILogger<JT808UdpServerHandler> logger;

private readonly JT808SessionManager jT808UdpSessionManager;

private readonly JT808Serializer JT808Serializer;

private readonly IJT808MsgProducer JT808MsgProducer;

public JT808UdpServerHandler(
IJT808MsgProducer jT808MsgProducer,
IJT808Config jT808Config,
ILoggerFactory loggerFactory,
JT808AtomicCounterServiceFactory jT808AtomicCounterServiceFactory,
JT808SessionManager jT808UdpSessionManager)
{
this.JT808MsgProducer = jT808MsgProducer;
this.jT808AtomicCounterService = jT808AtomicCounterServiceFactory.Create(JT808TransportProtocolType.udp);
this.jT808UdpSessionManager = jT808UdpSessionManager;
logger = loggerFactory.CreateLogger<JT808UdpServerHandler>();
JT808Serializer = jT808Config.GetSerializer();
}

protected override void ChannelRead0(IChannelHandlerContext ctx, JT808UdpPackage msg)
{
try
{
//解析到头部,然后根据具体的消息Id通过队列去进行消费
//要是一定要解析到数据体可以在JT808MsgIdHandlerBase类中根据具体的消息,
//解析具体的消息体,具体调用JT808Serializer.Deserialize<T>
JT808HeaderPackage jT808HeaderPackage = JT808Serializer.Deserialize<JT808HeaderPackage>(msg.Buffer);
if (logger.IsEnabled(LogLevel.Trace))
{
logger.LogTrace($"accept package success count=>{jT808AtomicCounterService.MsgFailCount.ToString()},accept msg=>{ByteBufferUtil.HexDump(msg.Buffer)}");
}
jT808AtomicCounterService.MsgSuccessIncrement();
jT808UdpSessionManager.TryAdd(ctx.Channel, msg.Sender, jT808HeaderPackage.Header.TerminalPhoneNo);
JT808MsgProducer.ProduceAsync(jT808HeaderPackage.Header.TerminalPhoneNo, msg.Buffer);
}
catch (JT808.Protocol.Exceptions.JT808Exception ex)
{
jT808AtomicCounterService.MsgFailIncrement();
if (logger.IsEnabled(LogLevel.Error))
{
logger.LogError(ex, $"accept package fail count=>{jT808AtomicCounterService.MsgFailCount.ToString()},accept msg=>{ByteBufferUtil.HexDump(msg.Buffer)}");
}
}
catch (Exception ex)
{
jT808AtomicCounterService.MsgFailIncrement();
if (logger.IsEnabled(LogLevel.Error))
{
logger.LogError(ex, $"accept package fail count=>{jT808AtomicCounterService.MsgFailCount.ToString()},accept msg=>{ByteBufferUtil.HexDump(msg.Buffer)}");
}
}
}

public override void ChannelReadComplete(IChannelHandlerContext context) => context.Flush();
}
}

+ 14
- 0
src/JT808.Gateway/IJT808ClientBuilder.cs View File

@@ -0,0 +1,14 @@
using JT808.Protocol;
using Microsoft.Extensions.DependencyInjection;
using System;
using System.Collections.Generic;
using System.Text;

namespace JT808.Gateway
{
public interface IJT808ClientBuilder
{
IJT808Builder JT808Builder { get; }
IJT808Builder Builder();
}
}

+ 14
- 0
src/JT808.Gateway/IJT808GatewayBuilder.cs View File

@@ -0,0 +1,14 @@
using JT808.Protocol;
using Microsoft.Extensions.DependencyInjection;
using System;
using System.Collections.Generic;
using System.Text;

namespace JT808.Gateway
{
public interface IJT808GatewayBuilder
{
IJT808Builder JT808Builder { get; }
IJT808Builder Builder();
}
}

+ 18
- 0
src/JT808.Gateway/Impls/JT808DatagramPacketImpl.cs View File

@@ -0,0 +1,18 @@
using DotNetty.Buffers;
using DotNetty.Transport.Channels.Sockets;
using JT808.Gateway.Interfaces;
using System;
using System.Collections.Generic;
using System.Net;
using System.Text;

namespace JT808.Gateway.Impls
{
class JT808DatagramPacketImpl : IJT808DatagramPacket
{
public DatagramPacket Create(byte[] message, EndPoint recipient)
{
return new DatagramPacket(Unpooled.WrappedBuffer(message), recipient);
}
}
}

+ 25
- 0
src/JT808.Gateway/Impls/JT808GatewayBuilderDefault.cs View File

@@ -0,0 +1,25 @@
using JT808.Gateway;
using JT808.Protocol;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection.Extensions;
using System;
using System.Collections.Generic;
using System.Text;

namespace JT808.Gateway.Impls
{
internal class JT808GatewayBuilderDefault : IJT808GatewayBuilder
{
public IJT808Builder JT808Builder { get; }

public JT808GatewayBuilderDefault(IJT808Builder builder)
{
JT808Builder = builder;
}

public IJT808Builder Builder()
{
return JT808Builder;
}
}
}

+ 30
- 0
src/JT808.Gateway/Impls/JT808MsgProducerDefaultImpl.cs View File

@@ -0,0 +1,30 @@
using JT808.Gateway;
using JT808.Gateway.PubSub;
using JT808.Gateway.Services;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;

namespace JT808.Gateway.Impls
{
internal class JT808MsgProducerDefaultImpl : IJT808MsgProducer
{
private readonly JT808MsgService JT808MsgService;
public string TopicName => JT808GatewayConstants.MsgTopic;
public JT808MsgProducerDefaultImpl(JT808MsgService jT808MsgService)
{
JT808MsgService = jT808MsgService;
}
public void Dispose()
{
}

public Task ProduceAsync(string terminalNo, byte[] data)
{
JT808MsgService.MsgQueue.Add((terminalNo, data));
return Task.CompletedTask;
}
}
}

+ 193
- 0
src/JT808.Gateway/Impls/JT808MsgReplyConsumerDefaultImpl.cs View File

@@ -0,0 +1,193 @@
using JT808.Gateway;
using JT808.Gateway.PubSub;
using JT808.Gateway.Services;
using JT808.Protocol;
using JT808.Protocol.Enums;
using JT808.Protocol.Extensions;
using JT808.Protocol.MessageBody;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace JT808.Gateway.Impls
{
internal class JT808MsgReplyConsumerDefaultImpl : IJT808MsgReplyConsumer
{
private readonly JT808MsgService JT808MsgService;
private readonly JT808Serializer JT808Serializer;
private Dictionary<ushort, Func<JT808HeaderPackage, byte[]>> HandlerDict;
public JT808MsgReplyConsumerDefaultImpl(
IJT808Config jT808Config,
JT808MsgService jT808MsgService)
{
JT808MsgService = jT808MsgService;
this.JT808Serializer = jT808Config.GetSerializer();
HandlerDict = new Dictionary<ushort, Func<JT808HeaderPackage, byte[]>> {
{JT808MsgId.终端通用应答.ToUInt16Value(), Msg0x0001},
{JT808MsgId.终端鉴权.ToUInt16Value(), Msg0x0102},
{JT808MsgId.终端心跳.ToUInt16Value(), Msg0x0002},
{JT808MsgId.终端注销.ToUInt16Value(), Msg0x0003},
{JT808MsgId.终端注册.ToUInt16Value(), Msg0x0100},
{JT808MsgId.位置信息汇报.ToUInt16Value(),Msg0x0200 },
{JT808MsgId.定位数据批量上传.ToUInt16Value(),Msg0x0704 },
{JT808MsgId.数据上行透传.ToUInt16Value(),Msg0x0900 }
};
}
public CancellationTokenSource Cts =>new CancellationTokenSource();

public string TopicName => JT808GatewayConstants.MsgReplyTopic;

public void Dispose()
{
Cts.Dispose();
}

public void OnMessage(Action<(string TerminalNo, byte[] Data)> callback)
{
Task.Run(() =>
{
foreach(var item in JT808MsgService.MsgQueue.GetConsumingEnumerable())
{
try
{
var package = JT808Serializer.HeaderDeserialize(item.Data);
if (HandlerDict.TryGetValue(package.Header.MsgId, out var func))
{
var buffer = func(package);
if (buffer != null)
{
callback((item.TerminalNo, buffer));
}
}
}
catch (Exception ex)
{

}
}
}, Cts.Token);
}

public void Subscribe()
{
}

public void Unsubscribe()
{
Cts.Cancel();
}

/// <summary>
/// 终端通用应答
/// 平台无需回复
/// 实现自己的业务
/// </summary>
/// <param name="request"></param>
/// <returns></returns>
public byte[] Msg0x0001(JT808HeaderPackage request)
{
return null;
}
/// <summary>
/// 终端心跳
/// </summary>
/// <param name="request"></param>
/// <returns></returns>
public byte[] Msg0x0002(JT808HeaderPackage request)
{
return JT808Serializer.Serialize(JT808MsgId.平台通用应答.Create(request.Header.TerminalPhoneNo, new JT808_0x8001()
{
MsgId = request.Header.MsgId,
JT808PlatformResult = JT808PlatformResult.成功,
MsgNum = request.Header.MsgNum
}));
}
/// <summary>
/// 终端注销
/// </summary>
/// <param name="request"></param>
/// <returns></returns>
public byte[] Msg0x0003(JT808HeaderPackage request)
{
return JT808Serializer.Serialize(JT808MsgId.平台通用应答.Create(request.Header.TerminalPhoneNo, new JT808_0x8001()
{
MsgId = request.Header.MsgId,
JT808PlatformResult = JT808PlatformResult.成功,
MsgNum = request.Header.MsgNum
}));
}
/// <summary>
/// 终端注册
/// </summary>
/// <param name="request"></param>
/// <returns></returns>
public byte[] Msg0x0100(JT808HeaderPackage request)
{
return JT808Serializer.Serialize(JT808MsgId.终端注册应答.Create(request.Header.TerminalPhoneNo, new JT808_0x8100()
{
Code = "J" + request.Header.TerminalPhoneNo,
JT808TerminalRegisterResult = JT808TerminalRegisterResult.成功,
MsgNum = request.Header.MsgNum
}));
}
/// <summary>
/// 终端鉴权
/// </summary>
/// <param name="request"></param>
/// <returns></returns>
public byte[] Msg0x0102(JT808HeaderPackage request)
{
return JT808Serializer.Serialize(JT808MsgId.平台通用应答.Create(request.Header.TerminalPhoneNo, new JT808_0x8001()
{
MsgId = request.Header.MsgId,
JT808PlatformResult = JT808PlatformResult.成功,
MsgNum = request.Header.MsgNum
}));
}
/// <summary>
/// 位置信息汇报
/// </summary>
/// <param name="request"></param>
/// <returns></returns>
public byte[] Msg0x0200(JT808HeaderPackage request)
{
return JT808Serializer.Serialize(JT808MsgId.平台通用应答.Create(request.Header.TerminalPhoneNo, new JT808_0x8001()
{
MsgId = request.Header.MsgId,
JT808PlatformResult = JT808PlatformResult.成功,
MsgNum = request.Header.MsgNum
}));
}
/// <summary>
/// 定位数据批量上传
/// </summary>
/// <param name="request"></param>
/// <returns></returns>
public byte[] Msg0x0704(JT808HeaderPackage request)
{
return JT808Serializer.Serialize(JT808MsgId.平台通用应答.Create(request.Header.TerminalPhoneNo, new JT808_0x8001()
{
MsgId = request.Header.MsgId,
JT808PlatformResult = JT808PlatformResult.成功,
MsgNum = request.Header.MsgNum
}));
}
/// <summary>
/// 数据上行透传
/// </summary>
/// <param name="request"></param>
/// <returns></returns>
public byte[] Msg0x0900(JT808HeaderPackage request)
{
return JT808Serializer.Serialize(JT808MsgId.平台通用应答.Create(request.Header.TerminalPhoneNo, new JT808_0x8001()
{
MsgId = request.Header.MsgId,
JT808PlatformResult = JT808PlatformResult.成功,
MsgNum = request.Header.MsgNum
}));
}
}
}

+ 32
- 0
src/JT808.Gateway/Impls/JT808SessionProducerDefaultImpl.cs View File

@@ -0,0 +1,32 @@
using JT808.Gateway;
using JT808.Gateway.PubSub;
using Microsoft.Extensions.Logging;
using System.Threading.Tasks;

namespace JT808.Gateway.Impls
{
internal class JT808SessionProducerDefaultImpl : IJT808SessionProducer
{
private readonly ILogger<JT808SessionProducerDefaultImpl> logger;
public JT808SessionProducerDefaultImpl(ILoggerFactory loggerFactory)
{
logger = loggerFactory.CreateLogger<JT808SessionProducerDefaultImpl>();
}

public string TopicName => JT808GatewayConstants.SessionTopic;

public void Dispose()
{
}

public Task ProduceAsync(string terminalNo, string notice)
{
if (logger.IsEnabled(LogLevel.Debug))
{
logger.LogDebug($"{terminalNo}-{notice}");
}
return Task.CompletedTask;
}
}
}

+ 13
- 0
src/JT808.Gateway/Interfaces/IJT808DatagramPacket.cs View File

@@ -0,0 +1,13 @@
using DotNetty.Transport.Channels.Sockets;
using System.Net;

namespace JT808.Gateway.Interfaces
{
/// <summary>
/// 基于udp的创建发送包
/// </summary>
public interface IJT808DatagramPacket
{
DatagramPacket Create(byte[] message, EndPoint recipient);
}
}

+ 17
- 0
src/JT808.Gateway/Interfaces/IJT808Reply.cs View File

@@ -0,0 +1,17 @@
using JT808.Protocol;
using System;
using System.Collections.Generic;
using System.Text;

namespace JT808.Gateway.Interfaces
{
public interface IJT808Reply
{
JT808Package Package { get; set; }
byte[] HexData { get; set; }
/// <summary>
/// 根据实际情况适当调整包的大小
/// </summary>
int MinBufferSize { get; set; }
}
}

+ 20
- 0
src/JT808.Gateway/Interfaces/IJT808Session.cs View File

@@ -0,0 +1,20 @@
using DotNetty.Transport.Channels;
using JT808.Gateway.Enums;
using System;
using System.Collections.Generic;
using System.Text;

namespace JT808.Gateway.Interfaces
{
public interface IJT808Session
{
/// <summary>
/// 终端手机号
/// </summary>
string TerminalPhoneNo { get; set; }
IChannel Channel { get; set; }
DateTime LastActiveTime { get; set; }
DateTime StartTime { get; set; }
JT808TransportProtocolType TransportProtocolType { get; set; }
}
}

+ 30
- 0
src/JT808.Gateway/Interfaces/IJT808SessionService.cs View File

@@ -0,0 +1,30 @@
using JT808.Gateway.Dtos;
using System;
using System.Collections.Generic;
using System.Text;

namespace JT808.Gateway.Interfaces
{
/// <summary>
/// JT808会话服务
/// </summary>
public interface IJT808SessionService
{
/// <summary>
/// 获取udp会话集合
/// </summary>
/// <returns></returns>
JT808ResultDto<List<JT808UdpSessionInfoDto>> GetUdpAll();
/// <summary>
/// 获取tcp会话集合
/// </summary>
/// <returns></returns>
JT808ResultDto<List<JT808TcpSessionInfoDto>> GetTcpAll();
/// <summary>
/// 通过设备终端号移除对应会话
/// </summary>
/// <param name="terminalPhoneNo"></param>
/// <returns></returns>
JT808ResultDto<bool> RemoveByTerminalPhoneNo(string terminalPhoneNo);
}
}

+ 12
- 0
src/JT808.Gateway/Interfaces/IJT808UnificationSendService.cs View File

@@ -0,0 +1,12 @@
using JT808.Gateway.Dtos;

namespace JT808.Gateway.Interfaces
{
/// <summary>
/// JT808统一下发命令服务
/// </summary>
public interface IJT808UnificationSendService
{
JT808ResultDto<bool> Send(string terminalPhoneNo, byte[] data);
}
}

+ 42
- 0
src/JT808.Gateway/JT808.Gateway.csproj View File

@@ -0,0 +1,42 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>netcoreapp3.0</TargetFramework>
<LangVersion>8.0</LangVersion>
<Copyright>Copyright 2018.</Copyright>
<Authors>SmallChi(Koike)</Authors>
<RepositoryUrl>https://github.com/SmallChi/JT808DotNetty</RepositoryUrl>
<PackageProjectUrl>https://github.com/SmallChi/JT808DotNetty</PackageProjectUrl>
<licenseUrl>https://github.com/SmallChi/JT808DotNetty/blob/master/LICENSE</licenseUrl>
<license>https://github.com/SmallChi/JT808DotNetty/blob/master/LICENSE</license>
<GeneratePackageOnBuild>false</GeneratePackageOnBuild>
<Version>1.0.0-preview1</Version>
<SignAssembly>false</SignAssembly>
<PackageLicenseFile>LICENSE</PackageLicenseFile>
<PackageRequireLicenseAcceptance>true</PackageRequireLicenseAcceptance>
</PropertyGroup>

<ItemGroup>
<Protobuf Include="Protos\JT808Gateway.proto"/>
</ItemGroup>
<ItemGroup>
<PackageReference Include="Google.Protobuf" Version="3.10.0" />
<PackageReference Include="Grpc.Core" Version="2.24.0" />
<PackageReference Include="Grpc.Tools" Version="2.24.0">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageReference>
<PackageReference Include="JT808" Version="2.1.7" />
<PackageReference Include="DotNetty.Handlers" Version="0.6.0" />
<PackageReference Include="DotNetty.Transport.Libuv" Version="0.6.0" />
<PackageReference Include="DotNetty.Codecs" Version="0.6.0" />
<PackageReference Include="Microsoft.Extensions.Hosting.Abstractions" Version="3.0.0" />
<PackageReference Include="Microsoft.Extensions.Options.ConfigurationExtensions" Version="3.0.0" />
<PackageReference Include="Newtonsoft.Json" Version="12.0.2" />
<PackageReference Include="Microsoft.Extensions.Options" Version="3.0.0" />
</ItemGroup>
<ItemGroup>
<None Include="..\..\LICENSE" Pack="true" PackagePath="" />
</ItemGroup>

</Project>

+ 48
- 0
src/JT808.Gateway/JT808GatewayConstants.cs View File

@@ -0,0 +1,48 @@
namespace JT808.Gateway
{
public static class JT808GatewayConstants
{
public const string SessionOnline= "JT808SessionOnline";

public const string SessionOffline = "JT808SessionOffline";
public const string SessionTopic = "jt808session";
public const string MsgTopic = "jt808msgdefault";
public const string MsgReplyTopic = "jt808msgreplydefault";

public static class JT808WebApiRouteTable
{
public const string RouteTablePrefix = "/jt808api";

public const string SessionPrefix = "Session";

public const string TcpPrefix = "Tcp";

public const string UdpPrefix = "Udp";

/// <summary>
/// 基于Tcp的包计数器
/// </summary>
public static string GetTcpAtomicCounter = $"{RouteTablePrefix}/{TcpPrefix}/GetAtomicCounter";
/// <summary>
/// 基于Tcp的会话服务集合
/// </summary>
public static string SessionTcpGetAll = $"{RouteTablePrefix}/{TcpPrefix}/{SessionPrefix}/GetAll";
/// <summary>
/// 会话服务-通过设备终端号移除对应会话
/// </summary>
public static string SessionRemoveByTerminalPhoneNo = $"{RouteTablePrefix}/{SessionPrefix}/RemoveByTerminalPhoneNo";
/// <summary>
/// 统一下发信息
/// </summary>
public static string UnificationSend = $"{RouteTablePrefix}/UnificationSend";
/// <summary>
/// 获取Udp包计数器
/// </summary>
public static string GetUdpAtomicCounter = $"{RouteTablePrefix}/{UdpPrefix}/GetAtomicCounter";
/// <summary>
/// 基于Udp的会话服务集合
/// </summary>
public static string SessionUdpGetAll = $"{RouteTablePrefix}/{UdpPrefix}/{SessionPrefix}/GetAll";
}
}
}

+ 96
- 0
src/JT808.Gateway/JT808GatewayExtensions.cs View File

@@ -0,0 +1,96 @@
using JT808.Gateway;
using JT808.Gateway.Configurations;
using JT808.Gateway.Converters;
using JT808.Gateway.Impls;
using JT808.Gateway.Interfaces;
using JT808.Gateway.PubSub;
using JT808.Gateway.Services;
using JT808.Gateway.Session;
using JT808.Protocol;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection.Extensions;
using Microsoft.Extensions.Options;
using Newtonsoft.Json;
using System;
using System.Runtime.CompilerServices;

//[assembly: InternalsVisibleTo("JT808.DotNetty.Core.Test")]

namespace JT808.Gateway
{
public static class JT808GatewayExtensions
{
static JT808GatewayExtensions()
{
JsonConvert.DefaultSettings = new Func<JsonSerializerSettings>(() =>
{
Newtonsoft.Json.JsonSerializerSettings settings = new Newtonsoft.Json.JsonSerializerSettings();
//日期类型默认格式化处理
settings.DateFormatHandling = Newtonsoft.Json.DateFormatHandling.MicrosoftDateFormat;
settings.DateFormatString = "yyyy-MM-dd HH:mm:ss";
//空值处理
settings.NullValueHandling = NullValueHandling.Ignore;
settings.ReferenceLoopHandling = ReferenceLoopHandling.Ignore;
settings.Converters.Add(new JsonIPAddressConverter());
settings.Converters.Add(new JsonIPEndPointConverter());
settings.Converters.Add(new ByteArrayHexConverter());
return settings;
});
}

public static IJT808GatewayBuilder AddJT808Gateway(this IJT808Builder jt808Builder, IConfiguration configuration, Newtonsoft.Json.JsonSerializerSettings settings=null)
{
if (settings != null)
{
JsonConvert.DefaultSettings = new Func<JsonSerializerSettings>(() =>
{
settings.Converters.Add(new JsonIPAddressConverter());
settings.Converters.Add(new JsonIPEndPointConverter());
settings.Converters.Add(new ByteArrayHexConverter());
settings.ReferenceLoopHandling = ReferenceLoopHandling.Ignore;
return settings;
});
}
IJT808GatewayBuilder nettyBuilder = new JT808GatewayBuilderDefault(jt808Builder);
nettyBuilder.JT808Builder.Services.Configure<JT808Configuration>(configuration.GetSection("JT808Configuration"));
nettyBuilder.JT808Builder.Services.TryAddSingleton<JT808AtomicCounterServiceFactory>();
nettyBuilder.JT808Builder.Services.TryAddSingleton<JT808SessionManager>();
nettyBuilder.JT808Builder.Services.TryAddSingleton<IJT808UnificationSendService, JT808UnificationSendService>();
nettyBuilder.JT808Builder.Services.TryAddSingleton<IJT808SessionService, JT808SessionService>();
nettyBuilder.JT808Builder.Services.TryAddSingleton<IJT808MsgProducer, JT808MsgProducerDefaultImpl>();
nettyBuilder.JT808Builder.Services.TryAddSingleton<IJT808MsgReplyConsumer, JT808MsgReplyConsumerDefaultImpl>();
nettyBuilder.JT808Builder.Services.TryAddSingleton<IJT808SessionProducer, JT808SessionProducerDefaultImpl>();
nettyBuilder.JT808Builder.Services.TryAddSingleton<JT808MsgService>();
nettyBuilder.JT808Builder.Services.AddHostedService<JT808MsgReplyHostedService>();
return nettyBuilder;
}

public static IJT808GatewayBuilder AddJT808Gateway(this IJT808Builder jt808Builder, Action<JT808Configuration> jt808Options, Newtonsoft.Json.JsonSerializerSettings settings = null)
{
if (settings != null)
{
JsonConvert.DefaultSettings = new Func<JsonSerializerSettings>(() =>
{
settings.Converters.Add(new JsonIPAddressConverter());
settings.Converters.Add(new JsonIPEndPointConverter());
settings.Converters.Add(new ByteArrayHexConverter());
settings.ReferenceLoopHandling = ReferenceLoopHandling.Ignore;
return settings;
});
}
IJT808GatewayBuilder nettyBuilder = new JT808GatewayBuilderDefault(jt808Builder);
nettyBuilder.JT808Builder.Services.Configure(jt808Options);
nettyBuilder.JT808Builder.Services.TryAddSingleton<JT808AtomicCounterServiceFactory>();
nettyBuilder.JT808Builder.Services.TryAddSingleton<JT808SessionManager>();
nettyBuilder.JT808Builder.Services.TryAddSingleton<IJT808UnificationSendService, JT808UnificationSendService>();
nettyBuilder.JT808Builder.Services.TryAddSingleton<IJT808SessionService, JT808SessionService>();
nettyBuilder.JT808Builder.Services.TryAddSingleton<IJT808MsgProducer, JT808MsgProducerDefaultImpl>();
nettyBuilder.JT808Builder.Services.TryAddSingleton<IJT808MsgReplyConsumer, JT808MsgReplyConsumerDefaultImpl>();
nettyBuilder.JT808Builder.Services.TryAddSingleton<JT808MsgService>();
nettyBuilder.JT808Builder.Services.TryAddSingleton<IJT808SessionProducer, JT808SessionProducerDefaultImpl>();
nettyBuilder.JT808Builder.Services.AddHostedService<JT808MsgReplyHostedService>();
return nettyBuilder;
}
}
}

+ 49
- 0
src/JT808.Gateway/Metadata/JT808AtomicCounter.cs View File

@@ -0,0 +1,49 @@
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;

namespace JT808.Gateway.Metadata
{
/// <summary>
///
/// <see cref="Grpc.Core.Internal"/>
/// </summary>
internal class JT808AtomicCounter
{
long counter = 0;

public JT808AtomicCounter(long initialCount = 0)
{
this.counter = initialCount;
}

public void Reset()
{
Interlocked.Exchange(ref counter, 0);
}

public long Increment()
{
return Interlocked.Increment(ref counter);
}

public long Add(long len)
{
return Interlocked.Add(ref counter,len);
}

public long Decrement()
{
return Interlocked.Decrement(ref counter);
}

public long Count
{
get
{
return Interlocked.Read(ref counter);
}
}
}
}

+ 16
- 0
src/JT808.Gateway/Metadata/JT808ClientReport.cs View File

@@ -0,0 +1,16 @@
using System;
using System.Collections.Generic;
using System.Text;

namespace JT808.Gateway.Metadata
{
public class JT808ClientReport
{
public long SendTotalCount { get; set; }
public long ReceiveTotalCount { get; set; }
public DateTime CurrentDate { get; set; }
public int Connections { get; set; }
public int OnlineConnections { get; set; }
public int OfflineConnections { get; set; }
}
}

+ 30
- 0
src/JT808.Gateway/Metadata/JT808ClientRequest.cs View File

@@ -0,0 +1,30 @@
using JT808.Protocol;
using System;
using System.Collections.Generic;
using System.Reflection;

namespace JT808.Gateway.Metadata
{
public class JT808ClientRequest
{
public JT808Package Package { get; }

public byte[] HexData { get; }

/// <summary>
/// 根据实际情况适当调整包的大小
/// </summary>
public int MinBufferSize { get;}

public JT808ClientRequest(JT808Package package,int minBufferSize=1024)
{
Package = package;
MinBufferSize = minBufferSize;
}

public JT808ClientRequest(byte[] hexData)
{
HexData = hexData;
}
}
}

+ 22
- 0
src/JT808.Gateway/Metadata/JT808HttpRequest.cs View File

@@ -0,0 +1,22 @@
using JT808.Protocol;
using System;
using System.Collections.Generic;
using System.Reflection;

namespace JT808.Gateway.Metadata
{
public class JT808HttpRequest
{
public string Json { get; set; }

public JT808HttpRequest()
{

}

public JT808HttpRequest(string json)
{
Json = json;
}
}
}

+ 22
- 0
src/JT808.Gateway/Metadata/JT808HttpResponse.cs View File

@@ -0,0 +1,22 @@
using JT808.Protocol;
using System;
using System.Collections.Generic;
using System.Reflection;

namespace JT808.Gateway.Metadata
{
public class JT808HttpResponse
{
public byte[] Data { get; set; }

public JT808HttpResponse()
{

}

public JT808HttpResponse(byte[] data)
{
this.Data = data;
}
}
}

+ 23
- 0
src/JT808.Gateway/Metadata/JT808Request.cs View File

@@ -0,0 +1,23 @@
using JT808.Protocol;
using System;
using System.Collections.Generic;
using System.Reflection;

namespace JT808.Gateway.Metadata
{
public class JT808Request
{
public JT808HeaderPackage Package { get; }

/// <summary>
/// 用于消息发送
/// </summary>
public byte[] OriginalPackage { get;}

public JT808Request(JT808HeaderPackage package, byte[] originalPackage)
{
Package = package;
OriginalPackage = originalPackage;
}
}
}

+ 31
- 0
src/JT808.Gateway/Metadata/JT808Response.cs View File

@@ -0,0 +1,31 @@
using JT808.Gateway.Interfaces;
using JT808.Protocol;
using System;
using System.Collections.Generic;
using System.Reflection;

namespace JT808.Gateway.Metadata
{
public class JT808Response: IJT808Reply
{
public JT808Package Package { get; set; }
public byte[] HexData { get; set; }
public int MinBufferSize { get; set; }

public JT808Response()
{

}

public JT808Response(JT808Package package, int minBufferSize = 1024)
{
Package = package;
MinBufferSize = minBufferSize;
}

public JT808Response(byte[] hexData)
{
HexData = hexData;
}
}
}

+ 32
- 0
src/JT808.Gateway/Metadata/JT808TcpSession.cs View File

@@ -0,0 +1,32 @@
using DotNetty.Transport.Channels;
using JT808.Gateway.Enums;
using JT808.Gateway.Interfaces;
using System;

namespace JT808.Gateway.Metadata
{
public class JT808TcpSession: IJT808Session
{
public JT808TcpSession(IChannel channel, string terminalPhoneNo)
{
Channel = channel;
TerminalPhoneNo = terminalPhoneNo;
StartTime = DateTime.Now;
LastActiveTime = DateTime.Now;
}

public JT808TcpSession() { }

/// <summary>
/// 终端手机号
/// </summary>
public string TerminalPhoneNo { get; set; }

public IChannel Channel { get; set; }

public DateTime LastActiveTime { get; set; }

public DateTime StartTime { get; set; }
public JT808TransportProtocolType TransportProtocolType { get; set; } = JT808TransportProtocolType.tcp;
}
}

+ 20
- 0
src/JT808.Gateway/Metadata/JT808UdpPackage.cs View File

@@ -0,0 +1,20 @@
using System;
using System.Collections.Generic;
using System.Net;
using System.Text;

namespace JT808.Gateway.Metadata
{
public class JT808UdpPackage
{
public JT808UdpPackage(byte[] buffer, EndPoint sender)
{
Buffer = buffer;
Sender = sender;
}

public byte[] Buffer { get; }

public EndPoint Sender { get; }
}
}

+ 38
- 0
src/JT808.Gateway/Metadata/JT808UdpSession.cs View File

@@ -0,0 +1,38 @@
using DotNetty.Transport.Channels;
using JT808.Gateway.Enums;
using JT808.Gateway.Interfaces;
using System;
using System.Net;

namespace JT808.Gateway.Metadata
{
public class JT808UdpSession: IJT808Session
{
public JT808UdpSession(IChannel channel,
EndPoint sender,
string terminalPhoneNo)
{
Channel = channel;
TerminalPhoneNo = terminalPhoneNo;
StartTime = DateTime.Now;
LastActiveTime = DateTime.Now;
Sender = sender;
}

public EndPoint Sender { get; set; }

public JT808UdpSession() { }

/// <summary>
/// 终端手机号
/// </summary>
public string TerminalPhoneNo { get; set; }

public IChannel Channel { get; set; }

public DateTime LastActiveTime { get; set; }

public DateTime StartTime { get; set; }
public JT808TransportProtocolType TransportProtocolType { get; set; } = JT808TransportProtocolType.udp;
}
}

+ 63
- 0
src/JT808.Gateway/Protos/JT808Gateway.proto View File

@@ -0,0 +1,63 @@
syntax = "proto3";

option csharp_namespace = "JT808.Gateway.GrpcService";

package JT808GatewayGrpc;

service JT808Gateway{
// 会话服务-获取会话服务集合
rpc GetTcpSessionAll(Empty) returns (TcpSessionInfoReply);
// 会话服务-通过设备终端号移除对应会话
rpc RemoveSessionByTerminalPhoneNo(SessionRemoveRequest) returns (SessionRemoveReply);
// 统一下发信息
rpc UnificationSend(UnificationSendRequest) returns (UnificationSendReply);
// 获取Tcp包计数器
rpc GetTcpAtomicCounter(Empty) returns (TcpAtomicCounterReply);
// 会话服务-获取会话服务集合
rpc GetUdpSessionAll(Empty) returns (UdpSessionInfoReply);
// 获取Udp包计数器
rpc GetUdpAtomicCounter(Empty) returns (UdpAtomicCounterReply);
}

message Empty{}

message TcpSessionInfoReply{
repeated SessionInfo TcpSessions=1;
}
message UdpSessionInfoReply{
repeated SessionInfo UdpSessions=1;
}

message SessionInfo{
string StartTime=1;
string LastActiveTime=2;
string TerminalPhoneNo=3;
string RemoteAddressIP=4;
}

message SessionRemoveRequest{
string TerminalPhoneNo=1;
}

message SessionRemoveReply{
bool Success = 1;
}

message UnificationSendRequest{
string TerminalPhoneNo=1;
bytes Data=2;
}

message UnificationSendReply{
bool Success = 1;
}

message TcpAtomicCounterReply{
int64 MsgSuccessCount=1;
int64 MsgFailCount=2;
}

message UdpAtomicCounterReply{
int64 MsgSuccessCount=1;
int64 MsgFailCount=2;
}

+ 15
- 0
src/JT808.Gateway/PubSub/IJT808MsgConsumer.cs View File

@@ -0,0 +1,15 @@
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;

namespace JT808.Gateway.PubSub
{
public interface IJT808MsgConsumer : IJT808PubSub, IDisposable
{
void OnMessage(Action<(string TerminalNo, byte[] Data)> callback);
CancellationTokenSource Cts { get; }
void Subscribe();
void Unsubscribe();
}
}

+ 17
- 0
src/JT808.Gateway/PubSub/IJT808MsgProducer.cs View File

@@ -0,0 +1,17 @@
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;

namespace JT808.Gateway.PubSub
{
public interface IJT808MsgProducer : IJT808PubSub, IDisposable
{
/// <summary>
///
/// </summary>
/// <param name="terminalNo">设备终端号</param>
/// <param name="data">808 hex data</param>
Task ProduceAsync(string terminalNo, byte[] data);
}
}

+ 15
- 0
src/JT808.Gateway/PubSub/IJT808MsgReplyConsumer.cs View File

@@ -0,0 +1,15 @@
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;

namespace JT808.Gateway.PubSub
{
public interface IJT808MsgReplyConsumer : IJT808PubSub, IDisposable
{
void OnMessage(Action<(string TerminalNo, byte[] Data)> callback);
CancellationTokenSource Cts { get; }
void Subscribe();
void Unsubscribe();
}
}

+ 17
- 0
src/JT808.Gateway/PubSub/IJT808MsgReplyProducer.cs View File

@@ -0,0 +1,17 @@
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;

namespace JT808.Gateway.PubSub
{
public interface IJT808MsgReplyProducer : IJT808PubSub, IDisposable
{
/// <summary>
///
/// </summary>
/// <param name="terminalNo">设备终端号</param>
/// <param name="data">808 hex data</param>
Task ProduceAsync(string terminalNo, byte[] data);
}
}

+ 11
- 0
src/JT808.Gateway/PubSub/IJT808PubSub.cs View File

@@ -0,0 +1,11 @@
using System;
using System.Collections.Generic;
using System.Text;

namespace JT808.Gateway.PubSub
{
public interface IJT808PubSub
{
string TopicName { get; }
}
}

+ 18
- 0
src/JT808.Gateway/PubSub/IJT808SessionConsumer.cs View File

@@ -0,0 +1,18 @@
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;

namespace JT808.Gateway.PubSub
{
/// <summary>
/// 会话通知(在线/离线)
/// </summary>
public interface IJT808SessionConsumer : IJT808PubSub, IDisposable
{
void OnMessage(Action<(string Notice, string TerminalNo)> callback);
CancellationTokenSource Cts { get; }
void Subscribe();
void Unsubscribe();
}
}

+ 13
- 0
src/JT808.Gateway/PubSub/IJT808SessionProducer.cs View File

@@ -0,0 +1,13 @@
using System;
using System.Threading.Tasks;

namespace JT808.Gateway.PubSub
{
/// <summary>
/// 会话通知(在线/离线)
/// </summary>
public interface IJT808SessionProducer : IJT808PubSub, IDisposable
{
Task ProduceAsync(string notice,string terminalNo);
}
}

+ 52
- 0
src/JT808.Gateway/Services/JT808AtomicCounterService.cs View File

@@ -0,0 +1,52 @@
using JT808.Gateway.Metadata;

namespace JT808.Gateway.Services
{
/// <summary>
/// 计数包服务
/// </summary>
public class JT808AtomicCounterService
{
private readonly JT808AtomicCounter MsgSuccessCounter;

private readonly JT808AtomicCounter MsgFailCounter;

public JT808AtomicCounterService()
{
MsgSuccessCounter=new JT808AtomicCounter();
MsgFailCounter = new JT808AtomicCounter();
}

public void Reset()
{
MsgSuccessCounter.Reset();
MsgFailCounter.Reset();
}

public long MsgSuccessIncrement()
{
return MsgSuccessCounter.Increment();
}

public long MsgSuccessCount
{
get
{
return MsgSuccessCounter.Count;
}
}

public long MsgFailIncrement()
{
return MsgFailCounter.Increment();
}

public long MsgFailCount
{
get
{
return MsgFailCounter.Count;
}
}
}
}

+ 30
- 0
src/JT808.Gateway/Services/JT808AtomicCounterServiceFactory.cs View File

@@ -0,0 +1,30 @@
using JT808.Gateway.Enums;
using System;
using System.Collections.Concurrent;

namespace JT808.Gateway.Services
{
public class JT808AtomicCounterServiceFactory
{
private readonly ConcurrentDictionary<JT808TransportProtocolType, JT808AtomicCounterService> cache;

public JT808AtomicCounterServiceFactory()
{
cache = new ConcurrentDictionary<JT808TransportProtocolType, JT808AtomicCounterService>();
}

public JT808AtomicCounterService Create(JT808TransportProtocolType type)
{
if(cache.TryGetValue(type,out var service))
{
return service;
}
else
{
var serviceNew = new JT808AtomicCounterService();
cache.TryAdd(type, serviceNew);
return serviceNew;
}
}
}
}

+ 35
- 0
src/JT808.Gateway/Services/JT808ClientReceiveAtomicCounterService.cs View File

@@ -0,0 +1,35 @@
using JT808.Gateway.Metadata;

namespace JT808.Gateway.Services
{
/// <summary>
/// 接收计数包服务
/// </summary>
public class JT808ClientReceiveAtomicCounterService
{
private readonly JT808AtomicCounter MsgSuccessCounter;

public JT808ClientReceiveAtomicCounterService()
{
MsgSuccessCounter=new JT808AtomicCounter();
}

public void Reset()
{
MsgSuccessCounter.Reset();
}

public long MsgSuccessIncrement()
{
return MsgSuccessCounter.Increment();
}

public long MsgSuccessCount
{
get
{
return MsgSuccessCounter.Count;
}
}
}
}

+ 37
- 0
src/JT808.Gateway/Services/JT808ClientReportHostedService.cs View File

@@ -0,0 +1,37 @@
using Microsoft.Extensions.Hosting;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace JT808.Gateway.Services
{
public class JT808ClientReportHostedService : IHostedService
{
private readonly JT808ClientReportService jT808ReportService;
private CancellationTokenSource cts = new CancellationTokenSource();
public JT808ClientReportHostedService(JT808ClientReportService jT808ReportService)
{
this.jT808ReportService = jT808ReportService;
}
public Task StartAsync(CancellationToken cancellationToken)
{
Task.Run(() =>
{
while (!cts.IsCancellationRequested)
{
jT808ReportService.Create();
Thread.Sleep(1000);
//Task.Delay(TimeSpan.FromSeconds(1), cts.Token);
}
}, cts.Token);
return Task.CompletedTask;
}
public Task StopAsync(CancellationToken cancellationToken)
{
cts.Cancel();
return Task.CompletedTask;
}
}
}

+ 43
- 0
src/JT808.Gateway/Services/JT808ClientReportService.cs View File

@@ -0,0 +1,43 @@
using JT808.Gateway.Client;
using JT808.Gateway.Metadata;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;

namespace JT808.Gateway.Services
{
public class JT808ClientReportService
{
private readonly JT808ClientReceiveAtomicCounterService jT808ReceiveAtomicCounterService;
private readonly JT808ClientSendAtomicCounterService jT808SendAtomicCounterService;
private readonly IJT808TcpClientFactory jT808TcpClientFactory;

public List<JT808ClientReport> JT808Reports { get; private set; }

public JT808ClientReportService(
JT808ClientReceiveAtomicCounterService jT808ReceiveAtomicCounterService,
JT808ClientSendAtomicCounterService jT808SendAtomicCounterService,
IJT808TcpClientFactory jT808TcpClientFactory)
{
this.jT808ReceiveAtomicCounterService = jT808ReceiveAtomicCounterService;
this.jT808SendAtomicCounterService = jT808SendAtomicCounterService;
this.jT808TcpClientFactory = jT808TcpClientFactory;
JT808Reports = new List<JT808ClientReport>();
}

public void Create()
{
var clients = jT808TcpClientFactory.GetAll();
JT808Reports.Add(new JT808ClientReport()
{
SendTotalCount= jT808SendAtomicCounterService.MsgSuccessCount,
ReceiveTotalCount= jT808ReceiveAtomicCounterService.MsgSuccessCount,
CurrentDate=DateTime.Now,
Connections= clients.Count,
OnlineConnections= clients.Where(w => w.IsOpen).Count(),
OfflineConnections= clients.Where(w => !w.IsOpen).Count(),
});
}
}
}

+ 35
- 0
src/JT808.Gateway/Services/JT808ClientSendAtomicCounterService.cs View File

@@ -0,0 +1,35 @@
using JT808.Gateway.Metadata;

namespace JT808.Gateway.Services
{
/// <summary>
/// 发送计数包服务
/// </summary>
public class JT808ClientSendAtomicCounterService
{
private readonly JT808AtomicCounter MsgSuccessCounter;

public JT808ClientSendAtomicCounterService()
{
MsgSuccessCounter=new JT808AtomicCounter();
}

public void Reset()
{
MsgSuccessCounter.Reset();
}

public long MsgSuccessIncrement()
{
return MsgSuccessCounter.Increment();
}

public long MsgSuccessCount
{
get
{
return MsgSuccessCounter.Count;
}
}
}
}

+ 96
- 0
src/JT808.Gateway/Services/JT808GatewayService.cs View File

@@ -0,0 +1,96 @@
using JT808.Gateway.Interfaces;
using System;
using System.Collections.Generic;
using System.Text;
using JT808.Gateway.Enums;
using JT808.Gateway.GrpcService;
using Grpc.Core;
using System.Threading.Tasks;

namespace JT808.Gateway.Services
{
public class JT808GatewayService: JT808Gateway.JT808GatewayBase
{
private readonly JT808AtomicCounterService jT808TcpAtomicCounterService;

private readonly JT808AtomicCounterService jT808UdpAtomicCounterService;

private readonly IJT808SessionService jT808SessionService;

private readonly IJT808UnificationSendService jT808UnificationSendService;

public JT808GatewayService(
IJT808UnificationSendService jT808UnificationSendService,
IJT808SessionService jT808SessionService,
JT808AtomicCounterServiceFactory jT808AtomicCounterServiceFactory
)
{
this.jT808UnificationSendService = jT808UnificationSendService;
this.jT808SessionService = jT808SessionService;
this.jT808TcpAtomicCounterService = jT808AtomicCounterServiceFactory.Create(JT808TransportProtocolType.tcp);
this.jT808UdpAtomicCounterService = jT808AtomicCounterServiceFactory.Create(JT808TransportProtocolType.udp);
}

public override Task<TcpSessionInfoReply> GetTcpSessionAll(Empty request, ServerCallContext context)
{
var result = jT808SessionService.GetTcpAll();
TcpSessionInfoReply reply = new TcpSessionInfoReply();
foreach(var item in result.Data)
{
reply.TcpSessions.Add(new SessionInfo
{
LastActiveTime= item.LastActiveTime.ToString("yyyy-MM-dd HH:mm:ss"),
StartTime= item.StartTime.ToString("yyyy-MM-dd HH:mm:ss"),
RemoteAddressIP= item.RemoteAddressIP,
TerminalPhoneNo=item.TerminalPhoneNo
});
}
return Task.FromResult(reply);
}

public override Task<SessionRemoveReply> RemoveSessionByTerminalPhoneNo(SessionRemoveRequest request, ServerCallContext context)
{
var result = jT808SessionService.RemoveByTerminalPhoneNo(request.TerminalPhoneNo);
return Task.FromResult(new SessionRemoveReply { Success = result.Data });
}

public override Task<UdpSessionInfoReply> GetUdpSessionAll(Empty request, ServerCallContext context)
{
var result = jT808SessionService.GetUdpAll();
UdpSessionInfoReply reply = new UdpSessionInfoReply();
foreach (var item in result.Data)
{
reply.UdpSessions.Add(new SessionInfo
{
LastActiveTime = item.LastActiveTime.ToString("yyyy-MM-dd HH:mm:ss"),
StartTime = item.StartTime.ToString("yyyy-MM-dd HH:mm:ss"),
RemoteAddressIP = item.RemoteAddressIP,
TerminalPhoneNo = item.TerminalPhoneNo
});
}
return Task.FromResult(reply);
}

public override Task<UnificationSendReply> UnificationSend(UnificationSendRequest request, ServerCallContext context)
{
var result = jT808UnificationSendService.Send(request.TerminalPhoneNo, request.Data.ToByteArray());
return Task.FromResult(new UnificationSendReply { Success = result.Data });
}

public override Task<TcpAtomicCounterReply> GetTcpAtomicCounter(Empty request, ServerCallContext context)
{
TcpAtomicCounterReply reply = new TcpAtomicCounterReply();
reply.MsgFailCount=jT808TcpAtomicCounterService.MsgFailCount;
reply.MsgSuccessCount=jT808TcpAtomicCounterService.MsgSuccessCount;
return Task.FromResult(reply);
}

public override Task<UdpAtomicCounterReply> GetUdpAtomicCounter(Empty request, ServerCallContext context)
{
UdpAtomicCounterReply reply = new UdpAtomicCounterReply();
reply.MsgFailCount = jT808UdpAtomicCounterService.MsgFailCount;
reply.MsgSuccessCount = jT808UdpAtomicCounterService.MsgSuccessCount;
return Task.FromResult(reply);
}
}
}

+ 42
- 0
src/JT808.Gateway/Services/JT808MsgReplyHostedService.cs View File

@@ -0,0 +1,42 @@
using JT808.Gateway.PubSub;
using JT808.Gateway.Session;
using Microsoft.Extensions.Hosting;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace JT808.Gateway.Services
{
internal class JT808MsgReplyHostedService : IHostedService
{
private readonly JT808SessionManager JT808SessionManager;

private readonly IJT808MsgReplyConsumer JT808MsgReplyConsumer;

public JT808MsgReplyHostedService(
IJT808MsgReplyConsumer jT808MsgReplyConsumer,
JT808SessionManager jT808SessionManager)
{
JT808MsgReplyConsumer = jT808MsgReplyConsumer;
JT808SessionManager = jT808SessionManager;
}

public Task StartAsync(CancellationToken cancellationToken)
{
JT808MsgReplyConsumer.OnMessage(item =>
{
JT808SessionManager.Send(item.TerminalNo, item.Data);
});
JT808MsgReplyConsumer.Subscribe();
return Task.CompletedTask;
}

public Task StopAsync(CancellationToken cancellationToken)
{
JT808MsgReplyConsumer.Unsubscribe();
return Task.CompletedTask;
}
}
}

+ 11
- 0
src/JT808.Gateway/Services/JT808MsgService.cs View File

@@ -0,0 +1,11 @@
using System;
using System.Collections.Generic;
using System.Text;

namespace JT808.Gateway.Services
{
internal class JT808MsgService
{
public System.Collections.Concurrent.BlockingCollection<(string TerminalNo, byte[] Data)> MsgQueue { get; set; } = new System.Collections.Concurrent.BlockingCollection<(string TerminalNo, byte[] Data)>();
}
}

+ 98
- 0
src/JT808.Gateway/Services/JT808SessionService.cs View File

@@ -0,0 +1,98 @@
using JT808.Gateway.Dtos;
using JT808.Gateway.Interfaces;
using JT808.Gateway.Session;
using System;
using System.Collections.Generic;
using System.Linq;


namespace JT808.Gateway.Services
{
internal class JT808SessionService : IJT808SessionService
{
private readonly JT808SessionManager jT808SessionManager;

public JT808SessionService(
JT808SessionManager jT808SessionManager)
{
this.jT808SessionManager = jT808SessionManager;
}

public JT808ResultDto<List<JT808TcpSessionInfoDto>> GetTcpAll()
{
JT808ResultDto<List<JT808TcpSessionInfoDto>> resultDto = new JT808ResultDto<List<JT808TcpSessionInfoDto>>();
try
{
resultDto.Data = jT808SessionManager.GetAll().Select(s => new JT808TcpSessionInfoDto
{
LastActiveTime = s.LastActiveTime,
StartTime = s.StartTime,
TerminalPhoneNo = s.TerminalPhoneNo,
RemoteAddressIP = s.Channel.RemoteAddress.ToString(),
}).ToList();
resultDto.Code = JT808ResultCode.Ok;
}
catch (Exception ex)
{
resultDto.Data = null;
resultDto.Code = JT808ResultCode.Error;
resultDto.Message = Newtonsoft.Json.JsonConvert.SerializeObject(ex);
}
return resultDto;
}

public JT808ResultDto<List<JT808UdpSessionInfoDto>> GetUdpAll()
{
JT808ResultDto<List<JT808UdpSessionInfoDto>> resultDto = new JT808ResultDto<List<JT808UdpSessionInfoDto>>();
try
{
resultDto.Data = jT808SessionManager.GetUdpAll().Select(s => new JT808UdpSessionInfoDto
{
LastActiveTime = s.LastActiveTime,
StartTime = s.StartTime,
TerminalPhoneNo = s.TerminalPhoneNo,
RemoteAddressIP = s.Sender.ToString()
}).ToList();
resultDto.Code = JT808ResultCode.Ok;
}
catch (Exception ex)
{
resultDto.Data = null;
resultDto.Code = JT808ResultCode.Error;
resultDto.Message = Newtonsoft.Json.JsonConvert.SerializeObject(ex);
}
return resultDto;
}

public JT808ResultDto<bool> RemoveByTerminalPhoneNo(string terminalPhoneNo)
{
JT808ResultDto<bool> resultDto = new JT808ResultDto<bool>();
try
{
var session = jT808SessionManager.RemoveSession(terminalPhoneNo);
if (session != null)
{
if(session.Channel.Open)
{
session.Channel.CloseAsync();
}
}
resultDto.Code = JT808ResultCode.Ok;
resultDto.Data = true;
}
catch (AggregateException ex)
{
resultDto.Data = false;
resultDto.Code = 500;
resultDto.Message = Newtonsoft.Json.JsonConvert.SerializeObject(ex);
}
catch (Exception ex)
{
resultDto.Data = false;
resultDto.Code = JT808ResultCode.Error;
resultDto.Message = Newtonsoft.Json.JsonConvert.SerializeObject(ex);
}
return resultDto;
}
}
}

+ 45
- 0
src/JT808.Gateway/Services/JT808UnificationSendService.cs View File

@@ -0,0 +1,45 @@
using JT808.Gateway.Dtos;
using JT808.Gateway.Interfaces;
using JT808.Gateway.Session;
using System;

namespace JT808.Gateway.Services
{
internal class JT808UnificationSendService : IJT808UnificationSendService
{
private readonly JT808SessionManager jT808SessionManager;

public JT808UnificationSendService(
JT808SessionManager jT808SessionManager)
{
this.jT808SessionManager = jT808SessionManager;
}

public JT808ResultDto<bool> Send(string terminalPhoneNo, byte[] data)
{
JT808ResultDto<bool> resultDto = new JT808ResultDto<bool>();
try
{
if(jT808SessionManager.TrySend(terminalPhoneNo, data, out var message))
{
resultDto.Code = JT808ResultCode.Ok;
resultDto.Data = true;
resultDto.Message = message;
}
else
{
resultDto.Code = JT808ResultCode.Ok;
resultDto.Data = false;
resultDto.Message = message;
}
}
catch (Exception ex)
{
resultDto.Data = false;
resultDto.Code = JT808ResultCode.Error;
resultDto.Message = Newtonsoft.Json.JsonConvert.SerializeObject(ex);
}
return resultDto;
}
}
}

+ 304
- 0
src/JT808.Gateway/Session/JT808SessionManager.cs View File

@@ -0,0 +1,304 @@
using DotNetty.Buffers;
using DotNetty.Transport.Channels;
using JT808.Gateway.Enums;
using JT808.Gateway.Interfaces;
using JT808.Gateway.Metadata;
using JT808.Gateway.PubSub;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Text;

namespace JT808.Gateway.Session
{
public class JT808SessionManager
{
private readonly ILogger<JT808SessionManager> logger;

private readonly IJT808DatagramPacket jT808DatagramPacket;
public IJT808SessionProducer JT808SessionProducer { get; }

public ConcurrentDictionary<string, IJT808Session> Sessions { get; }

public JT808SessionManager(
IJT808SessionProducer jT808SessionProducer,
ILoggerFactory loggerFactory
)
{
Sessions = new ConcurrentDictionary<string, IJT808Session>(StringComparer.OrdinalIgnoreCase);
JT808SessionProducer = jT808SessionProducer;
logger = loggerFactory.CreateLogger<JT808SessionManager>();
}

public JT808SessionManager(
IJT808SessionProducer jT808SessionProducer,
ILoggerFactory loggerFactory,
IJT808DatagramPacket jT808DatagramPacket)
{
Sessions = new ConcurrentDictionary<string, IJT808Session>(StringComparer.OrdinalIgnoreCase);
JT808SessionProducer = jT808SessionProducer;
logger = loggerFactory.CreateLogger<JT808SessionManager>();
this.jT808DatagramPacket = jT808DatagramPacket;
}

public int SessionCount
{
get
{
return Sessions.Count;
}
}
public IJT808Session GetSessionByTerminalPhoneNo(string terminalPhoneNo)
{
if (string.IsNullOrEmpty(terminalPhoneNo))
return default;
if (Sessions.TryGetValue(terminalPhoneNo, out IJT808Session targetSession))
{
return targetSession;
}
else
{
return default;
}
}
public JT808TcpSession GetTcpSessionByTerminalPhoneNo(string terminalPhoneNo)
{
return (JT808TcpSession)GetSessionByTerminalPhoneNo(terminalPhoneNo);
}
public JT808UdpSession GetUdpSessionByTerminalPhoneNo(string terminalPhoneNo)
{
return (JT808UdpSession)GetSessionByTerminalPhoneNo(terminalPhoneNo);
}
public void Heartbeat(string terminalPhoneNo)
{
if (string.IsNullOrEmpty(terminalPhoneNo)) return;
if (Sessions.TryGetValue(terminalPhoneNo, out IJT808Session oldjT808Session))
{
oldjT808Session.LastActiveTime = DateTime.Now;
Sessions.TryUpdate(terminalPhoneNo, oldjT808Session, oldjT808Session);
}
}
public bool TrySend(string terminalPhoneNo, byte[] data, out string message)
{
bool isSuccessed;
var session = GetSessionByTerminalPhoneNo(terminalPhoneNo);
if (session != null)
{
//判断转发数据是下发不了消息的
if (Sessions.Select(s => s.Value).Count(c => c.Channel.Id == session.Channel.Id) > 1)
{
isSuccessed = false;
message = "not support transmit data send.";
}
else
{
if(session.TransportProtocolType== JT808TransportProtocolType.tcp)
{
session.Channel.WriteAndFlushAsync(Unpooled.WrappedBuffer(data));
isSuccessed = true;
message = "ok";
}
else if (session.TransportProtocolType == JT808TransportProtocolType.udp)
{
isSuccessed = true;
message = "ok";
session.Channel.WriteAndFlushAsync(jT808DatagramPacket.Create(data, ((JT808UdpSession)session).Sender));
}
else
{
isSuccessed = false;
message = "unknow type";
}
}
}
else
{
isSuccessed = false;
message = "offline";
}
return isSuccessed;
}
internal void Send(string terminalPhoneNo, byte[] data)
{
var session = GetSessionByTerminalPhoneNo(terminalPhoneNo);
if (session != null)
{
if (session.TransportProtocolType == JT808TransportProtocolType.tcp)
{
session.Channel.WriteAndFlushAsync(Unpooled.WrappedBuffer(data));
}
else if (session.TransportProtocolType == JT808TransportProtocolType.udp)
{
session.Channel.WriteAndFlushAsync(jT808DatagramPacket.Create(data, ((JT808UdpSession)session).Sender));
}
}
}
public bool TrySend(string terminalPhoneNo, IJT808Reply reply, out string message)
{
bool isSuccessed;
var session = GetSessionByTerminalPhoneNo(terminalPhoneNo);
if (session != null)
{
//判断转发数据是下发不了消息的
if (Sessions.Select(s => s.Value).Count(c => c.Channel.Id == session.Channel.Id) > 1)
{
isSuccessed = false;
message = "not support transmit data send.";
}
else
{
if (session.TransportProtocolType == JT808TransportProtocolType.tcp)
{
isSuccessed = true;
message = "ok";
session.Channel.WriteAndFlushAsync(reply);
}
else if (session.TransportProtocolType == JT808TransportProtocolType.udp)
{
isSuccessed = true;
message = "ok";
session.Channel.WriteAndFlushAsync(jT808DatagramPacket.Create(reply.HexData, ((JT808UdpSession)session).Sender));
}
else
{
isSuccessed = false;
message = "unknow type";
}
}
}
else
{
isSuccessed = false;
message = "offline";
}
return isSuccessed;
}
public void TryAdd(string terminalPhoneNo, IChannel channel)
{
// 解决了设备号跟通道绑定到一起,不需要用到通道本身的SessionId
// 不管设备下发更改了设备终端号,只要是没有在内存中就当是新的
// 存在的问题:
// 1.原先老的如何销毁
// 2.这时候用的通道是相同的,设备终端是不同的
// 当设备主动或者服务器断开以后,可以释放,这点内存忽略不计,况且更改设备号不是很频繁。

//修复第一次通过转发过来的数据,再次通过直连后通道没有改变导致下发不成功,所以每次进行通道的更新操作。
if (Sessions.TryGetValue(terminalPhoneNo, out IJT808Session oldJT808Session))
{
oldJT808Session.LastActiveTime = DateTime.Now;
oldJT808Session.Channel = channel;
Sessions.TryUpdate(terminalPhoneNo, oldJT808Session, oldJT808Session);
}
else
{
JT808TcpSession jT808TcpSession = new JT808TcpSession(channel, terminalPhoneNo);
if (Sessions.TryAdd(terminalPhoneNo, jT808TcpSession))
{
//使用场景:
//部标的超长待机设备,不会像正常的设备一样一直连着,可能10几分钟连上了,然后发完就关闭连接,
//这时候想下发数据需要知道设备什么时候上线,在这边做通知最好不过了。
//有设备关联上来可以进行通知 例如:使用Redis发布订阅
JT808SessionProducer.ProduceAsync(JT808GatewayConstants.SessionOnline,jT808TcpSession.TerminalPhoneNo);
}
}
}
public void TryAdd(IChannel channel, EndPoint sender, string terminalPhoneNo)
{
//1.先判断是否在缓存里面
if (Sessions.TryGetValue(terminalPhoneNo, out IJT808Session jT808UdpSession))
{
if(jT808UdpSession is JT808UdpSession convertSession)
{
convertSession.LastActiveTime = DateTime.Now;
convertSession.Sender = sender;
convertSession.Channel = channel;
Sessions.TryUpdate(terminalPhoneNo, convertSession, convertSession);
}
}
else
{
//添加缓存
//使用场景:
//部标的超长待机设备,不会像正常的设备一样一直连着,可能10几分钟连上了,然后发完就关闭连接,
//这时候想下发数据需要知道设备什么时候上线,在这边做通知最好不过了。
//有设备关联上来可以进行通知 例如:使用Redis发布订阅
Sessions.TryAdd(terminalPhoneNo, new JT808UdpSession(channel, sender, terminalPhoneNo));
}
//移动是个大的内网,不跟随下发,根本就发不出来
//移动很多卡,存储的那个socket地址端口,有效期非常短
//不速度快点下发,那个socket地址端口就可能映射到别的对应卡去了
//所以此处采用跟随设备消息下发指令
JT808SessionProducer.ProduceAsync(JT808GatewayConstants.SessionOnline,terminalPhoneNo);
}
public IJT808Session RemoveSession(string terminalPhoneNo)
{
//设备离线可以进行通知
//使用Redis 发布订阅
if (string.IsNullOrEmpty(terminalPhoneNo)) return default;
if (!Sessions.TryGetValue(terminalPhoneNo, out IJT808Session jT808Session))
{
return default;
}
// 处理转发过来的是数据 这时候通道对设备是1对多关系,需要清理垃圾数据
//1.用当前会话的通道Id找出通过转发过来的其他设备的终端号
var terminalPhoneNos = Sessions.Where(w => w.Value.Channel.Id == jT808Session.Channel.Id).Select(s => s.Key).ToList();
//2.存在则一个个移除
if (terminalPhoneNos.Count > 1)
{
//3.移除包括当前的设备号
foreach (var key in terminalPhoneNos)
{
Sessions.TryRemove(key, out IJT808Session jT808SessionRemove);
}
string nos = string.Join(",", terminalPhoneNos);
logger.LogInformation($">>>{terminalPhoneNo}-{nos} 1-n Session Remove.");
JT808SessionProducer.ProduceAsync(JT808GatewayConstants.SessionOffline, nos);
return jT808Session;
}
else
{
if (Sessions.TryRemove(terminalPhoneNo, out IJT808Session jT808SessionRemove))
{
logger.LogInformation($">>>{terminalPhoneNo} Session Remove.");
JT808SessionProducer.ProduceAsync(JT808GatewayConstants.SessionOffline, terminalPhoneNo);
return jT808SessionRemove;
}
else
{
return default;
}
}
}
public void RemoveSessionByChannel(IChannel channel)
{
//设备离线可以进行通知
//使用Redis 发布订阅
var terminalPhoneNos = Sessions.Where(w => w.Value.Channel.Id == channel.Id).Select(s => s.Key).ToList();
if (terminalPhoneNos.Count > 0)
{
foreach (var key in terminalPhoneNos)
{
Sessions.TryRemove(key, out IJT808Session jT808SessionRemove);
}
string nos = string.Join(",", terminalPhoneNos);
logger.LogInformation($">>>{nos} Channel Remove.");
JT808SessionProducer.ProduceAsync(JT808GatewayConstants.SessionOffline, nos);
}
}
public IEnumerable<IJT808Session> GetAll()
{
return Sessions.Select(s => s.Value).ToList();
}
public IEnumerable<JT808TcpSession> GetTcpAll()
{
return Sessions.Select(s => (JT808TcpSession)s.Value).Where(w => w.TransportProtocolType == JT808TransportProtocolType.tcp).ToList();
}
public IEnumerable<JT808UdpSession> GetUdpAll()
{
return Sessions.Select(s => (JT808UdpSession)s.Value).Where(w => w.TransportProtocolType == JT808TransportProtocolType.udp).ToList();
}
}
}

+ 50
- 0
src/JT808.Gateway/Simples/JT808SimpleTcpClient.cs View File

@@ -0,0 +1,50 @@
using System;
using System.Diagnostics;
using System.Net;
using System.Net.Sockets;
using System.Threading;
using System.Threading.Tasks;

namespace JT808.DotNetty.Core
{
internal class JT808SimpleTcpClient
{
private TcpClient tcpClient;

public JT808SimpleTcpClient(IPEndPoint remoteAddress)
{
tcpClient = new TcpClient();
tcpClient.Connect(remoteAddress);
Task.Run(()=> {
while (true)
{
try
{
byte[] buffer = new byte[100];
tcpClient.GetStream().Read(buffer, 0, 100);
Console.WriteLine(Thread.CurrentThread.ManagedThreadId + " " + string.Join(" ", buffer));
}
catch
{

}
Thread.Sleep(1000);
}
});
}

public void WriteAsync(byte[] data)
{
tcpClient.GetStream().WriteAsync(data, 0, data.Length);
}

public void Down()
{
tcpClient.Close();
}
}
}

+ 48
- 0
src/JT808.Gateway/Simples/JT808SimpleUdpClient.cs View File

@@ -0,0 +1,48 @@
using System;
using System.Diagnostics;
using System.Net;
using System.Net.Sockets;
using System.Threading;
using System.Threading.Tasks;

namespace JT808.DotNetty.Core
{
internal class JT808SimpleUdpClient
{
private UdpClient udpClient;

public JT808SimpleUdpClient(IPEndPoint remoteAddress)
{
udpClient = new UdpClient();
udpClient.Connect(remoteAddress);
Task.Run(() =>
{
while (true)
{
try
{
string tmp = string.Join(" ", udpClient.Receive(ref remoteAddress));
Console.WriteLine(Thread.CurrentThread.ManagedThreadId + " " + tmp);
Thread.Sleep(1000);
}
catch
{


}
Thread.Sleep(1000);
}
});
}

public void WriteAsync(byte[] data)
{
udpClient.SendAsync(data, data.Length);
}

public void Down()
{
udpClient.Close();
}
}
}

+ 21
- 0
src/JT808.Gateway/Tcp/JT808TcpDotnettyExtensions.cs View File

@@ -0,0 +1,21 @@
using JT808.Gateway.Codecs;
using JT808.Gateway.Handlers;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection.Extensions;
using System.Runtime.CompilerServices;

namespace JT808.Gateway.Tcp
{
public static class JT808TcpDotnettyExtensions
{
public static IJT808GatewayBuilder AddJT808GatewayTcpHost(this IJT808GatewayBuilder jT808NettyBuilder)
{
jT808NettyBuilder.JT808Builder.Services.TryAddScoped<JT808TcpConnectionHandler>();
jT808NettyBuilder.JT808Builder.Services.TryAddScoped<JT808TcpEncoder>();
jT808NettyBuilder.JT808Builder.Services.TryAddScoped<JT808TcpDecoder>();
jT808NettyBuilder.JT808Builder.Services.TryAddScoped<JT808TcpServerHandler>();
jT808NettyBuilder.JT808Builder.Services.AddHostedService<JT808TcpServerHost>();
return jT808NettyBuilder;
}
}
}

+ 95
- 0
src/JT808.Gateway/Tcp/JT808TcpServerHost.cs View File

@@ -0,0 +1,95 @@
using DotNetty.Buffers;
using DotNetty.Codecs;
using DotNetty.Handlers.Timeout;
using DotNetty.Transport.Bootstrapping;
using DotNetty.Transport.Channels;
using DotNetty.Transport.Libuv;
using JT808.Gateway.Codecs;
using JT808.Gateway.Configurations;
using JT808.Gateway.Handlers;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using System;
using System.Net;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks;

namespace JT808.Gateway.Tcp
{
/// <summary>
/// JT808 Tcp网关服务
/// </summary>
internal class JT808TcpServerHost : IHostedService
{
private readonly IServiceProvider serviceProvider;
private readonly JT808Configuration configuration;
private readonly ILogger<JT808TcpServerHost> logger;
private DispatcherEventLoopGroup bossGroup;
private WorkerEventLoopGroup workerGroup;
private IChannel bootstrapChannel;
private IByteBufferAllocator serverBufferAllocator;

public JT808TcpServerHost(
IServiceProvider provider,
ILoggerFactory loggerFactory,
IOptions<JT808Configuration> jT808ConfigurationAccessor)
{
serviceProvider = provider;
configuration = jT808ConfigurationAccessor.Value;
logger=loggerFactory.CreateLogger<JT808TcpServerHost>();
}

public Task StartAsync(CancellationToken cancellationToken)
{
bossGroup = new DispatcherEventLoopGroup();
workerGroup = new WorkerEventLoopGroup(bossGroup, configuration.EventLoopCount);
serverBufferAllocator = new PooledByteBufferAllocator();
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.Group(bossGroup, workerGroup);
bootstrap.Channel<TcpServerChannel>();
if (RuntimeInformation.IsOSPlatform(OSPlatform.Linux)
|| RuntimeInformation.IsOSPlatform(OSPlatform.OSX))
{
bootstrap
.Option(ChannelOption.SoReuseport, true)
.ChildOption(ChannelOption.SoReuseaddr, true);
}
bootstrap
.Option(ChannelOption.SoBacklog, configuration.SoBacklog)
.ChildOption(ChannelOption.Allocator, serverBufferAllocator)
.ChildHandler(new ActionChannelInitializer<IChannel>(channel =>
{
IChannelPipeline pipeline = channel.Pipeline;
using (var scope = serviceProvider.CreateScope())
{
channel.Pipeline.AddLast("jt808TcpBuffer", new DelimiterBasedFrameDecoder(int.MaxValue,
Unpooled.CopiedBuffer(new byte[] { JT808.Protocol.JT808Package.BeginFlag }),
Unpooled.CopiedBuffer(new byte[] { JT808.Protocol.JT808Package.EndFlag })));
channel.Pipeline.AddLast("jt808TcpDecode", scope.ServiceProvider.GetRequiredService<JT808TcpDecoder>());
channel.Pipeline.AddLast("jt808TcpEncode", scope.ServiceProvider.GetRequiredService<JT808TcpEncoder>());
channel.Pipeline.AddLast("systemIdleState", new IdleStateHandler(
configuration.ReaderIdleTimeSeconds,
configuration.WriterIdleTimeSeconds,
configuration.AllIdleTimeSeconds));
channel.Pipeline.AddLast("jt808TcpConnection", scope.ServiceProvider.GetRequiredService<JT808TcpConnectionHandler>());
channel.Pipeline.AddLast("jt808TcpService", scope.ServiceProvider.GetRequiredService<JT808TcpServerHandler>());
}
}));
logger.LogInformation($"JT808 TCP Server start at {IPAddress.Any}:{configuration.TcpPort}.");
return bootstrap.BindAsync(configuration.TcpPort)
.ContinueWith(i => bootstrapChannel = i.Result);
}

public async Task StopAsync(CancellationToken cancellationToken)
{
await bootstrapChannel.CloseAsync();
var quietPeriod = configuration.QuietPeriodTimeSpan;
var shutdownTimeout = configuration.ShutdownTimeoutTimeSpan;
await workerGroup.ShutdownGracefullyAsync(quietPeriod, shutdownTimeout);
await bossGroup.ShutdownGracefullyAsync(quietPeriod, shutdownTimeout);
}
}
}

+ 22
- 0
src/JT808.Gateway/Udp/JT808UdpDotnettyExtensions.cs View File

@@ -0,0 +1,22 @@
using JT808.Gateway.Codecs;
using JT808.Gateway.Handlers;
using JT808.Gateway.Impls;
using JT808.Gateway.Interfaces;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection.Extensions;
using System.Runtime.CompilerServices;

namespace JT808.Gateway.Udp
{
public static class JT808UdpDotnettyExtensions
{
public static IJT808GatewayBuilder AddJT808GatewayUdpHost(this IJT808GatewayBuilder jT808NettyBuilder)
{
jT808NettyBuilder.JT808Builder.Services.TryAddSingleton<IJT808DatagramPacket, JT808DatagramPacketImpl>();
jT808NettyBuilder.JT808Builder.Services.TryAddScoped<JT808UdpDecoder>();
jT808NettyBuilder.JT808Builder.Services.TryAddScoped<JT808UdpServerHandler>();
jT808NettyBuilder.JT808Builder.Services.AddHostedService<JT808UdpServerHost>();
return jT808NettyBuilder;
}
}
}

+ 76
- 0
src/JT808.Gateway/Udp/JT808UdpServerHost.cs View File

@@ -0,0 +1,76 @@
using DotNetty.Transport.Bootstrapping;
using DotNetty.Transport.Channels;
using DotNetty.Transport.Channels.Sockets;
using JT808.Gateway.Codecs;
using JT808.Gateway.Configurations;
using JT808.Gateway.Handlers;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using System;
using System.Net;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks;

namespace JT808.Gateway.Udp
{
/// <summary>
/// JT808 Udp网关服务
/// </summary>
internal class JT808UdpServerHost : IHostedService
{
private readonly IServiceProvider serviceProvider;
private readonly JT808Configuration configuration;
private readonly ILogger<JT808UdpServerHost> logger;
private MultithreadEventLoopGroup group;
private IChannel bootstrapChannel;

public JT808UdpServerHost(
IServiceProvider provider,
ILoggerFactory loggerFactory,
IOptions<JT808Configuration> jT808ConfigurationAccessor)
{
serviceProvider = provider;
configuration = jT808ConfigurationAccessor.Value;
logger=loggerFactory.CreateLogger<JT808UdpServerHost>();
}

public Task StartAsync(CancellationToken cancellationToken)
{
group = new MultithreadEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
bootstrap.Group(group);
bootstrap.Channel<SocketDatagramChannel>();
if (RuntimeInformation.IsOSPlatform(OSPlatform.Linux)
|| RuntimeInformation.IsOSPlatform(OSPlatform.OSX))
{
bootstrap
.Option(ChannelOption.SoReuseport, true);
}
bootstrap
.Option(ChannelOption.SoBroadcast, true)
.Handler(new ActionChannelInitializer<IChannel>(channel =>
{
IChannelPipeline pipeline = channel.Pipeline;
using (var scope = serviceProvider.CreateScope())
{
pipeline.AddLast("jt808UdpDecoder", scope.ServiceProvider.GetRequiredService<JT808UdpDecoder>());
pipeline.AddLast("jt808UdpService", scope.ServiceProvider.GetRequiredService<JT808UdpServerHandler>());
}
}));
logger.LogInformation($"JT808 Udp Server start at {IPAddress.Any}:{configuration.UdpPort}.");
return bootstrap.BindAsync(configuration.UdpPort)
.ContinueWith(i => bootstrapChannel = i.Result);
}

public async Task StopAsync(CancellationToken cancellationToken)
{
await bootstrapChannel.CloseAsync();
var quietPeriod = configuration.QuietPeriodTimeSpan;
var shutdownTimeout = configuration.ShutdownTimeoutTimeSpan;
await group.ShutdownGracefullyAsync(quietPeriod, shutdownTimeout);
}
}
}

Loading…
Cancel
Save