diff --git a/README.md b/README.md index 2df8106..fd5839c 100644 --- a/README.md +++ b/README.md @@ -1,9 +1,9 @@ # JT808Gateway -基于DotNetty封装的JT808DotNetty支持TCP/UDP通用消息业务处理 - 基于Pipeline封装的JT808Pipeline支持TCP/UDP通用消息业务处理 +基于DotNetty封装的JT808DotNetty支持TCP/UDP通用消息业务处理 + [了解JT808协议进这边](https://github.com/SmallChi/JT808) [了解JT809协议进这边](https://github.com/SmallChi/JT809) @@ -59,6 +59,22 @@ [GRPC消息业务处理协议](https://github.com/SmallChi/JT808Gateway/blob/master/src/JT808.Gateway.Abstractions/Protos/JT808Gateway.proto) +## 基于core 3.1 Pipeline的NuGet安装 + +| Package Name | Version | Downloads | +| --------------------- | -------------------------------------------------- | --------------------------------------------------- | +| Install-Package JT808.Gateway.Abstractions| ![JT808.Gateway.Abstractions](https://img.shields.io/nuget/v/JT808.Gateway.Abstractions.svg) | ![JT808.Gateway.Abstractions](https://img.shields.io/nuget/dt/JT808.Gateway.Abstractions.svg) | +| Install-Package JT808.Gateway | ![JT808.Gateway](https://img.shields.io/nuget/v/JT808.Gateway.svg) | ![JT808.Gateway](https://img.shields.io/nuget/dt/JT808.Gateway.svg) | +| Install-Package JT808.Gateway.Client| ![JT808.Gateway.Client](https://img.shields.io/nuget/v/JT808.Gateway.Client.svg) | ![JT808.Gateway.Client](https://img.shields.io/nuget/dt/JT808.Gateway.Client.svg) | +| Install-Package JT808.Gateway.InMemoryMQ| ![JT808.Gateway.InMemoryMQ](https://img.shields.io/nuget/v/JT808.Gateway.InMemoryMQ.svg) | ![JT808.Gateway.InMemoryMQ](https://img.shields.io/nuget/dt/JT808.Gateway.InMemoryMQ.svg) | +| Install-Package JT808.Gateway.Kafka| ![JT808.Gateway.Kafka](https://img.shields.io/nuget/v/JT808.Gateway.Kafka.svg) | ![JT808.Gateway.Kafka](https://img.shields.io/nuget/dt/JT808.Gateway.Kafka.svg) | +| Install-Package JT808.Gateway.Transmit | ![JT808.Gateway.Transmit](https://img.shields.io/nuget/v/JT808.Gateway.Transmit.svg) | ![JT808.Gateway.Transmit](https://img.shields.io/nuget/dt/JT808.Gateway.Transmit.svg) | +| Install-Package JT808.Gateway.Traffic | ![JT808.Gateway.Traffic](https://img.shields.io/nuget/v/JT808.Gateway.Traffic.svg) | ![JT808.Gateway.Traffic](https://img.shields.io/nuget/dt/JT808.Gateway.Traffic.svg)| +| Install-Package JT808.Gateway.SessionNotice | ![JT808.Gateway.SessionNotice](https://img.shields.io/nuget/v/JT808.Gateway.SessionNotice.svg) | ![JT808.Gateway.SessionNotice](https://img.shields.io/nuget/dt/JT808.Gateway.SessionNotice.svg)| +| Install-Package JT808.Gateway.ReplyMessage | ![JT808.Gateway.ReplyMessage](https://img.shields.io/nuget/v/JT808.Gateway.ReplyMessage.svg) | ![JT808.Gateway.ReplyMessage](https://img.shields.io/nuget/dt/JT808.Gateway.ReplyMessage.svg)| +| Install-Package JT808.Gateway.MsgLogging | ![JT808.Gateway.MsgLogging](https://img.shields.io/nuget/v/JT808.Gateway.MsgLogging.svg) | ![JT808.Gateway.MsgLogging](https://img.shields.io/nuget/dt/JT808.Gateway.MsgLogging.svg)| +| Install-Package JT808.Gateway.MsgIdHandler | ![JT808.Gateway.MsgIdHandler](https://img.shields.io/nuget/v/JT808.Gateway.MsgIdHandler.svg) | ![JT808.Gateway.MsgIdHandler](https://img.shields.io/nuget/dt/JT808.Gateway.MsgIdHandler.svg)| + ## 基于DotNetty的NuGet安装 | Package Name | Version | Downloads | @@ -79,22 +95,6 @@ | Install-Package JT808.DotNetty.Kafka | ![JT808.DotNetty.Kafka](https://img.shields.io/nuget/v/JT808.DotNetty.Kafka.svg) | ![JT808.DotNetty.Kafka](https://img.shields.io/nuget/dt/JT808.DotNetty.Kafka.svg) | | Install-Package JT808.DotNetty.RabbitMQ | ![JT808.DotNetty.RabbitMQ](https://img.shields.io/nuget/v/JT808.DotNetty.RabbitMQ.svg) | ![JT808.DotNetty.RabbitMQ](https://img.shields.io/nuget/dt/JT808.DotNetty.RabbitMQ.svg) | -## 基于core 3.1 Pipeline的NuGet安装 - -| Package Name | Version | Downloads | -| --------------------- | -------------------------------------------------- | --------------------------------------------------- | -| Install-Package JT808.Gateway.Abstractions| ![JT808.Gateway.Abstractions](https://img.shields.io/nuget/v/JT808.Gateway.Abstractions.svg) | ![JT808.Gateway.Abstractions](https://img.shields.io/nuget/dt/JT808.Gateway.Abstractions.svg) | -| Install-Package JT808.Gateway | ![JT808.Gateway](https://img.shields.io/nuget/v/JT808.Gateway.svg) | ![JT808.Gateway](https://img.shields.io/nuget/dt/JT808.Gateway.svg) | -| Install-Package JT808.Gateway.Client| ![JT808.Gateway.Client](https://img.shields.io/nuget/v/JT808.Gateway.Client.svg) | ![JT808.Gateway.Client](https://img.shields.io/nuget/dt/JT808.Gateway.Client.svg) | -| Install-Package JT808.Gateway.InMemoryMQ| ![JT808.Gateway.InMemoryMQ](https://img.shields.io/nuget/v/JT808.Gateway.InMemoryMQ.svg) | ![JT808.Gateway.InMemoryMQ](https://img.shields.io/nuget/dt/JT808.Gateway.InMemoryMQ.svg) | -| Install-Package JT808.Gateway.Kafka| ![JT808.Gateway.Kafka](https://img.shields.io/nuget/v/JT808.Gateway.Kafka.svg) | ![JT808.Gateway.Kafka](https://img.shields.io/nuget/dt/JT808.Gateway.Kafka.svg) | -| Install-Package JT808.Gateway.Transmit | ![JT808.Gateway.Transmit](https://img.shields.io/nuget/v/JT808.Gateway.Transmit.svg) | ![JT808.Gateway.Transmit](https://img.shields.io/nuget/dt/JT808.Gateway.Transmit.svg) | -| Install-Package JT808.Gateway.Traffic | ![JT808.Gateway.Traffic](https://img.shields.io/nuget/v/JT808.Gateway.Traffic.svg) | ![JT808.Gateway.Traffic](https://img.shields.io/nuget/dt/JT808.Gateway.Traffic.svg)| -| Install-Package JT808.Gateway.SessionNotice | ![JT808.Gateway.SessionNotice](https://img.shields.io/nuget/v/JT808.Gateway.SessionNotice.svg) | ![JT808.Gateway.SessionNotice](https://img.shields.io/nuget/dt/JT808.Gateway.SessionNotice.svg)| -| Install-Package JT808.Gateway.ReplyMessage | ![JT808.Gateway.ReplyMessage](https://img.shields.io/nuget/v/JT808.Gateway.ReplyMessage.svg) | ![JT808.Gateway.ReplyMessage](https://img.shields.io/nuget/dt/JT808.Gateway.ReplyMessage.svg)| -| Install-Package JT808.Gateway.MsgLogging | ![JT808.Gateway.MsgLogging](https://img.shields.io/nuget/v/JT808.Gateway.MsgLogging.svg) | ![JT808.Gateway.MsgLogging](https://img.shields.io/nuget/dt/JT808.Gateway.MsgLogging.svg)| -| Install-Package JT808.Gateway.MsgIdHandler | ![JT808.Gateway.MsgIdHandler](https://img.shields.io/nuget/v/JT808.Gateway.MsgIdHandler.svg) | ![JT808.Gateway.MsgIdHandler](https://img.shields.io/nuget/dt/JT808.Gateway.MsgIdHandler.svg)| - ## 举个栗子 ### Pipeline diff --git a/src/JT808.Gateway.Abstractions/Enums/JT808ConsumerType.cs b/src/JT808.Gateway.Abstractions/Enums/JT808ConsumerType.cs new file mode 100644 index 0000000..5c9d8e8 --- /dev/null +++ b/src/JT808.Gateway.Abstractions/Enums/JT808ConsumerType.cs @@ -0,0 +1,18 @@ +using System; +using System.Collections.Generic; +using System.Text; + +namespace JT808.Gateway.Abstractions.Enums +{ + [Flags] + public enum JT808ConsumerType:int + { + MsgIdHandlerConsumer=1, + MsgLoggingConsumer=2, + ReplyMessageConsumer=4, + TrafficConsumer=8, + TransmitConsumer=16, + ReplyMessageLoggingConsumer = 32, + All = 64, + } +} diff --git a/src/JT808.Gateway.Abstractions/IJT808MsgConsumerFactory.cs b/src/JT808.Gateway.Abstractions/IJT808MsgConsumerFactory.cs new file mode 100644 index 0000000..6acea41 --- /dev/null +++ b/src/JT808.Gateway.Abstractions/IJT808MsgConsumerFactory.cs @@ -0,0 +1,12 @@ +using JT808.Gateway.Abstractions.Enums; +using System; +using System.Collections.Generic; +using System.Text; + +namespace JT808.Gateway.Abstractions +{ + public interface IJT808MsgConsumerFactory + { + IJT808MsgConsumer Create(JT808ConsumerType consumerType); + } +} diff --git a/src/JT808.Gateway.Abstractions/IJT808MsgReplyConsumerFactory.cs b/src/JT808.Gateway.Abstractions/IJT808MsgReplyConsumerFactory.cs new file mode 100644 index 0000000..23b63ad --- /dev/null +++ b/src/JT808.Gateway.Abstractions/IJT808MsgReplyConsumerFactory.cs @@ -0,0 +1,13 @@ +using JT808.Gateway.Abstractions.Enums; +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading; + +namespace JT808.Gateway.Abstractions +{ + public interface IJT808MsgReplyConsumerFactory + { + IJT808MsgReplyConsumer Create(JT808ConsumerType consumerType); + } +} diff --git a/src/JT808.Gateway.InMemoryMQ/Consumers/JT808MsgIdHandlerConsumer.cs b/src/JT808.Gateway.InMemoryMQ/Consumers/JT808MsgIdHandlerConsumer.cs new file mode 100644 index 0000000..791d61c --- /dev/null +++ b/src/JT808.Gateway.InMemoryMQ/Consumers/JT808MsgIdHandlerConsumer.cs @@ -0,0 +1,60 @@ +using JT808.Gateway.Abstractions; +using JT808.Gateway.InMemoryMQ.Services; +using Microsoft.Extensions.Logging; +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading; +using System.Threading.Tasks; + +namespace JT808.Gateway.InMemoryMQ +{ + public class JT808MsgIdHandlerConsumer : IJT808MsgConsumer + { + private readonly JT808MsgIdHandlerService JT808MsgService; + public CancellationTokenSource Cts => new CancellationTokenSource(); + private readonly ILogger logger; + public string TopicName => JT808GatewayConstants.MsgTopic; + public JT808MsgIdHandlerConsumer( + JT808MsgIdHandlerService jT808MsgService, + ILoggerFactory loggerFactory) + { + JT808MsgService = jT808MsgService; + logger = loggerFactory.CreateLogger("JT808MsgIdHandlerConsumer"); + } + + public void OnMessage(Action<(string TerminalNo, byte[] Data)> callback) + { + Task.Run(async() => + { + while (!Cts.IsCancellationRequested) + { + try + { + var item = await JT808MsgService.ReadAsync(Cts.Token); + callback(item); + } + catch(Exception ex) + { + logger.LogError(ex, ""); + } + } + }, Cts.Token); + } + + public void Subscribe() + { + + } + + public void Unsubscribe() + { + Cts.Cancel(); + } + + public void Dispose() + { + Cts.Dispose(); + } + } +} diff --git a/src/JT808.Gateway.InMemoryMQ/Consumers/JT808MsgLoggingConsumer.cs b/src/JT808.Gateway.InMemoryMQ/Consumers/JT808MsgLoggingConsumer.cs new file mode 100644 index 0000000..f2949bc --- /dev/null +++ b/src/JT808.Gateway.InMemoryMQ/Consumers/JT808MsgLoggingConsumer.cs @@ -0,0 +1,60 @@ +using JT808.Gateway.Abstractions; +using JT808.Gateway.InMemoryMQ.Services; +using Microsoft.Extensions.Logging; +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading; +using System.Threading.Tasks; + +namespace JT808.Gateway.InMemoryMQ +{ + public class JT808MsgLoggingConsumer : IJT808MsgConsumer + { + private readonly JT808MsgLoggingService JT808MsgService; + public CancellationTokenSource Cts => new CancellationTokenSource(); + private readonly ILogger logger; + public string TopicName => JT808GatewayConstants.MsgTopic; + public JT808MsgLoggingConsumer( + JT808MsgLoggingService jT808MsgService, + ILoggerFactory loggerFactory) + { + JT808MsgService = jT808MsgService; + logger = loggerFactory.CreateLogger("JT808MsgLoggingConsumer"); + } + + public void OnMessage(Action<(string TerminalNo, byte[] Data)> callback) + { + Task.Run(async() => + { + while (!Cts.IsCancellationRequested) + { + try + { + var item = await JT808MsgService.ReadAsync(Cts.Token); + callback(item); + } + catch(Exception ex) + { + logger.LogError(ex, ""); + } + } + }, Cts.Token); + } + + public void Subscribe() + { + + } + + public void Unsubscribe() + { + Cts.Cancel(); + } + + public void Dispose() + { + Cts.Dispose(); + } + } +} diff --git a/src/JT808.Gateway.InMemoryMQ/Consumers/JT808MsgReplyMessageConsumer.cs b/src/JT808.Gateway.InMemoryMQ/Consumers/JT808MsgReplyMessageConsumer.cs new file mode 100644 index 0000000..009404c --- /dev/null +++ b/src/JT808.Gateway.InMemoryMQ/Consumers/JT808MsgReplyMessageConsumer.cs @@ -0,0 +1,60 @@ +using JT808.Gateway.Abstractions; +using JT808.Gateway.InMemoryMQ.Services; +using Microsoft.Extensions.Logging; +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading; +using System.Threading.Tasks; + +namespace JT808.Gateway.InMemoryMQ +{ + public class JT808MsgReplyMessageConsumer : IJT808MsgConsumer + { + private readonly JT808MsgReplyMessageService JT808MsgService; + public CancellationTokenSource Cts => new CancellationTokenSource(); + private readonly ILogger logger; + public string TopicName => JT808GatewayConstants.MsgTopic; + public JT808MsgReplyMessageConsumer( + JT808MsgReplyMessageService jT808MsgService, + ILoggerFactory loggerFactory) + { + JT808MsgService = jT808MsgService; + logger = loggerFactory.CreateLogger("JT808MsgReplyMessageConsumer"); + } + + public void OnMessage(Action<(string TerminalNo, byte[] Data)> callback) + { + Task.Run(async() => + { + while (!Cts.IsCancellationRequested) + { + try + { + var item = await JT808MsgService.ReadAsync(Cts.Token); + callback(item); + } + catch(Exception ex) + { + logger.LogError(ex, ""); + } + } + }, Cts.Token); + } + + public void Subscribe() + { + + } + + public void Unsubscribe() + { + Cts.Cancel(); + } + + public void Dispose() + { + Cts.Dispose(); + } + } +} diff --git a/src/JT808.Gateway.InMemoryMQ/Consumers/JT808MsgReplyMessageLoggingConsumer.cs b/src/JT808.Gateway.InMemoryMQ/Consumers/JT808MsgReplyMessageLoggingConsumer.cs new file mode 100644 index 0000000..2c5dc5b --- /dev/null +++ b/src/JT808.Gateway.InMemoryMQ/Consumers/JT808MsgReplyMessageLoggingConsumer.cs @@ -0,0 +1,60 @@ +using JT808.Gateway.Abstractions; +using JT808.Gateway.InMemoryMQ.Services; +using Microsoft.Extensions.Logging; +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading; +using System.Threading.Tasks; + +namespace JT808.Gateway.InMemoryMQ +{ + public class JT808MsgReplyMessageLoggingConsumer : IJT808MsgReplyConsumer + { + private readonly JT808MsgReplyMessageLoggingService JT808MsgService; + public CancellationTokenSource Cts => new CancellationTokenSource(); + private readonly ILogger logger; + public string TopicName => JT808GatewayConstants.MsgTopic; + public JT808MsgReplyMessageLoggingConsumer( + JT808MsgReplyMessageLoggingService jT808MsgService, + ILoggerFactory loggerFactory) + { + JT808MsgService = jT808MsgService; + logger = loggerFactory.CreateLogger("JT808MsgReplyMessageLoggingConsumer"); + } + + public void OnMessage(Action<(string TerminalNo, byte[] Data)> callback) + { + Task.Run(async() => + { + while (!Cts.IsCancellationRequested) + { + try + { + var item = await JT808MsgService.ReadAsync(Cts.Token); + callback(item); + } + catch(Exception ex) + { + logger.LogError(ex, ""); + } + } + }, Cts.Token); + } + + public void Subscribe() + { + + } + + public void Unsubscribe() + { + Cts.Cancel(); + } + + public void Dispose() + { + Cts.Dispose(); + } + } +} diff --git a/src/JT808.Gateway.InMemoryMQ/Consumers/JT808MsgTrafficConsumer.cs b/src/JT808.Gateway.InMemoryMQ/Consumers/JT808MsgTrafficConsumer.cs new file mode 100644 index 0000000..ed7c8a5 --- /dev/null +++ b/src/JT808.Gateway.InMemoryMQ/Consumers/JT808MsgTrafficConsumer.cs @@ -0,0 +1,60 @@ +using JT808.Gateway.Abstractions; +using JT808.Gateway.InMemoryMQ.Services; +using Microsoft.Extensions.Logging; +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading; +using System.Threading.Tasks; + +namespace JT808.Gateway.InMemoryMQ +{ + public class JT808MsgTrafficConsumer : IJT808MsgConsumer + { + private readonly JT808MsgTrafficService JT808MsgService; + public CancellationTokenSource Cts => new CancellationTokenSource(); + private readonly ILogger logger; + public string TopicName => JT808GatewayConstants.MsgTopic; + public JT808MsgTrafficConsumer( + JT808MsgTrafficService jT808MsgService, + ILoggerFactory loggerFactory) + { + JT808MsgService = jT808MsgService; + logger = loggerFactory.CreateLogger("JT808MsgTrafficConsumer"); + } + + public void OnMessage(Action<(string TerminalNo, byte[] Data)> callback) + { + Task.Run(async() => + { + while (!Cts.IsCancellationRequested) + { + try + { + var item = await JT808MsgService.ReadAsync(Cts.Token); + callback(item); + } + catch(Exception ex) + { + logger.LogError(ex, ""); + } + } + }, Cts.Token); + } + + public void Subscribe() + { + + } + + public void Unsubscribe() + { + Cts.Cancel(); + } + + public void Dispose() + { + Cts.Dispose(); + } + } +} diff --git a/src/JT808.Gateway.InMemoryMQ/Consumers/JT808MsgTransmitConsumer.cs b/src/JT808.Gateway.InMemoryMQ/Consumers/JT808MsgTransmitConsumer.cs new file mode 100644 index 0000000..0bce3ab --- /dev/null +++ b/src/JT808.Gateway.InMemoryMQ/Consumers/JT808MsgTransmitConsumer.cs @@ -0,0 +1,60 @@ +using JT808.Gateway.Abstractions; +using JT808.Gateway.InMemoryMQ.Services; +using Microsoft.Extensions.Logging; +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading; +using System.Threading.Tasks; + +namespace JT808.Gateway.InMemoryMQ +{ + public class JT808MsgTransmitConsumer : IJT808MsgConsumer + { + private readonly JT808MsgTransmitService JT808MsgService; + public CancellationTokenSource Cts => new CancellationTokenSource(); + private readonly ILogger logger; + public string TopicName => JT808GatewayConstants.MsgTopic; + public JT808MsgTransmitConsumer( + JT808MsgTransmitService jT808MsgService, + ILoggerFactory loggerFactory) + { + JT808MsgService = jT808MsgService; + logger = loggerFactory.CreateLogger("JT808MsgTransmitConsumer"); + } + + public void OnMessage(Action<(string TerminalNo, byte[] Data)> callback) + { + Task.Run(async() => + { + while (!Cts.IsCancellationRequested) + { + try + { + var item = await JT808MsgService.ReadAsync(Cts.Token); + callback(item); + } + catch(Exception ex) + { + logger.LogError(ex, ""); + } + } + }, Cts.Token); + } + + public void Subscribe() + { + + } + + public void Unsubscribe() + { + Cts.Cancel(); + } + + public void Dispose() + { + Cts.Dispose(); + } + } +} diff --git a/src/JT808.Gateway.InMemoryMQ/JT808MsgConsumer.cs b/src/JT808.Gateway.InMemoryMQ/JT808MsgConsumer.cs index 4f890e5..516b65d 100644 --- a/src/JT808.Gateway.InMemoryMQ/JT808MsgConsumer.cs +++ b/src/JT808.Gateway.InMemoryMQ/JT808MsgConsumer.cs @@ -1,4 +1,5 @@ using JT808.Gateway.Abstractions; +using JT808.Gateway.Abstractions.Enums; using JT808.Gateway.InMemoryMQ.Services; using Microsoft.Extensions.Logging; using System; @@ -12,14 +13,17 @@ namespace JT808.Gateway.InMemoryMQ public class JT808MsgConsumer : IJT808MsgConsumer { private readonly JT808MsgService JT808MsgService; + private readonly Func func; public CancellationTokenSource Cts => new CancellationTokenSource(); private readonly ILogger logger; public string TopicName => JT808GatewayConstants.MsgTopic; public JT808MsgConsumer( + Func func, JT808MsgService jT808MsgService, ILoggerFactory loggerFactory) { JT808MsgService = jT808MsgService; + this.func = func; logger = loggerFactory.CreateLogger("JT808MsgConsumer"); } @@ -32,7 +36,15 @@ namespace JT808.Gateway.InMemoryMQ try { var item = await JT808MsgService.ReadAsync(Cts.Token); - callback(item); + foreach(var type in JT808ServerInMemoryMQExtensions.ConsumerTypes) + { + var method = func(type); + if (method != null) + { + await method.WriteAsync(item.TerminalNo, item.Data); + } + } + //callback(item); } catch(Exception ex) { diff --git a/src/JT808.Gateway.InMemoryMQ/JT808MsgConsumerFactory.cs b/src/JT808.Gateway.InMemoryMQ/JT808MsgConsumerFactory.cs new file mode 100644 index 0000000..49af6c1 --- /dev/null +++ b/src/JT808.Gateway.InMemoryMQ/JT808MsgConsumerFactory.cs @@ -0,0 +1,23 @@ +using JT808.Gateway.Abstractions; +using JT808.Gateway.Abstractions.Enums; +using System; +using System.Collections.Generic; +using System.Text; + +namespace JT808.Gateway.InMemoryMQ +{ + public class JT808MsgConsumerFactory : IJT808MsgConsumerFactory + { + private readonly Func factory; + + public JT808MsgConsumerFactory(Func accesor) + { + factory = accesor; + } + + public IJT808MsgConsumer Create(JT808ConsumerType consumerType) + { + return factory(consumerType); + } + } +} diff --git a/src/JT808.Gateway.InMemoryMQ/JT808MsgConsumerInMemoryHostedService.cs b/src/JT808.Gateway.InMemoryMQ/JT808MsgConsumerInMemoryHostedService.cs new file mode 100644 index 0000000..82a5091 --- /dev/null +++ b/src/JT808.Gateway.InMemoryMQ/JT808MsgConsumerInMemoryHostedService.cs @@ -0,0 +1,32 @@ +using System.Threading.Tasks; +using Microsoft.Extensions.Hosting; +using System.Threading; +using JT808.Gateway.Abstractions; +using JT808.Gateway.Abstractions.Enums; + +namespace JT808.Gateway.InMemoryMQ +{ + public class JT808MsgConsumerInMemoryHostedService : IHostedService + { + private readonly IJT808MsgConsumer jT808MsgConsumer; + + public JT808MsgConsumerInMemoryHostedService( + IJT808MsgConsumer jT808MsgConsumer) + { + this.jT808MsgConsumer = jT808MsgConsumer; + } + + public Task StartAsync(CancellationToken cancellationToken) + { + jT808MsgConsumer.Subscribe(); + jT808MsgConsumer.OnMessage(null); + return Task.CompletedTask; + } + + public Task StopAsync(CancellationToken cancellationToken) + { + jT808MsgConsumer.Unsubscribe(); + return Task.CompletedTask; + } + } +} diff --git a/src/JT808.Gateway.InMemoryMQ/JT808MsgReplyConsumerFactory.cs b/src/JT808.Gateway.InMemoryMQ/JT808MsgReplyConsumerFactory.cs new file mode 100644 index 0000000..ab256de --- /dev/null +++ b/src/JT808.Gateway.InMemoryMQ/JT808MsgReplyConsumerFactory.cs @@ -0,0 +1,23 @@ +using JT808.Gateway.Abstractions; +using JT808.Gateway.Abstractions.Enums; +using System; +using System.Collections.Generic; +using System.Text; + +namespace JT808.Gateway.InMemoryMQ +{ + public class JT808MsgReplyConsumerFactory : IJT808MsgReplyConsumerFactory + { + private readonly Func factory; + + public JT808MsgReplyConsumerFactory(Func accesor) + { + factory = accesor; + } + + public IJT808MsgReplyConsumer Create(JT808ConsumerType consumerType) + { + return factory(consumerType); + } + } +} diff --git a/src/JT808.Gateway.InMemoryMQ/JT808MsgReplyProducer.cs b/src/JT808.Gateway.InMemoryMQ/JT808MsgReplyProducer.cs index bd9c3b3..76ce9b4 100644 --- a/src/JT808.Gateway.InMemoryMQ/JT808MsgReplyProducer.cs +++ b/src/JT808.Gateway.InMemoryMQ/JT808MsgReplyProducer.cs @@ -1,4 +1,5 @@ using JT808.Gateway.Abstractions; +using JT808.Gateway.Abstractions.Enums; using JT808.Gateway.InMemoryMQ.Services; using System; using System.Collections.Generic; @@ -10,14 +11,29 @@ namespace JT808.Gateway.InMemoryMQ public class JT808MsgReplyProducer : IJT808MsgReplyProducer { public string TopicName => JT808GatewayConstants.MsgReplyTopic; + + //JT808ServerInMemoryMQExtensions private readonly JT808ReplyMsgService JT808ReplyMsgService; - public JT808MsgReplyProducer(JT808ReplyMsgService jT808ReplyMsgService) + + private readonly Func func; + public JT808MsgReplyProducer( + Func func, + JT808ReplyMsgService jT808ReplyMsgService) { + this.func = func; JT808ReplyMsgService = jT808ReplyMsgService; } public async ValueTask ProduceAsync(string terminalNo, byte[] data) { await JT808ReplyMsgService.WriteAsync(terminalNo, data); + if (JT808ServerInMemoryMQExtensions.ReplyMessageLoggingConsumer.HasValue) + { + var method = func(JT808ConsumerType.ReplyMessageLoggingConsumer); + if (method != null) + { + await method.WriteAsync(terminalNo, data); + } + } } public void Dispose() { diff --git a/src/JT808.Gateway.InMemoryMQ/JT808ServerInMemoryMQExtensions.cs b/src/JT808.Gateway.InMemoryMQ/JT808ServerInMemoryMQExtensions.cs index 402ca86..ac3a0ff 100644 --- a/src/JT808.Gateway.InMemoryMQ/JT808ServerInMemoryMQExtensions.cs +++ b/src/JT808.Gateway.InMemoryMQ/JT808ServerInMemoryMQExtensions.cs @@ -1,25 +1,363 @@ using JT808.Gateway.Abstractions; +using JT808.Gateway.Abstractions.Enums; using JT808.Gateway.InMemoryMQ.Services; using Microsoft.Extensions.DependencyInjection; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Runtime.CompilerServices; +[assembly: InternalsVisibleTo("JT808.Gateway.InMemoryMQ.Test")] namespace JT808.Gateway.InMemoryMQ { public static class JT808ServerInMemoryMQExtensions { + internal static List ConsumerTypes { get; private set; } + + static JT808ServerInMemoryMQExtensions() + { + ConsumerTypes = new List(); + } + + internal static JT808ConsumerType? ReplyMessageLoggingConsumer { get; private set; } + + /// + /// + /// + /// + /// + public static IJT808GatewayBuilder AddServerInMemoryMQ(this IJT808GatewayBuilder jT808GatewayBuilder, JT808ConsumerType consumerType) + { + if ((consumerType & JT808ConsumerType.All) == JT808ConsumerType.All) + { + ConsumerTypes.Add(JT808ConsumerType.MsgIdHandlerConsumer); + ConsumerTypes.Add(JT808ConsumerType.MsgLoggingConsumer); + ConsumerTypes.Add(JT808ConsumerType.ReplyMessageConsumer); + ConsumerTypes.Add(JT808ConsumerType.TrafficConsumer); + ConsumerTypes.Add(JT808ConsumerType.TransmitConsumer); + ConsumerTypes.Add(JT808ConsumerType.ReplyMessageLoggingConsumer); + } + else + { + if ((consumerType & JT808ConsumerType.MsgLoggingConsumer) == JT808ConsumerType.MsgLoggingConsumer) + { + ConsumerTypes.Add(JT808ConsumerType.MsgLoggingConsumer); + } + if ((consumerType & JT808ConsumerType.MsgIdHandlerConsumer) == JT808ConsumerType.MsgIdHandlerConsumer) + { + ConsumerTypes.Add(JT808ConsumerType.MsgIdHandlerConsumer); + } + if ((consumerType & JT808ConsumerType.ReplyMessageConsumer) == JT808ConsumerType.ReplyMessageConsumer) + { + ConsumerTypes.Add(JT808ConsumerType.ReplyMessageConsumer); + } + if ((consumerType & JT808ConsumerType.TrafficConsumer) == JT808ConsumerType.TrafficConsumer) + { + ConsumerTypes.Add(JT808ConsumerType.TrafficConsumer); + } + if ((consumerType & JT808ConsumerType.TransmitConsumer) == JT808ConsumerType.TransmitConsumer) + { + ConsumerTypes.Add(JT808ConsumerType.TransmitConsumer); + } + if ((consumerType & JT808ConsumerType.ReplyMessageLoggingConsumer) == JT808ConsumerType.ReplyMessageLoggingConsumer) + { + // + ReplyMessageLoggingConsumer = JT808ConsumerType.ReplyMessageLoggingConsumer; + } + } + jT808GatewayBuilder.AddServerInMemoryConsumers(); + jT808GatewayBuilder.JT808Builder.Services.AddSingleton(); + jT808GatewayBuilder.JT808Builder.Services.AddSingleton(); + jT808GatewayBuilder.JT808Builder.Services.AddSingleton(); + jT808GatewayBuilder.JT808Builder.Services.AddSingleton(); + jT808GatewayBuilder.JT808Builder.Services.AddSingleton(); + jT808GatewayBuilder.JT808Builder.Services.AddSingleton(); + jT808GatewayBuilder.JT808Builder.Services.AddSingleton(); + jT808GatewayBuilder.JT808Builder.Services.AddSingleton(); + jT808GatewayBuilder.JT808Builder.Services.AddSingleton(); + jT808GatewayBuilder.JT808Builder.Services.AddSingleton(); + jT808GatewayBuilder.JT808Builder.Services.AddSingleton(); + jT808GatewayBuilder.JT808Builder.Services.AddHostedService(); + return jT808GatewayBuilder; + } + /// /// /// /// /// - public static IJT808GatewayBuilder AddServerInMemoryMQ(this IJT808GatewayBuilder jT808GatewayBuilder) + public static IJT808GatewayBuilder AddServerInMemoryMQ(this IJT808GatewayBuilder jT808GatewayBuilder,params JT808ConsumerType[] consumerTypes) { + if (consumerTypes == null) + { + throw new ArgumentNullException("消费类型不为空!"); + } + ConsumerTypes = consumerTypes.ToList(); + jT808GatewayBuilder.AddServerInMemoryConsumers(); jT808GatewayBuilder.JT808Builder.Services.AddSingleton(); jT808GatewayBuilder.JT808Builder.Services.AddSingleton(); + jT808GatewayBuilder.JT808Builder.Services.AddSingleton(); jT808GatewayBuilder.JT808Builder.Services.AddSingleton(); jT808GatewayBuilder.JT808Builder.Services.AddSingleton(); jT808GatewayBuilder.JT808Builder.Services.AddSingleton(); jT808GatewayBuilder.JT808Builder.Services.AddSingleton(); + jT808GatewayBuilder.JT808Builder.Services.AddSingleton(); + jT808GatewayBuilder.JT808Builder.Services.AddSingleton(); + jT808GatewayBuilder.JT808Builder.Services.AddSingleton(); + jT808GatewayBuilder.JT808Builder.Services.AddSingleton(); + jT808GatewayBuilder.JT808Builder.Services.AddHostedService(); + return jT808GatewayBuilder; + } + + /// + /// + /// + /// + /// + internal static IServiceCollection AddServerInMemoryMQ(this IServiceCollection serviceDescriptors, JT808ConsumerType consumerType) + { + if ((consumerType & JT808ConsumerType.All) == JT808ConsumerType.All) + { + ConsumerTypes.Add(JT808ConsumerType.MsgIdHandlerConsumer); + ConsumerTypes.Add(JT808ConsumerType.MsgLoggingConsumer); + ConsumerTypes.Add(JT808ConsumerType.ReplyMessageConsumer); + ConsumerTypes.Add(JT808ConsumerType.TrafficConsumer); + ConsumerTypes.Add(JT808ConsumerType.TransmitConsumer); + } + else + { + if ((consumerType & JT808ConsumerType.MsgLoggingConsumer) == JT808ConsumerType.MsgLoggingConsumer) + { + ConsumerTypes.Add(JT808ConsumerType.MsgLoggingConsumer); + } + if ((consumerType & JT808ConsumerType.MsgIdHandlerConsumer) == JT808ConsumerType.MsgIdHandlerConsumer) + { + ConsumerTypes.Add(JT808ConsumerType.MsgIdHandlerConsumer); + } + if ((consumerType & JT808ConsumerType.ReplyMessageConsumer) == JT808ConsumerType.ReplyMessageConsumer) + { + ConsumerTypes.Add(JT808ConsumerType.ReplyMessageConsumer); + } + if ((consumerType & JT808ConsumerType.TrafficConsumer) == JT808ConsumerType.TrafficConsumer) + { + ConsumerTypes.Add(JT808ConsumerType.TrafficConsumer); + } + if ((consumerType & JT808ConsumerType.TransmitConsumer) == JT808ConsumerType.TransmitConsumer) + { + ConsumerTypes.Add(JT808ConsumerType.TransmitConsumer); + } + if ((consumerType & JT808ConsumerType.ReplyMessageLoggingConsumer) == JT808ConsumerType.ReplyMessageLoggingConsumer) + { + ReplyMessageLoggingConsumer = JT808ConsumerType.ReplyMessageLoggingConsumer; + } + } + serviceDescriptors.AddServerInMemoryConsumers(); + serviceDescriptors.AddSingleton(); + serviceDescriptors.AddSingleton(); + serviceDescriptors.AddSingleton(); + serviceDescriptors.AddSingleton(); + serviceDescriptors.AddSingleton(); + serviceDescriptors.AddSingleton(); + serviceDescriptors.AddSingleton(); + serviceDescriptors.AddSingleton(); + serviceDescriptors.AddSingleton(); + serviceDescriptors.AddSingleton(); + serviceDescriptors.AddSingleton(); + serviceDescriptors.AddHostedService(); + return serviceDescriptors; + } + + /// + /// + /// + /// + /// + internal static IServiceCollection AddServerInMemoryMQ(this IServiceCollection serviceDescriptors, params JT808ConsumerType[] consumerTypes) + { + if (consumerTypes == null) + { + throw new ArgumentNullException("消费类型不为空!"); + } + ConsumerTypes = consumerTypes.ToList(); + serviceDescriptors.AddServerInMemoryConsumers(); + serviceDescriptors.AddSingleton(); + serviceDescriptors.AddSingleton(); + serviceDescriptors.AddSingleton(); + serviceDescriptors.AddSingleton(); + serviceDescriptors.AddSingleton(); + serviceDescriptors.AddSingleton(); + serviceDescriptors.AddSingleton(); + serviceDescriptors.AddSingleton(); + serviceDescriptors.AddSingleton(); + serviceDescriptors.AddSingleton(); + serviceDescriptors.AddSingleton(); + serviceDescriptors.AddHostedService(); + return serviceDescriptors; + } + + + /// + /// + /// + /// + /// + private static IJT808GatewayBuilder AddServerInMemoryConsumers(this IJT808GatewayBuilder jT808GatewayBuilder) + { + jT808GatewayBuilder.JT808Builder.Services.AddSingleton(); + jT808GatewayBuilder.JT808Builder.Services.AddSingleton(); + jT808GatewayBuilder.JT808Builder.Services.AddSingleton(); + jT808GatewayBuilder.JT808Builder.Services.AddSingleton(); + jT808GatewayBuilder.JT808Builder.Services.AddSingleton(); + jT808GatewayBuilder.JT808Builder.Services.AddSingleton(); + jT808GatewayBuilder.JT808Builder.Services.AddSingleton(); + jT808GatewayBuilder.JT808Builder.Services.AddSingleton(); + jT808GatewayBuilder.JT808Builder.Services.AddSingleton(); + jT808GatewayBuilder.JT808Builder.Services.AddSingleton(); + jT808GatewayBuilder.JT808Builder.Services.AddSingleton(); + jT808GatewayBuilder.JT808Builder.Services.AddSingleton(); + jT808GatewayBuilder.JT808Builder.Services.AddSingleton((factory) => + { + Func accesor = type => + { + switch (type) + { + case JT808ConsumerType.MsgIdHandlerConsumer: + return factory.GetRequiredService(); + case JT808ConsumerType.MsgLoggingConsumer: + return factory.GetRequiredService(); + case JT808ConsumerType.TrafficConsumer: + return factory.GetRequiredService(); + case JT808ConsumerType.TransmitConsumer: + return factory.GetRequiredService(); + case JT808ConsumerType.ReplyMessageConsumer: + return factory.GetRequiredService(); + default: + return default; + } + }; + return accesor; + }); + jT808GatewayBuilder.JT808Builder.Services.AddSingleton((factory) => + { + Func accesor = type => + { + switch (type) + { + case JT808ConsumerType.ReplyMessageLoggingConsumer: + return factory.GetRequiredService(); + default: + return default; + } + }; + return accesor; + }); + jT808GatewayBuilder.JT808Builder.Services.AddSingleton((factory) => + { + Func accesor = type => + { + switch (type) + { + case JT808ConsumerType.MsgIdHandlerConsumer: + return factory.GetRequiredService(); + case JT808ConsumerType.MsgLoggingConsumer: + return factory.GetRequiredService(); + case JT808ConsumerType.TrafficConsumer: + return factory.GetRequiredService(); + case JT808ConsumerType.TransmitConsumer: + return factory.GetRequiredService(); + case JT808ConsumerType.ReplyMessageConsumer: + return factory.GetRequiredService(); + case JT808ConsumerType.ReplyMessageLoggingConsumer: + return factory.GetRequiredService(); + default: + return default; + } + }; + return accesor; + }); return jT808GatewayBuilder; } + + /// + /// + /// + /// + /// + private static IServiceCollection AddServerInMemoryConsumers(this IServiceCollection serviceDescriptors) + { + serviceDescriptors.AddSingleton(); + serviceDescriptors.AddSingleton(); + serviceDescriptors.AddSingleton(); + serviceDescriptors.AddSingleton(); + serviceDescriptors.AddSingleton(); + serviceDescriptors.AddSingleton(); + serviceDescriptors.AddSingleton(); + serviceDescriptors.AddSingleton(); + serviceDescriptors.AddSingleton(); + serviceDescriptors.AddSingleton(); + serviceDescriptors.AddSingleton(); + serviceDescriptors.AddSingleton(); + serviceDescriptors.AddSingleton((factory) => + { + Func accesor = type => + { + switch (type) + { + case JT808ConsumerType.MsgIdHandlerConsumer: + return factory.GetRequiredService(); + case JT808ConsumerType.MsgLoggingConsumer: + return factory.GetRequiredService(); + case JT808ConsumerType.TrafficConsumer: + return factory.GetRequiredService(); + case JT808ConsumerType.TransmitConsumer: + return factory.GetRequiredService(); + case JT808ConsumerType.ReplyMessageConsumer: + return factory.GetRequiredService(); + default: + return default; + } + }; + return accesor; + }); + serviceDescriptors.AddSingleton((factory) => + { + Func accesor = type => + { + switch (type) + { + case JT808ConsumerType.ReplyMessageLoggingConsumer: + return factory.GetRequiredService(); + default: + return default; + } + }; + return accesor; + }); + serviceDescriptors.AddSingleton((factory) => + { + Func accesor = type => + { + switch (type) + { + case JT808ConsumerType.MsgIdHandlerConsumer: + return factory.GetRequiredService(); + case JT808ConsumerType.MsgLoggingConsumer: + return factory.GetRequiredService(); + case JT808ConsumerType.TrafficConsumer: + return factory.GetRequiredService(); + case JT808ConsumerType.TransmitConsumer: + return factory.GetRequiredService(); + case JT808ConsumerType.ReplyMessageConsumer: + return factory.GetRequiredService(); + case JT808ConsumerType.ReplyMessageLoggingConsumer: + return factory.GetRequiredService(); + default: + return default; + } + }; + return accesor; + }); + return serviceDescriptors; + } } } \ No newline at end of file diff --git a/src/JT808.Gateway.InMemoryMQ/JT808SessionConsumer.cs b/src/JT808.Gateway.InMemoryMQ/JT808SessionConsumer.cs new file mode 100644 index 0000000..13719fa --- /dev/null +++ b/src/JT808.Gateway.InMemoryMQ/JT808SessionConsumer.cs @@ -0,0 +1,63 @@ +using JT808.Gateway.Abstractions; +using Microsoft.Extensions.Logging; +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading; +using System.Threading.Tasks; +using JT808.Gateway.InMemoryMQ.Services; + +namespace JT808.Gateway.InMemoryMQ +{ + public class JT808SessionConsumer : IJT808SessionConsumer + { + public CancellationTokenSource Cts => new CancellationTokenSource(); + + private readonly ILogger logger; + + public string TopicName { get; } = JT808GatewayConstants.SessionTopic; + + private readonly JT808SessionService JT808SessionService; + public JT808SessionConsumer( + JT808SessionService jT808SessionService, + ILoggerFactory loggerFactory) + { + logger = loggerFactory.CreateLogger("JT808SessionConsumer"); + JT808SessionService = jT808SessionService; + } + + public void OnMessage(Action<(string Notice, string TerminalNo)> callback) + { + Task.Run(async () => + { + while (!Cts.IsCancellationRequested) + { + try + { + var item = await JT808SessionService.ReadAsync(Cts.Token); + callback(item); + } + catch (Exception ex) + { + logger.LogError(ex, ""); + } + } + }, Cts.Token); + } + + public void Unsubscribe() + { + Cts.Cancel(); + } + + public void Dispose() + { + Cts.Dispose(); + } + + public void Subscribe() + { + + } + } +} diff --git a/src/JT808.Gateway.InMemoryMQ/JT808SessionProducer.cs b/src/JT808.Gateway.InMemoryMQ/JT808SessionProducer.cs new file mode 100644 index 0000000..58c4f0b --- /dev/null +++ b/src/JT808.Gateway.InMemoryMQ/JT808SessionProducer.cs @@ -0,0 +1,30 @@ +using JT808.Gateway.Abstractions; +using JT808.Gateway.InMemoryMQ.Services; +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading.Tasks; + +namespace JT808.Gateway.InMemoryMQ +{ + public class JT808SessionProducer : IJT808SessionProducer + { + public string TopicName { get; } = JT808GatewayConstants.SessionTopic; + + private readonly JT808SessionService JT808SessionService; + + public JT808SessionProducer(JT808SessionService jT808SessionService) + { + JT808SessionService = jT808SessionService; + } + + public async ValueTask ProduceAsync(string notice,string terminalNo) + { + await JT808SessionService.WriteAsync(notice, terminalNo); + } + + public void Dispose() + { + } + } +} diff --git a/src/JT808.Gateway.InMemoryMQ/Services/JT808MsgIdHandlerService.cs b/src/JT808.Gateway.InMemoryMQ/Services/JT808MsgIdHandlerService.cs new file mode 100644 index 0000000..028ba53 --- /dev/null +++ b/src/JT808.Gateway.InMemoryMQ/Services/JT808MsgIdHandlerService.cs @@ -0,0 +1,14 @@ +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading; +using System.Threading.Channels; +using System.Threading.Tasks; + +namespace JT808.Gateway.InMemoryMQ.Services +{ + public class JT808MsgIdHandlerService: JT808MsgServiceBase + { + + } +} diff --git a/src/JT808.Gateway.InMemoryMQ/Services/JT808MsgLoggingService.cs b/src/JT808.Gateway.InMemoryMQ/Services/JT808MsgLoggingService.cs new file mode 100644 index 0000000..4fc0fc9 --- /dev/null +++ b/src/JT808.Gateway.InMemoryMQ/Services/JT808MsgLoggingService.cs @@ -0,0 +1,14 @@ +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading; +using System.Threading.Channels; +using System.Threading.Tasks; + +namespace JT808.Gateway.InMemoryMQ.Services +{ + public class JT808MsgLoggingService: JT808MsgServiceBase + { + + } +} diff --git a/src/JT808.Gateway.InMemoryMQ/Services/JT808MsgReplyMessageLoggingService.cs b/src/JT808.Gateway.InMemoryMQ/Services/JT808MsgReplyMessageLoggingService.cs new file mode 100644 index 0000000..58c4001 --- /dev/null +++ b/src/JT808.Gateway.InMemoryMQ/Services/JT808MsgReplyMessageLoggingService.cs @@ -0,0 +1,14 @@ +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading; +using System.Threading.Channels; +using System.Threading.Tasks; + +namespace JT808.Gateway.InMemoryMQ.Services +{ + public class JT808MsgReplyMessageLoggingService : JT808MsgServiceBase + { + + } +} diff --git a/src/JT808.Gateway.InMemoryMQ/Services/JT808MsgReplyMessageService.cs b/src/JT808.Gateway.InMemoryMQ/Services/JT808MsgReplyMessageService.cs new file mode 100644 index 0000000..e2831f3 --- /dev/null +++ b/src/JT808.Gateway.InMemoryMQ/Services/JT808MsgReplyMessageService.cs @@ -0,0 +1,14 @@ +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading; +using System.Threading.Channels; +using System.Threading.Tasks; + +namespace JT808.Gateway.InMemoryMQ.Services +{ + public class JT808MsgReplyMessageService: JT808MsgServiceBase + { + + } +} diff --git a/src/JT808.Gateway.InMemoryMQ/Services/JT808MsgService.cs b/src/JT808.Gateway.InMemoryMQ/Services/JT808MsgService.cs index 0ed1ee8..83ec2d1 100644 --- a/src/JT808.Gateway.InMemoryMQ/Services/JT808MsgService.cs +++ b/src/JT808.Gateway.InMemoryMQ/Services/JT808MsgService.cs @@ -7,23 +7,8 @@ using System.Threading.Tasks; namespace JT808.Gateway.InMemoryMQ.Services { - public class JT808MsgService + public class JT808MsgService: JT808MsgServiceBase { - private readonly Channel<(string TerminalNo, byte[] Data)> _channel; - - public JT808MsgService() - { - _channel = Channel.CreateUnbounded<(string TerminalNo, byte[] Data)>(); - } - - public async ValueTask WriteAsync(string terminalNo, byte[] data) - { - await _channel.Writer.WriteAsync((terminalNo, data)); - } - - public async ValueTask<(string TerminalNo, byte[] Data)> ReadAsync(CancellationToken cancellationToken) - { - return await _channel.Reader.ReadAsync(cancellationToken); - } + } } diff --git a/src/JT808.Gateway.InMemoryMQ/Services/JT808MsgServiceBase.cs b/src/JT808.Gateway.InMemoryMQ/Services/JT808MsgServiceBase.cs new file mode 100644 index 0000000..8e3b067 --- /dev/null +++ b/src/JT808.Gateway.InMemoryMQ/Services/JT808MsgServiceBase.cs @@ -0,0 +1,29 @@ +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading; +using System.Threading.Channels; +using System.Threading.Tasks; + +namespace JT808.Gateway.InMemoryMQ.Services +{ + public class JT808MsgServiceBase + { + private readonly Channel<(string TerminalNo, byte[] Data)> _channel; + + public JT808MsgServiceBase() + { + _channel = Channel.CreateUnbounded<(string TerminalNo, byte[] Data)>(); + } + + public async ValueTask WriteAsync(string terminalNo, byte[] data) + { + await _channel.Writer.WriteAsync((terminalNo, data)); + } + + public async ValueTask<(string TerminalNo, byte[] Data)> ReadAsync(CancellationToken cancellationToken) + { + return await _channel.Reader.ReadAsync(cancellationToken); + } + } +} diff --git a/src/JT808.Gateway.InMemoryMQ/Services/JT808MsgTrafficService.cs b/src/JT808.Gateway.InMemoryMQ/Services/JT808MsgTrafficService.cs new file mode 100644 index 0000000..da8da45 --- /dev/null +++ b/src/JT808.Gateway.InMemoryMQ/Services/JT808MsgTrafficService.cs @@ -0,0 +1,14 @@ +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading; +using System.Threading.Channels; +using System.Threading.Tasks; + +namespace JT808.Gateway.InMemoryMQ.Services +{ + public class JT808MsgTrafficService: JT808MsgServiceBase + { + + } +} diff --git a/src/JT808.Gateway.InMemoryMQ/Services/JT808MsgTransmitService.cs b/src/JT808.Gateway.InMemoryMQ/Services/JT808MsgTransmitService.cs new file mode 100644 index 0000000..fc52024 --- /dev/null +++ b/src/JT808.Gateway.InMemoryMQ/Services/JT808MsgTransmitService.cs @@ -0,0 +1,14 @@ +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading; +using System.Threading.Channels; +using System.Threading.Tasks; + +namespace JT808.Gateway.InMemoryMQ.Services +{ + public class JT808MsgTransmitService: JT808MsgServiceBase + { + + } +} diff --git a/src/JT808.Gateway.InMemoryMQ/Services/JT808ReplyMsgService.cs b/src/JT808.Gateway.InMemoryMQ/Services/JT808ReplyMsgService.cs index 028e2fd..a24200f 100644 --- a/src/JT808.Gateway.InMemoryMQ/Services/JT808ReplyMsgService.cs +++ b/src/JT808.Gateway.InMemoryMQ/Services/JT808ReplyMsgService.cs @@ -7,22 +7,8 @@ using System.Threading.Tasks; namespace JT808.Gateway.InMemoryMQ.Services { - public class JT808ReplyMsgService + public class JT808ReplyMsgService: JT808MsgServiceBase { - private readonly Channel<(string TerminalNo, byte[] Data)> _channel; - - public JT808ReplyMsgService() - { - _channel = Channel.CreateUnbounded<(string TerminalNo, byte[] Data)>(); - } - - public async ValueTask WriteAsync(string terminalNo, byte[] data) - { - await _channel.Writer.WriteAsync((terminalNo, data)); - } - public async ValueTask<(string TerminalNo, byte[] Data)> ReadAsync(CancellationToken cancellationToken) - { - return await _channel.Reader.ReadAsync(cancellationToken); - } + } } diff --git a/src/JT808.Gateway.InMemoryMQ/Services/JT808SessionService.cs b/src/JT808.Gateway.InMemoryMQ/Services/JT808SessionService.cs new file mode 100644 index 0000000..696b004 --- /dev/null +++ b/src/JT808.Gateway.InMemoryMQ/Services/JT808SessionService.cs @@ -0,0 +1,28 @@ +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading; +using System.Threading.Channels; +using System.Threading.Tasks; + +namespace JT808.Gateway.InMemoryMQ.Services +{ + public class JT808SessionService + { + private readonly Channel<(string Notice, string TerminalNo)> _channel; + + public JT808SessionService() + { + _channel = Channel.CreateUnbounded<(string Notice, string TerminalNo)>(); + } + + public async ValueTask WriteAsync(string notice, string terminalNo) + { + await _channel.Writer.WriteAsync((notice, terminalNo)); + } + public async ValueTask<(string Notice, string TerminalNo)> ReadAsync(CancellationToken cancellationToken) + { + return await _channel.Reader.ReadAsync(cancellationToken); + } + } +} diff --git a/src/JT808.Gateway.Services/JT808.Gateway.MsgIdHandler/JT808MsgIdHandlerExtensions.cs b/src/JT808.Gateway.Services/JT808.Gateway.MsgIdHandler/JT808MsgIdHandlerExtensions.cs index aeb54b6..5cefd83 100644 --- a/src/JT808.Gateway.Services/JT808.Gateway.MsgIdHandler/JT808MsgIdHandlerExtensions.cs +++ b/src/JT808.Gateway.Services/JT808.Gateway.MsgIdHandler/JT808MsgIdHandlerExtensions.cs @@ -8,12 +8,20 @@ namespace JT808.Gateway.MsgIdHandler { public static class JT808MsgIdHandlerExtensions { - public static IJT808ClientBuilder AddJT808MsgIdHandler(this IJT808ClientBuilder jT808ClientBuilder) + public static IJT808ClientBuilder AddMsgIdHandler(this IJT808ClientBuilder jT808ClientBuilder) where TJT808MsgIdHandler: IJT808MsgIdHandler { jT808ClientBuilder.JT808Builder.Services.AddSingleton(typeof(IJT808MsgIdHandler),typeof(TJT808MsgIdHandler)); jT808ClientBuilder.JT808Builder.Services.AddHostedService(); return jT808ClientBuilder; } + + public static IJT808GatewayBuilder AddInMemoryMsgIdHandler(this IJT808GatewayBuilder jT808GatewayBuilder) + where TJT808MsgIdHandler : IJT808MsgIdHandler + { + jT808GatewayBuilder.JT808Builder.Services.AddSingleton(typeof(IJT808MsgIdHandler), typeof(TJT808MsgIdHandler)); + jT808GatewayBuilder.JT808Builder.Services.AddHostedService(); + return jT808GatewayBuilder; + } } } diff --git a/src/JT808.Gateway.Services/JT808.Gateway.MsgIdHandler/JT808MsgIdHandlerInMemoryHostedService.cs b/src/JT808.Gateway.Services/JT808.Gateway.MsgIdHandler/JT808MsgIdHandlerInMemoryHostedService.cs new file mode 100644 index 0000000..09d0661 --- /dev/null +++ b/src/JT808.Gateway.Services/JT808.Gateway.MsgIdHandler/JT808MsgIdHandlerInMemoryHostedService.cs @@ -0,0 +1,35 @@ +using System.Threading.Tasks; +using Microsoft.Extensions.Hosting; +using System.Threading; +using JT808.Gateway.Abstractions; +using JT808.Gateway.Abstractions.Enums; + +namespace JT808.Gateway.MsgIdHandler +{ + public class JT808MsgIdHandlerInMemoryHostedService : IHostedService + { + private readonly IJT808MsgConsumer jT808MsgConsumer; + + private readonly IJT808MsgIdHandler jT808MsgIdHandler; + public JT808MsgIdHandlerInMemoryHostedService( + IJT808MsgIdHandler jT808MsgIdHandler, + IJT808MsgConsumerFactory jT808MsgConsumerFactory) + { + this.jT808MsgIdHandler = jT808MsgIdHandler; + this.jT808MsgConsumer = jT808MsgConsumerFactory.Create(JT808ConsumerType.MsgIdHandlerConsumer); + } + + public Task StartAsync(CancellationToken cancellationToken) + { + jT808MsgConsumer.Subscribe(); + jT808MsgConsumer.OnMessage(jT808MsgIdHandler.Processor); + return Task.CompletedTask; + } + + public Task StopAsync(CancellationToken cancellationToken) + { + jT808MsgConsumer.Unsubscribe(); + return Task.CompletedTask; + } + } +} diff --git a/src/JT808.Gateway.Services/JT808.Gateway.MsgLogging/JT808MsgDownLoggingInMemoryHostedService.cs b/src/JT808.Gateway.Services/JT808.Gateway.MsgLogging/JT808MsgDownLoggingInMemoryHostedService.cs new file mode 100644 index 0000000..8b53812 --- /dev/null +++ b/src/JT808.Gateway.Services/JT808.Gateway.MsgLogging/JT808MsgDownLoggingInMemoryHostedService.cs @@ -0,0 +1,37 @@ +using System.Threading.Tasks; +using Microsoft.Extensions.Hosting; +using System.Threading; +using JT808.Gateway.Abstractions; +using JT808.Gateway.Abstractions.Enums; + +namespace JT808.Gateway.MsgLogging +{ + public class JT808MsgDownLoggingInMemoryHostedService : IHostedService + { + private readonly IJT808MsgReplyConsumer jT808MsgReplyConsumer; + private readonly IJT808MsgLogging jT808MsgLogging; + public JT808MsgDownLoggingInMemoryHostedService( + IJT808MsgLogging jT808MsgLogging, + IJT808MsgReplyConsumerFactory jT808MsgReplyConsumerFactory) + { + this.jT808MsgReplyConsumer = jT808MsgReplyConsumerFactory.Create(JT808ConsumerType.ReplyMessageLoggingConsumer); + this.jT808MsgLogging = jT808MsgLogging; + } + + public Task StartAsync(CancellationToken cancellationToken) + { + jT808MsgReplyConsumer.Subscribe(); + jT808MsgReplyConsumer.OnMessage(item=> + { + jT808MsgLogging.Processor(item, JT808MsgLoggingType.down); + }); + return Task.CompletedTask; + } + + public Task StopAsync(CancellationToken cancellationToken) + { + jT808MsgReplyConsumer.Unsubscribe(); + return Task.CompletedTask; + } + } +} diff --git a/src/JT808.Gateway.Services/JT808.Gateway.MsgLogging/JT808MsgLoggingExtensions.cs b/src/JT808.Gateway.Services/JT808.Gateway.MsgLogging/JT808MsgLoggingExtensions.cs index 3bb00b6..2890b4b 100644 --- a/src/JT808.Gateway.Services/JT808.Gateway.MsgLogging/JT808MsgLoggingExtensions.cs +++ b/src/JT808.Gateway.Services/JT808.Gateway.MsgLogging/JT808MsgLoggingExtensions.cs @@ -8,7 +8,7 @@ namespace JT808.Gateway.MsgLogging { public static class JT808MsgLoggingExtensions { - public static IJT808ClientBuilder AddJT808MsgLogging(this IJT808ClientBuilder jT808ClientBuilder) + public static IJT808ClientBuilder AddMsgLogging(this IJT808ClientBuilder jT808ClientBuilder) where TJT808MsgLogging: IJT808MsgLogging { jT808ClientBuilder.JT808Builder.Services.AddSingleton(typeof(IJT808MsgLogging),typeof(TJT808MsgLogging)); @@ -16,5 +16,14 @@ namespace JT808.Gateway.MsgLogging jT808ClientBuilder.JT808Builder.Services.AddHostedService(); return jT808ClientBuilder; } + + public static IJT808GatewayBuilder AddInMemoryMsgLogging(this IJT808GatewayBuilder jT808GatewayBuilder) + where TJT808MsgLogging : IJT808MsgLogging + { + jT808GatewayBuilder.JT808Builder.Services.AddSingleton(typeof(IJT808MsgLogging), typeof(TJT808MsgLogging)); + jT808GatewayBuilder.JT808Builder.Services.AddHostedService(); + jT808GatewayBuilder.JT808Builder.Services.AddHostedService(); + return jT808GatewayBuilder; + } } } diff --git a/src/JT808.Gateway.Services/JT808.Gateway.MsgLogging/JT808MsgUpLoggingInMemoryHostedService.cs b/src/JT808.Gateway.Services/JT808.Gateway.MsgLogging/JT808MsgUpLoggingInMemoryHostedService.cs new file mode 100644 index 0000000..7784def --- /dev/null +++ b/src/JT808.Gateway.Services/JT808.Gateway.MsgLogging/JT808MsgUpLoggingInMemoryHostedService.cs @@ -0,0 +1,37 @@ +using System.Threading.Tasks; +using Microsoft.Extensions.Hosting; +using System.Threading; +using JT808.Gateway.Abstractions; +using JT808.Gateway.Abstractions.Enums; + +namespace JT808.Gateway.MsgLogging +{ + public class JT808MsgUpLoggingInMemoryHostedService : IHostedService + { + private readonly IJT808MsgConsumer jT808MsgConsumer; + private readonly IJT808MsgLogging jT808MsgLogging; + public JT808MsgUpLoggingInMemoryHostedService( + IJT808MsgLogging jT808MsgLogging, + IJT808MsgConsumerFactory jT808MsgConsumerFactory) + { + this.jT808MsgConsumer = jT808MsgConsumerFactory.Create(JT808ConsumerType.MsgLoggingConsumer); + this.jT808MsgLogging = jT808MsgLogging; + } + + public Task StartAsync(CancellationToken cancellationToken) + { + jT808MsgConsumer.Subscribe(); + jT808MsgConsumer.OnMessage(item=> + { + jT808MsgLogging.Processor(item, JT808MsgLoggingType.up); + }); + return Task.CompletedTask; + } + + public Task StopAsync(CancellationToken cancellationToken) + { + jT808MsgConsumer.Unsubscribe(); + return Task.CompletedTask; + } + } +} diff --git a/src/JT808.Gateway.Services/JT808.Gateway.ReplyMessage/JT808ReplyMessageExtensions.cs b/src/JT808.Gateway.Services/JT808.Gateway.ReplyMessage/JT808ReplyMessageExtensions.cs index 88befbf..7a9e786 100644 --- a/src/JT808.Gateway.Services/JT808.Gateway.ReplyMessage/JT808ReplyMessageExtensions.cs +++ b/src/JT808.Gateway.Services/JT808.Gateway.ReplyMessage/JT808ReplyMessageExtensions.cs @@ -14,7 +14,7 @@ namespace JT808.Gateway.ReplyMessage /// /// /// - public static IJT808ClientBuilder AddJT808InPlugReplyMessage(this IJT808ClientBuilder jT808ClientBuilder) + public static IJT808ClientBuilder AddInPlugReplyMessage(this IJT808ClientBuilder jT808ClientBuilder) { jT808ClientBuilder.JT808Builder.Services.AddSingleton(); jT808ClientBuilder.JT808Builder.Services.AddHostedService(); @@ -26,7 +26,7 @@ namespace JT808.Gateway.ReplyMessage /// 自定义消息回复服务 /// /// - public static IJT808ClientBuilder AddJT808InPlugReplyMessage(this IJT808ClientBuilder jT808ClientBuilder) + public static IJT808ClientBuilder AddInPlugReplyMessage(this IJT808ClientBuilder jT808ClientBuilder) where TReplyMessageHandler : JT808ReplyMessageHandler { jT808ClientBuilder.JT808Builder.Services.AddSingleton(); @@ -39,11 +39,11 @@ namespace JT808.Gateway.ReplyMessage /// 自定义消息回复服务 /// /// - public static IJT808GatewayBuilder AddJT808InMemoryReplyMessage(this IJT808GatewayBuilder jT808GatewayBuilder) + public static IJT808GatewayBuilder AddInMemoryReplyMessage(this IJT808GatewayBuilder jT808GatewayBuilder) where TReplyMessageHandler : JT808ReplyMessageHandler { jT808GatewayBuilder.JT808Builder.Services.AddSingleton(); - jT808GatewayBuilder.JT808Builder.Services.AddHostedService(); + jT808GatewayBuilder.JT808Builder.Services.AddHostedService(); return jT808GatewayBuilder; } /// @@ -54,7 +54,7 @@ namespace JT808.Gateway.ReplyMessage public static IJT808GatewayBuilder AddInMemoryReplyMessage(this IJT808GatewayBuilder jT808GatewayBuilder) { jT808GatewayBuilder.JT808Builder.Services.AddSingleton(); - jT808GatewayBuilder.JT808Builder.Services.AddHostedService(); + jT808GatewayBuilder.JT808Builder.Services.AddHostedService(); return jT808GatewayBuilder; } } diff --git a/src/JT808.Gateway.Services/JT808.Gateway.ReplyMessage/JT808ReplyMessageInMemoryHostedService.cs b/src/JT808.Gateway.Services/JT808.Gateway.ReplyMessage/JT808ReplyMessageInMemoryHostedService.cs new file mode 100644 index 0000000..cb25d30 --- /dev/null +++ b/src/JT808.Gateway.Services/JT808.Gateway.ReplyMessage/JT808ReplyMessageInMemoryHostedService.cs @@ -0,0 +1,35 @@ +using System.Threading.Tasks; +using Microsoft.Extensions.Hosting; +using System.Threading; +using JT808.Gateway.Abstractions; +using JT808.Gateway.Abstractions.Enums; + +namespace JT808.Gateway.ReplyMessage +{ + public class JT808ReplyMessageInMemoryHostedService : IHostedService + { + private readonly IJT808MsgConsumer jT808MsgConsumer; + private readonly JT808ReplyMessageHandler jT808ReplyMessageHandler; + + public JT808ReplyMessageInMemoryHostedService( + JT808ReplyMessageHandler jT808ReplyMessageHandler, + IJT808MsgConsumerFactory jT808MsgConsumerFactory) + { + this.jT808MsgConsumer = jT808MsgConsumerFactory.Create(JT808ConsumerType.ReplyMessageConsumer); + this.jT808ReplyMessageHandler = jT808ReplyMessageHandler; + } + + public Task StartAsync(CancellationToken cancellationToken) + { + jT808MsgConsumer.Subscribe(); + jT808MsgConsumer.OnMessage(jT808ReplyMessageHandler.Processor); + return Task.CompletedTask; + } + + public Task StopAsync(CancellationToken cancellationToken) + { + jT808MsgConsumer.Unsubscribe(); + return Task.CompletedTask; + } + } +} diff --git a/src/JT808.Gateway.Services/JT808.Gateway.SessionNotice/JT808SessionNoticeExtensions.cs b/src/JT808.Gateway.Services/JT808.Gateway.SessionNotice/JT808SessionNoticeExtensions.cs index ca78dc7..5622a64 100644 --- a/src/JT808.Gateway.Services/JT808.Gateway.SessionNotice/JT808SessionNoticeExtensions.cs +++ b/src/JT808.Gateway.Services/JT808.Gateway.SessionNotice/JT808SessionNoticeExtensions.cs @@ -14,7 +14,7 @@ namespace JT808.Gateway.SessionNotice /// /// /// - public static IJT808ClientBuilder AddJT808InPlugSessionNotice(this IJT808ClientBuilder jT808ClientBuilder) + public static IJT808ClientBuilder AddInPlugSessionNotice(this IJT808ClientBuilder jT808ClientBuilder) { jT808ClientBuilder.JT808Builder.Services.AddSingleton(); jT808ClientBuilder.JT808Builder.Services.AddHostedService(); @@ -27,7 +27,7 @@ namespace JT808.Gateway.SessionNotice /// 自定义会话通知服务 /// /// - public static IJT808ClientBuilder AddJT808InPlugSessionNotice(this IJT808ClientBuilder jT808ClientBuilder) + public static IJT808ClientBuilder AddInPlugSessionNotice(this IJT808ClientBuilder jT808ClientBuilder) where TSessionNoticeService : JT808SessionNoticeService { jT808ClientBuilder.JT808Builder.Services.AddSingleton(); @@ -41,7 +41,7 @@ namespace JT808.Gateway.SessionNotice /// 自定义会话通知服务 /// /// - public static IJT808GatewayBuilder AddJT808InMemorySessionNotice(this IJT808GatewayBuilder jT808GatewayBuilder) + public static IJT808GatewayBuilder AddInMemorySessionNotice(this IJT808GatewayBuilder jT808GatewayBuilder) where TSessionNoticeService : JT808SessionNoticeService { jT808GatewayBuilder.JT808Builder.Services.AddSingleton(); @@ -54,11 +54,11 @@ namespace JT808.Gateway.SessionNotice /// /// /// - //public static IJT808GatewayBuilder AddJT808InMemorySessionNotice(this IJT808GatewayBuilder jT808GatewayBuilder) - //{ - // jT808GatewayBuilder.JT808Builder.Services.AddSingleton(); - // jT808GatewayBuilder.JT808Builder.Services.AddHostedService(); - // return jT808GatewayBuilder; - //} + public static IJT808GatewayBuilder AddInMemorySessionNotice(this IJT808GatewayBuilder jT808GatewayBuilder) + { + jT808GatewayBuilder.JT808Builder.Services.AddSingleton(); + jT808GatewayBuilder.JT808Builder.Services.AddHostedService(); + return jT808GatewayBuilder; + } } } diff --git a/src/JT808.Gateway.Services/JT808.Gateway.Traffic/IJT808Traffic.cs b/src/JT808.Gateway.Services/JT808.Gateway.Traffic/IJT808Traffic.cs new file mode 100644 index 0000000..084cc4e --- /dev/null +++ b/src/JT808.Gateway.Services/JT808.Gateway.Traffic/IJT808Traffic.cs @@ -0,0 +1,37 @@ +using System; +using System.Collections.Generic; +using System.Text; +using System.Collections.Concurrent; +using System.Linq; + +namespace JT808.Gateway.Traffic +{ + public interface IJT808Traffic + { + long Get(string key); + long Increment(string terminalNo, string field, int len); + List<(string,long)> GetAll(); + } + + public class JT808TrafficDefault : IJT808Traffic + { + private ConcurrentDictionary dict = new ConcurrentDictionary(); + + public long Get(string key) + { + long value; + dict.TryGetValue(key, out value); + return value; + } + + public List<(string, long)> GetAll() + { + return dict.Select(s => (s.Key, s.Value)).ToList(); + } + + public long Increment(string terminalNo, string field, int len) + { + return dict.AddOrUpdate($"{terminalNo}_{field}", len, (id, count) => count + len); + } + } +} diff --git a/src/JT808.Gateway.Services/JT808.Gateway.Traffic/JT808.Gateway.Traffic.csproj b/src/JT808.Gateway.Services/JT808.Gateway.Traffic/JT808.Gateway.Traffic.csproj index 4d74b28..9d2d525 100644 --- a/src/JT808.Gateway.Services/JT808.Gateway.Traffic/JT808.Gateway.Traffic.csproj +++ b/src/JT808.Gateway.Services/JT808.Gateway.Traffic/JT808.Gateway.Traffic.csproj @@ -20,10 +20,6 @@ 基于JT808设备流量统计服务 LICENSE - - - - diff --git a/src/JT808.Gateway.Services/JT808.Gateway.Traffic/JT808TrafficService.cs b/src/JT808.Gateway.Services/JT808.Gateway.Traffic/JT808TrafficService.cs deleted file mode 100644 index c13c38f..0000000 --- a/src/JT808.Gateway.Services/JT808.Gateway.Traffic/JT808TrafficService.cs +++ /dev/null @@ -1,32 +0,0 @@ -using Microsoft.Extensions.Configuration; -using System; -using System.Collections.Generic; -using System.Text; - -namespace JT808.Gateway.Traffic -{ - public class JT808TrafficService:IDisposable - { - private readonly CSRedis.CSRedisClient redisClien; - public JT808TrafficService(IConfiguration configuration) - { - redisClien = new CSRedis.CSRedisClient(configuration.GetConnectionString("TrafficRedisHost")); - TrafficRedisClient.Initialization(redisClien); - } - - public void Dispose() - { - redisClien.Dispose(); - } - - /// - /// 按设备每天统计sim卡流量 - /// - /// - /// - public void Processor(string terminalNo,int len) - { - TrafficRedisClient.HIncrBy(terminalNo, DateTime.Now.ToString("yyyyMMdd"), len); - } - } -} diff --git a/src/JT808.Gateway.Services/JT808.Gateway.Traffic/JT808TrafficServiceExtensions.cs b/src/JT808.Gateway.Services/JT808.Gateway.Traffic/JT808TrafficServiceExtensions.cs index d292a4a..26cf199 100644 --- a/src/JT808.Gateway.Services/JT808.Gateway.Traffic/JT808TrafficServiceExtensions.cs +++ b/src/JT808.Gateway.Services/JT808.Gateway.Traffic/JT808TrafficServiceExtensions.cs @@ -13,9 +13,10 @@ namespace JT808.Gateway.Traffic /// /// /// - public static IJT808ClientBuilder AddJT808InPlugTraffic(this IJT808ClientBuilder jT808ClientBuilder) + public static IJT808ClientBuilder AddInPlugTraffic(this IJT808ClientBuilder jT808ClientBuilder) + where TIJT808Traffic:IJT808Traffic { - jT808ClientBuilder.JT808Builder.Services.AddSingleton(); + jT808ClientBuilder.JT808Builder.Services.AddSingleton(typeof(IJT808Traffic), typeof(TIJT808Traffic)); jT808ClientBuilder.JT808Builder.Services.AddHostedService(); return jT808ClientBuilder; } @@ -25,11 +26,23 @@ namespace JT808.Gateway.Traffic /// /// /// - //public static IJT808GatewayBuilder AddJT808InMemoryTraffic(this IJT808GatewayBuilder jT808GatewayBuilder) - //{ - // jT808GatewayBuilder.JT808Builder.Services.AddSingleton(); - // jT808GatewayBuilder.JT808Builder.Services.AddHostedService(); - // return jT808GatewayBuilder; - //} + public static IJT808GatewayBuilder AddInMemoryTraffic(this IJT808GatewayBuilder jT808GatewayBuilder) + { + jT808GatewayBuilder.JT808Builder.Services.AddSingleton(typeof(IJT808Traffic), typeof(JT808TrafficDefault)); + jT808GatewayBuilder.JT808Builder.Services.AddHostedService(); + return jT808GatewayBuilder; + } + /// + /// 消息流量统计服务(消费者单实例) + /// + /// + /// + /// + public static IJT808GatewayBuilder AddInMemoryTraffic(this IJT808GatewayBuilder jT808GatewayBuilder) + { + jT808GatewayBuilder.JT808Builder.Services.AddSingleton(typeof(IJT808Traffic), typeof(TIJT808Traffic)); + jT808GatewayBuilder.JT808Builder.Services.AddHostedService(); + return jT808GatewayBuilder; + } } } diff --git a/src/JT808.Gateway.Services/JT808.Gateway.Traffic/JT808TrafficServiceHostedService.cs b/src/JT808.Gateway.Services/JT808.Gateway.Traffic/JT808TrafficServiceHostedService.cs index 8c7d0b5..d4032f4 100644 --- a/src/JT808.Gateway.Services/JT808.Gateway.Traffic/JT808TrafficServiceHostedService.cs +++ b/src/JT808.Gateway.Services/JT808.Gateway.Traffic/JT808TrafficServiceHostedService.cs @@ -3,20 +3,21 @@ using Microsoft.Extensions.Hosting; using System.Threading; using JT808.Protocol.Extensions; using JT808.Gateway.Abstractions; +using System; namespace JT808.Gateway.Traffic { public class JT808TrafficServiceHostedService : IHostedService { private readonly IJT808MsgConsumer jT808MsgConsumer; - private readonly JT808TrafficService jT808TrafficService; + private readonly IJT808Traffic jT808Traffic; public JT808TrafficServiceHostedService( - JT808TrafficService jT808TrafficService, + IJT808Traffic jT808Traffic, IJT808MsgConsumer jT808MsgConsumer) { this.jT808MsgConsumer = jT808MsgConsumer; - this.jT808TrafficService = jT808TrafficService; + this.jT808Traffic = jT808Traffic; } public Task StartAsync(CancellationToken cancellationToken) @@ -24,7 +25,7 @@ namespace JT808.Gateway.Traffic jT808MsgConsumer.Subscribe(); jT808MsgConsumer.OnMessage((item)=> { //string str = item.Data.ToHexString(); - jT808TrafficService.Processor(item.TerminalNo, item.Data.Length); + jT808Traffic.Increment(item.TerminalNo,DateTime.Now.ToString("yyyyMMdd"), item.Data.Length); }); return Task.CompletedTask; } diff --git a/src/JT808.Gateway.Services/JT808.Gateway.Traffic/JT808TrafficServiceInMemoryHostedService.cs b/src/JT808.Gateway.Services/JT808.Gateway.Traffic/JT808TrafficServiceInMemoryHostedService.cs new file mode 100644 index 0000000..bc64a69 --- /dev/null +++ b/src/JT808.Gateway.Services/JT808.Gateway.Traffic/JT808TrafficServiceInMemoryHostedService.cs @@ -0,0 +1,40 @@ +using System.Threading.Tasks; +using Microsoft.Extensions.Hosting; +using System.Threading; +using JT808.Protocol.Extensions; +using JT808.Gateway.Abstractions; +using JT808.Gateway.Abstractions.Enums; +using System; + +namespace JT808.Gateway.Traffic +{ + public class JT808TrafficServiceInMemoryHostedService : IHostedService + { + private readonly IJT808MsgConsumer jT808MsgConsumer; + private readonly IJT808Traffic jT808Traffic; + + public JT808TrafficServiceInMemoryHostedService( + IJT808Traffic jT808Traffic, + IJT808MsgConsumerFactory jT808MsgConsumerFactory) + { + this.jT808MsgConsumer = jT808MsgConsumerFactory.Create(JT808ConsumerType.TrafficConsumer); + this.jT808Traffic = jT808Traffic; + } + + public Task StartAsync(CancellationToken cancellationToken) + { + jT808MsgConsumer.Subscribe(); + jT808MsgConsumer.OnMessage((item)=> { + //string str = item.Data.ToHexString(); + jT808Traffic.Increment(item.TerminalNo, DateTime.Now.ToString("yyyyMMdd"), item.Data.Length); + }); + return Task.CompletedTask; + } + + public Task StopAsync(CancellationToken cancellationToken) + { + jT808MsgConsumer.Unsubscribe(); + return Task.CompletedTask; + } + } +} diff --git a/src/JT808.Gateway.Services/JT808.Gateway.Traffic/TrafficRedisClient.cs b/src/JT808.Gateway.Services/JT808.Gateway.Traffic/TrafficRedisClient.cs deleted file mode 100644 index 872e675..0000000 --- a/src/JT808.Gateway.Services/JT808.Gateway.Traffic/TrafficRedisClient.cs +++ /dev/null @@ -1,9 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Text; - -namespace JT808.Gateway.Traffic -{ - class TrafficRedisClient: RedisHelper - { } -} diff --git a/src/JT808.Gateway.Services/JT808.Gateway.Transmit/JT808TransmitExtensions.cs b/src/JT808.Gateway.Services/JT808.Gateway.Transmit/JT808TransmitExtensions.cs index b6e497d..22507a8 100644 --- a/src/JT808.Gateway.Services/JT808.Gateway.Transmit/JT808TransmitExtensions.cs +++ b/src/JT808.Gateway.Services/JT808.Gateway.Transmit/JT808TransmitExtensions.cs @@ -17,7 +17,7 @@ namespace JT808.Gateway.Transmit /// /// /// - public static IJT808ClientBuilder AddJT808InPlugTransmit(this IJT808ClientBuilder jT808ClientBuilder,IConfiguration configuration) + public static IJT808ClientBuilder AddInPlugTransmit(this IJT808ClientBuilder jT808ClientBuilder,IConfiguration configuration) { jT808ClientBuilder.JT808Builder.Services.Configure(configuration.GetSection("RemoteServerOptions")); jT808ClientBuilder.JT808Builder.Services.AddSingleton(); @@ -30,12 +30,12 @@ namespace JT808.Gateway.Transmit /// /// /// - //public static IJT808GatewayBuilder AddJT808InMemoryTransmit(this IJT808GatewayBuilder jT808GatewayBuilder, IConfiguration configuration) - //{ - // jT808GatewayBuilder.JT808Builder.Services.Configure(configuration.GetSection("RemoteServerOptions")); - // jT808GatewayBuilder.JT808Builder.Services.AddSingleton(); - // jT808GatewayBuilder.JT808Builder.Services.AddHostedService(); - // return jT808GatewayBuilder; - //} + public static IJT808GatewayBuilder AddInMemoryTransmit(this IJT808GatewayBuilder jT808GatewayBuilder, IConfiguration configuration) + { + jT808GatewayBuilder.JT808Builder.Services.Configure(configuration.GetSection("RemoteServerOptions")); + jT808GatewayBuilder.JT808Builder.Services.AddSingleton(); + jT808GatewayBuilder.JT808Builder.Services.AddHostedService(); + return jT808GatewayBuilder; + } } } diff --git a/src/JT808.Gateway.Services/JT808.Gateway.Transmit/JT808TransmitInMemoryHostedService.cs b/src/JT808.Gateway.Services/JT808.Gateway.Transmit/JT808TransmitInMemoryHostedService.cs new file mode 100644 index 0000000..51547bb --- /dev/null +++ b/src/JT808.Gateway.Services/JT808.Gateway.Transmit/JT808TransmitInMemoryHostedService.cs @@ -0,0 +1,34 @@ +using System.Threading.Tasks; +using Microsoft.Extensions.Hosting; +using System.Threading; +using JT808.Gateway.Abstractions; +using JT808.Gateway.Abstractions.Enums; + +namespace JT808.Gateway.Transmit +{ + public class JT808TransmitInMemoryHostedService : IHostedService + { + private readonly JT808TransmitService jT808TransmitService; + private readonly IJT808MsgConsumer jT808MsgConsumer; + public JT808TransmitInMemoryHostedService( + IJT808MsgConsumerFactory jT808MsgConsumerFactory, + JT808TransmitService jT808TransmitService) + { + this.jT808TransmitService = jT808TransmitService; + this.jT808MsgConsumer = jT808MsgConsumerFactory.Create(JT808ConsumerType.TransmitConsumer); + } + + public Task StartAsync(CancellationToken cancellationToken) + { + jT808MsgConsumer.Subscribe(); + jT808MsgConsumer.OnMessage(jT808TransmitService.Send); + return Task.CompletedTask; + } + + public Task StopAsync(CancellationToken cancellationToken) + { + jT808MsgConsumer.Unsubscribe(); + return Task.CompletedTask; + } + } +} diff --git a/src/JT808.Gateway.Tests/JT808.Gateway.InMemoryMQ.Test/JT808.Gateway.InMemoryMQ.Test.csproj b/src/JT808.Gateway.Tests/JT808.Gateway.InMemoryMQ.Test/JT808.Gateway.InMemoryMQ.Test.csproj index 0c9eaf0..2d4b6b2 100644 --- a/src/JT808.Gateway.Tests/JT808.Gateway.InMemoryMQ.Test/JT808.Gateway.InMemoryMQ.Test.csproj +++ b/src/JT808.Gateway.Tests/JT808.Gateway.InMemoryMQ.Test/JT808.Gateway.InMemoryMQ.Test.csproj @@ -7,6 +7,8 @@ + + @@ -15,6 +17,7 @@ + diff --git a/src/JT808.Gateway.Tests/JT808.Gateway.InMemoryMQ.Test/JT808MsgProducerTest.cs b/src/JT808.Gateway.Tests/JT808.Gateway.InMemoryMQ.Test/JT808MsgProducerTest.cs new file mode 100644 index 0000000..549b37f --- /dev/null +++ b/src/JT808.Gateway.Tests/JT808.Gateway.InMemoryMQ.Test/JT808MsgProducerTest.cs @@ -0,0 +1,124 @@ +using JT808.Gateway.Abstractions; +using JT808.Gateway.Abstractions.Enums; +using JT808.Gateway.Internal; +using JT808.Protocol; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging; +using System; +using System.Collections.Generic; +using System.Text; +using Xunit; + +namespace JT808.Gateway.InMemoryMQ.Test +{ + public class JT808MsgProducerTest + { + [Fact] + public void Test1() + { + IServiceCollection serviceDescriptors = new ServiceCollection(); + serviceDescriptors.AddSingleton(); + serviceDescriptors.AddSingleton(typeof(ILogger<>), typeof(Logger<>)); + serviceDescriptors.AddServerInMemoryMQ(JT808ConsumerType.MsgIdHandlerConsumer | JT808ConsumerType.ReplyMessageConsumer); + IServiceProvider serviceProvider = serviceDescriptors.BuildServiceProvider(); + IJT808MsgProducer producer = serviceProvider.GetRequiredService(); + producer.ProduceAsync("123", new byte[] { 1, 2, 3, 4 }); + IJT808MsgConsumer consumer = serviceProvider.GetRequiredService(); + consumer.OnMessage((item) => { + Assert.Equal("123", item.TerminalNo); + Assert.Equal(new byte[] { 1, 2, 3, 4 }, item.Data); + }); + IJT808MsgConsumerFactory consumerFactory = serviceProvider.GetRequiredService(); + var msgIdHandlerConsumer = consumerFactory.Create(JT808ConsumerType.MsgIdHandlerConsumer); + msgIdHandlerConsumer.OnMessage((item) => + { + Assert.Equal("123", item.TerminalNo); + Assert.Equal(new byte[] { 1, 2, 3, 4 }, item.Data); + }); + var replyMessageConsumer = consumerFactory.Create(JT808ConsumerType.ReplyMessageConsumer); + replyMessageConsumer.OnMessage((item) => + { + Assert.Equal("123", item.TerminalNo); + Assert.Equal(new byte[] { 1, 2, 3, 4 }, item.Data); + }); + } + + [Fact] + public void Test2() + { + IServiceCollection serviceDescriptors = new ServiceCollection(); + serviceDescriptors.AddSingleton(); + serviceDescriptors.AddSingleton(typeof(ILogger<>), typeof(Logger<>)); + serviceDescriptors.AddServerInMemoryMQ(JT808ConsumerType.MsgIdHandlerConsumer,JT808ConsumerType.ReplyMessageConsumer); + IServiceProvider serviceProvider = serviceDescriptors.BuildServiceProvider(); + IJT808MsgProducer producer = serviceProvider.GetRequiredService(); + producer.ProduceAsync("123", new byte[] { 1, 2, 3, 4 }); + IJT808MsgConsumer consumer = serviceProvider.GetRequiredService(); + consumer.OnMessage((item) => { + Assert.Equal("123", item.TerminalNo); + Assert.Equal(new byte[] { 1, 2, 3, 4 }, item.Data); + }); + IJT808MsgConsumerFactory consumerFactory = serviceProvider.GetRequiredService(); + var msgIdHandlerConsumer = consumerFactory.Create(JT808ConsumerType.MsgIdHandlerConsumer); + msgIdHandlerConsumer.OnMessage((item) => + { + Assert.Equal("123", item.TerminalNo); + Assert.Equal(new byte[] { 1, 2, 3, 4 }, item.Data); + }); + var replyMessageConsumer = consumerFactory.Create(JT808ConsumerType.ReplyMessageConsumer); + replyMessageConsumer.OnMessage((item) => + { + Assert.Equal("123", item.TerminalNo); + Assert.Equal(new byte[] { 1, 2, 3, 4 }, item.Data); + }); + } + + [Fact] + public void Test3() + { + IServiceCollection serviceDescriptors = new ServiceCollection(); + serviceDescriptors.AddSingleton(); + serviceDescriptors.AddSingleton(typeof(ILogger<>), typeof(Logger<>)); + serviceDescriptors.AddServerInMemoryMQ(JT808ConsumerType.All); + IServiceProvider serviceProvider = serviceDescriptors.BuildServiceProvider(); + IJT808MsgProducer producer = serviceProvider.GetRequiredService(); + producer.ProduceAsync("123", new byte[] { 1, 2, 3, 4 }); + IJT808MsgConsumer consumer = serviceProvider.GetRequiredService(); + consumer.OnMessage((item) => { + Assert.Equal("123", item.TerminalNo); + Assert.Equal(new byte[] { 1, 2, 3, 4 }, item.Data); + }); + IJT808MsgConsumerFactory consumerFactory = serviceProvider.GetRequiredService(); + var msgIdHandlerConsumer = consumerFactory.Create(JT808ConsumerType.MsgIdHandlerConsumer); + msgIdHandlerConsumer.OnMessage((item) => + { + Assert.Equal("123", item.TerminalNo); + Assert.Equal(new byte[] { 1, 2, 3, 4 }, item.Data); + }); + var replyMessageConsumer = consumerFactory.Create(JT808ConsumerType.ReplyMessageConsumer); + replyMessageConsumer.OnMessage((item) => + { + Assert.Equal("123", item.TerminalNo); + Assert.Equal(new byte[] { 1, 2, 3, 4 }, item.Data); + }); + var msgLoggingConsumer = consumerFactory.Create(JT808ConsumerType.MsgLoggingConsumer); + msgLoggingConsumer.OnMessage((item) => + { + Assert.Equal("123", item.TerminalNo); + Assert.Equal(new byte[] { 1, 2, 3, 4 }, item.Data); + }); + var trafficConsumer = consumerFactory.Create(JT808ConsumerType.TrafficConsumer); + trafficConsumer.OnMessage((item) => + { + Assert.Equal("123", item.TerminalNo); + Assert.Equal(new byte[] { 1, 2, 3, 4 }, item.Data); + }); + var transmitConsumer = consumerFactory.Create(JT808ConsumerType.TransmitConsumer); + transmitConsumer.OnMessage((item) => + { + Assert.Equal("123", item.TerminalNo); + Assert.Equal(new byte[] { 1, 2, 3, 4 }, item.Data); + }); + } + } +} diff --git a/src/JT808.Gateway.Tests/JT808.Gateway.InMemoryMQ.Test/Services/JT808SessionServiceTest.cs b/src/JT808.Gateway.Tests/JT808.Gateway.InMemoryMQ.Test/Services/JT808SessionServiceTest.cs new file mode 100644 index 0000000..3381c0e --- /dev/null +++ b/src/JT808.Gateway.Tests/JT808.Gateway.InMemoryMQ.Test/Services/JT808SessionServiceTest.cs @@ -0,0 +1,32 @@ +using JT808.Gateway.Abstractions; +using JT808.Gateway.InMemoryMQ.Services; +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading; +using Xunit; + +namespace JT808.Gateway.InMemoryMQ.Test.Services +{ + public class JT808SessionServiceTest + { + [Fact] + public void Test1() + { + JT808SessionService jT808SessionService = new JT808SessionService(); + jT808SessionService.WriteAsync(JT808GatewayConstants.SessionOnline, "123456").GetAwaiter().GetResult(); + jT808SessionService.WriteAsync(JT808GatewayConstants.SessionOffline, "123457").GetAwaiter().GetResult(); + jT808SessionService.WriteAsync(JT808GatewayConstants.SessionOnline, "123456,123457").GetAwaiter().GetResult(); + var result1 = jT808SessionService.ReadAsync(CancellationToken.None).GetAwaiter().GetResult(); + var result2 = jT808SessionService.ReadAsync(CancellationToken.None).GetAwaiter().GetResult(); + var result3 = jT808SessionService.ReadAsync(CancellationToken.None).GetAwaiter().GetResult(); + Assert.Equal(JT808GatewayConstants.SessionOnline, result1.Notice); + Assert.Equal("123456", result1.TerminalNo); + Assert.Equal(JT808GatewayConstants.SessionOffline, result2.Notice); + Assert.Equal("123457", result2.TerminalNo); + //转发 + Assert.Equal(JT808GatewayConstants.SessionOnline, result3.Notice); + Assert.Equal("123456,123457", result3.TerminalNo); + } + } +} diff --git a/src/JT808.Gateway.Tests/JT808.Gateway.TestHosting/Impl/JT808MsgIdHandler.cs b/src/JT808.Gateway.Tests/JT808.Gateway.TestHosting/Impl/JT808MsgIdHandler.cs new file mode 100644 index 0000000..b50922d --- /dev/null +++ b/src/JT808.Gateway.Tests/JT808.Gateway.TestHosting/Impl/JT808MsgIdHandler.cs @@ -0,0 +1,22 @@ +using JT808.Gateway.MsgIdHandler; +using JT808.Protocol.Extensions; +using Microsoft.Extensions.Logging; +using System; +using System.Collections.Generic; +using System.Text; + +namespace JT808.Gateway.TestHosting.Impl +{ + public class JT808MsgIdHandler : IJT808MsgIdHandler + { + private readonly ILogger Logger; + public JT808MsgIdHandler(ILoggerFactory loggerFactory) + { + Logger = loggerFactory.CreateLogger("JT808MsgIdHandler"); + } + public void Processor((string TerminalNo, byte[] Data) parameter) + { + Logger.LogDebug($"{parameter.TerminalNo}-{parameter.Data.ToHexString()}"); + } + } +} diff --git a/src/JT808.Gateway.Tests/JT808.Gateway.TestHosting/Impl/JT808MsgLogging.cs b/src/JT808.Gateway.Tests/JT808.Gateway.TestHosting/Impl/JT808MsgLogging.cs new file mode 100644 index 0000000..77c378e --- /dev/null +++ b/src/JT808.Gateway.Tests/JT808.Gateway.TestHosting/Impl/JT808MsgLogging.cs @@ -0,0 +1,22 @@ +using JT808.Gateway.MsgLogging; +using JT808.Protocol.Extensions; +using Microsoft.Extensions.Logging; +using System; +using System.Collections.Generic; +using System.Text; + +namespace JT808.Gateway.TestHosting.Impl +{ + public class JT808MsgLogging : IJT808MsgLogging + { + private readonly ILogger Logger; + public JT808MsgLogging(ILoggerFactory loggerFactory) + { + Logger = loggerFactory.CreateLogger("JT808MsgLogging"); + } + public void Processor((string TerminalNo, byte[] Data) parameter, JT808MsgLoggingType jT808MsgLoggingType) + { + Logger.LogDebug($"{jT808MsgLoggingType.ToString()}-{parameter.TerminalNo}-{parameter.Data.ToHexString()}"); + } + } +} diff --git a/src/JT808.Gateway.Tests/JT808.Gateway.TestHosting/JT808.Gateway.TestHosting.csproj b/src/JT808.Gateway.Tests/JT808.Gateway.TestHosting/JT808.Gateway.TestHosting.csproj index 346ce12..85459c8 100644 --- a/src/JT808.Gateway.Tests/JT808.Gateway.TestHosting/JT808.Gateway.TestHosting.csproj +++ b/src/JT808.Gateway.Tests/JT808.Gateway.TestHosting/JT808.Gateway.TestHosting.csproj @@ -16,6 +16,11 @@ + + + + + diff --git a/src/JT808.Gateway.Tests/JT808.Gateway.TestHosting/Jobs/TrafficJob.cs b/src/JT808.Gateway.Tests/JT808.Gateway.TestHosting/Jobs/TrafficJob.cs new file mode 100644 index 0000000..5e1ee64 --- /dev/null +++ b/src/JT808.Gateway.Tests/JT808.Gateway.TestHosting/Jobs/TrafficJob.cs @@ -0,0 +1,49 @@ +using JT808.Gateway.Client; +using JT808.Gateway.Traffic; +using JT808.Protocol.Enums; +using JT808.Protocol.Extensions; +using JT808.Protocol.MessageBody; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading; +using System.Threading.Tasks; + +namespace JT808.Gateway.TestHosting.Jobs +{ + public class TrafficJob : IHostedService + { + private readonly IJT808Traffic jT808Traffic; + private readonly ILogger Logger; + public TrafficJob( + ILoggerFactory loggerFactory, + IJT808Traffic jT808Traffic) + { + Logger = loggerFactory.CreateLogger("TrafficJob"); + this.jT808Traffic = jT808Traffic; + } + + public Task StartAsync(CancellationToken cancellationToken) + { + Task.Run(async () => + { + while (!cancellationToken.IsCancellationRequested) + { + await Task.Delay(2 * 1000); + foreach (var item in jT808Traffic.GetAll()) + { + Logger.LogDebug($"{item.Item1}-{item.Item2}"); + } + } + }, cancellationToken); + return Task.CompletedTask; + } + + public Task StopAsync(CancellationToken cancellationToken) + { + return Task.CompletedTask; + } + } +} diff --git a/src/JT808.Gateway.Tests/JT808.Gateway.TestHosting/Program.cs b/src/JT808.Gateway.Tests/JT808.Gateway.TestHosting/Program.cs index e12423a..7c84796 100644 --- a/src/JT808.Gateway.Tests/JT808.Gateway.TestHosting/Program.cs +++ b/src/JT808.Gateway.Tests/JT808.Gateway.TestHosting/Program.cs @@ -11,6 +11,13 @@ using JT808.Gateway.Kafka; using JT808.Gateway.InMemoryMQ; using JT808.Gateway.ReplyMessage; using JT808.Gateway.Client; +using JT808.Gateway.SessionNotice; +using JT808.Gateway.Abstractions.Enums; +using JT808.Gateway.MsgIdHandler; +using JT808.Gateway.MsgLogging; +using JT808.Gateway.Traffic; +using JT808.Gateway.Transmit; +using JT808.Gateway.TestHosting.Impl; namespace JT808.Gateway.TestHosting { @@ -48,14 +55,26 @@ namespace JT808.Gateway.TestHosting .AddTcp() .AddUdp() .AddGrpc() - //InMemoryMQ - .AddServerInMemoryMQ() + //InMemoryMQ 按需要加载对应的服务 + //注意:不需要的就不用add进来了 + .AddServerInMemoryMQ(JT808ConsumerType.All) + //方式1 + //.AddServerInMemoryMQ(JT808ConsumerType.MsgIdHandlerConsumer| JT808ConsumerType.ReplyMessageConsumer) + //方式2 + //.AddServerInMemoryMQ(JT808ConsumerType.MsgIdHandlerConsumer,JT808ConsumerType.ReplyMessageConsumer) + .AddInMemoryTraffic() + .AddInMemoryTransmit(hostContext.Configuration) + .AddInMemoryMsgIdHandler() + .AddInMemoryMsgLogging() + .AddInMemorySessionNotice() .AddInMemoryReplyMessage() //kafka插件 //.AddServerKafkaMsgProducer(hostContext.Configuration) //.AddServerKafkaMsgReplyConsumer(hostContext.Configuration) //.AddServerKafkaSessionProducer(hostContext.Configuration) ; + //流量统计 + //services.AddHostedService(); //grpc客户端调用 //services.AddHostedService(); //客户端测试 diff --git a/src/JT808.Gateway.sln b/src/JT808.Gateway.sln index 4c077e8..92845e7 100644 --- a/src/JT808.Gateway.sln +++ b/src/JT808.Gateway.sln @@ -35,7 +35,7 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JT808.Gateway.Test", "JT808 EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JT808.Gateway.TestHosting", "JT808.Gateway.Tests\JT808.Gateway.TestHosting\JT808.Gateway.TestHosting.csproj", "{69C815FE-3C32-473E-99C9-F3C4B3BCFF81}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "JT808.Gateway.InMemoryMQ.Test", "JT808.Gateway.Tests\JT808.Gateway.InMemoryMQ.Test\JT808.Gateway.InMemoryMQ.Test.csproj", "{E103E89D-3069-4AAE-99CE-2AD633AD351E}" +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JT808.Gateway.InMemoryMQ.Test", "JT808.Gateway.Tests\JT808.Gateway.InMemoryMQ.Test\JT808.Gateway.InMemoryMQ.Test.csproj", "{E103E89D-3069-4AAE-99CE-2AD633AD351E}" EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution diff --git a/src/JT808.Gateway/JT808TcpServer.cs b/src/JT808.Gateway/JT808TcpServer.cs index 2b40548..5dc3827 100644 --- a/src/JT808.Gateway/JT808TcpServer.cs +++ b/src/JT808.Gateway/JT808TcpServer.cs @@ -45,7 +45,6 @@ namespace JT808.Gateway IJT808MsgProducer jT808MsgProducer, JT808AtomicCounterServiceFactory jT808AtomicCounterServiceFactory) { - SessionManager = jT808SessionManager; Logger = loggerFactory.CreateLogger("JT808TcpServer"); Serializer = jT808Config.GetSerializer(); @@ -185,12 +184,17 @@ namespace JT808.Gateway try { contentSpan = seqReader.Sequence.Slice(totalConsumed, seqReader.Consumed - totalConsumed).FirstSpan; - var package = Serializer.HeaderDeserialize(contentSpan, minBufferSize: 10240); - AtomicCounterService.MsgSuccessIncrement(); - if (Logger.IsEnabled(LogLevel.Debug)) Logger.LogDebug($"[Atomic Success Counter]:{AtomicCounterService.MsgSuccessCount}"); - if (Logger.IsEnabled(LogLevel.Trace)) Logger.LogTrace($"[Accept Hex {session.Client.RemoteEndPoint}]:{package.OriginalData.ToArray().ToHexString()}"); - SessionManager.TryLink(package.Header.TerminalPhoneNo, session); - MsgProducer.ProduceAsync(package.Header.TerminalPhoneNo, package.OriginalData.ToArray()); + //过滤掉不是808标准包(14) + //(头)1+(消息 ID )2+(消息体属性)2+(终端手机号)6+(消息流水号)2+(检验码 )1+(尾)1 + if (contentSpan.Length > 14) + { + var package = Serializer.HeaderDeserialize(contentSpan, minBufferSize: 10240); + AtomicCounterService.MsgSuccessIncrement(); + if (Logger.IsEnabled(LogLevel.Debug)) Logger.LogDebug($"[Atomic Success Counter]:{AtomicCounterService.MsgSuccessCount}"); + if (Logger.IsEnabled(LogLevel.Trace)) Logger.LogTrace($"[Accept Hex {session.Client.RemoteEndPoint}]:{package.OriginalData.ToArray().ToHexString()}"); + SessionManager.TryLink(package.Header.TerminalPhoneNo, session); + MsgProducer.ProduceAsync(package.Header.TerminalPhoneNo, package.OriginalData.ToArray()); + } } catch (JT808Exception ex) { diff --git a/src/Version.props b/src/Version.props index a502cd4..9d2758c 100644 --- a/src/Version.props +++ b/src/Version.props @@ -1,6 +1,6 @@  2.3.1 - 1.0.0-preview4 + 1.0.0-preview5 \ No newline at end of file