@@ -24,7 +24,7 @@ namespace JT808.DotNetty.Hosting | |||
.ConfigureLogging((context, logging) => | |||
{ | |||
logging.AddConsole(); | |||
logging.SetMinimumLevel(LogLevel.Error); | |||
logging.SetMinimumLevel(LogLevel.Debug); | |||
}) | |||
.ConfigureServices((hostContext, services) => | |||
{ | |||
@@ -0,0 +1,90 @@ | |||
using DotNetty.Buffers; | |||
using DotNetty.Codecs; | |||
using DotNetty.Transport.Bootstrapping; | |||
using DotNetty.Transport.Channels; | |||
using DotNetty.Transport.Libuv; | |||
using JT808.DotNetty.Codecs; | |||
using JT808.DotNetty.Dtos; | |||
using JT808.DotNetty.Metadata; | |||
using JT808.Protocol; | |||
using JT808.Protocol.Extensions; | |||
using JT808.Protocol.MessageBody; | |||
using Newtonsoft.Json; | |||
using System; | |||
using System.Collections.Generic; | |||
using System.Net; | |||
using System.Net.Http; | |||
using System.Runtime.InteropServices; | |||
using System.Text; | |||
using System.Threading; | |||
using System.Threading.Tasks; | |||
using Xunit; | |||
namespace JT808.DotNetty.Test.Internal | |||
{ | |||
public class JT808AtomicCounterServiceTest: TestBase | |||
{ | |||
private IPEndPoint endPoint = new IPEndPoint(IPAddress.Parse("127.0.0.1"), 6565); | |||
public JT808SimpleTcpClient SimpleTcpClient; | |||
public JT808AtomicCounterServiceTest() | |||
{ | |||
SimpleTcpClient = new JT808SimpleTcpClient(endPoint); | |||
} | |||
[Fact] | |||
public void Test1() | |||
{ | |||
JT808AtomicCounter jT808AtomicCounter = new JT808AtomicCounter(); | |||
JT808AtomicCounter jT808AtomicCounter1 = new JT808AtomicCounter(); | |||
Parallel.For(0, 1000, (i) => | |||
{ | |||
jT808AtomicCounter.Increment(); | |||
}); | |||
Assert.Equal(1000, jT808AtomicCounter.Count); | |||
Parallel.For(0, 1000, (i) => | |||
{ | |||
jT808AtomicCounter1.Increment(); | |||
}); | |||
Assert.Equal(1000, jT808AtomicCounter1.Count); | |||
} | |||
[Fact] | |||
public void Test2() | |||
{ | |||
// 心跳会话包 | |||
JT808Package jT808Package1 = JT808.Protocol.Enums.JT808MsgId.终端心跳.Create("123456789001"); | |||
SimpleTcpClient.WriteAsync(JT808Serializer.Serialize(jT808Package1)); | |||
// 心跳会话包 | |||
JT808Package jT808Package2 = JT808.Protocol.Enums.JT808MsgId.终端心跳.Create("123456789002"); | |||
SimpleTcpClient.WriteAsync(JT808Serializer.Serialize(jT808Package2)); | |||
// 心跳会话包 | |||
JT808Package jT808Package3 = JT808.Protocol.Enums.JT808MsgId.终端心跳.Create("123456789003"); | |||
SimpleTcpClient.WriteAsync(JT808Serializer.Serialize(jT808Package3)); | |||
// 心跳会话包 | |||
JT808Package jT808Package4 = JT808.Protocol.Enums.JT808MsgId.终端心跳.Create("123456789004"); | |||
SimpleTcpClient.WriteAsync(JT808Serializer.Serialize(jT808Package4)); | |||
// 心跳会话包 | |||
JT808Package jT808Package5 = JT808.Protocol.Enums.JT808MsgId.终端心跳.Create("123456789005"); | |||
SimpleTcpClient.WriteAsync(JT808Serializer.Serialize(jT808Package5)); | |||
// 异步的需要延时下 | |||
Thread.Sleep(1000); | |||
HttpClient httpClient = new HttpClient(); | |||
// 调用内置的http服务接收文本信息下发 | |||
var result = httpClient.GetAsync("http://127.0.0.1:828/jt808api/GetAtomicCounter").Result; | |||
string content = result.Content.ReadAsStringAsync().Result; | |||
JT808ResultDto<JT808AtomicCounterDto> jt808Result = JsonConvert.DeserializeObject<JT808ResultDto<JT808AtomicCounterDto>>(content); | |||
Assert.Equal(200, jt808Result.Code); | |||
Assert.Equal(5,jt808Result.Data.MsgSuccessCount); | |||
Assert.Equal(0, jt808Result.Data.MsgFailCount); | |||
SimpleTcpClient.Down(); | |||
} | |||
} | |||
} |
@@ -0,0 +1,104 @@ | |||
using JT808.DotNetty.Configurations; | |||
using JT808.DotNetty.Dtos; | |||
using JT808.DotNetty.Internal; | |||
using Microsoft.Extensions.Configuration; | |||
using Microsoft.Extensions.DependencyInjection; | |||
using Microsoft.Extensions.Hosting; | |||
using Microsoft.Extensions.Logging; | |||
using System; | |||
using System.Collections.Generic; | |||
using System.Text; | |||
using Xunit; | |||
namespace JT808.DotNetty.Test.Internal | |||
{ | |||
public class JT808RemoteAddressTransmitConfigurationServiceTest | |||
{ | |||
private JT808RemoteAddressTransmitConfigurationService jT808RemoteAddressTransmitConfigurationService; | |||
public JT808RemoteAddressTransmitConfigurationServiceTest() | |||
{ | |||
var serverHostBuilder = new HostBuilder() | |||
.ConfigureAppConfiguration((hostingContext, config) => | |||
{ | |||
config.SetBasePath(AppDomain.CurrentDomain.BaseDirectory); | |||
config.AddJsonFile("appsettings.json", optional: false, reloadOnChange: true); | |||
}) | |||
.ConfigureServices((hostContext, services) => | |||
{ | |||
services.AddSingleton<ILoggerFactory, LoggerFactory>(); | |||
services.AddSingleton(typeof(ILogger<>), typeof(Logger<>)); | |||
services.Configure<JT808Configuration>(hostContext.Configuration.GetSection("JT808Configuration")); | |||
services.AddSingleton<JT808RemoteAddressTransmitConfigurationService>(); | |||
}); | |||
var serviceProvider = serverHostBuilder.Build().Services; | |||
jT808RemoteAddressTransmitConfigurationService = serviceProvider.GetService<JT808RemoteAddressTransmitConfigurationService>(); | |||
jT808RemoteAddressTransmitConfigurationService.Add(new Dtos.JT808IPAddressDto | |||
{ | |||
Host = "127.0.0.1", | |||
Port = 12345 | |||
}); | |||
jT808RemoteAddressTransmitConfigurationService.Add(new Dtos.JT808IPAddressDto | |||
{ | |||
Host = "127.0.0.1", | |||
Port = 12346 | |||
}); | |||
jT808RemoteAddressTransmitConfigurationService.Add(new Dtos.JT808IPAddressDto | |||
{ | |||
Host = "127.0.0.1", | |||
Port = 12347 | |||
}); | |||
jT808RemoteAddressTransmitConfigurationService.Add(new Dtos.JT808IPAddressDto | |||
{ | |||
Host = "127.0.0.1", | |||
Port = 12348 | |||
}); | |||
} | |||
[Fact] | |||
public void Test1() | |||
{ | |||
Assert.True(jT808RemoteAddressTransmitConfigurationService.ContainsKey(new Dtos.JT808IPAddressDto | |||
{ | |||
Host = "127.0.0.1", | |||
Port = 12348 | |||
}.EndPoint)); | |||
} | |||
[Fact] | |||
public void Test2() | |||
{ | |||
var result = jT808RemoteAddressTransmitConfigurationService.GetAll(); | |||
} | |||
[Fact] | |||
public void Test3() | |||
{ | |||
var ip1 = new Dtos.JT808IPAddressDto | |||
{ | |||
Host = "127.0.0.1", | |||
Port = 12349 | |||
}; | |||
var result1= jT808RemoteAddressTransmitConfigurationService.Add(ip1); | |||
Assert.Equal(JT808ResultCode.Ok, result1.Code); | |||
Assert.True(result1.Data); | |||
var result2 = jT808RemoteAddressTransmitConfigurationService.Remove(ip1); | |||
Assert.Equal(JT808ResultCode.Ok, result2.Code); | |||
Assert.True(result2.Data); | |||
} | |||
[Fact] | |||
public void Test4() | |||
{ | |||
var configIp = new Dtos.JT808IPAddressDto | |||
{ | |||
Host = "127.0.0.1", | |||
Port = 6561 | |||
}; | |||
var result2 = jT808RemoteAddressTransmitConfigurationService.Remove(configIp); | |||
Assert.Equal(JT808ResultCode.Ok, result2.Code); | |||
Assert.False(result2.Data); | |||
Assert.Equal("不能删除服务器配置的地址", result2.Message); | |||
} | |||
} | |||
} |
@@ -0,0 +1,66 @@ | |||
using JT808.DotNetty.Internal; | |||
using System; | |||
using System.Linq; | |||
using System.Collections.Generic; | |||
using System.Text; | |||
using Microsoft.Extensions.DependencyInjection; | |||
using System.Net; | |||
using JT808.Protocol; | |||
using JT808.Protocol.Extensions; | |||
using System.Threading; | |||
using Xunit; | |||
using JT808.DotNetty.Interfaces; | |||
namespace JT808.DotNetty.Test.Internal | |||
{ | |||
public class JT808SessionServiceDefaultImplTest : TestBase | |||
{ | |||
static IPEndPoint endPoint = new IPEndPoint(IPAddress.Parse("127.0.0.1"), 6565); | |||
[Fact] | |||
public void Test1() | |||
{ | |||
IJT808SessionService jT808SessionServiceDefaultImpl = ServiceProvider.GetService<IJT808SessionService>(); | |||
JT808SimpleTcpClient SimpleTcpClient1 = new JT808SimpleTcpClient(endPoint); | |||
JT808SimpleTcpClient SimpleTcpClient2 = new JT808SimpleTcpClient(endPoint); | |||
JT808SimpleTcpClient SimpleTcpClient3 = new JT808SimpleTcpClient(endPoint); | |||
JT808SimpleTcpClient SimpleTcpClient4 = new JT808SimpleTcpClient(endPoint); | |||
JT808SimpleTcpClient SimpleTcpClient5 = new JT808SimpleTcpClient(endPoint); | |||
// 心跳会话包 | |||
JT808Package jT808Package1 = JT808.Protocol.Enums.JT808MsgId.终端心跳.Create("123456789001"); | |||
SimpleTcpClient1.WriteAsync(JT808Serializer.Serialize(jT808Package1)); | |||
// 心跳会话包 | |||
JT808Package jT808Package2 = JT808.Protocol.Enums.JT808MsgId.终端心跳.Create("123456789002"); | |||
SimpleTcpClient2.WriteAsync(JT808Serializer.Serialize(jT808Package2)); | |||
// 心跳会话包 | |||
JT808Package jT808Package3 = JT808.Protocol.Enums.JT808MsgId.终端心跳.Create("123456789003"); | |||
SimpleTcpClient3.WriteAsync(JT808Serializer.Serialize(jT808Package3)); | |||
// 心跳会话包 | |||
JT808Package jT808Package4 = JT808.Protocol.Enums.JT808MsgId.终端心跳.Create("123456789004"); | |||
SimpleTcpClient4.WriteAsync(JT808Serializer.Serialize(jT808Package4)); | |||
// 心跳会话包 | |||
JT808Package jT808Package5 = JT808.Protocol.Enums.JT808MsgId.终端心跳.Create("123456789005"); | |||
SimpleTcpClient5.WriteAsync(JT808Serializer.Serialize(jT808Package5)); | |||
Thread.Sleep(1000); | |||
var result = jT808SessionServiceDefaultImpl.GetAll(); | |||
var info5 = result.Data.FirstOrDefault(f => f.TerminalPhoneNo == "123456789005"); | |||
var remove5 = jT808SessionServiceDefaultImpl.RemoveByChannelId(info5.ChannelId); | |||
var result1 = jT808SessionServiceDefaultImpl.GetAll(); | |||
Thread.Sleep(10000); | |||
SimpleTcpClient1.Down(); | |||
SimpleTcpClient2.Down(); | |||
SimpleTcpClient3.Down(); | |||
SimpleTcpClient4.Down(); | |||
SimpleTcpClient5.Down(); | |||
} | |||
} | |||
} |
@@ -0,0 +1,136 @@ | |||
using DotNetty.Buffers; | |||
using DotNetty.Codecs; | |||
using DotNetty.Transport.Bootstrapping; | |||
using DotNetty.Transport.Channels; | |||
using DotNetty.Transport.Libuv; | |||
using JT808.DotNetty.Codecs; | |||
using JT808.DotNetty.Configurations; | |||
using JT808.DotNetty.Internal; | |||
using JT808.Protocol; | |||
using JT808.Protocol.Extensions; | |||
using Microsoft.Extensions.Configuration; | |||
using Microsoft.Extensions.DependencyInjection; | |||
using Microsoft.Extensions.Hosting; | |||
using Microsoft.Extensions.Logging; | |||
using System; | |||
using System.Collections.Generic; | |||
using System.Net; | |||
using System.Runtime.InteropServices; | |||
using System.Text; | |||
using System.Threading; | |||
using System.Threading.Tasks; | |||
using Xunit; | |||
namespace JT808.DotNetty.Test.Internal | |||
{ | |||
public class JT808SourcePackageChannelServiceTest:TestBase | |||
{ | |||
private JT808SourcePackageChannelService jT808SourcePackageChannelService; | |||
private IPEndPoint endPoint = new IPEndPoint(IPAddress.Parse("127.0.0.1"), 6565); | |||
public JT808SimpleTcpClient SimpleTcpClient; | |||
/// <summary> | |||
/// 需要使用 SocketTool 创建tcp服务器 | |||
/// </summary> | |||
public JT808SourcePackageChannelServiceTest() | |||
{ | |||
SimpleTcpClient = new JT808SimpleTcpClient(endPoint, new IPEndPoint(IPAddress.Parse("127.0.0.1"), 6555)); | |||
//作为设备5秒上传 | |||
Task.Run(() => | |||
{ | |||
Random random = new Random(); | |||
while (true) | |||
{ | |||
JT808Package jT808Package = JT808.Protocol.Enums.JT808MsgId.终端心跳.Create("12345678900"+ random.Next(0,2).ToString()); | |||
SimpleTcpClient.WriteAsync(JT808Serializer.Serialize(jT808Package)); | |||
Thread.Sleep(1000); | |||
} | |||
}); | |||
// 作为源包转发服务端 | |||
DispatcherEventLoopGroup bossGroup = new DispatcherEventLoopGroup(); | |||
WorkerEventLoopGroup workerGroup = new WorkerEventLoopGroup(bossGroup, 1); | |||
ServerBootstrap bootstrap = new ServerBootstrap(); | |||
bootstrap.Group(bossGroup, workerGroup); | |||
bootstrap.Channel<TcpServerChannel>(); | |||
bootstrap | |||
.ChildHandler(new ActionChannelInitializer<IChannel>(channel => | |||
{ | |||
IChannelPipeline pipeline = channel.Pipeline; | |||
channel.Pipeline.AddLast("jt808Buffer", new DelimiterBasedFrameDecoder(int.MaxValue, | |||
Unpooled.CopiedBuffer(new byte[] { JT808.Protocol.JT808Package.BeginFlag }), | |||
Unpooled.CopiedBuffer(new byte[] { JT808.Protocol.JT808Package.EndFlag }))); | |||
channel.Pipeline.AddLast("jt808Decode", new JT808ClientDecoder()); | |||
})); | |||
bootstrap.BindAsync(6655); | |||
DispatcherEventLoopGroup bossGroup1 = new DispatcherEventLoopGroup(); | |||
WorkerEventLoopGroup workerGroup1 = new WorkerEventLoopGroup(bossGroup1, 1); | |||
ServerBootstrap bootstrap1 = new ServerBootstrap(); | |||
bootstrap1.Group(bossGroup1, workerGroup1); | |||
bootstrap1.Channel<TcpServerChannel>(); | |||
bootstrap1 | |||
.ChildHandler(new ActionChannelInitializer<IChannel>(channel => | |||
{ | |||
IChannelPipeline pipeline = channel.Pipeline; | |||
channel.Pipeline.AddLast("jt808Buffer", new DelimiterBasedFrameDecoder(int.MaxValue, | |||
Unpooled.CopiedBuffer(new byte[] { JT808.Protocol.JT808Package.BeginFlag }), | |||
Unpooled.CopiedBuffer(new byte[] { JT808.Protocol.JT808Package.EndFlag }))); | |||
channel.Pipeline.AddLast("jt808Decode", new JT808ClientDecoder()); | |||
})); | |||
bootstrap1.BindAsync(6656); | |||
} | |||
[Fact] | |||
public void Test1() | |||
{ | |||
//预热 | |||
Thread.Sleep(3000); | |||
jT808SourcePackageChannelService = ServiceProvider.GetService<JT808SourcePackageChannelService>(); | |||
var result = jT808SourcePackageChannelService.GetAll(); | |||
//创建服务 | |||
DispatcherEventLoopGroup bossGroup2 = new DispatcherEventLoopGroup(); | |||
WorkerEventLoopGroup workerGroup2 = new WorkerEventLoopGroup(bossGroup2, 1); | |||
ServerBootstrap bootstrap2 = new ServerBootstrap(); | |||
bootstrap2.Group(bossGroup2, workerGroup2); | |||
bootstrap2.Channel<TcpServerChannel>(); | |||
bootstrap2 | |||
.ChildHandler(new ActionChannelInitializer<IChannel>(channel => | |||
{ | |||
IChannelPipeline pipeline = channel.Pipeline; | |||
channel.Pipeline.AddLast("jt808Buffer", new DelimiterBasedFrameDecoder(int.MaxValue, | |||
Unpooled.CopiedBuffer(new byte[] { JT808.Protocol.JT808Package.BeginFlag }), | |||
Unpooled.CopiedBuffer(new byte[] { JT808.Protocol.JT808Package.EndFlag }))); | |||
channel.Pipeline.AddLast("jt808Decode", new JT808ClientDecoder()); | |||
})); | |||
bootstrap2.BindAsync(6522); | |||
//添加服务 | |||
var addResult = jT808SourcePackageChannelService.Add(new Dtos.JT808IPAddressDto | |||
{ | |||
Host = "127.0.0.1", | |||
Port = 6522 | |||
}).Result; | |||
Thread.Sleep(3000); | |||
var result1 = jT808SourcePackageChannelService.GetAll(); | |||
//删除 | |||
var result2 = jT808SourcePackageChannelService.Remove(new Dtos.JT808IPAddressDto | |||
{ | |||
Host = "127.0.0.1", | |||
Port = 6522 | |||
}).Result; | |||
//[::ffff:127.0.0.1]:13196 | |||
var result3 = jT808SourcePackageChannelService.GetAll(); | |||
} | |||
} | |||
} |
@@ -23,7 +23,7 @@ namespace JT808.DotNetty.Test.Internal | |||
public JT808SourcePackageDispatcherDefaultImplTest() | |||
{ | |||
SimpleTcpClient = new JT808SimpleTcpClient(endPoint); | |||
SimpleTcpClient = new JT808SimpleTcpClient(endPoint, new IPEndPoint(IPAddress.Parse("127.0.0.1"), 6561)); | |||
} | |||
[Fact] | |||
@@ -58,5 +58,15 @@ namespace JT808.DotNetty.Test.Internal | |||
Thread.Sleep(10000); | |||
SimpleTcpClient.Down(); | |||
} | |||
[Fact] | |||
public void Test2() | |||
{ | |||
//原包转发 不下发 - 貔貅 | |||
byte[] bytes = "7E 02 00 00 26 12 34 56 78 90 12 00 7D 02 00 00 00 01 00 00 00 02 00 BA 7F 0E 07 E4 F1 1C 00 28 00 3C 00 00 18 10 15 10 10 10 01 04 00 00 00 64 02 02 00 7D 01 13 7E".ToHexBytes(); | |||
SimpleTcpClient.WriteAsync(bytes); | |||
Thread.Sleep(3000); | |||
SimpleTcpClient.Down(); | |||
} | |||
} | |||
} |
@@ -16,6 +16,8 @@ namespace JT808.DotNetty.Test | |||
{ | |||
public class TestBase:IDisposable | |||
{ | |||
public static IServiceProvider ServiceProvider; | |||
static TestBase() | |||
{ | |||
var serverHostBuilder = new HostBuilder() | |||
@@ -30,7 +32,10 @@ namespace JT808.DotNetty.Test | |||
services.AddSingleton(typeof(ILogger<>), typeof(Logger<>)); | |||
}) | |||
.UseJT808Host(); | |||
serverHostBuilder.RunConsoleAsync(); | |||
var build=serverHostBuilder.Build(); | |||
build.Start(); | |||
ServiceProvider = build.Services; | |||
} | |||
public virtual void Dispose() | |||
@@ -15,7 +15,24 @@ | |||
"JT808Configuration": { | |||
"Port": 6565, | |||
"SourcePackageDispatcherClientConfigurations": [ | |||
{"Host": "127.0.0.1","Port": 6655} | |||
{ | |||
"Host": "127.0.0.1", | |||
"Port": 6655 | |||
}, | |||
{ | |||
"Host": "127.0.0.1", | |||
"Port": 6656 | |||
} | |||
], | |||
"ForwardingRemoteAddress": [ | |||
{ | |||
"Host": "127.0.0.1", | |||
"Port": 6561 | |||
}, | |||
{ | |||
"Host": "127.0.0.1", | |||
"Port": 6562 | |||
} | |||
] | |||
} | |||
} |
@@ -8,6 +8,7 @@ using System.Text; | |||
using JT808.Protocol; | |||
using JT808.DotNetty.Internal; | |||
using JT808.DotNetty.Interfaces; | |||
using JT808.DotNetty.Metadata; | |||
namespace JT808.DotNetty.Codecs | |||
{ | |||
@@ -18,8 +19,10 @@ namespace JT808.DotNetty.Codecs | |||
{ | |||
private static readonly ILogger<JT808ClientDecoder> logger=new LoggerFactory().CreateLogger<JT808ClientDecoder>(); | |||
private static readonly JT808AtomicCounterService jT808AtomicCounterService=new JT808AtomicCounterService (); | |||
private static readonly JT808AtomicCounter MsgSuccessCounter = new JT808AtomicCounter(); | |||
private static readonly JT808AtomicCounter MsgFailCounter = new JT808AtomicCounter(); | |||
protected override void Decode(IChannelHandlerContext context, IByteBuffer input, List<object> output) | |||
{ | |||
byte[] buffer = new byte[input.Capacity + 2]; | |||
@@ -29,28 +32,28 @@ namespace JT808.DotNetty.Codecs | |||
buffer[0] = JT808Package.BeginFlag; | |||
buffer[input.Capacity + 1] = JT808Package.EndFlag; | |||
JT808Package jT808Package = JT808Serializer.Deserialize<JT808Package>(buffer); | |||
MsgSuccessCounter.Increment(); | |||
output.Add(jT808Package); | |||
jT808AtomicCounterService.MsgSuccessIncrement(); | |||
if (logger.IsEnabled(LogLevel.Debug)) | |||
{ | |||
logger.LogDebug("accept package success count<<<" + jT808AtomicCounterService.MsgSuccessCount.ToString()); | |||
logger.LogDebug("accept package success count<<<" + MsgSuccessCounter.Count.ToString()); | |||
} | |||
} | |||
catch (JT808.Protocol.Exceptions.JT808Exception ex) | |||
{ | |||
jT808AtomicCounterService.MsgFailIncrement(); | |||
MsgFailCounter.Increment(); | |||
if (logger.IsEnabled(LogLevel.Error)) | |||
{ | |||
logger.LogError("accept package fail count<<<" + jT808AtomicCounterService.MsgFailCount.ToString()); | |||
logger.LogError("accept package fail count<<<" + MsgFailCounter.Count.ToString()); | |||
logger.LogError(ex, "accept msg<<<" + buffer); | |||
} | |||
} | |||
catch (Exception ex) | |||
{ | |||
jT808AtomicCounterService.MsgFailIncrement(); | |||
MsgFailCounter.Increment(); | |||
if (logger.IsEnabled(LogLevel.Error)) | |||
{ | |||
logger.LogError("accept package fail count<<<" + jT808AtomicCounterService.MsgFailCount.ToString()); | |||
logger.LogError("accept package fail count<<<" + MsgFailCounter.Count.ToString()); | |||
logger.LogError(ex, "accept msg<<<" + buffer); | |||
} | |||
} | |||
@@ -42,8 +42,8 @@ namespace JT808.DotNetty.Codecs | |||
buffer[input.Capacity + 1] = JT808Package.EndFlag; | |||
jT808SourcePackageDispatcher?.SendAsync(buffer); | |||
JT808Package jT808Package = JT808Serializer.Deserialize<JT808Package>(buffer); | |||
output.Add(jT808Package); | |||
jT808AtomicCounterService.MsgSuccessIncrement(); | |||
output.Add(jT808Package); | |||
if (logger.IsEnabled(LogLevel.Debug)) | |||
{ | |||
logger.LogDebug("accept package success count<<<" + jT808AtomicCounterService.MsgSuccessCount.ToString()); | |||
@@ -10,6 +10,9 @@ using System.Threading.Tasks; | |||
namespace JT808.DotNetty.Handlers | |||
{ | |||
/// <summary> | |||
/// JT808服务通道处理程序 | |||
/// </summary> | |||
internal class JT808ConnectionHandler : ChannelHandlerAdapter | |||
{ | |||
private readonly ILogger<JT808ConnectionHandler> logger; | |||
@@ -9,6 +9,9 @@ using JT808.DotNetty.Internal; | |||
namespace JT808.DotNetty.Handlers | |||
{ | |||
/// <summary> | |||
/// JT808服务端处理程序 | |||
/// </summary> | |||
internal class JT808ServerHandler : SimpleChannelInboundHandler<JT808.Protocol.JT808Package> | |||
{ | |||
private readonly JT808MsgIdHandlerBase handler; | |||
@@ -39,7 +42,7 @@ namespace JT808.DotNetty.Handlers | |||
JT808Response jT808Package = handlerFunc(new JT808Request(msg)); | |||
if (jT808Package != null) | |||
{ | |||
if (!jT808RemoteAddressTransmitConfigurationService.Contains(ctx.Channel.RemoteAddress)) | |||
if (!jT808RemoteAddressTransmitConfigurationService.ContainsKey(ctx.Channel.RemoteAddress)) | |||
{ | |||
ctx.WriteAndFlushAsync(Unpooled.WrappedBuffer(JT808Serializer.Serialize(jT808Package.Package, jT808Package.MinBufferSize))); | |||
} | |||
@@ -14,6 +14,11 @@ namespace JT808.DotNetty.Internal | |||
private static readonly JT808AtomicCounter MsgFailCounter = new JT808AtomicCounter(); | |||
public JT808AtomicCounterService() | |||
{ | |||
} | |||
public long MsgSuccessIncrement() | |||
{ | |||
return MsgSuccessCounter.Increment(); | |||
@@ -22,7 +22,7 @@ namespace JT808.DotNetty.Internal | |||
{ | |||
private readonly IOptionsMonitor<JT808Configuration> jT808ConfigurationOptionsMonitor; | |||
private ConcurrentBag<string> ForwardingRemoteAddresss; | |||
private ConcurrentDictionary<string,int> ForwardingRemoteAddresssDict; | |||
private IDisposable jT808ConfigurationOptionsMonitorDisposable; | |||
@@ -30,7 +30,7 @@ namespace JT808.DotNetty.Internal | |||
IOptionsMonitor<JT808Configuration> jT808ConfigurationOptionsMonitor) | |||
{ | |||
this.jT808ConfigurationOptionsMonitor = jT808ConfigurationOptionsMonitor; | |||
ForwardingRemoteAddresss = new ConcurrentBag<string>(); | |||
ForwardingRemoteAddresssDict = new ConcurrentDictionary<string, int>(); | |||
InitForwardingRemoteAddress(jT808ConfigurationOptionsMonitor.CurrentValue.ForwardingRemoteAddress); | |||
//OnChange 源码多播委托 | |||
jT808ConfigurationOptionsMonitorDisposable = this.jT808ConfigurationOptionsMonitor.OnChange(options => | |||
@@ -46,46 +46,39 @@ namespace JT808.DotNetty.Internal | |||
foreach (var item in jT808ClientConfigurations) | |||
{ | |||
string host = item.EndPoint.ToString(); | |||
if (!ForwardingRemoteAddresss.Contains(host)) | |||
{ | |||
ForwardingRemoteAddresss.Add(host); | |||
} | |||
ForwardingRemoteAddresssDict.TryAdd(host, 0); | |||
} | |||
} | |||
} | |||
public bool Contains(EndPoint endPoint) | |||
public bool ContainsKey(EndPoint endPoint) | |||
{ | |||
return ForwardingRemoteAddresss.Contains(endPoint.ToString()); | |||
return ForwardingRemoteAddresssDict.ContainsKey(endPoint.ToString()); | |||
} | |||
public JT808ResultDto<bool> Add(JT808IPAddressDto jT808IPAddressDto) | |||
{ | |||
string host = jT808IPAddressDto.EndPoint.ToString(); | |||
if (!ForwardingRemoteAddresss.Contains(host)) | |||
{ | |||
ForwardingRemoteAddresss.Add(host); | |||
} | |||
return new JT808ResultDto<bool>() { Code = JT808ResultCode.Ok, Data = true }; | |||
return new JT808ResultDto<bool>() { Code = JT808ResultCode.Ok, Data = ForwardingRemoteAddresssDict.TryAdd(host,0) }; | |||
} | |||
public JT808ResultDto<bool> Remove(JT808IPAddressDto jT808IPAddressDto) | |||
{ | |||
string host = jT808IPAddressDto.EndPoint.ToString(); | |||
if(jT808ConfigurationOptionsMonitor.CurrentValue.ForwardingRemoteAddress!=null && | |||
jT808ConfigurationOptionsMonitor.CurrentValue.ForwardingRemoteAddress.Any(w=>w.EndPoint.ToString()== jT808IPAddressDto.ToString())) | |||
jT808ConfigurationOptionsMonitor.CurrentValue.ForwardingRemoteAddress.Any(w=>w.EndPoint.ToString()== host)) | |||
{ | |||
return new JT808ResultDto<bool>() { Code = JT808ResultCode.Ok, Data = false,Message="不能删除服务器配置的地址" }; | |||
} | |||
else | |||
{ | |||
return new JT808ResultDto<bool>() { Code = JT808ResultCode.Ok, Data = ForwardingRemoteAddresss.TryTake(out var temp) }; | |||
return new JT808ResultDto<bool>() { Code = JT808ResultCode.Ok, Data = ForwardingRemoteAddresssDict.TryRemove(host,out var temp) }; | |||
} | |||
} | |||
public JT808ResultDto<List<string>> GetAll() | |||
{ | |||
return new JT808ResultDto<List<string>>(){ Code = JT808ResultCode.Ok, Data = ForwardingRemoteAddresss.ToList() }; | |||
return new JT808ResultDto<List<string>>(){ Code = JT808ResultCode.Ok, Data = ForwardingRemoteAddresssDict.Select(s=>s.Key).ToList() }; | |||
} | |||
public void Dispose() | |||
@@ -198,16 +198,6 @@ namespace JT808.DotNetty.Internal | |||
{ | |||
channel.Pipeline.AddLast(new JT808SourcePackageDispatcherHandler(this)); | |||
})); | |||
jT808ConfigurationOptionsMonitor.OnChange(options => | |||
{ | |||
List<JT808ClientConfiguration> chgRemoteServers = new List<JT808ClientConfiguration>(); | |||
if (jT808ConfigurationOptionsMonitor.CurrentValue.SourcePackageDispatcherClientConfigurations != null && jT808ConfigurationOptionsMonitor.CurrentValue.SourcePackageDispatcherClientConfigurations.Count > 0) | |||
{ | |||
chgRemoteServers = options.SourcePackageDispatcherClientConfigurations; | |||
} | |||
DelRemoteServsers(chgRemoteServers); | |||
AddRemoteServsers(chgRemoteServers); | |||
}); | |||
if (jT808ConfigurationOptionsMonitor.CurrentValue.SourcePackageDispatcherClientConfigurations != null && | |||
jT808ConfigurationOptionsMonitor.CurrentValue.SourcePackageDispatcherClientConfigurations.Count > 0) | |||
{ | |||
@@ -8,6 +8,9 @@ using System.Text; | |||
namespace JT808.DotNetty.Internal | |||
{ | |||
/// <summary> | |||
/// JT808 WebApi 业务服务 | |||
/// </summary> | |||
internal class JT808WebAPIService | |||
{ | |||
public Dictionary<string, Func<JT808HttpRequest, JT808HttpResponse>> HandlerDict { get; protected set; } | |||
@@ -116,10 +119,14 @@ namespace JT808.DotNetty.Internal | |||
/// <returns></returns> | |||
public JT808HttpResponse GetAtomicCounter(JT808HttpRequest request) | |||
{ | |||
JT808AtomicCounterDto jT808AtomicCounterDto = new JT808AtomicCounterDto(); | |||
jT808AtomicCounterDto.MsgFailCount = jT808AtomicCounterService.MsgFailCount; | |||
jT808AtomicCounterDto.MsgSuccessCount = jT808AtomicCounterService.MsgSuccessCount; | |||
return CreateJT808HttpResponse(jT808AtomicCounterDto); | |||
JT808AtomicCounterDto jT808AtomicCounterDto = new JT808AtomicCounterDto(); | |||
jT808AtomicCounterDto.MsgFailCount = jT808AtomicCounterService.MsgFailCount; | |||
jT808AtomicCounterDto.MsgSuccessCount = jT808AtomicCounterService.MsgSuccessCount; | |||
return CreateJT808HttpResponse(new JT808ResultDto<JT808AtomicCounterDto> | |||
{ | |||
Code=JT808ResultCode.Ok, | |||
Data= jT808AtomicCounterDto | |||
}); | |||
} | |||
/// <summary> | |||
@@ -14,9 +14,9 @@ | |||
JT808解码 | |||
</summary> | |||
</member> | |||
<member name="P:JT808.DotNetty.Configurations.JT808Configuration.WebAPIPort"> | |||
<member name="P:JT808.DotNetty.Configurations.JT808Configuration.WebApiPort"> | |||
<summary> | |||
WebAPI服务 | |||
WebApi服务 | |||
默认828端口 | |||
</summary> | |||
</member> | |||
@@ -25,6 +25,12 @@ | |||
源包分发器配置 | |||
</summary> | |||
</member> | |||
<!-- 对于成员“P:JT808.DotNetty.Configurations.JT808Configuration.ForwardingRemoteAddress”忽略有格式错误的 XML 注释 --> | |||
<member name="T:JT808.DotNetty.Dtos.JT808AtomicCounterDto"> | |||
<summary> | |||
包计数器服务 | |||
</summary> | |||
</member> | |||
<member name="P:JT808.DotNetty.Dtos.JT808SessionInfoDto.ChannelId"> | |||
<summary> | |||
通道Id | |||
@@ -60,6 +66,36 @@ | |||
远程ip地址 | |||
</summary> | |||
</member> | |||
<member name="T:JT808.DotNetty.Dtos.JT808SourcePackageChannelInfoDto"> | |||
<summary> | |||
原包通道信息 | |||
</summary> | |||
</member> | |||
<member name="P:JT808.DotNetty.Dtos.JT808SourcePackageChannelInfoDto.RemoteAddress"> | |||
<summary> | |||
远程地址 | |||
</summary> | |||
</member> | |||
<member name="P:JT808.DotNetty.Dtos.JT808SourcePackageChannelInfoDto.LocalAddress"> | |||
<summary> | |||
本地地址 | |||
</summary> | |||
</member> | |||
<member name="P:JT808.DotNetty.Dtos.JT808SourcePackageChannelInfoDto.Registered"> | |||
<summary> | |||
是否注册 | |||
</summary> | |||
</member> | |||
<member name="P:JT808.DotNetty.Dtos.JT808SourcePackageChannelInfoDto.Active"> | |||
<summary> | |||
是否活动 | |||
</summary> | |||
</member> | |||
<member name="P:JT808.DotNetty.Dtos.JT808SourcePackageChannelInfoDto.Open"> | |||
<summary> | |||
是否打开 | |||
</summary> | |||
</member> | |||
<member name="T:JT808.DotNetty.Dtos.JT808UnificationSendRequestDto"> | |||
<summary> | |||
统一下发请求参数 | |||
@@ -146,24 +182,116 @@ | |||
默认消息处理业务实现 | |||
</summary> | |||
</member> | |||
<member name="T:JT808.DotNetty.Internal.JT808SourcePackageDispatcherDefaultImpl"> | |||
<!-- 对于成员“T:JT808.DotNetty.Internal.JT808RemoteAddressTransmitConfigurationService”忽略有格式错误的 XML 注释 --> | |||
<member name="T:JT808.DotNetty.Internal.JT808SourcePackageChannelService"> | |||
<summary> | |||
源包分发器默认实现 | |||
原包分发器通道服务 | |||
</summary> | |||
</member> | |||
<member name="M:JT808.DotNetty.Internal.JT808SourcePackageDispatcherDefaultImpl.DelRemoteServsers(System.Collections.Generic.List{JT808.DotNetty.Configurations.JT808ClientConfiguration})"> | |||
<member name="M:JT808.DotNetty.Internal.JT808SourcePackageChannelService.SendAsync(System.Byte[])"> | |||
<summary> | |||
下发数据 | |||
</summary> | |||
<param name="data"></param> | |||
<returns></returns> | |||
</member> | |||
<member name="M:JT808.DotNetty.Internal.JT808SourcePackageChannelService.GetAll"> | |||
<summary> | |||
获取通道信息集合 | |||
</summary> | |||
<returns></returns> | |||
</member> | |||
<member name="M:JT808.DotNetty.Internal.JT808SourcePackageChannelService.Add(JT808.DotNetty.Dtos.JT808IPAddressDto)"> | |||
<summary> | |||
添加地址 | |||
</summary> | |||
<returns></returns> | |||
</member> | |||
<member name="M:JT808.DotNetty.Internal.JT808SourcePackageChannelService.Remove(JT808.DotNetty.Dtos.JT808IPAddressDto)"> | |||
<summary> | |||
删除地址 | |||
</summary> | |||
<returns></returns> | |||
</member> | |||
<member name="M:JT808.DotNetty.Internal.JT808SourcePackageChannelService.DelRemoteServsers(System.Collections.Generic.List{JT808.DotNetty.Configurations.JT808ClientConfiguration})"> | |||
<summary> | |||
动态删除远程服务器 | |||
</summary> | |||
<param name="chgRemoteServers"></param> | |||
</member> | |||
<member name="M:JT808.DotNetty.Internal.JT808SourcePackageDispatcherDefaultImpl.AddRemoteServsers(System.Collections.Generic.List{JT808.DotNetty.Configurations.JT808ClientConfiguration})"> | |||
<member name="M:JT808.DotNetty.Internal.JT808SourcePackageChannelService.AddRemoteServsers(System.Collections.Generic.List{JT808.DotNetty.Configurations.JT808ClientConfiguration})"> | |||
<summary> | |||
动态添加远程服务器 | |||
</summary> | |||
<param name="bootstrap"></param> | |||
<param name="chgRemoteServers"></param> | |||
</member> | |||
<member name="T:JT808.DotNetty.Internal.JT808SourcePackageDispatcherDefaultImpl"> | |||
<summary> | |||
原包分发器默认实现 | |||
</summary> | |||
</member> | |||
<member name="M:JT808.DotNetty.Internal.JT808WebAPIService.#ctor(JT808.DotNetty.Internal.JT808AtomicCounterService,JT808.DotNetty.Internal.JT808SourcePackageChannelService,JT808.DotNetty.Interfaces.IJT808SessionService,JT808.DotNetty.Interfaces.IJT808UnificationSendService)"> | |||
<summary> | |||
初始化消息处理业务 | |||
</summary> | |||
</member> | |||
<member name="M:JT808.DotNetty.Internal.JT808WebAPIService.UnificationSend(JT808.DotNetty.Metadata.JT808HttpRequest)"> | |||
<summary> | |||
统一下发信息 | |||
</summary> | |||
<param name="request"></param> | |||
<returns></returns> | |||
</member> | |||
<member name="M:JT808.DotNetty.Internal.JT808WebAPIService.GetSessionAll(JT808.DotNetty.Metadata.JT808HttpRequest)"> | |||
<summary> | |||
会话服务集合 | |||
</summary> | |||
<param name="request"></param> | |||
<returns></returns> | |||
</member> | |||
<member name="M:JT808.DotNetty.Internal.JT808WebAPIService.RemoveByChannelId(JT808.DotNetty.Metadata.JT808HttpRequest)"> | |||
<summary> | |||
会话服务-通过通道Id移除对应会话 | |||
</summary> | |||
<param name="request"></param> | |||
<returns></returns> | |||
</member> | |||
<member name="M:JT808.DotNetty.Internal.JT808WebAPIService.RemoveByTerminalPhoneNo(JT808.DotNetty.Metadata.JT808HttpRequest)"> | |||
<summary> | |||
会话服务-通过设备终端号移除对应会话 | |||
</summary> | |||
<param name="request"></param> | |||
<returns></returns> | |||
</member> | |||
<member name="M:JT808.DotNetty.Internal.JT808WebAPIService.GetAtomicCounter(JT808.DotNetty.Metadata.JT808HttpRequest)"> | |||
<summary> | |||
获取包计数器 | |||
</summary> | |||
<param name="request"></param> | |||
<returns></returns> | |||
</member> | |||
<member name="M:JT808.DotNetty.Internal.JT808WebAPIService.AddSourcePackageAddress(JT808.DotNetty.Metadata.JT808HttpRequest)"> | |||
<summary> | |||
添加原包转发地址 | |||
</summary> | |||
<param name="request"></param> | |||
<returns></returns> | |||
</member> | |||
<member name="M:JT808.DotNetty.Internal.JT808WebAPIService.RemoveSourcePackageAddress(JT808.DotNetty.Metadata.JT808HttpRequest)"> | |||
<summary> | |||
删除原包转发地址(不能删除在网关服务器配置文件配的地址) | |||
</summary> | |||
<param name="request"></param> | |||
<returns></returns> | |||
</member> | |||
<member name="M:JT808.DotNetty.Internal.JT808WebAPIService.GetSourcePackageAll(JT808.DotNetty.Metadata.JT808HttpRequest)"> | |||
<summary> | |||
获取原包信息集合 | |||
</summary> | |||
<param name="request"></param> | |||
<returns></returns> | |||
</member> | |||
<member name="T:JT808.DotNetty.JT808MsgIdHandlerBase"> | |||
<summary> | |||
抽象消息处理业务 | |||
@@ -270,39 +398,6 @@ | |||
集成一个webapi服务 | |||
</summary> | |||
</member> | |||
<member name="M:JT808.DotNetty.JT808WebAPIService.#ctor(JT808.DotNetty.Interfaces.IJT808SessionService,JT808.DotNetty.Interfaces.IJT808UnificationSendService)"> | |||
<summary> | |||
初始化消息处理业务 | |||
</summary> | |||
</member> | |||
<member name="M:JT808.DotNetty.JT808WebAPIService.UnificationSend(JT808.DotNetty.Metadata.JT808HttpRequest)"> | |||
<summary> | |||
统一下发信息 | |||
</summary> | |||
<param name="request"></param> | |||
<returns></returns> | |||
</member> | |||
<member name="M:JT808.DotNetty.JT808WebAPIService.GetAll(JT808.DotNetty.Metadata.JT808HttpRequest)"> | |||
<summary> | |||
会话服务集合 | |||
</summary> | |||
<param name="request"></param> | |||
<returns></returns> | |||
</member> | |||
<member name="M:JT808.DotNetty.JT808WebAPIService.RemoveByChannelId(JT808.DotNetty.Metadata.JT808HttpRequest)"> | |||
<summary> | |||
会话服务-通过通道Id移除对应会话 | |||
</summary> | |||
<param name="request"></param> | |||
<returns></returns> | |||
</member> | |||
<member name="M:JT808.DotNetty.JT808WebAPIService.RemoveByTerminalPhoneNo(JT808.DotNetty.Metadata.JT808HttpRequest)"> | |||
<summary> | |||
会话服务-通过设备终端号移除对应会话 | |||
</summary> | |||
<param name="request"></param> | |||
<returns></returns> | |||
</member> | |||
<member name="T:JT808.DotNetty.Metadata.JT808AtomicCounter"> | |||
<summary> | |||
@@ -37,6 +37,7 @@ namespace JT808.DotNetty | |||
services.Configure<JT808Configuration>(hostContext.Configuration.GetSection("JT808Configuration")); | |||
services.TryAddSingleton<JT808SessionManager>(); | |||
services.TryAddSingleton<JT808AtomicCounterService>(); | |||
services.TryAddSingleton<JT808RemoteAddressTransmitConfigurationService>(); | |||
services.TryAddSingleton<JT808MsgIdHandlerBase,JT808MsgIdDefaultHandler>(); | |||
services.TryAddSingleton<JT808SourcePackageChannelService>(); | |||
services.TryAddSingleton<IJT808SourcePackageDispatcher, JT808SourcePackageDispatcherDefaultImpl>(); | |||
@@ -21,6 +21,9 @@ using System.Threading.Tasks; | |||
namespace JT808.DotNetty | |||
{ | |||
/// <summary> | |||
/// JT808 网关服务 | |||
/// </summary> | |||
internal class JT808ServerHost : IHostedService | |||
{ | |||
private readonly IServiceProvider serviceProvider; | |||
@@ -7,6 +7,9 @@ using JT808.DotNetty.Metadata; | |||
namespace JT808.DotNetty | |||
{ | |||
/// <summary> | |||
/// JT808会话管理 | |||
/// </summary> | |||
public class JT808SessionManager | |||
{ | |||
private readonly ILogger<JT808SessionManager> logger; | |||
@@ -5,6 +5,7 @@ using DotNetty.Transport.Channels; | |||
using DotNetty.Transport.Channels.Sockets; | |||
using DotNetty.Transport.Libuv; | |||
using JT808.DotNetty.Codecs; | |||
using JT808.DotNetty.Configurations; | |||
using JT808.DotNetty.Handlers; | |||
using System; | |||
using System.Collections.Generic; | |||
@@ -40,7 +41,26 @@ namespace JT808.DotNetty | |||
Unpooled.CopiedBuffer(new byte[] { JT808.Protocol.JT808Package.EndFlag }))); | |||
channel.Pipeline.AddLast("jt808Decode", new JT808ClientDecoder()); | |||
})); | |||
clientChannel = cb.ConnectAsync(remoteAddress).Result; | |||
clientChannel = cb.ConnectAsync(remoteAddress).Result; | |||
} | |||
public JT808SimpleTcpClient(EndPoint remoteAddress, EndPoint localAddress) | |||
{ | |||
clientBufferAllocator = new PooledByteBufferAllocator(); | |||
clientGroup = new MultithreadEventLoopGroup(1); | |||
cb = new Bootstrap() | |||
.Group(clientGroup) | |||
.Channel<TcpSocketChannel>() | |||
.Option(ChannelOption.TcpNodelay, true) | |||
.Option(ChannelOption.Allocator, clientBufferAllocator) | |||
.Handler(new ActionChannelInitializer<TcpSocketChannel>(channel => | |||
{ | |||
channel.Pipeline.AddLast("jt808Buffer", new DelimiterBasedFrameDecoder(int.MaxValue, | |||
Unpooled.CopiedBuffer(new byte[] { JT808.Protocol.JT808Package.BeginFlag }), | |||
Unpooled.CopiedBuffer(new byte[] { JT808.Protocol.JT808Package.EndFlag }))); | |||
channel.Pipeline.AddLast("jt808Decode", new JT808ClientDecoder()); | |||
})); | |||
clientChannel = cb.ConnectAsync(remoteAddress, localAddress).Result; | |||
} | |||
public void WriteAsync(byte[] data) | |||