diff --git a/src/JT808.DotNetty.Abstractions/IJT808ClientBuilder.cs b/src/JT808.DotNetty.Abstractions/IJT808ClientBuilder.cs new file mode 100644 index 0000000..124791f --- /dev/null +++ b/src/JT808.DotNetty.Abstractions/IJT808ClientBuilder.cs @@ -0,0 +1,15 @@ +using JT808.DotNetty.Abstractions; +using JT808.Protocol; +using Microsoft.Extensions.DependencyInjection; +using System; +using System.Collections.Generic; +using System.Text; + +namespace JT808.DotNetty.Abstractions +{ + public interface IJT808ClientBuilder + { + IJT808Builder JT808Builder { get; } + IJT808Builder Builder(); + } +} diff --git a/src/JT808.DotNetty.Core/Impls/JT808NettyBuilderDefault.cs b/src/JT808.DotNetty.Core/Impls/JT808NettyBuilderDefault.cs index 378b3a0..b268ef2 100644 --- a/src/JT808.DotNetty.Core/Impls/JT808NettyBuilderDefault.cs +++ b/src/JT808.DotNetty.Core/Impls/JT808NettyBuilderDefault.cs @@ -22,11 +22,5 @@ namespace JT808.DotNetty.Core.Impls { return JT808Builder; } - - //public IJT808NettyBuilder ReplaceSessionPublishing() where T : IJT808SessionPublishing - //{ - // JT808Builder.Services.Replace(new ServiceDescriptor(typeof(IJT808SessionPublishing), typeof(T), ServiceLifetime.Singleton)); - // return this; - //} } } \ No newline at end of file diff --git a/src/JT808.DotNetty.Services/JT808.DotNetty.Transmit/Configs/DataTransferOptions.cs b/src/JT808.DotNetty.Services/JT808.DotNetty.Transmit/Configs/DataTransferOptions.cs new file mode 100644 index 0000000..4aff0a4 --- /dev/null +++ b/src/JT808.DotNetty.Services/JT808.DotNetty.Transmit/Configs/DataTransferOptions.cs @@ -0,0 +1,13 @@ +using System; +using System.Collections.Generic; +using System.Text; + +namespace JT808.DotNetty.Transmit.Configs +{ + public class DataTransferOptions + { + public string Host { get; set; } + + public List TerminalNos { get; set; } + } +} diff --git a/src/JT808.DotNetty.Services/JT808.DotNetty.Transmit/Configs/RemoteServerOptions.cs b/src/JT808.DotNetty.Services/JT808.DotNetty.Transmit/Configs/RemoteServerOptions.cs new file mode 100644 index 0000000..69c108f --- /dev/null +++ b/src/JT808.DotNetty.Services/JT808.DotNetty.Transmit/Configs/RemoteServerOptions.cs @@ -0,0 +1,11 @@ +using System; +using System.Collections.Generic; +using System.Text; + +namespace JT808.DotNetty.Transmit.Configs +{ + public class RemoteServerOptions + { + public List DataTransfer { get; set; } + } +} diff --git a/src/JT808.DotNetty.Services/JT808.DotNetty.Transmit/Handlers/ClientConnectionHandler.cs b/src/JT808.DotNetty.Services/JT808.DotNetty.Transmit/Handlers/ClientConnectionHandler.cs new file mode 100644 index 0000000..da9926a --- /dev/null +++ b/src/JT808.DotNetty.Services/JT808.DotNetty.Transmit/Handlers/ClientConnectionHandler.cs @@ -0,0 +1,76 @@ +using DotNetty.Transport.Bootstrapping; +using DotNetty.Transport.Channels; +using DotNetty.Transport.Channels.Sockets; +using Polly; +using System; +using System.Linq; +using System.Collections.Generic; +using System.Net; +using System.Text; +using Microsoft.Extensions.Logging; + +namespace JT808.DotNetty.Transmit.Handlers +{ + public class ClientConnectionHandler : ChannelHandlerAdapter + { + private readonly Bootstrap bootstrap; + public Dictionary channeldic; + private readonly ILogger logger; + public ClientConnectionHandler(Bootstrap bootstrap, + Dictionary channeldic, + ILoggerFactory loggerFactory) + { + this.bootstrap = bootstrap; + this.channeldic = channeldic; + logger = loggerFactory.CreateLogger(); + } + public override void ChannelInactive(IChannelHandlerContext context) + { + Policy.HandleResult(context.Channel.Open) + .WaitAndRetryForeverAsync(retryAttempt => + { + return retryAttempt > 20 ? TimeSpan.FromSeconds(Math.Pow(2, 50)) : TimeSpan.FromSeconds(Math.Pow(2, retryAttempt));//超过重试20次,之后重试都是接近12个小时重试一次 + }, + (exception, timespan, ctx) => + { + logger.LogError($"服务端断开{context.Channel.RemoteAddress},重试结果{exception.Result},重试次数{timespan},下次重试间隔(s){ctx.TotalSeconds}"); + }) + .ExecuteAsync(async () => + { + try + { + var oldChannel = channeldic.FirstOrDefault(m => m.Value == context.Channel); + if (default(KeyValuePair).Equals(oldChannel)) + { + if(logger.IsEnabled( LogLevel.Debug)) + logger.LogDebug($"服务器已经删除了{oldChannel.Key}远程服务器配置"); + return true; + } + var channel = await bootstrap.ConnectAsync(context.Channel.RemoteAddress); + channeldic.Remove(oldChannel.Key); + channeldic.Add(oldChannel.Key, channel); + return channel.Open; + } + catch (Exception ex) + { + logger.LogError($"服务端断开后{context.Channel.RemoteAddress},重连异常:{ex}"); + return false; + } + }); + } + + public override void ChannelRead(IChannelHandlerContext context, object message) + { + if (logger.IsEnabled(LogLevel.Debug)) + { + logger.LogError($"服务端返回消息{message}"); + } + } + + public override void ExceptionCaught(IChannelHandlerContext context, Exception exception) + { + logger.LogError($"服务端Exception: {exception}"); + context.CloseAsync(); + } + } +} diff --git a/src/JT808.DotNetty.Services/JT808.DotNetty.Transmit/JT808.DotNetty.Transmit.csproj b/src/JT808.DotNetty.Services/JT808.DotNetty.Transmit/JT808.DotNetty.Transmit.csproj new file mode 100644 index 0000000..f7b1f9c --- /dev/null +++ b/src/JT808.DotNetty.Services/JT808.DotNetty.Transmit/JT808.DotNetty.Transmit.csproj @@ -0,0 +1,30 @@ + + + + + JT808.DotNetty.Transmit + JT808.DotNetty.Transmit + 基于DotNetty实现的JT808数据转发服务 + 基于DotNetty实现的JT808数据转发服务 + LICENSE + + + + + + + + + + + + + True + + + + + + + + diff --git a/src/JT808.DotNetty.Services/JT808.DotNetty.Transmit/JT808DotNettyTransmitExtensions.cs b/src/JT808.DotNetty.Services/JT808.DotNetty.Transmit/JT808DotNettyTransmitExtensions.cs new file mode 100644 index 0000000..eef4fbb --- /dev/null +++ b/src/JT808.DotNetty.Services/JT808.DotNetty.Transmit/JT808DotNettyTransmitExtensions.cs @@ -0,0 +1,23 @@ +using Microsoft.Extensions.DependencyInjection; +using System; +using System.Collections.Generic; +using System.Text; +using JT808.Protocol; +using JT808.DotNetty.Abstractions; +using JT808.DotNetty.Transmit; +using Microsoft.Extensions.Configuration; +using JT808.DotNetty.Transmit.Configs; + +namespace JT808.DotNetty.Client +{ + public static class JT808DotNettyTransmitExtensions + { + public static IJT808ClientBuilder AddJT808Transmit(this IJT808ClientBuilder jT808ClientBuilder,IConfiguration configuration) + { + jT808ClientBuilder.JT808Builder.Services.Configure(configuration.GetSection("RemoteServerOptions")); + jT808ClientBuilder.JT808Builder.Services.AddSingleton(); + jT808ClientBuilder.JT808Builder.Services.AddHostedService(); + return jT808ClientBuilder; + } + } +} diff --git a/src/JT808.DotNetty.Services/JT808.DotNetty.Transmit/JT808DotNettyTransmitHostedService.cs b/src/JT808.DotNetty.Services/JT808.DotNetty.Transmit/JT808DotNettyTransmitHostedService.cs new file mode 100644 index 0000000..d9e3b7a --- /dev/null +++ b/src/JT808.DotNetty.Services/JT808.DotNetty.Transmit/JT808DotNettyTransmitHostedService.cs @@ -0,0 +1,52 @@ +using DotNetty.Buffers; +using DotNetty.Transport.Bootstrapping; +using DotNetty.Transport.Channels; +using DotNetty.Transport.Channels.Sockets; +using System; +using System.Collections.Generic; +using System.Net; +using System.Text; +using System.Threading.Tasks; +using DotNetty.Handlers.Logging; +using Polly; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using JT808.DotNetty.Transmit.Configs; +using System.Linq; +using JT808.DotNetty.Transmit.Handlers; +using JT808.DotNetty.Abstractions; +using JT808.Protocol; +using JT808.Protocol.Interfaces; +using Microsoft.Extensions.Hosting; +using System.Threading; + +namespace JT808.DotNetty.Transmit +{ + public class JT808DotNettyTransmitHostedService:IHostedService + { + private readonly JT808DotNettyTransmitService jT808DotNettyTransmitService; + private readonly IJT808MsgConsumer jT808MsgConsumer; + public JT808DotNettyTransmitHostedService( + IJT808MsgConsumer jT808MsgConsumer, + JT808DotNettyTransmitService jT808DotNettyTransmitService) + { + this.jT808DotNettyTransmitService = jT808DotNettyTransmitService; + this.jT808MsgConsumer = jT808MsgConsumer; + } + + public Task StartAsync(CancellationToken cancellationToken) + { + jT808MsgConsumer.Subscribe(); + jT808MsgConsumer.OnMessage(item=> { + jT808DotNettyTransmitService.SendAsync(item.TerminalNo,item.Data); + }); + return Task.CompletedTask; + } + + public Task StopAsync(CancellationToken cancellationToken) + { + jT808MsgConsumer.Unsubscribe(); + return Task.CompletedTask; + } + } +} diff --git a/src/JT808.DotNetty.Services/JT808.DotNetty.Transmit/JT808DotNettyTransmitService.cs b/src/JT808.DotNetty.Services/JT808.DotNetty.Transmit/JT808DotNettyTransmitService.cs new file mode 100644 index 0000000..2bbd8b1 --- /dev/null +++ b/src/JT808.DotNetty.Services/JT808.DotNetty.Transmit/JT808DotNettyTransmitService.cs @@ -0,0 +1,236 @@ +using DotNetty.Buffers; +using DotNetty.Transport.Bootstrapping; +using DotNetty.Transport.Channels; +using DotNetty.Transport.Channels.Sockets; +using System; +using System.Collections.Generic; +using System.Net; +using System.Threading.Tasks; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using JT808.DotNetty.Transmit.Configs; +using System.Linq; +using JT808.DotNetty.Transmit.Handlers; + +namespace JT808.DotNetty.Transmit +{ + public class JT808DotNettyTransmitService + { + private readonly ILogger logger; + private readonly ILoggerFactory loggerFactory; + private IOptionsMonitor optionsMonitor; + public Dictionary channeldic = new Dictionary(); + public JT808DotNettyTransmitService(ILoggerFactory loggerFactory, + IOptionsMonitor optionsMonitor) + { + this.loggerFactory = loggerFactory; + logger = loggerFactory.CreateLogger("JT808DotNettyTransmitService"); + this.optionsMonitor = optionsMonitor; + InitialDispatcherClient(); + } + public void SendAsync(string terminalNo,byte[] data) + { + if (optionsMonitor.CurrentValue.DataTransfer != null) + { + foreach (var item in optionsMonitor.CurrentValue.DataTransfer) + { + if (channeldic.TryGetValue($"all_{item.Host}", out var allClientChannel)) + { + try + { + if (allClientChannel.Open) + { + if (logger.IsEnabled(Microsoft.Extensions.Logging.LogLevel.Debug)) + { + logger.LogDebug($"转发所有数据到该网关{item.Host}"); + } + allClientChannel.WriteAndFlushAsync(Unpooled.WrappedBuffer(data)); + } + else + { + logger.LogError($"{item.Host}链接已关闭"); + } + } + catch (Exception ex) + { + logger.LogError($"{item.Host}发送数据出现异常:{ex}"); + } + } + else + { + if (item.TerminalNos.Contains(terminalNo) && channeldic.TryGetValue($"{terminalNo}_{item.Host}", out var clientChannel)) + { + try + { + if (clientChannel.Open) + { + if (logger.IsEnabled(Microsoft.Extensions.Logging.LogLevel.Debug)) + logger.LogDebug($"转发{terminalNo}到该网关{item.Host}"); + clientChannel.WriteAndFlushAsync(Unpooled.WrappedBuffer(data)); + } + else + { + logger.LogError($"{item.Host},{terminalNo}链接已关闭"); + } + } + catch (Exception ex) + { + logger.LogError($"{item.Host},{terminalNo}发送数据出现异常:{ex}"); + } + } + } + } + } + } + + public void InitialDispatcherClient() + { + Task.Run(async () => + { + var group = new MultithreadEventLoopGroup(); + var bootstrap = new Bootstrap(); + bootstrap.Group(group) + .Channel() + .Option(ChannelOption.TcpNodelay, true) + .Handler(new ActionChannelInitializer(channel => + { + IChannelPipeline pipeline = channel.Pipeline; + pipeline.AddLast(new ClientConnectionHandler(bootstrap, channeldic, loggerFactory)); + })); + optionsMonitor.OnChange(options => + { + List lastRemoteServers = new List(); + if (options.DataTransfer != null) + { + if (options.DataTransfer.Any()) + { + foreach (var item in options.DataTransfer) + { + if (item.TerminalNos != null) + { + if (item.TerminalNos.Any()) + { + foreach (var terminal in item.TerminalNos) + { + lastRemoteServers.Add($"{terminal}_{item.Host}"); + } + } + else + { + lastRemoteServers.Add($"all_{item.Host}"); + } + } + else + { + lastRemoteServers.Add($"all_{item.Host}"); + } + } + } + } + DelRemoteServsers(lastRemoteServers); + AddRemoteServsers(bootstrap, lastRemoteServers); + }); + await InitRemoteServsers(bootstrap); + }); + } + /// + /// 初始化远程服务器 + /// + /// + /// + /// + private async Task InitRemoteServsers(Bootstrap bootstrap) + { + List remoteServers = new List(); + if (optionsMonitor.CurrentValue.DataTransfer != null) + { + if (optionsMonitor.CurrentValue.DataTransfer.Any()) + { + foreach (var item in optionsMonitor.CurrentValue.DataTransfer) + { + if (item.TerminalNos != null) + { + if (item.TerminalNos.Any()) + { + foreach (var terminal in item.TerminalNos) + { + remoteServers.Add($"{terminal}_{item.Host}"); + } + } + else + { + remoteServers.Add($"all_{item.Host}"); + } + } + else + { + remoteServers.Add($"all_{item.Host}"); + } + } + } + } + foreach (var item in remoteServers) + { + try + { + string ip_port = item.Split('_')[1]; + IChannel clientChannel = await bootstrap.ConnectAsync(new IPEndPoint(IPAddress.Parse(ip_port.Split(':')[0]), int.Parse(ip_port.Split(':')[1]))); + channeldic.Add(item, clientChannel); + if (clientChannel.Open) + { + if (logger.IsEnabled(Microsoft.Extensions.Logging.LogLevel.Debug)) + { + logger.LogDebug($"该终端{item.Replace("_", "已连接上该服务器")}"); + } + } + } + catch (Exception ex) + { + logger.LogError($"初始化配置链接远程服务端{item},链接异常:{ex}"); + } + } + await Task.CompletedTask; + } + /// + /// 动态删除远程服务器 + /// + /// + private void DelRemoteServsers(List lastRemoteServers) + { + var delChannels = channeldic.Keys.Except(lastRemoteServers).ToList(); + foreach (var item in delChannels) + { + channeldic[item].CloseAsync(); + channeldic.Remove(item); + } + } + /// + /// 动态添加远程服务器 + /// + /// + /// + private void AddRemoteServsers(Bootstrap bootstrap, List lastRemoteServers) + { + var addChannels = lastRemoteServers.Except(channeldic.Keys).ToList(); + foreach (var item in addChannels) + { + try + { + var ip_port = item.Split('_')[1]; + IChannel clientChannel = bootstrap.ConnectAsync(new IPEndPoint(IPAddress.Parse(ip_port.Split(':')[0]), int.Parse(ip_port.Split(':')[1]))).Result; + channeldic.Add(item, clientChannel); + if (clientChannel.Open) { + if (logger.IsEnabled(Microsoft.Extensions.Logging.LogLevel.Debug)) + { + logger.LogDebug($"该终端{item.Replace("_", "已连接上该服务器")}"); + } + } + } + catch (Exception ex) + { + logger.LogError($"变更配置后链接远程服务端{item},重连异常:{ex}"); + } + } + } + } +} diff --git a/src/JT808.DotNetty.sln b/src/JT808.DotNetty.sln index c50bbdc..a97688e 100644 --- a/src/JT808.DotNetty.sln +++ b/src/JT808.DotNetty.sln @@ -43,6 +43,10 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JT808.DotNetty.Kafka.Test", EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JT808.DotNetty.RabbitMQ.Test", "JT808.DotNetty.Tests\JT808.DotNetty.RabbitMQ.Test\JT808.DotNetty.RabbitMQ.Test.csproj", "{D3CA0D73-1CCF-41BA-88D8-5BE50515CA64}" EndProject +Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Services", "Services", "{7F077BD5-8E4C-402A-9E24-DECAF251A420}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "JT808.DotNetty.Transmit", "JT808.DotNetty.Services\JT808.DotNetty.Transmit\JT808.DotNetty.Transmit.csproj", "{A3A0D0E1-F9AC-4004-A306-9491D2EA6883}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -121,6 +125,10 @@ Global {D3CA0D73-1CCF-41BA-88D8-5BE50515CA64}.Debug|Any CPU.Build.0 = Debug|Any CPU {D3CA0D73-1CCF-41BA-88D8-5BE50515CA64}.Release|Any CPU.ActiveCfg = Release|Any CPU {D3CA0D73-1CCF-41BA-88D8-5BE50515CA64}.Release|Any CPU.Build.0 = Release|Any CPU + {A3A0D0E1-F9AC-4004-A306-9491D2EA6883}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {A3A0D0E1-F9AC-4004-A306-9491D2EA6883}.Debug|Any CPU.Build.0 = Debug|Any CPU + {A3A0D0E1-F9AC-4004-A306-9491D2EA6883}.Release|Any CPU.ActiveCfg = Release|Any CPU + {A3A0D0E1-F9AC-4004-A306-9491D2EA6883}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -135,6 +143,7 @@ Global {CCE6AEFB-1AB0-4BD9-8EA2-8B4CDD097E88} = {2459FB59-8A33-49A4-ADBC-A0B12C5886A6} {50A94BD5-5CDF-4777-AE4C-80BA769AEDAB} = {3BD7FF02-8516-4A77-A385-9FDCDD792E22} {D3CA0D73-1CCF-41BA-88D8-5BE50515CA64} = {3BD7FF02-8516-4A77-A385-9FDCDD792E22} + {A3A0D0E1-F9AC-4004-A306-9491D2EA6883} = {7F077BD5-8E4C-402A-9E24-DECAF251A420} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {FC0FFCEA-E1EF-4C97-A1C5-F89418B6834B}