@@ -1,23 +0,0 @@ | |||
using JT809.DotNetty.Core; | |||
using JT809.DotNetty.Core.Handlers; | |||
using JT809.DotNetty.Core.Interfaces; | |||
using JT809.DotNetty.Core.Links; | |||
using JT809.DotNetty.Core.Session; | |||
using System; | |||
using System.Collections.Generic; | |||
using System.Text; | |||
namespace JT809.DotNetty.Tcp.Handlers | |||
{ | |||
/// <summary> | |||
/// 默认消息处理业务实现 | |||
/// </summary> | |||
internal class JT809MsgIdDefaultTcpHandler : JT809MainMsgIdHandlerBase | |||
{ | |||
public JT809MsgIdDefaultTcpHandler(IJT809VerifyCodeGenerator verifyCodeGenerator, | |||
JT809SubordinateClient subordinateLinkClient, JT809MainSessionManager sessionManager) | |||
: base(verifyCodeGenerator, subordinateLinkClient, sessionManager) | |||
{ | |||
} | |||
} | |||
} |
@@ -1,98 +0,0 @@ | |||
using DotNetty.Handlers.Timeout; | |||
using DotNetty.Transport.Channels; | |||
using JT809.DotNetty.Core; | |||
using Microsoft.Extensions.Logging; | |||
using System; | |||
using System.Threading.Tasks; | |||
namespace JT809.DotNetty.Tcp.Handlers | |||
{ | |||
/// <summary> | |||
/// JT809服务通道处理程序 | |||
/// </summary> | |||
internal class JT809TcpConnectionHandler : ChannelHandlerAdapter | |||
{ | |||
private readonly ILogger<JT809TcpConnectionHandler> logger; | |||
private readonly JT809MainSessionManager jT809SessionManager; | |||
public JT809TcpConnectionHandler( | |||
JT809MainSessionManager jT809SessionManager, | |||
ILoggerFactory loggerFactory) | |||
{ | |||
this.jT809SessionManager = jT809SessionManager; | |||
logger = loggerFactory.CreateLogger<JT809TcpConnectionHandler>(); | |||
} | |||
/// <summary> | |||
/// 通道激活 | |||
/// </summary> | |||
/// <param name="context"></param> | |||
public override void ChannelActive(IChannelHandlerContext context) | |||
{ | |||
string channelId = context.Channel.Id.AsShortText(); | |||
if (logger.IsEnabled(LogLevel.Debug)) | |||
logger.LogDebug($"<<<{ channelId } Successful client connection to server."); | |||
base.ChannelActive(context); | |||
} | |||
/// <summary> | |||
/// 设备主动断开 | |||
/// </summary> | |||
/// <param name="context"></param> | |||
public override void ChannelInactive(IChannelHandlerContext context) | |||
{ | |||
string channelId = context.Channel.Id.AsShortText(); | |||
if (logger.IsEnabled(LogLevel.Debug)) | |||
logger.LogDebug($">>>{ channelId } The client disconnects from the server."); | |||
jT809SessionManager.RemoveSessionByChannel(context.Channel); | |||
base.ChannelInactive(context); | |||
} | |||
/// <summary> | |||
/// 服务器主动断开 | |||
/// </summary> | |||
/// <param name="context"></param> | |||
/// <returns></returns> | |||
public override Task CloseAsync(IChannelHandlerContext context) | |||
{ | |||
string channelId = context.Channel.Id.AsShortText(); | |||
if (logger.IsEnabled(LogLevel.Debug)) | |||
logger.LogDebug($"<<<{ channelId } The server disconnects from the client."); | |||
jT809SessionManager.RemoveSessionByChannel(context.Channel); | |||
return base.CloseAsync(context); | |||
} | |||
public override void ChannelReadComplete(IChannelHandlerContext context)=> context.Flush(); | |||
/// <summary> | |||
/// 超时策略 | |||
/// </summary> | |||
/// <param name="context"></param> | |||
/// <param name="evt"></param> | |||
public override void UserEventTriggered(IChannelHandlerContext context, object evt) | |||
{ | |||
IdleStateEvent idleStateEvent = evt as IdleStateEvent; | |||
if (idleStateEvent != null) | |||
{ | |||
if(idleStateEvent.State== IdleState.ReaderIdle) | |||
{ | |||
string channelId = context.Channel.Id.AsShortText(); | |||
logger.LogInformation($"{idleStateEvent.State.ToString()}>>>{channelId}"); | |||
jT809SessionManager.RemoveSessionByChannel(context.Channel); | |||
context.CloseAsync(); | |||
} | |||
} | |||
base.UserEventTriggered(context, evt); | |||
} | |||
public override void ExceptionCaught(IChannelHandlerContext context, Exception exception) | |||
{ | |||
string channelId = context.Channel.Id.AsShortText(); | |||
logger.LogError(exception,$"{channelId} {exception.Message}" ); | |||
jT809SessionManager.RemoveSessionByChannel(context.Channel); | |||
context.CloseAsync(); | |||
} | |||
} | |||
} | |||
@@ -1,84 +0,0 @@ | |||
using DotNetty.Buffers; | |||
using DotNetty.Transport.Channels; | |||
using JT809.Protocol; | |||
using System; | |||
using Microsoft.Extensions.Logging; | |||
using JT809.Protocol.Exceptions; | |||
using JT809.DotNetty.Core.Services; | |||
using JT809.DotNetty.Core; | |||
using JT809.DotNetty.Core.Handlers; | |||
using JT809.DotNetty.Core.Metadata; | |||
using JT809.DotNetty.Core.Enums; | |||
namespace JT809.DotNetty.Tcp.Handlers | |||
{ | |||
/// <summary> | |||
/// JT809服务端处理程序 | |||
/// </summary> | |||
internal class JT809TcpServerHandler : SimpleChannelInboundHandler<byte[]> | |||
{ | |||
private readonly JT809MainMsgIdHandlerBase handler; | |||
private readonly JT809MainSessionManager jT809SessionManager; | |||
private readonly JT809AtomicCounterService jT809AtomicCounterService; | |||
private readonly ILogger<JT809TcpServerHandler> logger; | |||
public JT809TcpServerHandler( | |||
ILoggerFactory loggerFactory, | |||
JT809MainMsgIdHandlerBase handler, | |||
JT809AtomicCounterServiceFactory jT809AtomicCounterServiceFactorty, | |||
JT809MainSessionManager jT809SessionManager | |||
) | |||
{ | |||
this.handler = handler; | |||
this.jT809SessionManager = jT809SessionManager; | |||
this.jT809AtomicCounterService = jT809AtomicCounterServiceFactorty.Create(JT809AtomicCounterType.ServerMain.ToString()); ; | |||
logger = loggerFactory.CreateLogger<JT809TcpServerHandler>(); | |||
} | |||
protected override void ChannelRead0(IChannelHandlerContext ctx, byte[] msg) | |||
{ | |||
try | |||
{ | |||
JT809Package jT809Package = JT809Serializer.Deserialize(msg); | |||
jT809AtomicCounterService.MsgSuccessIncrement(); | |||
if (logger.IsEnabled(LogLevel.Debug)) | |||
{ | |||
logger.LogDebug("accept package success count<<<" + jT809AtomicCounterService.MsgSuccessCount.ToString()); | |||
} | |||
jT809SessionManager.TryAdd(ctx.Channel, jT809Package.Header.MsgGNSSCENTERID); | |||
Func<JT809Request, JT809Response> handlerFunc; | |||
if (handler.HandlerDict.TryGetValue(jT809Package.Header.BusinessType, out handlerFunc)) | |||
{ | |||
JT809Response jT808Response = handlerFunc(new JT809Request(jT809Package, msg)); | |||
if (jT808Response != null) | |||
{ | |||
var sendData = JT809Serializer.Serialize(jT808Response.Package, jT808Response.MinBufferSize); | |||
ctx.WriteAndFlushAsync(Unpooled.WrappedBuffer(sendData)); | |||
} | |||
} | |||
} | |||
catch (JT809Exception ex) | |||
{ | |||
jT809AtomicCounterService.MsgFailIncrement(); | |||
if (logger.IsEnabled(LogLevel.Error)) | |||
{ | |||
logger.LogError("accept package fail count<<<" + jT809AtomicCounterService.MsgFailCount.ToString()); | |||
logger.LogError(ex, "accept msg<<<" + ByteBufferUtil.HexDump(msg)); | |||
} | |||
} | |||
catch (Exception ex) | |||
{ | |||
jT809AtomicCounterService.MsgFailIncrement(); | |||
if (logger.IsEnabled(LogLevel.Error)) | |||
{ | |||
logger.LogError("accept package fail count<<<" + jT809AtomicCounterService.MsgFailCount.ToString()); | |||
logger.LogError(ex, "accept msg<<<" + ByteBufferUtil.HexDump(msg)); | |||
} | |||
} | |||
} | |||
} | |||
} |
@@ -1,11 +0,0 @@ | |||
<Project Sdk="Microsoft.NET.Sdk"> | |||
<PropertyGroup> | |||
<TargetFramework>netstandard2.0</TargetFramework> | |||
</PropertyGroup> | |||
<ItemGroup> | |||
<ProjectReference Include="..\JT809.DotNetty.Core\JT809.DotNetty.Core.csproj" /> | |||
</ItemGroup> | |||
</Project> |
@@ -1,31 +0,0 @@ | |||
using JT809.DotNetty.Core; | |||
using JT809.DotNetty.Core.Codecs; | |||
using JT809.DotNetty.Core.Handlers; | |||
using JT809.DotNetty.Core.Services; | |||
using JT809.DotNetty.Tcp.Handlers; | |||
using Microsoft.Extensions.DependencyInjection; | |||
using Microsoft.Extensions.DependencyInjection.Extensions; | |||
using Microsoft.Extensions.Hosting; | |||
using Newtonsoft.Json; | |||
using System; | |||
using System.Reflection; | |||
using System.Runtime.CompilerServices; | |||
[assembly: InternalsVisibleTo("JT809.DotNetty.Tcp.Test")] | |||
namespace JT809.DotNetty.Tcp | |||
{ | |||
public static class JT809TcpDotnettyExtensions | |||
{ | |||
public static IServiceCollection AddJT809TcpHost(this IServiceCollection serviceDescriptors) | |||
{ | |||
serviceDescriptors.TryAddSingleton<JT809MainSessionManager>(); | |||
serviceDescriptors.TryAddSingleton<JT809MainMsgIdHandlerBase, JT809MsgIdDefaultTcpHandler>(); | |||
serviceDescriptors.TryAddScoped<JT809TcpConnectionHandler>(); | |||
serviceDescriptors.TryAddScoped<JT809Decoder>(); | |||
serviceDescriptors.TryAddScoped<JT809TcpServerHandler>(); | |||
serviceDescriptors.AddHostedService<JT809TcpServerHost>(); | |||
return serviceDescriptors; | |||
} | |||
} | |||
} |
@@ -1,95 +0,0 @@ | |||
using DotNetty.Buffers; | |||
using DotNetty.Codecs; | |||
using DotNetty.Handlers.Timeout; | |||
using DotNetty.Transport.Bootstrapping; | |||
using DotNetty.Transport.Channels; | |||
using DotNetty.Transport.Libuv; | |||
using JT809.DotNetty.Core.Configurations; | |||
using Microsoft.Extensions.DependencyInjection; | |||
using Microsoft.Extensions.Hosting; | |||
using Microsoft.Extensions.Logging; | |||
using Microsoft.Extensions.Options; | |||
using System; | |||
using System.Net; | |||
using System.Runtime.InteropServices; | |||
using System.Threading; | |||
using System.Threading.Tasks; | |||
using JT809.Protocol; | |||
using JT809.DotNetty.Core.Codecs; | |||
using JT809.DotNetty.Tcp.Handlers; | |||
namespace JT809.DotNetty.Tcp | |||
{ | |||
/// <summary> | |||
/// JT809 Tcp网关服务 | |||
/// </summary> | |||
internal class JT809TcpServerHost : IHostedService | |||
{ | |||
private readonly IServiceProvider serviceProvider; | |||
private readonly JT809Configuration configuration; | |||
private readonly ILogger<JT809TcpServerHost> logger; | |||
private DispatcherEventLoopGroup bossGroup; | |||
private WorkerEventLoopGroup workerGroup; | |||
private IChannel bootstrapChannel; | |||
private IByteBufferAllocator serverBufferAllocator; | |||
public JT809TcpServerHost( | |||
IServiceProvider provider, | |||
ILoggerFactory loggerFactory, | |||
IOptions<JT809Configuration> jT809ConfigurationAccessor) | |||
{ | |||
serviceProvider = provider; | |||
configuration = jT809ConfigurationAccessor.Value; | |||
logger=loggerFactory.CreateLogger<JT809TcpServerHost>(); | |||
} | |||
public Task StartAsync(CancellationToken cancellationToken) | |||
{ | |||
bossGroup = new DispatcherEventLoopGroup(); | |||
workerGroup = new WorkerEventLoopGroup(bossGroup, configuration.EventLoopCount); | |||
serverBufferAllocator = new PooledByteBufferAllocator(); | |||
ServerBootstrap bootstrap = new ServerBootstrap(); | |||
bootstrap.Group(bossGroup, workerGroup); | |||
bootstrap.Channel<TcpServerChannel>(); | |||
if (RuntimeInformation.IsOSPlatform(OSPlatform.Linux) | |||
|| RuntimeInformation.IsOSPlatform(OSPlatform.OSX)) | |||
{ | |||
bootstrap | |||
.Option(ChannelOption.SoReuseport, true) | |||
.ChildOption(ChannelOption.SoReuseaddr, true); | |||
} | |||
bootstrap | |||
.Option(ChannelOption.SoBacklog, configuration.SoBacklog) | |||
.ChildOption(ChannelOption.Allocator, serverBufferAllocator) | |||
.ChildHandler(new ActionChannelInitializer<IChannel>(channel => | |||
{ | |||
IChannelPipeline pipeline = channel.Pipeline; | |||
using (var scope = serviceProvider.CreateScope()) | |||
{ | |||
channel.Pipeline.AddLast("jt809SystemIdleState", new IdleStateHandler( | |||
configuration.ReaderIdleTimeSeconds, | |||
configuration.WriterIdleTimeSeconds, | |||
configuration.AllIdleTimeSeconds)); | |||
channel.Pipeline.AddLast("jt809TcpConnection", scope.ServiceProvider.GetRequiredService<JT809TcpConnectionHandler>()); | |||
channel.Pipeline.AddLast("jt809TcpBuffer", new DelimiterBasedFrameDecoder(int.MaxValue, | |||
Unpooled.CopiedBuffer(new byte[] { JT809Package.BEGINFLAG }), | |||
Unpooled.CopiedBuffer(new byte[] { JT809Package.ENDFLAG }))); | |||
channel.Pipeline.AddLast("jt809TcpDecode", scope.ServiceProvider.GetRequiredService<JT809Decoder>()); | |||
channel.Pipeline.AddLast("jt809TcpService", scope.ServiceProvider.GetRequiredService<JT809TcpServerHandler>()); | |||
} | |||
})); | |||
logger.LogInformation($"JT809 TCP Server start at {IPAddress.Any}:{configuration.TcpPort}."); | |||
return bootstrap.BindAsync(configuration.TcpPort) | |||
.ContinueWith(i => bootstrapChannel = i.Result); | |||
} | |||
public async Task StopAsync(CancellationToken cancellationToken) | |||
{ | |||
await bootstrapChannel.CloseAsync(); | |||
var quietPeriod = configuration.QuietPeriodTimeSpan; | |||
var shutdownTimeout = configuration.ShutdownTimeoutTimeSpan; | |||
await workerGroup.ShutdownGracefullyAsync(quietPeriod, shutdownTimeout); | |||
await bossGroup.ShutdownGracefullyAsync(quietPeriod, shutdownTimeout); | |||
} | |||
} | |||
} |
@@ -1,39 +0,0 @@ | |||
using System; | |||
using System.Collections.Generic; | |||
using System.Text; | |||
using System.Threading; | |||
namespace JT809Netty.Core | |||
{ | |||
/// <summary> | |||
/// | |||
/// ref:Grpc.Core.Internal | |||
/// </summary> | |||
public class AtomicCounter | |||
{ | |||
long counter = 0; | |||
public AtomicCounter(long initialCount = 0) | |||
{ | |||
this.counter = initialCount; | |||
} | |||
public long Increment() | |||
{ | |||
return Interlocked.Increment(ref counter); | |||
} | |||
public long Decrement() | |||
{ | |||
return Interlocked.Decrement(ref counter); | |||
} | |||
public long Count | |||
{ | |||
get | |||
{ | |||
return Interlocked.Read(ref counter); | |||
} | |||
} | |||
} | |||
} |
@@ -1,14 +0,0 @@ | |||
using System; | |||
using System.Collections.Generic; | |||
using System.Text; | |||
namespace JT809Netty.Core.Configs | |||
{ | |||
public class JT809NettyOptions | |||
{ | |||
public string Host { get; set; } | |||
public int Port { get; set; } | |||
public List<string> IpWhiteList { get; set; } = new List<string>(); | |||
public bool IpWhiteListDisabled { get; set; } | |||
} | |||
} |
@@ -1,65 +0,0 @@ | |||
using DotNetty.Buffers; | |||
using DotNetty.Codecs; | |||
using DotNetty.Transport.Channels; | |||
using JT809.Protocol; | |||
using JT809.Protocol.JT809Exceptions; | |||
using Microsoft.Extensions.Logging; | |||
using System; | |||
using System.Collections.Generic; | |||
using System.Text; | |||
using System.Threading; | |||
namespace JT809Netty.Core.Handlers | |||
{ | |||
/// <summary> | |||
/// JT809解码 | |||
/// </summary> | |||
public class JT809DecodeHandler : ByteToMessageDecoder | |||
{ | |||
private readonly ILogger<JT809DecodeHandler> logger; | |||
public JT809DecodeHandler(ILoggerFactory loggerFactory) | |||
{ | |||
logger = loggerFactory.CreateLogger<JT809DecodeHandler>(); | |||
} | |||
private static readonly AtomicCounter MsgSuccessCounter = new AtomicCounter(); | |||
private static readonly AtomicCounter MsgFailCounter = new AtomicCounter(); | |||
protected override void Decode(IChannelHandlerContext context, IByteBuffer input, List<object> output) | |||
{ | |||
string msg = string.Empty; | |||
byte[] buffer = null; | |||
try | |||
{ | |||
buffer = new byte[input.Capacity + 2]; | |||
input.ReadBytes(buffer,1, input.Capacity); | |||
buffer[0] = JT809Package.BEGINFLAG; | |||
buffer[input.Capacity + 1] = JT809Package.ENDFLAG; | |||
output.Add(JT809Serializer.Deserialize(buffer)); | |||
MsgSuccessCounter.Increment(); | |||
if (logger.IsEnabled(LogLevel.Debug)) | |||
{ | |||
msg = ByteBufferUtil.HexDump(buffer); | |||
logger.LogDebug("accept package <<<" + msg); | |||
logger.LogDebug("accept package success count<<<" + MsgSuccessCounter.Count.ToString()); | |||
} | |||
} | |||
catch (JT809Exception ex) | |||
{ | |||
MsgFailCounter.Increment(); | |||
logger.LogError("accept package fail count<<<" + MsgFailCounter.Count.ToString()); | |||
logger.LogError(ex, $"{ex.ErrorCode.ToString()}accept msg<<<{msg}"); | |||
return; | |||
} | |||
catch (Exception ex) | |||
{ | |||
MsgFailCounter.Increment(); | |||
logger.LogError("accept package fail count<<<" + MsgFailCounter.Count.ToString()); | |||
logger.LogError(ex, "accept msg<<<" + msg); | |||
return; | |||
} | |||
} | |||
} | |||
} |
@@ -1,94 +0,0 @@ | |||
using DotNetty.Handlers.Timeout; | |||
using DotNetty.Transport.Channels; | |||
using JT809Netty.Core.Configs; | |||
using JT809.Protocol.JT809Extensions; | |||
using JT809.Protocol.JT809Enums; | |||
using JT809.Protocol.JT809MessageBody; | |||
using Microsoft.Extensions.Logging; | |||
using Microsoft.Extensions.Options; | |||
using System.Threading.Tasks; | |||
using JT809.Protocol; | |||
using DotNetty.Buffers; | |||
using JT809Netty.Core.ServiceHandlers; | |||
using System.Threading; | |||
using System; | |||
namespace JT809Netty.Core.Handlers | |||
{ | |||
/// <summary> | |||
/// 下级平台主链路 | |||
/// </summary> | |||
public class JT809DownMasterLinkConnectionHandler : ChannelHandlerAdapter | |||
{ | |||
private readonly ILogger<JT809DownMasterLinkConnectionHandler> logger; | |||
private readonly IOptionsMonitor<JT809NettyOptions> optionsMonitor; | |||
private readonly JT809DownMasterLinkBusinessTypeHandler jT809DownMasterLinkBusinessTypeHandler; | |||
public JT809DownMasterLinkConnectionHandler( | |||
JT809DownMasterLinkBusinessTypeHandler jT809DownMasterLinkBusinessTypeHandler, | |||
IOptionsMonitor<JT809NettyOptions> optionsMonitor, | |||
ILoggerFactory loggerFactory) | |||
{ | |||
this.jT809DownMasterLinkBusinessTypeHandler = jT809DownMasterLinkBusinessTypeHandler; | |||
this.optionsMonitor = optionsMonitor; | |||
logger = loggerFactory.CreateLogger<JT809DownMasterLinkConnectionHandler>(); | |||
} | |||
public override void ChannelActive(IChannelHandlerContext context) | |||
{ | |||
if (logger.IsEnabled(LogLevel.Debug)) | |||
logger.LogDebug(">>>Activate the channel."); | |||
base.ChannelActive(context); | |||
} | |||
/// <summary> | |||
/// 主动断开 | |||
/// </summary> | |||
/// <param name="context"></param> | |||
public override void ChannelInactive(IChannelHandlerContext context) | |||
{ | |||
if (logger.IsEnabled(LogLevel.Debug)) | |||
logger.LogDebug(">>>The client disconnects from the server."); | |||
base.ChannelInactive(context); | |||
} | |||
/// <summary> | |||
/// 服务器主动断开 | |||
/// </summary> | |||
/// <param name="context"></param> | |||
/// <returns></returns> | |||
public override Task CloseAsync(IChannelHandlerContext context) | |||
{ | |||
if (logger.IsEnabled(LogLevel.Debug)) | |||
logger.LogDebug("<<<The server disconnects from the client."); | |||
return base.CloseAsync(context); | |||
} | |||
/// <summary> | |||
/// 主链路超时策略 | |||
/// 下级平台登录成功后,在与上级平台之间如果有应用业务数据包往来的情况下,不需要发送主链路保持数据包; | |||
/// 否则,下级平台应每 1min 发送一个主链路保持清求数据包到上级平台以保持链路连接 | |||
/// </summary> | |||
/// <param name="context"></param> | |||
/// <param name="evt"></param> | |||
public override void UserEventTriggered(IChannelHandlerContext context, object evt) | |||
{ | |||
IdleStateEvent idleStateEvent = evt as IdleStateEvent; | |||
if (idleStateEvent != null) | |||
{ | |||
string channelId = context.Channel.Id.AsShortText(); | |||
switch (idleStateEvent.State) | |||
{ | |||
case IdleState.WriterIdle: | |||
//发送心跳保持 | |||
logger.LogInformation($"{idleStateEvent.State.ToString()} heartbeat>>>{channelId}"); | |||
jT809DownMasterLinkBusinessTypeHandler.Msg0x1005(context); | |||
break; | |||
} | |||
} | |||
base.UserEventTriggered(context, evt); | |||
} | |||
} | |||
} |
@@ -1,61 +0,0 @@ | |||
using DotNetty.Buffers; | |||
using DotNetty.Transport.Channels; | |||
using System; | |||
using System.Collections.Generic; | |||
using System.IO; | |||
using System.Text; | |||
using DotNetty.Common.Utilities; | |||
using Microsoft.Extensions.Logging; | |||
using Newtonsoft.Json; | |||
using System.Threading; | |||
using JT809Netty.Core.ServiceHandlers; | |||
using JT809.Protocol; | |||
using JT809.Protocol.JT809Exceptions; | |||
namespace JT809Netty.Core.Handlers | |||
{ | |||
/// <summary> | |||
/// 下级平台主链路 | |||
/// </summary> | |||
public class JT809DownMasterLinkServiceHandler : ChannelHandlerAdapter | |||
{ | |||
private readonly ILogger<JT809DownMasterLinkServiceHandler> logger; | |||
private readonly JT809DownMasterLinkBusinessTypeHandler jT809DownMasterLinkBusinessTypeHandler; | |||
public JT809DownMasterLinkServiceHandler( | |||
JT809DownMasterLinkBusinessTypeHandler jT809DownMasterLinkBusinessTypeHandler, | |||
ILoggerFactory loggerFactory) | |||
{ | |||
this.jT809DownMasterLinkBusinessTypeHandler = jT809DownMasterLinkBusinessTypeHandler; | |||
logger = loggerFactory.CreateLogger<JT809DownMasterLinkServiceHandler>(); | |||
} | |||
public override void ChannelRead(IChannelHandlerContext context, object message) | |||
{ | |||
var jT809Package = (JT809Package)message; | |||
string receive = string.Empty; | |||
try | |||
{ | |||
if (logger.IsEnabled(LogLevel.Debug)) | |||
logger.LogDebug(JsonConvert.SerializeObject(jT809Package)); | |||
if (jT809DownMasterLinkBusinessTypeHandler.ResponseHandlerDict.TryGetValue(jT809Package.Header.BusinessType,out var action)) | |||
{ | |||
action(jT809Package, context); | |||
} | |||
} | |||
catch (JT809Exception ex) | |||
{ | |||
if (logger.IsEnabled(LogLevel.Error)) | |||
logger.LogError(ex, "JT809Exception receive<<<" + receive); | |||
} | |||
catch (Exception ex) | |||
{ | |||
if (logger.IsEnabled(LogLevel.Error)) | |||
logger.LogError(ex, "Exception receive<<<" + receive); | |||
} | |||
} | |||
public override void ChannelReadComplete(IChannelHandlerContext context) => context.Flush(); | |||
} | |||
} |
@@ -1,82 +0,0 @@ | |||
using DotNetty.Handlers.Timeout; | |||
using DotNetty.Transport.Channels; | |||
using JT809Netty.Core.Configs; | |||
using Microsoft.Extensions.Logging; | |||
using Microsoft.Extensions.Options; | |||
using System.Threading.Tasks; | |||
namespace JT809Netty.Core.Handlers | |||
{ | |||
/// <summary> | |||
/// 下级平台从链路 | |||
/// </summary> | |||
public class JT809DownSlaveLinkConnectionHandler : ChannelHandlerAdapter | |||
{ | |||
private readonly ILogger<JT809DownSlaveLinkConnectionHandler> logger; | |||
private readonly SessionManager sessionManager; | |||
private IOptionsMonitor<JT809NettyOptions> optionsMonitor; | |||
public JT809DownSlaveLinkConnectionHandler( | |||
IOptionsMonitor<JT809NettyOptions> optionsMonitor, | |||
SessionManager sessionManager, | |||
ILoggerFactory loggerFactory) | |||
{ | |||
this.optionsMonitor = optionsMonitor; | |||
this.sessionManager = sessionManager; | |||
logger = loggerFactory.CreateLogger<JT809DownSlaveLinkConnectionHandler>(); | |||
} | |||
public override void ChannelActive(IChannelHandlerContext context) | |||
{ | |||
base.ChannelActive(context); | |||
} | |||
/// <summary> | |||
/// 主动断开 | |||
/// </summary> | |||
/// <param name="context"></param> | |||
public override void ChannelInactive(IChannelHandlerContext context) | |||
{ | |||
if (logger.IsEnabled(LogLevel.Debug)) | |||
logger.LogDebug(">>>The client disconnects from the server."); | |||
sessionManager.RemoveSessionByID(context.Channel.Id.AsShortText()); | |||
base.ChannelInactive(context); | |||
} | |||
/// <summary> | |||
/// 服务器主动断开 | |||
/// </summary> | |||
/// <param name="context"></param> | |||
/// <returns></returns> | |||
public override Task CloseAsync(IChannelHandlerContext context) | |||
{ | |||
if (logger.IsEnabled(LogLevel.Debug)) | |||
logger.LogDebug("<<<The server disconnects from the client."); | |||
return base.CloseAsync(context); | |||
} | |||
/// <summary> | |||
/// 从链路超时策略 | |||
/// </summary> | |||
/// <param name="context"></param> | |||
/// <param name="evt"></param> | |||
public override void UserEventTriggered(IChannelHandlerContext context, object evt) | |||
{ | |||
IdleStateEvent idleStateEvent = evt as IdleStateEvent; | |||
if (idleStateEvent != null) | |||
{ | |||
string channelId = context.Channel.Id.AsShortText(); | |||
logger.LogInformation($"{idleStateEvent.State.ToString()}>>>{channelId}"); | |||
switch (idleStateEvent.State) | |||
{ | |||
case IdleState.ReaderIdle: | |||
//下级平台连续 3min 未收到上级平台发送的从链路保持应答数据包,则认为上级平台的连接中断,将主动断开数据传输从链路。 | |||
context.CloseAsync(); | |||
break; | |||
} | |||
} | |||
base.UserEventTriggered(context, evt); | |||
} | |||
} | |||
} |
@@ -1,19 +0,0 @@ | |||
using DotNetty.Transport.Channels; | |||
using System; | |||
using System.Collections.Generic; | |||
using System.Net; | |||
using System.Text; | |||
namespace JT809Netty.Core | |||
{ | |||
public interface IAppSession | |||
{ | |||
string SessionID { get; } | |||
IChannel Channel { get; } | |||
DateTime LastActiveTime { get; set; } | |||
DateTime StartTime { get; } | |||
} | |||
} |
@@ -1,121 +0,0 @@ | |||
using DotNetty.Buffers; | |||
using DotNetty.Codecs; | |||
using DotNetty.Handlers.Timeout; | |||
using DotNetty.Transport.Bootstrapping; | |||
using DotNetty.Transport.Channels; | |||
using DotNetty.Transport.Channels.Sockets; | |||
using DotNetty.Transport.Libuv; | |||
using JT809Netty.Core.Configs; | |||
using JT809Netty.Core.Handlers; | |||
using JT809Netty.Core.ServiceHandlers; | |||
using Microsoft.Extensions.DependencyInjection; | |||
using Microsoft.Extensions.Hosting; | |||
using Microsoft.Extensions.Logging; | |||
using Microsoft.Extensions.Options; | |||
using System; | |||
using System.Collections.Generic; | |||
using System.Net; | |||
using System.Text; | |||
using System.Threading; | |||
using System.Threading.Tasks; | |||
namespace JT809Netty.Core | |||
{ | |||
/// <summary> | |||
/// 下级平台主链路 | |||
/// </summary | |||
public class JT809DownMasterLinkNettyService : IHostedService | |||
{ | |||
IEventLoopGroup workerGroup; | |||
Bootstrap bootstrap; | |||
readonly IServiceProvider serviceProvider; | |||
readonly IOptionsMonitor<JT809NettyOptions> nettyOptions; | |||
IChannel ClientChannel; | |||
private readonly JT809DownMasterLinkBusinessTypeHandler jT809DownMasterLinkBusinessTypeHandler; | |||
private readonly ILogger<JT809DownMasterLinkNettyService> logger; | |||
public JT809DownMasterLinkNettyService( | |||
ILoggerFactory loggerFactory, | |||
JT809DownMasterLinkBusinessTypeHandler jT809DownMasterLinkBusinessTypeHandler, | |||
IOptionsMonitor<JT809NettyOptions> nettyOptionsAccessor, | |||
IServiceProvider serviceProvider) | |||
{ | |||
logger = loggerFactory.CreateLogger<JT809DownMasterLinkNettyService>(); | |||
this.jT809DownMasterLinkBusinessTypeHandler = jT809DownMasterLinkBusinessTypeHandler; | |||
nettyOptions = nettyOptionsAccessor; | |||
this.serviceProvider = serviceProvider; | |||
} | |||
public Task StartAsync(CancellationToken cancellationToken) | |||
{ | |||
Task.Run(async () => | |||
{ | |||
try | |||
{ | |||
workerGroup = new MultithreadEventLoopGroup(); | |||
bootstrap = new Bootstrap(); | |||
bootstrap.Group(workerGroup) | |||
.Channel<TcpSocketChannel>() | |||
.Handler(new ActionChannelInitializer<IChannel>(channel => | |||
{ | |||
InitChannel(channel); | |||
})) | |||
.Option(ChannelOption.SoBacklog, 1048576); | |||
ClientChannel = await bootstrap.ConnectAsync(new IPEndPoint(IPAddress.Parse(nettyOptions.CurrentValue.Host), nettyOptions.CurrentValue.Port)); | |||
jT809DownMasterLinkBusinessTypeHandler.Msg0x1001(ClientChannel); | |||
} | |||
catch (Exception ex) | |||
{ | |||
} | |||
}); | |||
return Task.CompletedTask; | |||
} | |||
public Task StopAsync(CancellationToken cancellationToken) | |||
{ | |||
try | |||
{ | |||
jT809DownMasterLinkBusinessTypeHandler.Msg0x1003(ClientChannel); | |||
// 已发送注销请求,等待30s,待服务器响应 | |||
int sleepTime = 50000; | |||
logger.LogInformation($">>>The logout request has been sent, waiting for {sleepTime/1000}s for the server to respond..."); | |||
Thread.Sleep(sleepTime); | |||
logger.LogInformation($"Check Status:<<<{jT809DownMasterLinkBusinessTypeHandler.Status.ToString()}"); | |||
ClientChannel.CloseAsync().ContinueWith((state) => { | |||
Task.WhenAll(workerGroup.ShutdownGracefullyAsync(TimeSpan.FromMilliseconds(100), TimeSpan.FromSeconds(1))); | |||
}); | |||
} | |||
catch (Exception ex) | |||
{ | |||
logger.LogError(ex,""); | |||
} | |||
return Task.CompletedTask; | |||
} | |||
private void InitChannel(IChannel channel) | |||
{ | |||
var scope = serviceProvider.CreateScope(); | |||
try | |||
{ | |||
//下级平台应每 1min 发送一个主链路保持清求数据包到上级平台以保持链路连接 | |||
channel.Pipeline.AddLast("systemIdleState", new IdleStateHandler(0, 60, 0)); | |||
channel.Pipeline.AddLast("jt809DownMasterLinkConnection", scope.ServiceProvider.GetRequiredService<JT809DownMasterLinkConnectionHandler>()); | |||
channel.Pipeline.AddLast("jt809Buffer", new DelimiterBasedFrameDecoder(int.MaxValue, Unpooled.CopiedBuffer(new byte[] { JT809.Protocol.JT809Package.BEGINFLAG }), Unpooled.CopiedBuffer(new byte[] { JT809.Protocol.JT809Package.ENDFLAG }))); | |||
channel.Pipeline.AddLast("jt809Decode", scope.ServiceProvider.GetRequiredService<JT809DecodeHandler>()); | |||
channel.Pipeline.AddLast("jT809DownMasterLinkServiceHandler", scope.ServiceProvider.GetRequiredService<JT809DownMasterLinkServiceHandler>()); | |||
} | |||
finally | |||
{ | |||
scope.Dispose(); | |||
} | |||
} | |||
} | |||
} |
@@ -1,104 +0,0 @@ | |||
using DotNetty.Buffers; | |||
using DotNetty.Codecs; | |||
using DotNetty.Handlers.Timeout; | |||
using DotNetty.Transport.Bootstrapping; | |||
using DotNetty.Transport.Channels; | |||
using DotNetty.Transport.Libuv; | |||
using JT809Netty.Core.Configs; | |||
using JT809Netty.Core.Handlers; | |||
using Microsoft.Extensions.DependencyInjection; | |||
using Microsoft.Extensions.Hosting; | |||
using Microsoft.Extensions.Options; | |||
using System; | |||
using System.Collections.Generic; | |||
using System.Text; | |||
using System.Threading; | |||
using System.Threading.Tasks; | |||
namespace JT809Netty.Core | |||
{ | |||
/// <summary> | |||
/// 下级平台从链路 | |||
/// </summary> | |||
public class JT809DownSlaveLinkNettyService : IHostedService | |||
{ | |||
IEventLoopGroup bossGroup; | |||
IEventLoopGroup workerGroup; | |||
IChannel boundChannel; | |||
readonly IServiceProvider serviceProvider; | |||
readonly JT809NettyOptions nettyOptions; | |||
public JT809DownSlaveLinkNettyService( | |||
IOptions<JT809NettyOptions> nettyOptionsAccessor, | |||
IServiceProvider serviceProvider) | |||
{ | |||
nettyOptions = nettyOptionsAccessor.Value; | |||
this.serviceProvider = serviceProvider; | |||
} | |||
public Task StartAsync(CancellationToken cancellationToken) | |||
{ | |||
try | |||
{ | |||
var dispatcher = new DispatcherEventLoopGroup(); | |||
bossGroup = dispatcher; | |||
workerGroup = new WorkerEventLoopGroup(dispatcher); | |||
var bootstrap = new ServerBootstrap(); | |||
bootstrap.Group(bossGroup, workerGroup); | |||
bootstrap.Channel<TcpServerChannel>(); | |||
bootstrap | |||
//.Handler(new LoggingHandler("SRV-LSTN")) | |||
.ChildHandler(new ActionChannelInitializer<IChannel>(channel => | |||
{ | |||
InitChannel(channel); | |||
})) | |||
.Option(ChannelOption.SoBacklog, 1048576); | |||
if (nettyOptions.Host == "") | |||
{ | |||
boundChannel = bootstrap.BindAsync(nettyOptions.Port).Result; | |||
} | |||
else | |||
{ | |||
boundChannel = bootstrap.BindAsync(nettyOptions.Host, nettyOptions.Port).Result; | |||
} | |||
} | |||
catch (Exception ex) | |||
{ | |||
} | |||
return Task.CompletedTask; | |||
} | |||
public Task StopAsync(CancellationToken cancellationToken) | |||
{ | |||
try | |||
{ | |||
Task.WhenAll( | |||
bossGroup.ShutdownGracefullyAsync(TimeSpan.FromMilliseconds(100), TimeSpan.FromSeconds(1)), | |||
workerGroup.ShutdownGracefullyAsync(TimeSpan.FromMilliseconds(100), TimeSpan.FromSeconds(1)), | |||
boundChannel.CloseAsync()); | |||
} | |||
catch (Exception ex) | |||
{ | |||
} | |||
return Task.CompletedTask; | |||
} | |||
private void InitChannel(IChannel channel) | |||
{ | |||
var scope = serviceProvider.CreateScope(); | |||
//下级平台连续 3min 未收到上级平台发送的从链路保持应答数据包,则认为上级平台的连接中断,将主动断开数据传输从链路。 | |||
channel.Pipeline.AddLast("systemIdleState", new IdleStateHandler(180, 0, 0)); | |||
channel.Pipeline.AddLast("jt809DownSlaveLinkConnection", scope.ServiceProvider.GetRequiredService<JT809DownSlaveLinkConnectionHandler>()); | |||
channel.Pipeline.AddLast("jt809Buffer", new DelimiterBasedFrameDecoder(int.MaxValue, Unpooled.CopiedBuffer(new byte[] { JT809.Protocol.JT809Package.BEGINFLAG }), Unpooled.CopiedBuffer(new byte[] { JT809.Protocol.JT809Package.ENDFLAG }))); | |||
channel.Pipeline.AddLast("jt809Decode", scope.ServiceProvider.GetRequiredService<JT809DecodeHandler>()); | |||
//channel.Pipeline.AddLast("jt809Service", scope.ServiceProvider.GetRequiredService<JT808ServiceHandler>()); | |||
scope.Dispose(); | |||
} | |||
} | |||
} |
@@ -1,28 +0,0 @@ | |||
<Project Sdk="Microsoft.NET.Sdk"> | |||
<PropertyGroup> | |||
<TargetFramework>netstandard2.0</TargetFramework> | |||
<LangVersion>latest</LangVersion> | |||
</PropertyGroup> | |||
<ItemGroup> | |||
<Compile Remove="Handlers\JT808ServiceHandler.cs" /> | |||
</ItemGroup> | |||
<ItemGroup> | |||
<PackageReference Include="DotNetty.Handlers" Version="0.6.0" /> | |||
<PackageReference Include="DotNetty.Transport.Libuv" Version="0.6.0" /> | |||
<PackageReference Include="JT809" Version="1.0.3" /> | |||
<PackageReference Include="Microsoft.Extensions.Configuration.EnvironmentVariables" Version="2.1.1" /> | |||
<PackageReference Include="Microsoft.Extensions.Configuration.Json" Version="2.1.1" /> | |||
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="2.1.1" /> | |||
<PackageReference Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="2.1.1" /> | |||
<PackageReference Include="Microsoft.Extensions.Hosting" Version="2.1.1" /> | |||
<PackageReference Include="Microsoft.Extensions.Hosting.Abstractions" Version="2.1.1" /> | |||
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="2.1.1" /> | |||
<PackageReference Include="Microsoft.Extensions.Options" Version="2.1.1" /> | |||
<PackageReference Include="Microsoft.Extensions.Options.ConfigurationExtensions" Version="2.1.1" /> | |||
<PackageReference Include="Newtonsoft.Json" Version="11.0.2" /> | |||
</ItemGroup> | |||
</Project> |
@@ -1,39 +0,0 @@ | |||
using DotNetty.Transport.Channels; | |||
using System; | |||
using JT809.Protocol.JT809Enums; | |||
namespace JT809Netty.Core | |||
{ | |||
public class JT809Session: IAppSession | |||
{ | |||
public JT809Session(IChannel channel, string vehicleNo,JT809VehicleColorType vehicleColor) | |||
{ | |||
Channel = channel; | |||
VehicleNo = vehicleNo; | |||
VehicleColor = vehicleColor; | |||
StartTime = DateTime.Now; | |||
LastActiveTime = DateTime.Now; | |||
SessionID = Channel.Id.AsShortText(); | |||
Key = $"{VehicleNo}_{VehicleColor.ToString()}"; | |||
} | |||
/// <summary> | |||
/// 车牌号 | |||
/// </summary> | |||
public string VehicleNo { get; set; } | |||
/// <summary> | |||
/// 车牌颜色 | |||
/// </summary> | |||
public JT809VehicleColorType VehicleColor { get; set; } | |||
public string Key { get; set; } | |||
public string SessionID { get; } | |||
public IChannel Channel { get;} | |||
public DateTime LastActiveTime { get; set; } | |||
public DateTime StartTime { get; } | |||
} | |||
} |
@@ -1,139 +0,0 @@ | |||
using System; | |||
using System.Collections.Generic; | |||
using System.Text; | |||
using DotNetty.Transport.Channels; | |||
using JT809.Protocol; | |||
using JT809.Protocol.JT809Enums; | |||
using JT809.Protocol.JT809MessageBody; | |||
using JT809.Protocol.JT809Extensions; | |||
using DotNetty.Buffers; | |||
using Microsoft.Extensions.Logging; | |||
namespace JT809Netty.Core.ServiceHandlers | |||
{ | |||
public class JT809DownMasterLinkBusinessTypeHandler | |||
{ | |||
public ConnectionStatus Status { get; private set; } = ConnectionStatus.蓄势待发; | |||
public Dictionary<JT809BusinessType, Func<IChannelHandlerContext, JT809Package>> RequestHandlerDict { get; } | |||
public Dictionary<JT809BusinessType, Action<JT809Package, IChannelHandlerContext>> ResponseHandlerDict { get; } | |||
private readonly ILogger<JT809DownMasterLinkBusinessTypeHandler> logger; | |||
public JT809DownMasterLinkBusinessTypeHandler( | |||
ILoggerFactory loggerFactory) | |||
{ | |||
logger = loggerFactory.CreateLogger<JT809DownMasterLinkBusinessTypeHandler>(); | |||
RequestHandlerDict = new Dictionary<JT809BusinessType, Func<IChannelHandlerContext, JT809Package>> | |||
{ | |||
{JT809.Protocol.JT809Enums.JT809BusinessType.主链路登录请求消息, Msg0x1001}, | |||
{JT809.Protocol.JT809Enums.JT809BusinessType.主链路注销请求消息, Msg0x1003}, | |||
{JT809.Protocol.JT809Enums.JT809BusinessType.主链路连接保持请求消息, Msg0x1005}, | |||
//{JT809.Protocol.JT809Enums.JT809BusinessType.UP_DISCONNECT_INFORM, Msg0x1007}, | |||
//{JT809.Protocol.JT809Enums.JT809BusinessType.UP_CLOSELINK_INFORM, Msg0x1008}, | |||
}; | |||
ResponseHandlerDict = new Dictionary<JT809BusinessType, Action<JT809Package, IChannelHandlerContext>> | |||
{ | |||
{JT809.Protocol.JT809Enums.JT809BusinessType.主链路登录应答消息, Msg0x1002}, | |||
{JT809.Protocol.JT809Enums.JT809BusinessType.主链路注销应答消息, Msg0x1004}, | |||
{JT809.Protocol.JT809Enums.JT809BusinessType.主链路连接保持应答消息, Msg0x1006}, | |||
}; | |||
} | |||
public enum ConnectionStatus | |||
{ | |||
蓄势待发=0, | |||
下级平台主链路已发送注销请求=1, | |||
上级级平台主链路已发送注销应答 = 2, | |||
} | |||
public JT809Package Msg0x1001(IChannelHandlerContext channelHandlerContext) | |||
{ | |||
JT809Package loginPackage = JT809BusinessType.主链路登录请求消息.Create(new JT809_0x1001 | |||
{ | |||
UserId = 1234, | |||
Password = "20181009", | |||
DownLinkIP = "127.0.0.1", | |||
DownLinkPort = 8091 | |||
}); | |||
byte[] sendLoginData = JT809Serializer.Serialize(loginPackage, 256); | |||
channelHandlerContext.WriteAndFlushAsync(Unpooled.WrappedBuffer(sendLoginData)); | |||
return loginPackage; | |||
} | |||
public JT809Package Msg0x1001(IChannel channel) | |||
{ | |||
JT809Package loginPackage = JT809BusinessType.主链路登录请求消息.Create(new JT809_0x1001 | |||
{ | |||
UserId = 1234, | |||
Password = "20181009", | |||
DownLinkIP = "127.0.0.1", | |||
DownLinkPort = 8091 | |||
}); | |||
byte[] sendLoginData = JT809Serializer.Serialize(loginPackage, 256); | |||
channel.WriteAndFlushAsync(Unpooled.WrappedBuffer(sendLoginData)); | |||
return loginPackage; | |||
} | |||
public JT809Package Msg0x1003(IChannelHandlerContext channelHandlerContext) | |||
{ | |||
JT809Package loginPackage = JT809BusinessType.主链路登录请求消息.Create(new JT809_0x1001 | |||
{ | |||
UserId = 1234, | |||
Password = "20181009", | |||
DownLinkIP = "127.0.0.1", | |||
DownLinkPort = 8091 | |||
}); | |||
byte[] sendLoginData = JT809Serializer.Serialize(loginPackage, 256); | |||
channelHandlerContext.WriteAndFlushAsync(Unpooled.WrappedBuffer(sendLoginData)); | |||
return loginPackage; | |||
} | |||
public JT809Package Msg0x1003(IChannel channel) | |||
{ | |||
JT809Package logoutPackage = JT809BusinessType.主链路注销请求消息.Create(new JT809_0x1003 | |||
{ | |||
UserId = 1234, | |||
Password = "20181009", | |||
}); | |||
byte[] sendLoginData = JT809Serializer.Serialize(logoutPackage, 128); | |||
channel.WriteAndFlushAsync(Unpooled.WrappedBuffer(sendLoginData)); | |||
Status = ConnectionStatus.下级平台主链路已发送注销请求; | |||
return logoutPackage; | |||
} | |||
public JT809Package Msg0x1005(IChannelHandlerContext channelHandlerContext) | |||
{ | |||
JT809Package heartbeatPackage = JT809BusinessType.主链路连接保持请求消息.Create(); | |||
byte[] sendHeartbeatData = JT809Serializer.Serialize(heartbeatPackage, 100); | |||
channelHandlerContext.WriteAndFlushAsync(Unpooled.WrappedBuffer(sendHeartbeatData)); | |||
return heartbeatPackage; | |||
} | |||
public JT809Package Msg0x1007(IChannelHandlerContext channelHandlerContext) | |||
{ | |||
return null; | |||
} | |||
public void Msg0x1002(JT809Package jT809Package, IChannelHandlerContext channelHandlerContext) | |||
{ | |||
} | |||
public void Msg0x1004(JT809Package jT809Package, IChannelHandlerContext channelHandlerContext) | |||
{ | |||
Status = ConnectionStatus.上级级平台主链路已发送注销应答; | |||
} | |||
public void Msg0x1006(JT809Package jT809Package, IChannelHandlerContext channelHandlerContext) | |||
{ | |||
} | |||
public void Msg0x1008(JT809Package jT809Package, IChannelHandlerContext channelHandlerContext) | |||
{ | |||
} | |||
} | |||
} |
@@ -1,220 +0,0 @@ | |||
using Microsoft.Extensions.Logging; | |||
using Microsoft.Extensions.Logging.Abstractions; | |||
using System; | |||
using System.Collections.Concurrent; | |||
using System.Linq; | |||
using System.Threading; | |||
using System.Threading.Tasks; | |||
namespace JT809Netty.Core | |||
{ | |||
public class SessionManager:IDisposable | |||
{ | |||
private readonly ILogger<SessionManager> logger; | |||
private readonly CancellationTokenSource cancellationTokenSource; | |||
#if DEBUG | |||
private const int timeout = 1 * 1000 * 60; | |||
#else | |||
private const int timeout = 5 * 1000 * 60; | |||
#endif | |||
public SessionManager(ILoggerFactory loggerFactory) | |||
{ | |||
logger = loggerFactory.CreateLogger<SessionManager>(); | |||
cancellationTokenSource = new CancellationTokenSource(); | |||
Task.Run(() => | |||
{ | |||
while (!cancellationTokenSource.IsCancellationRequested) | |||
{ | |||
logger.LogInformation($"Online Count>>>{SessionCount}"); | |||
if (SessionCount > 0) | |||
{ | |||
logger.LogInformation($"SessionIds>>>{string.Join(",", SessionIdDict.Select(s => s.Key))}"); | |||
logger.LogInformation($"TerminalPhoneNos>>>{string.Join(",", CustomKey_SessionId_Dict.Select(s => $"{s.Key}-{s.Value}"))}"); | |||
} | |||
Thread.Sleep(timeout); | |||
} | |||
}, cancellationTokenSource.Token); | |||
} | |||
/// <summary> | |||
/// Netty生成的sessionID和Session的对应关系 | |||
/// key = seession id | |||
/// value = Session | |||
/// </summary> | |||
private ConcurrentDictionary<string, JT809Session> SessionIdDict = new ConcurrentDictionary<string, JT809Session>(StringComparer.OrdinalIgnoreCase); | |||
/// <summary> | |||
/// 自定义Key和netty生成的sessionID的对应关系 | |||
/// key = 终端手机号 | |||
/// value = seession id | |||
/// </summary> | |||
private ConcurrentDictionary<string, string> CustomKey_SessionId_Dict = new ConcurrentDictionary<string, string>(StringComparer.OrdinalIgnoreCase); | |||
public int SessionCount | |||
{ | |||
get | |||
{ | |||
return SessionIdDict.Count; | |||
} | |||
} | |||
public void RegisterSession(JT809Session appSession) | |||
{ | |||
if (CustomKey_SessionId_Dict.ContainsKey(appSession.Key)) | |||
{ | |||
return; | |||
} | |||
if (SessionIdDict.TryAdd(appSession.SessionID, appSession) && | |||
CustomKey_SessionId_Dict.TryAdd(appSession.Key, appSession.SessionID)) | |||
{ | |||
return; | |||
} | |||
} | |||
public JT809Session GetSessionByID(string sessionID) | |||
{ | |||
if (string.IsNullOrEmpty(sessionID)) | |||
return default; | |||
JT809Session targetSession; | |||
SessionIdDict.TryGetValue(sessionID, out targetSession); | |||
return targetSession; | |||
} | |||
public JT809Session GetSessionByTerminalPhoneNo(string key) | |||
{ | |||
try | |||
{ | |||
if (string.IsNullOrEmpty(key)) | |||
return default; | |||
if (CustomKey_SessionId_Dict.TryGetValue(key, out string sessionId)) | |||
{ | |||
if (SessionIdDict.TryGetValue(sessionId, out JT809Session targetSession)) | |||
{ | |||
return targetSession; | |||
} | |||
else | |||
{ | |||
return default; | |||
} | |||
} | |||
else | |||
{ | |||
return default; | |||
} | |||
} | |||
catch (Exception ex) | |||
{ | |||
logger.LogError(ex, key); | |||
return default; | |||
} | |||
} | |||
public void Heartbeat(string key) | |||
{ | |||
try | |||
{ | |||
if(CustomKey_SessionId_Dict.TryGetValue(key, out string sessionId)) | |||
{ | |||
if (SessionIdDict.TryGetValue(sessionId, out JT809Session oldjT808Session)) | |||
{ | |||
if (oldjT808Session.Channel.Active) | |||
{ | |||
oldjT808Session.LastActiveTime = DateTime.Now; | |||
if (SessionIdDict.TryUpdate(sessionId, oldjT808Session, oldjT808Session)) | |||
{ | |||
} | |||
} | |||
} | |||
} | |||
} | |||
catch (Exception ex) | |||
{ | |||
logger.LogError(ex, key); | |||
} | |||
} | |||
/// <summary> | |||
/// 通过通道Id和自定义key进行关联 | |||
/// </summary> | |||
/// <param name="sessionID"></param> | |||
/// <param name="key"></param> | |||
public void UpdateSessionByID(string sessionID, string key) | |||
{ | |||
try | |||
{ | |||
if (SessionIdDict.TryGetValue(sessionID, out JT809Session oldjT808Session)) | |||
{ | |||
oldjT808Session.Key = key; | |||
if (SessionIdDict.TryUpdate(sessionID, oldjT808Session, oldjT808Session)) | |||
{ | |||
CustomKey_SessionId_Dict.AddOrUpdate(key, sessionID, (tpn, sid) => | |||
{ | |||
return sessionID; | |||
}); | |||
} | |||
} | |||
} | |||
catch (Exception ex) | |||
{ | |||
logger.LogError(ex, $"{sessionID},{key}"); | |||
} | |||
} | |||
public void RemoveSessionByID(string sessionID) | |||
{ | |||
if (sessionID == null) return; | |||
try | |||
{ | |||
if (SessionIdDict.TryRemove(sessionID, out JT809Session session)) | |||
{ | |||
if (session.Key != null) | |||
{ | |||
if(CustomKey_SessionId_Dict.TryRemove(session.Key, out string sessionid)) | |||
{ | |||
logger.LogInformation($">>>{sessionID}-{session.Key} Session Remove."); | |||
} | |||
} | |||
else | |||
{ | |||
logger.LogInformation($">>>{sessionID} Session Remove."); | |||
} | |||
session.Channel.CloseAsync(); | |||
} | |||
} | |||
catch (Exception ex) | |||
{ | |||
logger.LogError(ex, $">>>{sessionID} Session Remove Exception"); | |||
} | |||
} | |||
public void RemoveSessionByKey(string key) | |||
{ | |||
if (key == null) return; | |||
try | |||
{ | |||
if (CustomKey_SessionId_Dict.TryRemove(key, out string sessionid)) | |||
{ | |||
if (SessionIdDict.TryRemove(sessionid, out JT809Session session)) | |||
{ | |||
logger.LogInformation($">>>{key}-{sessionid} Key Remove."); | |||
} | |||
else | |||
{ | |||
logger.LogInformation($">>>{key} Key Remove."); | |||
} | |||
} | |||
} | |||
catch (Exception ex) | |||
{ | |||
logger.LogError(ex, $">>>{key} Key Remove Exception."); | |||
} | |||
} | |||
public void Dispose() | |||
{ | |||
cancellationTokenSource.Cancel(); | |||
} | |||
} | |||
} |
@@ -1,26 +0,0 @@ | |||
<Project Sdk="Microsoft.NET.Sdk"> | |||
<PropertyGroup> | |||
<OutputType>Exe</OutputType> | |||
<TargetFramework>netcoreapp2.1</TargetFramework> | |||
<LangVersion>latest</LangVersion> | |||
</PropertyGroup> | |||
<ItemGroup> | |||
<PackageReference Include="Microsoft.Extensions.Logging.Console" Version="2.1.1" /> | |||
</ItemGroup> | |||
<ItemGroup> | |||
<ProjectReference Include="..\JT809Netty.Core\JT809Netty.Core.csproj" /> | |||
</ItemGroup> | |||
<ItemGroup> | |||
<None Update="appsettings.Development.json"> | |||
<CopyToOutputDirectory>Always</CopyToOutputDirectory> | |||
</None> | |||
<None Update="appsettings.json"> | |||
<CopyToOutputDirectory>Always</CopyToOutputDirectory> | |||
</None> | |||
</ItemGroup> | |||
</Project> |
@@ -1,57 +0,0 @@ | |||
using DotNetty.Handlers.Logging; | |||
using JT809Netty.Core; | |||
using JT809Netty.Core.Configs; | |||
using JT809Netty.Core.Handlers; | |||
using JT809Netty.Core.ServiceHandlers; | |||
using Microsoft.Extensions.Configuration; | |||
using Microsoft.Extensions.DependencyInjection; | |||
using Microsoft.Extensions.Hosting; | |||
using Microsoft.Extensions.Logging; | |||
using System; | |||
using System.Threading.Tasks; | |||
namespace JT809Netty.DownMasterLink | |||
{ | |||
class Program | |||
{ | |||
static async Task Main(string[] args) | |||
{ | |||
JT809.Protocol.JT809GlobalConfig.Instance | |||
.SetHeaderOptions(new JT809.Protocol.JT809Configs.JT809HeaderOptions | |||
{ | |||
MsgGNSSCENTERID= 20141013, | |||
Version=new JT809.Protocol.JT809Header_Version (2,0,0), | |||
EncryptKey=9595 | |||
}); | |||
var serverHostBuilder = new HostBuilder() | |||
.UseEnvironment(args[0].Split('=')[1]) | |||
.ConfigureAppConfiguration((hostingContext, config) => | |||
{ | |||
config.SetBasePath(AppDomain.CurrentDomain.BaseDirectory); | |||
config.AddJsonFile("appsettings.json", optional: false, reloadOnChange: true) | |||
.AddJsonFile($"appsettings.{ hostingContext.HostingEnvironment.EnvironmentName}.json", optional: true, reloadOnChange: true); | |||
}) | |||
.ConfigureLogging((context, logging) => | |||
{ | |||
logging.AddConsole(); | |||
//NLog.LogManager.LoadConfiguration("Configs/nlog.config"); | |||
//logging.AddNLog(new NLogProviderOptions { CaptureMessageTemplates = true, CaptureMessageProperties = true }); | |||
logging.SetMinimumLevel(Microsoft.Extensions.Logging.LogLevel.Trace); | |||
}) | |||
.ConfigureServices((hostContext, services) => | |||
{ | |||
services.AddSingleton<ILoggerFactory, LoggerFactory>(); | |||
services.AddSingleton(typeof(ILogger<>), typeof(Logger<>)); | |||
services.Configure<JT809NettyOptions>(hostContext.Configuration.GetSection("JT809NettyOptions")); | |||
services.AddSingleton<JT809DownMasterLinkBusinessTypeHandler, JT809DownMasterLinkBusinessTypeHandler>(); | |||
services.AddScoped<JT809DownMasterLinkConnectionHandler, JT809DownMasterLinkConnectionHandler>(); | |||
services.AddScoped<JT809DownMasterLinkServiceHandler, JT809DownMasterLinkServiceHandler>(); | |||
services.AddScoped<JT809DecodeHandler, JT809DecodeHandler>(); | |||
services.AddSingleton<IHostedService, JT809DownMasterLinkNettyService>(); | |||
}); | |||
await serverHostBuilder.RunConsoleAsync(); | |||
} | |||
} | |||
} |
@@ -1,25 +0,0 @@ | |||
{ | |||
"Logging": { | |||
"IncludeScopes": false, | |||
"Debug": { | |||
"LogLevel": { | |||
"Default": "Trace" | |||
} | |||
}, | |||
"Console": { | |||
"LogLevel": { | |||
"Default": "Trace" | |||
} | |||
} | |||
}, | |||
"ConnectionStrings": { | |||
"TestDbContext": "", | |||
"RedisHost": "" | |||
}, | |||
"JT809NettyOptions": { | |||
"Host": "127.0.0.1", | |||
"Port": 16565, | |||
"IpWhiteList": [], | |||
"IpWhiteListDisabled": true | |||
} | |||
} |
@@ -1,25 +0,0 @@ | |||
{ | |||
"Logging": { | |||
"IncludeScopes": false, | |||
"Debug": { | |||
"LogLevel": { | |||
"Default": "Trace" | |||
} | |||
}, | |||
"Console": { | |||
"LogLevel": { | |||
"Default": "Trace" | |||
} | |||
} | |||
}, | |||
"ConnectionStrings": { | |||
"TestDbContext": "", | |||
"RedisHost": "" | |||
}, | |||
"JT809NettyOptions": { | |||
"Host": "", | |||
"Port": 6566, | |||
"IpWhiteList": [], | |||
"IpWhiteListDisabled": false | |||
} | |||
} |