From 91ec904e0dd0c5c407d513c569c549484940d2ea Mon Sep 17 00:00:00 2001 From: "smallchi(Koike)" <564952747@qq.com> Date: Wed, 5 Feb 2020 00:31:34 +0800 Subject: [PATCH] =?UTF-8?q?1.=E4=BF=AE=E6=94=B9demo=202.=E4=BF=AE=E5=A4=8D?= =?UTF-8?q?=E4=BC=9A=E8=AF=9D=E6=9C=AA=E5=85=B3=E8=81=94sim=E5=8D=A1?= =?UTF-8?q?=E5=8F=B7=203.=E4=BF=AE=E6=94=B9=E6=96=87=E6=A1=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 2 +- publish.gateway.bat | 1 - .../JT808.Gateway.SimpleClient.csproj | 11 +-- .../Jobs/CallGrpcClientJob.cs | 67 +++++++++++++++ simples/JT808.Gateway.SimpleClient/Program.cs | 2 + .../JT808NormalReplyMessageHandlerImpl.cs | 86 +++++++++++++++++++ .../Impl/JT808SessionConsumer.cs | 63 ++++++++++++++ .../Impl/JT808SessionProducer.cs | 30 +++++++ .../JT808.Gateway.SimpleServer.csproj | 14 +-- .../Jobs/TrafficJob.cs | 48 +++++++++++ simples/JT808.Gateway.SimpleServer/Program.cs | 28 +++--- .../Services/JT808SessionService.cs | 28 ++++++ .../Session/JT808SessionManager.cs | 4 +- 13 files changed, 359 insertions(+), 25 deletions(-) create mode 100644 simples/JT808.Gateway.SimpleClient/Jobs/CallGrpcClientJob.cs create mode 100644 simples/JT808.Gateway.SimpleServer/Impl/JT808NormalReplyMessageHandlerImpl.cs create mode 100644 simples/JT808.Gateway.SimpleServer/Impl/JT808SessionConsumer.cs create mode 100644 simples/JT808.Gateway.SimpleServer/Impl/JT808SessionProducer.cs create mode 100644 simples/JT808.Gateway.SimpleServer/Jobs/TrafficJob.cs create mode 100644 simples/JT808.Gateway.SimpleServer/Services/JT808SessionService.cs diff --git a/README.md b/README.md index ad7be02..3ad659a 100644 --- a/README.md +++ b/README.md @@ -14,7 +14,7 @@ [玩一玩压力测试](https://github.com/SmallChi/JT808Gateway/blob/master/doc/README.md) -[![MIT Licence](https://img.shields.io/github/license/mashape/apistatus.svg)](https://github.com/SmallChi/JT808Gateway/blob/master/LICENSE)![.NET Core](https://github.com/SmallChi/JT808Gateway/workflows/.NET%20Core/badge.svg) +[![MIT Licence](https://img.shields.io/github/license/mashape/apistatus.svg)](https://github.com/SmallChi/JT808Gateway/blob/master/LICENSE)![.NET Core](https://github.com/SmallChi/JT808Gateway/workflows/.NET%20Core/badge.svg?branch=master) ## 新网关的优势 diff --git a/publish.gateway.bat b/publish.gateway.bat index 789f888..5b6be19 100644 --- a/publish.gateway.bat +++ b/publish.gateway.bat @@ -1,6 +1,5 @@ dotnet pack .\src\JT808.Gateway\JT808.Gateway.csproj -c Release --output nupkgs dotnet pack .\src\JT808.Gateway.Kafka\JT808.Gateway.Kafka.csproj -c Release --output nupkgs -dotnet pack .\src\JT808.Gateway.InMemoryMQ\JT808.Gateway.InMemoryMQ.csproj -c Release --output nupkgs dotnet pack .\src\JT808.Gateway.Abstractions\JT808.Gateway.Abstractions.csproj -c Release --output nupkgs dotnet pack .\src\JT808.Gateway.Client\JT808.Gateway.Client.csproj -c Release --output nupkgs diff --git a/simples/JT808.Gateway.SimpleClient/JT808.Gateway.SimpleClient.csproj b/simples/JT808.Gateway.SimpleClient/JT808.Gateway.SimpleClient.csproj index 33c35ca..ecf150a 100644 --- a/simples/JT808.Gateway.SimpleClient/JT808.Gateway.SimpleClient.csproj +++ b/simples/JT808.Gateway.SimpleClient/JT808.Gateway.SimpleClient.csproj @@ -1,4 +1,4 @@ - + Exe @@ -6,11 +6,12 @@ - + + - - - + + + diff --git a/simples/JT808.Gateway.SimpleClient/Jobs/CallGrpcClientJob.cs b/simples/JT808.Gateway.SimpleClient/Jobs/CallGrpcClientJob.cs new file mode 100644 index 0000000..57fedd4 --- /dev/null +++ b/simples/JT808.Gateway.SimpleClient/Jobs/CallGrpcClientJob.cs @@ -0,0 +1,67 @@ +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading; +using System.Threading.Tasks; +using Grpc.Core; +using JT808.Gateway.GrpcService; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; +using System.Text.Json; + +namespace JT808.Gateway.SimpleClient.Jobs +{ + public class CallGrpcClientJob :IHostedService + { + private Channel channel; + private readonly ILogger Logger; + private Grpc.Core.Metadata AuthMetadata; + public CallGrpcClientJob( + ILoggerFactory loggerFactory) + { + Logger = loggerFactory.CreateLogger("CallGrpcClientJob"); + channel = new Channel("localhost:828", + ChannelCredentials.Insecure); + AuthMetadata = new Grpc.Core.Metadata(); + AuthMetadata.Add("token", "smallchi518"); + } + + public Task StartAsync(CancellationToken cancellationToken) + { + Task.Run(() => + { + while (!cancellationToken.IsCancellationRequested) + { + JT808Gateway.JT808GatewayClient jT808GatewayClient = new JT808Gateway.JT808GatewayClient(channel); + try + { + var result1 = jT808GatewayClient.GetTcpAtomicCounter(new Empty(), AuthMetadata); + var result2 = jT808GatewayClient.GetTcpSessionAll(new Empty(), AuthMetadata); + Logger.LogInformation($"[GetTcpAtomicCounter]:{JsonSerializer.Serialize(result1)}"); + Logger.LogInformation($"[GetTcpSessionAll]:{JsonSerializer.Serialize(result2)}"); + } + catch (Exception ex) + { + Logger.LogError(ex, "Call Grpc Error"); + } + try + { + var result1 = jT808GatewayClient.GetTcpAtomicCounter(new Empty()); + } + catch (RpcException ex) + { + Logger.LogError($"{ex.StatusCode.ToString()}-{ex.Message}"); + } + Thread.Sleep(3000); + } + }, cancellationToken); + return Task.CompletedTask; + } + + public Task StopAsync(CancellationToken cancellationToken) + { + channel.ShutdownAsync(); + return Task.CompletedTask; + } + } +} diff --git a/simples/JT808.Gateway.SimpleClient/Program.cs b/simples/JT808.Gateway.SimpleClient/Program.cs index eba3ce6..69e7d84 100644 --- a/simples/JT808.Gateway.SimpleClient/Program.cs +++ b/simples/JT808.Gateway.SimpleClient/Program.cs @@ -9,6 +9,7 @@ using System.Threading; using Microsoft.Extensions.Hosting; using JT808.Gateway.Client; using JT808.Gateway.SimpleClient.Services; +using JT808.Gateway.SimpleClient.Jobs; namespace JT808.Gateway.SimpleClient { @@ -30,6 +31,7 @@ namespace JT808.Gateway.SimpleClient .AddClient(); services.AddHostedService(); services.AddHostedService(); + services.AddHostedService(); }); await serverHostBuilder.RunConsoleAsync(); } diff --git a/simples/JT808.Gateway.SimpleServer/Impl/JT808NormalReplyMessageHandlerImpl.cs b/simples/JT808.Gateway.SimpleServer/Impl/JT808NormalReplyMessageHandlerImpl.cs new file mode 100644 index 0000000..74efb98 --- /dev/null +++ b/simples/JT808.Gateway.SimpleServer/Impl/JT808NormalReplyMessageHandlerImpl.cs @@ -0,0 +1,86 @@ +using JT808.Gateway.Abstractions; +using JT808.Gateway.MsgLogging; +using JT808.Gateway.Traffic; +using JT808.Gateway.Transmit; +using JT808.Protocol; +using Microsoft.Extensions.Logging; +using System; +using System.Collections.Generic; +using System.Text; + +namespace JT808.Gateway.SimpleServer.Impl +{ + public class JT808NormalReplyMessageHandlerImpl : JT808NormalReplyMessageHandler + { + private readonly ILogger logger; + private readonly IJT808Traffic jT808Traffic; + private readonly IJT808MsgLogging jT808MsgLogging; + private readonly JT808TransmitService jT808TransmitService; + public JT808NormalReplyMessageHandlerImpl( + JT808TransmitService jT808TransmitService, + IJT808MsgLogging jT808MsgLogging, + IJT808Traffic jT808Traffic, + ILoggerFactory loggerFactory, + IJT808Config jT808Config) : base(jT808Config) + { + this.jT808TransmitService = jT808TransmitService; + this.jT808Traffic = jT808Traffic; + this.jT808MsgLogging = jT808MsgLogging; + logger =loggerFactory.CreateLogger("JT808NormalReplyMessageHandlerImpl"); + //添加自定义消息 + HandlerDict.Add(0x9999, Msg0x9999); + } + + /// + /// 重写消息处理器 + /// + /// + /// + public override byte[] Processor(JT808HeaderPackage request, IJT808Session session) + { + //AOP 可以自定义添加一些东西:上下行日志、数据转发 + logger.LogDebug("可以自定义添加一些东西:上下行日志、数据转发"); + //流量 + jT808Traffic.Increment(request.Header.TerminalPhoneNo, DateTime.Now.ToString("yyyyMMdd"), request.OriginalData.Length); + var parameter = (request.Header.TerminalPhoneNo, request.OriginalData.ToArray()); + //转发数据(可同步也可以使用队列进行异步) + try + { + jT808TransmitService.Send(parameter); + } + catch (Exception ex) + { + logger.LogError(ex,""); + } + //上行日志(可同步也可以使用队列进行异步) + jT808MsgLogging.Processor(parameter, JT808MsgLoggingType.up); + //处理上行消息 + var down= base.Processor(request, session); + //下行日志(可同步也可以使用队列进行异步) + jT808MsgLogging.Processor((request.Header.TerminalPhoneNo, down), JT808MsgLoggingType.down); + return down; + } + + /// + /// 重写自带的消息 + /// + /// + /// + public override byte[] Msg0x0200(JT808HeaderPackage request, IJT808Session session) + { + logger.LogDebug("重写自带Msg0x0200的消息"); + return base.Msg0x0200(request, session); + } + + /// + /// 自定义消息 + /// + /// + /// + public byte[] Msg0x9999(JT808HeaderPackage request, IJT808Session session) + { + logger.LogDebug("自定义消息"); + return default; + } + } +} diff --git a/simples/JT808.Gateway.SimpleServer/Impl/JT808SessionConsumer.cs b/simples/JT808.Gateway.SimpleServer/Impl/JT808SessionConsumer.cs new file mode 100644 index 0000000..3c73ba5 --- /dev/null +++ b/simples/JT808.Gateway.SimpleServer/Impl/JT808SessionConsumer.cs @@ -0,0 +1,63 @@ +using JT808.Gateway.Abstractions; +using Microsoft.Extensions.Logging; +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading; +using System.Threading.Tasks; +using JT808.Gateway.SimpleServer.Services; + +namespace JT808.Gateway.SimpleServer.Impl +{ + public class JT808SessionConsumer : IJT808SessionConsumer + { + public CancellationTokenSource Cts => new CancellationTokenSource(); + + private readonly ILogger logger; + + public string TopicName { get; } = JT808GatewayConstants.SessionTopic; + + private readonly JT808SessionService JT808SessionService; + public JT808SessionConsumer( + JT808SessionService jT808SessionService, + ILoggerFactory loggerFactory) + { + logger = loggerFactory.CreateLogger("JT808SessionConsumer"); + JT808SessionService = jT808SessionService; + } + + public void OnMessage(Action<(string Notice, string TerminalNo)> callback) + { + Task.Run(async () => + { + while (!Cts.IsCancellationRequested) + { + try + { + var item = await JT808SessionService.ReadAsync(Cts.Token); + callback(item); + } + catch (Exception ex) + { + logger.LogError(ex, ""); + } + } + }, Cts.Token); + } + + public void Unsubscribe() + { + Cts.Cancel(); + } + + public void Dispose() + { + Cts.Dispose(); + } + + public void Subscribe() + { + + } + } +} diff --git a/simples/JT808.Gateway.SimpleServer/Impl/JT808SessionProducer.cs b/simples/JT808.Gateway.SimpleServer/Impl/JT808SessionProducer.cs new file mode 100644 index 0000000..4c3064a --- /dev/null +++ b/simples/JT808.Gateway.SimpleServer/Impl/JT808SessionProducer.cs @@ -0,0 +1,30 @@ +using JT808.Gateway.Abstractions; +using JT808.Gateway.SimpleServer.Services; +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading.Tasks; + +namespace JT808.Gateway.SimpleServer.Impl +{ + public class JT808SessionProducer : IJT808SessionProducer + { + public string TopicName { get; } = JT808GatewayConstants.SessionTopic; + + private readonly JT808SessionService JT808SessionService; + + public JT808SessionProducer(JT808SessionService jT808SessionService) + { + JT808SessionService = jT808SessionService; + } + + public async ValueTask ProduceAsync(string notice,string terminalNo) + { + await JT808SessionService.WriteAsync(notice, terminalNo); + } + + public void Dispose() + { + } + } +} diff --git a/simples/JT808.Gateway.SimpleServer/JT808.Gateway.SimpleServer.csproj b/simples/JT808.Gateway.SimpleServer/JT808.Gateway.SimpleServer.csproj index 45e6730..8d8f2e9 100644 --- a/simples/JT808.Gateway.SimpleServer/JT808.Gateway.SimpleServer.csproj +++ b/simples/JT808.Gateway.SimpleServer/JT808.Gateway.SimpleServer.csproj @@ -5,13 +5,13 @@ netcoreapp3.1 - - - - - - - + + + + + + + diff --git a/simples/JT808.Gateway.SimpleServer/Jobs/TrafficJob.cs b/simples/JT808.Gateway.SimpleServer/Jobs/TrafficJob.cs new file mode 100644 index 0000000..bd542c3 --- /dev/null +++ b/simples/JT808.Gateway.SimpleServer/Jobs/TrafficJob.cs @@ -0,0 +1,48 @@ +using JT808.Gateway.Traffic; +using JT808.Protocol.Enums; +using JT808.Protocol.Extensions; +using JT808.Protocol.MessageBody; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading; +using System.Threading.Tasks; + +namespace JT808.Gateway.SimpleServer.Jobs +{ + public class TrafficJob : IHostedService + { + private readonly IJT808Traffic jT808Traffic; + private readonly ILogger Logger; + public TrafficJob( + ILoggerFactory loggerFactory, + IJT808Traffic jT808Traffic) + { + Logger = loggerFactory.CreateLogger("TrafficJob"); + this.jT808Traffic = jT808Traffic; + } + + public Task StartAsync(CancellationToken cancellationToken) + { + Task.Run(async () => + { + while (!cancellationToken.IsCancellationRequested) + { + await Task.Delay(2 * 1000); + foreach (var item in jT808Traffic.GetAll()) + { + Logger.LogDebug($"{item.Item1}-{item.Item2}"); + } + } + }, cancellationToken); + return Task.CompletedTask; + } + + public Task StopAsync(CancellationToken cancellationToken) + { + return Task.CompletedTask; + } + } +} diff --git a/simples/JT808.Gateway.SimpleServer/Program.cs b/simples/JT808.Gateway.SimpleServer/Program.cs index 5825865..378a25b 100644 --- a/simples/JT808.Gateway.SimpleServer/Program.cs +++ b/simples/JT808.Gateway.SimpleServer/Program.cs @@ -1,5 +1,4 @@ using JT808.Gateway.Abstractions.Enums; -using JT808.Gateway.InMemoryMQ; using JT808.Gateway.ReplyMessage; using JT808.Gateway.MsgLogging; using JT808.Gateway.Traffic; @@ -14,6 +13,10 @@ using Microsoft.Extensions.Logging; using System; using System.Threading.Tasks; using JT808.Gateway.SimpleServer.Impl; +using JT808.Gateway.SimpleServer.Services; +using JT808.Gateway.Abstractions; +using JT808.Gateway.Transmit; +using JT808.Gateway.SimpleServer.Jobs; namespace JT808.Gateway.SimpleServer { @@ -36,18 +39,23 @@ namespace JT808.Gateway.SimpleServer { services.AddSingleton(); services.AddSingleton(typeof(ILogger<>), typeof(Logger<>)); + //使用内存队列实现会话通知 + services.AddSingleton(); + services.AddSingleton(); + services.AddSingleton(); services.AddJT808Configure() - .AddGateway(hostContext.Configuration) + .AddNormalGateway(hostContext.Configuration) + .ReplaceNormalReplyMessageHandler() + .AddMsgLogging() + .AddTraffic() + .AddSessionNotice() + .AddTransmit(hostContext.Configuration) .AddTcp() - .AddServerInMemoryMQ(JT808ConsumerType.MsgIdHandlerConsumer| - JT808ConsumerType.ReplyMessageConsumer | - JT808ConsumerType.MsgLoggingConsumer | - JT808ConsumerType.ReplyMessageLoggingConsumer) - .AddInMemoryMsgIdHandler() - .AddInMemoryReplyMessage() - .AddInMemoryMsgLogging() - .AddInMemorySessionNotice() + .AddUdp() + .AddGrpc() .Builder(); + //流量统计 + services.AddHostedService(); }); await serverHostBuilder.RunConsoleAsync(); diff --git a/simples/JT808.Gateway.SimpleServer/Services/JT808SessionService.cs b/simples/JT808.Gateway.SimpleServer/Services/JT808SessionService.cs new file mode 100644 index 0000000..57d0230 --- /dev/null +++ b/simples/JT808.Gateway.SimpleServer/Services/JT808SessionService.cs @@ -0,0 +1,28 @@ +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading; +using System.Threading.Channels; +using System.Threading.Tasks; + +namespace JT808.Gateway.SimpleServer.Services +{ + public class JT808SessionService + { + private readonly Channel<(string Notice, string TerminalNo)> _channel; + + public JT808SessionService() + { + _channel = Channel.CreateUnbounded<(string Notice, string TerminalNo)>(); + } + + public async ValueTask WriteAsync(string notice, string terminalNo) + { + await _channel.Writer.WriteAsync((notice, terminalNo)); + } + public async ValueTask<(string Notice, string TerminalNo)> ReadAsync(CancellationToken cancellationToken) + { + return await _channel.Reader.ReadAsync(cancellationToken); + } + } +} diff --git a/src/JT808.Gateway/Session/JT808SessionManager.cs b/src/JT808.Gateway/Session/JT808SessionManager.cs index 3f2d47d..5fc5fe9 100644 --- a/src/JT808.Gateway/Session/JT808SessionManager.cs +++ b/src/JT808.Gateway/Session/JT808SessionManager.cs @@ -84,7 +84,8 @@ namespace JT808.Gateway.Session } else { - if(TerminalPhoneNoSessions.TryAdd(terminalPhoneNo, session)) + session.TerminalPhoneNo = terminalPhoneNo; + if (TerminalPhoneNoSessions.TryAdd(terminalPhoneNo, session)) { //会话通知 JT808SessionProducer?.ProduceAsync(JT808GatewayConstants.SessionOnline, terminalPhoneNo); @@ -104,6 +105,7 @@ namespace JT808.Gateway.Session else { JT808UdpSession session = new JT808UdpSession(socket, remoteEndPoint); + session.TerminalPhoneNo = terminalPhoneNo; Sessions.TryAdd(session.SessionID, session); TerminalPhoneNoSessions.TryAdd(terminalPhoneNo, session); currentSession = session;