@@ -51,14 +51,14 @@ namespace JT808.DotNetty.Test | |||
} | |||
[Fact] | |||
public void Test7() | |||
{ | |||
var channel = new EmbeddedChannel(); | |||
var sessionInfo = jT808SessionManager.RemoveSessionByTerminalPhoneNo(TerminalPhoneNo); | |||
Assert.Equal(TerminalPhoneNo, sessionInfo.TerminalPhoneNo); | |||
Assert.Equal("embedded", sessionInfo.SessionID); | |||
} | |||
//[Fact] | |||
//public void Test7() | |||
//{ | |||
// var channel = new EmbeddedChannel(); | |||
// var sessionInfo = jT808SessionManager.RemoveSessionByTerminalPhoneNo(TerminalPhoneNo); | |||
// Assert.Equal(TerminalPhoneNo, sessionInfo.TerminalPhoneNo); | |||
// Assert.Equal("embedded", sessionInfo.SessionID); | |||
//} | |||
[Fact] | |||
public void Test8() | |||
@@ -47,7 +47,7 @@ namespace JT808.DotNetty.Test | |||
string content = result.Content.ReadAsStringAsync().Result; | |||
JT808ResultDto<IEnumerable<JT808SessionInfoDto>> jt808Result = JsonConvert.DeserializeObject<JT808ResultDto<IEnumerable<JT808SessionInfoDto>>>(content); | |||
Assert.Equal(200, jt808Result.Code); | |||
Assert.Single(jt808Result.Data); | |||
Assert.Equal(10,jt808Result.Data.Count()); | |||
} | |||
[Fact] | |||
@@ -57,7 +57,7 @@ namespace JT808.DotNetty.Test | |||
JT808Package jT808Package1 = JT808.Protocol.Enums.JT808MsgId.终端心跳.Create("99"); | |||
SimpleTcpClient.WriteAsync(JT808Serializer.Serialize(jT808Package1)); | |||
var result4 = httpClient.PostAsync($"{Url}/{sessionRoutePrefix}/RemoveByChannelId", new StringContent("")).Result; | |||
var result4 = httpClient.PostAsync($"{Url}/{sessionRoutePrefix}/RemoveByChannelId", new StringContent("99")).Result; | |||
string content4 = result4.Content.ReadAsStringAsync().Result; | |||
JT808ResultDto<bool> jt808Result4= JsonConvert.DeserializeObject<JT808ResultDto<bool>>(content4); | |||
Assert.Equal(200, jt808Result4.Code); | |||
@@ -1,7 +1,7 @@ | |||
| |||
Microsoft Visual Studio Solution File, Format Version 12.00 | |||
# Visual Studio 15 | |||
VisualStudioVersion = 15.0.28010.2016 | |||
# Visual Studio Version 16 | |||
VisualStudioVersion = 16.0.28407.52 | |||
MinimumVisualStudioVersion = 10.0.40219.1 | |||
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JT808.DotNetty", "JT808.DotNetty\JT808.DotNetty.csproj", "{80C7F67E-6B7C-4178-8726-ADD3695622DD}" | |||
EndProject | |||
@@ -16,57 +16,13 @@ namespace JT808.DotNetty.Codecs | |||
/// </summary> | |||
internal class JT808Decoder : ByteToMessageDecoder | |||
{ | |||
private readonly ILogger<JT808Decoder> logger; | |||
private readonly IJT808SourcePackageDispatcher jT808SourcePackageDispatcher; | |||
private readonly JT808AtomicCounterService jT808AtomicCounterService; | |||
public JT808Decoder( | |||
JT808AtomicCounterService jT808AtomicCounterService, | |||
IJT808SourcePackageDispatcher jT808SourcePackageDispatcher, | |||
ILoggerFactory loggerFactory) | |||
{ | |||
this.jT808AtomicCounterService = jT808AtomicCounterService; | |||
this.logger = loggerFactory.CreateLogger<JT808Decoder>(); | |||
this.jT808SourcePackageDispatcher = jT808SourcePackageDispatcher; | |||
} | |||
protected override void Decode(IChannelHandlerContext context, IByteBuffer input, List<object> output) | |||
{ | |||
byte[] buffer = new byte[input.Capacity + 2]; | |||
try | |||
{ | |||
input.ReadBytes(buffer, 1, input.Capacity); | |||
buffer[0] = JT808Package.BeginFlag; | |||
buffer[input.Capacity + 1] = JT808Package.EndFlag; | |||
jT808SourcePackageDispatcher?.SendAsync(buffer); | |||
JT808Package jT808Package = JT808Serializer.Deserialize<JT808Package>(buffer); | |||
jT808AtomicCounterService.MsgSuccessIncrement(); | |||
output.Add(jT808Package); | |||
if (logger.IsEnabled(LogLevel.Debug)) | |||
{ | |||
logger.LogDebug("accept package success count<<<" + jT808AtomicCounterService.MsgSuccessCount.ToString()); | |||
} | |||
} | |||
catch (JT808.Protocol.Exceptions.JT808Exception ex) | |||
{ | |||
jT808AtomicCounterService.MsgFailIncrement(); | |||
if (logger.IsEnabled(LogLevel.Error)) | |||
{ | |||
logger.LogError("accept package fail count<<<" + jT808AtomicCounterService.MsgFailCount.ToString()); | |||
logger.LogError(ex, "accept msg<<<" + buffer); | |||
} | |||
} | |||
catch (Exception ex) | |||
{ | |||
jT808AtomicCounterService.MsgFailIncrement(); | |||
if (logger.IsEnabled(LogLevel.Error)) | |||
{ | |||
logger.LogError("accept package fail count<<<" + jT808AtomicCounterService.MsgFailCount.ToString()); | |||
logger.LogError(ex, "accept msg<<<" + buffer); | |||
} | |||
} | |||
input.ReadBytes(buffer, 1, input.Capacity); | |||
buffer[0] = JT808Package.BeginFlag; | |||
buffer[input.Capacity + 1] = JT808Package.EndFlag; | |||
output.Add(buffer); | |||
} | |||
} | |||
} |
@@ -6,13 +6,15 @@ using System.Collections.Generic; | |||
using System.Text; | |||
using JT808.DotNetty.Metadata; | |||
using JT808.DotNetty.Internal; | |||
using JT808.DotNetty.Interfaces; | |||
using Microsoft.Extensions.Logging; | |||
namespace JT808.DotNetty.Handlers | |||
{ | |||
/// <summary> | |||
/// JT808服务端处理程序 | |||
/// </summary> | |||
internal class JT808ServerHandler : SimpleChannelInboundHandler<JT808.Protocol.JT808Package> | |||
internal class JT808ServerHandler : SimpleChannelInboundHandler<byte[]> | |||
{ | |||
private readonly JT808MsgIdHandlerBase handler; | |||
@@ -20,38 +22,71 @@ namespace JT808.DotNetty.Handlers | |||
private readonly JT808TransmitAddressFilterService jT808TransmitAddressFilterService; | |||
private readonly IJT808SourcePackageDispatcher jT808SourcePackageDispatcher; | |||
private readonly JT808AtomicCounterService jT808AtomicCounterService; | |||
private readonly ILogger<JT808ServerHandler> logger; | |||
public JT808ServerHandler( | |||
ILoggerFactory loggerFactory, | |||
JT808TransmitAddressFilterService jT808TransmitAddressFilterService, | |||
JT808MsgIdHandlerBase handler, | |||
IJT808SourcePackageDispatcher jT808SourcePackageDispatcher, | |||
JT808MsgIdHandlerBase handler, | |||
JT808AtomicCounterService jT808AtomicCounterService, | |||
JT808SessionManager jT808SessionManager) | |||
{ | |||
this.jT808TransmitAddressFilterService = jT808TransmitAddressFilterService; | |||
this.handler = handler; | |||
this.jT808SessionManager = jT808SessionManager; | |||
this.jT808SourcePackageDispatcher = jT808SourcePackageDispatcher; | |||
this.jT808AtomicCounterService = jT808AtomicCounterService; | |||
logger = loggerFactory.CreateLogger<JT808ServerHandler>(); | |||
} | |||
protected override void ChannelRead0(IChannelHandlerContext ctx, JT808Package msg) | |||
protected override void ChannelRead0(IChannelHandlerContext ctx, byte[] msg) | |||
{ | |||
try | |||
{ | |||
jT808SessionManager.TryAddOrUpdateSession(new JT808Session(ctx.Channel, msg.Header.TerminalPhoneNo)); | |||
jT808SourcePackageDispatcher?.SendAsync(msg); | |||
JT808Package jT808Package = JT808Serializer.Deserialize(msg); | |||
jT808AtomicCounterService.MsgSuccessIncrement(); | |||
if (logger.IsEnabled(LogLevel.Debug)) | |||
{ | |||
logger.LogDebug("accept package success count<<<" + jT808AtomicCounterService.MsgSuccessCount.ToString()); | |||
} | |||
jT808SessionManager.TryAddOrUpdateSession(new JT808Session(ctx.Channel, jT808Package.Header.TerminalPhoneNo)); | |||
Func<JT808Request, JT808Response> handlerFunc; | |||
if (handler.HandlerDict.TryGetValue(msg.Header.MsgId, out handlerFunc)) | |||
if (handler.HandlerDict.TryGetValue(jT808Package.Header.MsgId, out handlerFunc)) | |||
{ | |||
JT808Response jT808Package = handlerFunc(new JT808Request(msg)); | |||
if (jT808Package != null) | |||
JT808Response jT808Response = handlerFunc(new JT808Request(jT808Package)); | |||
if (jT808Response != null) | |||
{ | |||
if (!jT808TransmitAddressFilterService.ContainsKey(ctx.Channel.RemoteAddress)) | |||
{ | |||
ctx.WriteAndFlushAsync(Unpooled.WrappedBuffer(JT808Serializer.Serialize(jT808Package.Package, jT808Package.MinBufferSize))); | |||
ctx.WriteAndFlushAsync(Unpooled.WrappedBuffer(JT808Serializer.Serialize(jT808Response.Package, jT808Response.MinBufferSize))); | |||
} | |||
} | |||
} | |||
} | |||
catch | |||
catch (JT808.Protocol.Exceptions.JT808Exception ex) | |||
{ | |||
jT808AtomicCounterService.MsgFailIncrement(); | |||
if (logger.IsEnabled(LogLevel.Error)) | |||
{ | |||
logger.LogError("accept package fail count<<<" + jT808AtomicCounterService.MsgFailCount.ToString()); | |||
logger.LogError(ex, "accept msg<<<" + ByteBufferUtil.HexDump(msg)); | |||
} | |||
} | |||
catch (Exception ex) | |||
{ | |||
jT808AtomicCounterService.MsgFailIncrement(); | |||
if (logger.IsEnabled(LogLevel.Error)) | |||
{ | |||
logger.LogError("accept package fail count<<<" + jT808AtomicCounterService.MsgFailCount.ToString()); | |||
logger.LogError(ex, "accept msg<<<" + ByteBufferUtil.HexDump(msg)); | |||
} | |||
} | |||
} | |||
} | |||