diff --git a/src/JT808.DotNetty.Hosting/Program.cs b/src/JT808.DotNetty.Hosting/Program.cs index 36280ac..cf8095d 100644 --- a/src/JT808.DotNetty.Hosting/Program.cs +++ b/src/JT808.DotNetty.Hosting/Program.cs @@ -24,7 +24,7 @@ namespace JT808.DotNetty.Hosting .ConfigureLogging((context, logging) => { logging.AddConsole(); - logging.SetMinimumLevel(LogLevel.Error); + logging.SetMinimumLevel(LogLevel.Debug); }) .ConfigureServices((hostContext, services) => { diff --git a/src/JT808.DotNetty.Test/Internal/JT808AtomicCounterServiceTest.cs b/src/JT808.DotNetty.Test/Internal/JT808AtomicCounterServiceTest.cs new file mode 100644 index 0000000..fce3f8a --- /dev/null +++ b/src/JT808.DotNetty.Test/Internal/JT808AtomicCounterServiceTest.cs @@ -0,0 +1,90 @@ +using DotNetty.Buffers; +using DotNetty.Codecs; +using DotNetty.Transport.Bootstrapping; +using DotNetty.Transport.Channels; +using DotNetty.Transport.Libuv; +using JT808.DotNetty.Codecs; +using JT808.DotNetty.Dtos; +using JT808.DotNetty.Metadata; +using JT808.Protocol; +using JT808.Protocol.Extensions; +using JT808.Protocol.MessageBody; +using Newtonsoft.Json; +using System; +using System.Collections.Generic; +using System.Net; +using System.Net.Http; +using System.Runtime.InteropServices; +using System.Text; +using System.Threading; +using System.Threading.Tasks; +using Xunit; + +namespace JT808.DotNetty.Test.Internal +{ + public class JT808AtomicCounterServiceTest: TestBase + { + private IPEndPoint endPoint = new IPEndPoint(IPAddress.Parse("127.0.0.1"), 6565); + + public JT808SimpleTcpClient SimpleTcpClient; + + public JT808AtomicCounterServiceTest() + { + SimpleTcpClient = new JT808SimpleTcpClient(endPoint); + } + + [Fact] + public void Test1() + { + JT808AtomicCounter jT808AtomicCounter = new JT808AtomicCounter(); + JT808AtomicCounter jT808AtomicCounter1 = new JT808AtomicCounter(); + Parallel.For(0, 1000, (i) => + { + jT808AtomicCounter.Increment(); + }); + Assert.Equal(1000, jT808AtomicCounter.Count); + Parallel.For(0, 1000, (i) => + { + jT808AtomicCounter1.Increment(); + }); + Assert.Equal(1000, jT808AtomicCounter1.Count); + } + + [Fact] + public void Test2() + { + // 心跳会话包 + JT808Package jT808Package1 = JT808.Protocol.Enums.JT808MsgId.终端心跳.Create("123456789001"); + SimpleTcpClient.WriteAsync(JT808Serializer.Serialize(jT808Package1)); + + // 心跳会话包 + JT808Package jT808Package2 = JT808.Protocol.Enums.JT808MsgId.终端心跳.Create("123456789002"); + SimpleTcpClient.WriteAsync(JT808Serializer.Serialize(jT808Package2)); + + // 心跳会话包 + JT808Package jT808Package3 = JT808.Protocol.Enums.JT808MsgId.终端心跳.Create("123456789003"); + SimpleTcpClient.WriteAsync(JT808Serializer.Serialize(jT808Package3)); + + // 心跳会话包 + JT808Package jT808Package4 = JT808.Protocol.Enums.JT808MsgId.终端心跳.Create("123456789004"); + SimpleTcpClient.WriteAsync(JT808Serializer.Serialize(jT808Package4)); + + // 心跳会话包 + JT808Package jT808Package5 = JT808.Protocol.Enums.JT808MsgId.终端心跳.Create("123456789005"); + SimpleTcpClient.WriteAsync(JT808Serializer.Serialize(jT808Package5)); + + // 异步的需要延时下 + Thread.Sleep(1000); + + HttpClient httpClient = new HttpClient(); + // 调用内置的http服务接收文本信息下发 + var result = httpClient.GetAsync("http://127.0.0.1:828/jt808api/GetAtomicCounter").Result; + string content = result.Content.ReadAsStringAsync().Result; + JT808ResultDto jt808Result = JsonConvert.DeserializeObject>(content); + Assert.Equal(200, jt808Result.Code); + Assert.Equal(5,jt808Result.Data.MsgSuccessCount); + Assert.Equal(0, jt808Result.Data.MsgFailCount); + SimpleTcpClient.Down(); + } + } +} diff --git a/src/JT808.DotNetty.Test/Internal/JT808RemoteAddressTransmitConfigurationServiceTest.cs b/src/JT808.DotNetty.Test/Internal/JT808RemoteAddressTransmitConfigurationServiceTest.cs new file mode 100644 index 0000000..e6665b2 --- /dev/null +++ b/src/JT808.DotNetty.Test/Internal/JT808RemoteAddressTransmitConfigurationServiceTest.cs @@ -0,0 +1,104 @@ +using JT808.DotNetty.Configurations; +using JT808.DotNetty.Dtos; +using JT808.DotNetty.Internal; +using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; +using System; +using System.Collections.Generic; +using System.Text; +using Xunit; + +namespace JT808.DotNetty.Test.Internal +{ + public class JT808RemoteAddressTransmitConfigurationServiceTest + { + private JT808RemoteAddressTransmitConfigurationService jT808RemoteAddressTransmitConfigurationService; + + public JT808RemoteAddressTransmitConfigurationServiceTest() + { + var serverHostBuilder = new HostBuilder() + .ConfigureAppConfiguration((hostingContext, config) => + { + config.SetBasePath(AppDomain.CurrentDomain.BaseDirectory); + config.AddJsonFile("appsettings.json", optional: false, reloadOnChange: true); + }) + .ConfigureServices((hostContext, services) => + { + services.AddSingleton(); + services.AddSingleton(typeof(ILogger<>), typeof(Logger<>)); + services.Configure(hostContext.Configuration.GetSection("JT808Configuration")); + services.AddSingleton(); + }); + var serviceProvider = serverHostBuilder.Build().Services; + jT808RemoteAddressTransmitConfigurationService = serviceProvider.GetService(); + jT808RemoteAddressTransmitConfigurationService.Add(new Dtos.JT808IPAddressDto + { + Host = "127.0.0.1", + Port = 12345 + }); + jT808RemoteAddressTransmitConfigurationService.Add(new Dtos.JT808IPAddressDto + { + Host = "127.0.0.1", + Port = 12346 + }); + jT808RemoteAddressTransmitConfigurationService.Add(new Dtos.JT808IPAddressDto + { + Host = "127.0.0.1", + Port = 12347 + }); + jT808RemoteAddressTransmitConfigurationService.Add(new Dtos.JT808IPAddressDto + { + Host = "127.0.0.1", + Port = 12348 + }); + } + + [Fact] + public void Test1() + { + Assert.True(jT808RemoteAddressTransmitConfigurationService.ContainsKey(new Dtos.JT808IPAddressDto + { + Host = "127.0.0.1", + Port = 12348 + }.EndPoint)); + } + + [Fact] + public void Test2() + { + var result = jT808RemoteAddressTransmitConfigurationService.GetAll(); + } + + [Fact] + public void Test3() + { + var ip1 = new Dtos.JT808IPAddressDto + { + Host = "127.0.0.1", + Port = 12349 + }; + var result1= jT808RemoteAddressTransmitConfigurationService.Add(ip1); + Assert.Equal(JT808ResultCode.Ok, result1.Code); + Assert.True(result1.Data); + var result2 = jT808RemoteAddressTransmitConfigurationService.Remove(ip1); + Assert.Equal(JT808ResultCode.Ok, result2.Code); + Assert.True(result2.Data); + } + + [Fact] + public void Test4() + { + var configIp = new Dtos.JT808IPAddressDto + { + Host = "127.0.0.1", + Port = 6561 + }; + var result2 = jT808RemoteAddressTransmitConfigurationService.Remove(configIp); + Assert.Equal(JT808ResultCode.Ok, result2.Code); + Assert.False(result2.Data); + Assert.Equal("不能删除服务器配置的地址", result2.Message); + } + } +} diff --git a/src/JT808.DotNetty.Test/Internal/JT808SessionServiceDefaultImplTest.cs b/src/JT808.DotNetty.Test/Internal/JT808SessionServiceDefaultImplTest.cs new file mode 100644 index 0000000..801dab8 --- /dev/null +++ b/src/JT808.DotNetty.Test/Internal/JT808SessionServiceDefaultImplTest.cs @@ -0,0 +1,66 @@ +using JT808.DotNetty.Internal; +using System; +using System.Linq; +using System.Collections.Generic; +using System.Text; +using Microsoft.Extensions.DependencyInjection; +using System.Net; +using JT808.Protocol; +using JT808.Protocol.Extensions; +using System.Threading; +using Xunit; +using JT808.DotNetty.Interfaces; + +namespace JT808.DotNetty.Test.Internal +{ + public class JT808SessionServiceDefaultImplTest : TestBase + { + static IPEndPoint endPoint = new IPEndPoint(IPAddress.Parse("127.0.0.1"), 6565); + + [Fact] + public void Test1() + { + IJT808SessionService jT808SessionServiceDefaultImpl = ServiceProvider.GetService(); + JT808SimpleTcpClient SimpleTcpClient1 = new JT808SimpleTcpClient(endPoint); + JT808SimpleTcpClient SimpleTcpClient2 = new JT808SimpleTcpClient(endPoint); + JT808SimpleTcpClient SimpleTcpClient3 = new JT808SimpleTcpClient(endPoint); + JT808SimpleTcpClient SimpleTcpClient4 = new JT808SimpleTcpClient(endPoint); + JT808SimpleTcpClient SimpleTcpClient5 = new JT808SimpleTcpClient(endPoint); + // 心跳会话包 + JT808Package jT808Package1 = JT808.Protocol.Enums.JT808MsgId.终端心跳.Create("123456789001"); + SimpleTcpClient1.WriteAsync(JT808Serializer.Serialize(jT808Package1)); + + // 心跳会话包 + JT808Package jT808Package2 = JT808.Protocol.Enums.JT808MsgId.终端心跳.Create("123456789002"); + SimpleTcpClient2.WriteAsync(JT808Serializer.Serialize(jT808Package2)); + + // 心跳会话包 + JT808Package jT808Package3 = JT808.Protocol.Enums.JT808MsgId.终端心跳.Create("123456789003"); + SimpleTcpClient3.WriteAsync(JT808Serializer.Serialize(jT808Package3)); + + // 心跳会话包 + JT808Package jT808Package4 = JT808.Protocol.Enums.JT808MsgId.终端心跳.Create("123456789004"); + SimpleTcpClient4.WriteAsync(JT808Serializer.Serialize(jT808Package4)); + + // 心跳会话包 + JT808Package jT808Package5 = JT808.Protocol.Enums.JT808MsgId.终端心跳.Create("123456789005"); + SimpleTcpClient5.WriteAsync(JT808Serializer.Serialize(jT808Package5)); + Thread.Sleep(1000); + var result = jT808SessionServiceDefaultImpl.GetAll(); + + var info5 = result.Data.FirstOrDefault(f => f.TerminalPhoneNo == "123456789005"); + + var remove5 = jT808SessionServiceDefaultImpl.RemoveByChannelId(info5.ChannelId); + + var result1 = jT808SessionServiceDefaultImpl.GetAll(); + + Thread.Sleep(10000); + + SimpleTcpClient1.Down(); + SimpleTcpClient2.Down(); + SimpleTcpClient3.Down(); + SimpleTcpClient4.Down(); + SimpleTcpClient5.Down(); + } + } +} diff --git a/src/JT808.DotNetty.Test/Internal/JT808SourcePackageChannelServiceTest.cs b/src/JT808.DotNetty.Test/Internal/JT808SourcePackageChannelServiceTest.cs new file mode 100644 index 0000000..ff710d1 --- /dev/null +++ b/src/JT808.DotNetty.Test/Internal/JT808SourcePackageChannelServiceTest.cs @@ -0,0 +1,136 @@ +using DotNetty.Buffers; +using DotNetty.Codecs; +using DotNetty.Transport.Bootstrapping; +using DotNetty.Transport.Channels; +using DotNetty.Transport.Libuv; +using JT808.DotNetty.Codecs; +using JT808.DotNetty.Configurations; +using JT808.DotNetty.Internal; +using JT808.Protocol; +using JT808.Protocol.Extensions; +using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; +using System; +using System.Collections.Generic; +using System.Net; +using System.Runtime.InteropServices; +using System.Text; +using System.Threading; +using System.Threading.Tasks; +using Xunit; + +namespace JT808.DotNetty.Test.Internal +{ + public class JT808SourcePackageChannelServiceTest:TestBase + { + private JT808SourcePackageChannelService jT808SourcePackageChannelService; + + private IPEndPoint endPoint = new IPEndPoint(IPAddress.Parse("127.0.0.1"), 6565); + + public JT808SimpleTcpClient SimpleTcpClient; + + /// + /// 需要使用 SocketTool 创建tcp服务器 + /// + public JT808SourcePackageChannelServiceTest() + { + SimpleTcpClient = new JT808SimpleTcpClient(endPoint, new IPEndPoint(IPAddress.Parse("127.0.0.1"), 6555)); + //作为设备5秒上传 + Task.Run(() => + { + Random random = new Random(); + while (true) + { + + JT808Package jT808Package = JT808.Protocol.Enums.JT808MsgId.终端心跳.Create("12345678900"+ random.Next(0,2).ToString()); + SimpleTcpClient.WriteAsync(JT808Serializer.Serialize(jT808Package)); + Thread.Sleep(1000); + } + }); + + // 作为源包转发服务端 + DispatcherEventLoopGroup bossGroup = new DispatcherEventLoopGroup(); + WorkerEventLoopGroup workerGroup = new WorkerEventLoopGroup(bossGroup, 1); + ServerBootstrap bootstrap = new ServerBootstrap(); + bootstrap.Group(bossGroup, workerGroup); + bootstrap.Channel(); + bootstrap + .ChildHandler(new ActionChannelInitializer(channel => + { + IChannelPipeline pipeline = channel.Pipeline; + channel.Pipeline.AddLast("jt808Buffer", new DelimiterBasedFrameDecoder(int.MaxValue, + Unpooled.CopiedBuffer(new byte[] { JT808.Protocol.JT808Package.BeginFlag }), + Unpooled.CopiedBuffer(new byte[] { JT808.Protocol.JT808Package.EndFlag }))); + channel.Pipeline.AddLast("jt808Decode", new JT808ClientDecoder()); + })); + bootstrap.BindAsync(6655); + + DispatcherEventLoopGroup bossGroup1 = new DispatcherEventLoopGroup(); + WorkerEventLoopGroup workerGroup1 = new WorkerEventLoopGroup(bossGroup1, 1); + ServerBootstrap bootstrap1 = new ServerBootstrap(); + bootstrap1.Group(bossGroup1, workerGroup1); + bootstrap1.Channel(); + bootstrap1 + .ChildHandler(new ActionChannelInitializer(channel => + { + IChannelPipeline pipeline = channel.Pipeline; + channel.Pipeline.AddLast("jt808Buffer", new DelimiterBasedFrameDecoder(int.MaxValue, + Unpooled.CopiedBuffer(new byte[] { JT808.Protocol.JT808Package.BeginFlag }), + Unpooled.CopiedBuffer(new byte[] { JT808.Protocol.JT808Package.EndFlag }))); + channel.Pipeline.AddLast("jt808Decode", new JT808ClientDecoder()); + })); + bootstrap1.BindAsync(6656); + } + + [Fact] + public void Test1() + { + //预热 + Thread.Sleep(3000); + + jT808SourcePackageChannelService = ServiceProvider.GetService(); + var result = jT808SourcePackageChannelService.GetAll(); + + //创建服务 + DispatcherEventLoopGroup bossGroup2 = new DispatcherEventLoopGroup(); + WorkerEventLoopGroup workerGroup2 = new WorkerEventLoopGroup(bossGroup2, 1); + ServerBootstrap bootstrap2 = new ServerBootstrap(); + bootstrap2.Group(bossGroup2, workerGroup2); + bootstrap2.Channel(); + bootstrap2 + .ChildHandler(new ActionChannelInitializer(channel => + { + IChannelPipeline pipeline = channel.Pipeline; + channel.Pipeline.AddLast("jt808Buffer", new DelimiterBasedFrameDecoder(int.MaxValue, + Unpooled.CopiedBuffer(new byte[] { JT808.Protocol.JT808Package.BeginFlag }), + Unpooled.CopiedBuffer(new byte[] { JT808.Protocol.JT808Package.EndFlag }))); + channel.Pipeline.AddLast("jt808Decode", new JT808ClientDecoder()); + })); + bootstrap2.BindAsync(6522); + + //添加服务 + var addResult = jT808SourcePackageChannelService.Add(new Dtos.JT808IPAddressDto + { + Host = "127.0.0.1", + Port = 6522 + }).Result; + + Thread.Sleep(3000); + + var result1 = jT808SourcePackageChannelService.GetAll(); + + //删除 + var result2 = jT808SourcePackageChannelService.Remove(new Dtos.JT808IPAddressDto + { + Host = "127.0.0.1", + Port = 6522 + }).Result; + //[::ffff:127.0.0.1]:13196 + var result3 = jT808SourcePackageChannelService.GetAll(); + } + + + } +} diff --git a/src/JT808.DotNetty.Test/Internal/JT808SourcePackageDispatcherDefaultImplTest.cs b/src/JT808.DotNetty.Test/Internal/JT808SourcePackageDispatcherDefaultImplTest.cs index d81ba8e..55ef405 100644 --- a/src/JT808.DotNetty.Test/Internal/JT808SourcePackageDispatcherDefaultImplTest.cs +++ b/src/JT808.DotNetty.Test/Internal/JT808SourcePackageDispatcherDefaultImplTest.cs @@ -23,7 +23,7 @@ namespace JT808.DotNetty.Test.Internal public JT808SourcePackageDispatcherDefaultImplTest() { - SimpleTcpClient = new JT808SimpleTcpClient(endPoint); + SimpleTcpClient = new JT808SimpleTcpClient(endPoint, new IPEndPoint(IPAddress.Parse("127.0.0.1"), 6561)); } [Fact] @@ -58,5 +58,15 @@ namespace JT808.DotNetty.Test.Internal Thread.Sleep(10000); SimpleTcpClient.Down(); } + + [Fact] + public void Test2() + { + //原包转发 不下发 - 貔貅 + byte[] bytes = "7E 02 00 00 26 12 34 56 78 90 12 00 7D 02 00 00 00 01 00 00 00 02 00 BA 7F 0E 07 E4 F1 1C 00 28 00 3C 00 00 18 10 15 10 10 10 01 04 00 00 00 64 02 02 00 7D 01 13 7E".ToHexBytes(); + SimpleTcpClient.WriteAsync(bytes); + Thread.Sleep(3000); + SimpleTcpClient.Down(); + } } } diff --git a/src/JT808.DotNetty.Test/TestBase.cs b/src/JT808.DotNetty.Test/TestBase.cs index aadb76c..e30f4eb 100644 --- a/src/JT808.DotNetty.Test/TestBase.cs +++ b/src/JT808.DotNetty.Test/TestBase.cs @@ -16,6 +16,8 @@ namespace JT808.DotNetty.Test { public class TestBase:IDisposable { + public static IServiceProvider ServiceProvider; + static TestBase() { var serverHostBuilder = new HostBuilder() @@ -30,7 +32,10 @@ namespace JT808.DotNetty.Test services.AddSingleton(typeof(ILogger<>), typeof(Logger<>)); }) .UseJT808Host(); - serverHostBuilder.RunConsoleAsync(); + var build=serverHostBuilder.Build(); + build.Start(); + ServiceProvider = build.Services; + } public virtual void Dispose() diff --git a/src/JT808.DotNetty.Test/appsettings.json b/src/JT808.DotNetty.Test/appsettings.json index 45b9b82..7d0edb3 100644 --- a/src/JT808.DotNetty.Test/appsettings.json +++ b/src/JT808.DotNetty.Test/appsettings.json @@ -15,7 +15,24 @@ "JT808Configuration": { "Port": 6565, "SourcePackageDispatcherClientConfigurations": [ - {"Host": "127.0.0.1","Port": 6655} + { + "Host": "127.0.0.1", + "Port": 6655 + }, + { + "Host": "127.0.0.1", + "Port": 6656 + } + ], + "ForwardingRemoteAddress": [ + { + "Host": "127.0.0.1", + "Port": 6561 + }, + { + "Host": "127.0.0.1", + "Port": 6562 + } ] } } diff --git a/src/JT808.DotNetty/Codecs/JT808ClientDecoder.cs b/src/JT808.DotNetty/Codecs/JT808ClientDecoder.cs index 662ebff..8a03acd 100644 --- a/src/JT808.DotNetty/Codecs/JT808ClientDecoder.cs +++ b/src/JT808.DotNetty/Codecs/JT808ClientDecoder.cs @@ -8,6 +8,7 @@ using System.Text; using JT808.Protocol; using JT808.DotNetty.Internal; using JT808.DotNetty.Interfaces; +using JT808.DotNetty.Metadata; namespace JT808.DotNetty.Codecs { @@ -18,8 +19,10 @@ namespace JT808.DotNetty.Codecs { private static readonly ILogger logger=new LoggerFactory().CreateLogger(); - private static readonly JT808AtomicCounterService jT808AtomicCounterService=new JT808AtomicCounterService (); - + private static readonly JT808AtomicCounter MsgSuccessCounter = new JT808AtomicCounter(); + + private static readonly JT808AtomicCounter MsgFailCounter = new JT808AtomicCounter(); + protected override void Decode(IChannelHandlerContext context, IByteBuffer input, List output) { byte[] buffer = new byte[input.Capacity + 2]; @@ -29,28 +32,28 @@ namespace JT808.DotNetty.Codecs buffer[0] = JT808Package.BeginFlag; buffer[input.Capacity + 1] = JT808Package.EndFlag; JT808Package jT808Package = JT808Serializer.Deserialize(buffer); + MsgSuccessCounter.Increment(); output.Add(jT808Package); - jT808AtomicCounterService.MsgSuccessIncrement(); if (logger.IsEnabled(LogLevel.Debug)) { - logger.LogDebug("accept package success count<<<" + jT808AtomicCounterService.MsgSuccessCount.ToString()); + logger.LogDebug("accept package success count<<<" + MsgSuccessCounter.Count.ToString()); } } catch (JT808.Protocol.Exceptions.JT808Exception ex) { - jT808AtomicCounterService.MsgFailIncrement(); + MsgFailCounter.Increment(); if (logger.IsEnabled(LogLevel.Error)) { - logger.LogError("accept package fail count<<<" + jT808AtomicCounterService.MsgFailCount.ToString()); + logger.LogError("accept package fail count<<<" + MsgFailCounter.Count.ToString()); logger.LogError(ex, "accept msg<<<" + buffer); } } catch (Exception ex) { - jT808AtomicCounterService.MsgFailIncrement(); + MsgFailCounter.Increment(); if (logger.IsEnabled(LogLevel.Error)) { - logger.LogError("accept package fail count<<<" + jT808AtomicCounterService.MsgFailCount.ToString()); + logger.LogError("accept package fail count<<<" + MsgFailCounter.Count.ToString()); logger.LogError(ex, "accept msg<<<" + buffer); } } diff --git a/src/JT808.DotNetty/Codecs/JT808Decoder.cs b/src/JT808.DotNetty/Codecs/JT808Decoder.cs index d082643..293e624 100644 --- a/src/JT808.DotNetty/Codecs/JT808Decoder.cs +++ b/src/JT808.DotNetty/Codecs/JT808Decoder.cs @@ -42,8 +42,8 @@ namespace JT808.DotNetty.Codecs buffer[input.Capacity + 1] = JT808Package.EndFlag; jT808SourcePackageDispatcher?.SendAsync(buffer); JT808Package jT808Package = JT808Serializer.Deserialize(buffer); - output.Add(jT808Package); jT808AtomicCounterService.MsgSuccessIncrement(); + output.Add(jT808Package); if (logger.IsEnabled(LogLevel.Debug)) { logger.LogDebug("accept package success count<<<" + jT808AtomicCounterService.MsgSuccessCount.ToString()); diff --git a/src/JT808.DotNetty/Handlers/JT808ConnectionHandler.cs b/src/JT808.DotNetty/Handlers/JT808ConnectionHandler.cs index b590767..8036792 100644 --- a/src/JT808.DotNetty/Handlers/JT808ConnectionHandler.cs +++ b/src/JT808.DotNetty/Handlers/JT808ConnectionHandler.cs @@ -10,6 +10,9 @@ using System.Threading.Tasks; namespace JT808.DotNetty.Handlers { + /// + /// JT808服务通道处理程序 + /// internal class JT808ConnectionHandler : ChannelHandlerAdapter { private readonly ILogger logger; diff --git a/src/JT808.DotNetty/Handlers/JT808ServerHandler.cs b/src/JT808.DotNetty/Handlers/JT808ServerHandler.cs index 747b6b3..18ae0a0 100644 --- a/src/JT808.DotNetty/Handlers/JT808ServerHandler.cs +++ b/src/JT808.DotNetty/Handlers/JT808ServerHandler.cs @@ -9,6 +9,9 @@ using JT808.DotNetty.Internal; namespace JT808.DotNetty.Handlers { + /// + /// JT808服务端处理程序 + /// internal class JT808ServerHandler : SimpleChannelInboundHandler { private readonly JT808MsgIdHandlerBase handler; @@ -39,7 +42,7 @@ namespace JT808.DotNetty.Handlers JT808Response jT808Package = handlerFunc(new JT808Request(msg)); if (jT808Package != null) { - if (!jT808RemoteAddressTransmitConfigurationService.Contains(ctx.Channel.RemoteAddress)) + if (!jT808RemoteAddressTransmitConfigurationService.ContainsKey(ctx.Channel.RemoteAddress)) { ctx.WriteAndFlushAsync(Unpooled.WrappedBuffer(JT808Serializer.Serialize(jT808Package.Package, jT808Package.MinBufferSize))); } diff --git a/src/JT808.DotNetty/Internal/JT808AtomicCounterService.cs b/src/JT808.DotNetty/Internal/JT808AtomicCounterService.cs index f30de89..6cd3b62 100644 --- a/src/JT808.DotNetty/Internal/JT808AtomicCounterService.cs +++ b/src/JT808.DotNetty/Internal/JT808AtomicCounterService.cs @@ -14,6 +14,11 @@ namespace JT808.DotNetty.Internal private static readonly JT808AtomicCounter MsgFailCounter = new JT808AtomicCounter(); + public JT808AtomicCounterService() + { + + } + public long MsgSuccessIncrement() { return MsgSuccessCounter.Increment(); diff --git a/src/JT808.DotNetty/Internal/JT808RemoteAddressTransmitConfigurationService.cs b/src/JT808.DotNetty/Internal/JT808RemoteAddressTransmitConfigurationService.cs index 0292dea..a855162 100644 --- a/src/JT808.DotNetty/Internal/JT808RemoteAddressTransmitConfigurationService.cs +++ b/src/JT808.DotNetty/Internal/JT808RemoteAddressTransmitConfigurationService.cs @@ -22,7 +22,7 @@ namespace JT808.DotNetty.Internal { private readonly IOptionsMonitor jT808ConfigurationOptionsMonitor; - private ConcurrentBag ForwardingRemoteAddresss; + private ConcurrentDictionary ForwardingRemoteAddresssDict; private IDisposable jT808ConfigurationOptionsMonitorDisposable; @@ -30,7 +30,7 @@ namespace JT808.DotNetty.Internal IOptionsMonitor jT808ConfigurationOptionsMonitor) { this.jT808ConfigurationOptionsMonitor = jT808ConfigurationOptionsMonitor; - ForwardingRemoteAddresss = new ConcurrentBag(); + ForwardingRemoteAddresssDict = new ConcurrentDictionary(); InitForwardingRemoteAddress(jT808ConfigurationOptionsMonitor.CurrentValue.ForwardingRemoteAddress); //OnChange 源码多播委托 jT808ConfigurationOptionsMonitorDisposable = this.jT808ConfigurationOptionsMonitor.OnChange(options => @@ -46,46 +46,39 @@ namespace JT808.DotNetty.Internal foreach (var item in jT808ClientConfigurations) { string host = item.EndPoint.ToString(); - if (!ForwardingRemoteAddresss.Contains(host)) - { - ForwardingRemoteAddresss.Add(host); - } + ForwardingRemoteAddresssDict.TryAdd(host, 0); } } } - public bool Contains(EndPoint endPoint) + public bool ContainsKey(EndPoint endPoint) { - return ForwardingRemoteAddresss.Contains(endPoint.ToString()); + return ForwardingRemoteAddresssDict.ContainsKey(endPoint.ToString()); } public JT808ResultDto Add(JT808IPAddressDto jT808IPAddressDto) { string host = jT808IPAddressDto.EndPoint.ToString(); - if (!ForwardingRemoteAddresss.Contains(host)) - { - ForwardingRemoteAddresss.Add(host); - } - return new JT808ResultDto() { Code = JT808ResultCode.Ok, Data = true }; + return new JT808ResultDto() { Code = JT808ResultCode.Ok, Data = ForwardingRemoteAddresssDict.TryAdd(host,0) }; } public JT808ResultDto Remove(JT808IPAddressDto jT808IPAddressDto) { string host = jT808IPAddressDto.EndPoint.ToString(); if(jT808ConfigurationOptionsMonitor.CurrentValue.ForwardingRemoteAddress!=null && - jT808ConfigurationOptionsMonitor.CurrentValue.ForwardingRemoteAddress.Any(w=>w.EndPoint.ToString()== jT808IPAddressDto.ToString())) + jT808ConfigurationOptionsMonitor.CurrentValue.ForwardingRemoteAddress.Any(w=>w.EndPoint.ToString()== host)) { return new JT808ResultDto() { Code = JT808ResultCode.Ok, Data = false,Message="不能删除服务器配置的地址" }; } else { - return new JT808ResultDto() { Code = JT808ResultCode.Ok, Data = ForwardingRemoteAddresss.TryTake(out var temp) }; + return new JT808ResultDto() { Code = JT808ResultCode.Ok, Data = ForwardingRemoteAddresssDict.TryRemove(host,out var temp) }; } } public JT808ResultDto> GetAll() { - return new JT808ResultDto>(){ Code = JT808ResultCode.Ok, Data = ForwardingRemoteAddresss.ToList() }; + return new JT808ResultDto>(){ Code = JT808ResultCode.Ok, Data = ForwardingRemoteAddresssDict.Select(s=>s.Key).ToList() }; } public void Dispose() diff --git a/src/JT808.DotNetty/Internal/JT808SourcePackageChannelService.cs b/src/JT808.DotNetty/Internal/JT808SourcePackageChannelService.cs index ccdcd2b..7af1966 100644 --- a/src/JT808.DotNetty/Internal/JT808SourcePackageChannelService.cs +++ b/src/JT808.DotNetty/Internal/JT808SourcePackageChannelService.cs @@ -198,16 +198,6 @@ namespace JT808.DotNetty.Internal { channel.Pipeline.AddLast(new JT808SourcePackageDispatcherHandler(this)); })); - jT808ConfigurationOptionsMonitor.OnChange(options => - { - List chgRemoteServers = new List(); - if (jT808ConfigurationOptionsMonitor.CurrentValue.SourcePackageDispatcherClientConfigurations != null && jT808ConfigurationOptionsMonitor.CurrentValue.SourcePackageDispatcherClientConfigurations.Count > 0) - { - chgRemoteServers = options.SourcePackageDispatcherClientConfigurations; - } - DelRemoteServsers(chgRemoteServers); - AddRemoteServsers(chgRemoteServers); - }); if (jT808ConfigurationOptionsMonitor.CurrentValue.SourcePackageDispatcherClientConfigurations != null && jT808ConfigurationOptionsMonitor.CurrentValue.SourcePackageDispatcherClientConfigurations.Count > 0) { diff --git a/src/JT808.DotNetty/Internal/JT808WebAPIService.cs b/src/JT808.DotNetty/Internal/JT808WebAPIService.cs index 11a48b9..762e105 100644 --- a/src/JT808.DotNetty/Internal/JT808WebAPIService.cs +++ b/src/JT808.DotNetty/Internal/JT808WebAPIService.cs @@ -8,6 +8,9 @@ using System.Text; namespace JT808.DotNetty.Internal { + /// + /// JT808 WebApi 业务服务 + /// internal class JT808WebAPIService { public Dictionary> HandlerDict { get; protected set; } @@ -116,10 +119,14 @@ namespace JT808.DotNetty.Internal /// public JT808HttpResponse GetAtomicCounter(JT808HttpRequest request) { - JT808AtomicCounterDto jT808AtomicCounterDto = new JT808AtomicCounterDto(); - jT808AtomicCounterDto.MsgFailCount = jT808AtomicCounterService.MsgFailCount; - jT808AtomicCounterDto.MsgSuccessCount = jT808AtomicCounterService.MsgSuccessCount; - return CreateJT808HttpResponse(jT808AtomicCounterDto); + JT808AtomicCounterDto jT808AtomicCounterDto = new JT808AtomicCounterDto(); + jT808AtomicCounterDto.MsgFailCount = jT808AtomicCounterService.MsgFailCount; + jT808AtomicCounterDto.MsgSuccessCount = jT808AtomicCounterService.MsgSuccessCount; + return CreateJT808HttpResponse(new JT808ResultDto + { + Code=JT808ResultCode.Ok, + Data= jT808AtomicCounterDto + }); } /// diff --git a/src/JT808.DotNetty/JT808.DotNetty.xml b/src/JT808.DotNetty/JT808.DotNetty.xml index f19c8ef..59a88d3 100644 --- a/src/JT808.DotNetty/JT808.DotNetty.xml +++ b/src/JT808.DotNetty/JT808.DotNetty.xml @@ -14,9 +14,9 @@ JT808解码 - + - WebAPI服务 + WebApi服务 默认828端口 @@ -25,6 +25,12 @@ 源包分发器配置 + + + + 包计数器服务 + + 通道Id @@ -60,6 +66,36 @@ 远程ip地址 + + + 原包通道信息 + + + + + 远程地址 + + + + + 本地地址 + + + + + 是否注册 + + + + + 是否活动 + + + + + 是否打开 + + 统一下发请求参数 @@ -146,24 +182,116 @@ 默认消息处理业务实现 - + + - 源包分发器默认实现 + 原包分发器通道服务 - + + + 下发数据 + + + + + + + 获取通道信息集合 + + + + + + 添加地址 + + + + + + 删除地址 + + + + 动态删除远程服务器 - + 动态添加远程服务器 + + + 原包分发器默认实现 + + + + + 初始化消息处理业务 + + + + + 统一下发信息 + + + + + + + 会话服务集合 + + + + + + + 会话服务-通过通道Id移除对应会话 + + + + + + + 会话服务-通过设备终端号移除对应会话 + + + + + + + 获取包计数器 + + + + + + + 添加原包转发地址 + + + + + + + 删除原包转发地址(不能删除在网关服务器配置文件配的地址) + + + + + + + 获取原包信息集合 + + + + 抽象消息处理业务 @@ -270,39 +398,6 @@ 集成一个webapi服务 - - - 初始化消息处理业务 - - - - - 统一下发信息 - - - - - - - 会话服务集合 - - - - - - - 会话服务-通过通道Id移除对应会话 - - - - - - - 会话服务-通过设备终端号移除对应会话 - - - - diff --git a/src/JT808.DotNetty/JT808DotnettyExtensions.cs b/src/JT808.DotNetty/JT808DotnettyExtensions.cs index 50716e9..0a8868c 100644 --- a/src/JT808.DotNetty/JT808DotnettyExtensions.cs +++ b/src/JT808.DotNetty/JT808DotnettyExtensions.cs @@ -37,6 +37,7 @@ namespace JT808.DotNetty services.Configure(hostContext.Configuration.GetSection("JT808Configuration")); services.TryAddSingleton(); services.TryAddSingleton(); + services.TryAddSingleton(); services.TryAddSingleton(); services.TryAddSingleton(); services.TryAddSingleton(); diff --git a/src/JT808.DotNetty/JT808ServerHost.cs b/src/JT808.DotNetty/JT808ServerHost.cs index d2cdcfd..b93ec05 100644 --- a/src/JT808.DotNetty/JT808ServerHost.cs +++ b/src/JT808.DotNetty/JT808ServerHost.cs @@ -21,6 +21,9 @@ using System.Threading.Tasks; namespace JT808.DotNetty { + /// + /// JT808 网关服务 + /// internal class JT808ServerHost : IHostedService { private readonly IServiceProvider serviceProvider; diff --git a/src/JT808.DotNetty/JT808SessionManager.cs b/src/JT808.DotNetty/JT808SessionManager.cs index 9748b16..49f812f 100644 --- a/src/JT808.DotNetty/JT808SessionManager.cs +++ b/src/JT808.DotNetty/JT808SessionManager.cs @@ -7,6 +7,9 @@ using JT808.DotNetty.Metadata; namespace JT808.DotNetty { + /// + /// JT808会话管理 + /// public class JT808SessionManager { private readonly ILogger logger; diff --git a/src/JT808.DotNetty/JT808SimpleTcpClient.cs b/src/JT808.DotNetty/JT808SimpleTcpClient.cs index c2955f9..5ca8d96 100644 --- a/src/JT808.DotNetty/JT808SimpleTcpClient.cs +++ b/src/JT808.DotNetty/JT808SimpleTcpClient.cs @@ -5,6 +5,7 @@ using DotNetty.Transport.Channels; using DotNetty.Transport.Channels.Sockets; using DotNetty.Transport.Libuv; using JT808.DotNetty.Codecs; +using JT808.DotNetty.Configurations; using JT808.DotNetty.Handlers; using System; using System.Collections.Generic; @@ -40,7 +41,26 @@ namespace JT808.DotNetty Unpooled.CopiedBuffer(new byte[] { JT808.Protocol.JT808Package.EndFlag }))); channel.Pipeline.AddLast("jt808Decode", new JT808ClientDecoder()); })); - clientChannel = cb.ConnectAsync(remoteAddress).Result; + clientChannel = cb.ConnectAsync(remoteAddress).Result; + } + + public JT808SimpleTcpClient(EndPoint remoteAddress, EndPoint localAddress) + { + clientBufferAllocator = new PooledByteBufferAllocator(); + clientGroup = new MultithreadEventLoopGroup(1); + cb = new Bootstrap() + .Group(clientGroup) + .Channel() + .Option(ChannelOption.TcpNodelay, true) + .Option(ChannelOption.Allocator, clientBufferAllocator) + .Handler(new ActionChannelInitializer(channel => + { + channel.Pipeline.AddLast("jt808Buffer", new DelimiterBasedFrameDecoder(int.MaxValue, + Unpooled.CopiedBuffer(new byte[] { JT808.Protocol.JT808Package.BeginFlag }), + Unpooled.CopiedBuffer(new byte[] { JT808.Protocol.JT808Package.EndFlag }))); + channel.Pipeline.AddLast("jt808Decode", new JT808ClientDecoder()); + })); + clientChannel = cb.ConnectAsync(remoteAddress, localAddress).Result; } public void WriteAsync(byte[] data)