From d65fffce91aa79d9ca0b5d83199828d819580483 Mon Sep 17 00:00:00 2001 From: "SmallChi(Koike)" <564952747@qq.com> Date: Tue, 15 Sep 2020 23:23:37 +0800 Subject: [PATCH] =?UTF-8?q?1.=E5=AE=8C=E5=96=84=E5=90=84=E4=B8=AA=E6=A8=A1?= =?UTF-8?q?=E5=9D=97=E7=9A=84=E6=B3=A8=E5=86=8C=202.=E6=B7=BB=E5=8A=A0?= =?UTF-8?q?=E6=B5=8B=E8=AF=95=E7=9A=84=E5=90=84=E4=B8=AA=E6=9C=8D=E5=8A=A1?= =?UTF-8?q?=E6=A8=A1=E5=9D=97=203.=E5=8E=BB=E6=8E=89=E6=97=A0=E7=94=A8?= =?UTF-8?q?=E7=9A=84=E6=96=87=E4=BB=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../JT808MessageHandler.cs | 20 +- .../JT808MsgReplyConsumer.cs | 2 +- .../Impl/JT808CustomMessageHandlerImpl.cs | 28 ++- .../Impl/JT808MsgReplyConsumer.cs | 64 ++++++ .../Impl/JT808MsgReplyProducer.cs | 30 +++ .../Impl/JT808SessionConsumer.cs | 2 +- .../JT808.Gateway.NormalHosting/Program.cs | 8 +- .../Services/JT808MsgReplyDataService.cs | 28 +++ .../Internal/JT808MsgProducer_Empty.cs | 23 +++ .../JT808MsgReplyLoggingProducer_Empty.cs | 23 +++ .../JT808NormalGatewayBuilderDefault.cs | 20 -- .../JT808QueueGatewayBuilderDefault.cs | 20 -- src/JT808.Gateway/JT808.Gateway.csproj | 6 + src/JT808.Gateway/JT808.Gateway.xml | 186 ++++++++++++++++++ src/JT808.Gateway/JT808GatewayExtensions.cs | 163 ++++++++++----- src/JT808.Gateway/JT808TcpServer.cs | 2 +- src/JT808.Gateway/JT808UdpServer.cs | 4 +- .../Metadata/JT808AtomicCounter.cs | 49 ----- .../Services/JT808MsgReplyHostedService.cs | 14 +- .../Session/JT808SessionManager.cs | 6 +- 20 files changed, 531 insertions(+), 167 deletions(-) create mode 100644 src/JT808.Gateway.Tests/JT808.Gateway.NormalHosting/Impl/JT808MsgReplyConsumer.cs create mode 100644 src/JT808.Gateway.Tests/JT808.Gateway.NormalHosting/Impl/JT808MsgReplyProducer.cs create mode 100644 src/JT808.Gateway.Tests/JT808.Gateway.NormalHosting/Services/JT808MsgReplyDataService.cs create mode 100644 src/JT808.Gateway/Internal/JT808MsgProducer_Empty.cs create mode 100644 src/JT808.Gateway/Internal/JT808MsgReplyLoggingProducer_Empty.cs delete mode 100644 src/JT808.Gateway/Internal/JT808NormalGatewayBuilderDefault.cs delete mode 100644 src/JT808.Gateway/Internal/JT808QueueGatewayBuilderDefault.cs create mode 100644 src/JT808.Gateway/JT808.Gateway.xml delete mode 100644 src/JT808.Gateway/Metadata/JT808AtomicCounter.cs diff --git a/src/JT808.Gateway.Abstractions/JT808MessageHandler.cs b/src/JT808.Gateway.Abstractions/JT808MessageHandler.cs index 4bba55a..7360576 100644 --- a/src/JT808.Gateway.Abstractions/JT808MessageHandler.cs +++ b/src/JT808.Gateway.Abstractions/JT808MessageHandler.cs @@ -23,15 +23,21 @@ namespace JT808.Gateway.Abstractions protected IJT808MsgProducer MsgProducer; + protected IJT808MsgReplyLoggingProducer MsgReplyLoggingProducer; + protected IOptionsMonitor JT808ConfigurationOptionsMonitor; + protected IJT808Config JT808Config; + public JT808MessageHandler( IOptionsMonitor jT808ConfigurationOptionsMonitor, IJT808MsgProducer msgProducer, + IJT808MsgReplyLoggingProducer msgReplyLoggingProducer, IJT808Config jT808Config) { this.JT808Serializer = jT808Config.GetSerializer(); this.MsgProducer = msgProducer; + this.MsgReplyLoggingProducer = msgReplyLoggingProducer; this.JT808ConfigurationOptionsMonitor = jT808ConfigurationOptionsMonitor; HandlerDict = new Dictionary { {JT808MsgId.终端通用应答.ToUInt16Value(), Msg0x0001}, @@ -56,12 +62,6 @@ namespace JT808.Gateway.Abstractions }; } - public JT808MessageHandler(IOptionsMonitor jT808ConfigurationOptionsMonitor - , IJT808Config jT808Config) : this(jT808ConfigurationOptionsMonitor, null, jT808Config) - { - - } - /// /// 消息处理 /// @@ -85,12 +85,16 @@ namespace JT808.Gateway.Abstractions } else { - return func(request, session); + var data=func(request, session); + MsgReplyLoggingProducer.ProduceAsync(request.Header.TerminalPhoneNo, data); + return data; } } else { - return func(request, session); + var data = func(request, session); + MsgReplyLoggingProducer.ProduceAsync(request.Header.TerminalPhoneNo, data); + return data; } } else diff --git a/src/JT808.Gateway.Kafka/JT808MsgReplyConsumer.cs b/src/JT808.Gateway.Kafka/JT808MsgReplyConsumer.cs index 7bb7a59..8dfb760 100644 --- a/src/JT808.Gateway.Kafka/JT808MsgReplyConsumer.cs +++ b/src/JT808.Gateway.Kafka/JT808MsgReplyConsumer.cs @@ -28,7 +28,7 @@ namespace JT808.Gateway.Kafka { consumer = new ConsumerBuilder(consumerConfigAccessor.Value).Build(); TopicName = consumerConfigAccessor.Value.TopicName; - logger = loggerFactory.CreateLogger("JT808MsgReplyConsumer"); + logger = loggerFactory.CreateLogger(); } public void OnMessage(Action<(string TerminalNo, byte[] Data)> callback) diff --git a/src/JT808.Gateway.Tests/JT808.Gateway.NormalHosting/Impl/JT808CustomMessageHandlerImpl.cs b/src/JT808.Gateway.Tests/JT808.Gateway.NormalHosting/Impl/JT808CustomMessageHandlerImpl.cs index 0492392..3fd717b 100644 --- a/src/JT808.Gateway.Tests/JT808.Gateway.NormalHosting/Impl/JT808CustomMessageHandlerImpl.cs +++ b/src/JT808.Gateway.Tests/JT808.Gateway.NormalHosting/Impl/JT808CustomMessageHandlerImpl.cs @@ -16,24 +16,33 @@ namespace JT808.Gateway.NormalHosting.Impl { private readonly ILogger logger; //private readonly IJT808Traffic jT808Traffic; - private readonly IJT808MsgLogging jT808MsgLogging; //private readonly JT808TransmitService jT808TransmitService; + private readonly IJT808MsgLogging jT808MsgLogging; + private readonly IJT808MsgReplyProducer MsgReplyProducer; + public JT808CustomMessageHandlerImpl( - IOptionsMonitor jT808ConfigurationOptionsMonitor, - //JT808TransmitService jT808TransmitService, + ILoggerFactory loggerFactory, + IJT808MsgReplyProducer msgReplyProducer, IJT808MsgLogging jT808MsgLogging, - //IJT808Traffic jT808Traffic, - ILoggerFactory loggerFactory, - IJT808Config jT808Config) : base(jT808ConfigurationOptionsMonitor, jT808Config) + IOptionsMonitor jT808ConfigurationOptionsMonitor, + IJT808MsgProducer msgProducer, + IJT808MsgReplyLoggingProducer msgReplyLoggingProducer, + IJT808Config jT808Config) : base(jT808ConfigurationOptionsMonitor, + msgProducer, + msgReplyLoggingProducer, + jT808Config) { + MsgReplyProducer = msgReplyProducer; //this.jT808TransmitService = jT808TransmitService; //this.jT808Traffic = jT808Traffic; this.jT808MsgLogging = jT808MsgLogging; - logger =loggerFactory.CreateLogger(); + logger = loggerFactory.CreateLogger(); //添加自定义消息 HandlerDict.Add(0x9999, Msg0x9999); } + + /// /// 重写消息处理器 /// @@ -72,7 +81,10 @@ namespace JT808.Gateway.NormalHosting.Impl public override byte[] Msg0x0200(JT808HeaderPackage request, IJT808Session session) { logger.LogDebug("重写自带Msg0x0200的消息"); - return base.Msg0x0200(request, session); + var data = base.Msg0x0200(request, session); + logger.LogDebug("往应答服务发送相同数据进行测试"); + MsgReplyProducer.ProduceAsync(request.Header.TerminalPhoneNo, data).ConfigureAwait(false); + return data; } /// diff --git a/src/JT808.Gateway.Tests/JT808.Gateway.NormalHosting/Impl/JT808MsgReplyConsumer.cs b/src/JT808.Gateway.Tests/JT808.Gateway.NormalHosting/Impl/JT808MsgReplyConsumer.cs new file mode 100644 index 0000000..3d301e1 --- /dev/null +++ b/src/JT808.Gateway.Tests/JT808.Gateway.NormalHosting/Impl/JT808MsgReplyConsumer.cs @@ -0,0 +1,64 @@ +using JT808.Gateway.Abstractions; +using JT808.Gateway.NormalHosting.Services; +using Microsoft.Extensions.Logging; +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading; +using System.Threading.Tasks; + +namespace JT808.Gateway.NormalHosting.Impl +{ + public class JT808MsgReplyConsumer : IJT808MsgReplyConsumer + { + public CancellationTokenSource Cts { get; } = new CancellationTokenSource(); + + public string TopicName { get; } = JT808GatewayConstants.MsgReplyTopic; + + private readonly JT808MsgReplyDataService MsgReplyDataService; + + private ILogger logger; + + public JT808MsgReplyConsumer( + ILoggerFactory loggerFactory, + JT808MsgReplyDataService msgReplyDataService) + { + MsgReplyDataService = msgReplyDataService; + logger = loggerFactory.CreateLogger(); + } + + public void Dispose() + { + + } + + public void OnMessage(Action<(string TerminalNo, byte[] Data)> callback) + { + Task.Run(async () => + { + while (!Cts.IsCancellationRequested) + { + try + { + var item = await MsgReplyDataService.ReadAsync(Cts.Token); + callback(item); + } + catch (Exception ex) + { + logger.LogError(ex, ""); + } + } + }, Cts.Token); + } + + public void Subscribe() + { + + } + + public void Unsubscribe() + { + + } + } +} diff --git a/src/JT808.Gateway.Tests/JT808.Gateway.NormalHosting/Impl/JT808MsgReplyProducer.cs b/src/JT808.Gateway.Tests/JT808.Gateway.NormalHosting/Impl/JT808MsgReplyProducer.cs new file mode 100644 index 0000000..a647202 --- /dev/null +++ b/src/JT808.Gateway.Tests/JT808.Gateway.NormalHosting/Impl/JT808MsgReplyProducer.cs @@ -0,0 +1,30 @@ +using JT808.Gateway.Abstractions; +using JT808.Gateway.NormalHosting.Services; +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading.Tasks; + +namespace JT808.Gateway.NormalHosting.Impl +{ + public class JT808MsgReplyProducer : IJT808MsgReplyProducer + { + public string TopicName { get; } = JT808GatewayConstants.MsgReplyTopic; + + private readonly JT808MsgReplyDataService MsgReplyDataService; + + public JT808MsgReplyProducer(JT808MsgReplyDataService msgReplyDataService) + { + MsgReplyDataService = msgReplyDataService; + } + + public async ValueTask ProduceAsync(string terminalNo, byte[] data) + { + await MsgReplyDataService.WriteAsync(terminalNo, data); + } + + public void Dispose() + { + } + } +} diff --git a/src/JT808.Gateway.Tests/JT808.Gateway.NormalHosting/Impl/JT808SessionConsumer.cs b/src/JT808.Gateway.Tests/JT808.Gateway.NormalHosting/Impl/JT808SessionConsumer.cs index b26ce6f..f331930 100644 --- a/src/JT808.Gateway.Tests/JT808.Gateway.NormalHosting/Impl/JT808SessionConsumer.cs +++ b/src/JT808.Gateway.Tests/JT808.Gateway.NormalHosting/Impl/JT808SessionConsumer.cs @@ -22,7 +22,7 @@ namespace JT808.Gateway.NormalHosting.Impl JT808SessionService jT808SessionService, ILoggerFactory loggerFactory) { - logger = loggerFactory.CreateLogger("JT808SessionConsumer"); + logger = loggerFactory.CreateLogger(); JT808SessionService = jT808SessionService; } diff --git a/src/JT808.Gateway.Tests/JT808.Gateway.NormalHosting/Program.cs b/src/JT808.Gateway.Tests/JT808.Gateway.NormalHosting/Program.cs index a408a52..74d2d03 100644 --- a/src/JT808.Gateway.Tests/JT808.Gateway.NormalHosting/Program.cs +++ b/src/JT808.Gateway.Tests/JT808.Gateway.NormalHosting/Program.cs @@ -44,11 +44,15 @@ namespace JT808.Gateway.NormalHosting services.AddSingleton(); services.AddSingleton(); services.AddSingleton(); + //使用内存队列实现应答生产消费 + services.AddSingleton(); + services.AddSingleton(); services.AddJT808Configure() //添加客户端工具 //.AddClient() .AddGateway(hostContext.Configuration) - .ReplaceMessageHandler() + .AddMessageHandler() + .AddMsgReplyConsumer() .AddMsgLogging() //.AddTraffic() //.AddSessionNotice() @@ -56,7 +60,7 @@ namespace JT808.Gateway.NormalHosting .AddTcp() //.AddUdp() .AddHttp() - ; + .Register();//必须注册的 //流量统计 //services.AddHostedService(); //grpc客户端调用 diff --git a/src/JT808.Gateway.Tests/JT808.Gateway.NormalHosting/Services/JT808MsgReplyDataService.cs b/src/JT808.Gateway.Tests/JT808.Gateway.NormalHosting/Services/JT808MsgReplyDataService.cs new file mode 100644 index 0000000..d4f138a --- /dev/null +++ b/src/JT808.Gateway.Tests/JT808.Gateway.NormalHosting/Services/JT808MsgReplyDataService.cs @@ -0,0 +1,28 @@ +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading; +using System.Threading.Channels; +using System.Threading.Tasks; + +namespace JT808.Gateway.NormalHosting.Services +{ + public class JT808MsgReplyDataService + { + private readonly Channel<(string TerminalNo, byte[] Data)> _channel; + + public JT808MsgReplyDataService() + { + _channel = Channel.CreateUnbounded<(string TerminalNo, byte[] Data)>(); + } + + public async ValueTask WriteAsync(string terminalNo, byte[] Data) + { + await _channel.Writer.WriteAsync((terminalNo, Data)); + } + public async ValueTask<(string TerminalNo, byte[] Data)> ReadAsync(CancellationToken cancellationToken) + { + return await _channel.Reader.ReadAsync(cancellationToken); + } + } +} diff --git a/src/JT808.Gateway/Internal/JT808MsgProducer_Empty.cs b/src/JT808.Gateway/Internal/JT808MsgProducer_Empty.cs new file mode 100644 index 0000000..f89c775 --- /dev/null +++ b/src/JT808.Gateway/Internal/JT808MsgProducer_Empty.cs @@ -0,0 +1,23 @@ +using JT808.Gateway.Abstractions; +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading.Tasks; + +namespace JT808.Gateway.Internal +{ + class JT808MsgProducer_Empty : IJT808MsgProducer + { + public string TopicName { get; } + + public void Dispose() + { + + } + + public ValueTask ProduceAsync(string terminalNo, byte[] data) + { + return default; + } + } +} diff --git a/src/JT808.Gateway/Internal/JT808MsgReplyLoggingProducer_Empty.cs b/src/JT808.Gateway/Internal/JT808MsgReplyLoggingProducer_Empty.cs new file mode 100644 index 0000000..f1c3fba --- /dev/null +++ b/src/JT808.Gateway/Internal/JT808MsgReplyLoggingProducer_Empty.cs @@ -0,0 +1,23 @@ +using JT808.Gateway.Abstractions; +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading.Tasks; + +namespace JT808.Gateway.Internal +{ + class JT808MsgReplyLoggingProducer_Empty : IJT808MsgReplyLoggingProducer + { + public string TopicName { get; } + + public void Dispose() + { + + } + + public ValueTask ProduceAsync(string terminalNo, byte[] data) + { + return default; + } + } +} diff --git a/src/JT808.Gateway/Internal/JT808NormalGatewayBuilderDefault.cs b/src/JT808.Gateway/Internal/JT808NormalGatewayBuilderDefault.cs deleted file mode 100644 index 23f2de3..0000000 --- a/src/JT808.Gateway/Internal/JT808NormalGatewayBuilderDefault.cs +++ /dev/null @@ -1,20 +0,0 @@ -using JT808.Gateway.Abstractions; -using JT808.Protocol; - -namespace JT808.Gateway.Internal -{ - public class JT808NormalGatewayBuilderDefault : IJT808NormalGatewayBuilder - { - public IJT808Builder JT808Builder { get; } - - public JT808NormalGatewayBuilderDefault(IJT808Builder builder) - { - JT808Builder = builder; - } - - public IJT808Builder Builder() - { - return JT808Builder; - } - } -} \ No newline at end of file diff --git a/src/JT808.Gateway/Internal/JT808QueueGatewayBuilderDefault.cs b/src/JT808.Gateway/Internal/JT808QueueGatewayBuilderDefault.cs deleted file mode 100644 index d37792f..0000000 --- a/src/JT808.Gateway/Internal/JT808QueueGatewayBuilderDefault.cs +++ /dev/null @@ -1,20 +0,0 @@ -using JT808.Gateway.Abstractions; -using JT808.Protocol; - -namespace JT808.Gateway.Internal -{ - public class JT808QueueGatewayBuilderDefault : IJT808QueueGatewayBuilder - { - public IJT808Builder JT808Builder { get; } - - public JT808QueueGatewayBuilderDefault(IJT808Builder builder) - { - JT808Builder = builder; - } - - public IJT808Builder Builder() - { - return JT808Builder; - } - } -} \ No newline at end of file diff --git a/src/JT808.Gateway/JT808.Gateway.csproj b/src/JT808.Gateway/JT808.Gateway.csproj index 8a04f7e..1e1c236 100644 --- a/src/JT808.Gateway/JT808.Gateway.csproj +++ b/src/JT808.Gateway/JT808.Gateway.csproj @@ -19,6 +19,12 @@ JT808.Gateway $(JT808GatewayPackageVersion) + + JT808.Gateway.xml + + + JT808.Gateway.xml + diff --git a/src/JT808.Gateway/JT808.Gateway.xml b/src/JT808.Gateway/JT808.Gateway.xml new file mode 100644 index 0000000..7b6aaca --- /dev/null +++ b/src/JT808.Gateway/JT808.Gateway.xml @@ -0,0 +1,186 @@ + + + + JT808.Gateway + + + + + 默认消息处理业务实现 + + + + + 会话服务集合 + + + + + + + 通过终端手机号查询对应会话 + + + + + + + 会话服务-通过设备终端号移除对应会话 + + + + + + + 会话服务集合 + + + + + + + 通过终端手机号查询对应会话 + + + + + + + 会话服务-通过设备终端号移除对应会话 + + + + + + + 统一下发信息 + + + + + + + 添加808网关 + + + + + + + + 添加808网关 + + + + + + + + 添加tcp服务器 + + + + + + + 添加udp服务器 + + + + + + + 添加http服务器 + + + + + + + 添加http服务器 + + + + + + + + 添加消息业务处理程序 + + + + + + + + 添加Http服务认证机制 + + + + + + + + 添加消息生产者 + + + + + + + + 添加消息应答后的应答生产者 + + + + + + + + 添加消息应答消费者 + + + + + + + + 必须注册的 + + + + + + 添加公共模块 + + + + + + + 使用队列方式 + + + + + + + + + + + 不支持变态类型:既发TCP和UDP + + + + + 终端手机号 + + + + + 终端手机号 + + + + diff --git a/src/JT808.Gateway/JT808GatewayExtensions.cs b/src/JT808.Gateway/JT808GatewayExtensions.cs index ac7971f..06da718 100644 --- a/src/JT808.Gateway/JT808GatewayExtensions.cs +++ b/src/JT808.Gateway/JT808GatewayExtensions.cs @@ -11,6 +11,7 @@ using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection.Extensions; using System; using System.Runtime.CompilerServices; +using System.Linq; [assembly: InternalsVisibleTo("JT808.Gateway.TestHosting")] [assembly: InternalsVisibleTo("JT808.Gateway.Test")] @@ -18,44 +19,12 @@ namespace JT808.Gateway { public static partial class JT808GatewayExtensions { - //public static IJT808QueueGatewayBuilder AddQueueGateway(this IJT808Builder jT808Builder, Action config) - //{ - // IJT808QueueGatewayBuilder server = new JT808QueueGatewayBuilderDefault(jT808Builder); - // server.JT808Builder.Services.Configure(config); - // server.AddJT808Core(); - // server.JT808Builder.Services.AddHostedService(); - // return server; - //} - - //public static IJT808NormalGatewayBuilder AddNormalGateway(this IJT808Builder jT808Builder, Action config) - //{ - // IJT808NormalGatewayBuilder server = new JT808NormalGatewayBuilderDefault(jT808Builder); - // server.JT808Builder.Services.AddSingleton(); - // server.JT808Builder.Services.Configure(config); - // server.AddJT808Core(); - // return server; - //} - - //public static IJT808QueueGatewayBuilder AddQueueGateway(this IJT808Builder jT808Builder, IConfiguration configuration) - //{ - // IJT808QueueGatewayBuilder server = new JT808QueueGatewayBuilderDefault(jT808Builder); - // server.JT808Builder.Services.Configure(configuration.GetSection("JT808Configuration")); - // server.AddJT808Core(); - // server.JT808Builder.Services.AddHostedService(); - // return server; - //} - - //public static IJT808NormalGatewayBuilder AddNormalGateway(this IJT808Builder jT808Builder, IConfiguration configuration) - //{ - // IJT808NormalGatewayBuilder server = new JT808NormalGatewayBuilderDefault(jT808Builder); - // server.JT808Builder.Services.AddSingleton(); - // server.JT808Builder.Services.Configure(configuration.GetSection("JT808Configuration")); - // server.AddJT808Core(); - // return server; - //} - - - + /// + /// 添加808网关 + /// + /// + /// + /// public static IJT808GatewayBuilder AddGateway(this IJT808Builder jT808Builder, Action config) { JT808GatewayBuilderDefault jT808GatewayBuilderDefault = new JT808GatewayBuilderDefault(jT808Builder); @@ -63,7 +32,12 @@ namespace JT808.Gateway jT808GatewayBuilderDefault.AddJT808Core(); return jT808GatewayBuilderDefault; } - + /// + /// 添加808网关 + /// + /// + /// + /// public static IJT808GatewayBuilder AddGateway(this IJT808Builder jT808Builder, IConfiguration configuration) { JT808GatewayBuilderDefault jT808GatewayBuilderDefault = new JT808GatewayBuilderDefault(jT808Builder); @@ -71,28 +45,33 @@ namespace JT808.Gateway jT808GatewayBuilderDefault.AddJT808Core(); return jT808GatewayBuilderDefault; } - - public static IJT808GatewayBuilder ReplaceMessageHandler(this IJT808GatewayBuilder config) - where TJT808MessageHandler : JT808MessageHandler - { - config.JT808Builder.Services.Replace(new ServiceDescriptor(typeof(JT808MessageHandler), typeof(TJT808MessageHandler), ServiceLifetime.Singleton)); - return config; - } - + /// + /// 添加tcp服务器 + /// + /// + /// public static IJT808GatewayBuilder AddTcp(this IJT808GatewayBuilder config) { config.JT808Builder.Services.AddHostedService(); config.JT808Builder.Services.AddHostedService(); return config; } - + /// + /// 添加udp服务器 + /// + /// + /// public static IJT808GatewayBuilder AddUdp(this IJT808GatewayBuilder config) { config.JT808Builder.Services.AddHostedService(); config.JT808Builder.Services.AddHostedService(); return config; } - + /// + /// 添加http服务器 + /// + /// + /// public static IJT808GatewayBuilder AddHttp(this IJT808GatewayBuilder config) { config.JT808Builder.Services.AddSingleton(); @@ -100,7 +79,12 @@ namespace JT808.Gateway config.JT808Builder.Services.AddHostedService(); return config; } - + /// + /// 添加http服务器 + /// + /// + /// + /// public static IJT808GatewayBuilder AddHttp(this IJT808GatewayBuilder config) where TJT808MsgIdDefaultWebApiHandler: JT808MsgIdDefaultWebApiHandler { @@ -109,11 +93,88 @@ namespace JT808.Gateway config.JT808Builder.Services.AddHostedService(); return config; } - + /// + /// 添加消息业务处理程序 + /// + /// + /// + /// + public static IJT808GatewayBuilder AddMessageHandler(this IJT808GatewayBuilder config) + where TJT808MessageHandler : JT808MessageHandler + { + config.JT808Builder.Services.Replace(new ServiceDescriptor(typeof(JT808MessageHandler), typeof(TJT808MessageHandler), ServiceLifetime.Singleton)); + return config; + } + /// + /// 添加Http服务认证机制 + /// + /// + /// + /// + public static IJT808GatewayBuilder AddHttpAuthorization(this IJT808GatewayBuilder config) + where TJT808Authorization : IJT808Authorization + { + config.JT808Builder.Services.Replace(new ServiceDescriptor(typeof(IJT808Authorization), typeof(TJT808Authorization), ServiceLifetime.Singleton)); + return config; + } + /// + /// 添加消息生产者 + /// + /// + /// + /// + public static IJT808GatewayBuilder AddMsgProducer(this IJT808GatewayBuilder config) + where TJT808MsgProducer : IJT808MsgProducer + { + config.JT808Builder.Services.Replace(new ServiceDescriptor(typeof(IJT808MsgProducer), typeof(TJT808MsgProducer), ServiceLifetime.Singleton)); + return config; + } + /// + /// 添加消息应答后的应答生产者 + /// + /// + /// + /// + public static IJT808GatewayBuilder AddMsgReplyLoggingProducer(this IJT808GatewayBuilder config) + where TJT808MsgReplyLoggingProducer : IJT808MsgReplyLoggingProducer + { + config.JT808Builder.Services.Replace(new ServiceDescriptor(typeof(IJT808MsgReplyLoggingProducer), typeof(TJT808MsgReplyLoggingProducer), ServiceLifetime.Singleton)); + return config; + } + /// + /// 添加消息应答消费者 + /// + /// + /// + /// + public static IJT808GatewayBuilder AddMsgReplyConsumer(this IJT808GatewayBuilder config) + where TJT808MsgReplyConsumer : IJT808MsgReplyConsumer + { + config.JT808Builder.Services.Add(new ServiceDescriptor(typeof(IJT808MsgReplyConsumer), typeof(TJT808MsgReplyConsumer), ServiceLifetime.Singleton)); + return config; + } + /// + /// 必须注册的 + /// + /// + public static void Register(this IJT808GatewayBuilder config) + { + if(config.JT808Builder.Services.Where(s => s.ServiceType == typeof(IJT808MsgReplyConsumer)).Count() > 0) + { + config.JT808Builder.Services.AddHostedService(); + } + } + /// + /// 添加公共模块 + /// + /// + /// private static IJT808GatewayBuilder AddJT808Core(this IJT808GatewayBuilder config) { config.JT808Builder.Services.AddSingleton(); config.JT808Builder.Services.AddSingleton(); + config.JT808Builder.Services.AddSingleton(); + config.JT808Builder.Services.AddSingleton(); return config; } } diff --git a/src/JT808.Gateway/JT808TcpServer.cs b/src/JT808.Gateway/JT808TcpServer.cs index 5304786..2d5c88b 100644 --- a/src/JT808.Gateway/JT808TcpServer.cs +++ b/src/JT808.Gateway/JT808TcpServer.cs @@ -230,7 +230,7 @@ namespace JT808.Gateway } public Task StopAsync(CancellationToken cancellationToken) { - Logger.LogInformation("808 Tcp Server Stop"); + Logger.LogInformation("JT808 Tcp Server Stop"); if (server?.Connected ?? false) server.Shutdown(SocketShutdown.Both); server?.Close(); diff --git a/src/JT808.Gateway/JT808UdpServer.cs b/src/JT808.Gateway/JT808UdpServer.cs index 0fda03e..c21b320 100644 --- a/src/JT808.Gateway/JT808UdpServer.cs +++ b/src/JT808.Gateway/JT808UdpServer.cs @@ -43,7 +43,7 @@ namespace JT808.Gateway JT808MessageHandler messageHandler) { SessionManager = jT808SessionManager; - Logger = loggerFactory.CreateLogger("JT808UdpServer"); + Logger = loggerFactory.CreateLogger(); Serializer = jT808Config.GetSerializer(); Configuration = jT808ConfigurationAccessor.Value; MessageHandler = messageHandler; @@ -113,7 +113,7 @@ namespace JT808.Gateway } public Task StopAsync(CancellationToken cancellationToken) { - Logger.LogInformation("808 Udp Server Stop"); + Logger.LogInformation("JT808 Udp Server Stop"); if (server?.Connected ?? false) server.Shutdown(SocketShutdown.Both); server?.Close(); diff --git a/src/JT808.Gateway/Metadata/JT808AtomicCounter.cs b/src/JT808.Gateway/Metadata/JT808AtomicCounter.cs deleted file mode 100644 index 6362905..0000000 --- a/src/JT808.Gateway/Metadata/JT808AtomicCounter.cs +++ /dev/null @@ -1,49 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Text; -using System.Threading; - -namespace JT808.Gateway.Metadata -{ - /// - /// - /// - /// - internal class JT808AtomicCounter - { - long counter = 0; - - public JT808AtomicCounter(long initialCount = 0) - { - this.counter = initialCount; - } - - public void Reset() - { - Interlocked.Exchange(ref counter, 0); - } - - public long Increment() - { - return Interlocked.Increment(ref counter); - } - - public long Add(long len) - { - return Interlocked.Add(ref counter,len); - } - - public long Decrement() - { - return Interlocked.Decrement(ref counter); - } - - public long Count - { - get - { - return Interlocked.Read(ref counter); - } - } - } -} diff --git a/src/JT808.Gateway/Services/JT808MsgReplyHostedService.cs b/src/JT808.Gateway/Services/JT808MsgReplyHostedService.cs index dff56f4..8df8b38 100644 --- a/src/JT808.Gateway/Services/JT808MsgReplyHostedService.cs +++ b/src/JT808.Gateway/Services/JT808MsgReplyHostedService.cs @@ -1,6 +1,7 @@ using JT808.Gateway.Abstractions; using JT808.Gateway.Abstractions.Configurations; using JT808.Gateway.Session; +using JT808.Protocol.Extensions; using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; using System; @@ -16,19 +17,30 @@ namespace JT808.Gateway.Services private readonly IJT808MsgReplyConsumer JT808MsgReplyConsumer; + private ILogger logger; + public JT808MsgReplyHostedService( + ILoggerFactory loggerFactory, IJT808MsgReplyConsumer jT808MsgReplyConsumer, JT808SessionManager jT808SessionManager) { JT808MsgReplyConsumer = jT808MsgReplyConsumer; JT808SessionManager = jT808SessionManager; + logger = loggerFactory.CreateLogger(); } public Task StartAsync(CancellationToken cancellationToken) { JT808MsgReplyConsumer.OnMessage(async(item) => { - await JT808SessionManager.TrySendByTerminalPhoneNoAsync(item.TerminalNo, item.Data); + try + { + await JT808SessionManager.TrySendByTerminalPhoneNoAsync(item.TerminalNo, item.Data); + } + catch (Exception ex) + { + logger.LogError(ex, $"{item.TerminalNo}-{item.Data.ToHexString()}"); + } }); JT808MsgReplyConsumer.Subscribe(); return Task.CompletedTask; diff --git a/src/JT808.Gateway/Session/JT808SessionManager.cs b/src/JT808.Gateway/Session/JT808SessionManager.cs index 5fc5fe9..45f8563 100644 --- a/src/JT808.Gateway/Session/JT808SessionManager.cs +++ b/src/JT808.Gateway/Session/JT808SessionManager.cs @@ -29,14 +29,14 @@ namespace JT808.Gateway.Session JT808SessionProducer = jT808SessionProducer; Sessions = new ConcurrentDictionary(StringComparer.OrdinalIgnoreCase); TerminalPhoneNoSessions = new ConcurrentDictionary(StringComparer.OrdinalIgnoreCase); - logger = loggerFactory.CreateLogger("JT808SessionManager"); + logger = loggerFactory.CreateLogger(); } public JT808SessionManager(ILoggerFactory loggerFactory) { Sessions = new ConcurrentDictionary(StringComparer.OrdinalIgnoreCase); TerminalPhoneNoSessions = new ConcurrentDictionary(StringComparer.OrdinalIgnoreCase); - logger = loggerFactory.CreateLogger("JT808SessionManager"); + logger = loggerFactory.CreateLogger(); } public int TotalSessionCount @@ -126,7 +126,7 @@ namespace JT808.Gateway.Session public async ValueTask TrySendByTerminalPhoneNoAsync(string terminalPhoneNo, byte[] data) { - if(TerminalPhoneNoSessions.TryGetValue(terminalPhoneNo,out var session)) + if (TerminalPhoneNoSessions.TryGetValue(terminalPhoneNo, out var session)) { if (session.TransportProtocolType == JT808TransportProtocolType.tcp) {