diff --git a/.github/workflows/dotnetcore.yml b/.github/workflows/dotnetcore.yml index b5a5ef3..26467b8 100644 --- a/.github/workflows/dotnetcore.yml +++ b/.github/workflows/dotnetcore.yml @@ -36,6 +36,6 @@ jobs: - name: dotnet JT808.Gateway restore run: dotnet restore ./src/JT808.Gateway.sln - name: dotnet JT808.Gateway build - run: dotnet build ./src/JT808.Gateway.Test/JT808.Gateway.Test.csproj + run: dotnet build ./src/JT808.Gateway.Tests/JT808.Gateway.Test/JT808.Gateway.Test.csproj - name: dotnet JT808.Gateway test - run: dotnet test ./src/JT808.Gateway.Test/JT808.Gateway.Test.csproj + run: dotnet test ./src/JT808.Gateway.Tests/JT808.Gateway.Test/JT808.Gateway.Test.csproj diff --git a/README.md b/README.md index 490b51a..42ca695 100644 --- a/README.md +++ b/README.md @@ -12,9 +12,9 @@ [了解JTNE协议进这边](https://github.com/SmallChi/JTNewEnergy) - [玩一玩压力测试](https://github.com/SmallChi/JT808DotNetty/blob/master/doc/README.md) + [玩一玩压力测试](https://github.com/SmallChi/JT808Gateway/blob/master/doc/README.md) -[![MIT Licence](https://img.shields.io/github/license/mashape/apistatus.svg)](https://github.com/SmallChi/JT808DotNetty/blob/master/LICENSE)[![Github Build status](https://github.com/SmallChi/JT808Gateway/workflows/.NET%20Core/badge.svg)]() +[![MIT Licence](https://img.shields.io/github/license/mashape/apistatus.svg)](https://github.com/SmallChi/JT808Gateway/blob/master/LICENSE)[![Github Build status](https://github.com/SmallChi/JT808Gateway/workflows/.NET%20Core/badge.svg)]() ## 新网关的优势 @@ -25,7 +25,7 @@ ## 设计模型 -![design_model](https://github.com/SmallChi/JT808DotNetty/blob/master/doc/img/design_model.png) +![design_model](https://github.com/SmallChi/JT808Gateway/blob/master/doc/img/design_model.png) ## 集成接口功能 @@ -85,8 +85,9 @@ | --------------------- | -------------------------------------------------- | --------------------------------------------------- | | Install-Package JT808.Gateway.Abstractions| ![JT808.Gateway.Abstractions](https://img.shields.io/nuget/v/JT808.Gateway.Abstractions.svg) | ![JT808.Gateway.Abstractions](https://img.shields.io/nuget/dt/JT808.Gateway.Abstractions.svg) | | Install-Package JT808.Gateway | ![JT808.Gateway](https://img.shields.io/nuget/v/JT808.Gateway.svg) | ![JT808.Gateway](https://img.shields.io/nuget/dt/JT808.Gateway.svg) | -| Install-Package JT808.Gateway.Kafka| ![JT808.Gateway.Kafka](https://img.shields.io/nuget/v/JT808.Gateway.Kafka.svg) | ![JT808.Gateway.Kafka](https://img.shields.io/nuget/dt/JT808.Gateway.Kafka.svg) | +| Install-Package JT808.Gateway.Client| ![JT808.Gateway.Client](https://img.shields.io/nuget/v/JT808.Gateway.Client.svg) | ![JT808.Gateway.Client](https://img.shields.io/nuget/dt/JT808.Gateway.Client.svg) | | Install-Package JT808.Gateway.InMemoryMQ| ![JT808.Gateway.InMemoryMQ](https://img.shields.io/nuget/v/JT808.Gateway.InMemoryMQ.svg) | ![JT808.Gateway.InMemoryMQ](https://img.shields.io/nuget/dt/JT808.Gateway.InMemoryMQ.svg) | +| Install-Package JT808.Gateway.Kafka| ![JT808.Gateway.Kafka](https://img.shields.io/nuget/v/JT808.Gateway.Kafka.svg) | ![JT808.Gateway.Kafka](https://img.shields.io/nuget/dt/JT808.Gateway.Kafka.svg) | | Install-Package JT808.Gateway.Transmit | ![JT808.Gateway.Transmit](https://img.shields.io/nuget/v/JT808.Gateway.Transmit.svg) | ![JT808.Gateway.Transmit](https://img.shields.io/nuget/dt/JT808.Gateway.Transmit.svg) | | Install-Package JT808.Gateway.Traffic | ![JT808.Gateway.Traffic](https://img.shields.io/nuget/v/JT808.Gateway.Traffic.svg) | ![JT808.Gateway.Traffic](https://img.shields.io/nuget/dt/JT808.Gateway.Traffic.svg)| | Install-Package JT808.Gateway.SessionNotice | ![JT808.Gateway.SessionNotice](https://img.shields.io/nuget/v/JT808.Gateway.SessionNotice.svg) | ![JT808.Gateway.SessionNotice](https://img.shields.io/nuget/dt/JT808.Gateway.SessionNotice.svg)| @@ -94,106 +95,7 @@ | Install-Package JT808.Gateway.MsgLogging | ![JT808.Gateway.MsgLogging](https://img.shields.io/nuget/v/JT808.Gateway.MsgLogging.svg) | ![JT808.Gateway.MsgLogging](https://img.shields.io/nuget/dt/JT808.Gateway.MsgLogging.svg)| | Install-Package JT808.Gateway.MsgIdHandler | ![JT808.Gateway.MsgIdHandler](https://img.shields.io/nuget/v/JT808.Gateway.MsgIdHandler.svg) | ![JT808.Gateway.MsgIdHandler](https://img.shields.io/nuget/dt/JT808.Gateway.MsgIdHandler.svg)| -## 举个栗子1 - -``` demo1 -static async Task Main(string[] args) -{ - var serverHostBuilder = new HostBuilder() - .ConfigureAppConfiguration((hostingContext, config) => - { - config.SetBasePath(AppDomain.CurrentDomain.BaseDirectory); - config.AddJsonFile("appsettings.json", optional: false, reloadOnChange: true); - }) - .ConfigureLogging((context, logging) => - { - logging.AddConsole(); - logging.SetMinimumLevel(LogLevel.Trace); - }) - .ConfigureServices((hostContext, services) => - { - services.AddSingleton(); - services.AddSingleton(typeof(ILogger<>), typeof(Logger<>)); - services.AddJT808Configure() - .AddJT808NettyCore(hostContext.Configuration) - .AddJT808TcpNettyHost() - .AddJT808UdpNettyHost() - .AddJT808WebApiNettyHost() - //扩展webapi JT808MsgIdHttpHandlerBase - //.ReplaceMsgIdHandler() - .Builder(); - //添加kafka插件 - //.AddJT808ServerKafkaMsgProducer(hostContext.Configuration) - //.AddJT808ServerKafkaMsgReplyConsumer(hostContext.Configuration) - //.AddJT808ServerKafkaSessionProducer(hostContext.Configuration) - //.Builder(); - //使用微软自带的webapi客户端 - //services.AddHttpClient("jt808webapi", c => - //{ - // c.BaseAddress = new Uri("http://localhost:828/"); - // c.DefaultRequestHeaders.Add("token", "123456); - //}) - //.AddTypedClient(); - //var client = services.BuildServiceProvider().GetRequiredService(); - //var result = client.GetTcpAtomicCounter(); - }); - - await serverHostBuilder.RunConsoleAsync(); -} -``` - -如图所示: -![demo1](https://github.com/SmallChi/JT808DotNetty/blob/master/doc/dotnetty/demo1.png) - -## 举个栗子2 - -``` 1 -static async Task Main(string[] args) -{ - var serverHostBuilder = new HostBuilder() - .ConfigureAppConfiguration((hostingContext, config) => - { - config.SetBasePath(AppDomain.CurrentDomain.BaseDirectory) - .AddJsonFile("appsettings.json", optional: false, reloadOnChange: true) - .AddJsonFile($"appsettings.{ hostingContext.HostingEnvironment.EnvironmentName}.json", optional: true, reloadOnChange: true); - }) - .ConfigureLogging((context, logging) => - { - Console.WriteLine($"Environment.OSVersion.Platform:{Environment.OSVersion.Platform.ToString()}"); - NLog.LogManager.LoadConfiguration($"Configs/nlog.{Environment.OSVersion.Platform.ToString()}.config"); - logging.AddNLog(new NLogProviderOptions { CaptureMessageTemplates = true, CaptureMessageProperties = true }); - logging.SetMinimumLevel(LogLevel.Trace); - }) - .ConfigureServices((hostContext, services) => - { - services.AddSingleton(); - services.AddSingleton(typeof(ILogger<>), typeof(Logger<>)); - services.AddJT808Configure() - //.AddJT808Gateway(options => - //{ - // options.TcpPort = 808; - // options.UdpPort = 808; - //}) - .AddJT808Gateway(hostContext.Configuration) - .AddTcp() - .AddUdp() - .AddGrpc() - //InMemoryMQ - .AddJT808ServerInMemoryMQ() - .AddJT808InMemoryReplyMessage() - //kafka插件 - //.AddJT808ServerKafkaMsgProducer(hostContext.Configuration) - //.AddJT808ServerKafkaMsgReplyConsumer(hostContext.Configuration) - //.AddJT808ServerKafkaSessionProducer(hostContext.Configuration) - ; - //services.AddHostedService(); - }); - - await serverHostBuilder.RunConsoleAsync(); -} -``` - -## 举个栗子3 +## 举个栗子 1.打开项目进行还原编译生成 @@ -202,4 +104,4 @@ static async Task Main(string[] args) 3.进入JT808.DotNetty.SimpleClient项目下的Debug目录运行客户端 如图所示: -![demo2](https://github.com/SmallChi/JT808DotNetty/blob/master/doc/dotnetty/demo2.png) +![demo2](https://github.com/SmallChi/JT808Gateway/blob/master/doc/dotnetty/demo2.png) diff --git a/doc/README.md b/doc/README.md index 0be2127..11d2e98 100644 --- a/doc/README.md +++ b/doc/README.md @@ -1,31 +1,15 @@ # 压力测试 -## 基于DotNetty - -[感谢泥水佬提供的压力测试工具](https://www.cnblogs.com/smark/p/4496660.html?utm_source=tuicool) - -| 操作系统 | 配置 | 使用 | -|:-------:|:-------:|:-------:| -| win server 2016 | 4c8g | 压力测试客户端 | -| centos7 | 4c8g | JT808服务端 | - -![performance_1000](https://github.com/SmallChi/JT808Gateway/blob/master/doc/dotnetty/performance_1000.png) - -![performance_2000](https://github.com/SmallChi/JT808Gateway/blob/master/doc/dotnetty/performance_2000.png) - -![performance_5000](https://github.com/SmallChi/JT808Gateway/blob/master/doc/dotnetty/performance_5000.png) - -![performance_10000](https://github.com/SmallChi/JT808Gateway/blob/master/doc/dotnetty/performance_10000.png) - ## 基于pipeline -**由于【参数配置】不同导致测试的效果可能不同,只是谁便测试玩玩,反正机器便宜。** +**只是谁便测试玩玩,反正机器便宜。** > 注意1:连接数和并发数要区分开; -> 注意2:阿里云的机器默认有连接数限制(5000),可以先创建一台,把该装的软件安装好,tcp参数内核调优后,在备份一个系统镜像在玩。 +> 注意2:阿里云的机器默认有连接数限制(5000),可以先创建一台,把该装的软件安装好,tcp参数内核调优后,在备份一个系统镜像在玩; +> 注意3: 使用的是内存队列(InMemoryMQ)进行测试。 ``` 1 -使用PM2托管 +//使用PM2托管 //服务端 cd /data/JT808.Gateway @@ -35,6 +19,7 @@ pm2 start "dotnet JT808.Gateway.TestHosting.dll ASPNETCORE_ENVIRONMENT=Productio cd /data/JT808Client pm2 start "dotnet JT808.Gateway.CleintBenchmark.dll ASPNETCORE_ENVIRONMENT=Production" --max-restarts=1 -n "JT808.Gateway.CleintBenchmark" -o "/data/pm2Logs/JT808.Gateway.CleintBenchmark/out.log" -e "/data/pm2Logs/JT808.Gateway.CleintBenchmark/error.log" +//可选的 修改wwwroot下index.html的webapi接口地址 127.0.0.1:15004/index.html ``` @@ -55,11 +40,22 @@ pm2 start "dotnet JT808.Gateway.CleintBenchmark.dll ASPNETCORE_ENVIRONMENT=Produ "IP": "", "Port": 808, "DeviceCount": 10000, - "Interval": 10, + "Interval": 1000, "DeviceTemplate": 100000 //需要多台机器同时访问,那么可以根据这个避开重复终端号 100000-200000-300000 } ``` +服务器参数配置appsettings.json + +``` 1 + "JT808Configuration": { + "TcpPort": 808, + "UdpPort": 808, + "MiniNumBufferSize": 80960, + "SoBacklog": 102400 + } +``` + ![server_proccess_10k](https://github.com/SmallChi/JT808Gateway/blob/master/doc/pipeline/server_proccess_10k.png) ![server_network_10k](https://github.com/SmallChi/JT808Gateway/blob/master/doc/pipeline/server_network_10k.png) @@ -82,18 +78,29 @@ pm2 start "dotnet JT808.Gateway.CleintBenchmark.dll ASPNETCORE_ENVIRONMENT=Produ "IP": "", "Port": 808, "DeviceCount": 20000, - "Interval": 300, + "Interval": 1000, "DeviceTemplate": 100000 //需要多台机器同时访问,那么可以根据这个避开重复终端号 100000-200000-300000 } ``` +服务器参数配置appsettings.json + +``` 1 + "JT808Configuration": { + "TcpPort": 808, + "UdpPort": 808, + "MiniNumBufferSize": 80960, + "SoBacklog": 102400 + } +``` + ![server_proccess_20k](https://github.com/SmallChi/JT808Gateway/blob/master/doc/pipeline/server_proccess_20k.png) ![server_network_20k](https://github.com/SmallChi/JT808Gateway/blob/master/doc/pipeline/server_network_20k.png) ![client_20k](https://github.com/SmallChi/JT808Gateway/blob/master/doc/pipeline/client_20k.png) -### 40K +### 30K | 操作系统 | 配置 | 使用 | |:-------:|:-------:|:-------:| @@ -108,39 +115,130 @@ pm2 start "dotnet JT808.Gateway.CleintBenchmark.dll ASPNETCORE_ENVIRONMENT=Produ "ClientBenchmarkOptions": { "IP": "", "Port": 808, - "DeviceCount": 40000, + "DeviceCount": 30000, + "Interval": 1000, + "DeviceTemplate": 100000 //需要多台机器同时访问,那么可以根据这个避开重复终端号 100000-200000-300000 + } +``` + +服务器参数配置appsettings.json + +``` 1 + "JT808Configuration": { + "TcpPort": 808, + "UdpPort": 808, + "MiniNumBufferSize": 80960, + "SoBacklog": 102400 + } +``` + +![server_proccess_30k](https://github.com/SmallChi/JT808Gateway/blob/master/doc/pipeline/server_proccess_30k.png) + +![server_network_30k](https://github.com/SmallChi/JT808Gateway/blob/master/doc/pipeline/server_network_30k.png) + +![client_30k](https://github.com/SmallChi/JT808Gateway/blob/master/doc/pipeline/client_30k.png) + +### 40K + +| 操作系统 | 配置 | 使用 | +|:-------:|:-------:|:-------:| +| centos7 | 8c16g | JT808服务端 | +| centos7 | 8c16g | JT808客户端 | + +> 计算网络增强型 sn1ne ecs.sn1ne.2xlarge 8 vCPU 16 GiB Intel Xeon E5-2682v4 / Intel Xeon(Skylake) Platinum 8163 2.5 GHz 2 Gbps 100 万 PPS + +客户端1参数配置appsettings.json + +``` 1 + "urls": "http://*:15004;", + "ClientBenchmarkOptions": { + "IP": "", + "Port": 808, + "DeviceCount": 20000, "Interval": 1000, "DeviceTemplate": 100000 //需要多台机器同时访问,那么可以根据这个避开重复终端号 100000-200000-300000 } + 修改wwwroot下index.html的webapi接口地址 + 127.0.0.1:15004/index.html +``` + +``` 2 + "urls": "http://*:15005;", + "ClientBenchmarkOptions": { + "IP": "", + "Port": 808, + "DeviceCount": 20000, + "Interval": 1000, + "DeviceTemplate": 200000 //需要多台机器同时访问,那么可以根据这个避开重复终端号 100000-200000-300000 + } + 修改wwwroot下index.html的webapi接口地址 + 127.0.0.1:15005/index.html +``` + +> 一个进程的线程是有限的,所以分两个进程进行测试 + +服务器参数配置appsettings.json + +``` 1 + "JT808Configuration": { + "TcpPort": 808, + "UdpPort": 808, + "MiniNumBufferSize": 80960, + "SoBacklog": 102400 + } ``` ![server_proccess_40k](https://github.com/SmallChi/JT808Gateway/blob/master/doc/pipeline/server_proccess_40k.png) ![server_network_40k](https://github.com/SmallChi/JT808Gateway/blob/master/doc/pipeline/server_network_40k.png) -![client_40k](https://github.com/SmallChi/JT808Gateway/blob/master/doc/pipeline/client_40k.png) +> 由于资源被占满了,所以客户端的界面访问不到,但是不影响总体。 ### 60K | 操作系统 | 配置 | 使用 | |:-------:|:-------:|:-------:| | centos7 | 8c16g | JT808服务端 | -| centos7 | 8c16g | JT808客户端 | +| centos7 | 8c16g | JT808客户端1 | +| centos7 | 8c16g | JT808客户端2 | > 计算网络增强型 sn1ne ecs.sn1ne.2xlarge 8 vCPU 16 GiB Intel Xeon E5-2682v4 / Intel Xeon(Skylake) Platinum 8163 2.5 GHz 2 Gbps 100 万 PPS -客户端参数配置appsettings.json +客户端1参数配置appsettings.json ``` 1 "ClientBenchmarkOptions": { "IP": "", "Port": 808, - "DeviceCount": 60000, + "DeviceCount": 30000, "Interval": 1000, "DeviceTemplate": 100000 //需要多台机器同时访问,那么可以根据这个避开重复终端号 100000-200000-300000 } ``` +客户端2参数配置appsettings.json + +``` 2 + "ClientBenchmarkOptions": { + "IP": "", + "Port": 808, + "DeviceCount": 30000, + "Interval": 1000, + "DeviceTemplate": 200000 //需要多台机器同时访问,那么可以根据这个避开重复终端号 100000-200000-300000 + } +``` + +服务器参数配置appsettings.json + +``` 1 + "JT808Configuration": { + "TcpPort": 808, + "UdpPort": 808, + "MiniNumBufferSize": 102400, + "SoBacklog": 102400 + } +``` + ![server_proccess_60k](https://github.com/SmallChi/JT808Gateway/blob/master/doc/pipeline/server_proccess_60k.png) ![server_network_60k](https://github.com/SmallChi/JT808Gateway/blob/master/doc/pipeline/server_network_60k.png) @@ -151,10 +249,13 @@ pm2 start "dotnet JT808.Gateway.CleintBenchmark.dll ASPNETCORE_ENVIRONMENT=Produ | 操作系统 | 配置 | 使用 | |:-------:|:-------:|:-------:| -| centos7 | 8c16g | JT808服务端 | +| centos7 | 16c24g | JT808服务端 | +| centos7 | 8c16g | JT808客户端 | +| centos7 | 8c16g | JT808客户端 | | centos7 | 8c16g | JT808客户端 | | centos7 | 8c16g | JT808客户端 | +> 计算网络增强型 sn1ne ecs.sn1ne.3xlarge 12 vCPU 24 GiB Intel Xeon E5-2682v4 / Intel Xeon(Skylake) Platinum 8163 2.5 GHz 2.5 Gbps 130 万 PPS > 计算网络增强型 sn1ne ecs.sn1ne.2xlarge 8 vCPU 16 GiB Intel Xeon E5-2682v4 / Intel Xeon(Skylake) Platinum 8163 2.5 GHz 2 Gbps 100 万 PPS 客户端1的参数配置appsettings.json @@ -163,8 +264,8 @@ pm2 start "dotnet JT808.Gateway.CleintBenchmark.dll ASPNETCORE_ENVIRONMENT=Produ "ClientBenchmarkOptions": { "IP": "", "Port": 808, - "DeviceCount": 40000, - "Interval": 3000, + "DeviceCount": 20000, + "Interval": 1000, "DeviceTemplate": 100000 //需要多台机器同时访问,那么可以根据这个避开重复终端号 100000-200000-300000 } ``` @@ -175,14 +276,196 @@ pm2 start "dotnet JT808.Gateway.CleintBenchmark.dll ASPNETCORE_ENVIRONMENT=Produ "ClientBenchmarkOptions": { "IP": "", "Port": 808, - "DeviceCount": 40000, - "Interval": 3000, + "DeviceCount": 20000, + "Interval": 1000, "DeviceTemplate": 200000 //需要多台机器同时访问,那么可以根据这个避开重复终端号 100000-200000-300000 } ``` +客户端3的参数配置appsettings.json + +``` 3 + "ClientBenchmarkOptions": { + "IP": "", + "Port": 808, + "DeviceCount": 20000, + "Interval": 1000, + "DeviceTemplate": 300000 //需要多台机器同时访问,那么可以根据这个避开重复终端号 100000-200000-300000 + } +``` + +客户端3的参数配置appsettings.json + +``` 4 + "ClientBenchmarkOptions": { + "IP": "", + "Port": 808, + "DeviceCount": 20000, + "Interval": 1000, + "DeviceTemplate": 400000 //需要多台机器同时访问,那么可以根据这个避开重复终端号 100000-200000-300000 + } +``` + +服务器参数配置appsettings.json + +``` 1 + "JT808Configuration": { + "TcpPort": 808, + "UdpPort": 808, + "MiniNumBufferSize": 204800, + "SoBacklog": 204800 + } +``` + ![server_proccess_80k](https://github.com/SmallChi/JT808Gateway/blob/master/doc/pipeline/server_proccess_80k.png) ![server_network_80k](https://github.com/SmallChi/JT808Gateway/blob/master/doc/pipeline/server_network_80k.png) -![client_80k](https://github.com/SmallChi/JT808Gateway/blob/master/doc/pipeline/client_80k.png) \ No newline at end of file +### 100K + +| 操作系统 | 配置 | 使用 | +|:-------:|:-------:|:-------:| +| centos7 | 16c24g | JT808服务端 | +| centos7 | 8c16g | JT808客户端 | +| centos7 | 8c16g | JT808客户端 | +| centos7 | 8c16g | JT808客户端 | +| centos7 | 8c16g | JT808客户端 | + +> 计算网络增强型 sn1ne ecs.sn1ne.3xlarge 12 vCPU 24 GiB Intel Xeon E5-2682v4 / Intel Xeon(Skylake) Platinum 8163 2.5 GHz 2.5 Gbps 130 万 PPS +> 计算网络增强型 sn1ne ecs.sn1ne.2xlarge 8 vCPU 16 GiB Intel Xeon E5-2682v4 / Intel Xeon(Skylake) Platinum 8163 2.5 GHz 2 Gbps 100 万 PPS + +客户端1的参数配置appsettings.json + +``` 1 + "ClientBenchmarkOptions": { + "IP": "", + "Port": 808, + "DeviceCount": 25000, + "Interval": 1000, + "DeviceTemplate": 100000 //需要多台机器同时访问,那么可以根据这个避开重复终端号 100000-200000-300000 + } +``` + +客户端2的参数配置appsettings.json + +``` 2 + "ClientBenchmarkOptions": { + "IP": "", + "Port": 808, + "DeviceCount": 25000, + "Interval": 1000, + "DeviceTemplate": 200000 //需要多台机器同时访问,那么可以根据这个避开重复终端号 100000-200000-300000 + } +``` + +客户端3的参数配置appsettings.json + +``` 3 + "ClientBenchmarkOptions": { + "IP": "", + "Port": 808, + "DeviceCount": 25000, + "Interval": 1000, + "DeviceTemplate": 300000 //需要多台机器同时访问,那么可以根据这个避开重复终端号 100000-200000-300000 + } +``` + +客户端3的参数配置appsettings.json + +``` 4 + "ClientBenchmarkOptions": { + "IP": "", + "Port": 808, + "DeviceCount": 25000, + "Interval": 1000, + "DeviceTemplate": 400000 //需要多台机器同时访问,那么可以根据这个避开重复终端号 100000-200000-300000 + } +``` + +服务器参数配置appsettings.json + +``` 1 + "JT808Configuration": { + "TcpPort": 808, + "UdpPort": 808, + "MiniNumBufferSize": 204800, + "SoBacklog": 204800 + } +``` + +![server_proccess_100k](https://github.com/SmallChi/JT808Gateway/blob/master/doc/pipeline/server_proccess_100k.png) + +![server_network_100k](https://github.com/SmallChi/JT808Gateway/blob/master/doc/pipeline/server_network_100k.png) + +### 参考Centos7内核参数调优 + +``` 99 +vi /etc/security/limits.conf + +修改如下: +root soft nofile 204800 +root hard nofile 204800 +* soft nofile 204800 +* hard nofile 204800 + +vi /etc/pam.d/login +添加如下: +session required pam_limits.so + +vi /etc/sysctl.conf +添加如下: +vm.swappiness = 0 +net.ipv4.neigh.default.gc_stale_time = 120 +# see details in https://help.aliyun.com/knowledge_detail/39428.html +net.ipv4.conf.all.rp_filter = 0 +net.ipv4.conf.default.rp_filter = 0 +net.ipv4.conf.default.arp_announce = 2 +net.ipv4.conf.lo.arp_announce = 2 +net.ipv4.conf.all.arp_announce = 2 +# see details in https://help.aliyun.com/knowledge_detail/41334.html +net.ipv4.tcp_synack_retries = 2 +net.ipv4.tcp_syn_retries = 1 +net.ipv4.tcp_synack_retries = 1 +net.ipv4.tcp_keepalive_time = 600 +net.ipv4.tcp_keepalive_probes = 3 +net.ipv4.tcp_keepalive_intvl =15 +net.ipv4.tcp_retries2 = 5 +net.ipv4.tcp_fin_timeout = 1 +net.ipv4.tcp_max_tw_buckets = 65535 +net.ipv4.tcp_tw_recycle = 1 +net.ipv4.tcp_tw_reuse = 1 +net.ipv4.tcp_max_orphans = 32768 +net.ipv4.tcp_syncookies = 1 +net.ipv4.tcp_max_syn_backlog = 65535 +net.ipv4.tcp_wmem = 8192 131072 16777216 +net.ipv4.tcp_rmem = 32768 131072 16777216 +net.ipv4.tcp_mem = 94500000 915000000 927000000 +net.ipv4.ip_local_port_range = 1024 65000 +net.core.somaxconn = 65535 +net.core.netdev_max_backlog = 65535 +fs.file-max = 265535 +net.ipv6.conf.lo.disable_ipv6 = 1 +kernel.sysrq = 1 +net.ipv6.conf.all.disable_ipv6 = 1 +net.ipv6.conf.default.disable_ipv6 = 1 + +最后重启机器下: +reboot +``` + +## 基于DotNetty + +[感谢泥水佬提供的压力测试工具](https://www.cnblogs.com/smark/p/4496660.html?utm_source=tuicool) + +| 操作系统 | 配置 | 使用 | +|:-------:|:-------:|:-------:| +| win server 2016 | 4c8g | 压力测试客户端 | +| centos7 | 4c8g | JT808服务端 | + +![performance_1000](https://github.com/SmallChi/JT808Gateway/blob/master/doc/dotnetty/performance_1000.png) + +![performance_2000](https://github.com/SmallChi/JT808Gateway/blob/master/doc/dotnetty/performance_2000.png) + +![performance_5000](https://github.com/SmallChi/JT808Gateway/blob/master/doc/dotnetty/performance_5000.png) + +![performance_10000](https://github.com/SmallChi/JT808Gateway/blob/master/doc/dotnetty/performance_10000.png) diff --git a/doc/pipeline/client_10k.png b/doc/pipeline/client_10k.png index 70d54af..12ce724 100644 Binary files a/doc/pipeline/client_10k.png and b/doc/pipeline/client_10k.png differ diff --git a/doc/pipeline/client_20k.png b/doc/pipeline/client_20k.png index 1d132ce..cb71146 100644 Binary files a/doc/pipeline/client_20k.png and b/doc/pipeline/client_20k.png differ diff --git a/doc/pipeline/client_30k.png b/doc/pipeline/client_30k.png new file mode 100644 index 0000000..1fa72eb Binary files /dev/null and b/doc/pipeline/client_30k.png differ diff --git a/doc/pipeline/client_40k.png b/doc/pipeline/client_40k.png deleted file mode 100644 index 8fa106f..0000000 Binary files a/doc/pipeline/client_40k.png and /dev/null differ diff --git a/doc/pipeline/client_60k.png b/doc/pipeline/client_60k.png index 8ef3029..42e7006 100644 Binary files a/doc/pipeline/client_60k.png and b/doc/pipeline/client_60k.png differ diff --git a/doc/pipeline/client_80k.png b/doc/pipeline/client_80k.png deleted file mode 100644 index da88e90..0000000 Binary files a/doc/pipeline/client_80k.png and /dev/null differ diff --git a/doc/pipeline/server_network_100k.png b/doc/pipeline/server_network_100k.png new file mode 100644 index 0000000..85e1ce0 Binary files /dev/null and b/doc/pipeline/server_network_100k.png differ diff --git a/doc/pipeline/server_network_10k.png b/doc/pipeline/server_network_10k.png index 1e34e3f..bbb7a4b 100644 Binary files a/doc/pipeline/server_network_10k.png and b/doc/pipeline/server_network_10k.png differ diff --git a/doc/pipeline/server_network_20k.png b/doc/pipeline/server_network_20k.png index 26ffc34..4a4fd1b 100644 Binary files a/doc/pipeline/server_network_20k.png and b/doc/pipeline/server_network_20k.png differ diff --git a/doc/pipeline/server_network_30k.png b/doc/pipeline/server_network_30k.png new file mode 100644 index 0000000..35d2d8e Binary files /dev/null and b/doc/pipeline/server_network_30k.png differ diff --git a/doc/pipeline/server_network_40k.png b/doc/pipeline/server_network_40k.png index 85e94e5..316930a 100644 Binary files a/doc/pipeline/server_network_40k.png and b/doc/pipeline/server_network_40k.png differ diff --git a/doc/pipeline/server_network_60k.png b/doc/pipeline/server_network_60k.png index 17089e9..0749740 100644 Binary files a/doc/pipeline/server_network_60k.png and b/doc/pipeline/server_network_60k.png differ diff --git a/doc/pipeline/server_network_80k.png b/doc/pipeline/server_network_80k.png index 3e45653..4836250 100644 Binary files a/doc/pipeline/server_network_80k.png and b/doc/pipeline/server_network_80k.png differ diff --git a/doc/pipeline/server_proccess_100k.png b/doc/pipeline/server_proccess_100k.png new file mode 100644 index 0000000..7419e22 Binary files /dev/null and b/doc/pipeline/server_proccess_100k.png differ diff --git a/doc/pipeline/server_proccess_10k.png b/doc/pipeline/server_proccess_10k.png index c9e7949..25f3fa8 100644 Binary files a/doc/pipeline/server_proccess_10k.png and b/doc/pipeline/server_proccess_10k.png differ diff --git a/doc/pipeline/server_proccess_20k.png b/doc/pipeline/server_proccess_20k.png index 0d2a1bd..3c8ee22 100644 Binary files a/doc/pipeline/server_proccess_20k.png and b/doc/pipeline/server_proccess_20k.png differ diff --git a/doc/pipeline/server_proccess_30k.png b/doc/pipeline/server_proccess_30k.png new file mode 100644 index 0000000..fe27955 Binary files /dev/null and b/doc/pipeline/server_proccess_30k.png differ diff --git a/doc/pipeline/server_proccess_40k.png b/doc/pipeline/server_proccess_40k.png index 9fa1040..8474d7e 100644 Binary files a/doc/pipeline/server_proccess_40k.png and b/doc/pipeline/server_proccess_40k.png differ diff --git a/doc/pipeline/server_proccess_60k.png b/doc/pipeline/server_proccess_60k.png index 7da40a8..39bdaf3 100644 Binary files a/doc/pipeline/server_proccess_60k.png and b/doc/pipeline/server_proccess_60k.png differ diff --git a/doc/pipeline/server_proccess_80k.png b/doc/pipeline/server_proccess_80k.png index a56bd5e..e8a8ba4 100644 Binary files a/doc/pipeline/server_proccess_80k.png and b/doc/pipeline/server_proccess_80k.png differ diff --git a/publish.gateway.bat b/publish.gateway.bat index fb3f822..789f888 100644 --- a/publish.gateway.bat +++ b/publish.gateway.bat @@ -1,14 +1,15 @@ -dotnet pack .\src\JT808.Gateway\JT808.Gateway.csproj --no-build --output ../../nupkgs -dotnet pack .\src\JT808.Gateway.Kafka\JT808.Gateway.Kafka.csproj --no-build --output ../../nupkgs -dotnet pack .\src\JT808.Gateway.InMemoryMQ\JT808.Gateway.InMemoryMQ.csproj --no-build --output ../../nupkgs -dotnet pack .\src\JT808.Gateway.Abstractions\JT808.Gateway.Abstractions.csproj --no-build --output ../../nupkgs +dotnet pack .\src\JT808.Gateway\JT808.Gateway.csproj -c Release --output nupkgs +dotnet pack .\src\JT808.Gateway.Kafka\JT808.Gateway.Kafka.csproj -c Release --output nupkgs +dotnet pack .\src\JT808.Gateway.InMemoryMQ\JT808.Gateway.InMemoryMQ.csproj -c Release --output nupkgs +dotnet pack .\src\JT808.Gateway.Abstractions\JT808.Gateway.Abstractions.csproj -c Release --output nupkgs +dotnet pack .\src\JT808.Gateway.Client\JT808.Gateway.Client.csproj -c Release --output nupkgs echo 'push service pacakge...' -dotnet pack .\src\JT808.Gateway.Services\JT808.Gateway.MsgIdHandler\JT808.Gateway.MsgIdHandler.csproj --no-build --output ../../nupkgs -dotnet pack .\src\JT808.Gateway.Services\JT808.Gateway.MsgLogging\JT808.Gateway.MsgLogging.csproj --no-build --output ../../nupkgs -dotnet pack .\src\JT808.Gateway.Services\JT808.Gateway.ReplyMessage\JT808.Gateway.ReplyMessage.csproj --no-build --output ../../nupkgs -dotnet pack .\src\JT808.Gateway.Services\JT808.Gateway.SessionNotice\JT808.Gateway.SessionNotice.csproj --no-build --output ../../nupkgs -dotnet pack .\src\JT808.Gateway.Services\JT808.Gateway.Traffic\JT808.Gateway.Traffic.csproj --no-build --output ../../nupkgs -dotnet pack .\src\JT808.Gateway.Services\JT808.Gateway.Transmit\JT808.Gateway.Transmit.csproj --no-build --output ../../nupkgs +dotnet pack .\src\JT808.Gateway.Services\JT808.Gateway.MsgIdHandler\JT808.Gateway.MsgIdHandler.csproj -c Release --output nupkgs +dotnet pack .\src\JT808.Gateway.Services\JT808.Gateway.MsgLogging\JT808.Gateway.MsgLogging.csproj -c Release --output nupkgs +dotnet pack .\src\JT808.Gateway.Services\JT808.Gateway.ReplyMessage\JT808.Gateway.ReplyMessage.csproj -c Release --output nupkgs +dotnet pack .\src\JT808.Gateway.Services\JT808.Gateway.SessionNotice\JT808.Gateway.SessionNotice.csproj -c Release --output nupkgs +dotnet pack .\src\JT808.Gateway.Services\JT808.Gateway.Traffic\JT808.Gateway.Traffic.csproj -c Release --output nupkgs +dotnet pack .\src\JT808.Gateway.Services\JT808.Gateway.Transmit\JT808.Gateway.Transmit.csproj -c Release --output nupkgs pause \ No newline at end of file diff --git a/src/JT808.Gateway.CleintBenchmark/JT808.Gateway.CleintBenchmark.csproj b/src/JT808.Gateway.CleintBenchmark/JT808.Gateway.CleintBenchmark.csproj index 91f11a0..3045711 100644 --- a/src/JT808.Gateway.CleintBenchmark/JT808.Gateway.CleintBenchmark.csproj +++ b/src/JT808.Gateway.CleintBenchmark/JT808.Gateway.CleintBenchmark.csproj @@ -22,7 +22,9 @@ - + + + Always diff --git a/src/JT808.Gateway.CleintBenchmark/Program.cs b/src/JT808.Gateway.CleintBenchmark/Program.cs index b5d2b86..9bfc9e4 100644 --- a/src/JT808.Gateway.CleintBenchmark/Program.cs +++ b/src/JT808.Gateway.CleintBenchmark/Program.cs @@ -68,7 +68,7 @@ namespace JT808.Gateway.CleintBenchmark services.AddSingleton(); services.AddSingleton(typeof(ILogger<>), typeof(Logger<>)); services.AddJT808Configure() - .AddJT808Client(); + .AddClient(); services.AddHostedService(); }); await serverHostBuilder.RunConsoleAsync(); diff --git a/src/JT808.Gateway.CleintBenchmark/Services/CleintBenchmarkHostedService.cs b/src/JT808.Gateway.CleintBenchmark/Services/CleintBenchmarkHostedService.cs index ae064dd..94c64b1 100644 --- a/src/JT808.Gateway.CleintBenchmark/Services/CleintBenchmarkHostedService.cs +++ b/src/JT808.Gateway.CleintBenchmark/Services/CleintBenchmarkHostedService.cs @@ -25,6 +25,8 @@ namespace JT808.Gateway.CleintBenchmark.Services private readonly IJT808TcpClientFactory jT808TcpClientFactory; + private TaskFactory taskFactory; + public CleintBenchmarkHostedService( ILoggerFactory loggerFactory, IJT808TcpClientFactory jT808TcpClientFactory, @@ -33,6 +35,7 @@ namespace JT808.Gateway.CleintBenchmark.Services this.jT808TcpClientFactory = jT808TcpClientFactory; clientBenchmarkOptions = clientBenchmarkOptionsAccessor.Value; logger = loggerFactory.CreateLogger("CleintBenchmarkHostedService"); + } public Task StartAsync(CancellationToken cancellationToken) { @@ -41,24 +44,21 @@ namespace JT808.Gateway.CleintBenchmark.Services ThreadPool.GetMaxThreads(out var maxWorkerThreads, out var maxCompletionPortThreads); logger.LogInformation($"GetMinThreads:{minWorkerThreads}-{minCompletionPortThreads}"); logger.LogInformation($"GetMaxThreads:{maxWorkerThreads}-{maxCompletionPortThreads}"); + taskFactory = new TaskFactory(cancellationToken); for (int i=0;i< clientBenchmarkOptions.DeviceCount; i++) { - string deviceNo = (i + 1 + clientBenchmarkOptions.DeviceTemplate).ToString(); - var client = jT808TcpClientFactory.Create(new JT808DeviceConfig(deviceNo, - clientBenchmarkOptions.IP, - clientBenchmarkOptions.Port), cancellationToken); - } - ThreadPool.QueueUserWorkItem((state) => - { - while (!cancellationToken.IsCancellationRequested) - { - foreach (var item in jT808TcpClientFactory.GetAll()) + taskFactory.StartNew(async (state) => { + string deviceNo = ((int)state + 1 + clientBenchmarkOptions.DeviceTemplate).ToString(); + var client = await jT808TcpClientFactory.Create(new JT808DeviceConfig(deviceNo, + clientBenchmarkOptions.IP, + clientBenchmarkOptions.Port), cancellationToken); + while (!cancellationToken.IsCancellationRequested) { try { int lat = new Random(1000).Next(100000, 180000); int Lng = new Random(1000).Next(100000, 180000); - item.Send(JT808MsgId.位置信息汇报.Create(item.DeviceConfig.TerminalPhoneNo, new JT808_0x0200() + await client.SendAsync(JT808MsgId.位置信息汇报.Create(client.DeviceConfig.TerminalPhoneNo, new JT808_0x0200() { Lat = lat, Lng = Lng, @@ -74,10 +74,10 @@ namespace JT808.Gateway.CleintBenchmark.Services { logger.LogError(ex.Message); } + await Task.Delay(clientBenchmarkOptions.Interval); } - Thread.Sleep(clientBenchmarkOptions.Interval); - } - }); + }, i); + } return Task.CompletedTask; } diff --git a/src/JT808.Gateway.Client/JT808.Gateway.Client.csproj b/src/JT808.Gateway.Client/JT808.Gateway.Client.csproj index f1ea1f2..af742a9 100644 --- a/src/JT808.Gateway.Client/JT808.Gateway.Client.csproj +++ b/src/JT808.Gateway.Client/JT808.Gateway.Client.csproj @@ -30,5 +30,7 @@ - + + + diff --git a/src/JT808.Gateway.Client/JT808ClientExtensions.cs b/src/JT808.Gateway.Client/JT808ClientExtensions.cs index 7c249c8..c673447 100644 --- a/src/JT808.Gateway.Client/JT808ClientExtensions.cs +++ b/src/JT808.Gateway.Client/JT808ClientExtensions.cs @@ -10,7 +10,7 @@ namespace JT808.Gateway.Client { public static class JT808ClientExtensions { - public static IJT808Builder AddJT808Client(this IJT808Builder jT808Builder) + public static IJT808Builder AddClient(this IJT808Builder jT808Builder) { jT808Builder.Services.AddSingleton(); jT808Builder.Services.AddSingleton(); @@ -19,7 +19,7 @@ namespace JT808.Gateway.Client jT808Builder.Services.AddHostedService(); return jT808Builder; } - public static IJT808Builder AddJT808Client(this IJT808Builder jT808Builder, IConfiguration Configuration) + public static IJT808Builder AddClient(this IJT808Builder jT808Builder, IConfiguration Configuration) { jT808Builder.Services.AddSingleton(); jT808Builder.Services.AddSingleton(); @@ -29,7 +29,7 @@ namespace JT808.Gateway.Client return jT808Builder; } - public static IJT808Builder AddJT808Client(this IJT808Builder jT808Builder, Action reportOptions) + public static IJT808Builder AddClient(this IJT808Builder jT808Builder, Action reportOptions) { jT808Builder.Services.AddSingleton(); jT808Builder.Services.AddSingleton(); diff --git a/src/JT808.Gateway.Client/JT808TcpClient.cs b/src/JT808.Gateway.Client/JT808TcpClient.cs index 10de68d..82ea98e 100644 --- a/src/JT808.Gateway.Client/JT808TcpClient.cs +++ b/src/JT808.Gateway.Client/JT808TcpClient.cs @@ -176,7 +176,7 @@ namespace JT808.Gateway.Client consumed = buffer.GetPosition(totalConsumed); } } - public void Send(JT808ClientRequest message) + public async ValueTask SendAsync(JT808ClientRequest message) { if (disposed) return; if (IsOpen && socketState) @@ -185,8 +185,9 @@ namespace JT808.Gateway.Client { try { - var sendData = JT808Serializer.SerializeReadOnlySpan(message.Package, minBufferSize: message.MinBufferSize); - clientSocket.Send(sendData); + var sendData = JT808Serializer.Serialize(message.Package, minBufferSize: message.MinBufferSize); + //clientSocket.Send(sendData); + await clientSocket.SendAsync(sendData, SocketFlags.None); SendAtomicCounterService.MsgSuccessIncrement(); } catch (System.Net.Sockets.SocketException ex) diff --git a/src/JT808.Gateway.Client/JT808TcpClientExtensions.cs b/src/JT808.Gateway.Client/JT808TcpClientExtensions.cs index c45f57b..36cf96a 100644 --- a/src/JT808.Gateway.Client/JT808TcpClientExtensions.cs +++ b/src/JT808.Gateway.Client/JT808TcpClientExtensions.cs @@ -6,16 +6,17 @@ using System.Text; using JT808.Protocol.Enums; using JT808.Protocol.Extensions; using JT808.Gateway.Client.Metadata; +using System.Threading.Tasks; namespace JT808.Gateway.Client { public static class JT808TcpClientExtensions { - public static void Send(this JT808TcpClient client, JT808Package package, int minBufferSize = 4096) + public static async ValueTask SendAsync(this JT808TcpClient client, JT808Package package, int minBufferSize = 4096) { package.Header.TerminalPhoneNo = client.DeviceConfig.TerminalPhoneNo; JT808ClientRequest request = new JT808ClientRequest(package, minBufferSize); - client.Send(request); + await client.SendAsync(request); } } diff --git a/src/JT808.Gateway.InMemoryMQ/JT808ServerInMemoryMQExtensions.cs b/src/JT808.Gateway.InMemoryMQ/JT808ServerInMemoryMQExtensions.cs index 69a9375..402ca86 100644 --- a/src/JT808.Gateway.InMemoryMQ/JT808ServerInMemoryMQExtensions.cs +++ b/src/JT808.Gateway.InMemoryMQ/JT808ServerInMemoryMQExtensions.cs @@ -1,7 +1,6 @@ using JT808.Gateway.Abstractions; using JT808.Gateway.InMemoryMQ.Services; using Microsoft.Extensions.DependencyInjection; -using Microsoft.Extensions.DependencyInjection.Extensions; namespace JT808.Gateway.InMemoryMQ { @@ -12,14 +11,14 @@ namespace JT808.Gateway.InMemoryMQ /// /// /// - public static IJT808GatewayBuilder AddJT808ServerInMemoryMQ(this IJT808GatewayBuilder jT808GatewayBuilder) + public static IJT808GatewayBuilder AddServerInMemoryMQ(this IJT808GatewayBuilder jT808GatewayBuilder) { jT808GatewayBuilder.JT808Builder.Services.AddSingleton(); jT808GatewayBuilder.JT808Builder.Services.AddSingleton(); - jT808GatewayBuilder.JT808Builder.Services.Replace(new ServiceDescriptor(typeof(IJT808MsgProducer), typeof(JT808MsgProducer), ServiceLifetime.Singleton)); - jT808GatewayBuilder.JT808Builder.Services.Replace(new ServiceDescriptor(typeof(IJT808MsgConsumer), typeof(JT808MsgConsumer), ServiceLifetime.Singleton)); - jT808GatewayBuilder.JT808Builder.Services.Replace(new ServiceDescriptor(typeof(IJT808MsgReplyProducer), typeof(JT808MsgReplyProducer), ServiceLifetime.Singleton)); - jT808GatewayBuilder.JT808Builder.Services.Replace(new ServiceDescriptor(typeof(IJT808MsgReplyConsumer), typeof(JT808MsgReplyConsumer), ServiceLifetime.Singleton)); + jT808GatewayBuilder.JT808Builder.Services.AddSingleton(); + jT808GatewayBuilder.JT808Builder.Services.AddSingleton(); + jT808GatewayBuilder.JT808Builder.Services.AddSingleton(); + jT808GatewayBuilder.JT808Builder.Services.AddSingleton(); return jT808GatewayBuilder; } } diff --git a/src/JT808.Gateway.InMemoryMQ/Services/JT808MsgService.cs b/src/JT808.Gateway.InMemoryMQ/Services/JT808MsgService.cs index e58b34d..0ed1ee8 100644 --- a/src/JT808.Gateway.InMemoryMQ/Services/JT808MsgService.cs +++ b/src/JT808.Gateway.InMemoryMQ/Services/JT808MsgService.cs @@ -21,11 +21,6 @@ namespace JT808.Gateway.InMemoryMQ.Services await _channel.Writer.WriteAsync((terminalNo, data)); } - public bool TryRead(out (string TerminalNo, byte[] Data) item) - { - return _channel.Reader.TryRead(out item); - } - public async ValueTask<(string TerminalNo, byte[] Data)> ReadAsync(CancellationToken cancellationToken) { return await _channel.Reader.ReadAsync(cancellationToken); diff --git a/src/JT808.Gateway.InMemoryMQ/Services/JT808ReplyMsgService.cs b/src/JT808.Gateway.InMemoryMQ/Services/JT808ReplyMsgService.cs index d6ef7fa..028e2fd 100644 --- a/src/JT808.Gateway.InMemoryMQ/Services/JT808ReplyMsgService.cs +++ b/src/JT808.Gateway.InMemoryMQ/Services/JT808ReplyMsgService.cs @@ -20,12 +20,6 @@ namespace JT808.Gateway.InMemoryMQ.Services { await _channel.Writer.WriteAsync((terminalNo, data)); } - - public bool TryRead(out (string TerminalNo, byte[] Data) item) - { - return _channel.Reader.TryRead(out item); - } - public async ValueTask<(string TerminalNo, byte[] Data)> ReadAsync(CancellationToken cancellationToken) { return await _channel.Reader.ReadAsync(cancellationToken); diff --git a/src/JT808.Gateway.Kafka/JT808ClientKafkaExtensions.cs b/src/JT808.Gateway.Kafka/JT808ClientKafkaExtensions.cs index 0875b36..754c401 100644 --- a/src/JT808.Gateway.Kafka/JT808ClientKafkaExtensions.cs +++ b/src/JT808.Gateway.Kafka/JT808ClientKafkaExtensions.cs @@ -10,7 +10,7 @@ namespace JT808.Gateway.Kafka { public static class JT808ClientKafkaExtensions { - public static IJT808ClientBuilder AddJT808ClientKafka(this IJT808Builder builder) + public static IJT808ClientBuilder AddClientKafka(this IJT808Builder builder) { return new JT808ClientBuilderDefault(builder); } @@ -47,7 +47,7 @@ namespace JT808.Gateway.Kafka public static IJT808ClientBuilder AddMsgReplyConsumer(this IJT808ClientBuilder jT808ClientBuilder, IConfiguration configuration) { jT808ClientBuilder.JT808Builder.Services.Configure(configuration.GetSection("JT808MsgReplyConsumerConfig")); - jT808ClientBuilder.JT808Builder.Services.Replace(new ServiceDescriptor(typeof(IJT808MsgReplyConsumer), typeof(JT808MsgReplyConsumer), ServiceLifetime.Singleton)); + jT808ClientBuilder.JT808Builder.Services.TryAddSingleton(); return jT808ClientBuilder; } /// diff --git a/src/JT808.Gateway.Kafka/JT808ServerKafkaExtensions.cs b/src/JT808.Gateway.Kafka/JT808ServerKafkaExtensions.cs index c70a6a0..3d9d9b0 100644 --- a/src/JT808.Gateway.Kafka/JT808ServerKafkaExtensions.cs +++ b/src/JT808.Gateway.Kafka/JT808ServerKafkaExtensions.cs @@ -14,7 +14,7 @@ namespace JT808.Gateway.Kafka /// /// GetSection("JT808MsgProducerConfig") /// - public static IJT808GatewayBuilder AddJT808ServerKafkaMsgProducer(this IJT808GatewayBuilder jT808GatewayBuilder, IConfiguration configuration) + public static IJT808GatewayBuilder AddServerKafkaMsgProducer(this IJT808GatewayBuilder jT808GatewayBuilder, IConfiguration configuration) { jT808GatewayBuilder.JT808Builder.Services.Configure(configuration.GetSection("JT808MsgProducerConfig")); jT808GatewayBuilder.JT808Builder.Services.Replace(new ServiceDescriptor(typeof(IJT808MsgProducer), typeof(JT808MsgProducer), ServiceLifetime.Singleton)); @@ -26,7 +26,7 @@ namespace JT808.Gateway.Kafka /// /// GetSection("JT808MsgReplyConsumerConfig") /// - public static IJT808GatewayBuilder AddJT808ServerKafkaMsgReplyConsumer(this IJT808GatewayBuilder jT808GatewayBuilder, IConfiguration configuration) + public static IJT808GatewayBuilder AddServerKafkaMsgReplyConsumer(this IJT808GatewayBuilder jT808GatewayBuilder, IConfiguration configuration) { jT808GatewayBuilder.JT808Builder.Services.Configure(configuration.GetSection("JT808MsgReplyConsumerConfig")); jT808GatewayBuilder.JT808Builder.Services.Replace(new ServiceDescriptor(typeof(IJT808MsgReplyConsumer), typeof(JT808MsgReplyConsumer), ServiceLifetime.Singleton)); @@ -38,7 +38,7 @@ namespace JT808.Gateway.Kafka /// /// GetSection("JT808SessionProducerConfig") /// - public static IJT808GatewayBuilder AddJT808ServerKafkaSessionProducer(this IJT808GatewayBuilder jT808GatewayBuilder, IConfiguration configuration) + public static IJT808GatewayBuilder AddServerKafkaSessionProducer(this IJT808GatewayBuilder jT808GatewayBuilder, IConfiguration configuration) { jT808GatewayBuilder.JT808Builder.Services.Configure(configuration.GetSection("JT808SessionProducerConfig")); jT808GatewayBuilder.JT808Builder.Services.Replace(new ServiceDescriptor(typeof(IJT808SessionProducer), typeof(JT808SessionProducer), ServiceLifetime.Singleton)); diff --git a/src/JT808.Gateway.Services/JT808.Gateway.ReplyMessage/JT808ReplyMessageExtensions.cs b/src/JT808.Gateway.Services/JT808.Gateway.ReplyMessage/JT808ReplyMessageExtensions.cs index 2675ba8..88befbf 100644 --- a/src/JT808.Gateway.Services/JT808.Gateway.ReplyMessage/JT808ReplyMessageExtensions.cs +++ b/src/JT808.Gateway.Services/JT808.Gateway.ReplyMessage/JT808ReplyMessageExtensions.cs @@ -51,7 +51,7 @@ namespace JT808.Gateway.ReplyMessage /// /// /// - public static IJT808GatewayBuilder AddJT808InMemoryReplyMessage(this IJT808GatewayBuilder jT808GatewayBuilder) + public static IJT808GatewayBuilder AddInMemoryReplyMessage(this IJT808GatewayBuilder jT808GatewayBuilder) { jT808GatewayBuilder.JT808Builder.Services.AddSingleton(); jT808GatewayBuilder.JT808Builder.Services.AddHostedService(); diff --git a/src/JT808.Gateway.TestHosting/appsettings.json b/src/JT808.Gateway.TestHosting/appsettings.json deleted file mode 100644 index b81fc6d..0000000 --- a/src/JT808.Gateway.TestHosting/appsettings.json +++ /dev/null @@ -1,8 +0,0 @@ -{ - "JT808Configuration": { - "TcpPort": 808, - "UdpPort": 808, - "MiniNumBufferSize": 51200, - "SoBacklog": 65535 - } -} diff --git a/src/JT808.Gateway.Tests/JT808.Gateway.InMemoryMQ.Test/JT808.Gateway.InMemoryMQ.Test.csproj b/src/JT808.Gateway.Tests/JT808.Gateway.InMemoryMQ.Test/JT808.Gateway.InMemoryMQ.Test.csproj new file mode 100644 index 0000000..0c9eaf0 --- /dev/null +++ b/src/JT808.Gateway.Tests/JT808.Gateway.InMemoryMQ.Test/JT808.Gateway.InMemoryMQ.Test.csproj @@ -0,0 +1,20 @@ + + + + netcoreapp3.1 + + false + + + + + + + + + + + + + + diff --git a/src/JT808.Gateway.Tests/JT808.Gateway.InMemoryMQ.Test/Services/JT808MsgServiceTest.cs b/src/JT808.Gateway.Tests/JT808.Gateway.InMemoryMQ.Test/Services/JT808MsgServiceTest.cs new file mode 100644 index 0000000..84bd004 --- /dev/null +++ b/src/JT808.Gateway.Tests/JT808.Gateway.InMemoryMQ.Test/Services/JT808MsgServiceTest.cs @@ -0,0 +1,22 @@ +using JT808.Gateway.InMemoryMQ.Services; +using System; +using System.Collections.Concurrent; +using System.Threading; +using System.Threading.Tasks; +using Xunit; + +namespace JT808.Gateway.InMemoryMQ.Test.Services +{ + public class JT808MsgServiceTest + { + [Fact] + public void Test1() + { + JT808MsgService jT808MsgService = new JT808MsgService(); + jT808MsgService.WriteAsync("132", new byte[] { 1, 2, 3 }).GetAwaiter().GetResult(); + var result = jT808MsgService.ReadAsync(CancellationToken.None).GetAwaiter().GetResult(); + Assert.Equal("132", result.TerminalNo); + Assert.Equal(new byte[] { 1, 2, 3 }, result.Data); + } + } +} diff --git a/src/JT808.Gateway.Test/JT808.Gateway.Test.csproj b/src/JT808.Gateway.Tests/JT808.Gateway.Test/JT808.Gateway.Test.csproj similarity index 92% rename from src/JT808.Gateway.Test/JT808.Gateway.Test.csproj rename to src/JT808.Gateway.Tests/JT808.Gateway.Test/JT808.Gateway.Test.csproj index e2363f8..63742bf 100644 --- a/src/JT808.Gateway.Test/JT808.Gateway.Test.csproj +++ b/src/JT808.Gateway.Tests/JT808.Gateway.Test/JT808.Gateway.Test.csproj @@ -22,7 +22,7 @@ - + diff --git a/src/JT808.Gateway.Test/PipeTest.cs b/src/JT808.Gateway.Tests/JT808.Gateway.Test/PipeTest.cs similarity index 100% rename from src/JT808.Gateway.Test/PipeTest.cs rename to src/JT808.Gateway.Tests/JT808.Gateway.Test/PipeTest.cs diff --git a/src/JT808.Gateway.Test/Session/JT808SessionManagerTest.cs b/src/JT808.Gateway.Tests/JT808.Gateway.Test/Session/JT808SessionManagerTest.cs similarity index 96% rename from src/JT808.Gateway.Test/Session/JT808SessionManagerTest.cs rename to src/JT808.Gateway.Tests/JT808.Gateway.Test/Session/JT808SessionManagerTest.cs index eccf00e..fc0c638 100644 --- a/src/JT808.Gateway.Test/Session/JT808SessionManagerTest.cs +++ b/src/JT808.Gateway.Tests/JT808.Gateway.Test/Session/JT808SessionManagerTest.cs @@ -158,7 +158,7 @@ namespace JT808.Gateway.Test.Session var session = new JT808TcpSession(new Socket(SocketType.Stream, ProtocolType.Tcp)); var result1 = jT808SessionManager.TryAdd(session); jT808SessionManager.TryLink(tno, session); - jT808SessionManager.TrySendByTerminalPhoneNo(tno, new byte[] { 0x7e, 0, 0, 0x7e }); + jT808SessionManager.TrySendByTerminalPhoneNoAsync(tno, new byte[] { 0x7e, 0, 0, 0x7e }).GetAwaiter().GetResult(); }); } @@ -180,8 +180,8 @@ namespace JT808.Gateway.Test.Session Assert.True(jT808SessionManager.TerminalPhoneNoSessions.ContainsKey(tno1)); Assert.True(jT808SessionManager.TerminalPhoneNoSessions.ContainsKey(tno2)); var sessions = jT808SessionManager.GetTcpAll(); - Assert.Equal(session1.SessionID, sessions[0].SessionID); - Assert.Equal(session2.SessionID, sessions[1].SessionID); + Assert.Contains(sessions, (item) => item.SessionID == session1.SessionID); + Assert.Contains(sessions, (item) => item.SessionID == session2.SessionID); } } } diff --git a/src/JT808.Gateway.TestHosting/Configs/NLog.xsd b/src/JT808.Gateway.Tests/JT808.Gateway.TestHosting/Configs/NLog.xsd similarity index 100% rename from src/JT808.Gateway.TestHosting/Configs/NLog.xsd rename to src/JT808.Gateway.Tests/JT808.Gateway.TestHosting/Configs/NLog.xsd diff --git a/src/JT808.Gateway.TestHosting/Configs/nlog.Unix.config b/src/JT808.Gateway.Tests/JT808.Gateway.TestHosting/Configs/nlog.Unix.config similarity index 100% rename from src/JT808.Gateway.TestHosting/Configs/nlog.Unix.config rename to src/JT808.Gateway.Tests/JT808.Gateway.TestHosting/Configs/nlog.Unix.config diff --git a/src/JT808.Gateway.TestHosting/Configs/nlog.Win32NT.config b/src/JT808.Gateway.Tests/JT808.Gateway.TestHosting/Configs/nlog.Win32NT.config similarity index 100% rename from src/JT808.Gateway.TestHosting/Configs/nlog.Win32NT.config rename to src/JT808.Gateway.Tests/JT808.Gateway.TestHosting/Configs/nlog.Win32NT.config diff --git a/src/JT808.Gateway.TestHosting/Dockerfile b/src/JT808.Gateway.Tests/JT808.Gateway.TestHosting/Dockerfile similarity index 100% rename from src/JT808.Gateway.TestHosting/Dockerfile rename to src/JT808.Gateway.Tests/JT808.Gateway.TestHosting/Dockerfile diff --git a/src/JT808.Gateway.TestHosting/JT808.Gateway.TestHosting.csproj b/src/JT808.Gateway.Tests/JT808.Gateway.TestHosting/JT808.Gateway.TestHosting.csproj similarity index 64% rename from src/JT808.Gateway.TestHosting/JT808.Gateway.TestHosting.csproj rename to src/JT808.Gateway.Tests/JT808.Gateway.TestHosting/JT808.Gateway.TestHosting.csproj index 2c5c190..346ce12 100644 --- a/src/JT808.Gateway.TestHosting/JT808.Gateway.TestHosting.csproj +++ b/src/JT808.Gateway.Tests/JT808.Gateway.TestHosting/JT808.Gateway.TestHosting.csproj @@ -4,6 +4,8 @@ Exe netcoreapp3.1 Linux + @@ -14,11 +16,11 @@ - - - - - + + + + + diff --git a/src/JT808.Gateway.TestHosting/Jobs/CallGrpcClientJob.cs b/src/JT808.Gateway.Tests/JT808.Gateway.TestHosting/Jobs/CallGrpcClientJob.cs similarity index 100% rename from src/JT808.Gateway.TestHosting/Jobs/CallGrpcClientJob.cs rename to src/JT808.Gateway.Tests/JT808.Gateway.TestHosting/Jobs/CallGrpcClientJob.cs diff --git a/src/JT808.Gateway.TestHosting/Jobs/UpJob.cs b/src/JT808.Gateway.Tests/JT808.Gateway.TestHosting/Jobs/UpJob.cs similarity index 94% rename from src/JT808.Gateway.TestHosting/Jobs/UpJob.cs rename to src/JT808.Gateway.Tests/JT808.Gateway.TestHosting/Jobs/UpJob.cs index 3b7e1c8..bb2437e 100644 --- a/src/JT808.Gateway.TestHosting/Jobs/UpJob.cs +++ b/src/JT808.Gateway.Tests/JT808.Gateway.TestHosting/Jobs/UpJob.cs @@ -38,7 +38,7 @@ namespace JT808.Gateway.TestHosting.Jobs { int lat = new Random(1000).Next(100000, 180000); int Lng = new Random(1000).Next(100000, 180000); - client.Send(JT808MsgId.位置信息汇报.Create(client.DeviceConfig.TerminalPhoneNo, new JT808_0x0200() + client.SendAsync(JT808MsgId.位置信息汇报.Create(client.DeviceConfig.TerminalPhoneNo, new JT808_0x0200() { Lat = lat, Lng = Lng, diff --git a/src/JT808.Gateway.TestHosting/Program.cs b/src/JT808.Gateway.Tests/JT808.Gateway.TestHosting/Program.cs similarity index 75% rename from src/JT808.Gateway.TestHosting/Program.cs rename to src/JT808.Gateway.Tests/JT808.Gateway.TestHosting/Program.cs index 885e839..e12423a 100644 --- a/src/JT808.Gateway.TestHosting/Program.cs +++ b/src/JT808.Gateway.Tests/JT808.Gateway.TestHosting/Program.cs @@ -37,23 +37,24 @@ namespace JT808.Gateway.TestHosting services.AddSingleton(); services.AddSingleton(typeof(ILogger<>), typeof(Logger<>)); services.AddJT808Configure() - //.AddJT808Client() - //.AddJT808Gateway(options => - //{ - // options.TcpPort = 808; - // options.UdpPort = 808; - //}) - .AddJT808Gateway(hostContext.Configuration) + //添加客户端工具 + //.AddClient() + //.AddGateway(options => + ////{ + //// options.TcpPort = 808; + //// options.UdpPort = 808; + ////}) + .AddGateway(hostContext.Configuration) .AddTcp() .AddUdp() .AddGrpc() //InMemoryMQ - .AddJT808ServerInMemoryMQ() - .AddJT808InMemoryReplyMessage() + .AddServerInMemoryMQ() + .AddInMemoryReplyMessage() //kafka插件 - //.AddJT808ServerKafkaMsgProducer(hostContext.Configuration) - //.AddJT808ServerKafkaMsgReplyConsumer(hostContext.Configuration) - //.AddJT808ServerKafkaSessionProducer(hostContext.Configuration) + //.AddServerKafkaMsgProducer(hostContext.Configuration) + //.AddServerKafkaMsgReplyConsumer(hostContext.Configuration) + //.AddServerKafkaSessionProducer(hostContext.Configuration) ; //grpc客户端调用 //services.AddHostedService(); diff --git a/src/JT808.Gateway.TestHosting/startup.txt b/src/JT808.Gateway.Tests/JT808.Gateway.TestHosting/startup.txt similarity index 100% rename from src/JT808.Gateway.TestHosting/startup.txt rename to src/JT808.Gateway.Tests/JT808.Gateway.TestHosting/startup.txt diff --git a/src/JT808.Gateway.sln b/src/JT808.Gateway.sln index 8ea63be..4c077e8 100644 --- a/src/JT808.Gateway.sln +++ b/src/JT808.Gateway.sln @@ -5,8 +5,6 @@ VisualStudioVersion = 16.0.29409.12 MinimumVisualStudioVersion = 10.0.40219.1 Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JT808.Gateway", "JT808.Gateway\JT808.Gateway.csproj", "{4C8A2546-8333-416D-B123-91062B630087}" EndProject -Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JT808.Gateway.TestHosting", "JT808.Gateway.TestHosting\JT808.Gateway.TestHosting.csproj", "{AE40AFE0-0950-442C-A74C-10CDF53E9F36}" -EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JT808.Gateway.Abstractions", "JT808.Gateway.Abstractions\JT808.Gateway.Abstractions.csproj", "{3AA17DF7-A1B3-449C-93C2-45B051C32933}" EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JT808.Gateway.Kafka", "JT808.Gateway.Kafka\JT808.Gateway.Kafka.csproj", "{274C048E-A8E3-4422-A578-A10A97DF36F2}" @@ -31,10 +29,14 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JT808.Gateway.Client", "JT8 EndProject Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Tests", "Tests", "{7CBAACEE-19BF-499A-8C41-36A1324D45E9}" EndProject -Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JT808.Gateway.Test", "JT808.Gateway.Test\JT808.Gateway.Test.csproj", "{1E230D1B-BDFC-4D0A-9B34-592280A723BD}" -EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JT808.Gateway.CleintBenchmark", "JT808.Gateway.CleintBenchmark\JT808.Gateway.CleintBenchmark.csproj", "{0FBC9E4F-8585-4820-ACF1-145A14B3A727}" EndProject +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JT808.Gateway.Test", "JT808.Gateway.Tests\JT808.Gateway.Test\JT808.Gateway.Test.csproj", "{E3DC260E-0B55-4993-B051-402E44D4E883}" +EndProject +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JT808.Gateway.TestHosting", "JT808.Gateway.Tests\JT808.Gateway.TestHosting\JT808.Gateway.TestHosting.csproj", "{69C815FE-3C32-473E-99C9-F3C4B3BCFF81}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "JT808.Gateway.InMemoryMQ.Test", "JT808.Gateway.Tests\JT808.Gateway.InMemoryMQ.Test\JT808.Gateway.InMemoryMQ.Test.csproj", "{E103E89D-3069-4AAE-99CE-2AD633AD351E}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -45,10 +47,6 @@ Global {4C8A2546-8333-416D-B123-91062B630087}.Debug|Any CPU.Build.0 = Debug|Any CPU {4C8A2546-8333-416D-B123-91062B630087}.Release|Any CPU.ActiveCfg = Release|Any CPU {4C8A2546-8333-416D-B123-91062B630087}.Release|Any CPU.Build.0 = Release|Any CPU - {AE40AFE0-0950-442C-A74C-10CDF53E9F36}.Debug|Any CPU.ActiveCfg = Debug|Any CPU - {AE40AFE0-0950-442C-A74C-10CDF53E9F36}.Debug|Any CPU.Build.0 = Debug|Any CPU - {AE40AFE0-0950-442C-A74C-10CDF53E9F36}.Release|Any CPU.ActiveCfg = Release|Any CPU - {AE40AFE0-0950-442C-A74C-10CDF53E9F36}.Release|Any CPU.Build.0 = Release|Any CPU {3AA17DF7-A1B3-449C-93C2-45B051C32933}.Debug|Any CPU.ActiveCfg = Debug|Any CPU {3AA17DF7-A1B3-449C-93C2-45B051C32933}.Debug|Any CPU.Build.0 = Debug|Any CPU {3AA17DF7-A1B3-449C-93C2-45B051C32933}.Release|Any CPU.ActiveCfg = Release|Any CPU @@ -89,27 +87,36 @@ Global {AC3070AC-A938-4213-A562-C079BB4A3F9E}.Debug|Any CPU.Build.0 = Debug|Any CPU {AC3070AC-A938-4213-A562-C079BB4A3F9E}.Release|Any CPU.ActiveCfg = Release|Any CPU {AC3070AC-A938-4213-A562-C079BB4A3F9E}.Release|Any CPU.Build.0 = Release|Any CPU - {1E230D1B-BDFC-4D0A-9B34-592280A723BD}.Debug|Any CPU.ActiveCfg = Debug|Any CPU - {1E230D1B-BDFC-4D0A-9B34-592280A723BD}.Debug|Any CPU.Build.0 = Debug|Any CPU - {1E230D1B-BDFC-4D0A-9B34-592280A723BD}.Release|Any CPU.ActiveCfg = Release|Any CPU - {1E230D1B-BDFC-4D0A-9B34-592280A723BD}.Release|Any CPU.Build.0 = Release|Any CPU {0FBC9E4F-8585-4820-ACF1-145A14B3A727}.Debug|Any CPU.ActiveCfg = Debug|Any CPU {0FBC9E4F-8585-4820-ACF1-145A14B3A727}.Debug|Any CPU.Build.0 = Debug|Any CPU {0FBC9E4F-8585-4820-ACF1-145A14B3A727}.Release|Any CPU.ActiveCfg = Release|Any CPU {0FBC9E4F-8585-4820-ACF1-145A14B3A727}.Release|Any CPU.Build.0 = Release|Any CPU + {E3DC260E-0B55-4993-B051-402E44D4E883}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {E3DC260E-0B55-4993-B051-402E44D4E883}.Debug|Any CPU.Build.0 = Debug|Any CPU + {E3DC260E-0B55-4993-B051-402E44D4E883}.Release|Any CPU.ActiveCfg = Release|Any CPU + {E3DC260E-0B55-4993-B051-402E44D4E883}.Release|Any CPU.Build.0 = Release|Any CPU + {69C815FE-3C32-473E-99C9-F3C4B3BCFF81}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {69C815FE-3C32-473E-99C9-F3C4B3BCFF81}.Debug|Any CPU.Build.0 = Debug|Any CPU + {69C815FE-3C32-473E-99C9-F3C4B3BCFF81}.Release|Any CPU.ActiveCfg = Release|Any CPU + {69C815FE-3C32-473E-99C9-F3C4B3BCFF81}.Release|Any CPU.Build.0 = Release|Any CPU + {E103E89D-3069-4AAE-99CE-2AD633AD351E}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {E103E89D-3069-4AAE-99CE-2AD633AD351E}.Debug|Any CPU.Build.0 = Debug|Any CPU + {E103E89D-3069-4AAE-99CE-2AD633AD351E}.Release|Any CPU.ActiveCfg = Release|Any CPU + {E103E89D-3069-4AAE-99CE-2AD633AD351E}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE EndGlobalSection GlobalSection(NestedProjects) = preSolution - {AE40AFE0-0950-442C-A74C-10CDF53E9F36} = {7CBAACEE-19BF-499A-8C41-36A1324D45E9} {D62E3054-6924-4F1A-9BEF-E52B191F16B6} = {3EF8490D-C993-49D8-8A3D-493B7F259D70} {A242A839-4F00-4434-A7E8-7E3BEBA5B75C} = {3EF8490D-C993-49D8-8A3D-493B7F259D70} {1CB84599-5F56-4461-A451-DF16E3854AB9} = {3EF8490D-C993-49D8-8A3D-493B7F259D70} {604BB5CF-9ED1-4D78-9328-59436E2B4EB4} = {3EF8490D-C993-49D8-8A3D-493B7F259D70} {598E445A-AF2E-42F0-98F4-18EC22E473FC} = {3EF8490D-C993-49D8-8A3D-493B7F259D70} {8FCC6D65-8A49-4AE7-8B19-F255100849D6} = {3EF8490D-C993-49D8-8A3D-493B7F259D70} - {1E230D1B-BDFC-4D0A-9B34-592280A723BD} = {7CBAACEE-19BF-499A-8C41-36A1324D45E9} + {E3DC260E-0B55-4993-B051-402E44D4E883} = {7CBAACEE-19BF-499A-8C41-36A1324D45E9} + {69C815FE-3C32-473E-99C9-F3C4B3BCFF81} = {7CBAACEE-19BF-499A-8C41-36A1324D45E9} + {E103E89D-3069-4AAE-99CE-2AD633AD351E} = {7CBAACEE-19BF-499A-8C41-36A1324D45E9} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {AA9303A7-6FB3-4572-88AA-3302E85330D1} diff --git a/src/JT808.Gateway/JT808GatewayExtensions.cs b/src/JT808.Gateway/JT808GatewayExtensions.cs index 884a3e7..2c0f1f5 100644 --- a/src/JT808.Gateway/JT808GatewayExtensions.cs +++ b/src/JT808.Gateway/JT808GatewayExtensions.cs @@ -16,7 +16,7 @@ namespace JT808.Gateway { public static partial class JT808GatewayExtensions { - public static IJT808GatewayBuilder AddJT808Gateway(this IJT808Builder jt808Builder,Action config) + public static IJT808GatewayBuilder AddGateway(this IJT808Builder jt808Builder,Action config) { IJT808GatewayBuilder server = new JT808GatewayBuilderDefault(jt808Builder); server.JT808Builder.Services.Configure(config); @@ -24,7 +24,7 @@ namespace JT808.Gateway return server; } - public static IJT808GatewayBuilder AddJT808Gateway(this IJT808Builder jt808Builder, IConfiguration configuration) + public static IJT808GatewayBuilder AddGateway(this IJT808Builder jt808Builder, IConfiguration configuration) { IJT808GatewayBuilder server = new JT808GatewayBuilderDefault(jt808Builder); server.JT808Builder.Services.Configure(configuration.GetSection("JT808Configuration")); diff --git a/src/JT808.Gateway/JT808TcpServer.cs b/src/JT808.Gateway/JT808TcpServer.cs index 04c056a..2b40548 100644 --- a/src/JT808.Gateway/JT808TcpServer.cs +++ b/src/JT808.Gateway/JT808TcpServer.cs @@ -21,7 +21,7 @@ using Microsoft.Extensions.Options; namespace JT808.Gateway { - public class JT808TcpServer:IHostedService + public class JT808TcpServer : IHostedService { private readonly Socket server; @@ -44,34 +44,38 @@ namespace JT808.Gateway JT808SessionManager jT808SessionManager, IJT808MsgProducer jT808MsgProducer, JT808AtomicCounterServiceFactory jT808AtomicCounterServiceFactory) - { - SessionManager = jT808SessionManager; - Logger = loggerFactory.CreateLogger("JT808TcpServer"); - Serializer = jT808Config.GetSerializer(); - MsgProducer = jT808MsgProducer; - AtomicCounterService = jT808AtomicCounterServiceFactory.Create(JT808TransportProtocolType.tcp); - Configuration = jT808ConfigurationAccessor.Value; - var IPEndPoint = new System.Net.IPEndPoint(IPAddress.Any, Configuration.TcpPort); - server = new Socket(IPEndPoint.AddressFamily, SocketType.Stream, ProtocolType.Tcp); - server.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.NoDelay, true); - server.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.KeepAlive, true); - server.LingerState = new LingerOption(false, 0); - server.Bind(IPEndPoint); - server.Listen(Configuration.SoBacklog); - } + { + + SessionManager = jT808SessionManager; + Logger = loggerFactory.CreateLogger("JT808TcpServer"); + Serializer = jT808Config.GetSerializer(); + MsgProducer = jT808MsgProducer; + AtomicCounterService = jT808AtomicCounterServiceFactory.Create(JT808TransportProtocolType.tcp); + Configuration = jT808ConfigurationAccessor.Value; + var IPEndPoint = new System.Net.IPEndPoint(IPAddress.Any, Configuration.TcpPort); + server = new Socket(IPEndPoint.AddressFamily, SocketType.Stream, ProtocolType.Tcp); + server.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.NoDelay, true); + server.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.KeepAlive, true); + server.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.ReceiveBuffer, Configuration.MiniNumBufferSize); + server.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.SendBuffer, Configuration.MiniNumBufferSize); + server.LingerState = new LingerOption(false, 0); + server.Bind(IPEndPoint); + server.Listen(Configuration.SoBacklog); + } public Task StartAsync(CancellationToken cancellationToken) { Logger.LogInformation($"JT808 TCP Server start at {IPAddress.Any}:{Configuration.TcpPort}."); - Task.Run(async() => { + Task.Factory.StartNew(async () => + { while (!cancellationToken.IsCancellationRequested) { var socket = await server.AcceptAsync(); JT808TcpSession jT808TcpSession = new JT808TcpSession(socket); + SessionManager.TryAdd(jT808TcpSession); await Task.Factory.StartNew(async (state) => { var session = (JT808TcpSession)state; - SessionManager.TryAdd(session); if (Logger.IsEnabled(LogLevel.Information)) { Logger.LogInformation($"[Connected]:{session.Client.RemoteEndPoint}"); @@ -86,7 +90,7 @@ namespace JT808.Gateway }, cancellationToken); return Task.CompletedTask; } - private async Task FillPipeAsync(JT808TcpSession session, PipeWriter writer) + private async Task FillPipeAsync(JT808TcpSession session, PipeWriter writer) { while (true) { @@ -101,7 +105,7 @@ namespace JT808.Gateway } writer.Advance(bytesRead); } - catch(OperationCanceledException ex) + catch (OperationCanceledException ex) { Logger.LogError($"[Receive Timeout]:{session.Client.RemoteEndPoint}"); break; @@ -126,7 +130,7 @@ namespace JT808.Gateway } writer.Complete(); } - private async Task ReadPipeAsync(JT808TcpSession session, PipeReader reader) + private async Task ReadPipeAsync(JT808TcpSession session, PipeReader reader) { while (true) { @@ -143,13 +147,13 @@ namespace JT808.Gateway if (result.IsCanceled) break; if (buffer.Length > 0) { - ReaderBuffer(ref buffer, session,out consumed, out examined); + ReaderBuffer(ref buffer, session, out consumed, out examined); } } #pragma warning disable CA1031 // Do not catch general exception types catch (Exception ex) { - SessionManager.RemoveBySessionId(session.SessionID); + Logger.LogError(ex, $"[ReadPipe Error]:{session.Client.RemoteEndPoint}"); break; } #pragma warning restore CA1031 // Do not catch general exception types @@ -160,7 +164,7 @@ namespace JT808.Gateway } reader.Complete(); } - private void ReaderBuffer(ref ReadOnlySequence buffer, JT808TcpSession session, out SequencePosition consumed, out SequencePosition examined) + private void ReaderBuffer(ref ReadOnlySequence buffer, JT808TcpSession session, out SequencePosition consumed, out SequencePosition examined) { consumed = buffer.Start; examined = buffer.End; @@ -177,22 +181,22 @@ namespace JT808.Gateway { if (mark == 1) { - ReadOnlySpan contentSpan=ReadOnlySpan.Empty; + ReadOnlySpan contentSpan = ReadOnlySpan.Empty; try { contentSpan = seqReader.Sequence.Slice(totalConsumed, seqReader.Consumed - totalConsumed).FirstSpan; - var package = Serializer.HeaderDeserialize(contentSpan,minBufferSize:10240); + var package = Serializer.HeaderDeserialize(contentSpan, minBufferSize: 10240); AtomicCounterService.MsgSuccessIncrement(); if (Logger.IsEnabled(LogLevel.Debug)) Logger.LogDebug($"[Atomic Success Counter]:{AtomicCounterService.MsgSuccessCount}"); if (Logger.IsEnabled(LogLevel.Trace)) Logger.LogTrace($"[Accept Hex {session.Client.RemoteEndPoint}]:{package.OriginalData.ToArray().ToHexString()}"); SessionManager.TryLink(package.Header.TerminalPhoneNo, session); - MsgProducer.ProduceAsync(package.Header.TerminalPhoneNo, package.OriginalData.ToArray()); + MsgProducer.ProduceAsync(package.Header.TerminalPhoneNo, package.OriginalData.ToArray()); } catch (JT808Exception ex) { AtomicCounterService.MsgFailIncrement(); if (Logger.IsEnabled(LogLevel.Information)) Logger.LogInformation($"[Atomic Fail Counter]:{AtomicCounterService.MsgFailCount}"); - Logger.LogError($"[HeaderDeserialize ErrorCode]:{ ex.ErrorCode},[ReaderBuffer]:{contentSpan.ToArray().ToHexString()}"); + Logger.LogError($"[HeaderDeserialize ErrorCode]:{ ex.ErrorCode},[ReaderBuffer]:{contentSpan.ToArray().ToHexString()}"); } totalConsumed += (seqReader.Consumed - totalConsumed); if (seqReader.End) break; @@ -206,7 +210,7 @@ namespace JT808.Gateway seqReader.Advance(1); } } - if (seqReader.Length== totalConsumed) + if (seqReader.Length == totalConsumed) { examined = consumed = buffer.End; } diff --git a/src/JT808.Gateway/Services/JT808GatewayService.cs b/src/JT808.Gateway/Services/JT808GatewayService.cs index cf0b7d5..32e541c 100644 --- a/src/JT808.Gateway/Services/JT808GatewayService.cs +++ b/src/JT808.Gateway/Services/JT808GatewayService.cs @@ -93,17 +93,17 @@ namespace JT808.Gateway.Services return Task.FromResult(reply); } - public override Task UnificationSend(UnificationSendRequest request, ServerCallContext context) + public override async Task UnificationSend(UnificationSendRequest request, ServerCallContext context) { Auth(context); try { - var flag = jT808SessionManager.TrySendByTerminalPhoneNo(request.TerminalPhoneNo, request.Data.ToByteArray()); - return Task.FromResult(new UnificationSendReply { Success = flag }); + var flag = await jT808SessionManager.TrySendByTerminalPhoneNoAsync(request.TerminalPhoneNo, request.Data.ToByteArray()); + return new UnificationSendReply { Success = flag }; } catch (Exception) { - return Task.FromResult(new UnificationSendReply { Success = false }); + return new UnificationSendReply { Success = false }; } } diff --git a/src/JT808.Gateway/Services/JT808MsgReplyHostedService.cs b/src/JT808.Gateway/Services/JT808MsgReplyHostedService.cs index 3d5ed4f..6af5c31 100644 --- a/src/JT808.Gateway/Services/JT808MsgReplyHostedService.cs +++ b/src/JT808.Gateway/Services/JT808MsgReplyHostedService.cs @@ -2,6 +2,9 @@ using JT808.Gateway.Configurations; using JT808.Gateway.Session; using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; +using System; +using System.Diagnostics; using System.Threading; using System.Threading.Tasks; @@ -12,6 +15,7 @@ namespace JT808.Gateway.Services private readonly JT808SessionManager JT808SessionManager; private readonly IJT808MsgReplyConsumer JT808MsgReplyConsumer; + public JT808MsgReplyHostedService( IJT808MsgReplyConsumer jT808MsgReplyConsumer, JT808SessionManager jT808SessionManager) @@ -22,9 +26,9 @@ namespace JT808.Gateway.Services public Task StartAsync(CancellationToken cancellationToken) { - JT808MsgReplyConsumer.OnMessage(item => + JT808MsgReplyConsumer.OnMessage(async(item) => { - JT808SessionManager.TrySendByTerminalPhoneNo(item.TerminalNo, item.Data); + await JT808SessionManager.TrySendByTerminalPhoneNoAsync(item.TerminalNo, item.Data); }); JT808MsgReplyConsumer.Subscribe(); return Task.CompletedTask; diff --git a/src/JT808.Gateway/Session/JT808SessionManager.cs b/src/JT808.Gateway/Session/JT808SessionManager.cs index 9acb4b9..ddb1289 100644 --- a/src/JT808.Gateway/Session/JT808SessionManager.cs +++ b/src/JT808.Gateway/Session/JT808SessionManager.cs @@ -8,6 +8,7 @@ using System.Collections.Generic; using System.Linq; using System.Net; using System.Net.Sockets; +using System.Threading.Tasks; namespace JT808.Gateway.Session { @@ -66,36 +67,40 @@ namespace JT808.Gateway.Session internal void TryLink(string terminalPhoneNo, IJT808Session session) { DateTime curretDatetime= DateTime.Now; - session.ActiveTime = curretDatetime; - session.TerminalPhoneNo = terminalPhoneNo; - Sessions.TryUpdate(session.SessionID, session, session); - TerminalPhoneNoSessions.AddOrUpdate(terminalPhoneNo, session, (key, oldValue)=> + if (TerminalPhoneNoSessions.TryGetValue(terminalPhoneNo,out IJT808Session cacheSession)) { - if(session.SessionID!= oldValue.SessionID) + if (session.SessionID != cacheSession.SessionID) { + //从转发到直连的数据需要更新缓存 + session.ActiveTime = curretDatetime; + TerminalPhoneNoSessions.TryUpdate(terminalPhoneNo, session, cacheSession); //会话通知 - JT808SessionProducer?.ProduceAsync(JT808GatewayConstants.SessionOnline, key); - return session; + JT808SessionProducer?.ProduceAsync(JT808GatewayConstants.SessionOnline, terminalPhoneNo); } else { - oldValue.StartTime = curretDatetime; + cacheSession.ActiveTime = curretDatetime; + TerminalPhoneNoSessions.TryUpdate(terminalPhoneNo, cacheSession, cacheSession); } - return oldValue; - }); + } + else + { + if(TerminalPhoneNoSessions.TryAdd(terminalPhoneNo, session)) + { + //会话通知 + JT808SessionProducer?.ProduceAsync(JT808GatewayConstants.SessionOnline, terminalPhoneNo); + } + } } public IJT808Session TryLink(string terminalPhoneNo, Socket socket, EndPoint remoteEndPoint) { if (TerminalPhoneNoSessions.TryGetValue(terminalPhoneNo, out IJT808Session currentSession)) { - if (Sessions.TryGetValue(currentSession.SessionID, out IJT808Session sessionInfo)) - { - sessionInfo.ActiveTime = DateTime.Now; - sessionInfo.TerminalPhoneNo = terminalPhoneNo; - sessionInfo.RemoteEndPoint = remoteEndPoint; - Sessions.TryUpdate(currentSession.SessionID, sessionInfo, sessionInfo); - } + currentSession.ActiveTime = DateTime.Now; + currentSession.TerminalPhoneNo = terminalPhoneNo; + currentSession.RemoteEndPoint = remoteEndPoint; + TerminalPhoneNoSessions.TryUpdate(terminalPhoneNo, currentSession, currentSession); } else { @@ -118,17 +123,17 @@ namespace JT808.Gateway.Session return Sessions.TryAdd(session.SessionID, session); } - public bool TrySendByTerminalPhoneNo(string terminalPhoneNo, byte[] data) + public async ValueTask TrySendByTerminalPhoneNoAsync(string terminalPhoneNo, byte[] data) { if(TerminalPhoneNoSessions.TryGetValue(terminalPhoneNo,out var session)) { if (session.TransportProtocolType == JT808TransportProtocolType.tcp) { - session.Client.Send(data, SocketFlags.None); + await session.Client.SendAsync(data, SocketFlags.None); } else { - session.Client.SendTo(data, SocketFlags.None, session.RemoteEndPoint); + await session.Client.SendToAsync(data, SocketFlags.None, session.RemoteEndPoint); } return true; } @@ -138,17 +143,17 @@ namespace JT808.Gateway.Session } } - public bool TrySendBySessionId(string sessionId, byte[] data) + public async ValueTask TrySendBySessionIdAsync(string sessionId, byte[] data) { if (Sessions.TryGetValue(sessionId, out var session)) { if(session.TransportProtocolType== JT808TransportProtocolType.tcp) { - session.Client.Send(data, SocketFlags.None); + await session.Client.SendAsync(data, SocketFlags.None); } else { - session.Client.SendTo(data, SocketFlags.None, session.RemoteEndPoint); + await session.Client.SendToAsync(data, SocketFlags.None, session.RemoteEndPoint); } return true; } @@ -208,12 +213,12 @@ namespace JT808.Gateway.Session public List GetTcpAll() { - return Sessions.Where(w => w.Value.TransportProtocolType == JT808TransportProtocolType.tcp).Select(s => (JT808TcpSession)s.Value).ToList(); + return TerminalPhoneNoSessions.Where(w => w.Value.TransportProtocolType == JT808TransportProtocolType.tcp).Select(s => (JT808TcpSession)s.Value).ToList(); } public List GetUdpAll() { - return Sessions.Where(w => w.Value.TransportProtocolType == JT808TransportProtocolType.udp).Select(s => (JT808UdpSession)s.Value).ToList(); + return TerminalPhoneNoSessions.Where(w => w.Value.TransportProtocolType == JT808TransportProtocolType.udp).Select(s => (JT808UdpSession)s.Value).ToList(); } } }