Explorar el Código

增加上级平台从链路管理

tags/old
SmallChi hace 6 años
padre
commit
4d5bee0df3
Se han modificado 13 ficheros con 243 adiciones y 36 borrados
  1. +67
    -15
      src/JT809.DotNetty.Core/Clients/JT809SubordinateClient.cs
  2. +15
    -0
      src/JT809.DotNetty.Core/Handlers/JT809SubordinateClientConnectionHandler.cs
  3. +3
    -1
      src/JT809.DotNetty.Core/Handlers/JT809SubordinateServerConnectionHandler.cs
  4. +7
    -12
      src/JT809.DotNetty.Core/Handlers/JT809SuperiorMsgIdReceiveHandlerBase.cs
  5. +19
    -0
      src/JT809.DotNetty.Core/Interfaces/IJT809SubordinateLinkNotifyService.cs
  6. +16
    -0
      src/JT809.DotNetty.Core/Interfaces/IJT809SubordinateLoginService.cs
  7. +58
    -0
      src/JT809.DotNetty.Core/Internal/JT809SubordinateLinkNotifyImplService.cs
  8. +46
    -0
      src/JT809.DotNetty.Core/Internal/JT809SubordinateLoginImplService.cs
  9. +1
    -1
      src/JT809.DotNetty.Core/Internal/JT809SuperiorMsgIdReceiveDefaultHandler.cs
  10. +1
    -0
      src/JT809.DotNetty.Core/JT809.DotNetty.Core.csproj
  11. +5
    -4
      src/JT809.DotNetty.Core/JT809CoreDotnettyExtensions.cs
  12. +1
    -1
      src/JT809.DotNetty.Tests/JT809.DotNetty.Host.Test/JT809.DotNetty.Host.Test.csproj
  13. +4
    -2
      src/JT809.DotNetty.Tests/JT809.DotNetty.Host.Test/Program.cs

+ 67
- 15
src/JT809.DotNetty.Core/Clients/JT809SubordinateClient.cs Ver fichero

@@ -30,7 +30,7 @@ namespace JT809.DotNetty.Core.Clients

private MultithreadEventLoopGroup group;

public IChannel Channel { get; private set; }
private IChannel channel;

private readonly ILogger<JT809SubordinateClient> logger;

@@ -40,17 +40,21 @@ namespace JT809.DotNetty.Core.Clients

private readonly IJT809VerifyCodeGenerator verifyCodeGenerator;

private readonly IJT809SubordinateLinkNotifyService subordinateLinkNotifyService;

private bool disposed = false;

public JT809SubordinateClient(
IServiceProvider provider,
ILoggerFactory loggerFactory,
IJT809SubordinateLinkNotifyService subordinateLinkNotifyService,
IJT809VerifyCodeGenerator verifyCodeGenerator)
{
this.serviceProvider = provider;
this.loggerFactory = loggerFactory;
this.verifyCodeGenerator = verifyCodeGenerator;
this.logger = loggerFactory.CreateLogger<JT809SubordinateClient>();
this.subordinateLinkNotifyService = subordinateLinkNotifyService;
group = new MultithreadEventLoopGroup();
bootstrap = new Bootstrap();
bootstrap.Group(group)
@@ -75,39 +79,87 @@ namespace JT809.DotNetty.Core.Clients
}));
}

public async void ConnectAsync(string ip,int port,uint verifyCode,int delay=3000)
public async void ConnectAsync(string ip,int port,int delay=3000)
{
var verifyCode = verifyCodeGenerator.Get();
logger.LogInformation($"ip:{ip},port:{port},verifycode:{verifyCode}");
await Task.Delay(delay);
try
{
if (Channel == null)
{
if (channel == null)
{
Channel = await bootstrap.ConnectAsync(new IPEndPoint(IPAddress.Parse(ip), port));
channel = await bootstrap.ConnectAsync(new IPEndPoint(IPAddress.Parse(ip), port));
//从链路连接请求消息
var package = JT809BusinessType.从链路连接请求消息.Create(new JT809_0x9001()
{
VerifyCode = verifyCode
});
logger.LogInformation($"从链路连接请求消息>>>{JT809Serializer.Serialize(package, 100).ToHexString()}");
JT809Response jT809Response = new JT809Response(package, 100);
SendAsync(jT809Response);
}
else
{
await Channel.CloseAsync();
Channel = await bootstrap.ConnectAsync(new IPEndPoint(IPAddress.Parse(ip), port));
await channel.CloseAsync();
channel = await bootstrap.ConnectAsync(new IPEndPoint(IPAddress.Parse(ip), port));
//从链路连接请求消息
var package = JT809BusinessType.从链路连接请求消息.Create(new JT809_0x9001()
{
VerifyCode = verifyCode
});
logger.LogInformation($"从链路连接请求消息>>>{JT809Serializer.Serialize(package, 100).ToHexString()}");
JT809Response jT809Response = new JT809Response(package, 100);
SendAsync(jT809Response);
}
}
catch (AggregateException ex)
catch (ConnectException ex)
{
subordinateLinkNotifyService.Notify(JT809_0x9007_ReasonCode.无法连接下级平台指定的服务IP与端口);
logger.LogError(ex.InnerException, $"ip:{ip},port:{port},verifycode:{verifyCode}");
}
catch (Exception ex)
{
subordinateLinkNotifyService.Notify(JT809_0x9007_ReasonCode.其他原因);
logger.LogError(ex, $"ip:{ip},port:{port},verifycode:{verifyCode}");
}
}

public async Task<bool> ReConnectAsync(EndPoint ipEndPoint)
{
var verifyCode = verifyCodeGenerator.Get();
try
{
logger.LogInformation($"IPAddress:{ipEndPoint.ToString()},verifycode:{verifyCode}");
channel = await bootstrap.ConnectAsync(ipEndPoint);
//从链路连接请求消息
var package = JT809BusinessType.从链路连接请求消息.Create(new JT809_0x9001()
{
VerifyCode = verifyCode
});
logger.LogInformation($"从链路连接请求消息>>>{JT809Serializer.Serialize(package, 100).ToHexString()}");
JT809Response jT809Response = new JT809Response(package, 100);
SendAsync(jT809Response);
return channel.Open;
}
catch (ConnectException ex)
{
logger.LogError(ex.InnerException, $"IPAddress:{ipEndPoint.ToString()},verifycode:{verifyCode}");
return false;
}
catch (Exception ex)
{
logger.LogError(ex, $"IPAddress:{ipEndPoint.ToString()},verifycode:{verifyCode}");
return false;
}
}

public async void SendAsync(JT809Response jT809Response)
{
if (Channel == null) throw new NullReferenceException("Channel Not Open");
if (channel == null) throw new NullReferenceException("Channel Not Open");
if (jT809Response == null) throw new ArgumentNullException("Data is null");
if (Channel.Open && Channel.Active)
if (channel.Open && channel.Active)
{
await Channel.WriteAndFlushAsync(jT809Response);
await channel.WriteAndFlushAsync(jT809Response);
}
}

@@ -115,8 +167,8 @@ namespace JT809.DotNetty.Core.Clients
{
get
{
if (Channel == null) return false;
return Channel.Open && Channel.Active;
if (channel == null) return false;
return channel.Open && channel.Active;
}
}

@@ -136,7 +188,7 @@ namespace JT809.DotNetty.Core.Clients
VerifyCode = verifyCodeGenerator.Get()
});
JT809Response jT809Response = new JT809Response(package, 100);
Channel.WriteAndFlushAsync(jT809Response);
SendAsync(jT809Response);
logger.LogInformation($"发送从链路注销请求>>>{JT809Serializer.Serialize(package, 100).ToHexString()}");
}
catch (Exception ex)
@@ -146,7 +198,7 @@ namespace JT809.DotNetty.Core.Clients
finally
{
//清理托管资源
Channel.CloseAsync();
channel.CloseAsync();
group.ShutdownGracefullyAsync(TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(3));
}
}


+ 15
- 0
src/JT809.DotNetty.Core/Handlers/JT809SubordinateClientConnectionHandler.cs Ver fichero

@@ -1,9 +1,11 @@
using DotNetty.Handlers.Timeout;
using DotNetty.Transport.Channels;
using JT809.DotNetty.Core.Clients;
using JT809.DotNetty.Core.Metadata;
using JT809.Protocol.Enums;
using JT809.Protocol.Extensions;
using Microsoft.Extensions.Logging;
using Polly;
using System;
using System.Threading.Tasks;

@@ -16,11 +18,14 @@ namespace JT809.DotNetty.Core.Handlers
{

private readonly ILogger<JT809SubordinateClientConnectionHandler> logger;
private readonly JT809SubordinateClient subordinateClient;

public JT809SubordinateClientConnectionHandler(
JT809SubordinateClient jT809SubordinateClient,
ILoggerFactory loggerFactory)
{
logger = loggerFactory.CreateLogger<JT809SubordinateClientConnectionHandler>();
subordinateClient = jT809SubordinateClient;
}

/// <summary>
@@ -41,6 +46,16 @@ namespace JT809.DotNetty.Core.Handlers
/// <param name="context"></param>
public override void ChannelInactive(IChannelHandlerContext context)
{
Policy.HandleResult(context.Channel.Open)
.WaitAndRetryForeverAsync(retryAttempt =>
{
return retryAttempt > 3 ? TimeSpan.FromSeconds(Math.Pow(2, 50)) : TimeSpan.FromSeconds(Math.Pow(2, retryAttempt));//超过重试3次,之后重试都是接近12个小时重试一次
},
(exception, timespan, ctx) =>
{
logger.LogError($"服务端断开{context.Channel.RemoteAddress},重试结果{exception.Result},重试次数{timespan},下次重试间隔(s){ctx.TotalSeconds}");
})
.ExecuteAsync(async () => await subordinateClient.ReConnectAsync(context.Channel.RemoteAddress));
string channelId = context.Channel.Id.AsShortText();
if (logger.IsEnabled(LogLevel.Debug))
logger.LogDebug($">>>{ channelId } The client disconnects from the server.");


+ 3
- 1
src/JT809.DotNetty.Core/Handlers/JT809SubordinateServerConnectionHandler.cs Ver fichero

@@ -1,5 +1,7 @@
using DotNetty.Handlers.Timeout;
using DotNetty.Transport.Bootstrapping;
using DotNetty.Transport.Channels;
using JT809.DotNetty.Core.Clients;
using Microsoft.Extensions.Logging;
using System;
using System.Threading.Tasks;
@@ -11,10 +13,10 @@ namespace JT809.DotNetty.Core.Handlers
/// </summary>
internal class JT809SubordinateServerConnectionHandler: ChannelHandlerAdapter
{

private readonly ILogger<JT809SubordinateServerConnectionHandler> logger;

public JT809SubordinateServerConnectionHandler(
JT809SubordinateClient subordinateClient,
ILoggerFactory loggerFactory)
{
logger = loggerFactory.CreateLogger<JT809SubordinateServerConnectionHandler>();


+ 7
- 12
src/JT809.DotNetty.Core/Handlers/JT809SuperiorMsgIdReceiveHandlerBase.cs Ver fichero

@@ -20,21 +20,20 @@ namespace JT809.DotNetty.Core.Handlers
public abstract class JT809SuperiorMsgIdReceiveHandlerBase
{
protected IJT809VerifyCodeGenerator VerifyCodeGenerator { get; }
protected JT809SubordinateClient SubordinateLinkClient { get; }
protected JT809Configuration Configuration { get; }
protected ILogger Logger { get; }
protected IJT809SubordinateLoginService SubordinateLoginService { get; }

/// <summary>
/// 初始化消息处理业务
/// </summary>
protected JT809SuperiorMsgIdReceiveHandlerBase(
ILoggerFactory loggerFactory,
IOptions<JT809Configuration> jT809ConfigurationAccessor,
IJT809VerifyCodeGenerator verifyCodeGenerator,
JT809SubordinateClient subordinateLinkClient)
IJT809SubordinateLoginService jT809SubordinateLoginService,
IJT809VerifyCodeGenerator verifyCodeGenerator)
{
this.Logger = loggerFactory.CreateLogger<JT809SuperiorMsgIdReceiveHandlerBase>();
this.VerifyCodeGenerator = verifyCodeGenerator;
this.SubordinateLinkClient = subordinateLinkClient;
this.SubordinateLoginService = jT809SubordinateLoginService;
HandlerDict = new Dictionary<JT809BusinessType, Func<JT809Request, JT809Response>>
{
{JT809BusinessType.主链路登录请求消息, Msg0x1001},
@@ -74,11 +73,8 @@ namespace JT809.DotNetty.Core.Handlers
Result = JT809_0x1002_Result.成功,
VerifyCode = verifyCode
});
if (Configuration.SubordinateClientEnable)
{
var jT809_0x1001 = request.Package.Bodies as JT809_0x1001;
SubordinateLinkClient.ConnectAsync(jT809_0x1001.DownLinkIP, jT809_0x1001.DownLinkPort, verifyCode);
}
var jT809_0x1001 = request.Package.Bodies as JT809_0x1001;
SubordinateLoginService.Login(jT809_0x1001.DownLinkIP, jT809_0x1001.DownLinkPort);
return new JT809Response(package, 100);
}

@@ -165,7 +161,6 @@ namespace JT809.DotNetty.Core.Handlers
}



/// <summary>
/// 主链路动态信息交换消息
/// </summary>


+ 19
- 0
src/JT809.DotNetty.Core/Interfaces/IJT809SubordinateLinkNotifyService.cs Ver fichero

@@ -0,0 +1,19 @@
using JT809.Protocol.Enums;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;

namespace JT809.DotNetty.Core.Interfaces
{
/// <summary>
/// 上级平台
/// 从链路服务
/// </summary>
public interface IJT809SubordinateLinkNotifyService
{
void Notify(JT809_0x9007_ReasonCode reasonCode);

void Notify(JT809_0x9007_ReasonCode reasonCode,uint msgGNSSCENTERID);
}
}

+ 16
- 0
src/JT809.DotNetty.Core/Interfaces/IJT809SubordinateLoginService.cs Ver fichero

@@ -0,0 +1,16 @@
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;

namespace JT809.DotNetty.Core.Interfaces
{
/// <summary>
/// 上级平台
/// 从链路登录服务
/// </summary>
public interface IJT809SubordinateLoginService
{
void Login(string ip, int port);
}
}

+ 58
- 0
src/JT809.DotNetty.Core/Internal/JT809SubordinateLinkNotifyImplService.cs Ver fichero

@@ -0,0 +1,58 @@
using JT809.DotNetty.Core.Configurations;
using JT809.DotNetty.Core.Interfaces;
using JT809.DotNetty.Core.Metadata;
using JT809.DotNetty.Core.Session;
using JT809.Protocol;
using JT809.Protocol.Enums;
using JT809.Protocol.Extensions;
using JT809.Protocol.MessageBody;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using System;
using System.Collections.Generic;
using System.Text;

namespace JT809.DotNetty.Core.Internal
{
class JT809SubordinateLinkNotifyImplService: IJT809SubordinateLinkNotifyService
{
private readonly JT809Configuration configuration;
private readonly JT809SuperiorMainSessionManager jT809SuperiorMainSessionManager;
private readonly ILogger logger;

public JT809SubordinateLinkNotifyImplService(
ILoggerFactory loggerFactory,
IOptions<JT809Configuration> jT809ConfigurationAccessor,
JT809SuperiorMainSessionManager jT809SuperiorMainSessionManager
)
{
this.logger = loggerFactory.CreateLogger<JT809SubordinateLinkNotifyImplService>();
configuration = jT809ConfigurationAccessor.Value;
this.jT809SuperiorMainSessionManager = jT809SuperiorMainSessionManager;
}

public void Notify(JT809_0x9007_ReasonCode reasonCode)
{
Notify(reasonCode, JT809GlobalConfig.Instance.HeaderOptions.MsgGNSSCENTERID);
}

public void Notify(JT809_0x9007_ReasonCode reasonCode, uint msgGNSSCENTERID)
{
if (configuration.SubordinateClientEnable)
{
var session = jT809SuperiorMainSessionManager.GetSession(JT809GlobalConfig.Instance.HeaderOptions.MsgGNSSCENTERID);
if (session != null)
{
//发送从链路注销请求
var package = JT809BusinessType.从链路断开通知消息.Create(new JT809_0x9007()
{
ReasonCode = reasonCode
});
JT809Response jT809Response = new JT809Response(package, 100);
logger.LogInformation($"从链路断开通知消息>>>{JT809Serializer.Serialize(package, 100).ToHexString()}");
session.Channel.WriteAndFlushAsync(jT809Response);
}
}
}
}
}

+ 46
- 0
src/JT809.DotNetty.Core/Internal/JT809SubordinateLoginImplService.cs Ver fichero

@@ -0,0 +1,46 @@
using JT809.DotNetty.Core.Clients;
using JT809.DotNetty.Core.Configurations;
using JT809.DotNetty.Core.Interfaces;
using JT809.DotNetty.Core.Metadata;
using JT809.DotNetty.Core.Session;
using JT809.Protocol;
using JT809.Protocol.Enums;
using JT809.Protocol.Extensions;
using JT809.Protocol.MessageBody;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;

namespace JT809.DotNetty.Core.Internal
{
/// <summary>
/// 上级平台
/// 从链路登录服务实现
/// </summary>
class JT809SubordinateLoginImplService : IJT809SubordinateLoginService
{

private readonly JT809SubordinateClient subordinateLinkClient;
private readonly JT809Configuration configuration;

public JT809SubordinateLoginImplService(
IOptions<JT809Configuration> jT809ConfigurationAccessor,
JT809SubordinateClient subordinateLinkClient
)
{
this.subordinateLinkClient = subordinateLinkClient;
configuration = jT809ConfigurationAccessor.Value;
}

public void Login(string ip, int port)
{
if (configuration.SubordinateClientEnable)
{
subordinateLinkClient.ConnectAsync(ip, port);
}
}
}
}

+ 1
- 1
src/JT809.DotNetty.Core/Internal/JT809SuperiorMsgIdReceiveDefaultHandler.cs Ver fichero

@@ -16,7 +16,7 @@ namespace JT809.DotNetty.Core.Internal
/// </summary>
internal class JT809SuperiorMsgIdReceiveDefaultHandler : JT809SuperiorMsgIdReceiveHandlerBase
{
public JT809SuperiorMsgIdReceiveDefaultHandler(ILoggerFactory loggerFactory, IOptions<JT809Configuration> jT809ConfigurationAccessor, IJT809VerifyCodeGenerator verifyCodeGenerator, JT809SubordinateClient subordinateLinkClient) : base(loggerFactory, jT809ConfigurationAccessor, verifyCodeGenerator, subordinateLinkClient)
public JT809SuperiorMsgIdReceiveDefaultHandler(ILoggerFactory loggerFactory, IJT809SubordinateLoginService jT809SubordinateLoginService, IJT809VerifyCodeGenerator verifyCodeGenerator) : base(loggerFactory, jT809SubordinateLoginService, verifyCodeGenerator)
{
}
}


+ 1
- 0
src/JT809.DotNetty.Core/JT809.DotNetty.Core.csproj Ver fichero

@@ -18,6 +18,7 @@
<PackageReference Include="Microsoft.Extensions.Options.ConfigurationExtensions" Version="2.2.0" />
<PackageReference Include="Newtonsoft.Json" Version="12.0.1" />
<PackageReference Include="Microsoft.Extensions.Options" Version="2.2.0" />
<PackageReference Include="Polly" Version="7.0.3" />
</ItemGroup>
<ItemGroup>


+ 5
- 4
src/JT809.DotNetty.Core/JT809CoreDotnettyExtensions.cs Ver fichero

@@ -72,9 +72,9 @@ namespace JT809.DotNetty.Core
/// </summary>
/// <param name="serviceDescriptors"></param>
/// <returns></returns>
public static IServiceCollection AddJT809InferiorPlatform(this IServiceCollection serviceDescriptors, Action<IOptions<JT809InferiorPlatformOptions>> options)
public static IServiceCollection AddJT809InferiorPlatform(this IServiceCollection serviceDescriptors, Action<JT809InferiorPlatformOptions> options)
{
serviceDescriptors.Configure<JT809InferiorPlatformOptions>(options);
serviceDescriptors.Configure(options);
//主从链路客户端和服务端连接处理器
serviceDescriptors.TryAddScoped<JT809MainClientConnectionHandler>();
serviceDescriptors.TryAddScoped<JT809SubordinateServerConnectionHandler>();
@@ -99,9 +99,9 @@ namespace JT809.DotNetty.Core
/// </summary>
/// <param name="serviceDescriptors"></param>
/// <returns></returns>
public static IServiceCollection AddJT809SuperiorPlatform(this IServiceCollection serviceDescriptors,Action<IOptions<JT809SuperiorPlatformOptions>> options)
public static IServiceCollection AddJT809SuperiorPlatform(this IServiceCollection serviceDescriptors,Action<JT809SuperiorPlatformOptions> options)
{
serviceDescriptors.Configure<JT809SuperiorPlatformOptions>(options);
serviceDescriptors.Configure(options);
serviceDescriptors.TryAddSingleton<IJT809VerifyCodeGenerator, JT809VerifyCodeGeneratorDefaultImpl>();
//主从链路客户端和服务端连接处理器
serviceDescriptors.TryAddScoped<JT809MainServerConnectionHandler>();
@@ -113,6 +113,7 @@ namespace JT809.DotNetty.Core
//主从链路消息接收处理器
serviceDescriptors.TryAddScoped<JT809MainServerHandler>();
serviceDescriptors.TryAddScoped<JT809SubordinateClientHandler>();
serviceDescriptors.TryAddSingleton<IJT809SubordinateLoginService, JT809SubordinateLoginImplService>();
//从链路客户端
serviceDescriptors.TryAddSingleton<JT809SubordinateClient>();
//主链路服务端


+ 1
- 1
src/JT809.DotNetty.Tests/JT809.DotNetty.Host.Test/JT809.DotNetty.Host.Test.csproj Ver fichero

@@ -12,7 +12,7 @@
<PackageReference Include="Microsoft.Extensions.Logging.Console" Version="2.2.0" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\..\JT809.DotNetty.Tcp\JT809.DotNetty.Tcp.csproj" />
<ProjectReference Include="..\..\JT809.DotNetty.Core\JT809.DotNetty.Core.csproj" />
</ItemGroup>
<ItemGroup>
<None Update="appsettings.json">


+ 4
- 2
src/JT809.DotNetty.Tests/JT809.DotNetty.Host.Test/Program.cs Ver fichero

@@ -1,5 +1,5 @@
using JT809.DotNetty.Core;
using JT809.DotNetty.Tcp;
using JT809.DotNetty.Core.Configurations;
using JT809.Protocol.Configs;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
@@ -45,7 +45,9 @@ namespace JT809.DotNetty.Host.Test
services.AddSingleton<ILoggerFactory, LoggerFactory>();
services.AddSingleton(typeof(ILogger<>), typeof(Logger<>));
services.AddJT809Core(hostContext.Configuration)
.AddJT809TcpHost();
.AddJT809SuperiorPlatform(options=> {
options.TcpPort = 839;
});
});

await serverHostBuilder.RunConsoleAsync();


Cargando…
Cancelar
Guardar