From 88fe3cbae2d56b1ec37b337c092ce7fe4793bec9 Mon Sep 17 00:00:00 2001 From: "SmallChi(Koike)" <564952747@qq.com> Date: Thu, 2 Jan 2020 23:38:39 +0800 Subject: [PATCH] =?UTF-8?q?1.=E4=BF=AE=E6=94=B9=E5=BE=AE=E8=BD=AF=E6=8F=90?= =?UTF-8?q?=E4=BE=9B=E7=9A=84=E5=8F=91=E5=B8=83=E8=AE=A2=E9=98=85channel?= =?UTF-8?q?=202.=E4=BF=AE=E6=94=B9=E5=BC=80=E5=8F=91=E7=8E=AF=E5=A2=83?= =?UTF-8?q?=E6=89=A9=E5=B1=95=E9=85=8D=E7=BD=AE=203.=E4=BF=AE=E6=94=B9?= =?UTF-8?q?=E6=96=87=E6=A1=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 33 +++++++++++++------ .../JT808.Gateway.TestHosting.csproj | 1 + src/JT808.Gateway.TestHosting/Program.cs | 19 +++++++++-- .../Internal/JT808MsgProducerDefault.cs | 5 ++- .../Internal/JT808MsgReplyConsumerDefault.cs | 17 ++++++---- src/JT808.Gateway/Internal/JT808MsgService.cs | 20 ++++++++++- src/JT808.Gateway/JT808.Gateway.csproj | 1 + src/JT808.Gateway/JT808GatewayExtensions.cs | 3 +- 8 files changed, 74 insertions(+), 25 deletions(-) diff --git a/README.md b/README.md index ca5946d..e991ed5 100644 --- a/README.md +++ b/README.md @@ -27,7 +27,7 @@ ![design_model](https://github.com/SmallChi/JT808DotNetty/blob/master/doc/img/design_model.png) -## 集成接口功能(JT808.DotNetty.Abstractions) +## 集成接口功能 |接口名称|接口说明|使用场景| |:------:|:------|:------| @@ -51,7 +51,7 @@ |Traffic|流量统计服务 |由于运营商sim卡查询流量滞后,通过流量统计服务可以实时准确的统计设备流量,可以最优配置设备的流量大小,以节省成本 |Transmit| 原包转发服务|该服务可以将设备上报原始数据转发到第三方,支持全部转发,指定终端号转发| -## 基于WebApi的消息业务处理程序(JT808.DotNetty.WebApi) +## 基于WebApi的消息业务处理程序 通过继承JT808.DotNetty.Core.Handlers.JT808MsgIdHttpHandlerBase去实现自定义的WebApi接口服务。 @@ -86,12 +86,12 @@ | Install-Package JT808.Gateway.Abstractions| ![JT808.Gateway.Abstractions](https://img.shields.io/nuget/v/JT808.Gateway.Abstractions.svg) | ![JT808.Gateway.Abstractions](https://img.shields.io/nuget/dt/JT808.Gateway.Abstractions.svg) | | Install-Package JT808.Gateway | ![JT808.Gateway](https://img.shields.io/nuget/v/JT808.Gateway.svg) | ![JT808.Gateway](https://img.shields.io/nuget/dt/JT808.Gateway.svg) | | Install-Package JT808.Gateway.Kafka| ![JT808.Gateway.Kafka](https://img.shields.io/nuget/v/JT808.Gateway.Kafka.svg) | ![JT808.Gateway.Kafka](https://img.shields.io/nuget/dt/JT808.Gateway.Kafka.svg) | -| Install-Package JT808.Gateway.Transmit | ![JT808](https://img.shields.io/nuget/v/JT808.Gateway.Transmit.svg) | ![JT808](https://img.shields.io/nuget/dt/JT808.Gateway.Transmit.svg) | -| Install-Package JT808.Gateway.Traffic | ![JT808](https://img.shields.io/nuget/v/JT808.Gateway.Traffic.svg) | ![JT808](https://img.shields.io/nuget/dt/JT808.Gateway.Traffic.svg)| -| Install-Package JT808.Gateway.SessionNotice | ![JT808](https://img.shields.io/nuget/v/JT808.Gateway.SessionNotice.svg) | ![JT808](https://img.shields.io/nuget/dt/JT808.Gateway.SessionNotice.svg)| -| Install-Package JT808.Gateway.ReplyMessage | ![JT808](https://img.shields.io/nuget/v/JT808.Gateway.ReplyMessage.svg) | ![JT808](https://img.shields.io/nuget/dt/JT808.Gateway.ReplyMessage.svg)| -| Install-Package JT808.Gateway.MsgLogging | ![JT808](https://img.shields.io/nuget/v/JT808.Gateway.MsgLogging.svg) | ![JT808](https://img.shields.io/nuget/dt/JT808.Gateway.MsgLogging.svg)| -| Install-Package JT808.Gateway.MsgIdHandler | ![JT808](https://img.shields.io/nuget/v/JT808.Gateway.MsgIdHandler.svg) | ![JT808](https://img.shields.io/nuget/dt/JT808.Gateway.MsgIdHandler.svg)| +| Install-Package JT808.Gateway.Transmit | ![JT808.Gateway.Transmit](https://img.shields.io/nuget/v/JT808.Gateway.Transmit.svg) | ![JT808.Gateway.Transmit](https://img.shields.io/nuget/dt/JT808.Gateway.Transmit.svg) | +| Install-Package JT808.Gateway.Traffic | ![JT808.Gateway.Traffic](https://img.shields.io/nuget/v/JT808.Gateway.Traffic.svg) | ![JT808.Gateway.Traffic](https://img.shields.io/nuget/dt/JT808.Gateway.Traffic.svg)| +| Install-Package JT808.Gateway.SessionNotice | ![JT808.Gateway.SessionNotice](https://img.shields.io/nuget/v/JT808.Gateway.SessionNotice.svg) | ![JT808.Gateway.SessionNotice](https://img.shields.io/nuget/dt/JT808.Gateway.SessionNotice.svg)| +| Install-Package JT808.Gateway.ReplyMessage | ![JT808.Gateway.ReplyMessage](https://img.shields.io/nuget/v/JT808.Gateway.ReplyMessage.svg) | ![JT808.Gateway.ReplyMessage](https://img.shields.io/nuget/dt/JT808.Gateway.ReplyMessage.svg)| +| Install-Package JT808.Gateway.MsgLogging | ![JT808.Gateway.MsgLogging](https://img.shields.io/nuget/v/JT808.Gateway.MsgLogging.svg) | ![JT808.Gateway.MsgLogging](https://img.shields.io/nuget/dt/JT808.Gateway.MsgLogging.svg)| +| Install-Package JT808.Gateway.MsgIdHandler | ![JT808.Gateway.MsgIdHandler](https://img.shields.io/nuget/v/JT808.Gateway.MsgIdHandler.svg) | ![JT808.Gateway.MsgIdHandler](https://img.shields.io/nuget/dt/JT808.Gateway.MsgIdHandler.svg)| ## 举个栗子1 @@ -168,10 +168,23 @@ static async Task Main(string[] args) services.AddSingleton(); services.AddSingleton(typeof(ILogger<>), typeof(Logger<>)); services.AddJT808Configure() - .AddJT808Gateway() + //用于测试网关 + .AddJT808DevelopmentGateway() + //用于生产环境 + //.AddJT808Gateway(options => + //{ + // options.TcpPort=8086; + // options.UdpPort=8086; + // options.MessageQueueType = JT808MessageQueueType.InPlug; + //}) .AddTcp() .AddUdp() - .AddGrpc(); + .AddGrpc() + //kafka插件 + //.AddJT808ServerKafkaMsgProducer(hostContext.Configuration) + //.AddJT808ServerKafkaMsgReplyConsumer(hostContext.Configuration) + //.AddJT808ServerKafkaSessionProducer(hostContext.Configuration) + ; //services.AddHostedService(); }); diff --git a/src/JT808.Gateway.TestHosting/JT808.Gateway.TestHosting.csproj b/src/JT808.Gateway.TestHosting/JT808.Gateway.TestHosting.csproj index 8d084b2..ef98ad6 100644 --- a/src/JT808.Gateway.TestHosting/JT808.Gateway.TestHosting.csproj +++ b/src/JT808.Gateway.TestHosting/JT808.Gateway.TestHosting.csproj @@ -14,6 +14,7 @@ + diff --git a/src/JT808.Gateway.TestHosting/Program.cs b/src/JT808.Gateway.TestHosting/Program.cs index 7d8573e..a82a825 100644 --- a/src/JT808.Gateway.TestHosting/Program.cs +++ b/src/JT808.Gateway.TestHosting/Program.cs @@ -7,6 +7,8 @@ using JT808.Protocol; using Microsoft.Extensions.Configuration; using NLog.Extensions.Logging; using JT808.Gateway.TestHosting.Jobs; +using JT808.Gateway.Enums; +using JT808.Gateway.Kafka; namespace JT808.Gateway.TestHosting { @@ -33,10 +35,23 @@ namespace JT808.Gateway.TestHosting services.AddSingleton(); services.AddSingleton(typeof(ILogger<>), typeof(Logger<>)); services.AddJT808Configure() - .AddJT808Gateway() + //用于测试网关 + .AddJT808DevelopmentGateway() + //用于生产环境 + //.AddJT808Gateway(options => + //{ + // options.TcpPort=8086; + // options.UdpPort=8086; + // options.MessageQueueType = JT808MessageQueueType.InPlug; + //}) .AddTcp() .AddUdp() - .AddGrpc(); + .AddGrpc() + //kafka插件 + //.AddJT808ServerKafkaMsgProducer(hostContext.Configuration) + //.AddJT808ServerKafkaMsgReplyConsumer(hostContext.Configuration) + //.AddJT808ServerKafkaSessionProducer(hostContext.Configuration) + ; //services.AddHostedService(); }); diff --git a/src/JT808.Gateway/Internal/JT808MsgProducerDefault.cs b/src/JT808.Gateway/Internal/JT808MsgProducerDefault.cs index 610dd97..e17fefb 100644 --- a/src/JT808.Gateway/Internal/JT808MsgProducerDefault.cs +++ b/src/JT808.Gateway/Internal/JT808MsgProducerDefault.cs @@ -19,10 +19,9 @@ namespace JT808.Gateway.Internal { } - public ValueTask ProduceAsync(string terminalNo, byte[] data) + public async ValueTask ProduceAsync(string terminalNo, byte[] data) { - JT808MsgService.MsgQueue.Add((terminalNo, data)); - return default; + await JT808MsgService.WriteAsync(terminalNo, data); } } } diff --git a/src/JT808.Gateway/Internal/JT808MsgReplyConsumerDefault.cs b/src/JT808.Gateway/Internal/JT808MsgReplyConsumerDefault.cs index 4276be3..2dad063 100644 --- a/src/JT808.Gateway/Internal/JT808MsgReplyConsumerDefault.cs +++ b/src/JT808.Gateway/Internal/JT808MsgReplyConsumerDefault.cs @@ -50,19 +50,22 @@ namespace JT808.Gateway.Internal { Task.Run(() => { - foreach(var item in JT808MsgService.MsgQueue.GetConsumingEnumerable()) + while (!Cts.IsCancellationRequested) { try { - var package = JT808Serializer.HeaderDeserialize(item.Data); - if (HandlerDict.TryGetValue(package.Header.MsgId, out var func)) + if(JT808MsgService.TryRead(out var item)) { - var buffer = func(package); - if (buffer != null) + JT808HeaderPackage package = JT808Serializer.HeaderDeserialize(item.Data); + if (HandlerDict.TryGetValue(package.Header.MsgId, out var func)) { - callback((item.TerminalNo, buffer)); + var buffer = func(package); + if (buffer != null) + { + callback((item.TerminalNo, buffer)); + } } - } + } } catch { diff --git a/src/JT808.Gateway/Internal/JT808MsgService.cs b/src/JT808.Gateway/Internal/JT808MsgService.cs index c1d6095..367be21 100644 --- a/src/JT808.Gateway/Internal/JT808MsgService.cs +++ b/src/JT808.Gateway/Internal/JT808MsgService.cs @@ -1,11 +1,29 @@ using System; using System.Collections.Generic; using System.Text; +using System.Threading; +using System.Threading.Channels; +using System.Threading.Tasks; namespace JT808.Gateway.Internal { internal class JT808MsgService { - public System.Collections.Concurrent.BlockingCollection<(string TerminalNo, byte[] Data)> MsgQueue { get; set; } = new System.Collections.Concurrent.BlockingCollection<(string TerminalNo, byte[] Data)>(); + private readonly Channel<(string TerminalNo, byte[] Data)> _channel; + + public JT808MsgService() + { + _channel = Channel.CreateUnbounded<(string TerminalNo, byte[] Data)>(); + } + + public async ValueTask WriteAsync(string terminalNo, byte[] data) + { + await _channel.Writer.WriteAsync((terminalNo, data)); + } + + public bool TryRead(out (string TerminalNo, byte[] Data) item) + { + return _channel.Reader.TryRead(out item); + } } } diff --git a/src/JT808.Gateway/JT808.Gateway.csproj b/src/JT808.Gateway/JT808.Gateway.csproj index ae6c5df..608880b 100644 --- a/src/JT808.Gateway/JT808.Gateway.csproj +++ b/src/JT808.Gateway/JT808.Gateway.csproj @@ -24,6 +24,7 @@ + diff --git a/src/JT808.Gateway/JT808GatewayExtensions.cs b/src/JT808.Gateway/JT808GatewayExtensions.cs index 0f2891d..5bb885a 100644 --- a/src/JT808.Gateway/JT808GatewayExtensions.cs +++ b/src/JT808.Gateway/JT808GatewayExtensions.cs @@ -16,7 +16,7 @@ namespace JT808.Gateway { public static partial class JT808GatewayExtensions { - public static IJT808GatewayBuilder AddJT808Gateway(this IJT808Builder jt808Builder) + public static IJT808GatewayBuilder AddJT808DevelopmentGateway(this IJT808Builder jt808Builder) { IJT808GatewayBuilder server = new JT808GatewayBuilderDefault(jt808Builder); server.JT808Builder.Services.TryAddSingleton(); @@ -62,7 +62,6 @@ namespace JT808.Gateway private static IJT808GatewayBuilder AddJT808Core(this IJT808GatewayBuilder config) { - config.JT808Builder.Services.TryAddSingleton(); config.JT808Builder.Services.TryAddSingleton(); config.JT808Builder.Services.TryAddSingleton(); config.JT808Builder.Services.TryAddSingleton();