From be2ed1af8d576ffc19d96ae2370f0a1cd9a84f97 Mon Sep 17 00:00:00 2001 From: "SmallChi(Koike)" <564952747@qq.com> Date: Sat, 3 Sep 2022 19:44:27 +0800 Subject: [PATCH] pipeline-1.1.8-preview2 1.Add a paging session query interface to avoid querying all sessions; 2.Replace Task.Run of some jobs with Thread; --- api/README_Pipeline.md | 102 ++++++++++++++++++ simples/JT808.Gateway.SimpleServer/Program.cs | 99 +++++++---------- .../Dtos/JT808PageResult.cs | 16 +++ .../JT808.Gateway.Abstractions.csproj | 2 +- .../JT808.Gateway.Abstractions.xml | 10 ++ .../JT808GatewayConstants.cs | 12 ++- .../JT808.Gateway.CleintBenchmark.csproj | 2 +- .../Services/CleintBenchmarkHostedService.cs | 6 +- .../JT808.Gateway.ServerBenchmark.csproj | 4 +- .../JT808.Gateway.Client.csproj | 2 +- .../Services/JT808RetryClientHostedService.cs | 5 +- .../JT808.Gateway.Kafka.csproj | 2 +- src/JT808.Gateway.Kafka/JT808MsgConsumer.cs | 14 +-- .../JT808MsgReplyConsumer.cs | 14 +-- .../JT808MsgReplyLoggingConsumer.cs | 7 +- .../JT808SessionConsumer.cs | 11 +- .../JT808ReplyMessageHostedService.cs | 2 +- .../Impl/JT808MsgReplyConsumer.cs | 4 +- .../Impl/JT808SessionConsumer.cs | 4 +- .../JT808.Gateway.NormalHosting.csproj | 6 +- .../JT808.Gateway.QueueHosting.csproj | 6 +- .../JT808.Gateway.Test.csproj | 6 +- .../JT808.Gateway.WebApiClientTool.xml | 12 +++ .../JT808HttpClient.cs | 26 +++++ src/JT808.Gateway/JT808.Gateway.xml | 2 + src/JT808.Gateway/JT808WebApi.cs | 91 ++++++++++++++++ .../Session/JT808SessionManager.cs | 21 ++++ src/PipelineInfo.props | 2 +- 28 files changed, 385 insertions(+), 105 deletions(-) create mode 100644 src/JT808.Gateway.Abstractions/Dtos/JT808PageResult.cs diff --git a/api/README_Pipeline.md b/api/README_Pipeline.md index 230acd4..2414df6 100644 --- a/api/README_Pipeline.md +++ b/api/README_Pipeline.md @@ -35,6 +35,7 @@ |请求Url|请求方式|说明| |:------|:------|:------| | 127.0.0.1:828/jt808api/Tcp/Session/GetAll| GET| 基于Tcp管理会话服务-获取会话集合| +| 127.0.0.1:828/jt808api/Tcp/Session/SessionTcpByPage?pageIndex=0&pageSize10| GET| 基于Tcp管理会话服务-获取会话分页集合| | 127.0.0.1:828/jt808api/Tcp/Session/QuerySessionByTerminalPhoneNo| POST| 基于Tcp管理会话服务-通过设备终端号查询对应会话| | 127.0.0.1:828/jt808api/Tcp/Session/RemoveByTerminalPhoneNo| POST| 基于Tcp管理会话服务-通过设备终端号移除对应会话| @@ -43,6 +44,7 @@ |请求Url|请求方式|说明| |:------|:------|:------| | 127.0.0.1:828/jt808api/Udp/Session/GetAll| GET| 基于Udp管理会话服务-获取会话集合| +| 127.0.0.1:828/jt808api/Udp/Session/SessionUdpByPage?pageIndex=0&pageSize10| GET| 基于Tcp管理会话服务-获取会话分页集合| | 127.0.0.1:828/jt808api/Udp/Session/QuerySessionByTerminalPhoneNo| POST| 基于Udp管理会话服务-通过设备终端号查询对应会话| | 127.0.0.1:828/jt808api/Udp/Session/RemoveByTerminalPhoneNo| POST| 基于Udp管理会话服务-通过设备终端号移除对应会话| @@ -225,6 +227,56 @@ } ``` +#### 4.获取会话分页集合 + +请求地址:Tcp/Session/SessionTcpByPage + +请求方式:GET + +请求参数: + +|属性|数据类型|参数说明| +|:------:|:------:|:------| +| pageIndex| int| 当前页(默认0)| +| pageSize| int| 页容量(默认10)| + +返回数据: + +|属性|数据类型|参数说明| +|:------:|:------:|:------| +| Data| List\ | 实际会话信息集合 | +| PageIndex| int | 当前页(默认0) | +| PageSize| int | 页容量(默认10) | +| Total| int | 总数 | + +返回结果: + +``` session1 +{ + "message":null, + "code":200, + "data":{ + "pageIndex":0, + "pageSize":10, + "total":2, + "data":[ + { + "lastActiveTime":"2022-09-03T19:34:07.8733605+08:00", + "startTime":"2022-09-03T19:34:07.8733615+08:00", + "terminalPhoneNo":"123456789012", + "remoteAddressIP":"127.0.0.1:9826" + }, + { + "lastActiveTime":"2022-09-03T19:34:05.135997+08:00", + "startTime":"2022-09-03T19:34:05.136035+08:00", + "terminalPhoneNo":"123456789013", + "remoteAddressIP":"127.0.0.1:9825" + } + ] + } +} +``` + ### 基于Udp管理会话服务 #### 统一会话信息对象返回 JT808UdpSessionInfoDto @@ -343,6 +395,56 @@ } ``` +#### 4.获取会话分页集合 + +请求地址:Udp/Session/SessionUdpByPage + +请求方式:GET + +请求参数: + +|属性|数据类型|参数说明| +|:------:|:------:|:------| +| pageIndex| int| 当前页(默认0)| +| pageSize| int| 页容量(默认10)| + +返回数据: + +|属性|数据类型|参数说明| +|:------:|:------:|:------| +| Data| List\ | 实际会话信息集合 | +| PageIndex| int | 当前页(默认0) | +| PageSize| int | 页容量(默认10) | +| Total| int | 总数 | + +返回结果: + +``` session1 +{ + "message":null, + "code":200, + "data":{ + "pageIndex":0, + "pageSize":10, + "total":2, + "data":[ + { + "lastActiveTime":"2022-09-03T19:34:07.8733605+08:00", + "startTime":"2022-09-03T19:34:07.8733615+08:00", + "terminalPhoneNo":"123456789012", + "remoteAddressIP":"127.0.0.1:9826" + }, + { + "lastActiveTime":"2022-09-03T19:34:05.135997+08:00", + "startTime":"2022-09-03T19:34:05.136035+08:00", + "terminalPhoneNo":"123456789013", + "remoteAddressIP":"127.0.0.1:9825" + } + ] + } +} +``` + ### SIM黑名单管理服务 #### 1.添加sim卡黑名单 diff --git a/simples/JT808.Gateway.SimpleServer/Program.cs b/simples/JT808.Gateway.SimpleServer/Program.cs index 65818b9..ab18d21 100644 --- a/simples/JT808.Gateway.SimpleServer/Program.cs +++ b/simples/JT808.Gateway.SimpleServer/Program.cs @@ -22,70 +22,51 @@ namespace JT808.Gateway.SimpleServer { class Program { - static async Task Main(string[] args) + static void Main(string[] args) { - var serverHostBuilder = new HostBuilder() - .ConfigureAppConfiguration((hostingContext, config) => - { - config.SetBasePath(AppDomain.CurrentDomain.BaseDirectory); - config.AddJsonFile("appsettings.json", optional: false, reloadOnChange: true); - }) - .ConfigureLogging((context, logging) => + var builder = WebApplication.CreateBuilder(); + builder.Host.ConfigureAppConfiguration((hostingContext, config) => + { + config.SetBasePath(AppDomain.CurrentDomain.BaseDirectory); + config.AddJsonFile("appsettings.json", optional: false, reloadOnChange: true); + }) + .ConfigureLogging((context, logging) => + { + logging.AddConsole(); + }) + .ConfigureServices((hostContext, services) => + { + services.AddSingleton(); + services.AddSingleton(typeof(ILogger<>), typeof(Logger<>)); + //使用内存队列实现会话通知 + services.AddSingleton(); + services.AddSingleton(); + services.AddSingleton(); + services.AddJT808Configure() + .AddGateway(hostContext.Configuration) + .AddMessageHandler() + .AddMsgLogging() + .AddSessionNotice() + .AddTransmit(hostContext.Configuration) + .AddTcp() + .AddUdp() + .Builder(); + }); + + builder.WebHost.UseKestrel((app, serverOptions) => { - logging.AddConsole(); + //1.配置webapi端口监听 + var jT808Configuration = app.Configuration.GetSection(nameof(JT808Configuration)).Get(); + serverOptions.ListenAnyIP(jT808Configuration.WebApiPort); }) - .ConfigureServices((hostContext, services) => - { - services.AddSingleton(); - services.AddSingleton(typeof(ILogger<>), typeof(Logger<>)); - //使用内存队列实现会话通知 - services.AddSingleton(); - services.AddSingleton(); - services.AddSingleton(); - services.AddJT808Configure() - .AddGateway(hostContext.Configuration) - .AddMessageHandler() - .AddMsgLogging() - .AddSessionNotice() - .AddTransmit(hostContext.Configuration) - .AddTcp() - .AddUdp() - .Builder(); - }).ConfigureWebHostDefaults(webBuilder => { - webBuilder.UseKestrel((app, ksOptions) => - { - //1.配置webapi端口监听 - var jT808Configuration = app.Configuration.GetSection(nameof(JT808Configuration)).Get(); - ksOptions.ListenAnyIP(jT808Configuration.WebApiPort); - }) - .UseStartup(); - }); - - await serverHostBuilder.RunConsoleAsync(); - } - } - public class Startup - { - public Startup(IConfiguration configuration) - { - Configuration = configuration; - } - - public IConfiguration Configuration { get; } - - public void ConfigureServices(IServiceCollection services) - { - services.AddControllers(); - } - - public void Configure(IApplicationBuilder app, IWebHostEnvironment env) - { - app.UseRouting(); - - app.UseEndpoints(endpoints => + .ConfigureServices((hostContext, services) => { - endpoints.MapControllers(); + services.AddControllers(); }); + var app = builder.Build(); + app.UseRouting(); + app.MapControllers(); + app.Run(); } } } diff --git a/src/JT808.Gateway.Abstractions/Dtos/JT808PageResult.cs b/src/JT808.Gateway.Abstractions/Dtos/JT808PageResult.cs new file mode 100644 index 0000000..89e29b8 --- /dev/null +++ b/src/JT808.Gateway.Abstractions/Dtos/JT808PageResult.cs @@ -0,0 +1,16 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Tasks; + +namespace JT808.Gateway.Abstractions.Dtos +{ + public class JT808PageResult + { + public int PageIndex { get; set; } = 1; + public int PageSize { get; set; } = 10; + public int Total { get; set; } = 0; + public T Data { get; set; } + } +} diff --git a/src/JT808.Gateway.Abstractions/JT808.Gateway.Abstractions.csproj b/src/JT808.Gateway.Abstractions/JT808.Gateway.Abstractions.csproj index ef826c4..79375ed 100644 --- a/src/JT808.Gateway.Abstractions/JT808.Gateway.Abstractions.csproj +++ b/src/JT808.Gateway.Abstractions/JT808.Gateway.Abstractions.csproj @@ -16,7 +16,7 @@ - + diff --git a/src/JT808.Gateway.Abstractions/JT808.Gateway.Abstractions.xml b/src/JT808.Gateway.Abstractions/JT808.Gateway.Abstractions.xml index 392acdf..7c9da0a 100644 --- a/src/JT808.Gateway.Abstractions/JT808.Gateway.Abstractions.xml +++ b/src/JT808.Gateway.Abstractions/JT808.Gateway.Abstractions.xml @@ -222,6 +222,11 @@ 基于Tcp的会话服务集合 + + + 基于Tcp的会话服务集合 + + 会话服务-通过设备终端号移除对应会话 @@ -242,6 +247,11 @@ 基于Udp的虚拟会话服务集合 + + + 基于Udp的虚拟会话服务集合 + + 会话服务-通过设备终端号移除对应会话 diff --git a/src/JT808.Gateway.Abstractions/JT808GatewayConstants.cs b/src/JT808.Gateway.Abstractions/JT808GatewayConstants.cs index 3637c53..b4a0cdf 100644 --- a/src/JT808.Gateway.Abstractions/JT808GatewayConstants.cs +++ b/src/JT808.Gateway.Abstractions/JT808GatewayConstants.cs @@ -20,7 +20,11 @@ /// /// 基于Tcp的会话服务集合 /// - public static string SessionTcpGetAll = $"{RouteTablePrefix}/{TcpPrefix}/{SessionPrefix}/GetAll"; + public static string SessionTcpGetAll = $"{RouteTablePrefix}/{TcpPrefix}/{SessionPrefix}/GetAll"; + /// + /// 基于Tcp的会话服务集合 + /// + public static string SessionTcpByPage = $"{RouteTablePrefix}/{TcpPrefix}/{SessionPrefix}/SessionTcpByPage"; /// /// 会话服务-通过设备终端号移除对应会话 /// @@ -36,7 +40,11 @@ /// /// 基于Udp的虚拟会话服务集合 /// - public static string SessionUdpGetAll = $"{RouteTablePrefix}/{UdpPrefix}/{SessionPrefix}/GetAll"; + public static string SessionUdpGetAll = $"{RouteTablePrefix}/{UdpPrefix}/{SessionPrefix}/GetAll"; + /// + /// 基于Udp的虚拟会话服务集合 + /// + public static string SessionUdpByPage = $"{RouteTablePrefix}/{UdpPrefix}/{SessionPrefix}/SessionUdpByPage"; /// /// 会话服务-通过设备终端号移除对应会话 /// diff --git a/src/JT808.Gateway.Benchmark/JT808.Gateway.CleintBenchmark/JT808.Gateway.CleintBenchmark.csproj b/src/JT808.Gateway.Benchmark/JT808.Gateway.CleintBenchmark/JT808.Gateway.CleintBenchmark.csproj index 4181b50..5ab1b93 100644 --- a/src/JT808.Gateway.Benchmark/JT808.Gateway.CleintBenchmark/JT808.Gateway.CleintBenchmark.csproj +++ b/src/JT808.Gateway.Benchmark/JT808.Gateway.CleintBenchmark/JT808.Gateway.CleintBenchmark.csproj @@ -20,7 +20,7 @@ - + diff --git a/src/JT808.Gateway.Benchmark/JT808.Gateway.CleintBenchmark/Services/CleintBenchmarkHostedService.cs b/src/JT808.Gateway.Benchmark/JT808.Gateway.CleintBenchmark/Services/CleintBenchmarkHostedService.cs index 949db93..fc59fef 100644 --- a/src/JT808.Gateway.Benchmark/JT808.Gateway.CleintBenchmark/Services/CleintBenchmarkHostedService.cs +++ b/src/JT808.Gateway.Benchmark/JT808.Gateway.CleintBenchmark/Services/CleintBenchmarkHostedService.cs @@ -45,7 +45,7 @@ namespace JT808.Gateway.CleintBenchmark.Services logger.LogInformation($"GetMinThreads:{minWorkerThreads}-{minCompletionPortThreads}"); logger.LogInformation($"GetMaxThreads:{maxWorkerThreads}-{maxCompletionPortThreads}"); taskFactory = new TaskFactory(cancellationToken); - Task.Run(() => { + new Thread(() => { for (int i = 0; i < clientBenchmarkOptions.DeviceCount; i++) { taskFactory.StartNew(async (state) => { @@ -80,12 +80,12 @@ namespace JT808.Gateway.CleintBenchmark.Services { logger.LogError(ex.Message); } - await Task.Delay(clientBenchmarkOptions.Interval); + Thread.Sleep(clientBenchmarkOptions.Interval); } }, i); Thread.Sleep(300); } - }); + }).Start(); return Task.CompletedTask; } diff --git a/src/JT808.Gateway.Benchmark/JT808.Gateway.ServerBenchmark/JT808.Gateway.ServerBenchmark.csproj b/src/JT808.Gateway.Benchmark/JT808.Gateway.ServerBenchmark/JT808.Gateway.ServerBenchmark.csproj index 3ee4cd6..9ce53d3 100644 --- a/src/JT808.Gateway.Benchmark/JT808.Gateway.ServerBenchmark/JT808.Gateway.ServerBenchmark.csproj +++ b/src/JT808.Gateway.Benchmark/JT808.Gateway.ServerBenchmark/JT808.Gateway.ServerBenchmark.csproj @@ -8,8 +8,8 @@ - - + + diff --git a/src/JT808.Gateway.Client/JT808.Gateway.Client.csproj b/src/JT808.Gateway.Client/JT808.Gateway.Client.csproj index 9bc6fa1..c41bbd2 100644 --- a/src/JT808.Gateway.Client/JT808.Gateway.Client.csproj +++ b/src/JT808.Gateway.Client/JT808.Gateway.Client.csproj @@ -10,7 +10,7 @@ - + diff --git a/src/JT808.Gateway.Client/Services/JT808RetryClientHostedService.cs b/src/JT808.Gateway.Client/Services/JT808RetryClientHostedService.cs index effba55..7028a1b 100644 --- a/src/JT808.Gateway.Client/Services/JT808RetryClientHostedService.cs +++ b/src/JT808.Gateway.Client/Services/JT808RetryClientHostedService.cs @@ -35,7 +35,8 @@ namespace JT808.Gateway.Client.Services public Task StartAsync(CancellationToken cancellationToken) { - Task.Run(async()=> { + new Thread(async () => + { foreach (var item in RetryBlockingCollection.RetryBlockingCollection.GetConsumingEnumerable(cancellationToken)) { try @@ -66,7 +67,7 @@ namespace JT808.Gateway.Client.Services await Task.Delay(TimeSpan.FromSeconds(5)); } } - }, cancellationToken); + }).Start(); return Task.CompletedTask; } diff --git a/src/JT808.Gateway.Kafka/JT808.Gateway.Kafka.csproj b/src/JT808.Gateway.Kafka/JT808.Gateway.Kafka.csproj index 78ad2b8..e32a40b 100644 --- a/src/JT808.Gateway.Kafka/JT808.Gateway.Kafka.csproj +++ b/src/JT808.Gateway.Kafka/JT808.Gateway.Kafka.csproj @@ -8,7 +8,7 @@ JT808.Gateway.Kafka.xml - + diff --git a/src/JT808.Gateway.Kafka/JT808MsgConsumer.cs b/src/JT808.Gateway.Kafka/JT808MsgConsumer.cs index 610a619..a0cf4de 100644 --- a/src/JT808.Gateway.Kafka/JT808MsgConsumer.cs +++ b/src/JT808.Gateway.Kafka/JT808MsgConsumer.cs @@ -33,7 +33,7 @@ namespace JT808.Gateway.Kafka public void OnMessage(Action<(string TerminalNo, byte[] Data)> callback) { - Task.Run(() => + new Thread(() => { while (!Cts.IsCancellationRequested) { @@ -49,20 +49,22 @@ namespace JT808.Gateway.Kafka } callback((data.Message.Key, data.Message.Value)); } - catch (ConsumeException ex) + catch (OperationCanceledException) { - logger.LogError(ex, TopicName); + break; } - catch (OperationCanceledException) + catch (ConsumeException ex) { - + logger.LogError(ex, TopicName); + break; } catch (Exception ex) { logger.LogError(ex, TopicName); + break; } } - }); + }).Start(); } public void Subscribe() { diff --git a/src/JT808.Gateway.Kafka/JT808MsgReplyConsumer.cs b/src/JT808.Gateway.Kafka/JT808MsgReplyConsumer.cs index d424325..e42a269 100644 --- a/src/JT808.Gateway.Kafka/JT808MsgReplyConsumer.cs +++ b/src/JT808.Gateway.Kafka/JT808MsgReplyConsumer.cs @@ -33,8 +33,7 @@ namespace JT808.Gateway.Kafka public void OnMessage(Action<(string TerminalNo, byte[] Data)> callback) { - Task.Run(() => - { + new Thread(() => { while (!Cts.IsCancellationRequested) { if (disposed) return; @@ -49,19 +48,22 @@ namespace JT808.Gateway.Kafka } callback((data.Message.Key, data.Message.Value)); } - catch (ConsumeException ex) + catch (OperationCanceledException) { - logger.LogError(ex, TopicName); + break; } - catch (OperationCanceledException) + catch (ConsumeException ex) { + logger.LogError(ex, TopicName); + break; } catch (Exception ex) { logger.LogError(ex, TopicName); + break; } } - }); + }).Start(); } public void Subscribe() diff --git a/src/JT808.Gateway.Kafka/JT808MsgReplyLoggingConsumer.cs b/src/JT808.Gateway.Kafka/JT808MsgReplyLoggingConsumer.cs index 51afc60..f68feda 100644 --- a/src/JT808.Gateway.Kafka/JT808MsgReplyLoggingConsumer.cs +++ b/src/JT808.Gateway.Kafka/JT808MsgReplyLoggingConsumer.cs @@ -33,7 +33,7 @@ namespace JT808.Gateway.Kafka public void OnMessage(Action<(string TerminalNo, byte[] Data)> callback) { - Task.Run(() => + new Thread(() => { while (!Cts.IsCancellationRequested) { @@ -52,16 +52,19 @@ namespace JT808.Gateway.Kafka catch (ConsumeException ex) { logger.LogError(ex, TopicName); + break; } catch (OperationCanceledException) { + break; } catch (Exception ex) { logger.LogError(ex, TopicName); + break; } } - }); + }).Start(); } public void Subscribe() diff --git a/src/JT808.Gateway.Kafka/JT808SessionConsumer.cs b/src/JT808.Gateway.Kafka/JT808SessionConsumer.cs index 40cb534..ddcda2e 100644 --- a/src/JT808.Gateway.Kafka/JT808SessionConsumer.cs +++ b/src/JT808.Gateway.Kafka/JT808SessionConsumer.cs @@ -33,7 +33,7 @@ namespace JT808.Gateway.Kafka public void OnMessage(Action<(string Notice, string TerminalNo)> callback) { - Task.Run(() => + new Thread(() => { while (!Cts.IsCancellationRequested) { @@ -49,20 +49,23 @@ namespace JT808.Gateway.Kafka } callback((data.Message.Key, data.Message.Value)); } - catch (ConsumeException ex) + catch (OperationCanceledException ex) { logger.LogError(ex, TopicName); + break; } - catch (OperationCanceledException ex) + catch (ConsumeException ex) { logger.LogError(ex, TopicName); + break; } catch (Exception ex) { logger.LogError(ex, TopicName); + break; } } - }); + }).Start(); } public void Subscribe() diff --git a/src/JT808.Gateway.Services/JT808.Gateway.ReplyMessage/JT808ReplyMessageHostedService.cs b/src/JT808.Gateway.Services/JT808.Gateway.ReplyMessage/JT808ReplyMessageHostedService.cs index 2309b91..1e5aee4 100644 --- a/src/JT808.Gateway.Services/JT808.Gateway.ReplyMessage/JT808ReplyMessageHostedService.cs +++ b/src/JT808.Gateway.Services/JT808.Gateway.ReplyMessage/JT808ReplyMessageHostedService.cs @@ -24,7 +24,7 @@ namespace JT808.Gateway.ReplyMessage /// public JT808ReplyMessageHostedService( ILoggerFactory loggerFactory, - IJT808DownMessageHandler jT808ReplyMessageHandler, + IJT808DownMessageHandler jT808ReplyMessageHandler, IJT808MsgReplyProducer jT808MsgReplyProducer, IJT808MsgConsumer jT808MsgConsumer) { diff --git a/src/JT808.Gateway.Tests/JT808.Gateway.NormalHosting/Impl/JT808MsgReplyConsumer.cs b/src/JT808.Gateway.Tests/JT808.Gateway.NormalHosting/Impl/JT808MsgReplyConsumer.cs index 3d301e1..65ff100 100644 --- a/src/JT808.Gateway.Tests/JT808.Gateway.NormalHosting/Impl/JT808MsgReplyConsumer.cs +++ b/src/JT808.Gateway.Tests/JT808.Gateway.NormalHosting/Impl/JT808MsgReplyConsumer.cs @@ -34,7 +34,7 @@ namespace JT808.Gateway.NormalHosting.Impl public void OnMessage(Action<(string TerminalNo, byte[] Data)> callback) { - Task.Run(async () => + new Thread(async () => { while (!Cts.IsCancellationRequested) { @@ -48,7 +48,7 @@ namespace JT808.Gateway.NormalHosting.Impl logger.LogError(ex, ""); } } - }, Cts.Token); + }).Start(); } public void Subscribe() diff --git a/src/JT808.Gateway.Tests/JT808.Gateway.NormalHosting/Impl/JT808SessionConsumer.cs b/src/JT808.Gateway.Tests/JT808.Gateway.NormalHosting/Impl/JT808SessionConsumer.cs index f331930..1ed4a8d 100644 --- a/src/JT808.Gateway.Tests/JT808.Gateway.NormalHosting/Impl/JT808SessionConsumer.cs +++ b/src/JT808.Gateway.Tests/JT808.Gateway.NormalHosting/Impl/JT808SessionConsumer.cs @@ -28,7 +28,7 @@ namespace JT808.Gateway.NormalHosting.Impl public void OnMessage(Action<(string Notice, string TerminalNo)> callback) { - Task.Run(async () => + new Thread((async () => { while (!Cts.IsCancellationRequested) { @@ -42,7 +42,7 @@ namespace JT808.Gateway.NormalHosting.Impl logger.LogError(ex, ""); } } - }, Cts.Token); + })).Start(); } public void Unsubscribe() diff --git a/src/JT808.Gateway.Tests/JT808.Gateway.NormalHosting/JT808.Gateway.NormalHosting.csproj b/src/JT808.Gateway.Tests/JT808.Gateway.NormalHosting/JT808.Gateway.NormalHosting.csproj index 4a89cd7..0fb98c9 100644 --- a/src/JT808.Gateway.Tests/JT808.Gateway.NormalHosting/JT808.Gateway.NormalHosting.csproj +++ b/src/JT808.Gateway.Tests/JT808.Gateway.NormalHosting/JT808.Gateway.NormalHosting.csproj @@ -24,9 +24,9 @@ - - - + + + diff --git a/src/JT808.Gateway.Tests/JT808.Gateway.QueueHosting/JT808.Gateway.QueueHosting.csproj b/src/JT808.Gateway.Tests/JT808.Gateway.QueueHosting/JT808.Gateway.QueueHosting.csproj index e43df5f..4b7008c 100644 --- a/src/JT808.Gateway.Tests/JT808.Gateway.QueueHosting/JT808.Gateway.QueueHosting.csproj +++ b/src/JT808.Gateway.Tests/JT808.Gateway.QueueHosting/JT808.Gateway.QueueHosting.csproj @@ -11,9 +11,9 @@ - - - + + + diff --git a/src/JT808.Gateway.Tests/JT808.Gateway.Test/JT808.Gateway.Test.csproj b/src/JT808.Gateway.Tests/JT808.Gateway.Test/JT808.Gateway.Test.csproj index 8620706..9094a47 100644 --- a/src/JT808.Gateway.Tests/JT808.Gateway.Test/JT808.Gateway.Test.csproj +++ b/src/JT808.Gateway.Tests/JT808.Gateway.Test/JT808.Gateway.Test.csproj @@ -9,9 +9,9 @@ - - - + + + all runtime; build; native; contentfiles; analyzers; buildtransitive diff --git a/src/JT808.Gateway.WebApiClientTool/JT808.Gateway.WebApiClientTool.xml b/src/JT808.Gateway.WebApiClientTool/JT808.Gateway.WebApiClientTool.xml index 09e863c..3240afa 100644 --- a/src/JT808.Gateway.WebApiClientTool/JT808.Gateway.WebApiClientTool.xml +++ b/src/JT808.Gateway.WebApiClientTool/JT808.Gateway.WebApiClientTool.xml @@ -33,6 +33,12 @@ + + + 会话服务集合 + + + 会话服务-通过设备终端号查询对应会话信息 @@ -53,6 +59,12 @@ + + + 会话服务集合 + + + 会话服务-通过设备终端号查询对应会话信息 diff --git a/src/JT808.Gateway.WebApiClientTool/JT808HttpClient.cs b/src/JT808.Gateway.WebApiClientTool/JT808HttpClient.cs index c5c5adb..4931c8c 100644 --- a/src/JT808.Gateway.WebApiClientTool/JT808HttpClient.cs +++ b/src/JT808.Gateway.WebApiClientTool/JT808HttpClient.cs @@ -56,6 +56,19 @@ namespace JT808.Gateway.WebApiClientTool return value; } + /// + /// 会话服务集合 + /// + /// + public async ValueTask>>> SessionTcpByPage(int pageIndex=0,int pageSize=10) + { + var request = new HttpRequestMessage(HttpMethod.Get, $"{JT808GatewayConstants.JT808WebApiRouteTable.SessionTcpByPage}?pageIndex={pageIndex}&pageSize={pageSize}"); + var response = await HttpClient.SendAsync(request); + response.EnsureSuccessStatusCode(); + var value = await response.Content.ReadFromJsonAsync>>>(); + return value; + } + /// /// 会话服务-通过设备终端号查询对应会话信息 /// @@ -94,6 +107,19 @@ namespace JT808.Gateway.WebApiClientTool return value; } + /// + /// 会话服务集合 + /// + /// + public async ValueTask>>> SessionUdpByPage(int pageIndex = 0, int pageSize = 10) + { + var request = new HttpRequestMessage(HttpMethod.Get, $"{JT808GatewayConstants.JT808WebApiRouteTable.SessionUdpByPage}?pageIndex={pageIndex}&pageSize={pageSize}"); + var response = await HttpClient.SendAsync(request); + response.EnsureSuccessStatusCode(); + var value = await response.Content.ReadFromJsonAsync>>>(); + return value; + } + /// /// 会话服务-通过设备终端号查询对应会话信息 /// diff --git a/src/JT808.Gateway/JT808.Gateway.xml b/src/JT808.Gateway/JT808.Gateway.xml index 4a192f1..821451f 100644 --- a/src/JT808.Gateway/JT808.Gateway.xml +++ b/src/JT808.Gateway/JT808.Gateway.xml @@ -161,6 +161,7 @@ + 会话服务-通过设备终端号查询对应会话 @@ -181,6 +182,7 @@ + 会话服务-通过设备终端号查询对应会话 diff --git a/src/JT808.Gateway/JT808WebApi.cs b/src/JT808.Gateway/JT808WebApi.cs index 15222cc..a0c506d 100644 --- a/src/JT808.Gateway/JT808WebApi.cs +++ b/src/JT808.Gateway/JT808WebApi.cs @@ -5,6 +5,7 @@ using JT808.Gateway.Services; using JT808.Gateway.Session; using Microsoft.AspNetCore.Cors; using Microsoft.AspNetCore.Mvc; +using Microsoft.AspNetCore.Mvc.RazorPages; using System; using System.Collections.Generic; using System.Linq; @@ -110,6 +111,51 @@ namespace JT808.Gateway return resultDto; } + /// + /// 会话服务-Tcp分页会话查询 + /// jt808api/Tcp/Session/SessionTcpByPage?pageIndex=0&pageSize10 + /// + /// + [HttpGet] + [Route("Tcp/Session/SessionTcpByPage")] + [JT808Token] + public ActionResult>>> SessionTcpByPage([FromQuery]int pageIndex=0, [FromQuery] int pageSize=10) + { + JT808ResultDto>> resultDto = new JT808ResultDto>>(); + try + { + if (pageIndex < 0) + { + pageIndex = 0; + } + if (pageSize >= 1000) + { + pageSize = 1000; + } + JT808PageResult> pageResult = new JT808PageResult>(); + IEnumerable sessionInfoDtos = SessionManager.GetTcpByPage(); + pageResult.Data = sessionInfoDtos.Select(s => new JT808TcpSessionInfoDto + { + LastActiveTime = s.ActiveTime, + StartTime = s.StartTime, + TerminalPhoneNo = s.TerminalPhoneNo, + RemoteAddressIP = s.RemoteEndPoint.ToString(), + }).OrderByDescending(o => o.LastActiveTime).Skip(pageIndex* pageSize).Take(pageSize).ToList(); + pageResult.Total = sessionInfoDtos.Count(); + pageResult.PageIndex = pageIndex; + pageResult.PageSize = pageSize; + resultDto.Data = pageResult; + resultDto.Code = JT808ResultCode.Ok; + } + catch (Exception ex) + { + resultDto.Data = new JT808PageResult>(); + resultDto.Code = JT808ResultCode.Error; + resultDto.Message = ex.StackTrace; + } + return resultDto; + } + /// /// 会话服务-通过设备终端号查询对应会话 /// @@ -198,6 +244,51 @@ namespace JT808.Gateway return resultDto; } + /// + /// 会话服务-Udp分页会话查询 + /// jt808api/Udp/Session/SessionUdpByPage?pageIndex=0&pageSize10 + /// + /// + [HttpGet] + [Route("Udp/Session/SessionUdpByPage")] + [JT808Token] + public ActionResult>>> SessionUdpByPage([FromQuery] int pageIndex = 0, [FromQuery] int pageSize = 10) + { + JT808ResultDto>> resultDto = new JT808ResultDto>>(); + try + { + if (pageIndex < 0) + { + pageIndex = 0; + } + if (pageSize >= 1000) + { + pageSize = 1000; + } + JT808PageResult> pageResult = new JT808PageResult>(); + IEnumerable sessionInfoDtos = SessionManager.GetUdpByPage(); + pageResult.Data = sessionInfoDtos.Select(s => new JT808UdpSessionInfoDto + { + LastActiveTime = s.ActiveTime, + StartTime = s.StartTime, + TerminalPhoneNo = s.TerminalPhoneNo, + RemoteAddressIP = s.RemoteEndPoint.ToString(), + }).OrderByDescending(o => o.LastActiveTime).Skip(pageIndex * pageSize).Take(pageSize).ToList(); + pageResult.Total = sessionInfoDtos.Count(); + pageResult.PageIndex = pageIndex; + pageResult.PageSize = pageSize; + resultDto.Data = pageResult; + resultDto.Code = JT808ResultCode.Ok; + } + catch (Exception ex) + { + resultDto.Data = new JT808PageResult>(); + resultDto.Code = JT808ResultCode.Error; + resultDto.Message = ex.StackTrace; + } + return resultDto; + } + /// /// 会话服务-通过设备终端号查询对应会话 /// diff --git a/src/JT808.Gateway/Session/JT808SessionManager.cs b/src/JT808.Gateway/Session/JT808SessionManager.cs index 532df42..281293c 100644 --- a/src/JT808.Gateway/Session/JT808SessionManager.cs +++ b/src/JT808.Gateway/Session/JT808SessionManager.cs @@ -271,6 +271,17 @@ namespace JT808.Gateway.Session return query.Select(s => (JT808TcpSession)s.Value).ToList(); } + public IEnumerable GetTcpByPage(Func predicate = null) + { + var query = TerminalPhoneNoSessions.Where(w => w.Value.TransportProtocolType == JT808TransportProtocolType.tcp); + if (predicate != null) + { + query = query.Where(s => predicate(s.Value)); + } + return query.Select(s => (JT808TcpSession)s.Value); + } + + public List GetUdpAll(Func predicate = null) { var query = TerminalPhoneNoSessions.Where(w => w.Value.TransportProtocolType == JT808TransportProtocolType.udp); @@ -280,5 +291,15 @@ namespace JT808.Gateway.Session } return query.Select(s => (JT808UdpSession)s.Value).ToList(); } + + public IEnumerable GetUdpByPage(Func predicate = null) + { + var query = TerminalPhoneNoSessions.Where(w => w.Value.TransportProtocolType == JT808TransportProtocolType.udp); + if (predicate != null) + { + query = query.Where(s => predicate(s.Value)); + } + return query.Select(s => (JT808UdpSession)s.Value); + } } } diff --git a/src/PipelineInfo.props b/src/PipelineInfo.props index 7e623a8..2fb7c01 100644 --- a/src/PipelineInfo.props +++ b/src/PipelineInfo.props @@ -8,7 +8,7 @@ https://github.com/SmallChi/JT808Gateway https://github.com/SmallChi/JT808Gateway/blob/master/LICENSE https://github.com/SmallChi/JT808Gateway/blob/master/LICENSE - 1.1.8-preview1 + 1.1.8-preview2 LICENSE true latest