From 9fbd8dfc2da89c19fb11cb9c94aa5ce0ff87ff3f Mon Sep 17 00:00:00 2001 From: "SmallChi(Koike)" <564952747@qq.com> Date: Thu, 29 Oct 2020 15:45:52 +0800 Subject: [PATCH] =?UTF-8?q?pipeline-preview3=201.=E5=AE=8C=E5=96=84?= =?UTF-8?q?=E9=98=9F=E5=88=97=E6=A8=A1=E5=BC=8F=E7=9A=84=E6=9C=8D=E5=8A=A1?= =?UTF-8?q?=E6=B5=8B=E8=AF=95=202.=E4=BF=AE=E5=A4=8D=E5=BA=94=E7=AD=94?= =?UTF-8?q?=E6=9C=8D=E5=8A=A1=E7=9A=84=E6=B6=88=E6=81=AF=E5=A4=84=E7=90=86?= =?UTF-8?q?=E6=B3=A8=E5=86=8C?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../Configurations/JT808Configuration.cs | 2 +- .../JT808.Gateway.Abstractions.csproj | 2 +- .../JT808.Gateway.Client.csproj | 2 +- .../JT808.Gateway.Kafka.xml | 6 +- .../JT808ClientKafkaExtensions.cs | 6 +- .../JT808ReplyMessageExtensions.cs | 5 +- .../JT808ReplyMessageHostedService.cs | 24 +++++- .../Impl/JT808CustomMessageHandlerImpl.cs | 76 +++++++++++++++++++ .../Impl/JT808ReplyMessageHandlerImpl.cs | 59 ++++++++++++++ .../JT808.Gateway.QueueHosting/Program.cs | 7 +- src/Version.props | 2 +- 11 files changed, 174 insertions(+), 17 deletions(-) create mode 100644 src/JT808.Gateway.Tests/JT808.Gateway.QueueHosting/Impl/JT808CustomMessageHandlerImpl.cs create mode 100644 src/JT808.Gateway.Tests/JT808.Gateway.QueueHosting/Impl/JT808ReplyMessageHandlerImpl.cs diff --git a/src/JT808.Gateway.Abstractions/Configurations/JT808Configuration.cs b/src/JT808.Gateway.Abstractions/Configurations/JT808Configuration.cs index eac429b..6650c9a 100644 --- a/src/JT808.Gateway.Abstractions/Configurations/JT808Configuration.cs +++ b/src/JT808.Gateway.Abstractions/Configurations/JT808Configuration.cs @@ -35,6 +35,6 @@ namespace JT808.Gateway.Abstractions.Configurations /// /// 网关不做消息业务处理,往队列发送 /// - public List FilterMsgIdHandlerForQueue { get; set; } + public List FilterMsgIdHandlerForQueue { get; set; } = new List(); } } diff --git a/src/JT808.Gateway.Abstractions/JT808.Gateway.Abstractions.csproj b/src/JT808.Gateway.Abstractions/JT808.Gateway.Abstractions.csproj index 28b7848..b23f24f 100644 --- a/src/JT808.Gateway.Abstractions/JT808.Gateway.Abstractions.csproj +++ b/src/JT808.Gateway.Abstractions/JT808.Gateway.Abstractions.csproj @@ -32,7 +32,7 @@ - + diff --git a/src/JT808.Gateway.Client/JT808.Gateway.Client.csproj b/src/JT808.Gateway.Client/JT808.Gateway.Client.csproj index 8a38222..d516862 100644 --- a/src/JT808.Gateway.Client/JT808.Gateway.Client.csproj +++ b/src/JT808.Gateway.Client/JT808.Gateway.Client.csproj @@ -22,7 +22,7 @@ - + diff --git a/src/JT808.Gateway.Kafka/JT808.Gateway.Kafka.xml b/src/JT808.Gateway.Kafka/JT808.Gateway.Kafka.xml index b0c6f21..af8a12e 100644 --- a/src/JT808.Gateway.Kafka/JT808.Gateway.Kafka.xml +++ b/src/JT808.Gateway.Kafka/JT808.Gateway.Kafka.xml @@ -8,7 +8,7 @@ - + GetSection("JT808MsgConsumerConfig") @@ -16,7 +16,7 @@ - + GetSection("JT808MsgReplyProducerConfig") @@ -40,7 +40,7 @@ - + GetSection("JT808SessionConsumerConfig") diff --git a/src/JT808.Gateway.Kafka/JT808ClientKafkaExtensions.cs b/src/JT808.Gateway.Kafka/JT808ClientKafkaExtensions.cs index 9a9d0d8..ee4a19f 100644 --- a/src/JT808.Gateway.Kafka/JT808ClientKafkaExtensions.cs +++ b/src/JT808.Gateway.Kafka/JT808ClientKafkaExtensions.cs @@ -17,7 +17,7 @@ namespace JT808.Gateway.Kafka /// /// /// - /// + /// /// GetSection("JT808MsgConsumerConfig") /// public static IJT808ClientBuilder AddMsgConsumer(this IJT808ClientBuilder jT808ClientBuilder, IConfiguration configuration) @@ -29,7 +29,7 @@ namespace JT808.Gateway.Kafka /// /// /// - /// + /// /// GetSection("JT808MsgReplyProducerConfig") /// public static IJT808ClientBuilder AddMsgReplyProducer(this IJT808ClientBuilder jT808ClientBuilder, IConfiguration configuration) @@ -65,7 +65,7 @@ namespace JT808.Gateway.Kafka /// /// /// - /// + /// /// GetSection("JT808SessionConsumerConfig") /// public static IJT808ClientBuilder AddSessionConsumer(this IJT808ClientBuilder jT808ClientBuilder, IConfiguration configuration) diff --git a/src/JT808.Gateway.Services/JT808.Gateway.ReplyMessage/JT808ReplyMessageExtensions.cs b/src/JT808.Gateway.Services/JT808.Gateway.ReplyMessage/JT808ReplyMessageExtensions.cs index bc44020..4a2ba60 100644 --- a/src/JT808.Gateway.Services/JT808.Gateway.ReplyMessage/JT808ReplyMessageExtensions.cs +++ b/src/JT808.Gateway.Services/JT808.Gateway.ReplyMessage/JT808ReplyMessageExtensions.cs @@ -1,6 +1,7 @@  using JT808.Gateway.Abstractions; using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.DependencyInjection.Extensions; using System; using System.Collections.Generic; using System.Text; @@ -10,14 +11,14 @@ namespace JT808.Gateway.ReplyMessage public static class JT808ReplyMessageExtensions { /// - /// 消息应答服务(不同的消费者实例) + /// 消息应答服务 /// /// /// public static IJT808ClientBuilder AddReplyMessage(this IJT808ClientBuilder jT808ClientBuilder) where TJT808ReplyMessageHandler : IJT808ReplyMessageHandler { - jT808ClientBuilder.JT808Builder.Services.AddSingleton(new ServiceDescriptor(typeof(IJT808ReplyMessageHandler),typeof(TJT808ReplyMessageHandler), ServiceLifetime.Singleton)); + jT808ClientBuilder.JT808Builder.Services.Add(new ServiceDescriptor(typeof(IJT808ReplyMessageHandler),typeof(TJT808ReplyMessageHandler), ServiceLifetime.Singleton)); jT808ClientBuilder.JT808Builder.Services.AddHostedService(); return jT808ClientBuilder; } diff --git a/src/JT808.Gateway.Services/JT808.Gateway.ReplyMessage/JT808ReplyMessageHostedService.cs b/src/JT808.Gateway.Services/JT808.Gateway.ReplyMessage/JT808ReplyMessageHostedService.cs index b2180d9..35178c6 100644 --- a/src/JT808.Gateway.Services/JT808.Gateway.ReplyMessage/JT808ReplyMessageHostedService.cs +++ b/src/JT808.Gateway.Services/JT808.Gateway.ReplyMessage/JT808ReplyMessageHostedService.cs @@ -2,6 +2,7 @@ using Microsoft.Extensions.Hosting; using System.Threading; using JT808.Gateway.Abstractions; +using Microsoft.Extensions.Logging; namespace JT808.Gateway.ReplyMessage { @@ -9,20 +10,37 @@ namespace JT808.Gateway.ReplyMessage { private IJT808MsgConsumer jT808MsgConsumer; private IJT808ReplyMessageHandler jT808ReplyMessageHandler; - + private IJT808MsgReplyProducer jT808MsgReplyProducer; + private ILogger logger; public JT808ReplyMessageHostedService( + ILoggerFactory loggerFactory, IJT808ReplyMessageHandler jT808ReplyMessageHandler, + IJT808MsgReplyProducer jT808MsgReplyProducer, IJT808MsgConsumer jT808MsgConsumer) { this.jT808MsgConsumer = jT808MsgConsumer; + this.jT808MsgReplyProducer = jT808MsgReplyProducer; this.jT808ReplyMessageHandler = jT808ReplyMessageHandler; + this.logger = loggerFactory.CreateLogger(); } public Task StartAsync(CancellationToken cancellationToken) { jT808MsgConsumer.Subscribe(); - jT808MsgConsumer.OnMessage((Message)=> { - jT808ReplyMessageHandler.Processor(Message.TerminalNo, Message.Data); + jT808MsgConsumer.OnMessage(async (Message) => + { + try + { + var data = jT808ReplyMessageHandler.Processor(Message.TerminalNo, Message.Data); + if (data != null) + { + await jT808MsgReplyProducer.ProduceAsync(Message.TerminalNo, data); + } + } + catch (System.Exception ex) + { + logger.LogError(ex, ""); + } }); return Task.CompletedTask; } diff --git a/src/JT808.Gateway.Tests/JT808.Gateway.QueueHosting/Impl/JT808CustomMessageHandlerImpl.cs b/src/JT808.Gateway.Tests/JT808.Gateway.QueueHosting/Impl/JT808CustomMessageHandlerImpl.cs new file mode 100644 index 0000000..9cba695 --- /dev/null +++ b/src/JT808.Gateway.Tests/JT808.Gateway.QueueHosting/Impl/JT808CustomMessageHandlerImpl.cs @@ -0,0 +1,76 @@ +using JT808.Gateway.Abstractions; +using JT808.Gateway.Abstractions.Configurations; +using JT808.Gateway.MsgLogging; +using JT808.Gateway.Transmit; +using JT808.Protocol; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using System; +using System.Collections.Generic; +using System.Text; + +namespace JT808.Gateway.QueueHosting.Impl +{ + public class JT808CustomMessageHandlerImpl : JT808MessageHandler + { + private readonly ILogger logger; + public JT808CustomMessageHandlerImpl( + ILoggerFactory loggerFactory, + IOptionsMonitor jT808ConfigurationOptionsMonitor, + IJT808MsgProducer msgProducer, + IJT808MsgReplyLoggingProducer msgReplyLoggingProducer, + IJT808Config jT808Config) : base(jT808ConfigurationOptionsMonitor, + msgProducer, + msgReplyLoggingProducer, + jT808Config) + { + logger = loggerFactory.CreateLogger(); + //过滤掉0x0200消息,通过服务服务进行下发应答,可以通过配置文件的方式进行增加修改(支持热更新) + jT808ConfigurationOptionsMonitor.CurrentValue.FilterMsgIdHandlerForQueue.Add(0x0200); + //添加自定义消息 + HandlerDict.Add(0x9999, Msg0x9999); + } + + + /// + /// 重写消息处理器 + /// + /// + /// + public override byte[] Processor(JT808HeaderPackage request, IJT808Session session) + { + try + { + var down = base.Processor(request, session); + return down; + } + catch (Exception) + { + return default; + } + } + + /// + /// 重写自带的消息 + /// + /// + /// + public override byte[] Msg0x0200(JT808HeaderPackage request, IJT808Session session) + { + logger.LogDebug("由于过滤了0x0200,网关是不会处理0x0200消息的应答"); + var data = base.Msg0x0200(request, session); + return data; + } + + /// + /// 自定义消息 + /// + /// + /// + public byte[] Msg0x9999(JT808HeaderPackage request, IJT808Session session) + { + logger.LogDebug("自定义消息"); + return default; + } + } +} diff --git a/src/JT808.Gateway.Tests/JT808.Gateway.QueueHosting/Impl/JT808ReplyMessageHandlerImpl.cs b/src/JT808.Gateway.Tests/JT808.Gateway.QueueHosting/Impl/JT808ReplyMessageHandlerImpl.cs new file mode 100644 index 0000000..8e71c1e --- /dev/null +++ b/src/JT808.Gateway.Tests/JT808.Gateway.QueueHosting/Impl/JT808ReplyMessageHandlerImpl.cs @@ -0,0 +1,59 @@ +using JT808.Gateway.Abstractions; +using JT808.Protocol; +using JT808.Protocol.Enums; +using JT808.Protocol.Extensions; +using JT808.Protocol.MessageBody; +using Microsoft.Extensions.Logging; +using System; +using System.Collections.Generic; +using System.Text; + +namespace JT808.Gateway.QueueHosting.Impl +{ + public class JT808ReplyMessageHandlerImpl : IJT808ReplyMessageHandler + { + private ILogger logger; + private JT808Serializer Serializer; + + public JT808ReplyMessageHandlerImpl( + IJT808Config jT808Config, + ILoggerFactory loggerFactory) + { + logger = loggerFactory.CreateLogger(); + Serializer = jT808Config.GetSerializer(); + } + + public byte[] Processor(string TerminalNo, byte[] Data) + { + if (logger.IsEnabled(LogLevel.Debug)) + { + logger.LogDebug($"实现消息应答处理,{TerminalNo},{Data.ToHexString()}"); + } + var package = Serializer.Deserialize(Data); + if (package.Header.MsgId == 0x0200) + { + if (package.Version == JT808Version.JTT2019) + { + byte[] data = Serializer.Serialize(JT808MsgId.平台通用应答.Create_平台通用应答_2019(package.Header.TerminalPhoneNo, new JT808_0x8001() + { + AckMsgId = package.Header.MsgId, + JT808PlatformResult = JT808PlatformResult.成功, + MsgNum = package.Header.MsgNum + })); + return data; + } + else + { + byte[] data = Serializer.Serialize(JT808MsgId.平台通用应答.Create(package.Header.TerminalPhoneNo, new JT808_0x8001() + { + AckMsgId = package.Header.MsgId, + JT808PlatformResult = JT808PlatformResult.成功, + MsgNum = package.Header.MsgNum + })); + return data; + } + } + return default; + } + } +} diff --git a/src/JT808.Gateway.Tests/JT808.Gateway.QueueHosting/Program.cs b/src/JT808.Gateway.Tests/JT808.Gateway.QueueHosting/Program.cs index 428e894..837d645 100644 --- a/src/JT808.Gateway.Tests/JT808.Gateway.QueueHosting/Program.cs +++ b/src/JT808.Gateway.Tests/JT808.Gateway.QueueHosting/Program.cs @@ -15,6 +15,7 @@ using JT808.Gateway.Client; using JT808.Gateway.QueueHosting.Jobs; using JT808.Gateway.Kafka; using JT808.Gateway.WebApiClientTool; +using JT808.Gateway.QueueHosting.Impl; namespace JT808.Gateway.QueueHosting { @@ -48,12 +49,14 @@ namespace JT808.Gateway.QueueHosting //添加客户端服务 .AddClientKafka() .AddMsgConsumer(hostContext.Configuration) - //添加消息应答服务 + //添加消息应答生产者 .AddMsgReplyProducer(hostContext.Configuration) + //添加消息应答服务并实现消息应答处理 + .AddReplyMessage() .Builder() //添加消息应答处理 - //.AddReplyMessage(); .AddGateway(hostContext.Configuration) + .AddMessageHandler() .AddServerKafkaMsgProducer(hostContext.Configuration) .AddServerKafkaSessionProducer(hostContext.Configuration) .AddServerKafkaMsgReplyConsumer(hostContext.Configuration) diff --git a/src/Version.props b/src/Version.props index 5444c47..4a3e028 100644 --- a/src/Version.props +++ b/src/Version.props @@ -1,6 +1,6 @@  2.3.2 - 1.0.2-preview2 + 1.0.2-preview3 \ No newline at end of file