From 057840d33cf9115c04e4599c73017f659032a7b5 Mon Sep 17 00:00:00 2001 From: SmallChi <564952747@qq.com> Date: Tue, 16 Oct 2018 17:47:11 +0800 Subject: [PATCH] =?UTF-8?q?=E5=88=9D=E5=A7=8B=E5=8C=96JT808=E6=9C=8D?= =?UTF-8?q?=E5=8A=A1=E7=AB=AF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../ISourcePackageDispatcher.cs | 15 ++ .../JT808.Abstractions.csproj | 7 + src/JT808.DotNetty.sln | 25 ++ src/JT808.DotNetty/Codecs/JT808Decoder.cs | 68 ++++++ .../Configurations/JT808Configuration.cs | 34 +++ src/JT808.DotNetty/DotnettyExtensions.cs | 27 +++ .../Handlers/JT808ConnectionHandler.cs | 96 ++++++++ .../Handlers/JT808ServerHandler.cs | 39 +++ src/JT808.DotNetty/Internal/AtomicCounter.cs | 39 +++ .../Internal/JT808MsgIdDefaultHandler.cs | 16 ++ src/JT808.DotNetty/JT808.DotNetty.csproj | 19 ++ src/JT808.DotNetty/JT808MsgIdHandlerBase.cs | 162 +++++++++++++ src/JT808.DotNetty/JT808ServerHost.cs | 90 +++++++ src/JT808.DotNetty/JT808Session.cs | 32 +++ src/JT808.DotNetty/SessionManager.cs | 224 ++++++++++++++++++ 15 files changed, 893 insertions(+) create mode 100644 src/JT808.Abstractions/ISourcePackageDispatcher.cs create mode 100644 src/JT808.Abstractions/JT808.Abstractions.csproj create mode 100644 src/JT808.DotNetty.sln create mode 100644 src/JT808.DotNetty/Codecs/JT808Decoder.cs create mode 100644 src/JT808.DotNetty/Configurations/JT808Configuration.cs create mode 100644 src/JT808.DotNetty/DotnettyExtensions.cs create mode 100644 src/JT808.DotNetty/Handlers/JT808ConnectionHandler.cs create mode 100644 src/JT808.DotNetty/Handlers/JT808ServerHandler.cs create mode 100644 src/JT808.DotNetty/Internal/AtomicCounter.cs create mode 100644 src/JT808.DotNetty/Internal/JT808MsgIdDefaultHandler.cs create mode 100644 src/JT808.DotNetty/JT808.DotNetty.csproj create mode 100644 src/JT808.DotNetty/JT808MsgIdHandlerBase.cs create mode 100644 src/JT808.DotNetty/JT808ServerHost.cs create mode 100644 src/JT808.DotNetty/JT808Session.cs create mode 100644 src/JT808.DotNetty/SessionManager.cs diff --git a/src/JT808.Abstractions/ISourcePackageDispatcher.cs b/src/JT808.Abstractions/ISourcePackageDispatcher.cs new file mode 100644 index 0000000..503cbb5 --- /dev/null +++ b/src/JT808.Abstractions/ISourcePackageDispatcher.cs @@ -0,0 +1,15 @@ +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading.Tasks; + +namespace JT808.Abstractions +{ + /// + /// 源包分发器 + /// + public interface ISourcePackageDispatcher + { + Task SendAsync(byte[] data); + } +} diff --git a/src/JT808.Abstractions/JT808.Abstractions.csproj b/src/JT808.Abstractions/JT808.Abstractions.csproj new file mode 100644 index 0000000..9f5c4f4 --- /dev/null +++ b/src/JT808.Abstractions/JT808.Abstractions.csproj @@ -0,0 +1,7 @@ + + + + netstandard2.0 + + + diff --git a/src/JT808.DotNetty.sln b/src/JT808.DotNetty.sln new file mode 100644 index 0000000..48e2787 --- /dev/null +++ b/src/JT808.DotNetty.sln @@ -0,0 +1,25 @@ + +Microsoft Visual Studio Solution File, Format Version 12.00 +# Visual Studio 15 +VisualStudioVersion = 15.0.28010.2016 +MinimumVisualStudioVersion = 10.0.40219.1 +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JT808.DotNetty", "JT808.DotNetty\JT808.DotNetty.csproj", "{80C7F67E-6B7C-4178-8726-ADD3695622DD}" +EndProject +Global + GlobalSection(SolutionConfigurationPlatforms) = preSolution + Debug|Any CPU = Debug|Any CPU + Release|Any CPU = Release|Any CPU + EndGlobalSection + GlobalSection(ProjectConfigurationPlatforms) = postSolution + {80C7F67E-6B7C-4178-8726-ADD3695622DD}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {80C7F67E-6B7C-4178-8726-ADD3695622DD}.Debug|Any CPU.Build.0 = Debug|Any CPU + {80C7F67E-6B7C-4178-8726-ADD3695622DD}.Release|Any CPU.ActiveCfg = Release|Any CPU + {80C7F67E-6B7C-4178-8726-ADD3695622DD}.Release|Any CPU.Build.0 = Release|Any CPU + EndGlobalSection + GlobalSection(SolutionProperties) = preSolution + HideSolutionNode = FALSE + EndGlobalSection + GlobalSection(ExtensibilityGlobals) = postSolution + SolutionGuid = {FC0FFCEA-E1EF-4C97-A1C5-F89418B6834B} + EndGlobalSection +EndGlobal diff --git a/src/JT808.DotNetty/Codecs/JT808Decoder.cs b/src/JT808.DotNetty/Codecs/JT808Decoder.cs new file mode 100644 index 0000000..c1fb5a8 --- /dev/null +++ b/src/JT808.DotNetty/Codecs/JT808Decoder.cs @@ -0,0 +1,68 @@ +using DotNetty.Buffers; +using DotNetty.Codecs; +using DotNetty.Transport.Channels; +using Microsoft.Extensions.Logging; +using System; +using System.Collections.Generic; +using System.Text; +using JT808.Protocol; +using JT808.DotNetty.Internal; + +namespace JT808.DotNetty.Codecs +{ + /// + /// JT808解码 + /// + internal class JT808Decoder : ByteToMessageDecoder + { + private readonly ILogger logger; + + public JT808Decoder(ILogger logger) + { + this.logger = logger; + } + + public JT808Decoder(ILoggerFactory loggerFactory) + { + this.logger = loggerFactory.CreateLogger(); + } + + private static readonly AtomicCounter MsgSuccessCounter = new AtomicCounter(); + + private static readonly AtomicCounter MsgFailCounter = new AtomicCounter(); + + protected override void Decode(IChannelHandlerContext context, IByteBuffer input, List output) + { + byte[] buffer = new byte[input.Capacity + 2]; + try + { + input.ReadBytes(buffer, 1, input.Capacity); + buffer[0] = JT808Package.BeginFlag; + buffer[input.Capacity + 1] = JT808Package.EndFlag; + JT808Package jT808Package = JT808Serializer.Deserialize(buffer); + output.Add(jT808Package); + MsgSuccessCounter.Increment(); + if (logger.IsEnabled(LogLevel.Debug)) + logger.LogDebug("accept package success count<<<" + MsgSuccessCounter.Count.ToString()); + } + catch (JT808.Protocol.Exceptions.JT808Exception ex) + { + MsgFailCounter.Increment(); + if (logger.IsEnabled(LogLevel.Error)) + { + logger.LogError("accept package fail count<<<" + MsgFailCounter.Count.ToString()); + logger.LogError(ex, "accept msg<<<" + buffer); + } + } + catch (Exception ex) + { + MsgFailCounter.Increment(); + if (logger.IsEnabled(LogLevel.Error)) + { + logger.LogError("accept package fail count<<<" + MsgFailCounter.Count.ToString()); + logger.LogError(ex, "accept msg<<<" + buffer); + } + } + } + } +} diff --git a/src/JT808.DotNetty/Configurations/JT808Configuration.cs b/src/JT808.DotNetty/Configurations/JT808Configuration.cs new file mode 100644 index 0000000..d0ffbe9 --- /dev/null +++ b/src/JT808.DotNetty/Configurations/JT808Configuration.cs @@ -0,0 +1,34 @@ +using System; +using System.Collections.Generic; +using System.Text; + +namespace JT808.DotNetty.Configurations +{ + public class JT808Configuration + { + public int Port { get; set; } = 808; + + public int QuietPeriodSeconds { get; set; } = 1; + + public TimeSpan QuietPeriodTimeSpan => TimeSpan.FromSeconds(QuietPeriodSeconds); + + public int ShutdownTimeoutSeconds { get; set; } = 3; + + public TimeSpan ShutdownTimeoutTimeSpan => TimeSpan.FromSeconds(ShutdownTimeoutSeconds); + + public int SoBacklog { get; set; } = 8192; + + public int EventLoopCount { get; set; } = Environment.ProcessorCount; + + public int ReaderIdleTimeSeconds { get; set; } = 3600; + + public int WriterIdleTimeSeconds { get; set; } = 3600; + + public int AllIdleTimeSeconds { get; set; } = 3600; + /// + /// 会话报时 + /// 默认5分钟 + /// + public int SessionReportTime { get; set; } = 30000; + } +} diff --git a/src/JT808.DotNetty/DotnettyExtensions.cs b/src/JT808.DotNetty/DotnettyExtensions.cs new file mode 100644 index 0000000..f526dbe --- /dev/null +++ b/src/JT808.DotNetty/DotnettyExtensions.cs @@ -0,0 +1,27 @@ +using JT808.DotNetty.Configurations; +using JT808.DotNetty.Handlers; +using JT808.DotNetty.Internal; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.DependencyInjection.Extensions; +using Microsoft.Extensions.Hosting; +using System; +using System.Reflection; + +namespace JT808.DotNetty +{ + public static class DotnettyExtensions + { + public static IHostBuilder UseJT808Host(this IHostBuilder builder) + { + return builder.ConfigureServices((hostContext, services) => + { + services.Configure(hostContext.Configuration.GetSection("JT808Configuration")); + services.TryAddSingleton(); + services.TryAddSingleton(); + services.TryAddScoped(); + services.TryAddScoped(); + services.AddHostedService(); + }); + } + } +} \ No newline at end of file diff --git a/src/JT808.DotNetty/Handlers/JT808ConnectionHandler.cs b/src/JT808.DotNetty/Handlers/JT808ConnectionHandler.cs new file mode 100644 index 0000000..1931030 --- /dev/null +++ b/src/JT808.DotNetty/Handlers/JT808ConnectionHandler.cs @@ -0,0 +1,96 @@ +using DotNetty.Handlers.Timeout; +using DotNetty.Transport.Channels; +using Microsoft.Extensions.Logging; +using System; +using System.Collections.Generic; +using System.Net; +using System.Text; +using System.Threading.Tasks; + +namespace JT808.DotNetty.Handlers +{ + internal class JT808ConnectionHandler : ChannelHandlerAdapter + { + private readonly ILogger logger; + + public JT808ConnectionHandler( + ILoggerFactory loggerFactory) + { + logger = loggerFactory.CreateLogger(); + } + + public JT808ConnectionHandler( + ILogger logger) + { + this.logger = logger; + } + + /// + /// 通道激活 + /// + /// + public override void ChannelActive(IChannelHandlerContext context) + { + string channelId = context.Channel.Id.AsShortText(); + if (logger.IsEnabled(LogLevel.Debug)) + logger.LogDebug($"<<<{ channelId } Successful client connection to server."); + base.ChannelActive(context); + } + + /// + /// 设备主动断开 + /// + /// + public override void ChannelInactive(IChannelHandlerContext context) + { + string channelId = context.Channel.Id.AsShortText(); + if (logger.IsEnabled(LogLevel.Debug)) + logger.LogDebug($">>>{ channelId } The client disconnects from the server."); + base.ChannelInactive(context); + } + + /// + /// 服务器主动断开 + /// + /// + /// + public override Task CloseAsync(IChannelHandlerContext context) + { + string channelId = context.Channel.Id.AsShortText(); + if (logger.IsEnabled(LogLevel.Debug)) + logger.LogDebug($"<<<{ channelId } The server disconnects from the client."); + return base.CloseAsync(context); + } + + public override void ChannelReadComplete(IChannelHandlerContext context) + { + context.Flush(); + } + + /// + /// 超时策略 + /// + /// + /// + public override void UserEventTriggered(IChannelHandlerContext context, object evt) + { + IdleStateEvent idleStateEvent = evt as IdleStateEvent; + if (idleStateEvent != null) + { + string channelId = context.Channel.Id.AsShortText(); + logger.LogInformation($"{idleStateEvent.State.ToString()}>>>{channelId}"); + // 由于808是设备发心跳,如果很久没有上报数据,那么就由服务器主动关闭连接。 + context.CloseAsync(); + } + base.UserEventTriggered(context, evt); + } + + public override void ExceptionCaught(IChannelHandlerContext context, Exception exception) + { + string channelId = context.Channel.Id.AsShortText(); + logger.LogError(exception,$"{channelId} {exception.Message}" ); + context.CloseAsync(); + } + } +} + diff --git a/src/JT808.DotNetty/Handlers/JT808ServerHandler.cs b/src/JT808.DotNetty/Handlers/JT808ServerHandler.cs new file mode 100644 index 0000000..e824053 --- /dev/null +++ b/src/JT808.DotNetty/Handlers/JT808ServerHandler.cs @@ -0,0 +1,39 @@ +using DotNetty.Buffers; +using DotNetty.Transport.Channels; +using JT808.Protocol; +using System; +using System.Collections.Generic; +using System.Text; + +namespace JT808.DotNetty.Handlers +{ + internal class JT808ServerHandler : SimpleChannelInboundHandler + { + private readonly JT808MsgIdHandlerBase handler; + + public JT808ServerHandler(JT808MsgIdHandlerBase handler) + { + this.handler = handler; + } + + protected override void ChannelRead0(IChannelHandlerContext ctx, JT808Package msg) + { + try + { + Func handlerFunc; + if (handler.HandlerDict.TryGetValue(msg.Header.MsgId, out handlerFunc)) + { + JT808Package jT808Package = handlerFunc(msg, ctx); + if (jT808Package != null) + { + ctx.WriteAndFlushAsync(Unpooled.WrappedBuffer(JT808Serializer.Serialize(jT808Package))); + } + } + } + catch + { + + } + } + } +} diff --git a/src/JT808.DotNetty/Internal/AtomicCounter.cs b/src/JT808.DotNetty/Internal/AtomicCounter.cs new file mode 100644 index 0000000..e0f3b0c --- /dev/null +++ b/src/JT808.DotNetty/Internal/AtomicCounter.cs @@ -0,0 +1,39 @@ +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading; + +namespace JT808.DotNetty.Internal +{ + /// + /// + /// + /// + internal class AtomicCounter + { + long counter = 0; + + public AtomicCounter(long initialCount = 0) + { + this.counter = initialCount; + } + + public long Increment() + { + return Interlocked.Increment(ref counter); + } + + public long Decrement() + { + return Interlocked.Decrement(ref counter); + } + + public long Count + { + get + { + return Interlocked.Read(ref counter); + } + } + } +} diff --git a/src/JT808.DotNetty/Internal/JT808MsgIdDefaultHandler.cs b/src/JT808.DotNetty/Internal/JT808MsgIdDefaultHandler.cs new file mode 100644 index 0000000..7b65c3d --- /dev/null +++ b/src/JT808.DotNetty/Internal/JT808MsgIdDefaultHandler.cs @@ -0,0 +1,16 @@ +using System; +using System.Collections.Generic; +using System.Text; + +namespace JT808.DotNetty.Internal +{ + /// + /// 默认消息处理业务实现 + /// + internal class JT808MsgIdDefaultHandler : JT808MsgIdHandlerBase + { + public JT808MsgIdDefaultHandler(SessionManager sessionManager) : base(sessionManager) + { + } + } +} diff --git a/src/JT808.DotNetty/JT808.DotNetty.csproj b/src/JT808.DotNetty/JT808.DotNetty.csproj new file mode 100644 index 0000000..07f0ec5 --- /dev/null +++ b/src/JT808.DotNetty/JT808.DotNetty.csproj @@ -0,0 +1,19 @@ + + + + netstandard2.0 + latest + + + + + + + + + + + + + + diff --git a/src/JT808.DotNetty/JT808MsgIdHandlerBase.cs b/src/JT808.DotNetty/JT808MsgIdHandlerBase.cs new file mode 100644 index 0000000..0d3926d --- /dev/null +++ b/src/JT808.DotNetty/JT808MsgIdHandlerBase.cs @@ -0,0 +1,162 @@ +using System; +using System.Collections.Generic; +using System.Text; +using DotNetty.Transport.Channels; +using JT808.Protocol; +using JT808.Protocol.Enums; +using JT808.Protocol.Extensions; +using JT808.Protocol.MessageBody; + +namespace JT808.DotNetty +{ + /// + /// 抽象消息处理业务 + /// + public abstract class JT808MsgIdHandlerBase + { + protected SessionManager sessionManager { get; } + /// + /// 初始化消息处理业务 + /// + protected JT808MsgIdHandlerBase(SessionManager sessionManager) + { + this.sessionManager = sessionManager; + HandlerDict = new Dictionary> + { + {JT808MsgId.终端通用应答, Msg0x0001}, + {JT808MsgId.终端鉴权, Msg0x0102}, + {JT808MsgId.终端心跳, Msg0x0002}, + {JT808MsgId.终端注销, Msg0x0003}, + {JT808MsgId.终端注册, Msg0x0100}, + {JT808MsgId.位置信息汇报,Msg0x0200 }, + {JT808MsgId.定位数据批量上传,Msg0x0704 }, + {JT808MsgId.数据上行透传,Msg0x0900 } + }; + } + + public Dictionary> HandlerDict { get; } + /// + /// 终端通用应答 + /// + /// + /// + /// + public virtual JT808Package Msg0x0001(JT808Package reqJT808Package, IChannelHandlerContext ctx) + { + return JT808MsgId.平台通用应答.Create(reqJT808Package.Header.TerminalPhoneNo, new JT808_0x8001() + { + MsgId = reqJT808Package.Header.MsgId, + JT808PlatformResult = JT808PlatformResult.Success, + MsgNum = reqJT808Package.Header.MsgNum + }); + } + /// + /// 终端心跳 + /// + /// + /// + /// + public virtual JT808Package Msg0x0002(JT808Package reqJT808Package, IChannelHandlerContext ctx) + { + sessionManager.Heartbeat(reqJT808Package.Header.TerminalPhoneNo); + return JT808MsgId.平台通用应答.Create(reqJT808Package.Header.TerminalPhoneNo, new JT808_0x8001() + { + MsgId = reqJT808Package.Header.MsgId, + JT808PlatformResult = JT808PlatformResult.Success, + MsgNum = reqJT808Package.Header.MsgNum + }); + } + /// + /// 终端注销 + /// + /// + /// + /// + public virtual JT808Package Msg0x0003(JT808Package reqJT808Package, IChannelHandlerContext ctx) + { + sessionManager.RemoveSessionByTerminalPhoneNo(reqJT808Package.Header.TerminalPhoneNo); + return JT808MsgId.平台通用应答.Create(reqJT808Package.Header.TerminalPhoneNo, new JT808_0x8001() + { + MsgId = reqJT808Package.Header.MsgId, + JT808PlatformResult = JT808PlatformResult.Success, + MsgNum = reqJT808Package.Header.MsgNum + }); + } + /// + /// 终端注册 + /// + /// + /// + /// + public virtual JT808Package Msg0x0100(JT808Package reqJT808Package, IChannelHandlerContext ctx) + { + return JT808MsgId.终端注册应答.Create(reqJT808Package.Header.TerminalPhoneNo, new JT808_0x8100() + { + Code = "J" + reqJT808Package.Header.TerminalPhoneNo, + JT808TerminalRegisterResult = JT808TerminalRegisterResult.成功, + MsgNum = reqJT808Package.Header.MsgNum + }); + } + /// + /// 终端鉴权 + /// + /// + /// + /// + public virtual JT808Package Msg0x0102(JT808Package reqJT808Package, IChannelHandlerContext ctx) + { + sessionManager.RegisterSession(new JT808Session(ctx.Channel, reqJT808Package.Header.TerminalPhoneNo)); + return JT808MsgId.平台通用应答.Create(reqJT808Package.Header.TerminalPhoneNo, new JT808_0x8001() + { + MsgId = reqJT808Package.Header.MsgId, + JT808PlatformResult = JT808PlatformResult.Success, + MsgNum = reqJT808Package.Header.MsgNum + }); + } + /// + /// 位置信息汇报 + /// + /// + /// + /// + public virtual JT808Package Msg0x0200(JT808Package reqJT808Package, IChannelHandlerContext ctx) + { + return JT808MsgId.平台通用应答.Create(reqJT808Package.Header.TerminalPhoneNo, new JT808_0x8001() + { + MsgId = reqJT808Package.Header.MsgId, + JT808PlatformResult = JT808PlatformResult.Success, + MsgNum = reqJT808Package.Header.MsgNum + }); + } + /// + /// 定位数据批量上传 + /// + /// + /// + /// + public virtual JT808Package Msg0x0704(JT808Package reqJT808Package, IChannelHandlerContext ctx) + { + return JT808MsgId.平台通用应答.Create(reqJT808Package.Header.TerminalPhoneNo, new JT808_0x8001() + { + MsgId = reqJT808Package.Header.MsgId, + JT808PlatformResult = JT808PlatformResult.Success, + MsgNum = reqJT808Package.Header.MsgNum + }); + } + /// + /// 数据上行透传 + /// + /// + /// + /// + public virtual JT808Package Msg0x0900(JT808Package reqJT808Package, IChannelHandlerContext ctx) + { + return JT808MsgId.平台通用应答.Create(reqJT808Package.Header.TerminalPhoneNo, new JT808_0x8001() + { + MsgId = reqJT808Package.Header.MsgId, + JT808PlatformResult = JT808PlatformResult.Success, + MsgNum = reqJT808Package.Header.MsgNum + }); + } + } +} diff --git a/src/JT808.DotNetty/JT808ServerHost.cs b/src/JT808.DotNetty/JT808ServerHost.cs new file mode 100644 index 0000000..4ae3e94 --- /dev/null +++ b/src/JT808.DotNetty/JT808ServerHost.cs @@ -0,0 +1,90 @@ +using DotNetty.Buffers; +using DotNetty.Codecs; +using DotNetty.Handlers.Timeout; +using DotNetty.Transport.Bootstrapping; +using DotNetty.Transport.Channels; +using DotNetty.Transport.Libuv; +using JT808.DotNetty.Codecs; +using JT808.DotNetty.Configurations; +using JT808.DotNetty.Handlers; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using System; +using System.Collections.Generic; +using System.Net; +using System.Runtime.InteropServices; +using System.Text; +using System.Threading; +using System.Threading.Tasks; + +namespace JT808.DotNetty +{ + public class JT808ServerHost : IHostedService + { + public IServiceProvider Provider { get; } + private readonly JT808Configuration configuration; + private readonly ILogger logger; + private DispatcherEventLoopGroup bossGroup; + private WorkerEventLoopGroup workerGroup; + private IChannel bootstrapChannel; + + public JT808ServerHost( + IServiceProvider provider, + ILoggerFactory loggerFactory, + IOptions jT808ConfigurationAccessor) + { + Provider = provider; + configuration = jT808ConfigurationAccessor.Value; + logger=loggerFactory.CreateLogger(); + } + + public Task StartAsync(CancellationToken cancellationToken) + { + bossGroup = new DispatcherEventLoopGroup(); + workerGroup = new WorkerEventLoopGroup(bossGroup, configuration.EventLoopCount); + ServerBootstrap bootstrap = new ServerBootstrap(); + bootstrap.Group(bossGroup, workerGroup); + bootstrap.Channel(); + if (RuntimeInformation.IsOSPlatform(OSPlatform.Linux) + || RuntimeInformation.IsOSPlatform(OSPlatform.OSX)) + { + bootstrap + .Option(ChannelOption.SoReuseport, true) + .ChildOption(ChannelOption.SoReuseaddr, true); + } + bootstrap + .Option(ChannelOption.SoBacklog, configuration.SoBacklog) + .ChildHandler(new ActionChannelInitializer(channel => + { + IChannelPipeline pipeline = channel.Pipeline; + using(var scope= Provider.CreateScope()) + { + channel.Pipeline.AddLast("systemIdleState", new IdleStateHandler( + configuration.ReaderIdleTimeSeconds, + configuration.WriterIdleTimeSeconds, + configuration.AllIdleTimeSeconds)); + channel.Pipeline.AddLast("jt808Connection", scope.ServiceProvider.GetRequiredService()); + channel.Pipeline.AddLast("jt808Buffer", new DelimiterBasedFrameDecoder(int.MaxValue, + Unpooled.CopiedBuffer(new byte[] { JT808.Protocol.JT808Package.BeginFlag }), + Unpooled.CopiedBuffer(new byte[] { JT808.Protocol.JT808Package.EndFlag }))); + channel.Pipeline.AddLast("jt808Decode", scope.ServiceProvider.GetRequiredService()); + channel.Pipeline.AddLast("jt808Service", scope.ServiceProvider.GetRequiredService()); + } + })); + logger.LogInformation($"Server start at {IPAddress.Any}:{configuration.Port}."); + return bootstrap.BindAsync(configuration.Port) + .ContinueWith(i => bootstrapChannel = i.Result); + } + + public async Task StopAsync(CancellationToken cancellationToken) + { + await bootstrapChannel.CloseAsync(); + var quietPeriod = configuration.QuietPeriodTimeSpan; + var shutdownTimeout = configuration.ShutdownTimeoutTimeSpan; + await workerGroup.ShutdownGracefullyAsync(quietPeriod, shutdownTimeout); + await bossGroup.ShutdownGracefullyAsync(quietPeriod, shutdownTimeout); + } + } +} diff --git a/src/JT808.DotNetty/JT808Session.cs b/src/JT808.DotNetty/JT808Session.cs new file mode 100644 index 0000000..da7aa9e --- /dev/null +++ b/src/JT808.DotNetty/JT808Session.cs @@ -0,0 +1,32 @@ +using DotNetty.Transport.Channels; +using System; +using System.Collections.Generic; +using System.Text; + +namespace JT808.DotNetty +{ + public class JT808Session + { + public JT808Session(IChannel channel, string terminalPhoneNo) + { + Channel = channel; + TerminalPhoneNo = terminalPhoneNo; + StartTime = DateTime.Now; + LastActiveTime = DateTime.Now; + SessionID = Channel.Id.AsShortText(); + } + + /// + /// 终端手机号 + /// + public string TerminalPhoneNo { get; set; } + + public string SessionID { get; } + + public IChannel Channel { get; } + + public DateTime LastActiveTime { get; set; } + + public DateTime StartTime { get; } + } +} diff --git a/src/JT808.DotNetty/SessionManager.cs b/src/JT808.DotNetty/SessionManager.cs new file mode 100644 index 0000000..33cff17 --- /dev/null +++ b/src/JT808.DotNetty/SessionManager.cs @@ -0,0 +1,224 @@ +using JT808.DotNetty.Configurations; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading; +using System.Threading.Tasks; + +namespace JT808.DotNetty +{ + public class SessionManager: IDisposable + { + private readonly ILogger logger; + private readonly JT808Configuration configuration; + private readonly CancellationTokenSource cancellationTokenSource; + public SessionManager( + IOptions jT808ConfigurationAccessor, + ILoggerFactory loggerFactory) + { + logger = loggerFactory.CreateLogger(); + configuration = jT808ConfigurationAccessor.Value; + cancellationTokenSource = new CancellationTokenSource(); + Task.Run(() => + { + while (!cancellationTokenSource.IsCancellationRequested) + { + logger.LogInformation($"Online Count>>>{SessionCount}"); + if (SessionCount > 0) + { + logger.LogInformation($"SessionIds>>>{string.Join(",", SessionIdDict.Select(s => s.Key))}"); + logger.LogInformation($"TerminalPhoneNos>>>{string.Join(",", TerminalPhoneNo_SessionId_Dict.Select(s => $"{s.Key}-{s.Value}"))}"); + } + Thread.Sleep(configuration.SessionReportTime); + } + }, cancellationTokenSource.Token); + } + + /// + /// Netty生成的sessionID和Session的对应关系 + /// key = seession id + /// value = Session + /// + private ConcurrentDictionary SessionIdDict = new ConcurrentDictionary(StringComparer.OrdinalIgnoreCase); + /// + /// 终端手机号和netty生成的sessionID的对应关系 + /// key = 终端手机号 + /// value = seession id + /// + private ConcurrentDictionary TerminalPhoneNo_SessionId_Dict = new ConcurrentDictionary(StringComparer.OrdinalIgnoreCase); + + public int SessionCount + { + get + { + return SessionIdDict.Count; + } + } + + public void RegisterSession(JT808Session appSession) + { + if (TerminalPhoneNo_SessionId_Dict.ContainsKey(appSession.TerminalPhoneNo)) + { + return; + } + if (SessionIdDict.TryAdd(appSession.SessionID, appSession) && + TerminalPhoneNo_SessionId_Dict.TryAdd(appSession.TerminalPhoneNo, appSession.SessionID)) + { + return; + } + } + + public JT808Session GetSessionByID(string sessionID) + { + if (string.IsNullOrEmpty(sessionID)) + return default; + JT808Session targetSession; + SessionIdDict.TryGetValue(sessionID, out targetSession); + return targetSession; + } + + public JT808Session GetSessionByTerminalPhoneNo(string terminalPhoneNo) + { + try + { + if (string.IsNullOrEmpty(terminalPhoneNo)) + return default; + if (TerminalPhoneNo_SessionId_Dict.TryGetValue(terminalPhoneNo, out string sessionId)) + { + if (SessionIdDict.TryGetValue(sessionId, out JT808Session targetSession)) + { + return targetSession; + } + else + { + return default; + } + } + else + { + return default; + } + } + catch (Exception ex) + { + logger.LogError(ex, terminalPhoneNo); + return default; + } + } + + public void Heartbeat(string terminalPhoneNo) + { + try + { + if (TerminalPhoneNo_SessionId_Dict.TryGetValue(terminalPhoneNo, out string sessionId)) + { + if (SessionIdDict.TryGetValue(sessionId, out JT808Session oldjT808Session)) + { + if (oldjT808Session.Channel.Active) + { + oldjT808Session.LastActiveTime = DateTime.Now; + if (SessionIdDict.TryUpdate(sessionId, oldjT808Session, oldjT808Session)) + { + + } + } + } + } + } + catch (Exception ex) + { + logger.LogError(ex, terminalPhoneNo); + } + } + + /// + /// 通过通道Id和设备终端号进行关联 + /// + /// + /// + public void UpdateSessionByID(string sessionID, string terminalPhoneNo) + { + try + { + if (SessionIdDict.TryGetValue(sessionID, out JT808Session oldjT808Session)) + { + oldjT808Session.TerminalPhoneNo = terminalPhoneNo; + if (SessionIdDict.TryUpdate(sessionID, oldjT808Session, oldjT808Session)) + { + TerminalPhoneNo_SessionId_Dict.AddOrUpdate(terminalPhoneNo, sessionID, (tpn, sid) => + { + return sessionID; + }); + } + } + } + catch (Exception ex) + { + logger.LogError(ex, $"{sessionID},{terminalPhoneNo}"); + } + } + + public void RemoveSessionByID(string sessionID) + { + if (sessionID == null) return; + try + { + if (SessionIdDict.TryRemove(sessionID, out JT808Session session)) + { + if (session.TerminalPhoneNo != null) + { + if (TerminalPhoneNo_SessionId_Dict.TryRemove(session.TerminalPhoneNo, out string sessionid)) + { + logger.LogInformation($">>>{sessionID}-{session.TerminalPhoneNo} Session Remove."); + } + } + else + { + logger.LogInformation($">>>{sessionID} Session Remove."); + } + // call GPS.JT808NettyServer.Handlers.JT808ConnectionHandler.CloseAsync + session.Channel.CloseAsync(); + } + } + catch (Exception ex) + { + logger.LogError(ex, $">>>{sessionID} Session Remove Exception"); + } + } + + public void RemoveSessionByTerminalPhoneNo(string terminalPhoneNo) + { + if (terminalPhoneNo == null) return; + try + { + if (TerminalPhoneNo_SessionId_Dict.TryRemove(terminalPhoneNo, out string sessionid)) + { + if (SessionIdDict.TryRemove(sessionid, out JT808Session session)) + { + logger.LogInformation($">>>{terminalPhoneNo}-{sessionid} TerminalPhoneNo Remove."); + } + else + { + logger.LogInformation($">>>{terminalPhoneNo} TerminalPhoneNo Remove."); + } + } + } + catch (Exception ex) + { + logger.LogError(ex, $">>>{terminalPhoneNo} TerminalPhoneNo Remove Exception."); + } + } + + public void Dispose() + { + cancellationTokenSource.Cancel(); + cancellationTokenSource.Dispose(); + } + } +} +