From 7a91b0b35da3c2d6d5ddce763eae08ebbac18941 Mon Sep 17 00:00:00 2001 From: "SmallChi(Koike)" <564952747@qq.com> Date: Tue, 30 Jun 2020 15:47:29 +0800 Subject: [PATCH] =?UTF-8?q?1.=E6=B7=BB=E5=8A=A0tcp=E6=9C=8D=E5=8A=A1=202.?= =?UTF-8?q?=E6=B7=BB=E5=8A=A0tcp=E4=BC=9A=E8=AF=9D=E8=B6=85=E6=97=B6?= =?UTF-8?q?=E6=9C=8D=E5=8A=A1=203.=E6=B7=BB=E5=8A=A0=E5=86=85=E5=AD=98?= =?UTF-8?q?=E9=98=9F=E5=88=97=E7=9A=84=E5=AE=9E=E7=8E=B0=E5=BA=93=204.?= =?UTF-8?q?=E6=B7=BB=E5=8A=A0=E5=86=85=E5=AD=98=E9=98=9F=E5=88=97=E7=9A=84?= =?UTF-8?q?=E6=B5=8B=E8=AF=95=E6=9C=8D=E5=8A=A1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 2 +- .../JT1078.Gateway.Abstractions.csproj | 7 ++- .../JT1078GatewayConstants.cs | 8 +++ .../JT1078.Gateway.InMemoryMQ.csproj | 31 ++++++++++ .../JT1078InMemoryMQExtensions.cs | 26 ++++++++ .../JT1078MsgChannel.cs | 17 +++++ .../JT1078PackageConsumer.cs | 60 ++++++++++++++++++ .../JT1078PackageProducer.cs | 32 ++++++++++ .../JT1078.Gateway.TestNormalHosting.csproj | 24 +++++++ .../Program.cs | 43 +++++++++++++ .../Services/JT1078NormalMsgHostedService.cs | 27 ++++++++ .../appsettings.json | 15 +++++ src/JT1078.Gateway.sln | 17 ++++- src/JT1078.Gateway/JT1078.Gateway.csproj | 4 +- src/JT1078.Gateway/JT1078GatewayExtensions.cs | 45 +++++++++++++- src/JT1078.Gateway/JT1078TcpServer.cs | 4 +- src/JT1078.Gateway/JT1078UdpServer.cs | 47 ++++++++++++++ .../Jobs/JT1078SessionNoticeJob.cs | 46 ++++++++++++++ .../Jobs/JT1078TcpReceiveTimeoutJob.cs | 62 +++++++++++++++++++ .../Services/JT1078SessionNoticeService.cs | 16 +++++ .../Sessions/JT1078SessionManager.cs | 54 ++++------------ 21 files changed, 533 insertions(+), 54 deletions(-) create mode 100644 src/JT1078.Gateway.Abstractions/JT1078GatewayConstants.cs create mode 100644 src/JT1078.Gateway.InMemoryMQ/JT1078.Gateway.InMemoryMQ.csproj create mode 100644 src/JT1078.Gateway.InMemoryMQ/JT1078InMemoryMQExtensions.cs create mode 100644 src/JT1078.Gateway.InMemoryMQ/JT1078MsgChannel.cs create mode 100644 src/JT1078.Gateway.InMemoryMQ/JT1078PackageConsumer.cs create mode 100644 src/JT1078.Gateway.InMemoryMQ/JT1078PackageProducer.cs create mode 100644 src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/JT1078.Gateway.TestNormalHosting.csproj create mode 100644 src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Program.cs create mode 100644 src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Services/JT1078NormalMsgHostedService.cs create mode 100644 src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/appsettings.json create mode 100644 src/JT1078.Gateway/JT1078UdpServer.cs create mode 100644 src/JT1078.Gateway/Jobs/JT1078SessionNoticeJob.cs create mode 100644 src/JT1078.Gateway/Jobs/JT1078TcpReceiveTimeoutJob.cs create mode 100644 src/JT1078.Gateway/Services/JT1078SessionNoticeService.cs diff --git a/README.md b/README.md index 498d2c9..6cf7cd2 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ -# JT1078.Gateway +# JT1078Gateway ## 前提条件 diff --git a/src/JT1078.Gateway.Abstractions/JT1078.Gateway.Abstractions.csproj b/src/JT1078.Gateway.Abstractions/JT1078.Gateway.Abstractions.csproj index d52d04b..fedc579 100644 --- a/src/JT1078.Gateway.Abstractions/JT1078.Gateway.Abstractions.csproj +++ b/src/JT1078.Gateway.Abstractions/JT1078.Gateway.Abstractions.csproj @@ -7,8 +7,8 @@ SmallChi(Koike) JT1078.Gateway.Abstractions JT1078.Gateway.Abstractions - 基于Pipeline实现的JT1078Gateway的抽象库 - 基于Pipeline实现的JT1078Gateway的抽象库 + 基于JT1078Gateway的抽象库 + 基于JT1078Gateway的抽象库 https://github.com/SmallChi/JT1078Gateway https://github.com/SmallChi/JT1078Gateway https://github.com/SmallChi/JT1078Gateway/blob/master/LICENSE @@ -17,7 +17,7 @@ false LICENSE true - 1.0.0-preview1 + 1.0.0-preview2 @@ -30,5 +30,6 @@ + diff --git a/src/JT1078.Gateway.Abstractions/JT1078GatewayConstants.cs b/src/JT1078.Gateway.Abstractions/JT1078GatewayConstants.cs new file mode 100644 index 0000000..f90653a --- /dev/null +++ b/src/JT1078.Gateway.Abstractions/JT1078GatewayConstants.cs @@ -0,0 +1,8 @@ +namespace JT1078.Gateway.Abstractions +{ + public static class JT1078GatewayConstants + { + public const string SessionOnline= "JT1078SessionOnline"; + public const string SessionOffline = "JT1078SessionOffline"; + } +} diff --git a/src/JT1078.Gateway.InMemoryMQ/JT1078.Gateway.InMemoryMQ.csproj b/src/JT1078.Gateway.InMemoryMQ/JT1078.Gateway.InMemoryMQ.csproj new file mode 100644 index 0000000..0577923 --- /dev/null +++ b/src/JT1078.Gateway.InMemoryMQ/JT1078.Gateway.InMemoryMQ.csproj @@ -0,0 +1,31 @@ + + + + netstandard2.1 + 8.0 + Copyright 2019. + SmallChi(Koike) + JT1078.Gateway.InMemoryMQ + JT1078.Gateway.InMemoryMQ + 基于JT1078Gateway实现的内存队列 + 基于JT1078Gateway实现的内存队列 + https://github.com/SmallChi/JT1078Gateway + https://github.com/SmallChi/JT1078Gateway + https://github.com/SmallChi/JT1078Gateway/blob/master/LICENSE + https://github.com/SmallChi/JT1078Gateway/blob/master/LICENSE + false + false + LICENSE + true + 1.0.0-preview2 + + + + + + + + + + + diff --git a/src/JT1078.Gateway.InMemoryMQ/JT1078InMemoryMQExtensions.cs b/src/JT1078.Gateway.InMemoryMQ/JT1078InMemoryMQExtensions.cs new file mode 100644 index 0000000..4291b15 --- /dev/null +++ b/src/JT1078.Gateway.InMemoryMQ/JT1078InMemoryMQExtensions.cs @@ -0,0 +1,26 @@ +using JT1078.Gateway.Abstractions; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.DependencyInjection.Extensions; +using System; +using System.Collections.Generic; +using System.Text; + +namespace JT1078.Gateway.InMemoryMQ +{ + public static class JT1078InMemoryMQExtensions + { + public static IJT1078NormalGatewayBuilder AddMsgProducer(this IJT1078NormalGatewayBuilder builder) + { + builder.JT1078Builder.Services.TryAddSingleton(); + builder.JT1078Builder.Services.AddSingleton(); + return builder; + } + + public static IJT1078NormalGatewayBuilder AddMsgConsumer(this IJT1078NormalGatewayBuilder builder) + { + builder.JT1078Builder.Services.TryAddSingleton(); + builder.JT1078Builder.Services.AddSingleton(); + return builder; + } + } +} diff --git a/src/JT1078.Gateway.InMemoryMQ/JT1078MsgChannel.cs b/src/JT1078.Gateway.InMemoryMQ/JT1078MsgChannel.cs new file mode 100644 index 0000000..ae3ff54 --- /dev/null +++ b/src/JT1078.Gateway.InMemoryMQ/JT1078MsgChannel.cs @@ -0,0 +1,17 @@ +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading.Channels; + +namespace JT1078.Gateway.InMemoryMQ +{ + public class JT1078MsgChannel + { + public Channel<(string, JT1078.Protocol.JT1078Package)> Channel { get;} + + public JT1078MsgChannel() + { + Channel = System.Threading.Channels.Channel.CreateUnbounded<(string, JT1078.Protocol.JT1078Package)>(); + } + } +} diff --git a/src/JT1078.Gateway.InMemoryMQ/JT1078PackageConsumer.cs b/src/JT1078.Gateway.InMemoryMQ/JT1078PackageConsumer.cs new file mode 100644 index 0000000..31ccaa0 --- /dev/null +++ b/src/JT1078.Gateway.InMemoryMQ/JT1078PackageConsumer.cs @@ -0,0 +1,60 @@ +using JT1078.Gateway.Abstractions; +using JT1078.Protocol; +using Microsoft.Extensions.Logging; +using System; +using System.Collections.Generic; +using System.Text; +using System.Text.Json; +using System.Text.Json.Serialization; +using System.Threading; +using System.Threading.Tasks; + +namespace JT1078.Gateway.InMemoryMQ +{ + public class JT1078PackageConsumer: IJT1078PackageConsumer + { + private JT1078MsgChannel Channel; + private readonly ILogger logger; + + public JT1078PackageConsumer(ILoggerFactory loggerFactory,JT1078MsgChannel channel) + { + Channel = channel; + logger = loggerFactory.CreateLogger(); + } + + public CancellationTokenSource Cts { get; } = new CancellationTokenSource(); + + public string TopicName { get; } = "JT1078Package"; + + public void Dispose() + { + + } + + public void OnMessage(Action<(string TerminalNo, JT1078Package Data)> callback) + { + Task.Run(async() => + { + while (!Cts.IsCancellationRequested) + { + var reader = await Channel.Channel.Reader.ReadAsync(Cts.Token); + if (logger.IsEnabled(LogLevel.Trace)) + { + logger.LogTrace(JsonSerializer.Serialize(reader.Item2)); + } + callback(reader); + } + }, Cts.Token); + } + + public void Subscribe() + { + + } + + public void Unsubscribe() + { + + } + } +} diff --git a/src/JT1078.Gateway.InMemoryMQ/JT1078PackageProducer.cs b/src/JT1078.Gateway.InMemoryMQ/JT1078PackageProducer.cs new file mode 100644 index 0000000..f33134b --- /dev/null +++ b/src/JT1078.Gateway.InMemoryMQ/JT1078PackageProducer.cs @@ -0,0 +1,32 @@ +using JT1078.Gateway.Abstractions; +using JT1078.Protocol; +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading.Channels; +using System.Threading.Tasks; + +namespace JT1078.Gateway.InMemoryMQ +{ + public class JT1078PackageProducer : IJT1078PackageProducer + { + public string TopicName { get; }= "JT1078Package"; + + private JT1078MsgChannel Channel; + + public JT1078PackageProducer(JT1078MsgChannel channel) + { + Channel = channel; + } + + public void Dispose() + { + + } + + public async ValueTask ProduceAsync(string terminalNo, JT1078Package data) + { + await Channel.Channel.Writer.WriteAsync((terminalNo, data)); + } + } +} diff --git a/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/JT1078.Gateway.TestNormalHosting.csproj b/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/JT1078.Gateway.TestNormalHosting.csproj new file mode 100644 index 0000000..cde7db3 --- /dev/null +++ b/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/JT1078.Gateway.TestNormalHosting.csproj @@ -0,0 +1,24 @@ + + + + Exe + netcoreapp3.1 + + + + + + + + + + + + + + + + Always + + + diff --git a/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Program.cs b/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Program.cs new file mode 100644 index 0000000..954638c --- /dev/null +++ b/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Program.cs @@ -0,0 +1,43 @@ +using JT1078.Gateway.InMemoryMQ; +using JT1078.Gateway.TestNormalHosting.Services; +using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; +using System; +using System.Threading.Tasks; + +namespace JT1078.Gateway.TestNormalHosting +{ + class Program + { + static async Task Main(string[] args) + { + var serverHostBuilder = new HostBuilder() + .ConfigureAppConfiguration((hostingContext, config) => + { + config.SetBasePath(AppDomain.CurrentDomain.BaseDirectory); + config.AddJsonFile("appsettings.json", optional: false, reloadOnChange: true); + }) + .ConfigureLogging((context, logging) => + { + logging.AddConsole(); + logging.SetMinimumLevel(LogLevel.Trace); + }) + .ConfigureServices((hostContext, services) => + { + services.AddSingleton(); + services.AddSingleton(typeof(ILogger<>), typeof(Logger<>)); + //使用内存队列实现会话通知 + services.AddJT1078Gateway(hostContext.Configuration) + .AddTcp() + .AddNormal() + .AddMsgProducer() + .AddMsgConsumer(); + services.AddHostedService(); + }); + + await serverHostBuilder.RunConsoleAsync(); + } + } +} diff --git a/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Services/JT1078NormalMsgHostedService.cs b/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Services/JT1078NormalMsgHostedService.cs new file mode 100644 index 0000000..c63ea63 --- /dev/null +++ b/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Services/JT1078NormalMsgHostedService.cs @@ -0,0 +1,27 @@ +using JT1078.Gateway.Abstractions; +using Microsoft.Extensions.Hosting; +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading; +using System.Threading.Tasks; + +namespace JT1078.Gateway.TestNormalHosting.Services +{ + public class JT1078NormalMsgHostedService : BackgroundService + { + private IJT1078PackageConsumer PackageConsumer; + public JT1078NormalMsgHostedService(IJT1078PackageConsumer packageConsumer) + { + PackageConsumer = packageConsumer; + } + protected override Task ExecuteAsync(CancellationToken stoppingToken) + { + PackageConsumer.OnMessage((Message) => + { + + }); + return Task.CompletedTask; + } + } +} diff --git a/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/appsettings.json b/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/appsettings.json new file mode 100644 index 0000000..08cd388 --- /dev/null +++ b/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/appsettings.json @@ -0,0 +1,15 @@ +{ + "Logging": { + "IncludeScopes": false, + "Debug": { + "LogLevel": { + "Default": "Trace" + } + }, + "Console": { + "LogLevel": { + "Default": "Trace" + } + } + } +} diff --git a/src/JT1078.Gateway.sln b/src/JT1078.Gateway.sln index 6d2021e..de89a84 100644 --- a/src/JT1078.Gateway.sln +++ b/src/JT1078.Gateway.sln @@ -5,11 +5,15 @@ VisualStudioVersion = 16.0.29418.71 MinimumVisualStudioVersion = 10.0.40219.1 Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JT1078.Gateway", "JT1078.Gateway\JT1078.Gateway.csproj", "{39A90F1C-FC46-4905-81F1-3AEE44D6D86D}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "JT1078.Gateway.Test", "JT1078.Gateway.Tests\JT1078.Gateway.Test\JT1078.Gateway.Test.csproj", "{DAC80A8E-4172-451F-910D-9032BF8640F9}" +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JT1078.Gateway.Test", "JT1078.Gateway.Tests\JT1078.Gateway.Test\JT1078.Gateway.Test.csproj", "{DAC80A8E-4172-451F-910D-9032BF8640F9}" EndProject Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Tests", "Tests", "{04C0F72A-5BC4-4CEE-B1E9-CCFAA72E373E}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "JT1078.Gateway.Abstractions", "JT1078.Gateway.Abstractions\JT1078.Gateway.Abstractions.csproj", "{EE50F2A6-5F28-4640-BC20-44A8BED8F311}" +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JT1078.Gateway.Abstractions", "JT1078.Gateway.Abstractions\JT1078.Gateway.Abstractions.csproj", "{EE50F2A6-5F28-4640-BC20-44A8BED8F311}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "JT1078.Gateway.InMemoryMQ", "JT1078.Gateway.InMemoryMQ\JT1078.Gateway.InMemoryMQ.csproj", "{C7554790-0B3B-490F-B2F1-436C1FD0B4DD}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "JT1078.Gateway.TestNormalHosting", "JT1078.Gateway.Tests\JT1078.Gateway.TestNormalHosting\JT1078.Gateway.TestNormalHosting.csproj", "{6E2DAA64-E2A1-4459-BE61-E807B4EF2CCE}" EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution @@ -29,12 +33,21 @@ Global {EE50F2A6-5F28-4640-BC20-44A8BED8F311}.Debug|Any CPU.Build.0 = Debug|Any CPU {EE50F2A6-5F28-4640-BC20-44A8BED8F311}.Release|Any CPU.ActiveCfg = Release|Any CPU {EE50F2A6-5F28-4640-BC20-44A8BED8F311}.Release|Any CPU.Build.0 = Release|Any CPU + {C7554790-0B3B-490F-B2F1-436C1FD0B4DD}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {C7554790-0B3B-490F-B2F1-436C1FD0B4DD}.Debug|Any CPU.Build.0 = Debug|Any CPU + {C7554790-0B3B-490F-B2F1-436C1FD0B4DD}.Release|Any CPU.ActiveCfg = Release|Any CPU + {C7554790-0B3B-490F-B2F1-436C1FD0B4DD}.Release|Any CPU.Build.0 = Release|Any CPU + {6E2DAA64-E2A1-4459-BE61-E807B4EF2CCE}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {6E2DAA64-E2A1-4459-BE61-E807B4EF2CCE}.Debug|Any CPU.Build.0 = Debug|Any CPU + {6E2DAA64-E2A1-4459-BE61-E807B4EF2CCE}.Release|Any CPU.ActiveCfg = Release|Any CPU + {6E2DAA64-E2A1-4459-BE61-E807B4EF2CCE}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE EndGlobalSection GlobalSection(NestedProjects) = preSolution {DAC80A8E-4172-451F-910D-9032BF8640F9} = {04C0F72A-5BC4-4CEE-B1E9-CCFAA72E373E} + {6E2DAA64-E2A1-4459-BE61-E807B4EF2CCE} = {04C0F72A-5BC4-4CEE-B1E9-CCFAA72E373E} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {9172690A-1D5A-491A-ACDD-3AF893980E0B} diff --git a/src/JT1078.Gateway/JT1078.Gateway.csproj b/src/JT1078.Gateway/JT1078.Gateway.csproj index e4bf481..651c32a 100644 --- a/src/JT1078.Gateway/JT1078.Gateway.csproj +++ b/src/JT1078.Gateway/JT1078.Gateway.csproj @@ -16,7 +16,7 @@ false LICENSE true - 1.0.0-preview1 + 1.0.0-preview2 @@ -40,6 +40,8 @@ + + diff --git a/src/JT1078.Gateway/JT1078GatewayExtensions.cs b/src/JT1078.Gateway/JT1078GatewayExtensions.cs index c69709c..9a4e369 100644 --- a/src/JT1078.Gateway/JT1078GatewayExtensions.cs +++ b/src/JT1078.Gateway/JT1078GatewayExtensions.cs @@ -1,6 +1,9 @@ using JT1078.Gateway.Abstractions; using JT1078.Gateway.Configurations; using JT1078.Gateway.Impl; +using JT1078.Gateway.Jobs; +using JT1078.Gateway.Services; +using JT1078.Gateway.Sessions; using Microsoft.Extensions.Configuration; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection.Extensions; @@ -15,17 +18,53 @@ namespace JT1078.Gateway public static class JT1078GatewayExtensions { - public static IJT1078Builder AddJT1078Gateway(this IServiceCollection serviceDescriptors, IConfiguration configuration) + public static IJT1078GatewayBuilder AddJT1078Gateway(this IServiceCollection serviceDescriptors, IConfiguration configuration) { IJT1078Builder builder = new JT1078BuilderDefault(serviceDescriptors); builder.Services.Configure(configuration.GetSection("JT1078Configuration")); - return builder; + IJT1078GatewayBuilder jT1078GatewayBuilderDefault = new JT1078GatewayBuilderDefault(builder); + jT1078GatewayBuilderDefault.AddJT1078Core(); + return jT1078GatewayBuilderDefault; } - public static IJT1078Builder AddJT1078Gateway(this IServiceCollection serviceDescriptors, Action jt1078Options) + public static IJT1078GatewayBuilder AddJT1078Gateway(this IServiceCollection serviceDescriptors, Action jt1078Options) { IJT1078Builder builder = new JT1078BuilderDefault(serviceDescriptors); builder.Services.Configure(jt1078Options); + IJT1078GatewayBuilder jT1078GatewayBuilderDefault = new JT1078GatewayBuilderDefault(builder); + jT1078GatewayBuilderDefault.AddJT1078Core(); + return jT1078GatewayBuilderDefault; + } + + public static IJT1078GatewayBuilder AddTcp(this IJT1078GatewayBuilder builder) + { + builder.JT1078Builder.Services.AddHostedService(); + builder.JT1078Builder.Services.AddHostedService(); + return builder; + } + + public static IJT1078GatewayBuilder AddUdp(this IJT1078GatewayBuilder builder) + { + builder.JT1078Builder.Services.AddHostedService(); + return builder; + } + + public static IJT1078NormalGatewayBuilder AddNormal(this IJT1078GatewayBuilder builder) + { + return new JT1078NormalGatewayBuilderDefault(builder.JT1078Builder); + } + + public static IJT1078QueueGatewayBuilder AddQueue(this IJT1078GatewayBuilder builder) + { + builder.JT1078Builder.Services.AddHostedService(); + return new JT1078QueueGatewayBuilderDefault(builder.JT1078Builder); + } + + internal static IJT1078GatewayBuilder AddJT1078Core(this IJT1078GatewayBuilder builder) + { + builder.JT1078Builder.Services.AddSingleton(); + builder.JT1078Builder.Services.AddSingleton(); + builder.JT1078Builder.Services.AddHostedService(); return builder; } } diff --git a/src/JT1078.Gateway/JT1078TcpServer.cs b/src/JT1078.Gateway/JT1078TcpServer.cs index 53f8d83..eb40eb5 100644 --- a/src/JT1078.Gateway/JT1078TcpServer.cs +++ b/src/JT1078.Gateway/JT1078TcpServer.cs @@ -52,7 +52,7 @@ namespace JT1078.Gateway { SessionManager = jT1078SessionManager; jT1078UseType = JT1078UseType.Normal; - Logger = loggerFactory.CreateLogger("JT1078TcpServer"); + Logger = loggerFactory.CreateLogger(); Configuration = jT1078ConfigurationAccessor.Value; this.jT1078PackageProducer = jT1078PackageProducer; InitServer(); @@ -73,7 +73,7 @@ namespace JT1078.Gateway { SessionManager = jT1078SessionManager; jT1078UseType = JT1078UseType.Queue; - Logger = loggerFactory.CreateLogger("JT1078TcpServer"); + Logger = loggerFactory.CreateLogger(); Configuration = jT1078ConfigurationAccessor.Value; this.jT1078MsgProducer = jT1078MsgProducer; InitServer(); diff --git a/src/JT1078.Gateway/JT1078UdpServer.cs b/src/JT1078.Gateway/JT1078UdpServer.cs new file mode 100644 index 0000000..2057e8d --- /dev/null +++ b/src/JT1078.Gateway/JT1078UdpServer.cs @@ -0,0 +1,47 @@ +using System; +using System.Buffers; +using System.Buffers.Binary; +using System.Collections.Generic; +using System.IO.Pipelines; +using System.Net; +using System.Net.Sockets; +using System.Text; +using System.Threading; +using System.Threading.Tasks; +using JT1078.Gateway.Abstractions; +using JT1078.Gateway.Abstractions.Enums; +using JT1078.Gateway.Configurations; +using JT1078.Gateway.Sessions; +using JT1078.Protocol; +using JT1078.Protocol.Enums; +using JT1078.Protocol.Extensions; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; + +namespace JT1078.Gateway +{ + public class JT1078UdpServer : IHostedService + { + public Task StartAsync(CancellationToken cancellationToken) + { + return Task.CompletedTask; + } + + public Task StopAsync(CancellationToken cancellationToken) + { + return Task.CompletedTask; + } + + string ReadBCD(ReadOnlySpan readOnlySpan, int len) + { + int count = len / 2; + StringBuilder bcdSb = new StringBuilder(count); + for (int i = 0; i < count; i++) + { + bcdSb.Append(readOnlySpan[i].ToString("X2")); + } + return bcdSb.ToString().TrimStart('0'); + } + } +} diff --git a/src/JT1078.Gateway/Jobs/JT1078SessionNoticeJob.cs b/src/JT1078.Gateway/Jobs/JT1078SessionNoticeJob.cs new file mode 100644 index 0000000..5a9c2d4 --- /dev/null +++ b/src/JT1078.Gateway/Jobs/JT1078SessionNoticeJob.cs @@ -0,0 +1,46 @@ +using JT1078.Gateway.Services; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; +using System; +using System.Collections.Generic; +using System.Text; +using System.Text.Json; +using System.Threading; +using System.Threading.Tasks; + + +namespace JT1078.Gateway.Jobs +{ + public class JT1078SessionNoticeJob : BackgroundService + { + private readonly ILogger logger; + private JT1078SessionNoticeService SessionNoticeService; + public JT1078SessionNoticeJob( + JT1078SessionNoticeService sessionNoticeService, + ILoggerFactory loggerFactory) + { + logger = loggerFactory.CreateLogger(); + SessionNoticeService = sessionNoticeService; + } + + protected override async Task ExecuteAsync(CancellationToken stoppingToken) + { + await Task.Run(() => { + try + { + foreach (var notice in SessionNoticeService.SessionNoticeBlockingCollection.GetConsumingEnumerable(stoppingToken)) + { + if (logger.IsEnabled(LogLevel.Information)) + { + logger.LogInformation($"[Notice]:{notice.TerminalPhoneNo}-{notice.ProtocolType}-{notice.SessionType}"); + } + } + } + catch + { + + } + }, stoppingToken); + } + } +} diff --git a/src/JT1078.Gateway/Jobs/JT1078TcpReceiveTimeoutJob.cs b/src/JT1078.Gateway/Jobs/JT1078TcpReceiveTimeoutJob.cs new file mode 100644 index 0000000..c98a0fd --- /dev/null +++ b/src/JT1078.Gateway/Jobs/JT1078TcpReceiveTimeoutJob.cs @@ -0,0 +1,62 @@ +using JT1078.Gateway.Configurations; +using JT1078.Gateway.Sessions; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading; +using System.Threading.Tasks; + +namespace JT1078.Gateway.Jobs +{ + internal class JT1078TcpReceiveTimeoutJob : BackgroundService + { + private readonly ILogger Logger; + + private readonly JT1078SessionManager SessionManager; + + private readonly IOptionsMonitor Configuration; + public JT1078TcpReceiveTimeoutJob( + IOptionsMonitor jT1078ConfigurationAccessor, + ILoggerFactory loggerFactory, + JT1078SessionManager jT1078SessionManager + ) + { + SessionManager = jT1078SessionManager; + Logger = loggerFactory.CreateLogger(); + Configuration = jT1078ConfigurationAccessor; + } + + protected override async Task ExecuteAsync(CancellationToken stoppingToken) + { + while (!stoppingToken.IsCancellationRequested) + { + try + { + foreach (var item in SessionManager.GetTcpAll()) + { + if (item.ActiveTime.AddSeconds(Configuration.CurrentValue.TcpReaderIdleTimeSeconds) < DateTime.Now) + { + item.ReceiveTimeout.Cancel(); + } + } + if (Logger.IsEnabled(LogLevel.Information)) + { + Logger.LogInformation($"[Check Receive Timeout]"); + Logger.LogInformation($"[Session Online Count]:{SessionManager.TcpSessionCount}"); + } + } + catch (Exception ex) + { + Logger.LogError(ex, $"[Receive Timeout]"); + } + finally + { + await Task.Delay(TimeSpan.FromSeconds(Configuration.CurrentValue.TcpReceiveTimeoutCheckTimeSeconds), stoppingToken); + } + } + } + } +} diff --git a/src/JT1078.Gateway/Services/JT1078SessionNoticeService.cs b/src/JT1078.Gateway/Services/JT1078SessionNoticeService.cs new file mode 100644 index 0000000..3d1b70d --- /dev/null +++ b/src/JT1078.Gateway/Services/JT1078SessionNoticeService.cs @@ -0,0 +1,16 @@ +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Text; + +namespace JT1078.Gateway.Services +{ + public class JT1078SessionNoticeService + { + public BlockingCollection<(string SessionType, string TerminalPhoneNo,string ProtocolType)> SessionNoticeBlockingCollection { get;internal set; } + public JT1078SessionNoticeService() + { + SessionNoticeBlockingCollection = new BlockingCollection<(string SessionType, string TerminalPhoneNo, string ProtocolType)>(); + } + } +} diff --git a/src/JT1078.Gateway/Sessions/JT1078SessionManager.cs b/src/JT1078.Gateway/Sessions/JT1078SessionManager.cs index 5775d78..6169d14 100644 --- a/src/JT1078.Gateway/Sessions/JT1078SessionManager.cs +++ b/src/JT1078.Gateway/Sessions/JT1078SessionManager.cs @@ -1,5 +1,6 @@ using JT1078.Gateway.Abstractions; using JT1078.Gateway.Abstractions.Enums; +using JT1078.Gateway.Services; using Microsoft.Extensions.Logging; using System; using System.Collections.Concurrent; @@ -20,11 +21,15 @@ namespace JT1078.Gateway.Sessions private readonly ILogger logger; public ConcurrentDictionary Sessions { get; } public ConcurrentDictionary TerminalPhoneNoSessions { get; } - public JT1078SessionManager(ILoggerFactory loggerFactory) + private readonly JT1078SessionNoticeService SessionNoticeService; + public JT1078SessionManager( + JT1078SessionNoticeService jT1078SessionNoticeService, + ILoggerFactory loggerFactory) { Sessions = new ConcurrentDictionary(StringComparer.OrdinalIgnoreCase); TerminalPhoneNoSessions = new ConcurrentDictionary(StringComparer.OrdinalIgnoreCase); - logger = loggerFactory.CreateLogger("JT1078SessionManager"); + logger = loggerFactory.CreateLogger(); + this.SessionNoticeService = jT1078SessionNoticeService; } public int TotalSessionCount @@ -62,8 +67,7 @@ namespace JT1078.Gateway.Sessions session.ActiveTime = curretDatetime; TerminalPhoneNoSessions.TryUpdate(terminalPhoneNo, session, cacheSession); //会话通知 - //todo: 会话通知 - + SessionNoticeService.SessionNoticeBlockingCollection.Add((JT1078GatewayConstants.SessionOnline, terminalPhoneNo, session.TransportProtocolType.ToString())); } else { @@ -77,40 +81,7 @@ namespace JT1078.Gateway.Sessions if (TerminalPhoneNoSessions.TryAdd(terminalPhoneNo, session)) { //会话通知 - //todo: 会话通知 - - } - } - } - - internal void TryLink(IJT1078Session session) - { - DateTime curretDatetime = DateTime.Now; - if (TerminalPhoneNoSessions.TryGetValue(session.SessionID, out IJT1078Session cacheSession)) - { - if (session.SessionID != cacheSession.SessionID) - { - //从转发到直连的数据需要更新缓存 - session.ActiveTime = curretDatetime; - TerminalPhoneNoSessions.TryUpdate(session.SessionID, session, cacheSession); - //会话通知 - //todo: 会话通知 - - } - else - { - cacheSession.ActiveTime = curretDatetime; - TerminalPhoneNoSessions.TryUpdate(session.SessionID, cacheSession, cacheSession); - } - } - else - { - session.TerminalPhoneNo = session.SessionID; - if (TerminalPhoneNoSessions.TryAdd(session.SessionID, session)) - { - //会话通知 - //todo: 会话通知 - + SessionNoticeService.SessionNoticeBlockingCollection.Add((JT1078GatewayConstants.SessionOnline, terminalPhoneNo, session.TransportProtocolType.ToString())); } } } @@ -133,7 +104,7 @@ namespace JT1078.Gateway.Sessions currentSession = session; } //会话通知 - //todo: 会话通知 + SessionNoticeService.SessionNoticeBlockingCollection.Add((JT1078GatewayConstants.SessionOnline, terminalPhoneNo, currentSession.TransportProtocolType.ToString())); return currentSession; } @@ -206,8 +177,7 @@ namespace JT1078.Gateway.Sessions if (logger.IsEnabled(LogLevel.Information)) logger.LogInformation($"[Session Remove]:{terminalPhoneNo}-{tmpTerminalPhoneNo}"); //会话通知 - //todo: 会话通知 SessionOffline - + SessionNoticeService.SessionNoticeBlockingCollection.Add((JT1078GatewayConstants.SessionOffline, terminalPhoneNo, removeTerminalPhoneNoSessions.TransportProtocolType.ToString())); } } } @@ -225,7 +195,7 @@ namespace JT1078.Gateway.Sessions } var tmpTerminalPhoneNo = string.Join(",", terminalPhoneNos); //会话通知 - //todo: 会话通知 SessionOffline tmpTerminalPhoneNo + SessionNoticeService.SessionNoticeBlockingCollection.Add((JT1078GatewayConstants.SessionOffline, tmpTerminalPhoneNo, removeSession.TransportProtocolType.ToString())); if (logger.IsEnabled(LogLevel.Information)) logger.LogInformation($"[Session Remove]:{tmpTerminalPhoneNo}"); }