diff --git a/README.md b/README.md index a499e1d..d54bf37 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,6 @@ # JT808DotNetty -基于DotNetty封装的JT808DotNetty专注消息业务处理 +基于DotNetty封装的JT808DotNetty通用消息业务处理 [了解JT808协议进这边](https://github.com/SmallChi/JT808) diff --git a/src/JT808.DotNetty/Configurations/JT808Configuration.cs b/src/JT808.DotNetty/Configurations/JT808Configuration.cs index 7f8248f..6fb33e5 100644 --- a/src/JT808.DotNetty/Configurations/JT808Configuration.cs +++ b/src/JT808.DotNetty/Configurations/JT808Configuration.cs @@ -27,14 +27,24 @@ namespace JT808.DotNetty.Configurations public int AllIdleTimeSeconds { get; set; } = 3600; /// - /// WebAPI服务 + /// WebApi服务 /// 默认828端口 /// - public int WebAPIPort { get; set; } = 828; + public int WebApiPort { get; set; } = 828; /// /// 源包分发器配置 /// public List SourcePackageDispatcherClientConfigurations { get; set; } + + /// + /// 转发远程地址 (可选项)知道转发的地址有利于提升性能 + /// 按照808的消息,有些请求必须要应答,但是转发可以不需要有应答可以节省部分资源包括: + // 1.消息的序列化 + // 2.消息的下发 + // 都有一定的性能损耗,那么不需要判断写超时 IdleState.WriterIdle + // 就跟神兽貔貅一样。。。 + /// + public List ForwardingRemoteAddress { get; set; } } } diff --git a/src/JT808.DotNetty/Dtos/JT808AtomicCounterDto.cs b/src/JT808.DotNetty/Dtos/JT808AtomicCounterDto.cs new file mode 100644 index 0000000..e1b1627 --- /dev/null +++ b/src/JT808.DotNetty/Dtos/JT808AtomicCounterDto.cs @@ -0,0 +1,16 @@ +using System; +using System.Collections.Generic; +using System.Text; + +namespace JT808.DotNetty.Dtos +{ + /// + /// 包计数器服务 + /// + public class JT808AtomicCounterDto + { + public long MsgSuccessCount { get; set; } + + public long MsgFailCount { get; set; } + } +} diff --git a/src/JT808.DotNetty/Dtos/JT808DefaultResultDto.cs b/src/JT808.DotNetty/Dtos/JT808DefaultResultDto.cs index 58b18dc..7b94584 100644 --- a/src/JT808.DotNetty/Dtos/JT808DefaultResultDto.cs +++ b/src/JT808.DotNetty/Dtos/JT808DefaultResultDto.cs @@ -9,7 +9,7 @@ namespace JT808.DotNetty.Dtos public JT808DefaultResultDto() { Data = "Hello,JT808 WebAPI"; - Code = 200; + Code = JT808ResultCode.Ok; } } } diff --git a/src/JT808.DotNetty/Dtos/JT808IPAddressDto.cs b/src/JT808.DotNetty/Dtos/JT808IPAddressDto.cs new file mode 100644 index 0000000..3ce22dd --- /dev/null +++ b/src/JT808.DotNetty/Dtos/JT808IPAddressDto.cs @@ -0,0 +1,35 @@ +using System; +using System.Collections.Generic; +using System.Net; +using System.Text; + +namespace JT808.DotNetty.Dtos +{ + public class JT808IPAddressDto + { + public string Host { get; set; } + + public int Port { get; set; } + + private EndPoint endPoint; + + public EndPoint EndPoint + { + get + { + if (endPoint == null) + { + if (IPAddress.TryParse(Host, out IPAddress ip)) + { + endPoint = new IPEndPoint(ip, Port); + } + else + { + endPoint = new DnsEndPoint(Host, Port); + } + } + return endPoint; + } + } + } +} diff --git a/src/JT808.DotNetty/Dtos/JT808ResultDto.cs b/src/JT808.DotNetty/Dtos/JT808ResultDto.cs index cb334c8..da5cbf5 100644 --- a/src/JT808.DotNetty/Dtos/JT808ResultDto.cs +++ b/src/JT808.DotNetty/Dtos/JT808ResultDto.cs @@ -12,4 +12,12 @@ namespace JT808.DotNetty.Dtos public T Data { get; set; } } + + internal class JT808ResultCode + { + public const int Ok = 200; + public const int Empty = 201; + public const int NotFound = 404; + public const int Error = 500; + } } diff --git a/src/JT808.DotNetty/Dtos/JT808SourcePackageChannelInfoDto.cs b/src/JT808.DotNetty/Dtos/JT808SourcePackageChannelInfoDto.cs new file mode 100644 index 0000000..82b9c5a --- /dev/null +++ b/src/JT808.DotNetty/Dtos/JT808SourcePackageChannelInfoDto.cs @@ -0,0 +1,33 @@ +using System; +using System.Collections.Generic; +using System.Text; + +namespace JT808.DotNetty.Dtos +{ + /// + /// 原包通道信息 + /// + public class JT808SourcePackageChannelInfoDto + { + /// + /// 远程地址 + /// + public string RemoteAddress { get; set; } + /// + /// 本地地址 + /// + public string LocalAddress { get; set; } + /// + /// 是否注册 + /// + public bool Registered { get; set; } + /// + /// 是否活动 + /// + public bool Active { get; set; } + /// + /// 是否打开 + /// + public bool Open { get; set; } + } +} diff --git a/src/JT808.DotNetty/Handlers/JT808ConnectionHandler.cs b/src/JT808.DotNetty/Handlers/JT808ConnectionHandler.cs index 8d7073e..b590767 100644 --- a/src/JT808.DotNetty/Handlers/JT808ConnectionHandler.cs +++ b/src/JT808.DotNetty/Handlers/JT808ConnectionHandler.cs @@ -75,11 +75,19 @@ namespace JT808.DotNetty.Handlers IdleStateEvent idleStateEvent = evt as IdleStateEvent; if (idleStateEvent != null) { - string channelId = context.Channel.Id.AsShortText(); - logger.LogInformation($"{idleStateEvent.State.ToString()}>>>{channelId}"); - // 由于808是设备发心跳,如果很久没有上报数据,那么就由服务器主动关闭连接。 - jT808SessionManager.RemoveSessionByID(channelId); - context.CloseAsync(); + if(idleStateEvent.State== IdleState.ReaderIdle) + { + string channelId = context.Channel.Id.AsShortText(); + logger.LogInformation($"{idleStateEvent.State.ToString()}>>>{channelId}"); + // 由于808是设备发心跳,如果很久没有上报数据,那么就由服务器主动关闭连接。 + jT808SessionManager.RemoveSessionByID(channelId); + context.CloseAsync(); + } + // 按照808的消息,有些请求必须要应答,但是转发可以不需要有应答可以节省部分资源包括: + // 1.消息的序列化 + // 2.消息的下发 + // 都有一定的性能损耗,那么不需要判断写超时 IdleState.WriterIdle + // 就跟神兽貔貅一样。。。 } base.UserEventTriggered(context, evt); } diff --git a/src/JT808.DotNetty/Handlers/JT808ServerHandler.cs b/src/JT808.DotNetty/Handlers/JT808ServerHandler.cs index 2598e4a..747b6b3 100644 --- a/src/JT808.DotNetty/Handlers/JT808ServerHandler.cs +++ b/src/JT808.DotNetty/Handlers/JT808ServerHandler.cs @@ -5,6 +5,7 @@ using System; using System.Collections.Generic; using System.Text; using JT808.DotNetty.Metadata; +using JT808.DotNetty.Internal; namespace JT808.DotNetty.Handlers { @@ -14,12 +15,19 @@ namespace JT808.DotNetty.Handlers private readonly JT808SessionManager jT808SessionManager; - public JT808ServerHandler(JT808MsgIdHandlerBase handler, JT808SessionManager jT808SessionManager) + private readonly JT808RemoteAddressTransmitConfigurationService jT808RemoteAddressTransmitConfigurationService; + + public JT808ServerHandler( + JT808RemoteAddressTransmitConfigurationService jT808RemoteAddressTransmitConfigurationService, + JT808MsgIdHandlerBase handler, + JT808SessionManager jT808SessionManager) { + this.jT808RemoteAddressTransmitConfigurationService = jT808RemoteAddressTransmitConfigurationService; this.handler = handler; this.jT808SessionManager = jT808SessionManager; } + protected override void ChannelRead0(IChannelHandlerContext ctx, JT808Package msg) { try @@ -31,7 +39,10 @@ namespace JT808.DotNetty.Handlers JT808Response jT808Package = handlerFunc(new JT808Request(msg)); if (jT808Package != null) { - ctx.WriteAndFlushAsync(Unpooled.WrappedBuffer(JT808Serializer.Serialize(jT808Package.Package, jT808Package.MinBufferSize))); + if (!jT808RemoteAddressTransmitConfigurationService.Contains(ctx.Channel.RemoteAddress)) + { + ctx.WriteAndFlushAsync(Unpooled.WrappedBuffer(JT808Serializer.Serialize(jT808Package.Package, jT808Package.MinBufferSize))); + } } } } diff --git a/src/JT808.DotNetty/Handlers/JT808SourcePackageDispatcherHandler.cs b/src/JT808.DotNetty/Handlers/JT808SourcePackageDispatcherHandler.cs index 32e9833..ab5ff7e 100644 --- a/src/JT808.DotNetty/Handlers/JT808SourcePackageDispatcherHandler.cs +++ b/src/JT808.DotNetty/Handlers/JT808SourcePackageDispatcherHandler.cs @@ -14,12 +14,12 @@ namespace JT808.DotNetty.Handlers { private readonly ILogger logger; - private readonly JT808SourcePackageDispatcherDefaultImpl jT808SourcePackageDispatcherDefaultImpl; + private readonly JT808SourcePackageChannelService jT808SourcePackageChannelService; - public JT808SourcePackageDispatcherHandler(JT808SourcePackageDispatcherDefaultImpl jT808SourcePackageDispatcherDefaultImpl) + public JT808SourcePackageDispatcherHandler(JT808SourcePackageChannelService jT808SourcePackageChannelService) { - logger=jT808SourcePackageDispatcherDefaultImpl.loggerFactory.CreateLogger(); - this.jT808SourcePackageDispatcherDefaultImpl = jT808SourcePackageDispatcherDefaultImpl; + logger= jT808SourcePackageChannelService.LoggerFactory.CreateLogger(); + this.jT808SourcePackageChannelService = jT808SourcePackageChannelService; } public override void ChannelInactive(IChannelHandlerContext context) @@ -30,26 +30,26 @@ namespace JT808.DotNetty.Handlers return retryAttempt > 20 ? TimeSpan.FromSeconds(Math.Pow(2, 50)) : TimeSpan.FromSeconds(Math.Pow(2, retryAttempt));//超过重试20次,接近12个小时链接一次 },(exception, timespan, ctx) => { - logger.LogError($"服务端断开{context.Channel.RemoteAddress.ToString()},重试结果{exception.Result},重试次数{timespan},下次重试间隔(s){ctx.TotalSeconds}"); + logger.LogError($"Server Disconnection {context.Channel.RemoteAddress.ToString()},Retry Results{exception.Result},Retry Number{timespan},Next Retry Interval(s){ctx.TotalSeconds}"); }) .ExecuteAsync(async () => { try { - var newChannel = jT808SourcePackageDispatcherDefaultImpl.channels.FirstOrDefault(m => m.Value == context.Channel); + var newChannel = jT808SourcePackageChannelService.channels.FirstOrDefault(m => m.Value == context.Channel); if (default(KeyValuePair).Equals(newChannel)) { if(logger.IsEnabled(LogLevel.Debug)) - logger.LogDebug($"服务器已经删除了{context.Channel.RemoteAddress.ToString()}远程服务器配置"); + logger.LogDebug($"Server already deleted {context.Channel.RemoteAddress.ToString()} remote server configuration"); return true; } - var channel = await jT808SourcePackageDispatcherDefaultImpl.bootstrap.ConnectAsync(context.Channel.RemoteAddress); - jT808SourcePackageDispatcherDefaultImpl.channels.AddOrUpdate(newChannel.Key, channel, (x, y) => channel); + var channel = await jT808SourcePackageChannelService.bootstrap.ConnectAsync(context.Channel.RemoteAddress); + jT808SourcePackageChannelService.channels.AddOrUpdate(newChannel.Key, channel, (x, y) => channel); return channel.Open; } catch (Exception ex) { - logger.LogError($"服务端断开后{context.Channel.RemoteAddress.ToString()},重连异常:{ex}"); + logger.LogError(ex,$"Reconnection abnormal:After the server is disconnected {context.Channel.RemoteAddress.ToString()}"); return false; } }); @@ -58,8 +58,7 @@ namespace JT808.DotNetty.Handlers public override void ChannelRead(IChannelHandlerContext context, object message) { if(logger.IsEnabled(LogLevel.Debug)) - logger.LogError($"服务端返回消息{message.ToString()}"); - //throw new Exception("test"); + logger.LogError($"The server returns a message {message.ToString()}"); } public override void ExceptionCaught(IChannelHandlerContext context, Exception exception) diff --git a/src/JT808.DotNetty/Handlers/JT808WebAPIServerHandler.cs b/src/JT808.DotNetty/Handlers/JT808WebAPIServerHandler.cs index 31a0365..79e0de2 100644 --- a/src/JT808.DotNetty/Handlers/JT808WebAPIServerHandler.cs +++ b/src/JT808.DotNetty/Handlers/JT808WebAPIServerHandler.cs @@ -2,6 +2,7 @@ using DotNetty.Codecs.Http; using DotNetty.Common.Utilities; using DotNetty.Transport.Channels; +using JT808.DotNetty.Internal; using JT808.DotNetty.Metadata; using Microsoft.Extensions.Logging; using System; diff --git a/src/JT808.DotNetty/Internal/JT808RemoteAddressTransmitConfigurationService.cs b/src/JT808.DotNetty/Internal/JT808RemoteAddressTransmitConfigurationService.cs new file mode 100644 index 0000000..0292dea --- /dev/null +++ b/src/JT808.DotNetty/Internal/JT808RemoteAddressTransmitConfigurationService.cs @@ -0,0 +1,96 @@ +using JT808.DotNetty.Configurations; +using JT808.DotNetty.Dtos; +using Microsoft.Extensions.Options; +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Linq; +using System.Net; +using System.Text; + +namespace JT808.DotNetty.Internal +{ + /// + /// JT808远程地址转发配置服务 + /// 按照808的消息,有些请求必须要应答,但是转发可以不需要有应答可以节省部分资源包括: + // 1.消息的序列化 + // 2.消息的下发 + // 都有一定的性能损耗,那么不需要判断写超时 IdleState.WriterIdle + // 就跟神兽貔貅一样。。。 + /// + public class JT808RemoteAddressTransmitConfigurationService : IDisposable + { + private readonly IOptionsMonitor jT808ConfigurationOptionsMonitor; + + private ConcurrentBag ForwardingRemoteAddresss; + + private IDisposable jT808ConfigurationOptionsMonitorDisposable; + + public JT808RemoteAddressTransmitConfigurationService( + IOptionsMonitor jT808ConfigurationOptionsMonitor) + { + this.jT808ConfigurationOptionsMonitor = jT808ConfigurationOptionsMonitor; + ForwardingRemoteAddresss = new ConcurrentBag(); + InitForwardingRemoteAddress(jT808ConfigurationOptionsMonitor.CurrentValue.ForwardingRemoteAddress); + //OnChange 源码多播委托 + jT808ConfigurationOptionsMonitorDisposable = this.jT808ConfigurationOptionsMonitor.OnChange(options => + { + InitForwardingRemoteAddress(options.ForwardingRemoteAddress); + }); + } + + private void InitForwardingRemoteAddress(List jT808ClientConfigurations) + { + if (jT808ClientConfigurations != null && jT808ClientConfigurations.Count > 0) + { + foreach (var item in jT808ClientConfigurations) + { + string host = item.EndPoint.ToString(); + if (!ForwardingRemoteAddresss.Contains(host)) + { + ForwardingRemoteAddresss.Add(host); + } + } + } + } + + public bool Contains(EndPoint endPoint) + { + return ForwardingRemoteAddresss.Contains(endPoint.ToString()); + } + + public JT808ResultDto Add(JT808IPAddressDto jT808IPAddressDto) + { + string host = jT808IPAddressDto.EndPoint.ToString(); + if (!ForwardingRemoteAddresss.Contains(host)) + { + ForwardingRemoteAddresss.Add(host); + } + return new JT808ResultDto() { Code = JT808ResultCode.Ok, Data = true }; + } + + public JT808ResultDto Remove(JT808IPAddressDto jT808IPAddressDto) + { + string host = jT808IPAddressDto.EndPoint.ToString(); + if(jT808ConfigurationOptionsMonitor.CurrentValue.ForwardingRemoteAddress!=null && + jT808ConfigurationOptionsMonitor.CurrentValue.ForwardingRemoteAddress.Any(w=>w.EndPoint.ToString()== jT808IPAddressDto.ToString())) + { + return new JT808ResultDto() { Code = JT808ResultCode.Ok, Data = false,Message="不能删除服务器配置的地址" }; + } + else + { + return new JT808ResultDto() { Code = JT808ResultCode.Ok, Data = ForwardingRemoteAddresss.TryTake(out var temp) }; + } + } + + public JT808ResultDto> GetAll() + { + return new JT808ResultDto>(){ Code = JT808ResultCode.Ok, Data = ForwardingRemoteAddresss.ToList() }; + } + + public void Dispose() + { + jT808ConfigurationOptionsMonitorDisposable.Dispose(); + } + } +} diff --git a/src/JT808.DotNetty/Internal/JT808SessionServiceDefaultImpl.cs b/src/JT808.DotNetty/Internal/JT808SessionServiceDefaultImpl.cs index a4b558c..37a062a 100644 --- a/src/JT808.DotNetty/Internal/JT808SessionServiceDefaultImpl.cs +++ b/src/JT808.DotNetty/Internal/JT808SessionServiceDefaultImpl.cs @@ -35,16 +35,16 @@ namespace JT808.DotNetty.Internal LastActiveTime = s.LastActiveTime, StartTime = s.StartTime, TerminalPhoneNo = s.TerminalPhoneNo, - WebApiPort = jT808Configuration.WebAPIPort, + WebApiPort = jT808Configuration.WebApiPort, LoaclAddressIP = s.Channel.LocalAddress.ToString(), RemoteAddressIP = s.Channel.RemoteAddress.ToString(), }).ToList(); - resultDto.Code = 200; + resultDto.Code = JT808ResultCode.Ok; } catch (Exception ex) { resultDto.Data = null; - resultDto.Code = 500; + resultDto.Code = JT808ResultCode.Error; resultDto.Message = Newtonsoft.Json.JsonConvert.SerializeObject(ex); } return resultDto; @@ -60,19 +60,19 @@ namespace JT808.DotNetty.Internal { session.Channel.CloseAsync(); } - resultDto.Code = 200; + resultDto.Code = JT808ResultCode.Ok; resultDto.Data = true; } catch (AggregateException ex) { resultDto.Data = false; - resultDto.Code = 500; + resultDto.Code = JT808ResultCode.Error; resultDto.Message = Newtonsoft.Json.JsonConvert.SerializeObject(ex); } catch (Exception ex) { resultDto.Data = false; - resultDto.Code = 500; + resultDto.Code = JT808ResultCode.Error; resultDto.Message = Newtonsoft.Json.JsonConvert.SerializeObject(ex); } return resultDto; @@ -88,7 +88,7 @@ namespace JT808.DotNetty.Internal { session.Channel.CloseAsync(); } - resultDto.Code = 200; + resultDto.Code = JT808ResultCode.Ok; resultDto.Data = true; } catch (AggregateException ex) @@ -100,7 +100,7 @@ namespace JT808.DotNetty.Internal catch (Exception ex) { resultDto.Data = false; - resultDto.Code = 500; + resultDto.Code = JT808ResultCode.Error; resultDto.Message = Newtonsoft.Json.JsonConvert.SerializeObject(ex); } return resultDto; diff --git a/src/JT808.DotNetty/Internal/JT808SourcePackageChannelService.cs b/src/JT808.DotNetty/Internal/JT808SourcePackageChannelService.cs new file mode 100644 index 0000000..ccdcd2b --- /dev/null +++ b/src/JT808.DotNetty/Internal/JT808SourcePackageChannelService.cs @@ -0,0 +1,300 @@ +using DotNetty.Buffers; +using DotNetty.Transport.Bootstrapping; +using DotNetty.Transport.Channels; +using DotNetty.Transport.Channels.Sockets; +using JT808.DotNetty.Configurations; +using JT808.DotNetty.Dtos; +using JT808.DotNetty.Handlers; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using Newtonsoft.Json; +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Linq; +using System.Net; +using System.Text; +using System.Threading.Tasks; + +namespace JT808.DotNetty.Internal +{ + /// + /// 原包分发器通道服务 + /// + internal class JT808SourcePackageChannelService:IDisposable + { + private readonly IOptionsMonitor jT808ConfigurationOptionsMonitor; + + internal readonly ConcurrentDictionary channels; + + private readonly ILogger logger; + + private readonly MultithreadEventLoopGroup group; + + internal readonly ILoggerFactory LoggerFactory; + + internal readonly Bootstrap bootstrap; + + private IDisposable jT808ConfigurationOptionsMonitorDisposable; + + public JT808SourcePackageChannelService( + ILoggerFactory loggerFactory, + IOptionsMonitor jT808ConfigurationOptionsMonitor) + { + this.LoggerFactory = loggerFactory; + this.logger = loggerFactory.CreateLogger(); + this.channels = new ConcurrentDictionary(); + this.jT808ConfigurationOptionsMonitor = jT808ConfigurationOptionsMonitor; + this.group = new MultithreadEventLoopGroup(); + this.bootstrap = new Bootstrap(); + jT808ConfigurationOptionsMonitorDisposable = jT808ConfigurationOptionsMonitor.OnChange(options => + { + List chgRemoteServers = new List(); + if (jT808ConfigurationOptionsMonitor.CurrentValue.SourcePackageDispatcherClientConfigurations != null && jT808ConfigurationOptionsMonitor.CurrentValue.SourcePackageDispatcherClientConfigurations.Count > 0) + { + chgRemoteServers = options.SourcePackageDispatcherClientConfigurations; + } + DelRemoteServsers(chgRemoteServers); + AddRemoteServsers(chgRemoteServers); + }); + StartAsync(); + } + + /// + /// 下发数据 + /// + /// + /// + public Task SendAsync(byte[] data) + { + foreach (var item in channels) + { + try + { + if (item.Value.Open && item.Value.Active) + { + item.Value.WriteAndFlushAsync(Unpooled.WrappedBuffer(data)); + } + else + { + logger.LogInformation($"{item} link closed."); + } + } + catch (AggregateException ex) + { + logger.LogError(ex, $"{item} Send Data Error."); + } + catch (Exception ex) + { + logger.LogError(ex, $"{item} Send Data Error."); + } + } + return Task.CompletedTask; + } + + /// + /// 获取通道信息集合 + /// + /// + public JT808ResultDto> GetAll() + { + JT808ResultDto> jT808ResultDto = new JT808ResultDto>(); + jT808ResultDto.Data = new List(); + jT808ResultDto.Code = JT808ResultCode.Ok; + foreach (var item in channels) + { + JT808SourcePackageChannelInfoDto jT808SourcePackageChannelInfoDto = new JT808SourcePackageChannelInfoDto(); + jT808SourcePackageChannelInfoDto.Active = item.Value.Active; + jT808SourcePackageChannelInfoDto.Open = item.Value.Open; + jT808SourcePackageChannelInfoDto.Registered = item.Value.Registered; + jT808SourcePackageChannelInfoDto.LocalAddress = item.Value.LocalAddress.ToString(); + jT808SourcePackageChannelInfoDto.RemoteAddress = item.Value.RemoteAddress.ToString(); + jT808ResultDto.Data.Add(jT808SourcePackageChannelInfoDto); + } + return jT808ResultDto; + } + + /// + /// 添加地址 + /// + /// + public async Task> Add(JT808IPAddressDto jT808IPAddressDto) + { + JT808ResultDto jT808ResultDto = new JT808ResultDto(); + jT808ResultDto.Code= JT808ResultCode.Ok; + jT808ResultDto.Data = true; + if (!channels.ContainsKey(jT808IPAddressDto.EndPoint)) + { + try + { + IChannel clientChannel = await bootstrap.ConnectAsync(jT808IPAddressDto.EndPoint); + channels.TryAdd(jT808IPAddressDto.EndPoint, clientChannel); + } + catch (AggregateException ex) + { + jT808ResultDto.Data = false; + jT808ResultDto.Code = JT808ResultCode.Error; + jT808ResultDto.Message = JsonConvert.SerializeObject(ex); + } + catch (Exception ex) + { + jT808ResultDto.Data = false; + jT808ResultDto.Code= JT808ResultCode.Error; + jT808ResultDto.Message = JsonConvert.SerializeObject(ex); + } + } + return jT808ResultDto; + } + + /// + /// 删除地址 + /// + /// + public async Task> Remove(JT808IPAddressDto jT808IPAddressDto) + { + JT808ResultDto jT808ResultDto = new JT808ResultDto(); + jT808ResultDto.Code = JT808ResultCode.Ok; + jT808ResultDto.Data = true; + + if(jT808ConfigurationOptionsMonitor.CurrentValue.SourcePackageDispatcherClientConfigurations!=null && + jT808ConfigurationOptionsMonitor.CurrentValue.SourcePackageDispatcherClientConfigurations.Any(a=>a.EndPoint.ToString()== jT808IPAddressDto.EndPoint.ToString()) + ) + { + jT808ResultDto.Data = false; + jT808ResultDto.Message = "不能删除服务器配置的地址"; + } + else + { + if (channels.TryRemove(jT808IPAddressDto.EndPoint, out var channel)) + { + try + { + await channel.CloseAsync(); + } + catch (AggregateException ex) + { + jT808ResultDto.Data = false; + jT808ResultDto.Code = JT808ResultCode.Error; + jT808ResultDto.Message = JsonConvert.SerializeObject(ex); + } + catch (Exception ex) + { + jT808ResultDto.Data = false; + jT808ResultDto.Code = JT808ResultCode.Error; + jT808ResultDto.Message = JsonConvert.SerializeObject(ex); + } + } + } + return jT808ResultDto; + } + + private void StartAsync() + { + bootstrap + .Group(group) + .Channel() + .Option(ChannelOption.TcpNodelay, true) + .Handler(new ActionChannelInitializer(channel => + { + channel.Pipeline.AddLast(new JT808SourcePackageDispatcherHandler(this)); + })); + jT808ConfigurationOptionsMonitor.OnChange(options => + { + List chgRemoteServers = new List(); + if (jT808ConfigurationOptionsMonitor.CurrentValue.SourcePackageDispatcherClientConfigurations != null && jT808ConfigurationOptionsMonitor.CurrentValue.SourcePackageDispatcherClientConfigurations.Count > 0) + { + chgRemoteServers = options.SourcePackageDispatcherClientConfigurations; + } + DelRemoteServsers(chgRemoteServers); + AddRemoteServsers(chgRemoteServers); + }); + if (jT808ConfigurationOptionsMonitor.CurrentValue.SourcePackageDispatcherClientConfigurations != null && + jT808ConfigurationOptionsMonitor.CurrentValue.SourcePackageDispatcherClientConfigurations.Count > 0) + { + foreach (var item in jT808ConfigurationOptionsMonitor.CurrentValue.SourcePackageDispatcherClientConfigurations) + { + try + { + Task.Run(async () => + { + IChannel clientChannel = await bootstrap.ConnectAsync(item.EndPoint); + channels.TryAdd(item.EndPoint, clientChannel); + logger.LogInformation($"init remote link {item.EndPoint.ToString()}."); + }); + } + catch (Exception ex) + { + logger.LogError(ex, $"there is an exception in sending data {item}."); + } + } + } + } + + private Task StopAsync() + { + foreach (var channel in channels) + { + try + { + channel.Value.CloseAsync(); + } + catch + { + + } + } + group.ShutdownGracefullyAsync(jT808ConfigurationOptionsMonitor.CurrentValue.QuietPeriodTimeSpan, jT808ConfigurationOptionsMonitor.CurrentValue.ShutdownTimeoutTimeSpan); + return Task.CompletedTask; + } + + /// + /// 动态删除远程服务器 + /// + /// + private void DelRemoteServsers(List chgRemoteServers) + { + var delChannels = channels.Keys.Except(chgRemoteServers.Select(s => s.EndPoint)).ToList(); + foreach (var item in delChannels) + { + try + { + channels.TryRemove(item, out var channel); + channel.CloseAsync(); + } + catch + { + + } + } + } + + /// + /// 动态添加远程服务器 + /// + /// + /// + private async void AddRemoteServsers(List chgRemoteServers) + { + var addChannels = chgRemoteServers.Select(s => s.EndPoint).Except(channels.Keys).ToList(); + foreach (var item in addChannels) + { + try + { + IChannel clientChannel = await bootstrap.ConnectAsync(item); + channels.TryAdd(item, clientChannel); + logger.LogInformation($"link to the remote server after the change {item}."); + } + catch (Exception ex) + { + logger.LogError(ex, $"reconnect the remote server after the exception changes {item}."); + } + } + } + + public void Dispose() + { + jT808ConfigurationOptionsMonitorDisposable.Dispose(); + StopAsync(); + } + } +} diff --git a/src/JT808.DotNetty/Internal/JT808SourcePackageDispatcherDefaultImpl.cs b/src/JT808.DotNetty/Internal/JT808SourcePackageDispatcherDefaultImpl.cs index b2cc6bc..2c106e5 100644 --- a/src/JT808.DotNetty/Internal/JT808SourcePackageDispatcherDefaultImpl.cs +++ b/src/JT808.DotNetty/Internal/JT808SourcePackageDispatcherDefaultImpl.cs @@ -20,155 +20,20 @@ using System.Threading.Tasks; namespace JT808.DotNetty.Internal { /// - /// 源包分发器默认实现 + /// 原包分发器默认实现 /// - internal class JT808SourcePackageDispatcherDefaultImpl : IJT808SourcePackageDispatcher, IDisposable + internal class JT808SourcePackageDispatcherDefaultImpl : IJT808SourcePackageDispatcher { - private readonly MultithreadEventLoopGroup group = new MultithreadEventLoopGroup(); - internal readonly Bootstrap bootstrap = new Bootstrap(); - internal readonly ConcurrentDictionary channels = new ConcurrentDictionary(); - private readonly ILogger logger; - private IOptionsMonitor jT808ConfigurationOptionsMonitor; - internal readonly ILoggerFactory loggerFactory; + private readonly JT808SourcePackageChannelService jT808SourcePackageChannelService; - public JT808SourcePackageDispatcherDefaultImpl(ILoggerFactory loggerFactory, - IOptionsMonitor jT808ConfigurationOptionsMonitor) + public JT808SourcePackageDispatcherDefaultImpl(JT808SourcePackageChannelService jT808SourcePackageChannelService) { - this.loggerFactory = loggerFactory; - this.logger = loggerFactory.CreateLogger(); - this.jT808ConfigurationOptionsMonitor = jT808ConfigurationOptionsMonitor; - StartAsync(); + this.jT808SourcePackageChannelService = jT808SourcePackageChannelService; } public async Task SendAsync(byte[] data) { - foreach (var item in channels) - { - try - { - if (item.Value.Open && item.Value.Active) - { - await item.Value.WriteAndFlushAsync(Unpooled.WrappedBuffer(data)); - } - else - { - logger.LogInformation($"{item} link closed."); - } - } - catch (Exception ex) - { - logger.LogError(ex,$"{item} Send Data Error."); - } - } - await Task.CompletedTask; - } - - public void StartAsync() - { - bootstrap - .Group(group) - .Channel() - .Option(ChannelOption.TcpNodelay, true) - .Handler(new ActionChannelInitializer(channel => - { - channel.Pipeline.AddLast(new JT808SourcePackageDispatcherHandler(this)); - })); - jT808ConfigurationOptionsMonitor.OnChange(options => - { - List chgRemoteServers = new List(); - if (jT808ConfigurationOptionsMonitor.CurrentValue.SourcePackageDispatcherClientConfigurations != null && jT808ConfigurationOptionsMonitor.CurrentValue.SourcePackageDispatcherClientConfigurations.Count > 0) - { - chgRemoteServers = options.SourcePackageDispatcherClientConfigurations; - } - DelRemoteServsers(chgRemoteServers); - AddRemoteServsers(chgRemoteServers); - }); - if (jT808ConfigurationOptionsMonitor.CurrentValue.SourcePackageDispatcherClientConfigurations != null && - jT808ConfigurationOptionsMonitor.CurrentValue.SourcePackageDispatcherClientConfigurations.Count > 0) - { - foreach (var item in jT808ConfigurationOptionsMonitor.CurrentValue.SourcePackageDispatcherClientConfigurations) - { - try - { - Task.Run(async () => - { - IChannel clientChannel = await bootstrap.ConnectAsync(item.EndPoint); - channels.TryAdd(item.EndPoint, clientChannel); - logger.LogInformation($"init remote link {item.EndPoint.ToString()}."); - }); - } - catch (Exception ex) - { - logger.LogError(ex,$"there is an exception in sending data {item}."); - } - } - } - } - - public Task StopAsync() - { - foreach (var channel in channels) - { - try - { - channel.Value.CloseAsync(); - } - catch - { - - } - } - group.ShutdownGracefullyAsync(jT808ConfigurationOptionsMonitor.CurrentValue.QuietPeriodTimeSpan, jT808ConfigurationOptionsMonitor.CurrentValue.ShutdownTimeoutTimeSpan); - return Task.CompletedTask; - } - - /// - /// 动态删除远程服务器 - /// - /// - private void DelRemoteServsers(List chgRemoteServers) - { - var delChannels = channels.Keys.Except(chgRemoteServers.Select(s=>s.EndPoint)).ToList(); - foreach (var item in delChannels) - { - try - { - channels.TryRemove(item, out var channel); - channel.CloseAsync(); - } - catch - { - - } - } - } - - /// - /// 动态添加远程服务器 - /// - /// - /// - private async void AddRemoteServsers(List chgRemoteServers) - { - var addChannels = chgRemoteServers.Select(s=>s.EndPoint).Except(channels.Keys).ToList(); - foreach (var item in addChannels) - { - try - { - IChannel clientChannel =await bootstrap.ConnectAsync(item); - channels.TryAdd(item, clientChannel); - logger.LogInformation($"link to the remote server after the change {item}."); - } - catch (Exception ex) - { - logger.LogError(ex,$"reconnect the remote server after the exception changes {item}."); - } - } - } - - public void Dispose() - { - StopAsync(); + await jT808SourcePackageChannelService.SendAsync(data); } } } diff --git a/src/JT808.DotNetty/Internal/JT808UnificationSendServiceDefaultImpl.cs b/src/JT808.DotNetty/Internal/JT808UnificationSendServiceDefaultImpl.cs index 79fa3d4..d6c8075 100644 --- a/src/JT808.DotNetty/Internal/JT808UnificationSendServiceDefaultImpl.cs +++ b/src/JT808.DotNetty/Internal/JT808UnificationSendServiceDefaultImpl.cs @@ -25,12 +25,12 @@ namespace JT808.DotNetty.Internal if (session != null) { session.Channel.WriteAndFlushAsync(Unpooled.WrappedBuffer(data)); - resultDto.Code = 200; + resultDto.Code = JT808ResultCode.Ok; resultDto.Data = true; } else { - resultDto.Code = 200; + resultDto.Code = JT808ResultCode.Ok; resultDto.Data = false; resultDto.Message = "not session"; } @@ -38,7 +38,7 @@ namespace JT808.DotNetty.Internal catch (Exception ex) { resultDto.Data = false; - resultDto.Code = 500; + resultDto.Code = JT808ResultCode.Error; resultDto.Message = Newtonsoft.Json.JsonConvert.SerializeObject(ex); } return resultDto; diff --git a/src/JT808.DotNetty/JT808WebAPIService.cs b/src/JT808.DotNetty/Internal/JT808WebAPIService.cs similarity index 59% rename from src/JT808.DotNetty/JT808WebAPIService.cs rename to src/JT808.DotNetty/Internal/JT808WebAPIService.cs index 05e0ce8..11a48b9 100644 --- a/src/JT808.DotNetty/JT808WebAPIService.cs +++ b/src/JT808.DotNetty/Internal/JT808WebAPIService.cs @@ -6,7 +6,7 @@ using System; using System.Collections.Generic; using System.Text; -namespace JT808.DotNetty +namespace JT808.DotNetty.Internal { internal class JT808WebAPIService { @@ -16,25 +16,39 @@ namespace JT808.DotNetty private const string sessionRoutePrefix = "Session"; + private const string sourcePackagePrefix = "SourcePackage"; + private readonly IJT808SessionService jT808SessionService; private readonly IJT808UnificationSendService jT808UnificationSendService; + private readonly JT808AtomicCounterService jT808AtomicCounterService; + + private readonly JT808SourcePackageChannelService jT808SourcePackageChannelService; + /// /// 初始化消息处理业务 /// - public JT808WebAPIService( + public JT808WebAPIService( + JT808AtomicCounterService jT808AtomicCounterService, + JT808SourcePackageChannelService jT808SourcePackageChannelService, IJT808SessionService jT808SessionService, IJT808UnificationSendService jT808UnificationSendService) { + this.jT808AtomicCounterService = jT808AtomicCounterService; + this.jT808SourcePackageChannelService = jT808SourcePackageChannelService; this.jT808SessionService = jT808SessionService; this.jT808UnificationSendService = jT808UnificationSendService; HandlerDict = new Dictionary> { {$"{RouteTablePrefix}/UnificationSend", UnificationSend}, - {$"{RouteTablePrefix}/{sessionRoutePrefix}/GetAll", GetAll}, + {$"{RouteTablePrefix}/{sessionRoutePrefix}/GetAll", GetSessionAll}, {$"{RouteTablePrefix}/{sessionRoutePrefix}/RemoveByChannelId", RemoveByChannelId}, {$"{RouteTablePrefix}/{sessionRoutePrefix}/RemoveByTerminalPhoneNo", RemoveByTerminalPhoneNo}, + {$"{RouteTablePrefix}/GetAtomicCounter", GetAtomicCounter}, + {$"{RouteTablePrefix}/{sourcePackagePrefix}/Add", AddSourcePackageAddress}, + {$"{RouteTablePrefix}/{sourcePackagePrefix}/Remove", RemoveSourcePackageAddress}, + {$"{RouteTablePrefix}/{sourcePackagePrefix}/GetAll", GetSourcePackageAll} }; } @@ -59,7 +73,7 @@ namespace JT808.DotNetty /// /// /// - public JT808HttpResponse GetAll(JT808HttpRequest request) + public JT808HttpResponse GetSessionAll(JT808HttpRequest request) { var result = jT808SessionService.GetAll(); return CreateJT808HttpResponse(result); @@ -95,6 +109,59 @@ namespace JT808.DotNetty return CreateJT808HttpResponse(result); } + /// + /// 获取包计数器 + /// + /// + /// + public JT808HttpResponse GetAtomicCounter(JT808HttpRequest request) + { + JT808AtomicCounterDto jT808AtomicCounterDto = new JT808AtomicCounterDto(); + jT808AtomicCounterDto.MsgFailCount = jT808AtomicCounterService.MsgFailCount; + jT808AtomicCounterDto.MsgSuccessCount = jT808AtomicCounterService.MsgSuccessCount; + return CreateJT808HttpResponse(jT808AtomicCounterDto); + } + + /// + /// 添加原包转发地址 + /// + /// + /// + public JT808HttpResponse AddSourcePackageAddress(JT808HttpRequest request) + { + if (string.IsNullOrEmpty(request.Json)) + { + return EmptyHttpResponse(); + } + JT808IPAddressDto jT808IPAddressDto = JsonConvert.DeserializeObject(request.Json); + return CreateJT808HttpResponse(jT808SourcePackageChannelService.Add(jT808IPAddressDto).Result); + } + + /// + /// 删除原包转发地址(不能删除在网关服务器配置文件配的地址) + /// + /// + /// + public JT808HttpResponse RemoveSourcePackageAddress(JT808HttpRequest request) + { + if (string.IsNullOrEmpty(request.Json)) + { + return EmptyHttpResponse(); + } + JT808IPAddressDto jT808IPAddressDto = JsonConvert.DeserializeObject(request.Json); + return CreateJT808HttpResponse(jT808SourcePackageChannelService.Remove(jT808IPAddressDto).Result); + } + + /// + /// 获取原包信息集合 + /// + /// + /// + public JT808HttpResponse GetSourcePackageAll(JT808HttpRequest request) + { + return CreateJT808HttpResponse(jT808SourcePackageChannelService.GetAll()); + } + private JT808HttpResponse CreateJT808HttpResponse(dynamic dynamicObject) { byte[] data = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(dynamicObject)); @@ -114,8 +181,8 @@ namespace JT808.DotNetty { byte[] json = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(new JT808ResultDto() { - Code=201, - Message="内容为空", + Code = JT808ResultCode.Empty, + Message ="内容为空", Data="Content Empty" })); return new JT808HttpResponse(json); @@ -125,7 +192,7 @@ namespace JT808.DotNetty { byte[] json = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(new JT808ResultDto() { - Code=404, + Code= JT808ResultCode.NotFound, Message="没有该服务", Data= "没有该服务" })); @@ -136,7 +203,7 @@ namespace JT808.DotNetty { byte[] json = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(new JT808ResultDto() { - Code = 500, + Code = JT808ResultCode.Error, Message = JsonConvert.SerializeObject(ex), Data= ex.Message })); diff --git a/src/JT808.DotNetty/JT808DotnettyExtensions.cs b/src/JT808.DotNetty/JT808DotnettyExtensions.cs index ee833dc..50716e9 100644 --- a/src/JT808.DotNetty/JT808DotnettyExtensions.cs +++ b/src/JT808.DotNetty/JT808DotnettyExtensions.cs @@ -38,6 +38,7 @@ namespace JT808.DotNetty services.TryAddSingleton(); services.TryAddSingleton(); services.TryAddSingleton(); + services.TryAddSingleton(); services.TryAddSingleton(); services.TryAddScoped(); services.TryAddScoped(); diff --git a/src/JT808.DotNetty/JT808WebAPIServerHost.cs b/src/JT808.DotNetty/JT808WebAPIServerHost.cs index 7aea178..c5b1cb1 100644 --- a/src/JT808.DotNetty/JT808WebAPIServerHost.cs +++ b/src/JT808.DotNetty/JT808WebAPIServerHost.cs @@ -66,8 +66,8 @@ namespace JT808.DotNetty pipeline.AddLast("http_jt808webapihandler", scope.ServiceProvider.GetRequiredService()); } })); - logger.LogInformation($"WebAPI Server start at {IPAddress.Any}:{configuration.WebAPIPort}."); - return bootstrap.BindAsync(configuration.WebAPIPort).ContinueWith(i => bootstrapChannel = i.Result); + logger.LogInformation($"WebAPI Server start at {IPAddress.Any}:{configuration.WebApiPort}."); + return bootstrap.BindAsync(configuration.WebApiPort).ContinueWith(i => bootstrapChannel = i.Result); } public async Task StopAsync(CancellationToken cancellationToken)