diff --git a/src/JT809Netty.Core/AtomicCounter.cs b/src/JT809Netty.Core/AtomicCounter.cs new file mode 100644 index 0000000..9b4fcaa --- /dev/null +++ b/src/JT809Netty.Core/AtomicCounter.cs @@ -0,0 +1,39 @@ +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading; + +namespace JT809Netty.Core +{ + /// + /// + /// ref:Grpc.Core.Internal + /// + public class AtomicCounter + { + long counter = 0; + + public AtomicCounter(long initialCount = 0) + { + this.counter = initialCount; + } + + public long Increment() + { + return Interlocked.Increment(ref counter); + } + + public long Decrement() + { + return Interlocked.Decrement(ref counter); + } + + public long Count + { + get + { + return Interlocked.Read(ref counter); + } + } + } +} diff --git a/src/JT809Netty.Core/Configs/JT809NettyOptions.cs b/src/JT809Netty.Core/Configs/JT809NettyOptions.cs new file mode 100644 index 0000000..e802b77 --- /dev/null +++ b/src/JT809Netty.Core/Configs/JT809NettyOptions.cs @@ -0,0 +1,14 @@ +using System; +using System.Collections.Generic; +using System.Text; + +namespace JT809Netty.Core.Configs +{ + public class JT809NettyOptions + { + public string Host { get; set; } + public int Port { get; set; } + public List IpWhiteList { get; set; } = new List(); + public bool IpWhiteListDisabled { get; set; } + } +} diff --git a/src/JT809Netty.Core/Handlers/JT808ServiceHandler.cs b/src/JT809Netty.Core/Handlers/JT808ServiceHandler.cs new file mode 100644 index 0000000..f6e88de --- /dev/null +++ b/src/JT809Netty.Core/Handlers/JT808ServiceHandler.cs @@ -0,0 +1,77 @@ +using DotNetty.Buffers; +using DotNetty.Transport.Channels; +using System; +using System.Collections.Generic; +using System.IO; +using System.Text; +using JT808.Protocol; +using JT808.Protocol.Extensions; +using DotNetty.Common.Utilities; +using Microsoft.Extensions.Logging; +using Newtonsoft.Json; +using JT808.Protocol.Exceptions; +using System.Threading; + +namespace GPS.JT808NettyServer.Handlers +{ + public class JT808ServiceHandler : ChannelHandlerAdapter + { + private readonly ILogger logger; + + private readonly JT808MsgIdHandler jT808MsgIdHandler; + + public JT808ServiceHandler( + JT808MsgIdHandler jT808MsgIdHandler, + ILoggerFactory loggerFactory) + { + this.jT808MsgIdHandler = jT808MsgIdHandler; + logger = loggerFactory.CreateLogger(); + } + + public override void ChannelRead(IChannelHandlerContext context, object message) + { + var jT808RequestInfo = (JT808RequestInfo)message; + string receive = string.Empty; + try + { + if (logger.IsEnabled(LogLevel.Debug)) + { + receive = jT808RequestInfo.OriginalBuffer.ToHexString(); + } + Func handlerFunc; + if (jT808RequestInfo.JT808Package != null) + { + if (jT808MsgIdHandler.HandlerDict.TryGetValue(jT808RequestInfo.JT808Package.Header.MsgId, out handlerFunc)) + { + IJT808Package jT808PackageImpl = handlerFunc(jT808RequestInfo, context); + if (jT808PackageImpl != null) + { + if (logger.IsEnabled(LogLevel.Debug)) + { + logger.LogDebug("send>>>" + jT808PackageImpl.JT808Package.Header.MsgId.ToString() + "-" + JT808Serializer.Serialize(jT808PackageImpl.JT808Package).ToHexString()); + //logger.LogDebug("send>>>" + jT808PackageImpl.JT808Package.Header.MsgId.ToString() + "-" + JsonConvert.SerializeObject(jT808PackageImpl.JT808Package)); + } + // 需要注意: + // 1.下发应答必须要在类中重写 ChannelReadComplete 不然客户端接收不到消息 + // context.WriteAsync(Unpooled.WrappedBuffer(JT808Serializer.Serialize(jT808PackageImpl.JT808Package))); + // 2.直接发送 + context.WriteAndFlushAsync(Unpooled.WrappedBuffer(JT808Serializer.Serialize(jT808PackageImpl.JT808Package))); + } + } + } + } + catch (JT808Exception ex) + { + if (logger.IsEnabled(LogLevel.Error)) + logger.LogError(ex, "JT808Exception receive<<<" + receive); + } + catch (Exception ex) + { + if (logger.IsEnabled(LogLevel.Error)) + logger.LogError(ex, "Exception receive<<<" + receive); + } + } + + public override void ChannelReadComplete(IChannelHandlerContext context) => context.Flush(); + } +} diff --git a/src/JT809Netty.Core/Handlers/JT809DecodeHandler.cs b/src/JT809Netty.Core/Handlers/JT809DecodeHandler.cs new file mode 100644 index 0000000..9a55292 --- /dev/null +++ b/src/JT809Netty.Core/Handlers/JT809DecodeHandler.cs @@ -0,0 +1,64 @@ +using DotNetty.Buffers; +using DotNetty.Codecs; +using DotNetty.Transport.Channels; +using JT809.Protocol; +using JT809.Protocol.JT809Exceptions; +using Microsoft.Extensions.Logging; +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading; + +namespace JT809Netty.Core.Handlers +{ + /// + /// JT809解码 + /// + public class JT809DecodeHandler : ByteToMessageDecoder + { + private readonly ILogger logger; + + public JT809DecodeHandler(ILoggerFactory loggerFactory) + { + logger = loggerFactory.CreateLogger(); + } + + private static readonly AtomicCounter MsgSuccessCounter = new AtomicCounter(); + + private static readonly AtomicCounter MsgFailCounter = new AtomicCounter(); + + protected override void Decode(IChannelHandlerContext context, IByteBuffer input, List output) + { + string msg = string.Empty; + byte[] buffer = null; + try + { + buffer = new byte[input.Capacity + 2]; + input.ReadBytes(buffer,1, input.Capacity); + buffer[0] = JT809Package.BEGINFLAG; + buffer[input.Capacity + 1] = JT809Package.ENDFLAG; + output.Add(JT809Serializer.Deserialize(buffer)); + MsgSuccessCounter.Increment(); + if (logger.IsEnabled(LogLevel.Debug)) + { + msg = ByteBufferUtil.HexDump(buffer); + logger.LogDebug("accept package success count<<<" + MsgSuccessCounter.Count.ToString()); + } + } + catch (JT809Exception ex) + { + MsgFailCounter.Increment(); + logger.LogError("accept package fail count<<<" + MsgFailCounter.Count.ToString()); + logger.LogError(ex, $"{ex.ErrorCode.ToString()}accept msg<<<{msg}"); + return; + } + catch (Exception ex) + { + MsgFailCounter.Increment(); + logger.LogError("accept package fail count<<<" + MsgFailCounter.Count.ToString()); + logger.LogError(ex, "accept msg<<<" + msg); + return; + } + } + } +} diff --git a/src/JT809Netty.Core/Handlers/JT809DownMasterLinkConnectionHandler.cs b/src/JT809Netty.Core/Handlers/JT809DownMasterLinkConnectionHandler.cs new file mode 100644 index 0000000..4793815 --- /dev/null +++ b/src/JT809Netty.Core/Handlers/JT809DownMasterLinkConnectionHandler.cs @@ -0,0 +1,89 @@ +using DotNetty.Handlers.Timeout; +using DotNetty.Transport.Channels; +using JT809Netty.Core.Configs; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using System.Threading.Tasks; + +namespace JT809Netty.Core.Handlers +{ + /// + /// 下级平台主链路 + /// + public class JT809DownMasterLinkConnectionHandler : ChannelHandlerAdapter + { + private readonly ILogger logger; + + private IOptionsMonitor optionsMonitor; + + public JT809DownMasterLinkConnectionHandler( + IOptionsMonitor optionsMonitor, + SessionManager sessionManager, + ILoggerFactory loggerFactory) + { + this.optionsMonitor = optionsMonitor; + logger = loggerFactory.CreateLogger(); + } + + public override void ChannelActive(IChannelHandlerContext context) + { + base.ChannelActive(context); + } + + /// + /// 主动断开 + /// + /// + public override void ChannelInactive(IChannelHandlerContext context) + { + if (logger.IsEnabled(LogLevel.Debug)) + logger.LogDebug(">>>The client disconnects from the server."); + base.ChannelInactive(context); + } + + /// + /// 服务器主动断开 + /// + /// + /// + public override Task CloseAsync(IChannelHandlerContext context) + { + if (logger.IsEnabled(LogLevel.Debug)) + logger.LogDebug("<< + /// 主链路超时策略 + /// 下级平台登录成功后,在与上级平台之间如果有应用业务数据包往来的情况下,不需要发送主链路保持数据包; + /// 否则,下级平台应每 1min 发送一个主链路保持清求数据包到上级平台以保持链路连接 + /// + /// + /// + public override void UserEventTriggered(IChannelHandlerContext context, object evt) + { + IdleStateEvent idleStateEvent = evt as IdleStateEvent; + if (idleStateEvent != null) + { + string channelId = context.Channel.Id.AsShortText(); + logger.LogInformation($"{idleStateEvent.State.ToString()}>>>{channelId}"); + switch (idleStateEvent.State) + { + //case IdleState.ReaderIdle: + + // break; + case IdleState.WriterIdle: +#warning 发送心跳保持 + break; + //case IdleState.AllIdle: + + // break; + default: + + break; + } + } + base.UserEventTriggered(context, evt); + } + } +} diff --git a/src/JT809Netty.Core/Handlers/JT809DownSlaveLinkConnectionHandler.cs b/src/JT809Netty.Core/Handlers/JT809DownSlaveLinkConnectionHandler.cs new file mode 100644 index 0000000..97f521b --- /dev/null +++ b/src/JT809Netty.Core/Handlers/JT809DownSlaveLinkConnectionHandler.cs @@ -0,0 +1,91 @@ +using DotNetty.Handlers.Timeout; +using DotNetty.Transport.Channels; +using JT809Netty.Core.Configs; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using System.Threading.Tasks; + +namespace JT809Netty.Core.Handlers +{ + /// + /// 下级平台从链路 + /// + public class JT809DownSlaveLinkConnectionHandler : ChannelHandlerAdapter + { + private readonly ILogger logger; + + private readonly SessionManager sessionManager; + + private IOptionsMonitor optionsMonitor; + + public JT809DownSlaveLinkConnectionHandler( + IOptionsMonitor optionsMonitor, + SessionManager sessionManager, + ILoggerFactory loggerFactory) + { + this.optionsMonitor = optionsMonitor; + this.sessionManager = sessionManager; + logger = loggerFactory.CreateLogger(); + } + + public override void ChannelActive(IChannelHandlerContext context) + { + base.ChannelActive(context); + } + + /// + /// 主动断开 + /// + /// + public override void ChannelInactive(IChannelHandlerContext context) + { + if (logger.IsEnabled(LogLevel.Debug)) + logger.LogDebug(">>>The client disconnects from the server."); + sessionManager.RemoveSessionByID(context.Channel.Id.AsShortText()); + base.ChannelInactive(context); + } + /// + /// 服务器主动断开 + /// + /// + /// + public override Task CloseAsync(IChannelHandlerContext context) + { + if (logger.IsEnabled(LogLevel.Debug)) + logger.LogDebug("<< + /// 从链路超时策略 + /// + /// + /// + public override void UserEventTriggered(IChannelHandlerContext context, object evt) + { + IdleStateEvent idleStateEvent = evt as IdleStateEvent; + if (idleStateEvent != null) + { + string channelId = context.Channel.Id.AsShortText(); + logger.LogInformation($"{idleStateEvent.State.ToString()}>>>{channelId}"); + switch (idleStateEvent.State) + { + case IdleState.ReaderIdle: + //下级平台连续 3min 未收到上级平台发送的从链路保持应答数据包,则认为上级平台的连接中断,将主动断开数据传输从链路。 + context.CloseAsync(); + break; + //case IdleState.WriterIdle: + + // break; + //case IdleState.AllIdle: + + // break; + default: + + break; + } + } + base.UserEventTriggered(context, evt); + } + } +} diff --git a/src/JT809Netty.Core/IAppSession.cs b/src/JT809Netty.Core/IAppSession.cs new file mode 100644 index 0000000..7924137 --- /dev/null +++ b/src/JT809Netty.Core/IAppSession.cs @@ -0,0 +1,19 @@ +using DotNetty.Transport.Channels; +using System; +using System.Collections.Generic; +using System.Net; +using System.Text; + +namespace JT809Netty.Core +{ + public interface IAppSession + { + string SessionID { get; } + + IChannel Channel { get; } + + DateTime LastActiveTime { get; set; } + + DateTime StartTime { get; } + } +} diff --git a/src/JT809Netty.Core/JT809DownMasterLinkNettyService.cs b/src/JT809Netty.Core/JT809DownMasterLinkNettyService.cs new file mode 100644 index 0000000..4d752a3 --- /dev/null +++ b/src/JT809Netty.Core/JT809DownMasterLinkNettyService.cs @@ -0,0 +1,101 @@ +using DotNetty.Buffers; +using DotNetty.Codecs; +using DotNetty.Handlers.Timeout; +using DotNetty.Transport.Bootstrapping; +using DotNetty.Transport.Channels; +using DotNetty.Transport.Channels.Sockets; +using DotNetty.Transport.Libuv; +using JT809Netty.Core.Configs; +using JT809Netty.Core.Handlers; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Options; +using System; +using System.Collections.Generic; +using System.Net; +using System.Text; +using System.Threading; +using System.Threading.Tasks; + +namespace JT809Netty.Core +{ + /// + /// 下级平台主链路 + /// nettyOptions; + + public JT809DownMasterLinkNettyService( + IOptionsMonitor nettyOptionsAccessor, + IServiceProvider serviceProvider) + { + nettyOptions = nettyOptionsAccessor; + this.serviceProvider = serviceProvider; + } + + public Task StartAsync(CancellationToken cancellationToken) + { + nettyOptions.OnChange(options => + { + try + { + bootstrap.ConnectAsync(options.Host, options.Port); + } + catch (Exception ex) + { + + } + }); + try + { + workerGroup = new MultithreadEventLoopGroup(); + bootstrap = new Bootstrap(); + bootstrap.Group(workerGroup) + .Channel() + .Handler(new ActionChannelInitializer(channel => + { + InitChannel(channel); + })) + .Option(ChannelOption.SoBacklog, 1048576); + bootstrap.ConnectAsync(nettyOptions.CurrentValue.Host, nettyOptions.CurrentValue.Port); + } + catch (Exception ex) + { + + } + return Task.CompletedTask; + } + + public Task StopAsync(CancellationToken cancellationToken) + { + try + { + Task.WhenAll(workerGroup.ShutdownGracefullyAsync(TimeSpan.FromMilliseconds(100), TimeSpan.FromSeconds(1))); + } + catch (Exception ex) + { + + } + return Task.CompletedTask; + } + + private void InitChannel(IChannel channel) + { + var scope = serviceProvider.CreateScope(); + //下级平台应每 1min 发送一个主链路保持清求数据包到上级平台以保持链路连接 + channel.Pipeline.AddLast("systemIdleState", new WriteTimeoutHandler(60)); + channel.Pipeline.AddLast("jt809DownMasterLinkConnection", scope.ServiceProvider.GetRequiredService()); + channel.Pipeline.AddLast("jt809Buffer", new DelimiterBasedFrameDecoder(int.MaxValue, Unpooled.CopiedBuffer(new byte[] { JT809.Protocol.JT809Package.BEGINFLAG }), Unpooled.CopiedBuffer(new byte[] { JT809.Protocol.JT809Package.ENDFLAG }))); + channel.Pipeline.AddLast("jt809Decode", scope.ServiceProvider.GetRequiredService()); + //channel.Pipeline.AddLast("jt809Service", scope.ServiceProvider.GetRequiredService()); + scope.Dispose(); + } + } +} diff --git a/src/JT809Netty.Core/JT809DownSlaveLinkNettyService.cs b/src/JT809Netty.Core/JT809DownSlaveLinkNettyService.cs new file mode 100644 index 0000000..416c563 --- /dev/null +++ b/src/JT809Netty.Core/JT809DownSlaveLinkNettyService.cs @@ -0,0 +1,104 @@ +using DotNetty.Buffers; +using DotNetty.Codecs; +using DotNetty.Handlers.Timeout; +using DotNetty.Transport.Bootstrapping; +using DotNetty.Transport.Channels; +using DotNetty.Transport.Libuv; +using JT809Netty.Core.Configs; +using JT809Netty.Core.Handlers; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Options; +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading; +using System.Threading.Tasks; + +namespace JT809Netty.Core +{ + /// + /// 下级平台从链路 + /// + public class JT809DownSlaveLinkNettyService : IHostedService + { + IEventLoopGroup bossGroup; + + IEventLoopGroup workerGroup; + + IChannel boundChannel; + + readonly IServiceProvider serviceProvider; + + readonly JT809NettyOptions nettyOptions; + + public JT809DownSlaveLinkNettyService( + IOptions nettyOptionsAccessor, + IServiceProvider serviceProvider) + { + nettyOptions = nettyOptionsAccessor.Value; + this.serviceProvider = serviceProvider; + } + + public Task StartAsync(CancellationToken cancellationToken) + { + try + { + var dispatcher = new DispatcherEventLoopGroup(); + bossGroup = dispatcher; + workerGroup = new WorkerEventLoopGroup(dispatcher); + var bootstrap = new ServerBootstrap(); + bootstrap.Group(bossGroup, workerGroup); + bootstrap.Channel(); + bootstrap + //.Handler(new LoggingHandler("SRV-LSTN")) + .ChildHandler(new ActionChannelInitializer(channel => + { + InitChannel(channel); + })) + .Option(ChannelOption.SoBacklog, 1048576); + if (nettyOptions.Host == "") + { + boundChannel = bootstrap.BindAsync(nettyOptions.Port).Result; + } + else + { + boundChannel = bootstrap.BindAsync(nettyOptions.Host, nettyOptions.Port).Result; + } + } + catch (Exception ex) + { + + } + return Task.CompletedTask; + } + + public Task StopAsync(CancellationToken cancellationToken) + { + try + { + Task.WhenAll( + bossGroup.ShutdownGracefullyAsync(TimeSpan.FromMilliseconds(100), TimeSpan.FromSeconds(1)), + workerGroup.ShutdownGracefullyAsync(TimeSpan.FromMilliseconds(100), TimeSpan.FromSeconds(1)), + boundChannel.CloseAsync()); + } + catch (Exception ex) + { + + } + return Task.CompletedTask; + } + + private void InitChannel(IChannel channel) + { + var scope = serviceProvider.CreateScope(); + //下级平台连续 3min 未收到上级平台发送的从链路保持应答数据包,则认为上级平台的连接中断,将主动断开数据传输从链路。 + channel.Pipeline.AddLast("systemIdleState", new ReadTimeoutHandler(180)); + channel.Pipeline.AddLast("jt809DownSlaveLinkConnection", scope.ServiceProvider.GetRequiredService()); + channel.Pipeline.AddLast("jt809Buffer", new DelimiterBasedFrameDecoder(int.MaxValue, Unpooled.CopiedBuffer(new byte[] { JT809.Protocol.JT809Package.BEGINFLAG }), Unpooled.CopiedBuffer(new byte[] { JT809.Protocol.JT809Package.ENDFLAG }))); + channel.Pipeline.AddLast("jt809Decode", scope.ServiceProvider.GetRequiredService()); + //channel.Pipeline.AddLast("jt809Service", scope.ServiceProvider.GetRequiredService()); + scope.Dispose(); + } + } +} diff --git a/src/JT809Netty.Core/JT809Netty.Core.csproj b/src/JT809Netty.Core/JT809Netty.Core.csproj new file mode 100644 index 0000000..74aa6ca --- /dev/null +++ b/src/JT809Netty.Core/JT809Netty.Core.csproj @@ -0,0 +1,22 @@ + + + + netstandard2.0 + latest + + + + + + + + + + + + + + + + + diff --git a/src/JT809Netty.Core/JT809Session.cs b/src/JT809Netty.Core/JT809Session.cs new file mode 100644 index 0000000..c7854c5 --- /dev/null +++ b/src/JT809Netty.Core/JT809Session.cs @@ -0,0 +1,39 @@ +using DotNetty.Transport.Channels; +using System; +using JT809.Protocol.JT809Enums; + +namespace JT809Netty.Core +{ + public class JT809Session: IAppSession + { + public JT809Session(IChannel channel, string vehicleNo,JT809VehicleColorType vehicleColor) + { + Channel = channel; + VehicleNo = vehicleNo; + VehicleColor = vehicleColor; + StartTime = DateTime.Now; + LastActiveTime = DateTime.Now; + SessionID = Channel.Id.AsShortText(); + Key = $"{VehicleNo}_{VehicleColor.ToString()}"; + } + + /// + /// 车牌号 + /// + public string VehicleNo { get; set; } + /// + /// 车牌颜色 + /// + public JT809VehicleColorType VehicleColor { get; set; } + + public string Key { get; set; } + + public string SessionID { get; } + + public IChannel Channel { get;} + + public DateTime LastActiveTime { get; set; } + + public DateTime StartTime { get; } + } +} diff --git a/src/JT809Netty.Core/SessionManager.cs b/src/JT809Netty.Core/SessionManager.cs new file mode 100644 index 0000000..0c69bad --- /dev/null +++ b/src/JT809Netty.Core/SessionManager.cs @@ -0,0 +1,220 @@ +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Logging.Abstractions; +using System; +using System.Collections.Concurrent; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; + +namespace JT809Netty.Core +{ + public class SessionManager:IDisposable + { + private readonly ILogger logger; + + private readonly CancellationTokenSource cancellationTokenSource; + +#if DEBUG + private const int timeout = 1 * 1000 * 60; +#else + private const int timeout = 5 * 1000 * 60; +#endif + public SessionManager(ILoggerFactory loggerFactory) + { + logger = loggerFactory.CreateLogger(); + cancellationTokenSource = new CancellationTokenSource(); + Task.Run(() => + { + while (!cancellationTokenSource.IsCancellationRequested) + { + logger.LogInformation($"Online Count>>>{SessionCount}"); + if (SessionCount > 0) + { + logger.LogInformation($"SessionIds>>>{string.Join(",", SessionIdDict.Select(s => s.Key))}"); + logger.LogInformation($"TerminalPhoneNos>>>{string.Join(",", CustomKey_SessionId_Dict.Select(s => $"{s.Key}-{s.Value}"))}"); + } + Thread.Sleep(timeout); + } + }, cancellationTokenSource.Token); + } + + /// + /// Netty生成的sessionID和Session的对应关系 + /// key = seession id + /// value = Session + /// + private ConcurrentDictionary SessionIdDict = new ConcurrentDictionary(StringComparer.OrdinalIgnoreCase); + /// + /// 自定义Key和netty生成的sessionID的对应关系 + /// key = 终端手机号 + /// value = seession id + /// + private ConcurrentDictionary CustomKey_SessionId_Dict = new ConcurrentDictionary(StringComparer.OrdinalIgnoreCase); + + public int SessionCount + { + get + { + return SessionIdDict.Count; + } + } + + public void RegisterSession(JT809Session appSession) + { + if (CustomKey_SessionId_Dict.ContainsKey(appSession.Key)) + { + return; + } + if (SessionIdDict.TryAdd(appSession.SessionID, appSession) && + CustomKey_SessionId_Dict.TryAdd(appSession.Key, appSession.SessionID)) + { + return; + } + } + + public JT809Session GetSessionByID(string sessionID) + { + if (string.IsNullOrEmpty(sessionID)) + return default; + JT809Session targetSession; + SessionIdDict.TryGetValue(sessionID, out targetSession); + return targetSession; + } + + public JT809Session GetSessionByTerminalPhoneNo(string key) + { + try + { + if (string.IsNullOrEmpty(key)) + return default; + if (CustomKey_SessionId_Dict.TryGetValue(key, out string sessionId)) + { + if (SessionIdDict.TryGetValue(sessionId, out JT809Session targetSession)) + { + return targetSession; + } + else + { + return default; + } + } + else + { + return default; + } + } + catch (Exception ex) + { + logger.LogError(ex, key); + return default; + } + } + + public void Heartbeat(string key) + { + try + { + if(CustomKey_SessionId_Dict.TryGetValue(key, out string sessionId)) + { + if (SessionIdDict.TryGetValue(sessionId, out JT809Session oldjT808Session)) + { + if (oldjT808Session.Channel.Active) + { + oldjT808Session.LastActiveTime = DateTime.Now; + if (SessionIdDict.TryUpdate(sessionId, oldjT808Session, oldjT808Session)) + { + + } + } + } + } + } + catch (Exception ex) + { + logger.LogError(ex, key); + } + } + + /// + /// 通过通道Id和自定义key进行关联 + /// + /// + /// + public void UpdateSessionByID(string sessionID, string key) + { + try + { + if (SessionIdDict.TryGetValue(sessionID, out JT809Session oldjT808Session)) + { + oldjT808Session.Key = key; + if (SessionIdDict.TryUpdate(sessionID, oldjT808Session, oldjT808Session)) + { + CustomKey_SessionId_Dict.AddOrUpdate(key, sessionID, (tpn, sid) => + { + return sessionID; + }); + } + } + } + catch (Exception ex) + { + logger.LogError(ex, $"{sessionID},{key}"); + } + } + + public void RemoveSessionByID(string sessionID) + { + if (sessionID == null) return; + try + { + if (SessionIdDict.TryRemove(sessionID, out JT809Session session)) + { + if (session.Key != null) + { + if(CustomKey_SessionId_Dict.TryRemove(session.Key, out string sessionid)) + { + logger.LogInformation($">>>{sessionID}-{session.Key} Session Remove."); + } + } + else + { + logger.LogInformation($">>>{sessionID} Session Remove."); + } + session.Channel.CloseAsync(); + } + } + catch (Exception ex) + { + logger.LogError(ex, $">>>{sessionID} Session Remove Exception"); + } + } + + public void RemoveSessionByKey(string key) + { + if (key == null) return; + try + { + if (CustomKey_SessionId_Dict.TryRemove(key, out string sessionid)) + { + if (SessionIdDict.TryRemove(sessionid, out JT809Session session)) + { + logger.LogInformation($">>>{key}-{sessionid} Key Remove."); + } + else + { + logger.LogInformation($">>>{key} Key Remove."); + } + } + } + catch (Exception ex) + { + logger.LogError(ex, $">>>{key} Key Remove Exception."); + } + } + + public void Dispose() + { + cancellationTokenSource.Cancel(); + } + } +} diff --git a/src/JT809Netty.sln b/src/JT809Netty.sln new file mode 100644 index 0000000..5b0b11d --- /dev/null +++ b/src/JT809Netty.sln @@ -0,0 +1,25 @@ + +Microsoft Visual Studio Solution File, Format Version 12.00 +# Visual Studio 15 +VisualStudioVersion = 15.0.28010.2016 +MinimumVisualStudioVersion = 10.0.40219.1 +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "JT809Netty.Core", "JT809Netty.Core\JT809Netty.Core.csproj", "{2054D7E6-53B6-412F-BE9D-C6DABD80A111}" +EndProject +Global + GlobalSection(SolutionConfigurationPlatforms) = preSolution + Debug|Any CPU = Debug|Any CPU + Release|Any CPU = Release|Any CPU + EndGlobalSection + GlobalSection(ProjectConfigurationPlatforms) = postSolution + {2054D7E6-53B6-412F-BE9D-C6DABD80A111}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {2054D7E6-53B6-412F-BE9D-C6DABD80A111}.Debug|Any CPU.Build.0 = Debug|Any CPU + {2054D7E6-53B6-412F-BE9D-C6DABD80A111}.Release|Any CPU.ActiveCfg = Release|Any CPU + {2054D7E6-53B6-412F-BE9D-C6DABD80A111}.Release|Any CPU.Build.0 = Release|Any CPU + EndGlobalSection + GlobalSection(SolutionProperties) = preSolution + HideSolutionNode = FALSE + EndGlobalSection + GlobalSection(ExtensibilityGlobals) = postSolution + SolutionGuid = {0FC2A52E-3B7A-4485-9C3B-9080C825419D} + EndGlobalSection +EndGlobal diff --git a/src/JT809NettyServer/JT809NettyServer.csproj b/src/JT809NettyServer/JT809NettyServer.csproj new file mode 100644 index 0000000..d3fc122 --- /dev/null +++ b/src/JT809NettyServer/JT809NettyServer.csproj @@ -0,0 +1,12 @@ + + + + Exe + netcoreapp2.1 + + + + + + + diff --git a/src/JT809NettyServer/Program.cs b/src/JT809NettyServer/Program.cs new file mode 100644 index 0000000..a596836 --- /dev/null +++ b/src/JT809NettyServer/Program.cs @@ -0,0 +1,12 @@ +using System; + +namespace JT809NettyServer +{ + class Program + { + static void Main(string[] args) + { + Console.WriteLine("Hello World!"); + } + } +}