From 58bd8e9636a94e15cb0f70197aebb34987111dfc Mon Sep 17 00:00:00 2001 From: SmallChi <564952747@qq.com> Date: Wed, 6 Mar 2019 22:13:33 +0800 Subject: [PATCH] =?UTF-8?q?=E6=95=B4=E7=90=86=E9=A1=B9=E7=9B=AE=E6=96=87?= =?UTF-8?q?=E4=BB=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../{JT809TcpDecoder.cs => JT809Decoder.cs} | 5 +- .../Codecs/JT809Encoder.cs | 27 +++ .../Enums/JT809AtomicCounterType.cs | 12 ++ ...erBase.cs => JT809MainMsgIdHandlerBase.cs} | 25 ++- .../JT809MainServerConnectionHandler.cs | 97 ++++++++++ .../Handlers/JT809MainServerHandler.cs | 82 ++++++++ .../JT809SubordinateConnectionHandler.cs | 98 ++++++++++ .../JT809SubordinateMsgIdHandlerBase.cs | 41 ++++ .../Handlers/JT809SubordinateServerHandler.cs | 79 ++++++++ ...erator.cs => IJT809VerifyCodeGenerator.cs} | 3 +- .../Internal/JT809MainMsgIdDefaultHandler.cs | 22 +++ .../JT809SubordinateMsgIdDefaultHandler.cs | 18 ++ .../JT809VerifyCodeGeneratorDefaultImpl.cs | 23 +++ .../VerifyCodeGeneratorDefaultImpl.cs | 15 -- .../JT809CoreDotnettyExtensions.cs | 25 ++- .../Links/JT809MainClient.cs | 137 ++++++++++++++ .../Links/JT809SubordinateClient.cs | 176 ++++++++++++++++++ .../Links/SubordinateLinkClient.cs | 113 ----------- .../Metadata/JT809Response.cs | 7 + .../{JT809TcpSession.cs => JT809Session.cs} | 4 +- ...ervice.cs => JT809AtomicCounterService.cs} | 15 +- .../JT809AtomicCounterServiceFactory.cs | 31 +++ .../Services/JT809MainServerHost.cs | 98 ++++++++++ ...ssionManager.cs => JT809SessionManager.cs} | 33 ++-- .../Handlers/JT809MsgIdDefaultTcpHandler.cs | 8 +- .../Handlers/JT809TcpConnectionHandler.cs | 4 +- .../Handlers/JT809TcpServerHandler.cs | 17 +- .../JT809TcpDotnettyExtensions.cs | 7 +- src/JT809.DotNetty.Tcp/JT809TcpServerHost.cs | 2 +- .../JT809.DotNetty.Host.Test/Program.cs | 2 +- 30 files changed, 1043 insertions(+), 183 deletions(-) rename src/JT809.DotNetty.Core/Codecs/{JT809TcpDecoder.cs => JT809Decoder.cs} (84%) create mode 100644 src/JT809.DotNetty.Core/Codecs/JT809Encoder.cs create mode 100644 src/JT809.DotNetty.Core/Enums/JT809AtomicCounterType.cs rename src/JT809.DotNetty.Core/Handlers/{JT809MsgIdTcpHandlerBase.cs => JT809MainMsgIdHandlerBase.cs} (80%) create mode 100644 src/JT809.DotNetty.Core/Handlers/JT809MainServerConnectionHandler.cs create mode 100644 src/JT809.DotNetty.Core/Handlers/JT809MainServerHandler.cs create mode 100644 src/JT809.DotNetty.Core/Handlers/JT809SubordinateConnectionHandler.cs create mode 100644 src/JT809.DotNetty.Core/Handlers/JT809SubordinateMsgIdHandlerBase.cs create mode 100644 src/JT809.DotNetty.Core/Handlers/JT809SubordinateServerHandler.cs rename src/JT809.DotNetty.Core/Interfaces/{IVerifyCodeGenerator.cs => IJT809VerifyCodeGenerator.cs} (75%) create mode 100644 src/JT809.DotNetty.Core/Internal/JT809MainMsgIdDefaultHandler.cs create mode 100644 src/JT809.DotNetty.Core/Internal/JT809SubordinateMsgIdDefaultHandler.cs create mode 100644 src/JT809.DotNetty.Core/Internal/JT809VerifyCodeGeneratorDefaultImpl.cs delete mode 100644 src/JT809.DotNetty.Core/Internal/VerifyCodeGeneratorDefaultImpl.cs create mode 100644 src/JT809.DotNetty.Core/Links/JT809MainClient.cs create mode 100644 src/JT809.DotNetty.Core/Links/JT809SubordinateClient.cs delete mode 100644 src/JT809.DotNetty.Core/Links/SubordinateLinkClient.cs rename src/JT809.DotNetty.Core/Metadata/{JT809TcpSession.cs => JT809Session.cs} (85%) rename src/JT809.DotNetty.Core/Services/{JT809TcpAtomicCounterService.cs => JT809AtomicCounterService.cs} (64%) create mode 100644 src/JT809.DotNetty.Core/Services/JT809AtomicCounterServiceFactory.cs create mode 100644 src/JT809.DotNetty.Core/Services/JT809MainServerHost.cs rename src/JT809.DotNetty.Core/Session/{JT809TcpSessionManager.cs => JT809SessionManager.cs} (73%) diff --git a/src/JT809.DotNetty.Core/Codecs/JT809TcpDecoder.cs b/src/JT809.DotNetty.Core/Codecs/JT809Decoder.cs similarity index 84% rename from src/JT809.DotNetty.Core/Codecs/JT809TcpDecoder.cs rename to src/JT809.DotNetty.Core/Codecs/JT809Decoder.cs index 35ccdab..ce19d5e 100644 --- a/src/JT809.DotNetty.Core/Codecs/JT809TcpDecoder.cs +++ b/src/JT809.DotNetty.Core/Codecs/JT809Decoder.cs @@ -6,7 +6,10 @@ using JT809.Protocol; namespace JT809.DotNetty.Core.Codecs { - public class JT809TcpDecoder : ByteToMessageDecoder + /// + /// JT809解码 + /// + public class JT809Decoder : ByteToMessageDecoder { protected override void Decode(IChannelHandlerContext context, IByteBuffer input, List output) { diff --git a/src/JT809.DotNetty.Core/Codecs/JT809Encoder.cs b/src/JT809.DotNetty.Core/Codecs/JT809Encoder.cs new file mode 100644 index 0000000..87384f2 --- /dev/null +++ b/src/JT809.DotNetty.Core/Codecs/JT809Encoder.cs @@ -0,0 +1,27 @@ +using DotNetty.Buffers; +using DotNetty.Codecs; +using System.Collections.Generic; +using DotNetty.Transport.Channels; +using JT809.Protocol; +using JT809.DotNetty.Core.Metadata; + +namespace JT809.DotNetty.Core.Codecs +{ + /// + /// JT809编码 + /// + public class JT809Encoder : MessageToByteEncoder + { + protected override void Encode(IChannelHandlerContext context, JT809Response message, IByteBuffer output) + { + if (message.Package != null) { + var sendData = JT809Serializer.Serialize(message.Package, message.MinBufferSize); + output.WriteBytes(sendData); + } + else if (message.HexData != null) + { + output.WriteBytes(message.HexData); + } + } + } +} diff --git a/src/JT809.DotNetty.Core/Enums/JT809AtomicCounterType.cs b/src/JT809.DotNetty.Core/Enums/JT809AtomicCounterType.cs new file mode 100644 index 0000000..eabd0be --- /dev/null +++ b/src/JT809.DotNetty.Core/Enums/JT809AtomicCounterType.cs @@ -0,0 +1,12 @@ +using System; +using System.Collections.Generic; +using System.Text; + +namespace JT809.DotNetty.Core.Enums +{ + public enum JT809AtomicCounterType + { + ClientSubordinate=1, + ServerMain=2 + } +} diff --git a/src/JT809.DotNetty.Core/Handlers/JT809MsgIdTcpHandlerBase.cs b/src/JT809.DotNetty.Core/Handlers/JT809MainMsgIdHandlerBase.cs similarity index 80% rename from src/JT809.DotNetty.Core/Handlers/JT809MsgIdTcpHandlerBase.cs rename to src/JT809.DotNetty.Core/Handlers/JT809MainMsgIdHandlerBase.cs index 25fe923..3ae13f9 100644 --- a/src/JT809.DotNetty.Core/Handlers/JT809MsgIdTcpHandlerBase.cs +++ b/src/JT809.DotNetty.Core/Handlers/JT809MainMsgIdHandlerBase.cs @@ -1,6 +1,7 @@ using System; using System.Collections.Generic; using JT809.DotNetty.Core.Interfaces; +using JT809.DotNetty.Core.Links; using JT809.DotNetty.Core.Metadata; using JT809.Protocol.Enums; using JT809.Protocol.Extensions; @@ -9,26 +10,29 @@ using JT809.Protocol.MessageBody; namespace JT809.DotNetty.Core.Handlers { /// - /// 基于Tcp模式抽象消息处理业务 + /// 抽象消息处理业务 /// 自定义消息处理业务 /// 注意: /// 1.ConfigureServices: /// services.Replace(new ServiceDescriptor(typeof(JT809MsgIdTcpHandlerBase),typeof(JT809MsgIdCustomTcpHandlerImpl),ServiceLifetime.Singleton)); /// 2.解析具体的消息体,具体消息调用具体的JT809Serializer.Deserialize /// - public abstract class JT809MsgIdTcpHandlerBase + public abstract class JT809MainMsgIdHandlerBase { - protected JT809TcpSessionManager sessionManager { get; } - protected IVerifyCodeGenerator verifyCodeGenerator { get; } + protected JT809SessionManager sessionManager { get; } + protected IJT809VerifyCodeGenerator verifyCodeGenerator { get; } + protected JT809SubordinateClient subordinateLinkClient { get; } /// /// 初始化消息处理业务 /// - protected JT809MsgIdTcpHandlerBase( - IVerifyCodeGenerator verifyCodeGenerator, - JT809TcpSessionManager sessionManager) + protected JT809MainMsgIdHandlerBase( + IJT809VerifyCodeGenerator verifyCodeGenerator, + JT809SubordinateClient subordinateLinkClient, + JT809SessionManager sessionManager) { this.sessionManager = sessionManager; this.verifyCodeGenerator = verifyCodeGenerator; + this.subordinateLinkClient = subordinateLinkClient; HandlerDict = new Dictionary> { {JT809BusinessType.主链路登录请求消息, Msg0x1001}, @@ -42,6 +46,7 @@ namespace JT809.DotNetty.Core.Handlers //}; } + public Dictionary> HandlerDict { get; protected set; } //public Dictionary> SubHandlerDict { get; protected set; } @@ -53,12 +58,14 @@ namespace JT809.DotNetty.Core.Handlers /// public virtual JT809Response Msg0x1001(JT809Request request) { + var verifyCode = verifyCodeGenerator.Create(); var package= JT809BusinessType.主链路登录应答消息.Create(new JT809_0x1002() { Result= JT809_0x1002_Result.成功, - VerifyCode= verifyCodeGenerator.Create() + VerifyCode= verifyCode }); - + var jT809_0x1001 = request.Package.Bodies as JT809_0x1001; + subordinateLinkClient.ConnectAsync(jT809_0x1001.DownLinkIP, jT809_0x1001.DownLinkPort, verifyCode); return new JT809Response(package,100); } diff --git a/src/JT809.DotNetty.Core/Handlers/JT809MainServerConnectionHandler.cs b/src/JT809.DotNetty.Core/Handlers/JT809MainServerConnectionHandler.cs new file mode 100644 index 0000000..4c45196 --- /dev/null +++ b/src/JT809.DotNetty.Core/Handlers/JT809MainServerConnectionHandler.cs @@ -0,0 +1,97 @@ +using DotNetty.Buffers; +using DotNetty.Handlers.Timeout; +using DotNetty.Transport.Channels; +using JT809.DotNetty.Core.Metadata; +using JT809.Protocol; +using JT809.Protocol.Enums; +using JT809.Protocol.Extensions; +using Microsoft.Extensions.Logging; +using System; +using System.Threading.Tasks; + +namespace JT809.DotNetty.Core.Handlers +{ + /// + /// JT809主链路服务端连接处理器 + /// + public class JT809MainServerConnectionHandler : ChannelHandlerAdapter + { + + private readonly ILogger logger; + + public JT809MainServerConnectionHandler( + ILoggerFactory loggerFactory) + { + logger = loggerFactory.CreateLogger(); + } + + /// + /// 通道激活 + /// + /// + public override void ChannelActive(IChannelHandlerContext context) + { + string channelId = context.Channel.Id.AsShortText(); + if (logger.IsEnabled(LogLevel.Debug)) + logger.LogDebug($"<<<{ channelId } Successful client connection to server."); + base.ChannelActive(context); + } + + /// + /// 客户端主动断开 + /// + /// + public override void ChannelInactive(IChannelHandlerContext context) + { + string channelId = context.Channel.Id.AsShortText(); + if (logger.IsEnabled(LogLevel.Debug)) + logger.LogDebug($">>>{ channelId } The client disconnects from the server."); + base.ChannelInactive(context); + } + + /// + /// 服务器主动断开 + /// + /// + /// + public override Task CloseAsync(IChannelHandlerContext context) + { + string channelId = context.Channel.Id.AsShortText(); + if (logger.IsEnabled(LogLevel.Debug)) + logger.LogDebug($"<<<{ channelId } The server disconnects from the client."); + return base.CloseAsync(context); + } + + public override void ChannelReadComplete(IChannelHandlerContext context) => context.Flush(); + + /// + /// 超时策略 + /// + /// + /// + public override void UserEventTriggered(IChannelHandlerContext context, object evt) + { + IdleStateEvent idleStateEvent = evt as IdleStateEvent; + if (idleStateEvent != null) + { + if (idleStateEvent.State == IdleState.WriterIdle) + { + string channelId = context.Channel.Id.AsShortText(); + logger.LogInformation($"{idleStateEvent.State.ToString()}>>>Heartbeat-{channelId}"); + //发送主链路保持请求数据包 + var package = JT809BusinessType.主链路连接保持请求消息.Create(); + JT809Response jT809Response = new JT809Response(package, 100); + context.WriteAndFlushAsync(jT809Response); + } + } + base.UserEventTriggered(context, evt); + } + + public override void ExceptionCaught(IChannelHandlerContext context, Exception exception) + { + string channelId = context.Channel.Id.AsShortText(); + logger.LogError(exception, $"{channelId} {exception.Message}"); + context.CloseAsync(); + } + } +} diff --git a/src/JT809.DotNetty.Core/Handlers/JT809MainServerHandler.cs b/src/JT809.DotNetty.Core/Handlers/JT809MainServerHandler.cs new file mode 100644 index 0000000..a71526a --- /dev/null +++ b/src/JT809.DotNetty.Core/Handlers/JT809MainServerHandler.cs @@ -0,0 +1,82 @@ +using DotNetty.Buffers; +using DotNetty.Transport.Channels; +using JT809.Protocol; +using System; +using Microsoft.Extensions.Logging; +using JT809.Protocol.Exceptions; +using JT809.DotNetty.Core.Services; +using JT809.DotNetty.Core.Metadata; +using JT809.DotNetty.Core.Enums; + +namespace JT809.DotNetty.Core.Handlers +{ + /// + /// JT809主链路服务端处理程序 + /// + internal class JT809MainServerHandler : SimpleChannelInboundHandler + { + private readonly JT809MainMsgIdHandlerBase handler; + + private readonly JT809SessionManager jT809SessionManager; + + private readonly JT809AtomicCounterService jT809AtomicCounterService; + + private readonly ILogger logger; + + public JT809MainServerHandler( + ILoggerFactory loggerFactory, + JT809MainMsgIdHandlerBase handler, + JT809AtomicCounterServiceFactory jT809AtomicCounterServiceFactorty, + JT809SessionManager jT809SessionManager + ) + { + this.handler = handler; + this.jT809SessionManager = jT809SessionManager; + this.jT809AtomicCounterService = jT809AtomicCounterServiceFactorty.Create(JT809AtomicCounterType.ServerMain.ToString()); ; + logger = loggerFactory.CreateLogger(); + } + + + protected override void ChannelRead0(IChannelHandlerContext ctx, byte[] msg) + { + try + { + JT809Package jT809Package = JT809Serializer.Deserialize(msg); + jT809AtomicCounterService.MsgSuccessIncrement(); + if (logger.IsEnabled(LogLevel.Debug)) + { + logger.LogDebug("accept package success count<<<" + jT809AtomicCounterService.MsgSuccessCount.ToString()); + } + jT809SessionManager.TryAdd(ctx.Channel, jT809Package.Header.MsgGNSSCENTERID); + Func handlerFunc; + if (handler.HandlerDict.TryGetValue(jT809Package.Header.BusinessType, out handlerFunc)) + { + JT809Response jT808Response = handlerFunc(new JT809Request(jT809Package, msg)); + if (jT808Response != null) + { + var sendData = JT809Serializer.Serialize(jT808Response.Package, jT808Response.MinBufferSize); + ctx.WriteAndFlushAsync(Unpooled.WrappedBuffer(sendData)); + } + } + } + catch (JT809Exception ex) + { + jT809AtomicCounterService.MsgFailIncrement(); + if (logger.IsEnabled(LogLevel.Error)) + { + logger.LogError("accept package fail count<<<" + jT809AtomicCounterService.MsgFailCount.ToString()); + logger.LogError(ex, "accept msg<<<" + ByteBufferUtil.HexDump(msg)); + } + } + catch (Exception ex) + { + jT809AtomicCounterService.MsgFailIncrement(); + if (logger.IsEnabled(LogLevel.Error)) + { + logger.LogError("accept package fail count<<<" + jT809AtomicCounterService.MsgFailCount.ToString()); + logger.LogError(ex, "accept msg<<<" + ByteBufferUtil.HexDump(msg)); + } + } + } + } +} diff --git a/src/JT809.DotNetty.Core/Handlers/JT809SubordinateConnectionHandler.cs b/src/JT809.DotNetty.Core/Handlers/JT809SubordinateConnectionHandler.cs new file mode 100644 index 0000000..50932e1 --- /dev/null +++ b/src/JT809.DotNetty.Core/Handlers/JT809SubordinateConnectionHandler.cs @@ -0,0 +1,98 @@ +using DotNetty.Buffers; +using DotNetty.Handlers.Timeout; +using DotNetty.Transport.Channels; +using JT809.DotNetty.Core.Metadata; +using JT809.Protocol; +using JT809.Protocol.Enums; +using JT809.Protocol.Extensions; +using Microsoft.Extensions.Logging; +using System; +using System.Threading.Tasks; + +namespace JT809.DotNetty.Core.Handlers +{ + /// + /// JT809从链路连接处理器 + /// + public class JT809SubordinateConnectionHandler: ChannelHandlerAdapter + { + + private readonly ILogger logger; + + public JT809SubordinateConnectionHandler( + ILoggerFactory loggerFactory) + { + logger = loggerFactory.CreateLogger(); + } + + /// + /// 通道激活 + /// + /// + public override void ChannelActive(IChannelHandlerContext context) + { + string channelId = context.Channel.Id.AsShortText(); + if (logger.IsEnabled(LogLevel.Debug)) + logger.LogDebug($"<<<{ channelId } Successful client connection to server."); + base.ChannelActive(context); + } + + /// + /// 客户端主动断开 + /// + /// + public override void ChannelInactive(IChannelHandlerContext context) + { + string channelId = context.Channel.Id.AsShortText(); + if (logger.IsEnabled(LogLevel.Debug)) + logger.LogDebug($">>>{ channelId } The client disconnects from the server."); + base.ChannelInactive(context); + } + + /// + /// 服务器主动断开 + /// + /// + /// + public override Task CloseAsync(IChannelHandlerContext context) + { + string channelId = context.Channel.Id.AsShortText(); + if (logger.IsEnabled(LogLevel.Debug)) + logger.LogDebug($"<<<{ channelId } The server disconnects from the client."); + return base.CloseAsync(context); + } + + public override void ChannelReadComplete(IChannelHandlerContext context) => context.Flush(); + + /// + /// 超时策略 + /// + /// + /// + public override void UserEventTriggered(IChannelHandlerContext context, object evt) + { + IdleStateEvent idleStateEvent = evt as IdleStateEvent; + if (idleStateEvent != null) + { + if (idleStateEvent.State == IdleState.WriterIdle) + { + string channelId = context.Channel.Id.AsShortText(); + logger.LogInformation($"{idleStateEvent.State.ToString()}>>>Heartbeat-{channelId}"); + //发送从链路保持请求数据包 + var package = JT809BusinessType.从链路连接保持请求消息.Create(); + JT809Response jT809Response = new JT809Response(package, 100); + context.WriteAndFlushAsync(jT809Response); + //context.WriteAndFlushAsync(Unpooled.WrappedBuffer(JT809Serializer.Serialize(package,100))); + } + } + base.UserEventTriggered(context, evt); + } + + public override void ExceptionCaught(IChannelHandlerContext context, Exception exception) + { + string channelId = context.Channel.Id.AsShortText(); + logger.LogError(exception, $"{channelId} {exception.Message}"); + context.CloseAsync(); + } + } +} diff --git a/src/JT809.DotNetty.Core/Handlers/JT809SubordinateMsgIdHandlerBase.cs b/src/JT809.DotNetty.Core/Handlers/JT809SubordinateMsgIdHandlerBase.cs new file mode 100644 index 0000000..dd1e1c4 --- /dev/null +++ b/src/JT809.DotNetty.Core/Handlers/JT809SubordinateMsgIdHandlerBase.cs @@ -0,0 +1,41 @@ +using System; +using System.Collections.Generic; +using JT809.DotNetty.Core.Metadata; +using JT809.Protocol.Enums; + +namespace JT809.DotNetty.Core.Handlers +{ + /// + /// 抽象从链路消息处理业务 + /// 自定义消息处理业务 + /// 注意: + /// 1.ConfigureServices: + /// services.Replace(new ServiceDescriptor(typeof(JT809SubordinateMsgIdTcpHandlerBase),typeof(JT809SubordinateMsgIdCustomTcpHandlerImpl),ServiceLifetime.Singleton)); + /// 2.解析具体的消息体,具体消息调用具体的JT809Serializer.Deserialize + /// + public abstract class JT809SubordinateMsgIdHandlerBase + { + /// + /// 初始化消息处理业务 + /// + protected JT809SubordinateMsgIdHandlerBase() + { + HandlerDict = new Dictionary> + { + {JT809BusinessType.从链路注销应答消息, Msg0x9004}, + }; + } + + public Dictionary> HandlerDict { get; protected set; } + + /// + /// 从链路注销应答消息 + /// + /// + /// + public virtual JT809Response Msg0x9004(JT809Request request) + { + return null; + } + } +} diff --git a/src/JT809.DotNetty.Core/Handlers/JT809SubordinateServerHandler.cs b/src/JT809.DotNetty.Core/Handlers/JT809SubordinateServerHandler.cs new file mode 100644 index 0000000..10cc3ae --- /dev/null +++ b/src/JT809.DotNetty.Core/Handlers/JT809SubordinateServerHandler.cs @@ -0,0 +1,79 @@ +using DotNetty.Buffers; +using DotNetty.Transport.Channels; +using JT809.Protocol; +using System; +using Microsoft.Extensions.Logging; +using JT809.Protocol.Exceptions; +using JT809.DotNetty.Core.Services; +using JT809.DotNetty.Core; +using JT809.DotNetty.Core.Handlers; +using JT809.DotNetty.Core.Metadata; +using JT809.DotNetty.Core.Internal; +using JT809.DotNetty.Core.Enums; + +namespace JT809.DotNetty.Core.Handlers +{ + /// + /// JT809从链路业务处理程序 + /// + internal class JT809SubordinateServerHandler : SimpleChannelInboundHandler + { + private readonly JT809SubordinateMsgIdHandlerBase handler; + + private readonly JT809AtomicCounterService jT809AtomicCounterService; + + private readonly ILogger logger; + + public JT809SubordinateServerHandler( + ILoggerFactory loggerFactory, + JT809SubordinateMsgIdHandlerBase handler, + JT809AtomicCounterServiceFactory jT809AtomicCounterServiceFactorty + ) + { + this.handler = handler; + this.jT809AtomicCounterService = jT809AtomicCounterServiceFactorty.Create(JT809AtomicCounterType.ClientSubordinate.ToString()); + logger = loggerFactory.CreateLogger(); + } + + + protected override void ChannelRead0(IChannelHandlerContext ctx, byte[] msg) + { + try + { + JT809Package jT809Package = JT809Serializer.Deserialize(msg); + jT809AtomicCounterService.MsgSuccessIncrement(); + if (logger.IsEnabled(LogLevel.Debug)) + { + logger.LogDebug("accept package success count<<<" + jT809AtomicCounterService.MsgSuccessCount.ToString()); + } + Func handlerFunc; + if (handler.HandlerDict.TryGetValue(jT809Package.Header.BusinessType, out handlerFunc)) + { + JT809Response jT808Response = handlerFunc(new JT809Request(jT809Package, msg)); + if (jT808Response != null) + { + ctx.WriteAndFlushAsync(jT808Response); + } + } + } + catch (JT809Exception ex) + { + jT809AtomicCounterService.MsgFailIncrement(); + if (logger.IsEnabled(LogLevel.Error)) + { + logger.LogError("accept package fail count<<<" + jT809AtomicCounterService.MsgFailCount.ToString()); + logger.LogError(ex, "accept msg<<<" + ByteBufferUtil.HexDump(msg)); + } + } + catch (Exception ex) + { + jT809AtomicCounterService.MsgFailIncrement(); + if (logger.IsEnabled(LogLevel.Error)) + { + logger.LogError("accept package fail count<<<" + jT809AtomicCounterService.MsgFailCount.ToString()); + logger.LogError(ex, "accept msg<<<" + ByteBufferUtil.HexDump(msg)); + } + } + } + } +} diff --git a/src/JT809.DotNetty.Core/Interfaces/IVerifyCodeGenerator.cs b/src/JT809.DotNetty.Core/Interfaces/IJT809VerifyCodeGenerator.cs similarity index 75% rename from src/JT809.DotNetty.Core/Interfaces/IVerifyCodeGenerator.cs rename to src/JT809.DotNetty.Core/Interfaces/IJT809VerifyCodeGenerator.cs index 2584683..c546532 100644 --- a/src/JT809.DotNetty.Core/Interfaces/IVerifyCodeGenerator.cs +++ b/src/JT809.DotNetty.Core/Interfaces/IJT809VerifyCodeGenerator.cs @@ -7,8 +7,9 @@ namespace JT809.DotNetty.Core.Interfaces /// /// 校验码生成器 /// - public interface IVerifyCodeGenerator + public interface IJT809VerifyCodeGenerator { uint Create(); + uint Get(); } } diff --git a/src/JT809.DotNetty.Core/Internal/JT809MainMsgIdDefaultHandler.cs b/src/JT809.DotNetty.Core/Internal/JT809MainMsgIdDefaultHandler.cs new file mode 100644 index 0000000..2b33b3d --- /dev/null +++ b/src/JT809.DotNetty.Core/Internal/JT809MainMsgIdDefaultHandler.cs @@ -0,0 +1,22 @@ +using JT809.DotNetty.Core; +using JT809.DotNetty.Core.Handlers; +using JT809.DotNetty.Core.Interfaces; +using JT809.DotNetty.Core.Links; +using System; +using System.Collections.Generic; +using System.Text; + +namespace JT809.DotNetty.Core.Internal +{ + /// + /// 默认主链路服务端消息处理业务实现 + /// + internal class JT809MainMsgIdDefaultHandler : JT809MainMsgIdHandlerBase + { + public JT809MainMsgIdDefaultHandler(IJT809VerifyCodeGenerator verifyCodeGenerator, + JT809SubordinateClient subordinateLinkClient, JT809SessionManager sessionManager) + : base(verifyCodeGenerator, subordinateLinkClient, sessionManager) + { + } + } +} diff --git a/src/JT809.DotNetty.Core/Internal/JT809SubordinateMsgIdDefaultHandler.cs b/src/JT809.DotNetty.Core/Internal/JT809SubordinateMsgIdDefaultHandler.cs new file mode 100644 index 0000000..f554b69 --- /dev/null +++ b/src/JT809.DotNetty.Core/Internal/JT809SubordinateMsgIdDefaultHandler.cs @@ -0,0 +1,18 @@ +using JT809.DotNetty.Core; +using JT809.DotNetty.Core.Handlers; +using JT809.DotNetty.Core.Interfaces; +using JT809.DotNetty.Core.Links; +using System; +using System.Collections.Generic; +using System.Text; + +namespace JT809.DotNetty.Core.Internal +{ + /// + /// 默认从链路客户端消息处理业务实现 + /// + internal class JT809SubordinateMsgIdDefaultHandler : JT809SubordinateMsgIdHandlerBase + { + + } +} diff --git a/src/JT809.DotNetty.Core/Internal/JT809VerifyCodeGeneratorDefaultImpl.cs b/src/JT809.DotNetty.Core/Internal/JT809VerifyCodeGeneratorDefaultImpl.cs new file mode 100644 index 0000000..b3e048a --- /dev/null +++ b/src/JT809.DotNetty.Core/Internal/JT809VerifyCodeGeneratorDefaultImpl.cs @@ -0,0 +1,23 @@ +using JT809.DotNetty.Core.Interfaces; +using System; +using System.Collections.Generic; +using System.Text; + +namespace JT809.DotNetty.Core.Internal +{ + internal class JT809VerifyCodeGeneratorDefaultImpl : IJT809VerifyCodeGenerator + { + private uint VerifyCode; + + public uint Create() + { + VerifyCode= (uint)Guid.NewGuid().GetHashCode(); + return VerifyCode; + } + + public uint Get() + { + return VerifyCode; + } + } +} diff --git a/src/JT809.DotNetty.Core/Internal/VerifyCodeGeneratorDefaultImpl.cs b/src/JT809.DotNetty.Core/Internal/VerifyCodeGeneratorDefaultImpl.cs deleted file mode 100644 index 8089fdc..0000000 --- a/src/JT809.DotNetty.Core/Internal/VerifyCodeGeneratorDefaultImpl.cs +++ /dev/null @@ -1,15 +0,0 @@ -using JT809.DotNetty.Core.Interfaces; -using System; -using System.Collections.Generic; -using System.Text; - -namespace JT809.DotNetty.Core.Internal -{ - internal class VerifyCodeGeneratorDefaultImpl : IVerifyCodeGenerator - { - public uint Create() - { - return (uint)Guid.NewGuid().GetHashCode(); - } - } -} diff --git a/src/JT809.DotNetty.Core/JT809CoreDotnettyExtensions.cs b/src/JT809.DotNetty.Core/JT809CoreDotnettyExtensions.cs index 050118d..d9b5436 100644 --- a/src/JT809.DotNetty.Core/JT809CoreDotnettyExtensions.cs +++ b/src/JT809.DotNetty.Core/JT809CoreDotnettyExtensions.cs @@ -1,8 +1,13 @@ using JT809.DotNetty.Abstractions; +using JT809.DotNetty.Core.Codecs; using JT809.DotNetty.Core.Configurations; using JT809.DotNetty.Core.Converters; +using JT809.DotNetty.Core.Enums; +using JT809.DotNetty.Core.Handlers; using JT809.DotNetty.Core.Interfaces; using JT809.DotNetty.Core.Internal; +using JT809.DotNetty.Core.Links; +using JT809.DotNetty.Core.Metadata; using JT809.DotNetty.Core.Services; using Microsoft.Extensions.Configuration; using Microsoft.Extensions.DependencyInjection; @@ -50,9 +55,25 @@ namespace JT809.DotNetty.Core }); } serviceDescriptors.Configure(configuration.GetSection("JT809Configuration")); - serviceDescriptors.TryAddSingleton(); - serviceDescriptors.TryAddSingleton(); serviceDescriptors.TryAddSingleton(); + serviceDescriptors.TryAddSingleton(); + serviceDescriptors.TryAddSingleton(); + //JT809编解码器 + serviceDescriptors.TryAddScoped(); + serviceDescriptors.TryAddScoped(); + //主从链路连接处理器 + serviceDescriptors.TryAddScoped(); + serviceDescriptors.TryAddScoped(); + //从链路客户端 + serviceDescriptors.TryAddSingleton(); + //主从链路消息默认业务处理器实现 + serviceDescriptors.TryAddSingleton(); + serviceDescriptors.TryAddSingleton(); + //主从链路消息接收处理器 + serviceDescriptors.TryAddScoped(); + serviceDescriptors.TryAddScoped(); + //主链路服务端 + serviceDescriptors.AddHostedService(); return serviceDescriptors; } } diff --git a/src/JT809.DotNetty.Core/Links/JT809MainClient.cs b/src/JT809.DotNetty.Core/Links/JT809MainClient.cs new file mode 100644 index 0000000..af3052d --- /dev/null +++ b/src/JT809.DotNetty.Core/Links/JT809MainClient.cs @@ -0,0 +1,137 @@ +using DotNetty.Buffers; +using DotNetty.Codecs; +using DotNetty.Handlers.Timeout; +using DotNetty.Transport.Bootstrapping; +using DotNetty.Transport.Channels; +using DotNetty.Transport.Channels.Sockets; +using JT809.DotNetty.Core.Codecs; +using JT809.DotNetty.Core.Handlers; +using JT809.DotNetty.Core.Metadata; +using JT809.Protocol; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging; +using System; +using System.Collections.Generic; +using System.Net; +using System.Text; + +namespace JT809.DotNetty.Core.Links +{ + /// + /// 主链路客户端 + /// 针对企业与监管平台之间 + /// + public sealed class JT809MainClient : IDisposable + { + private Bootstrap bootstrap; + + private MultithreadEventLoopGroup group; + + private IChannel channel; + + private readonly ILogger logger; + + private readonly ILoggerFactory loggerFactory; + + private readonly IServiceProvider serviceProvider; + + private bool disposed = false; + + public JT809MainClient( + IServiceProvider provider, + ILoggerFactory loggerFactory) + { + this.serviceProvider = provider; + this.loggerFactory = loggerFactory; + this.logger = loggerFactory.CreateLogger(); + group = new MultithreadEventLoopGroup(); + bootstrap = new Bootstrap(); + bootstrap.Group(group) + .Channel() + .Option(ChannelOption.TcpNodelay, true) + .Handler(new ActionChannelInitializer(channel => + { + IChannelPipeline pipeline = channel.Pipeline; + //下级平台1分钟发送心跳 + //上级平台是3分钟没有发送就断开连接 + using (var scope = serviceProvider.CreateScope()) + { + pipeline.AddLast("jt809MainLinkTcpBuffer", new DelimiterBasedFrameDecoder(int.MaxValue, + Unpooled.CopiedBuffer(new byte[] { JT809Package.BEGINFLAG }), + Unpooled.CopiedBuffer(new byte[] { JT809Package.ENDFLAG }))); + pipeline.AddLast("jt809MainLinkSystemIdleState", new IdleStateHandler(180, 60, 200)); + pipeline.AddLast("jt809MainLinkTcpEncode", scope.ServiceProvider.GetRequiredService()); + pipeline.AddLast("jt809MainLinkTcpDecode", scope.ServiceProvider.GetRequiredService()); + pipeline.AddLast("jt809MainLinkConnection", scope.ServiceProvider.GetRequiredService()); + + } + })); + } + + public async void ConnectAsync(string ip,int port) + { + logger.LogInformation($"ip:{ip},port:{port}"); + try + { + if (channel == null) + { + channel = await bootstrap.ConnectAsync(new IPEndPoint(IPAddress.Parse(ip), port)); + } + else + { + await channel.CloseAsync(); + channel = await bootstrap.ConnectAsync(new IPEndPoint(IPAddress.Parse(ip), port)); + } + } + catch (AggregateException ex) + { + logger.LogError(ex.InnerException, $"ip:{ip},port:{port}"); + } + catch (Exception ex) + { + logger.LogError(ex, $"ip:{ip},port:{port}"); + } + } + + public async void SendAsync(JT809Response jT809Response) + { + if (channel == null) throw new NullReferenceException("Channel Not Open"); + if (jT809Response == null) throw new ArgumentNullException("Data is null"); + if (channel.Open && channel.Active) + { + await channel.WriteAndFlushAsync(jT809Response); + } + } + + private void Dispose(bool disposing) + { + if (disposed) + { + return; + } + if (disposing) + { + //清理托管资源 + channel.CloseAsync(); + group.ShutdownGracefullyAsync(TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(3)); + } + //让类型知道自己已经被释放 + disposed = true; + } + + ~JT809MainClient() + { + //必须为false + //这表明,隐式清理时,只要处理非托管资源就可以了。 + Dispose(false); + } + + public void Dispose() + { + //必须为true + Dispose(true); + //通知垃圾回收机制不再调用终结器(析构器) + GC.SuppressFinalize(this); + } + } +} diff --git a/src/JT809.DotNetty.Core/Links/JT809SubordinateClient.cs b/src/JT809.DotNetty.Core/Links/JT809SubordinateClient.cs new file mode 100644 index 0000000..d981399 --- /dev/null +++ b/src/JT809.DotNetty.Core/Links/JT809SubordinateClient.cs @@ -0,0 +1,176 @@ +using DotNetty.Buffers; +using DotNetty.Codecs; +using DotNetty.Handlers.Timeout; +using DotNetty.Transport.Bootstrapping; +using DotNetty.Transport.Channels; +using DotNetty.Transport.Channels.Sockets; +using JT809.DotNetty.Core.Codecs; +using JT809.DotNetty.Core.Handlers; +using JT809.DotNetty.Core.Interfaces; +using JT809.DotNetty.Core.Metadata; +using JT809.Protocol; +using JT809.Protocol.Enums; +using JT809.Protocol.Extensions; +using JT809.Protocol.MessageBody; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging; +using System; +using System.Collections.Generic; +using System.Net; +using System.Text; +using System.Threading; +using System.Threading.Tasks; + +namespace JT809.DotNetty.Core.Links +{ + /// + /// 从链路客户端 + /// 针对企业与企业之间 + /// + public sealed class JT809SubordinateClient:IDisposable + { + private Bootstrap bootstrap; + + private MultithreadEventLoopGroup group; + + private IChannel channel; + + private readonly ILogger logger; + + private readonly ILoggerFactory loggerFactory; + + private readonly IServiceProvider serviceProvider; + + private readonly IJT809VerifyCodeGenerator verifyCodeGenerator; + + private bool disposed = false; + + public JT809SubordinateClient( + IServiceProvider provider, + ILoggerFactory loggerFactory, + IJT809VerifyCodeGenerator verifyCodeGenerator) + { + this.serviceProvider = provider; + this.loggerFactory = loggerFactory; + this.verifyCodeGenerator = verifyCodeGenerator; + this.logger = loggerFactory.CreateLogger(); + group = new MultithreadEventLoopGroup(); + bootstrap = new Bootstrap(); + bootstrap.Group(group) + .Channel() + .Option(ChannelOption.TcpNodelay, true) + .Handler(new ActionChannelInitializer(channel => + { + IChannelPipeline pipeline = channel.Pipeline; + //下级平台1分钟发送心跳 + //上级平台是3分钟没有发送就断开连接 + + using (var scope = serviceProvider.CreateScope()) + { + pipeline.AddLast("jt809SubLinkTcpBuffer", new DelimiterBasedFrameDecoder(int.MaxValue, + Unpooled.CopiedBuffer(new byte[] { JT809Package.BEGINFLAG }), + Unpooled.CopiedBuffer(new byte[] { JT809Package.ENDFLAG }))); + pipeline.AddLast("jt809SubLinkSystemIdleState", new IdleStateHandler(180, 10, 200)); + pipeline.AddLast("jt809SubLinkTcpEncode", scope.ServiceProvider.GetRequiredService()); + pipeline.AddLast("jt809SubLinkTcpDecode", scope.ServiceProvider.GetRequiredService()); + pipeline.AddLast("jt809SubLinkConnection", scope.ServiceProvider.GetRequiredService()); + + } + })); + } + + public async void ConnectAsync(string ip,int port,uint verifyCode,int delay=3000) + { + logger.LogInformation($"ip:{ip},port:{port},verifycode:{verifyCode}"); + await Task.Delay(delay); + try + { + if (channel == null) + { + channel = await bootstrap.ConnectAsync(new IPEndPoint(IPAddress.Parse(ip), port)); + } + else + { + await channel.CloseAsync(); + channel = await bootstrap.ConnectAsync(new IPEndPoint(IPAddress.Parse(ip), port)); + } + } + catch (AggregateException ex) + { + logger.LogError(ex.InnerException, $"ip:{ip},port:{port},verifycode:{verifyCode}"); + } + catch (Exception ex) + { + logger.LogError(ex, $"ip:{ip},port:{port},verifycode:{verifyCode}"); + } + } + + public async void SendAsync(JT809Response jT809Response) + { + if (channel == null) throw new NullReferenceException("Channel Not Open"); + if (jT809Response == null) throw new ArgumentNullException("Data is null"); + if (channel.Open && channel.Active) + { + await channel.WriteAndFlushAsync(jT809Response); + } + } + + public bool IsOpen + { + get + { + if (channel == null) return false; + return channel.Open && channel.Active; + } + } + + private void Dispose(bool disposing) + { + if (disposed) + { + return; + } + if (disposing) + { + try + { + //发送从链路注销请求 + var package = JT809BusinessType.从链路注销请求消息.Create(new JT809_0x9003() + { + VerifyCode = verifyCodeGenerator.Get() + }); + JT809Response jT809Response = new JT809Response(package, 100); + channel.WriteAndFlushAsync(jT809Response); + logger.LogInformation($"发送从链路注销请求>>>{JT809Serializer.Serialize(package, 100).ToHexString()}"); + } + catch (Exception ex) + { + logger.LogError(ex,"发送从链路注销请求"); + } + finally + { + //清理托管资源 + channel.CloseAsync(); + group.ShutdownGracefullyAsync(TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(3)); + } + } + //让类型知道自己已经被释放 + disposed = true; + } + + ~JT809SubordinateClient() + { + //必须为false + //这表明,隐式清理时,只要处理非托管资源就可以了。 + Dispose(false); + } + + public void Dispose() + { + //必须为true + Dispose(true); + //通知垃圾回收机制不再调用终结器(析构器) + GC.SuppressFinalize(this); + } + } +} diff --git a/src/JT809.DotNetty.Core/Links/SubordinateLinkClient.cs b/src/JT809.DotNetty.Core/Links/SubordinateLinkClient.cs deleted file mode 100644 index d562494..0000000 --- a/src/JT809.DotNetty.Core/Links/SubordinateLinkClient.cs +++ /dev/null @@ -1,113 +0,0 @@ -using DotNetty.Buffers; -using DotNetty.Codecs; -using DotNetty.Handlers.Timeout; -using DotNetty.Transport.Bootstrapping; -using DotNetty.Transport.Channels; -using DotNetty.Transport.Channels.Sockets; -using JT809.DotNetty.Core.Codecs; -using JT809.Protocol; -using Microsoft.Extensions.DependencyInjection; -using Microsoft.Extensions.Logging; -using System; -using System.Collections.Generic; -using System.Net; -using System.Text; - -namespace JT809.DotNetty.Core.Links -{ - /// - /// 从链路客户端 - /// - public sealed class SubordinateLinkClient:IDisposable - { - private Bootstrap bootstrap; - - private MultithreadEventLoopGroup group; - - private IChannel channel; - - private readonly ILogger logger; - - private readonly ILoggerFactory loggerFactory; - - private readonly IServiceProvider serviceProvider; - - private bool disposed = false; - - public SubordinateLinkClient( - IServiceProvider provider, - ILoggerFactory loggerFactory) - { - this.serviceProvider = provider; - this.loggerFactory = loggerFactory; - this.logger = loggerFactory.CreateLogger(); - } - - public async void ConnectAsync(string ip,int port,uint verifyCode) - { - group = new MultithreadEventLoopGroup(); - bootstrap = new Bootstrap(); - bootstrap.Group(group) - .Channel() - .Option(ChannelOption.TcpNodelay, true) - .Handler(new ActionChannelInitializer(channel => - { - IChannelPipeline pipeline = channel.Pipeline; - //下级平台1分钟发送心跳 - //上级平台是3分钟没有发送就断开连接 - channel.Pipeline.AddLast("jt809SystemIdleState", new IdleStateHandler(60,180,200)); - //pipeline.AddLast(new ClientConnectionHandler(bootstrap, channeldic, loggerFactory)); - channel.Pipeline.AddLast("jt809TcpBuffer", new DelimiterBasedFrameDecoder(int.MaxValue, - Unpooled.CopiedBuffer(new byte[] { JT809Package.BEGINFLAG }), - Unpooled.CopiedBuffer(new byte[] { JT809Package.ENDFLAG }))); - using (var scope = serviceProvider.CreateScope()) - { - channel.Pipeline.AddLast("jt809TcpDecode", scope.ServiceProvider.GetRequiredService()); - - } - })); - channel = await bootstrap.ConnectAsync(new IPEndPoint(IPAddress.Parse(ip), port)); - } - - public async void SendAsync(byte[] data) - { - if (channel == null) throw new NullReferenceException("Channel Not Open"); - if (data == null) throw new ArgumentNullException("data is null"); - if (channel.Open && channel.Active) - { - await channel.WriteAndFlushAsync(Unpooled.WrappedBuffer(data)); - } - } - - private void Dispose(bool disposing) - { - if (disposed) - { - return; - } - if (disposing) - { - //清理托管资源 - channel.CloseAsync(); - group.ShutdownGracefullyAsync(TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(3)); - } - //让类型知道自己已经被释放 - disposed = true; - } - - ~SubordinateLinkClient() - { - //必须为false - //这表明,隐式清理时,只要处理非托管资源就可以了。 - Dispose(false); - } - - public void Dispose() - { - //必须为true - Dispose(true); - //通知垃圾回收机制不再调用终结器(析构器) - GC.SuppressFinalize(this); - } - } -} diff --git a/src/JT809.DotNetty.Core/Metadata/JT809Response.cs b/src/JT809.DotNetty.Core/Metadata/JT809Response.cs index ecf6aa1..42932c3 100644 --- a/src/JT809.DotNetty.Core/Metadata/JT809Response.cs +++ b/src/JT809.DotNetty.Core/Metadata/JT809Response.cs @@ -13,6 +13,8 @@ namespace JT809.DotNetty.Core.Metadata /// public int MinBufferSize { get; set; } + public byte[] HexData { get; set; } + public JT809Response() { @@ -23,5 +25,10 @@ namespace JT809.DotNetty.Core.Metadata Package = package; MinBufferSize = minBufferSize; } + + public JT809Response(byte[] hexData) + { + HexData = hexData; + } } } \ No newline at end of file diff --git a/src/JT809.DotNetty.Core/Metadata/JT809TcpSession.cs b/src/JT809.DotNetty.Core/Metadata/JT809Session.cs similarity index 85% rename from src/JT809.DotNetty.Core/Metadata/JT809TcpSession.cs rename to src/JT809.DotNetty.Core/Metadata/JT809Session.cs index f6b514f..e8a86a8 100644 --- a/src/JT809.DotNetty.Core/Metadata/JT809TcpSession.cs +++ b/src/JT809.DotNetty.Core/Metadata/JT809Session.cs @@ -4,9 +4,9 @@ using JT809.Protocol.Enums; namespace JT809.DotNetty.Core.Metadata { - public class JT809TcpSession + public class JT809Session { - public JT809TcpSession(IChannel channel, uint msgGNSSCENTERID) + public JT809Session(IChannel channel, uint msgGNSSCENTERID) { MsgGNSSCENTERID = msgGNSSCENTERID; Channel = channel; diff --git a/src/JT809.DotNetty.Core/Services/JT809TcpAtomicCounterService.cs b/src/JT809.DotNetty.Core/Services/JT809AtomicCounterService.cs similarity index 64% rename from src/JT809.DotNetty.Core/Services/JT809TcpAtomicCounterService.cs rename to src/JT809.DotNetty.Core/Services/JT809AtomicCounterService.cs index aa3b77f..0166220 100644 --- a/src/JT809.DotNetty.Core/Services/JT809TcpAtomicCounterService.cs +++ b/src/JT809.DotNetty.Core/Services/JT809AtomicCounterService.cs @@ -1,19 +1,22 @@ -using JT809.DotNetty.Core.Metadata; +using JT809.DotNetty.Core.Enums; +using JT809.DotNetty.Core.Internal; +using JT809.DotNetty.Core.Metadata; namespace JT809.DotNetty.Core.Services { /// /// Tcp计数包服务 /// - public class JT809TcpAtomicCounterService + public class JT809AtomicCounterService { - private readonly JT809AtomicCounter MsgSuccessCounter = new JT809AtomicCounter(); + private readonly JT809AtomicCounter MsgSuccessCounter; - private readonly JT809AtomicCounter MsgFailCounter = new JT809AtomicCounter(); + private readonly JT809AtomicCounter MsgFailCounter; - public JT809TcpAtomicCounterService() + public JT809AtomicCounterService() { - + MsgSuccessCounter=new JT809AtomicCounter(); + MsgFailCounter = new JT809AtomicCounter(); } public void Reset() diff --git a/src/JT809.DotNetty.Core/Services/JT809AtomicCounterServiceFactory.cs b/src/JT809.DotNetty.Core/Services/JT809AtomicCounterServiceFactory.cs new file mode 100644 index 0000000..a212387 --- /dev/null +++ b/src/JT809.DotNetty.Core/Services/JT809AtomicCounterServiceFactory.cs @@ -0,0 +1,31 @@ +using System; +using System.Collections.Generic; +using System.Text; +using System.Collections.Concurrent; + +namespace JT809.DotNetty.Core.Services +{ + public class JT809AtomicCounterServiceFactory + { + private static readonly ConcurrentDictionary cache; + + static JT809AtomicCounterServiceFactory() + { + cache = new ConcurrentDictionary(StringComparer.OrdinalIgnoreCase); + } + + public JT809AtomicCounterService Create(string type) + { + if(cache.TryGetValue(type,out var service)) + { + return service; + } + else + { + var serviceNew = new JT809AtomicCounterService(); + cache.TryAdd(type, serviceNew); + return serviceNew; + } + } + } +} diff --git a/src/JT809.DotNetty.Core/Services/JT809MainServerHost.cs b/src/JT809.DotNetty.Core/Services/JT809MainServerHost.cs new file mode 100644 index 0000000..1df1fc0 --- /dev/null +++ b/src/JT809.DotNetty.Core/Services/JT809MainServerHost.cs @@ -0,0 +1,98 @@ +using DotNetty.Buffers; +using DotNetty.Codecs; +using DotNetty.Handlers.Timeout; +using DotNetty.Transport.Bootstrapping; +using DotNetty.Transport.Channels; +using DotNetty.Transport.Libuv; +using JT809.DotNetty.Core.Configurations; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using System; +using System.Net; +using System.Runtime.InteropServices; +using System.Threading; +using System.Threading.Tasks; +using JT809.Protocol; +using JT809.DotNetty.Core.Codecs; +using JT809.DotNetty.Core.Handlers; + +namespace JT809.DotNetty.Core.Services +{ + /// + /// JT809 Tcp网关服务 + /// + internal class JT809MainServerHost : IHostedService + { + private readonly IServiceProvider serviceProvider; + private readonly JT809Configuration configuration; + private readonly ILogger logger; + private DispatcherEventLoopGroup bossGroup; + private WorkerEventLoopGroup workerGroup; + private IChannel bootstrapChannel; + private IByteBufferAllocator serverBufferAllocator; + private ILoggerFactory loggerFactory; + + public JT809MainServerHost( + IServiceProvider provider, + ILoggerFactory loggerFactory, + IOptions jT809ConfigurationAccessor) + { + serviceProvider = provider; + configuration = jT809ConfigurationAccessor.Value; + logger = loggerFactory.CreateLogger(); + this.loggerFactory = loggerFactory; + } + + public Task StartAsync(CancellationToken cancellationToken) + { + bossGroup = new DispatcherEventLoopGroup(); + workerGroup = new WorkerEventLoopGroup(bossGroup, configuration.EventLoopCount); + serverBufferAllocator = new PooledByteBufferAllocator(); + ServerBootstrap bootstrap = new ServerBootstrap(); + bootstrap.Group(bossGroup, workerGroup); + bootstrap.Channel(); + if (RuntimeInformation.IsOSPlatform(OSPlatform.Linux) + || RuntimeInformation.IsOSPlatform(OSPlatform.OSX)) + { + bootstrap + .Option(ChannelOption.SoReuseport, true) + .ChildOption(ChannelOption.SoReuseaddr, true); + } + bootstrap + .Option(ChannelOption.SoBacklog, configuration.SoBacklog) + .ChildOption(ChannelOption.Allocator, serverBufferAllocator) + .ChildHandler(new ActionChannelInitializer(channel => + { + IChannelPipeline pipeline = channel.Pipeline; + channel.Pipeline.AddLast("jt809MainBuffer", new DelimiterBasedFrameDecoder(int.MaxValue, + Unpooled.CopiedBuffer(new byte[] { JT809Package.BEGINFLAG }), + Unpooled.CopiedBuffer(new byte[] { JT809Package.ENDFLAG }))); + channel.Pipeline.AddLast("jt809MainSystemIdleState", new IdleStateHandler( + configuration.ReaderIdleTimeSeconds, + configuration.WriterIdleTimeSeconds, + configuration.AllIdleTimeSeconds)); + pipeline.AddLast("jt809MainEncode", new JT809Encoder()); + pipeline.AddLast("jt809MainDecode", new JT809Decoder()); + channel.Pipeline.AddLast("jt809MainConnection", new JT809MainServerConnectionHandler(loggerFactory)); + using (var scope = serviceProvider.CreateScope()) + { + channel.Pipeline.AddLast("jt809MainService", scope.ServiceProvider.GetRequiredService()); + } + })); + logger.LogInformation($"JT809 TCP Server start at {IPAddress.Any}:{configuration.TcpPort}."); + return bootstrap.BindAsync(configuration.TcpPort) + .ContinueWith(i => bootstrapChannel = i.Result); + } + + public async Task StopAsync(CancellationToken cancellationToken) + { + await bootstrapChannel.CloseAsync(); + var quietPeriod = configuration.QuietPeriodTimeSpan; + var shutdownTimeout = configuration.ShutdownTimeoutTimeSpan; + await workerGroup.ShutdownGracefullyAsync(quietPeriod, shutdownTimeout); + await bossGroup.ShutdownGracefullyAsync(quietPeriod, shutdownTimeout); + } + } +} diff --git a/src/JT809.DotNetty.Core/Session/JT809TcpSessionManager.cs b/src/JT809.DotNetty.Core/Session/JT809SessionManager.cs similarity index 73% rename from src/JT809.DotNetty.Core/Session/JT809TcpSessionManager.cs rename to src/JT809.DotNetty.Core/Session/JT809SessionManager.cs index cba8419..dc7515f 100644 --- a/src/JT809.DotNetty.Core/Session/JT809TcpSessionManager.cs +++ b/src/JT809.DotNetty.Core/Session/JT809SessionManager.cs @@ -12,21 +12,21 @@ namespace JT809.DotNetty.Core /// /// JT809 Tcp会话管理 /// - public class JT809TcpSessionManager + public class JT809SessionManager { - private readonly ILogger logger; + private readonly ILogger logger; private readonly IJT809SessionPublishing jT809SessionPublishing; - public JT809TcpSessionManager( + public JT809SessionManager( IJT809SessionPublishing jT809SessionPublishing, ILoggerFactory loggerFactory) { this.jT809SessionPublishing = jT809SessionPublishing; - logger = loggerFactory.CreateLogger(); + logger = loggerFactory.CreateLogger(); } - private ConcurrentDictionary SessionIdDict = new ConcurrentDictionary(); + private ConcurrentDictionary SessionIdDict = new ConcurrentDictionary(); public int SessionCount { @@ -36,9 +36,9 @@ namespace JT809.DotNetty.Core } } - public JT809TcpSession GetSession(uint msgGNSSCENTERID) + public JT809Session GetSession(uint msgGNSSCENTERID) { - if (SessionIdDict.TryGetValue(msgGNSSCENTERID, out JT809TcpSession targetSession)) + if (SessionIdDict.TryGetValue(msgGNSSCENTERID, out JT809Session targetSession)) { return targetSession; } @@ -50,29 +50,30 @@ namespace JT809.DotNetty.Core public void Heartbeat(uint msgGNSSCENTERID) { - if (SessionIdDict.TryGetValue(msgGNSSCENTERID, out JT809TcpSession oldjT808Session)) + if (SessionIdDict.TryGetValue(msgGNSSCENTERID, out JT809Session oldjT808Session)) { oldjT808Session.LastActiveTime = DateTime.Now; SessionIdDict.TryUpdate(msgGNSSCENTERID, oldjT808Session, oldjT808Session); } } - public void TryAdd(JT809TcpSession appSession) + public void TryAdd(IChannel channel, uint msgGNSSCENTERID) { - if (SessionIdDict.TryAdd(appSession.MsgGNSSCENTERID, appSession)) + if (SessionIdDict.ContainsKey(msgGNSSCENTERID)) return; + if (SessionIdDict.TryAdd(msgGNSSCENTERID, new JT809Session(channel, msgGNSSCENTERID))) { - jT809SessionPublishing.PublishAsync(JT809Constants.SessionOnline, appSession.MsgGNSSCENTERID.ToString()); + jT809SessionPublishing.PublishAsync(JT809Constants.SessionOnline, msgGNSSCENTERID.ToString()); } } - public JT809TcpSession RemoveSession(uint msgGNSSCENTERID) + public JT809Session RemoveSession(uint msgGNSSCENTERID) { //可以使用任意mq的发布订阅 - if (!SessionIdDict.TryGetValue(msgGNSSCENTERID, out JT809TcpSession jT808Session)) + if (!SessionIdDict.TryGetValue(msgGNSSCENTERID, out JT809Session jT808Session)) { return default; } - if (SessionIdDict.TryRemove(msgGNSSCENTERID, out JT809TcpSession jT808SessionRemove)) + if (SessionIdDict.TryRemove(msgGNSSCENTERID, out JT809Session jT808SessionRemove)) { logger.LogInformation($">>>{msgGNSSCENTERID} Session Remove."); jT809SessionPublishing.PublishAsync(JT809Constants.SessionOffline, msgGNSSCENTERID.ToString()); @@ -91,7 +92,7 @@ namespace JT809.DotNetty.Core { foreach (var key in keys) { - SessionIdDict.TryRemove(key, out JT809TcpSession jT808SessionRemove); + SessionIdDict.TryRemove(key, out JT809Session jT808SessionRemove); } string nos = string.Join(",", keys); logger.LogInformation($">>>{nos} Channel Remove."); @@ -99,7 +100,7 @@ namespace JT809.DotNetty.Core } } - public IEnumerable GetAll() + public IEnumerable GetAll() { return SessionIdDict.Select(s => s.Value).ToList(); } diff --git a/src/JT809.DotNetty.Tcp/Handlers/JT809MsgIdDefaultTcpHandler.cs b/src/JT809.DotNetty.Tcp/Handlers/JT809MsgIdDefaultTcpHandler.cs index 6eae823..7e420a3 100644 --- a/src/JT809.DotNetty.Tcp/Handlers/JT809MsgIdDefaultTcpHandler.cs +++ b/src/JT809.DotNetty.Tcp/Handlers/JT809MsgIdDefaultTcpHandler.cs @@ -1,5 +1,7 @@ using JT809.DotNetty.Core; using JT809.DotNetty.Core.Handlers; +using JT809.DotNetty.Core.Interfaces; +using JT809.DotNetty.Core.Links; using System; using System.Collections.Generic; using System.Text; @@ -9,9 +11,11 @@ namespace JT809.DotNetty.Tcp.Handlers /// /// 默认消息处理业务实现 /// - internal class JT809MsgIdDefaultTcpHandler : JT809MsgIdTcpHandlerBase + internal class JT809MsgIdDefaultTcpHandler : JT809MainMsgIdHandlerBase { - public JT809MsgIdDefaultTcpHandler(JT809TcpSessionManager sessionManager) : base(sessionManager) + public JT809MsgIdDefaultTcpHandler(IJT809VerifyCodeGenerator verifyCodeGenerator, + JT809SubordinateClient subordinateLinkClient, JT809SessionManager sessionManager) + : base(verifyCodeGenerator, subordinateLinkClient, sessionManager) { } } diff --git a/src/JT809.DotNetty.Tcp/Handlers/JT809TcpConnectionHandler.cs b/src/JT809.DotNetty.Tcp/Handlers/JT809TcpConnectionHandler.cs index 8f11e2f..5ccb7a7 100644 --- a/src/JT809.DotNetty.Tcp/Handlers/JT809TcpConnectionHandler.cs +++ b/src/JT809.DotNetty.Tcp/Handlers/JT809TcpConnectionHandler.cs @@ -14,10 +14,10 @@ namespace JT809.DotNetty.Tcp.Handlers { private readonly ILogger logger; - private readonly JT809TcpSessionManager jT809SessionManager; + private readonly JT809SessionManager jT809SessionManager; public JT809TcpConnectionHandler( - JT809TcpSessionManager jT809SessionManager, + JT809SessionManager jT809SessionManager, ILoggerFactory loggerFactory) { this.jT809SessionManager = jT809SessionManager; diff --git a/src/JT809.DotNetty.Tcp/Handlers/JT809TcpServerHandler.cs b/src/JT809.DotNetty.Tcp/Handlers/JT809TcpServerHandler.cs index 0100139..50a2585 100644 --- a/src/JT809.DotNetty.Tcp/Handlers/JT809TcpServerHandler.cs +++ b/src/JT809.DotNetty.Tcp/Handlers/JT809TcpServerHandler.cs @@ -8,6 +8,7 @@ using JT809.DotNetty.Core.Services; using JT809.DotNetty.Core; using JT809.DotNetty.Core.Handlers; using JT809.DotNetty.Core.Metadata; +using JT809.DotNetty.Core.Enums; namespace JT809.DotNetty.Tcp.Handlers { @@ -16,24 +17,24 @@ namespace JT809.DotNetty.Tcp.Handlers /// internal class JT809TcpServerHandler : SimpleChannelInboundHandler { - private readonly JT809MsgIdTcpHandlerBase handler; + private readonly JT809MainMsgIdHandlerBase handler; - private readonly JT809TcpSessionManager jT809SessionManager; + private readonly JT809SessionManager jT809SessionManager; - private readonly JT809TcpAtomicCounterService jT809AtomicCounterService; + private readonly JT809AtomicCounterService jT809AtomicCounterService; private readonly ILogger logger; public JT809TcpServerHandler( ILoggerFactory loggerFactory, - JT809MsgIdTcpHandlerBase handler, - JT809TcpAtomicCounterService jT809AtomicCounterService, - JT809TcpSessionManager jT809SessionManager + JT809MainMsgIdHandlerBase handler, + JT809AtomicCounterServiceFactory jT809AtomicCounterServiceFactorty, + JT809SessionManager jT809SessionManager ) { this.handler = handler; this.jT809SessionManager = jT809SessionManager; - this.jT809AtomicCounterService = jT809AtomicCounterService; + this.jT809AtomicCounterService = jT809AtomicCounterServiceFactorty.Create(JT809AtomicCounterType.ServerMain.ToString()); ; logger = loggerFactory.CreateLogger(); } @@ -48,7 +49,7 @@ namespace JT809.DotNetty.Tcp.Handlers { logger.LogDebug("accept package success count<<<" + jT809AtomicCounterService.MsgSuccessCount.ToString()); } - jT809SessionManager.TryAdd(new JT809TcpSession(ctx.Channel, jT809Package.Header.MsgGNSSCENTERID)); + jT809SessionManager.TryAdd(ctx.Channel, jT809Package.Header.MsgGNSSCENTERID); Func handlerFunc; if (handler.HandlerDict.TryGetValue(jT809Package.Header.BusinessType, out handlerFunc)) { diff --git a/src/JT809.DotNetty.Tcp/JT809TcpDotnettyExtensions.cs b/src/JT809.DotNetty.Tcp/JT809TcpDotnettyExtensions.cs index 2fff12e..3c8b4a8 100644 --- a/src/JT809.DotNetty.Tcp/JT809TcpDotnettyExtensions.cs +++ b/src/JT809.DotNetty.Tcp/JT809TcpDotnettyExtensions.cs @@ -19,11 +19,10 @@ namespace JT809.DotNetty.Tcp { public static IServiceCollection AddJT809TcpHost(this IServiceCollection serviceDescriptors) { - serviceDescriptors.TryAddSingleton(); - serviceDescriptors.TryAddSingleton(); - serviceDescriptors.TryAddSingleton(); + serviceDescriptors.TryAddSingleton(); + serviceDescriptors.TryAddSingleton(); serviceDescriptors.TryAddScoped(); - serviceDescriptors.TryAddScoped(); + serviceDescriptors.TryAddScoped(); serviceDescriptors.TryAddScoped(); serviceDescriptors.AddHostedService(); return serviceDescriptors; diff --git a/src/JT809.DotNetty.Tcp/JT809TcpServerHost.cs b/src/JT809.DotNetty.Tcp/JT809TcpServerHost.cs index 99e178c..1cc42d7 100644 --- a/src/JT809.DotNetty.Tcp/JT809TcpServerHost.cs +++ b/src/JT809.DotNetty.Tcp/JT809TcpServerHost.cs @@ -74,7 +74,7 @@ namespace JT809.DotNetty.Tcp channel.Pipeline.AddLast("jt809TcpBuffer", new DelimiterBasedFrameDecoder(int.MaxValue, Unpooled.CopiedBuffer(new byte[] { JT809Package.BEGINFLAG }), Unpooled.CopiedBuffer(new byte[] { JT809Package.ENDFLAG }))); - channel.Pipeline.AddLast("jt809TcpDecode", scope.ServiceProvider.GetRequiredService()); + channel.Pipeline.AddLast("jt809TcpDecode", scope.ServiceProvider.GetRequiredService()); channel.Pipeline.AddLast("jt809TcpService", scope.ServiceProvider.GetRequiredService()); } })); diff --git a/src/JT809.DotNetty.Tests/JT809.DotNetty.Host.Test/Program.cs b/src/JT809.DotNetty.Tests/JT809.DotNetty.Host.Test/Program.cs index ce6689e..5d9c798 100644 --- a/src/JT809.DotNetty.Tests/JT809.DotNetty.Host.Test/Program.cs +++ b/src/JT809.DotNetty.Tests/JT809.DotNetty.Host.Test/Program.cs @@ -23,7 +23,7 @@ namespace JT809.DotNetty.Host.Test }); //主链路登录请求消息 - //5B000000480000008510010134140E010000000000270F0134140E32303139303232323132372E302E302E31000000000000000000000000000000000000000000000003297B8D5D + //5B00000048000000851001013353D5010000000000270F0133530D32303134303831333132372E302E302E3100000000000000000000000000000000000000000000001FA3275F5D //主链路注销请求消息 //5B000000260000008510030134140E010000000000270F0001E24031323334353600003FE15D //主链路连接保持请求消息