diff --git a/src/JT808.DotNetty/Codecs/JT808Decoder.cs b/src/JT808.DotNetty/Codecs/JT808Decoder.cs index 782e3b6..6ce54b5 100644 --- a/src/JT808.DotNetty/Codecs/JT808Decoder.cs +++ b/src/JT808.DotNetty/Codecs/JT808Decoder.cs @@ -7,6 +7,7 @@ using System.Collections.Generic; using System.Text; using JT808.Protocol; using JT808.DotNetty.Internal; +using JT808.DotNetty.Interfaces; namespace JT808.DotNetty.Codecs { @@ -17,9 +18,14 @@ namespace JT808.DotNetty.Codecs { private readonly ILogger logger; - public JT808Decoder(ILoggerFactory loggerFactory) + private readonly IJT808SourcePackageDispatcher jT808SourcePackageDispatcher; + + public JT808Decoder( + IJT808SourcePackageDispatcher jT808SourcePackageDispatcher, + ILoggerFactory loggerFactory) { this.logger = loggerFactory.CreateLogger(); + this.jT808SourcePackageDispatcher = jT808SourcePackageDispatcher; } private static readonly AtomicCounter MsgSuccessCounter = new AtomicCounter(); @@ -34,6 +40,7 @@ namespace JT808.DotNetty.Codecs input.ReadBytes(buffer, 1, input.Capacity); buffer[0] = JT808Package.BeginFlag; buffer[input.Capacity + 1] = JT808Package.EndFlag; + jT808SourcePackageDispatcher?.SendAsync(buffer); JT808Package jT808Package = JT808Serializer.Deserialize(buffer); output.Add(jT808Package); MsgSuccessCounter.Increment(); diff --git a/src/JT808.DotNetty/Configurations/JT808SourcePackageDispatcherInfo.cs b/src/JT808.DotNetty/Configurations/JT808ClientConfiguration.cs similarity index 93% rename from src/JT808.DotNetty/Configurations/JT808SourcePackageDispatcherInfo.cs rename to src/JT808.DotNetty/Configurations/JT808ClientConfiguration.cs index 52708be..3d8385d 100644 --- a/src/JT808.DotNetty/Configurations/JT808SourcePackageDispatcherInfo.cs +++ b/src/JT808.DotNetty/Configurations/JT808ClientConfiguration.cs @@ -5,7 +5,7 @@ using System.Text; namespace JT808.DotNetty.Configurations { - public class JT808SourcePackageDispatcherInfo + public class JT808ClientConfiguration { public string Host { get; set; } diff --git a/src/JT808.DotNetty/Configurations/JT808Configuration.cs b/src/JT808.DotNetty/Configurations/JT808Configuration.cs index d0ffbe9..adbb104 100644 --- a/src/JT808.DotNetty/Configurations/JT808Configuration.cs +++ b/src/JT808.DotNetty/Configurations/JT808Configuration.cs @@ -30,5 +30,9 @@ namespace JT808.DotNetty.Configurations /// 默认5分钟 /// public int SessionReportTime { get; set; } = 30000; + /// + /// 源包分发器配置 + /// + public List SourcePackageDispatcherClientConfigurations { get; set; } } } diff --git a/src/JT808.DotNetty/Configurations/JT808SourcePackageDispatcherConfiguration.cs b/src/JT808.DotNetty/Configurations/JT808SourcePackageDispatcherConfiguration.cs deleted file mode 100644 index 94ae975..0000000 --- a/src/JT808.DotNetty/Configurations/JT808SourcePackageDispatcherConfiguration.cs +++ /dev/null @@ -1,11 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Text; - -namespace JT808.DotNetty.Configurations -{ - public class JT808SourcePackageDispatcherConfiguration - { - public IList SourcePackageDispatchers { get; set; } - } -} diff --git a/src/JT808.DotNetty/Handlers/JT808SourcePackageDispatcherHandler.cs b/src/JT808.DotNetty/Handlers/JT808SourcePackageDispatcherHandler.cs new file mode 100644 index 0000000..4fd2824 --- /dev/null +++ b/src/JT808.DotNetty/Handlers/JT808SourcePackageDispatcherHandler.cs @@ -0,0 +1,71 @@ +using DotNetty.Transport.Channels; +using JT808.DotNetty.Internal; +using Microsoft.Extensions.Logging; +using Polly; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Net; +using System.Text; + +namespace JT808.DotNetty.Handlers +{ + internal class JT808SourcePackageDispatcherHandler: ChannelHandlerAdapter + { + private readonly ILogger logger; + + private readonly JT808SourcePackageDispatcherDefaultImpl jT808SourcePackageDispatcherDefaultImpl; + + public JT808SourcePackageDispatcherHandler(JT808SourcePackageDispatcherDefaultImpl jT808SourcePackageDispatcherDefaultImpl) + { + logger=jT808SourcePackageDispatcherDefaultImpl.loggerFactory.CreateLogger(); + this.jT808SourcePackageDispatcherDefaultImpl = jT808SourcePackageDispatcherDefaultImpl; + } + + public override void ChannelInactive(IChannelHandlerContext context) + { + Policy.HandleResult(context.Channel.Open) + .WaitAndRetryForeverAsync(retryAttempt => + { + return retryAttempt > 20 ? TimeSpan.FromSeconds(Math.Pow(2, 50)) : TimeSpan.FromSeconds(Math.Pow(2, retryAttempt));//超过重试20次,接近12个小时链接一次 + },(exception, timespan, ctx) => + { + logger.LogError($"服务端断开{context.Channel.RemoteAddress.ToString()},重试结果{exception.Result},重试次数{timespan},下次重试间隔(s){ctx.TotalSeconds}"); + }) + .ExecuteAsync(async () => + { + try + { + var newChannel = jT808SourcePackageDispatcherDefaultImpl.channels.FirstOrDefault(m => m.Value == context.Channel); + if (default(KeyValuePair).Equals(newChannel)) + { + if(logger.IsEnabled(LogLevel.Debug)) + logger.LogDebug($"服务器已经删除了{context.Channel.RemoteAddress.ToString()}远程服务器配置"); + return true; + } + var channel = await jT808SourcePackageDispatcherDefaultImpl.bootstrap.ConnectAsync(context.Channel.RemoteAddress); + jT808SourcePackageDispatcherDefaultImpl.channels.AddOrUpdate(newChannel.Key, channel, (x, y) => channel); + return channel.Open; + } + catch (Exception ex) + { + logger.LogError($"服务端断开后{context.Channel.RemoteAddress.ToString()},重连异常:{ex}"); + return false; + } + }); + } + + public override void ChannelRead(IChannelHandlerContext context, object message) + { + if(logger.IsEnabled(LogLevel.Debug)) + logger.LogError($"服务端返回消息{message.ToString()}"); + throw new Exception("test"); + } + + public override void ExceptionCaught(IChannelHandlerContext context, Exception exception) + { + logger.LogError(exception, context.Channel.RemoteAddress.ToString()); + context.CloseAsync(); + } + } +} diff --git a/src/JT808.DotNetty/Interfaces/IJT808SourcePackageDispatcher.cs b/src/JT808.DotNetty/Interfaces/IJT808SourcePackageDispatcher.cs index 7acf7ab..c183f2d 100644 --- a/src/JT808.DotNetty/Interfaces/IJT808SourcePackageDispatcher.cs +++ b/src/JT808.DotNetty/Interfaces/IJT808SourcePackageDispatcher.cs @@ -7,6 +7,9 @@ namespace JT808.DotNetty.Interfaces { /// /// 源包分发器 + /// 自定义源包分发器业务 + /// ConfigureServices: + /// services.Replace(new ServiceDescriptor(typeof(IJT808SourcePackageDispatcher),typeof(JT808SourcePackageDispatcherDefaultImpl),ServiceLifetime.Singleton)); /// public interface IJT808SourcePackageDispatcher { diff --git a/src/JT808.DotNetty/Internal/JT808SourcePackageDispatcherDefaultImpl.cs b/src/JT808.DotNetty/Internal/JT808SourcePackageDispatcherDefaultImpl.cs new file mode 100644 index 0000000..bd141ee --- /dev/null +++ b/src/JT808.DotNetty/Internal/JT808SourcePackageDispatcherDefaultImpl.cs @@ -0,0 +1,173 @@ +using DotNetty.Buffers; +using DotNetty.Transport.Bootstrapping; +using DotNetty.Transport.Channels; +using DotNetty.Transport.Channels.Sockets; +using JT808.DotNetty.Configurations; +using JT808.DotNetty.Handlers; +using JT808.DotNetty.Interfaces; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Linq; +using System.Net; +using System.Text; +using System.Threading; +using System.Threading.Tasks; + +namespace JT808.DotNetty.Internal +{ + /// + /// 源包分发器默认实现 + /// + internal class JT808SourcePackageDispatcherDefaultImpl : IJT808SourcePackageDispatcher, IDisposable + { + private readonly MultithreadEventLoopGroup group = new MultithreadEventLoopGroup(); + internal readonly Bootstrap bootstrap = new Bootstrap(); + internal readonly ConcurrentDictionary channels = new ConcurrentDictionary(); + private readonly ILogger logger; + IOptionsMonitor jT808ConfigurationOptionsMonitor; + internal readonly ILoggerFactory loggerFactory; + + public JT808SourcePackageDispatcherDefaultImpl(ILoggerFactory loggerFactory, + IOptionsMonitor jT808ConfigurationOptionsMonitor) + { + this.loggerFactory = loggerFactory; + this.logger = loggerFactory.CreateLogger(); + this.jT808ConfigurationOptionsMonitor = jT808ConfigurationOptionsMonitor; + StartAsync(); + } + + public async Task SendAsync(byte[] data) + { + foreach (var item in channels) + { + try + { + if (item.Value.Open && item.Value.Active) + { + await item.Value.WriteAndFlushAsync(Unpooled.WrappedBuffer(data)); + } + else + { + logger.LogError($"{item}链接已关闭"); + } + } + catch (Exception ex) + { + logger.LogError($"{item}发送数据出现异常:{ex}"); + } + } + await Task.CompletedTask; + } + + public void StartAsync() + { + bootstrap + .Group(group) + .Channel() + .Option(ChannelOption.TcpNodelay, true) + .Handler(new ActionChannelInitializer(channel => + { + channel.Pipeline.AddLast(new JT808SourcePackageDispatcherHandler(this)); + })); + jT808ConfigurationOptionsMonitor.OnChange(options => + { + List chgRemoteServers = new List(); + if (jT808ConfigurationOptionsMonitor.CurrentValue.SourcePackageDispatcherClientConfigurations != null && jT808ConfigurationOptionsMonitor.CurrentValue.SourcePackageDispatcherClientConfigurations.Count > 0) + { + chgRemoteServers = options.SourcePackageDispatcherClientConfigurations; + } + DelRemoteServsers(chgRemoteServers); + AddRemoteServsers(chgRemoteServers); + }); + if (jT808ConfigurationOptionsMonitor.CurrentValue.SourcePackageDispatcherClientConfigurations != null && jT808ConfigurationOptionsMonitor.CurrentValue.SourcePackageDispatcherClientConfigurations.Count > 0) + { + foreach (var item in jT808ConfigurationOptionsMonitor.CurrentValue.SourcePackageDispatcherClientConfigurations) + { + try + { + Task.Run(async () => + { + IChannel clientChannel = await bootstrap.ConnectAsync(item.EndPoint); + channels.TryAdd(item.EndPoint, clientChannel); + logger.LogInformation($"初始化链接远程服务端{item.EndPoint.ToString()}"); + }); + } + catch (Exception ex) + { + logger.LogError($"初始化链接远程服务端{item},链接异常:{ex}"); + } + } + } + } + + public Task StopAsync() + { + foreach (var channel in channels) + { + try + { + channel.Value.CloseAsync(); + } + catch + { + + } + } + group.ShutdownGracefullyAsync(jT808ConfigurationOptionsMonitor.CurrentValue.QuietPeriodTimeSpan, jT808ConfigurationOptionsMonitor.CurrentValue.ShutdownTimeoutTimeSpan); + return Task.CompletedTask; + } + + /// + /// 动态删除远程服务器 + /// + /// + private void DelRemoteServsers(List chgRemoteServers) + { + var delChannels = channels.Keys.Except(chgRemoteServers.Select(s=>s.EndPoint)).ToList(); + foreach (var item in delChannels) + { + try + { + channels.TryRemove(item, out var channel); + channel.CloseAsync(); + } + catch + { + + } + } + } + + /// + /// 动态添加远程服务器 + /// + /// + /// + private async void AddRemoteServsers(List chgRemoteServers) + { + var addChannels = chgRemoteServers.Select(s=>s.EndPoint).Except(channels.Keys).ToList(); + foreach (var item in addChannels) + { + try + { + IChannel clientChannel =await bootstrap.ConnectAsync(item); + channels.TryAdd(item, clientChannel); + logger.LogInformation($"变更后链接远程服务端{item}"); + } + catch (Exception ex) + { + logger.LogError($"变更后链接远程服务端{item},重连异常:{ex}"); + } + } + } + + public void Dispose() + { + StopAsync(); + } + } +} diff --git a/src/JT808.DotNetty/JT808.DotNetty.csproj b/src/JT808.DotNetty/JT808.DotNetty.csproj index 07f0ec5..bb3e910 100644 --- a/src/JT808.DotNetty/JT808.DotNetty.csproj +++ b/src/JT808.DotNetty/JT808.DotNetty.csproj @@ -14,6 +14,7 @@ + diff --git a/src/JT808.DotNetty/JT808DotnettyExtensions.cs b/src/JT808.DotNetty/JT808DotnettyExtensions.cs index de9b161..4b37b26 100644 --- a/src/JT808.DotNetty/JT808DotnettyExtensions.cs +++ b/src/JT808.DotNetty/JT808DotnettyExtensions.cs @@ -1,6 +1,7 @@ using JT808.DotNetty.Codecs; using JT808.DotNetty.Configurations; using JT808.DotNetty.Handlers; +using JT808.DotNetty.Interfaces; using JT808.DotNetty.Internal; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection.Extensions; @@ -19,6 +20,7 @@ namespace JT808.DotNetty services.Configure(hostContext.Configuration.GetSection("JT808Configuration")); services.TryAddSingleton(); services.TryAddSingleton(); + services.TryAddSingleton(); services.TryAddScoped(); services.TryAddScoped(); services.TryAddScoped(); diff --git a/src/JT808.DotNetty/JT808MsgIdHandlerBase.cs b/src/JT808.DotNetty/JT808MsgIdHandlerBase.cs index 2dc1502..fbd62f6 100644 --- a/src/JT808.DotNetty/JT808MsgIdHandlerBase.cs +++ b/src/JT808.DotNetty/JT808MsgIdHandlerBase.cs @@ -11,6 +11,9 @@ namespace JT808.DotNetty { /// /// 抽象消息处理业务 + /// 自定义消息处理业务 + /// ConfigureServices: + /// services.Replace(new ServiceDescriptor(typeof(JT808MsgIdHandlerBase),typeof(JT808MsgIdCustomHandlerImpl),ServiceLifetime.Singleton)); /// public abstract class JT808MsgIdHandlerBase {