diff --git a/src/JT809.DotNetty.Core/Clients/JT809SubordinateClient.cs b/src/JT809.DotNetty.Core/Clients/JT809SubordinateClient.cs index a6cacd9..ca3ebf1 100644 --- a/src/JT809.DotNetty.Core/Clients/JT809SubordinateClient.cs +++ b/src/JT809.DotNetty.Core/Clients/JT809SubordinateClient.cs @@ -30,7 +30,7 @@ namespace JT809.DotNetty.Core.Clients private MultithreadEventLoopGroup group; - public IChannel Channel { get; private set; } + private IChannel channel; private readonly ILogger logger; @@ -40,17 +40,21 @@ namespace JT809.DotNetty.Core.Clients private readonly IJT809VerifyCodeGenerator verifyCodeGenerator; + private readonly IJT809SubordinateLinkNotifyService subordinateLinkNotifyService; + private bool disposed = false; public JT809SubordinateClient( IServiceProvider provider, ILoggerFactory loggerFactory, + IJT809SubordinateLinkNotifyService subordinateLinkNotifyService, IJT809VerifyCodeGenerator verifyCodeGenerator) { this.serviceProvider = provider; this.loggerFactory = loggerFactory; this.verifyCodeGenerator = verifyCodeGenerator; this.logger = loggerFactory.CreateLogger(); + this.subordinateLinkNotifyService = subordinateLinkNotifyService; group = new MultithreadEventLoopGroup(); bootstrap = new Bootstrap(); bootstrap.Group(group) @@ -75,39 +79,87 @@ namespace JT809.DotNetty.Core.Clients })); } - public async void ConnectAsync(string ip,int port,uint verifyCode,int delay=3000) + public async void ConnectAsync(string ip,int port,int delay=3000) { + var verifyCode = verifyCodeGenerator.Get(); logger.LogInformation($"ip:{ip},port:{port},verifycode:{verifyCode}"); await Task.Delay(delay); try - { - if (Channel == null) + { + if (channel == null) { - Channel = await bootstrap.ConnectAsync(new IPEndPoint(IPAddress.Parse(ip), port)); + channel = await bootstrap.ConnectAsync(new IPEndPoint(IPAddress.Parse(ip), port)); + //从链路连接请求消息 + var package = JT809BusinessType.从链路连接请求消息.Create(new JT809_0x9001() + { + VerifyCode = verifyCode + }); + logger.LogInformation($"从链路连接请求消息>>>{JT809Serializer.Serialize(package, 100).ToHexString()}"); + JT809Response jT809Response = new JT809Response(package, 100); + SendAsync(jT809Response); } else { - await Channel.CloseAsync(); - Channel = await bootstrap.ConnectAsync(new IPEndPoint(IPAddress.Parse(ip), port)); + await channel.CloseAsync(); + channel = await bootstrap.ConnectAsync(new IPEndPoint(IPAddress.Parse(ip), port)); + //从链路连接请求消息 + var package = JT809BusinessType.从链路连接请求消息.Create(new JT809_0x9001() + { + VerifyCode = verifyCode + }); + logger.LogInformation($"从链路连接请求消息>>>{JT809Serializer.Serialize(package, 100).ToHexString()}"); + JT809Response jT809Response = new JT809Response(package, 100); + SendAsync(jT809Response); } } - catch (AggregateException ex) + catch (ConnectException ex) { + subordinateLinkNotifyService.Notify(JT809_0x9007_ReasonCode.无法连接下级平台指定的服务IP与端口); logger.LogError(ex.InnerException, $"ip:{ip},port:{port},verifycode:{verifyCode}"); } catch (Exception ex) { + subordinateLinkNotifyService.Notify(JT809_0x9007_ReasonCode.其他原因); logger.LogError(ex, $"ip:{ip},port:{port},verifycode:{verifyCode}"); } } + public async Task ReConnectAsync(EndPoint ipEndPoint) + { + var verifyCode = verifyCodeGenerator.Get(); + try + { + logger.LogInformation($"IPAddress:{ipEndPoint.ToString()},verifycode:{verifyCode}"); + channel = await bootstrap.ConnectAsync(ipEndPoint); + //从链路连接请求消息 + var package = JT809BusinessType.从链路连接请求消息.Create(new JT809_0x9001() + { + VerifyCode = verifyCode + }); + logger.LogInformation($"从链路连接请求消息>>>{JT809Serializer.Serialize(package, 100).ToHexString()}"); + JT809Response jT809Response = new JT809Response(package, 100); + SendAsync(jT809Response); + return channel.Open; + } + catch (ConnectException ex) + { + logger.LogError(ex.InnerException, $"IPAddress:{ipEndPoint.ToString()},verifycode:{verifyCode}"); + return false; + } + catch (Exception ex) + { + logger.LogError(ex, $"IPAddress:{ipEndPoint.ToString()},verifycode:{verifyCode}"); + return false; + } + } + public async void SendAsync(JT809Response jT809Response) { - if (Channel == null) throw new NullReferenceException("Channel Not Open"); + if (channel == null) throw new NullReferenceException("Channel Not Open"); if (jT809Response == null) throw new ArgumentNullException("Data is null"); - if (Channel.Open && Channel.Active) + if (channel.Open && channel.Active) { - await Channel.WriteAndFlushAsync(jT809Response); + await channel.WriteAndFlushAsync(jT809Response); } } @@ -115,8 +167,8 @@ namespace JT809.DotNetty.Core.Clients { get { - if (Channel == null) return false; - return Channel.Open && Channel.Active; + if (channel == null) return false; + return channel.Open && channel.Active; } } @@ -136,7 +188,7 @@ namespace JT809.DotNetty.Core.Clients VerifyCode = verifyCodeGenerator.Get() }); JT809Response jT809Response = new JT809Response(package, 100); - Channel.WriteAndFlushAsync(jT809Response); + SendAsync(jT809Response); logger.LogInformation($"发送从链路注销请求>>>{JT809Serializer.Serialize(package, 100).ToHexString()}"); } catch (Exception ex) @@ -146,7 +198,7 @@ namespace JT809.DotNetty.Core.Clients finally { //清理托管资源 - Channel.CloseAsync(); + channel.CloseAsync(); group.ShutdownGracefullyAsync(TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(3)); } } diff --git a/src/JT809.DotNetty.Core/Handlers/JT809SubordinateClientConnectionHandler.cs b/src/JT809.DotNetty.Core/Handlers/JT809SubordinateClientConnectionHandler.cs index 903d09d..8b38550 100644 --- a/src/JT809.DotNetty.Core/Handlers/JT809SubordinateClientConnectionHandler.cs +++ b/src/JT809.DotNetty.Core/Handlers/JT809SubordinateClientConnectionHandler.cs @@ -1,9 +1,11 @@ using DotNetty.Handlers.Timeout; using DotNetty.Transport.Channels; +using JT809.DotNetty.Core.Clients; using JT809.DotNetty.Core.Metadata; using JT809.Protocol.Enums; using JT809.Protocol.Extensions; using Microsoft.Extensions.Logging; +using Polly; using System; using System.Threading.Tasks; @@ -16,11 +18,14 @@ namespace JT809.DotNetty.Core.Handlers { private readonly ILogger logger; + private readonly JT809SubordinateClient subordinateClient; public JT809SubordinateClientConnectionHandler( + JT809SubordinateClient jT809SubordinateClient, ILoggerFactory loggerFactory) { logger = loggerFactory.CreateLogger(); + subordinateClient = jT809SubordinateClient; } /// @@ -41,6 +46,16 @@ namespace JT809.DotNetty.Core.Handlers /// public override void ChannelInactive(IChannelHandlerContext context) { + Policy.HandleResult(context.Channel.Open) + .WaitAndRetryForeverAsync(retryAttempt => + { + return retryAttempt > 3 ? TimeSpan.FromSeconds(Math.Pow(2, 50)) : TimeSpan.FromSeconds(Math.Pow(2, retryAttempt));//超过重试3次,之后重试都是接近12个小时重试一次 + }, + (exception, timespan, ctx) => + { + logger.LogError($"服务端断开{context.Channel.RemoteAddress},重试结果{exception.Result},重试次数{timespan},下次重试间隔(s){ctx.TotalSeconds}"); + }) + .ExecuteAsync(async () => await subordinateClient.ReConnectAsync(context.Channel.RemoteAddress)); string channelId = context.Channel.Id.AsShortText(); if (logger.IsEnabled(LogLevel.Debug)) logger.LogDebug($">>>{ channelId } The client disconnects from the server."); diff --git a/src/JT809.DotNetty.Core/Handlers/JT809SubordinateServerConnectionHandler.cs b/src/JT809.DotNetty.Core/Handlers/JT809SubordinateServerConnectionHandler.cs index 065d242..27a6237 100644 --- a/src/JT809.DotNetty.Core/Handlers/JT809SubordinateServerConnectionHandler.cs +++ b/src/JT809.DotNetty.Core/Handlers/JT809SubordinateServerConnectionHandler.cs @@ -1,5 +1,7 @@ using DotNetty.Handlers.Timeout; +using DotNetty.Transport.Bootstrapping; using DotNetty.Transport.Channels; +using JT809.DotNetty.Core.Clients; using Microsoft.Extensions.Logging; using System; using System.Threading.Tasks; @@ -11,10 +13,10 @@ namespace JT809.DotNetty.Core.Handlers /// internal class JT809SubordinateServerConnectionHandler: ChannelHandlerAdapter { - private readonly ILogger logger; public JT809SubordinateServerConnectionHandler( + JT809SubordinateClient subordinateClient, ILoggerFactory loggerFactory) { logger = loggerFactory.CreateLogger(); diff --git a/src/JT809.DotNetty.Core/Handlers/JT809SuperiorMsgIdReceiveHandlerBase.cs b/src/JT809.DotNetty.Core/Handlers/JT809SuperiorMsgIdReceiveHandlerBase.cs index b07c561..9cd2c4c 100644 --- a/src/JT809.DotNetty.Core/Handlers/JT809SuperiorMsgIdReceiveHandlerBase.cs +++ b/src/JT809.DotNetty.Core/Handlers/JT809SuperiorMsgIdReceiveHandlerBase.cs @@ -20,21 +20,20 @@ namespace JT809.DotNetty.Core.Handlers public abstract class JT809SuperiorMsgIdReceiveHandlerBase { protected IJT809VerifyCodeGenerator VerifyCodeGenerator { get; } - protected JT809SubordinateClient SubordinateLinkClient { get; } - protected JT809Configuration Configuration { get; } protected ILogger Logger { get; } + protected IJT809SubordinateLoginService SubordinateLoginService { get; } + /// /// 初始化消息处理业务 /// protected JT809SuperiorMsgIdReceiveHandlerBase( ILoggerFactory loggerFactory, - IOptions jT809ConfigurationAccessor, - IJT809VerifyCodeGenerator verifyCodeGenerator, - JT809SubordinateClient subordinateLinkClient) + IJT809SubordinateLoginService jT809SubordinateLoginService, + IJT809VerifyCodeGenerator verifyCodeGenerator) { this.Logger = loggerFactory.CreateLogger(); this.VerifyCodeGenerator = verifyCodeGenerator; - this.SubordinateLinkClient = subordinateLinkClient; + this.SubordinateLoginService = jT809SubordinateLoginService; HandlerDict = new Dictionary> { {JT809BusinessType.主链路登录请求消息, Msg0x1001}, @@ -74,11 +73,8 @@ namespace JT809.DotNetty.Core.Handlers Result = JT809_0x1002_Result.成功, VerifyCode = verifyCode }); - if (Configuration.SubordinateClientEnable) - { - var jT809_0x1001 = request.Package.Bodies as JT809_0x1001; - SubordinateLinkClient.ConnectAsync(jT809_0x1001.DownLinkIP, jT809_0x1001.DownLinkPort, verifyCode); - } + var jT809_0x1001 = request.Package.Bodies as JT809_0x1001; + SubordinateLoginService.Login(jT809_0x1001.DownLinkIP, jT809_0x1001.DownLinkPort); return new JT809Response(package, 100); } @@ -165,7 +161,6 @@ namespace JT809.DotNetty.Core.Handlers } - /// /// 主链路动态信息交换消息 /// diff --git a/src/JT809.DotNetty.Core/Interfaces/IJT809SubordinateLinkNotifyService.cs b/src/JT809.DotNetty.Core/Interfaces/IJT809SubordinateLinkNotifyService.cs new file mode 100644 index 0000000..09afe28 --- /dev/null +++ b/src/JT809.DotNetty.Core/Interfaces/IJT809SubordinateLinkNotifyService.cs @@ -0,0 +1,19 @@ +using JT809.Protocol.Enums; +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading.Tasks; + +namespace JT809.DotNetty.Core.Interfaces +{ + /// + /// 上级平台 + /// 从链路服务 + /// + public interface IJT809SubordinateLinkNotifyService + { + void Notify(JT809_0x9007_ReasonCode reasonCode); + + void Notify(JT809_0x9007_ReasonCode reasonCode,uint msgGNSSCENTERID); + } +} diff --git a/src/JT809.DotNetty.Core/Interfaces/IJT809SubordinateLoginService.cs b/src/JT809.DotNetty.Core/Interfaces/IJT809SubordinateLoginService.cs new file mode 100644 index 0000000..fee05a7 --- /dev/null +++ b/src/JT809.DotNetty.Core/Interfaces/IJT809SubordinateLoginService.cs @@ -0,0 +1,16 @@ +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading.Tasks; + +namespace JT809.DotNetty.Core.Interfaces +{ + /// + /// 上级平台 + /// 从链路登录服务 + /// + public interface IJT809SubordinateLoginService + { + void Login(string ip, int port); + } +} diff --git a/src/JT809.DotNetty.Core/Internal/JT809SubordinateLinkNotifyImplService.cs b/src/JT809.DotNetty.Core/Internal/JT809SubordinateLinkNotifyImplService.cs new file mode 100644 index 0000000..734d1b9 --- /dev/null +++ b/src/JT809.DotNetty.Core/Internal/JT809SubordinateLinkNotifyImplService.cs @@ -0,0 +1,58 @@ +using JT809.DotNetty.Core.Configurations; +using JT809.DotNetty.Core.Interfaces; +using JT809.DotNetty.Core.Metadata; +using JT809.DotNetty.Core.Session; +using JT809.Protocol; +using JT809.Protocol.Enums; +using JT809.Protocol.Extensions; +using JT809.Protocol.MessageBody; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using System; +using System.Collections.Generic; +using System.Text; + +namespace JT809.DotNetty.Core.Internal +{ + class JT809SubordinateLinkNotifyImplService: IJT809SubordinateLinkNotifyService + { + private readonly JT809Configuration configuration; + private readonly JT809SuperiorMainSessionManager jT809SuperiorMainSessionManager; + private readonly ILogger logger; + + public JT809SubordinateLinkNotifyImplService( + ILoggerFactory loggerFactory, + IOptions jT809ConfigurationAccessor, + JT809SuperiorMainSessionManager jT809SuperiorMainSessionManager + ) + { + this.logger = loggerFactory.CreateLogger(); + configuration = jT809ConfigurationAccessor.Value; + this.jT809SuperiorMainSessionManager = jT809SuperiorMainSessionManager; + } + + public void Notify(JT809_0x9007_ReasonCode reasonCode) + { + Notify(reasonCode, JT809GlobalConfig.Instance.HeaderOptions.MsgGNSSCENTERID); + } + + public void Notify(JT809_0x9007_ReasonCode reasonCode, uint msgGNSSCENTERID) + { + if (configuration.SubordinateClientEnable) + { + var session = jT809SuperiorMainSessionManager.GetSession(JT809GlobalConfig.Instance.HeaderOptions.MsgGNSSCENTERID); + if (session != null) + { + //发送从链路注销请求 + var package = JT809BusinessType.从链路断开通知消息.Create(new JT809_0x9007() + { + ReasonCode = reasonCode + }); + JT809Response jT809Response = new JT809Response(package, 100); + logger.LogInformation($"从链路断开通知消息>>>{JT809Serializer.Serialize(package, 100).ToHexString()}"); + session.Channel.WriteAndFlushAsync(jT809Response); + } + } + } + } +} diff --git a/src/JT809.DotNetty.Core/Internal/JT809SubordinateLoginImplService.cs b/src/JT809.DotNetty.Core/Internal/JT809SubordinateLoginImplService.cs new file mode 100644 index 0000000..1440786 --- /dev/null +++ b/src/JT809.DotNetty.Core/Internal/JT809SubordinateLoginImplService.cs @@ -0,0 +1,46 @@ +using JT809.DotNetty.Core.Clients; +using JT809.DotNetty.Core.Configurations; +using JT809.DotNetty.Core.Interfaces; +using JT809.DotNetty.Core.Metadata; +using JT809.DotNetty.Core.Session; +using JT809.Protocol; +using JT809.Protocol.Enums; +using JT809.Protocol.Extensions; +using JT809.Protocol.MessageBody; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading.Tasks; + +namespace JT809.DotNetty.Core.Internal +{ + /// + /// 上级平台 + /// 从链路登录服务实现 + /// + class JT809SubordinateLoginImplService : IJT809SubordinateLoginService + { + + private readonly JT809SubordinateClient subordinateLinkClient; + private readonly JT809Configuration configuration; + + public JT809SubordinateLoginImplService( + IOptions jT809ConfigurationAccessor, + JT809SubordinateClient subordinateLinkClient + ) + { + this.subordinateLinkClient = subordinateLinkClient; + configuration = jT809ConfigurationAccessor.Value; + } + + public void Login(string ip, int port) + { + if (configuration.SubordinateClientEnable) + { + subordinateLinkClient.ConnectAsync(ip, port); + } + } + } +} diff --git a/src/JT809.DotNetty.Core/Internal/JT809SuperiorMsgIdReceiveDefaultHandler.cs b/src/JT809.DotNetty.Core/Internal/JT809SuperiorMsgIdReceiveDefaultHandler.cs index 9c07eb0..f2a4c8b 100644 --- a/src/JT809.DotNetty.Core/Internal/JT809SuperiorMsgIdReceiveDefaultHandler.cs +++ b/src/JT809.DotNetty.Core/Internal/JT809SuperiorMsgIdReceiveDefaultHandler.cs @@ -16,7 +16,7 @@ namespace JT809.DotNetty.Core.Internal /// internal class JT809SuperiorMsgIdReceiveDefaultHandler : JT809SuperiorMsgIdReceiveHandlerBase { - public JT809SuperiorMsgIdReceiveDefaultHandler(ILoggerFactory loggerFactory, IOptions jT809ConfigurationAccessor, IJT809VerifyCodeGenerator verifyCodeGenerator, JT809SubordinateClient subordinateLinkClient) : base(loggerFactory, jT809ConfigurationAccessor, verifyCodeGenerator, subordinateLinkClient) + public JT809SuperiorMsgIdReceiveDefaultHandler(ILoggerFactory loggerFactory, IJT809SubordinateLoginService jT809SubordinateLoginService, IJT809VerifyCodeGenerator verifyCodeGenerator) : base(loggerFactory, jT809SubordinateLoginService, verifyCodeGenerator) { } } diff --git a/src/JT809.DotNetty.Core/JT809.DotNetty.Core.csproj b/src/JT809.DotNetty.Core/JT809.DotNetty.Core.csproj index 1b28832..126891e 100644 --- a/src/JT809.DotNetty.Core/JT809.DotNetty.Core.csproj +++ b/src/JT809.DotNetty.Core/JT809.DotNetty.Core.csproj @@ -18,6 +18,7 @@ + diff --git a/src/JT809.DotNetty.Core/JT809CoreDotnettyExtensions.cs b/src/JT809.DotNetty.Core/JT809CoreDotnettyExtensions.cs index b045fcb..3c4661a 100644 --- a/src/JT809.DotNetty.Core/JT809CoreDotnettyExtensions.cs +++ b/src/JT809.DotNetty.Core/JT809CoreDotnettyExtensions.cs @@ -72,9 +72,9 @@ namespace JT809.DotNetty.Core /// /// /// - public static IServiceCollection AddJT809InferiorPlatform(this IServiceCollection serviceDescriptors, Action> options) + public static IServiceCollection AddJT809InferiorPlatform(this IServiceCollection serviceDescriptors, Action options) { - serviceDescriptors.Configure(options); + serviceDescriptors.Configure(options); //主从链路客户端和服务端连接处理器 serviceDescriptors.TryAddScoped(); serviceDescriptors.TryAddScoped(); @@ -99,9 +99,9 @@ namespace JT809.DotNetty.Core /// /// /// - public static IServiceCollection AddJT809SuperiorPlatform(this IServiceCollection serviceDescriptors,Action> options) + public static IServiceCollection AddJT809SuperiorPlatform(this IServiceCollection serviceDescriptors,Action options) { - serviceDescriptors.Configure(options); + serviceDescriptors.Configure(options); serviceDescriptors.TryAddSingleton(); //主从链路客户端和服务端连接处理器 serviceDescriptors.TryAddScoped(); @@ -113,6 +113,7 @@ namespace JT809.DotNetty.Core //主从链路消息接收处理器 serviceDescriptors.TryAddScoped(); serviceDescriptors.TryAddScoped(); + serviceDescriptors.TryAddSingleton(); //从链路客户端 serviceDescriptors.TryAddSingleton(); //主链路服务端 diff --git a/src/JT809.DotNetty.Tests/JT809.DotNetty.Host.Test/JT809.DotNetty.Host.Test.csproj b/src/JT809.DotNetty.Tests/JT809.DotNetty.Host.Test/JT809.DotNetty.Host.Test.csproj index 0e4fc83..4ac60b5 100644 --- a/src/JT809.DotNetty.Tests/JT809.DotNetty.Host.Test/JT809.DotNetty.Host.Test.csproj +++ b/src/JT809.DotNetty.Tests/JT809.DotNetty.Host.Test/JT809.DotNetty.Host.Test.csproj @@ -12,7 +12,7 @@ - + diff --git a/src/JT809.DotNetty.Tests/JT809.DotNetty.Host.Test/Program.cs b/src/JT809.DotNetty.Tests/JT809.DotNetty.Host.Test/Program.cs index 5d9c798..9e52af0 100644 --- a/src/JT809.DotNetty.Tests/JT809.DotNetty.Host.Test/Program.cs +++ b/src/JT809.DotNetty.Tests/JT809.DotNetty.Host.Test/Program.cs @@ -1,5 +1,5 @@ using JT809.DotNetty.Core; -using JT809.DotNetty.Tcp; +using JT809.DotNetty.Core.Configurations; using JT809.Protocol.Configs; using Microsoft.Extensions.Configuration; using Microsoft.Extensions.DependencyInjection; @@ -45,7 +45,9 @@ namespace JT809.DotNetty.Host.Test services.AddSingleton(); services.AddSingleton(typeof(ILogger<>), typeof(Logger<>)); services.AddJT809Core(hostContext.Configuration) - .AddJT809TcpHost(); + .AddJT809SuperiorPlatform(options=> { + options.TcpPort = 839; + }); }); await serverHostBuilder.RunConsoleAsync();