Explorar el Código

整理项目文件

tags/old
SmallChi hace 6 años
padre
commit
58bd8e9636
Se han modificado 30 ficheros con 1043 adiciones y 183 borrados
  1. +4
    -1
      src/JT809.DotNetty.Core/Codecs/JT809Decoder.cs
  2. +27
    -0
      src/JT809.DotNetty.Core/Codecs/JT809Encoder.cs
  3. +12
    -0
      src/JT809.DotNetty.Core/Enums/JT809AtomicCounterType.cs
  4. +16
    -9
      src/JT809.DotNetty.Core/Handlers/JT809MainMsgIdHandlerBase.cs
  5. +97
    -0
      src/JT809.DotNetty.Core/Handlers/JT809MainServerConnectionHandler.cs
  6. +82
    -0
      src/JT809.DotNetty.Core/Handlers/JT809MainServerHandler.cs
  7. +98
    -0
      src/JT809.DotNetty.Core/Handlers/JT809SubordinateConnectionHandler.cs
  8. +41
    -0
      src/JT809.DotNetty.Core/Handlers/JT809SubordinateMsgIdHandlerBase.cs
  9. +79
    -0
      src/JT809.DotNetty.Core/Handlers/JT809SubordinateServerHandler.cs
  10. +2
    -1
      src/JT809.DotNetty.Core/Interfaces/IJT809VerifyCodeGenerator.cs
  11. +22
    -0
      src/JT809.DotNetty.Core/Internal/JT809MainMsgIdDefaultHandler.cs
  12. +18
    -0
      src/JT809.DotNetty.Core/Internal/JT809SubordinateMsgIdDefaultHandler.cs
  13. +23
    -0
      src/JT809.DotNetty.Core/Internal/JT809VerifyCodeGeneratorDefaultImpl.cs
  14. +0
    -15
      src/JT809.DotNetty.Core/Internal/VerifyCodeGeneratorDefaultImpl.cs
  15. +23
    -2
      src/JT809.DotNetty.Core/JT809CoreDotnettyExtensions.cs
  16. +137
    -0
      src/JT809.DotNetty.Core/Links/JT809MainClient.cs
  17. +176
    -0
      src/JT809.DotNetty.Core/Links/JT809SubordinateClient.cs
  18. +0
    -113
      src/JT809.DotNetty.Core/Links/SubordinateLinkClient.cs
  19. +7
    -0
      src/JT809.DotNetty.Core/Metadata/JT809Response.cs
  20. +2
    -2
      src/JT809.DotNetty.Core/Metadata/JT809Session.cs
  21. +9
    -6
      src/JT809.DotNetty.Core/Services/JT809AtomicCounterService.cs
  22. +31
    -0
      src/JT809.DotNetty.Core/Services/JT809AtomicCounterServiceFactory.cs
  23. +98
    -0
      src/JT809.DotNetty.Core/Services/JT809MainServerHost.cs
  24. +17
    -16
      src/JT809.DotNetty.Core/Session/JT809SessionManager.cs
  25. +6
    -2
      src/JT809.DotNetty.Tcp/Handlers/JT809MsgIdDefaultTcpHandler.cs
  26. +2
    -2
      src/JT809.DotNetty.Tcp/Handlers/JT809TcpConnectionHandler.cs
  27. +9
    -8
      src/JT809.DotNetty.Tcp/Handlers/JT809TcpServerHandler.cs
  28. +3
    -4
      src/JT809.DotNetty.Tcp/JT809TcpDotnettyExtensions.cs
  29. +1
    -1
      src/JT809.DotNetty.Tcp/JT809TcpServerHost.cs
  30. +1
    -1
      src/JT809.DotNetty.Tests/JT809.DotNetty.Host.Test/Program.cs

src/JT809.DotNetty.Core/Codecs/JT809TcpDecoder.cs → src/JT809.DotNetty.Core/Codecs/JT809Decoder.cs Ver fichero

@@ -6,7 +6,10 @@ using JT809.Protocol;

namespace JT809.DotNetty.Core.Codecs
{
public class JT809TcpDecoder : ByteToMessageDecoder
/// <summary>
/// JT809解码
/// </summary>
public class JT809Decoder : ByteToMessageDecoder
{
protected override void Decode(IChannelHandlerContext context, IByteBuffer input, List<object> output)
{

+ 27
- 0
src/JT809.DotNetty.Core/Codecs/JT809Encoder.cs Ver fichero

@@ -0,0 +1,27 @@
using DotNetty.Buffers;
using DotNetty.Codecs;
using System.Collections.Generic;
using DotNetty.Transport.Channels;
using JT809.Protocol;
using JT809.DotNetty.Core.Metadata;

namespace JT809.DotNetty.Core.Codecs
{
/// <summary>
/// JT809编码
/// </summary>
public class JT809Encoder : MessageToByteEncoder<JT809Response>
{
protected override void Encode(IChannelHandlerContext context, JT809Response message, IByteBuffer output)
{
if (message.Package != null) {
var sendData = JT809Serializer.Serialize(message.Package, message.MinBufferSize);
output.WriteBytes(sendData);
}
else if (message.HexData != null)
{
output.WriteBytes(message.HexData);
}
}
}
}

+ 12
- 0
src/JT809.DotNetty.Core/Enums/JT809AtomicCounterType.cs Ver fichero

@@ -0,0 +1,12 @@
using System;
using System.Collections.Generic;
using System.Text;

namespace JT809.DotNetty.Core.Enums
{
public enum JT809AtomicCounterType
{
ClientSubordinate=1,
ServerMain=2
}
}

src/JT809.DotNetty.Core/Handlers/JT809MsgIdTcpHandlerBase.cs → src/JT809.DotNetty.Core/Handlers/JT809MainMsgIdHandlerBase.cs Ver fichero

@@ -1,6 +1,7 @@
using System;
using System.Collections.Generic;
using JT809.DotNetty.Core.Interfaces;
using JT809.DotNetty.Core.Links;
using JT809.DotNetty.Core.Metadata;
using JT809.Protocol.Enums;
using JT809.Protocol.Extensions;
@@ -9,26 +10,29 @@ using JT809.Protocol.MessageBody;
namespace JT809.DotNetty.Core.Handlers
{
/// <summary>
/// 基于Tcp模式抽象消息处理业务
/// 抽象消息处理业务
/// 自定义消息处理业务
/// 注意:
/// 1.ConfigureServices:
/// services.Replace(new ServiceDescriptor(typeof(JT809MsgIdTcpHandlerBase),typeof(JT809MsgIdCustomTcpHandlerImpl),ServiceLifetime.Singleton));
/// 2.解析具体的消息体,具体消息调用具体的JT809Serializer.Deserialize<T>
/// </summary>
public abstract class JT809MsgIdTcpHandlerBase
public abstract class JT809MainMsgIdHandlerBase
{
protected JT809TcpSessionManager sessionManager { get; }
protected IVerifyCodeGenerator verifyCodeGenerator { get; }
protected JT809SessionManager sessionManager { get; }
protected IJT809VerifyCodeGenerator verifyCodeGenerator { get; }
protected JT809SubordinateClient subordinateLinkClient { get; }
/// <summary>
/// 初始化消息处理业务
/// </summary>
protected JT809MsgIdTcpHandlerBase(
IVerifyCodeGenerator verifyCodeGenerator,
JT809TcpSessionManager sessionManager)
protected JT809MainMsgIdHandlerBase(
IJT809VerifyCodeGenerator verifyCodeGenerator,
JT809SubordinateClient subordinateLinkClient,
JT809SessionManager sessionManager)
{
this.sessionManager = sessionManager;
this.verifyCodeGenerator = verifyCodeGenerator;
this.subordinateLinkClient = subordinateLinkClient;
HandlerDict = new Dictionary<JT809BusinessType, Func<JT809Request, JT809Response>>
{
{JT809BusinessType.主链路登录请求消息, Msg0x1001},
@@ -42,6 +46,7 @@ namespace JT809.DotNetty.Core.Handlers
//};
}

public Dictionary<JT809BusinessType, Func<JT809Request, JT809Response>> HandlerDict { get; protected set; }

//public Dictionary<JT809SubBusinessType, Func<JT809Request, JT809Response>> SubHandlerDict { get; protected set; }
@@ -53,12 +58,14 @@ namespace JT809.DotNetty.Core.Handlers
/// <returns></returns>
public virtual JT809Response Msg0x1001(JT809Request request)
{
var verifyCode = verifyCodeGenerator.Create();
var package= JT809BusinessType.主链路登录应答消息.Create(new JT809_0x1002()
{
Result= JT809_0x1002_Result.成功,
VerifyCode= verifyCodeGenerator.Create()
VerifyCode= verifyCode
});

var jT809_0x1001 = request.Package.Bodies as JT809_0x1001;
subordinateLinkClient.ConnectAsync(jT809_0x1001.DownLinkIP, jT809_0x1001.DownLinkPort, verifyCode);
return new JT809Response(package,100);
}


+ 97
- 0
src/JT809.DotNetty.Core/Handlers/JT809MainServerConnectionHandler.cs Ver fichero

@@ -0,0 +1,97 @@
using DotNetty.Buffers;
using DotNetty.Handlers.Timeout;
using DotNetty.Transport.Channels;
using JT809.DotNetty.Core.Metadata;
using JT809.Protocol;
using JT809.Protocol.Enums;
using JT809.Protocol.Extensions;
using Microsoft.Extensions.Logging;
using System;
using System.Threading.Tasks;

namespace JT809.DotNetty.Core.Handlers
{
/// <summary>
/// JT809主链路服务端连接处理器
/// </summary>
public class JT809MainServerConnectionHandler : ChannelHandlerAdapter
{

private readonly ILogger<JT809MainServerConnectionHandler> logger;

public JT809MainServerConnectionHandler(
ILoggerFactory loggerFactory)
{
logger = loggerFactory.CreateLogger<JT809MainServerConnectionHandler>();
}

/// <summary>
/// 通道激活
/// </summary>
/// <param name="context"></param>
public override void ChannelActive(IChannelHandlerContext context)
{
string channelId = context.Channel.Id.AsShortText();
if (logger.IsEnabled(LogLevel.Debug))
logger.LogDebug($"<<<{ channelId } Successful client connection to server.");
base.ChannelActive(context);
}

/// <summary>
/// 客户端主动断开
/// </summary>
/// <param name="context"></param>
public override void ChannelInactive(IChannelHandlerContext context)
{
string channelId = context.Channel.Id.AsShortText();
if (logger.IsEnabled(LogLevel.Debug))
logger.LogDebug($">>>{ channelId } The client disconnects from the server.");
base.ChannelInactive(context);
}

/// <summary>
/// 服务器主动断开
/// </summary>
/// <param name="context"></param>
/// <returns></returns>
public override Task CloseAsync(IChannelHandlerContext context)
{
string channelId = context.Channel.Id.AsShortText();
if (logger.IsEnabled(LogLevel.Debug))
logger.LogDebug($"<<<{ channelId } The server disconnects from the client.");
return base.CloseAsync(context);
}

public override void ChannelReadComplete(IChannelHandlerContext context) => context.Flush();

/// <summary>
/// 超时策略
/// </summary>
/// <param name="context"></param>
/// <param name="evt"></param>
public override void UserEventTriggered(IChannelHandlerContext context, object evt)
{
IdleStateEvent idleStateEvent = evt as IdleStateEvent;
if (idleStateEvent != null)
{
if (idleStateEvent.State == IdleState.WriterIdle)
{
string channelId = context.Channel.Id.AsShortText();
logger.LogInformation($"{idleStateEvent.State.ToString()}>>>Heartbeat-{channelId}");
//发送主链路保持请求数据包
var package = JT809BusinessType.主链路连接保持请求消息.Create();
JT809Response jT809Response = new JT809Response(package, 100);
context.WriteAndFlushAsync(jT809Response);
}
}
base.UserEventTriggered(context, evt);
}

public override void ExceptionCaught(IChannelHandlerContext context, Exception exception)
{
string channelId = context.Channel.Id.AsShortText();
logger.LogError(exception, $"{channelId} {exception.Message}");
context.CloseAsync();
}
}
}

+ 82
- 0
src/JT809.DotNetty.Core/Handlers/JT809MainServerHandler.cs Ver fichero

@@ -0,0 +1,82 @@
using DotNetty.Buffers;
using DotNetty.Transport.Channels;
using JT809.Protocol;
using System;
using Microsoft.Extensions.Logging;
using JT809.Protocol.Exceptions;
using JT809.DotNetty.Core.Services;
using JT809.DotNetty.Core.Metadata;
using JT809.DotNetty.Core.Enums;

namespace JT809.DotNetty.Core.Handlers
{
/// <summary>
/// JT809主链路服务端处理程序
/// </summary>
internal class JT809MainServerHandler : SimpleChannelInboundHandler<byte[]>
{
private readonly JT809MainMsgIdHandlerBase handler;
private readonly JT809SessionManager jT809SessionManager;

private readonly JT809AtomicCounterService jT809AtomicCounterService;

private readonly ILogger<JT809MainServerHandler> logger;

public JT809MainServerHandler(
ILoggerFactory loggerFactory,
JT809MainMsgIdHandlerBase handler,
JT809AtomicCounterServiceFactory jT809AtomicCounterServiceFactorty,
JT809SessionManager jT809SessionManager
)
{
this.handler = handler;
this.jT809SessionManager = jT809SessionManager;
this.jT809AtomicCounterService = jT809AtomicCounterServiceFactorty.Create(JT809AtomicCounterType.ServerMain.ToString()); ;
logger = loggerFactory.CreateLogger<JT809MainServerHandler>();
}


protected override void ChannelRead0(IChannelHandlerContext ctx, byte[] msg)
{
try
{
JT809Package jT809Package = JT809Serializer.Deserialize(msg);
jT809AtomicCounterService.MsgSuccessIncrement();
if (logger.IsEnabled(LogLevel.Debug))
{
logger.LogDebug("accept package success count<<<" + jT809AtomicCounterService.MsgSuccessCount.ToString());
}
jT809SessionManager.TryAdd(ctx.Channel, jT809Package.Header.MsgGNSSCENTERID);
Func<JT809Request, JT809Response> handlerFunc;
if (handler.HandlerDict.TryGetValue(jT809Package.Header.BusinessType, out handlerFunc))
{
JT809Response jT808Response = handlerFunc(new JT809Request(jT809Package, msg));
if (jT808Response != null)
{
var sendData = JT809Serializer.Serialize(jT808Response.Package, jT808Response.MinBufferSize);
ctx.WriteAndFlushAsync(Unpooled.WrappedBuffer(sendData));
}
}
}
catch (JT809Exception ex)
{
jT809AtomicCounterService.MsgFailIncrement();
if (logger.IsEnabled(LogLevel.Error))
{
logger.LogError("accept package fail count<<<" + jT809AtomicCounterService.MsgFailCount.ToString());
logger.LogError(ex, "accept msg<<<" + ByteBufferUtil.HexDump(msg));
}
}
catch (Exception ex)
{
jT809AtomicCounterService.MsgFailIncrement();
if (logger.IsEnabled(LogLevel.Error))
{
logger.LogError("accept package fail count<<<" + jT809AtomicCounterService.MsgFailCount.ToString());
logger.LogError(ex, "accept msg<<<" + ByteBufferUtil.HexDump(msg));
}
}
}
}
}

+ 98
- 0
src/JT809.DotNetty.Core/Handlers/JT809SubordinateConnectionHandler.cs Ver fichero

@@ -0,0 +1,98 @@
using DotNetty.Buffers;
using DotNetty.Handlers.Timeout;
using DotNetty.Transport.Channels;
using JT809.DotNetty.Core.Metadata;
using JT809.Protocol;
using JT809.Protocol.Enums;
using JT809.Protocol.Extensions;
using Microsoft.Extensions.Logging;
using System;
using System.Threading.Tasks;

namespace JT809.DotNetty.Core.Handlers
{
/// <summary>
/// JT809从链路连接处理器
/// </summary>
public class JT809SubordinateConnectionHandler: ChannelHandlerAdapter
{

private readonly ILogger<JT809SubordinateConnectionHandler> logger;

public JT809SubordinateConnectionHandler(
ILoggerFactory loggerFactory)
{
logger = loggerFactory.CreateLogger<JT809SubordinateConnectionHandler>();
}

/// <summary>
/// 通道激活
/// </summary>
/// <param name="context"></param>
public override void ChannelActive(IChannelHandlerContext context)
{
string channelId = context.Channel.Id.AsShortText();
if (logger.IsEnabled(LogLevel.Debug))
logger.LogDebug($"<<<{ channelId } Successful client connection to server.");
base.ChannelActive(context);
}

/// <summary>
/// 客户端主动断开
/// </summary>
/// <param name="context"></param>
public override void ChannelInactive(IChannelHandlerContext context)
{
string channelId = context.Channel.Id.AsShortText();
if (logger.IsEnabled(LogLevel.Debug))
logger.LogDebug($">>>{ channelId } The client disconnects from the server.");
base.ChannelInactive(context);
}

/// <summary>
/// 服务器主动断开
/// </summary>
/// <param name="context"></param>
/// <returns></returns>
public override Task CloseAsync(IChannelHandlerContext context)
{
string channelId = context.Channel.Id.AsShortText();
if (logger.IsEnabled(LogLevel.Debug))
logger.LogDebug($"<<<{ channelId } The server disconnects from the client.");
return base.CloseAsync(context);
}

public override void ChannelReadComplete(IChannelHandlerContext context) => context.Flush();

/// <summary>
/// 超时策略
/// </summary>
/// <param name="context"></param>
/// <param name="evt"></param>
public override void UserEventTriggered(IChannelHandlerContext context, object evt)
{
IdleStateEvent idleStateEvent = evt as IdleStateEvent;
if (idleStateEvent != null)
{
if (idleStateEvent.State == IdleState.WriterIdle)
{
string channelId = context.Channel.Id.AsShortText();
logger.LogInformation($"{idleStateEvent.State.ToString()}>>>Heartbeat-{channelId}");
//发送从链路保持请求数据包
var package = JT809BusinessType.从链路连接保持请求消息.Create();
JT809Response jT809Response = new JT809Response(package, 100);
context.WriteAndFlushAsync(jT809Response);
//context.WriteAndFlushAsync(Unpooled.WrappedBuffer(JT809Serializer.Serialize(package,100)));
}
}
base.UserEventTriggered(context, evt);
}

public override void ExceptionCaught(IChannelHandlerContext context, Exception exception)
{
string channelId = context.Channel.Id.AsShortText();
logger.LogError(exception, $"{channelId} {exception.Message}");
context.CloseAsync();
}
}
}

+ 41
- 0
src/JT809.DotNetty.Core/Handlers/JT809SubordinateMsgIdHandlerBase.cs Ver fichero

@@ -0,0 +1,41 @@
using System;
using System.Collections.Generic;
using JT809.DotNetty.Core.Metadata;
using JT809.Protocol.Enums;

namespace JT809.DotNetty.Core.Handlers
{
/// <summary>
/// 抽象从链路消息处理业务
/// 自定义消息处理业务
/// 注意:
/// 1.ConfigureServices:
/// services.Replace(new ServiceDescriptor(typeof(JT809SubordinateMsgIdTcpHandlerBase),typeof(JT809SubordinateMsgIdCustomTcpHandlerImpl),ServiceLifetime.Singleton));
/// 2.解析具体的消息体,具体消息调用具体的JT809Serializer.Deserialize<T>
/// </summary>
public abstract class JT809SubordinateMsgIdHandlerBase
{
/// <summary>
/// 初始化消息处理业务
/// </summary>
protected JT809SubordinateMsgIdHandlerBase()
{
HandlerDict = new Dictionary<JT809BusinessType, Func<JT809Request, JT809Response>>
{
{JT809BusinessType.从链路注销应答消息, Msg0x9004},
};
}

public Dictionary<JT809BusinessType, Func<JT809Request, JT809Response>> HandlerDict { get; protected set; }

/// <summary>
/// 从链路注销应答消息
/// </summary>
/// <param name="request"></param>
/// <returns></returns>
public virtual JT809Response Msg0x9004(JT809Request request)
{
return null;
}
}
}

+ 79
- 0
src/JT809.DotNetty.Core/Handlers/JT809SubordinateServerHandler.cs Ver fichero

@@ -0,0 +1,79 @@
using DotNetty.Buffers;
using DotNetty.Transport.Channels;
using JT809.Protocol;
using System;
using Microsoft.Extensions.Logging;
using JT809.Protocol.Exceptions;
using JT809.DotNetty.Core.Services;
using JT809.DotNetty.Core;
using JT809.DotNetty.Core.Handlers;
using JT809.DotNetty.Core.Metadata;
using JT809.DotNetty.Core.Internal;
using JT809.DotNetty.Core.Enums;

namespace JT809.DotNetty.Core.Handlers
{
/// <summary>
/// JT809从链路业务处理程序
/// </summary>
internal class JT809SubordinateServerHandler : SimpleChannelInboundHandler<byte[]>
{
private readonly JT809SubordinateMsgIdHandlerBase handler;
private readonly JT809AtomicCounterService jT809AtomicCounterService;

private readonly ILogger<JT809SubordinateServerHandler> logger;

public JT809SubordinateServerHandler(
ILoggerFactory loggerFactory,
JT809SubordinateMsgIdHandlerBase handler,
JT809AtomicCounterServiceFactory jT809AtomicCounterServiceFactorty
)
{
this.handler = handler;
this.jT809AtomicCounterService = jT809AtomicCounterServiceFactorty.Create(JT809AtomicCounterType.ClientSubordinate.ToString());
logger = loggerFactory.CreateLogger<JT809SubordinateServerHandler>();
}


protected override void ChannelRead0(IChannelHandlerContext ctx, byte[] msg)
{
try
{
JT809Package jT809Package = JT809Serializer.Deserialize(msg);
jT809AtomicCounterService.MsgSuccessIncrement();
if (logger.IsEnabled(LogLevel.Debug))
{
logger.LogDebug("accept package success count<<<" + jT809AtomicCounterService.MsgSuccessCount.ToString());
}
Func<JT809Request, JT809Response> handlerFunc;
if (handler.HandlerDict.TryGetValue(jT809Package.Header.BusinessType, out handlerFunc))
{
JT809Response jT808Response = handlerFunc(new JT809Request(jT809Package, msg));
if (jT808Response != null)
{
ctx.WriteAndFlushAsync(jT808Response);
}
}
}
catch (JT809Exception ex)
{
jT809AtomicCounterService.MsgFailIncrement();
if (logger.IsEnabled(LogLevel.Error))
{
logger.LogError("accept package fail count<<<" + jT809AtomicCounterService.MsgFailCount.ToString());
logger.LogError(ex, "accept msg<<<" + ByteBufferUtil.HexDump(msg));
}
}
catch (Exception ex)
{
jT809AtomicCounterService.MsgFailIncrement();
if (logger.IsEnabled(LogLevel.Error))
{
logger.LogError("accept package fail count<<<" + jT809AtomicCounterService.MsgFailCount.ToString());
logger.LogError(ex, "accept msg<<<" + ByteBufferUtil.HexDump(msg));
}
}
}
}
}

src/JT809.DotNetty.Core/Interfaces/IVerifyCodeGenerator.cs → src/JT809.DotNetty.Core/Interfaces/IJT809VerifyCodeGenerator.cs Ver fichero

@@ -7,8 +7,9 @@ namespace JT809.DotNetty.Core.Interfaces
/// <summary>
/// 校验码生成器
/// </summary>
public interface IVerifyCodeGenerator
public interface IJT809VerifyCodeGenerator
{
uint Create();
uint Get();
}
}

+ 22
- 0
src/JT809.DotNetty.Core/Internal/JT809MainMsgIdDefaultHandler.cs Ver fichero

@@ -0,0 +1,22 @@
using JT809.DotNetty.Core;
using JT809.DotNetty.Core.Handlers;
using JT809.DotNetty.Core.Interfaces;
using JT809.DotNetty.Core.Links;
using System;
using System.Collections.Generic;
using System.Text;

namespace JT809.DotNetty.Core.Internal
{
/// <summary>
/// 默认主链路服务端消息处理业务实现
/// </summary>
internal class JT809MainMsgIdDefaultHandler : JT809MainMsgIdHandlerBase
{
public JT809MainMsgIdDefaultHandler(IJT809VerifyCodeGenerator verifyCodeGenerator,
JT809SubordinateClient subordinateLinkClient, JT809SessionManager sessionManager)
: base(verifyCodeGenerator, subordinateLinkClient, sessionManager)
{
}
}
}

+ 18
- 0
src/JT809.DotNetty.Core/Internal/JT809SubordinateMsgIdDefaultHandler.cs Ver fichero

@@ -0,0 +1,18 @@
using JT809.DotNetty.Core;
using JT809.DotNetty.Core.Handlers;
using JT809.DotNetty.Core.Interfaces;
using JT809.DotNetty.Core.Links;
using System;
using System.Collections.Generic;
using System.Text;

namespace JT809.DotNetty.Core.Internal
{
/// <summary>
/// 默认从链路客户端消息处理业务实现
/// </summary>
internal class JT809SubordinateMsgIdDefaultHandler : JT809SubordinateMsgIdHandlerBase
{
}
}

+ 23
- 0
src/JT809.DotNetty.Core/Internal/JT809VerifyCodeGeneratorDefaultImpl.cs Ver fichero

@@ -0,0 +1,23 @@
using JT809.DotNetty.Core.Interfaces;
using System;
using System.Collections.Generic;
using System.Text;

namespace JT809.DotNetty.Core.Internal
{
internal class JT809VerifyCodeGeneratorDefaultImpl : IJT809VerifyCodeGenerator
{
private uint VerifyCode;

public uint Create()
{
VerifyCode= (uint)Guid.NewGuid().GetHashCode();
return VerifyCode;
}

public uint Get()
{
return VerifyCode;
}
}
}

+ 0
- 15
src/JT809.DotNetty.Core/Internal/VerifyCodeGeneratorDefaultImpl.cs Ver fichero

@@ -1,15 +0,0 @@
using JT809.DotNetty.Core.Interfaces;
using System;
using System.Collections.Generic;
using System.Text;

namespace JT809.DotNetty.Core.Internal
{
internal class VerifyCodeGeneratorDefaultImpl : IVerifyCodeGenerator
{
public uint Create()
{
return (uint)Guid.NewGuid().GetHashCode();
}
}
}

+ 23
- 2
src/JT809.DotNetty.Core/JT809CoreDotnettyExtensions.cs Ver fichero

@@ -1,8 +1,13 @@
using JT809.DotNetty.Abstractions;
using JT809.DotNetty.Core.Codecs;
using JT809.DotNetty.Core.Configurations;
using JT809.DotNetty.Core.Converters;
using JT809.DotNetty.Core.Enums;
using JT809.DotNetty.Core.Handlers;
using JT809.DotNetty.Core.Interfaces;
using JT809.DotNetty.Core.Internal;
using JT809.DotNetty.Core.Links;
using JT809.DotNetty.Core.Metadata;
using JT809.DotNetty.Core.Services;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
@@ -50,9 +55,25 @@ namespace JT809.DotNetty.Core
});
}
serviceDescriptors.Configure<JT809Configuration>(configuration.GetSection("JT809Configuration"));
serviceDescriptors.TryAddSingleton<IVerifyCodeGenerator, VerifyCodeGeneratorDefaultImpl>();
serviceDescriptors.TryAddSingleton<IJT809SessionPublishing, JT809SessionPublishingEmptyImpl>();
serviceDescriptors.TryAddSingleton<JT809SimpleSystemCollectService>();
serviceDescriptors.TryAddSingleton<IJT809VerifyCodeGenerator, JT809VerifyCodeGeneratorDefaultImpl>();
serviceDescriptors.TryAddSingleton<JT809AtomicCounterServiceFactory>();
//JT809编解码器
serviceDescriptors.TryAddScoped<JT809Decoder>();
serviceDescriptors.TryAddScoped<JT809Encoder>();
//主从链路连接处理器
serviceDescriptors.TryAddScoped<JT809SubordinateConnectionHandler>();
serviceDescriptors.TryAddScoped<JT809MainServerConnectionHandler>();
//从链路客户端
serviceDescriptors.TryAddSingleton<JT809SubordinateClient>();
//主从链路消息默认业务处理器实现
serviceDescriptors.TryAddSingleton<JT809MainMsgIdHandlerBase, JT809MainMsgIdDefaultHandler>();
serviceDescriptors.TryAddSingleton<JT809SubordinateMsgIdHandlerBase, JT809SubordinateMsgIdDefaultHandler>();
//主从链路消息接收处理器
serviceDescriptors.TryAddScoped<JT809MainServerHandler>();
serviceDescriptors.TryAddScoped<JT809SubordinateServerHandler>();
//主链路服务端
serviceDescriptors.AddHostedService<JT809MainServerHost>();
return serviceDescriptors;
}
}

+ 137
- 0
src/JT809.DotNetty.Core/Links/JT809MainClient.cs Ver fichero

@@ -0,0 +1,137 @@
using DotNetty.Buffers;
using DotNetty.Codecs;
using DotNetty.Handlers.Timeout;
using DotNetty.Transport.Bootstrapping;
using DotNetty.Transport.Channels;
using DotNetty.Transport.Channels.Sockets;
using JT809.DotNetty.Core.Codecs;
using JT809.DotNetty.Core.Handlers;
using JT809.DotNetty.Core.Metadata;
using JT809.Protocol;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.Net;
using System.Text;

namespace JT809.DotNetty.Core.Links
{
/// <summary>
/// 主链路客户端
/// 针对企业与监管平台之间
/// </summary>
public sealed class JT809MainClient : IDisposable
{
private Bootstrap bootstrap;

private MultithreadEventLoopGroup group;

private IChannel channel;

private readonly ILogger<JT809MainClient> logger;

private readonly ILoggerFactory loggerFactory;

private readonly IServiceProvider serviceProvider;

private bool disposed = false;

public JT809MainClient(
IServiceProvider provider,
ILoggerFactory loggerFactory)
{
this.serviceProvider = provider;
this.loggerFactory = loggerFactory;
this.logger = loggerFactory.CreateLogger<JT809MainClient>();
group = new MultithreadEventLoopGroup();
bootstrap = new Bootstrap();
bootstrap.Group(group)
.Channel<TcpSocketChannel>()
.Option(ChannelOption.TcpNodelay, true)
.Handler(new ActionChannelInitializer<ISocketChannel>(channel =>
{
IChannelPipeline pipeline = channel.Pipeline;
//下级平台1分钟发送心跳
//上级平台是3分钟没有发送就断开连接
using (var scope = serviceProvider.CreateScope())
{
pipeline.AddLast("jt809MainLinkTcpBuffer", new DelimiterBasedFrameDecoder(int.MaxValue,
Unpooled.CopiedBuffer(new byte[] { JT809Package.BEGINFLAG }),
Unpooled.CopiedBuffer(new byte[] { JT809Package.ENDFLAG })));
pipeline.AddLast("jt809MainLinkSystemIdleState", new IdleStateHandler(180, 60, 200));
pipeline.AddLast("jt809MainLinkTcpEncode", scope.ServiceProvider.GetRequiredService<JT809Encoder>());
pipeline.AddLast("jt809MainLinkTcpDecode", scope.ServiceProvider.GetRequiredService<JT809Decoder>());
pipeline.AddLast("jt809MainLinkConnection", scope.ServiceProvider.GetRequiredService<JT809MainServerConnectionHandler>());

}
}));
}

public async void ConnectAsync(string ip,int port)
{
logger.LogInformation($"ip:{ip},port:{port}");
try
{
if (channel == null)
{
channel = await bootstrap.ConnectAsync(new IPEndPoint(IPAddress.Parse(ip), port));
}
else
{
await channel.CloseAsync();
channel = await bootstrap.ConnectAsync(new IPEndPoint(IPAddress.Parse(ip), port));
}
}
catch (AggregateException ex)
{
logger.LogError(ex.InnerException, $"ip:{ip},port:{port}");
}
catch (Exception ex)
{
logger.LogError(ex, $"ip:{ip},port:{port}");
}
}

public async void SendAsync(JT809Response jT809Response)
{
if (channel == null) throw new NullReferenceException("Channel Not Open");
if (jT809Response == null) throw new ArgumentNullException("Data is null");
if (channel.Open && channel.Active)
{
await channel.WriteAndFlushAsync(jT809Response);
}
}

private void Dispose(bool disposing)
{
if (disposed)
{
return;
}
if (disposing)
{
//清理托管资源
channel.CloseAsync();
group.ShutdownGracefullyAsync(TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(3));
}
//让类型知道自己已经被释放
disposed = true;
}

~JT809MainClient()
{
//必须为false
//这表明,隐式清理时,只要处理非托管资源就可以了。
Dispose(false);
}

public void Dispose()
{
//必须为true
Dispose(true);
//通知垃圾回收机制不再调用终结器(析构器)
GC.SuppressFinalize(this);
}
}
}

+ 176
- 0
src/JT809.DotNetty.Core/Links/JT809SubordinateClient.cs Ver fichero

@@ -0,0 +1,176 @@
using DotNetty.Buffers;
using DotNetty.Codecs;
using DotNetty.Handlers.Timeout;
using DotNetty.Transport.Bootstrapping;
using DotNetty.Transport.Channels;
using DotNetty.Transport.Channels.Sockets;
using JT809.DotNetty.Core.Codecs;
using JT809.DotNetty.Core.Handlers;
using JT809.DotNetty.Core.Interfaces;
using JT809.DotNetty.Core.Metadata;
using JT809.Protocol;
using JT809.Protocol.Enums;
using JT809.Protocol.Extensions;
using JT809.Protocol.MessageBody;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.Net;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace JT809.DotNetty.Core.Links
{
/// <summary>
/// 从链路客户端
/// 针对企业与企业之间
/// </summary>
public sealed class JT809SubordinateClient:IDisposable
{
private Bootstrap bootstrap;

private MultithreadEventLoopGroup group;

private IChannel channel;

private readonly ILogger<JT809SubordinateClient> logger;

private readonly ILoggerFactory loggerFactory;

private readonly IServiceProvider serviceProvider;

private readonly IJT809VerifyCodeGenerator verifyCodeGenerator;

private bool disposed = false;

public JT809SubordinateClient(
IServiceProvider provider,
ILoggerFactory loggerFactory,
IJT809VerifyCodeGenerator verifyCodeGenerator)
{
this.serviceProvider = provider;
this.loggerFactory = loggerFactory;
this.verifyCodeGenerator = verifyCodeGenerator;
this.logger = loggerFactory.CreateLogger<JT809SubordinateClient>();
group = new MultithreadEventLoopGroup();
bootstrap = new Bootstrap();
bootstrap.Group(group)
.Channel<TcpSocketChannel>()
.Option(ChannelOption.TcpNodelay, true)
.Handler(new ActionChannelInitializer<ISocketChannel>(channel =>
{
IChannelPipeline pipeline = channel.Pipeline;
//下级平台1分钟发送心跳
//上级平台是3分钟没有发送就断开连接

using (var scope = serviceProvider.CreateScope())
{
pipeline.AddLast("jt809SubLinkTcpBuffer", new DelimiterBasedFrameDecoder(int.MaxValue,
Unpooled.CopiedBuffer(new byte[] { JT809Package.BEGINFLAG }),
Unpooled.CopiedBuffer(new byte[] { JT809Package.ENDFLAG })));
pipeline.AddLast("jt809SubLinkSystemIdleState", new IdleStateHandler(180, 10, 200));
pipeline.AddLast("jt809SubLinkTcpEncode", scope.ServiceProvider.GetRequiredService<JT809Encoder>());
pipeline.AddLast("jt809SubLinkTcpDecode", scope.ServiceProvider.GetRequiredService<JT809Decoder>());
pipeline.AddLast("jt809SubLinkConnection", scope.ServiceProvider.GetRequiredService<JT809SubordinateConnectionHandler>());

}
}));
}

public async void ConnectAsync(string ip,int port,uint verifyCode,int delay=3000)
{
logger.LogInformation($"ip:{ip},port:{port},verifycode:{verifyCode}");
await Task.Delay(delay);
try
{
if (channel == null)
{
channel = await bootstrap.ConnectAsync(new IPEndPoint(IPAddress.Parse(ip), port));
}
else
{
await channel.CloseAsync();
channel = await bootstrap.ConnectAsync(new IPEndPoint(IPAddress.Parse(ip), port));
}
}
catch (AggregateException ex)
{
logger.LogError(ex.InnerException, $"ip:{ip},port:{port},verifycode:{verifyCode}");
}
catch (Exception ex)
{
logger.LogError(ex, $"ip:{ip},port:{port},verifycode:{verifyCode}");
}
}

public async void SendAsync(JT809Response jT809Response)
{
if (channel == null) throw new NullReferenceException("Channel Not Open");
if (jT809Response == null) throw new ArgumentNullException("Data is null");
if (channel.Open && channel.Active)
{
await channel.WriteAndFlushAsync(jT809Response);
}
}

public bool IsOpen
{
get
{
if (channel == null) return false;
return channel.Open && channel.Active;
}
}

private void Dispose(bool disposing)
{
if (disposed)
{
return;
}
if (disposing)
{
try
{
//发送从链路注销请求
var package = JT809BusinessType.从链路注销请求消息.Create(new JT809_0x9003()
{
VerifyCode = verifyCodeGenerator.Get()
});
JT809Response jT809Response = new JT809Response(package, 100);
channel.WriteAndFlushAsync(jT809Response);
logger.LogInformation($"发送从链路注销请求>>>{JT809Serializer.Serialize(package, 100).ToHexString()}");
}
catch (Exception ex)
{
logger.LogError(ex,"发送从链路注销请求");
}
finally
{
//清理托管资源
channel.CloseAsync();
group.ShutdownGracefullyAsync(TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(3));
}
}
//让类型知道自己已经被释放
disposed = true;
}

~JT809SubordinateClient()
{
//必须为false
//这表明,隐式清理时,只要处理非托管资源就可以了。
Dispose(false);
}

public void Dispose()
{
//必须为true
Dispose(true);
//通知垃圾回收机制不再调用终结器(析构器)
GC.SuppressFinalize(this);
}
}
}

+ 0
- 113
src/JT809.DotNetty.Core/Links/SubordinateLinkClient.cs Ver fichero

@@ -1,113 +0,0 @@
using DotNetty.Buffers;
using DotNetty.Codecs;
using DotNetty.Handlers.Timeout;
using DotNetty.Transport.Bootstrapping;
using DotNetty.Transport.Channels;
using DotNetty.Transport.Channels.Sockets;
using JT809.DotNetty.Core.Codecs;
using JT809.Protocol;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.Net;
using System.Text;

namespace JT809.DotNetty.Core.Links
{
/// <summary>
/// 从链路客户端
/// </summary>
public sealed class SubordinateLinkClient:IDisposable
{
private Bootstrap bootstrap;

private MultithreadEventLoopGroup group;

private IChannel channel;

private readonly ILogger<SubordinateLinkClient> logger;

private readonly ILoggerFactory loggerFactory;

private readonly IServiceProvider serviceProvider;

private bool disposed = false;

public SubordinateLinkClient(
IServiceProvider provider,
ILoggerFactory loggerFactory)
{
this.serviceProvider = provider;
this.loggerFactory = loggerFactory;
this.logger = loggerFactory.CreateLogger<SubordinateLinkClient>();
}

public async void ConnectAsync(string ip,int port,uint verifyCode)
{
group = new MultithreadEventLoopGroup();
bootstrap = new Bootstrap();
bootstrap.Group(group)
.Channel<TcpSocketChannel>()
.Option(ChannelOption.TcpNodelay, true)
.Handler(new ActionChannelInitializer<ISocketChannel>(channel =>
{
IChannelPipeline pipeline = channel.Pipeline;
//下级平台1分钟发送心跳
//上级平台是3分钟没有发送就断开连接
channel.Pipeline.AddLast("jt809SystemIdleState", new IdleStateHandler(60,180,200));
//pipeline.AddLast(new ClientConnectionHandler(bootstrap, channeldic, loggerFactory));
channel.Pipeline.AddLast("jt809TcpBuffer", new DelimiterBasedFrameDecoder(int.MaxValue,
Unpooled.CopiedBuffer(new byte[] { JT809Package.BEGINFLAG }),
Unpooled.CopiedBuffer(new byte[] { JT809Package.ENDFLAG })));
using (var scope = serviceProvider.CreateScope())
{
channel.Pipeline.AddLast("jt809TcpDecode", scope.ServiceProvider.GetRequiredService<JT809TcpDecoder>());

}
}));
channel = await bootstrap.ConnectAsync(new IPEndPoint(IPAddress.Parse(ip), port));
}

public async void SendAsync(byte[] data)
{
if (channel == null) throw new NullReferenceException("Channel Not Open");
if (data == null) throw new ArgumentNullException("data is null");
if (channel.Open && channel.Active)
{
await channel.WriteAndFlushAsync(Unpooled.WrappedBuffer(data));
}
}

private void Dispose(bool disposing)
{
if (disposed)
{
return;
}
if (disposing)
{
//清理托管资源
channel.CloseAsync();
group.ShutdownGracefullyAsync(TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(3));
}
//让类型知道自己已经被释放
disposed = true;
}

~SubordinateLinkClient()
{
//必须为false
//这表明,隐式清理时,只要处理非托管资源就可以了。
Dispose(false);
}

public void Dispose()
{
//必须为true
Dispose(true);
//通知垃圾回收机制不再调用终结器(析构器)
GC.SuppressFinalize(this);
}
}
}

+ 7
- 0
src/JT809.DotNetty.Core/Metadata/JT809Response.cs Ver fichero

@@ -13,6 +13,8 @@ namespace JT809.DotNetty.Core.Metadata
/// </summary>
public int MinBufferSize { get; set; }

public byte[] HexData { get; set; }

public JT809Response()
{

@@ -23,5 +25,10 @@ namespace JT809.DotNetty.Core.Metadata
Package = package;
MinBufferSize = minBufferSize;
}

public JT809Response(byte[] hexData)
{
HexData = hexData;
}
}
}

src/JT809.DotNetty.Core/Metadata/JT809TcpSession.cs → src/JT809.DotNetty.Core/Metadata/JT809Session.cs Ver fichero

@@ -4,9 +4,9 @@ using JT809.Protocol.Enums;

namespace JT809.DotNetty.Core.Metadata
{
public class JT809TcpSession
public class JT809Session
{
public JT809TcpSession(IChannel channel, uint msgGNSSCENTERID)
public JT809Session(IChannel channel, uint msgGNSSCENTERID)
{
MsgGNSSCENTERID = msgGNSSCENTERID;
Channel = channel;

src/JT809.DotNetty.Core/Services/JT809TcpAtomicCounterService.cs → src/JT809.DotNetty.Core/Services/JT809AtomicCounterService.cs Ver fichero

@@ -1,19 +1,22 @@
using JT809.DotNetty.Core.Metadata;
using JT809.DotNetty.Core.Enums;
using JT809.DotNetty.Core.Internal;
using JT809.DotNetty.Core.Metadata;

namespace JT809.DotNetty.Core.Services
{
/// <summary>
/// Tcp计数包服务
/// </summary>
public class JT809TcpAtomicCounterService
public class JT809AtomicCounterService
{
private readonly JT809AtomicCounter MsgSuccessCounter = new JT809AtomicCounter();
private readonly JT809AtomicCounter MsgSuccessCounter;

private readonly JT809AtomicCounter MsgFailCounter = new JT809AtomicCounter();
private readonly JT809AtomicCounter MsgFailCounter;

public JT809TcpAtomicCounterService()
public JT809AtomicCounterService()
{

MsgSuccessCounter=new JT809AtomicCounter();
MsgFailCounter = new JT809AtomicCounter();
}

public void Reset()

+ 31
- 0
src/JT809.DotNetty.Core/Services/JT809AtomicCounterServiceFactory.cs Ver fichero

@@ -0,0 +1,31 @@
using System;
using System.Collections.Generic;
using System.Text;
using System.Collections.Concurrent;

namespace JT809.DotNetty.Core.Services
{
public class JT809AtomicCounterServiceFactory
{
private static readonly ConcurrentDictionary<string, JT809AtomicCounterService> cache;

static JT809AtomicCounterServiceFactory()
{
cache = new ConcurrentDictionary<string, JT809AtomicCounterService>(StringComparer.OrdinalIgnoreCase);
}

public JT809AtomicCounterService Create(string type)
{
if(cache.TryGetValue(type,out var service))
{
return service;
}
else
{
var serviceNew = new JT809AtomicCounterService();
cache.TryAdd(type, serviceNew);
return serviceNew;
}
}
}
}

+ 98
- 0
src/JT809.DotNetty.Core/Services/JT809MainServerHost.cs Ver fichero

@@ -0,0 +1,98 @@
using DotNetty.Buffers;
using DotNetty.Codecs;
using DotNetty.Handlers.Timeout;
using DotNetty.Transport.Bootstrapping;
using DotNetty.Transport.Channels;
using DotNetty.Transport.Libuv;
using JT809.DotNetty.Core.Configurations;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using System;
using System.Net;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks;
using JT809.Protocol;
using JT809.DotNetty.Core.Codecs;
using JT809.DotNetty.Core.Handlers;

namespace JT809.DotNetty.Core.Services
{
/// <summary>
/// JT809 Tcp网关服务
/// </summary>
internal class JT809MainServerHost : IHostedService
{
private readonly IServiceProvider serviceProvider;
private readonly JT809Configuration configuration;
private readonly ILogger<JT809MainServerHost> logger;
private DispatcherEventLoopGroup bossGroup;
private WorkerEventLoopGroup workerGroup;
private IChannel bootstrapChannel;
private IByteBufferAllocator serverBufferAllocator;
private ILoggerFactory loggerFactory;

public JT809MainServerHost(
IServiceProvider provider,
ILoggerFactory loggerFactory,
IOptions<JT809Configuration> jT809ConfigurationAccessor)
{
serviceProvider = provider;
configuration = jT809ConfigurationAccessor.Value;
logger = loggerFactory.CreateLogger<JT809MainServerHost>();
this.loggerFactory = loggerFactory;
}

public Task StartAsync(CancellationToken cancellationToken)
{
bossGroup = new DispatcherEventLoopGroup();
workerGroup = new WorkerEventLoopGroup(bossGroup, configuration.EventLoopCount);
serverBufferAllocator = new PooledByteBufferAllocator();
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.Group(bossGroup, workerGroup);
bootstrap.Channel<TcpServerChannel>();
if (RuntimeInformation.IsOSPlatform(OSPlatform.Linux)
|| RuntimeInformation.IsOSPlatform(OSPlatform.OSX))
{
bootstrap
.Option(ChannelOption.SoReuseport, true)
.ChildOption(ChannelOption.SoReuseaddr, true);
}
bootstrap
.Option(ChannelOption.SoBacklog, configuration.SoBacklog)
.ChildOption(ChannelOption.Allocator, serverBufferAllocator)
.ChildHandler(new ActionChannelInitializer<IChannel>(channel =>
{
IChannelPipeline pipeline = channel.Pipeline;
channel.Pipeline.AddLast("jt809MainBuffer", new DelimiterBasedFrameDecoder(int.MaxValue,
Unpooled.CopiedBuffer(new byte[] { JT809Package.BEGINFLAG }),
Unpooled.CopiedBuffer(new byte[] { JT809Package.ENDFLAG })));
channel.Pipeline.AddLast("jt809MainSystemIdleState", new IdleStateHandler(
configuration.ReaderIdleTimeSeconds,
configuration.WriterIdleTimeSeconds,
configuration.AllIdleTimeSeconds));
pipeline.AddLast("jt809MainEncode", new JT809Encoder());
pipeline.AddLast("jt809MainDecode", new JT809Decoder());
channel.Pipeline.AddLast("jt809MainConnection", new JT809MainServerConnectionHandler(loggerFactory));
using (var scope = serviceProvider.CreateScope())
{
channel.Pipeline.AddLast("jt809MainService", scope.ServiceProvider.GetRequiredService<JT809MainServerHandler>());
}
}));
logger.LogInformation($"JT809 TCP Server start at {IPAddress.Any}:{configuration.TcpPort}.");
return bootstrap.BindAsync(configuration.TcpPort)
.ContinueWith(i => bootstrapChannel = i.Result);
}

public async Task StopAsync(CancellationToken cancellationToken)
{
await bootstrapChannel.CloseAsync();
var quietPeriod = configuration.QuietPeriodTimeSpan;
var shutdownTimeout = configuration.ShutdownTimeoutTimeSpan;
await workerGroup.ShutdownGracefullyAsync(quietPeriod, shutdownTimeout);
await bossGroup.ShutdownGracefullyAsync(quietPeriod, shutdownTimeout);
}
}
}

src/JT809.DotNetty.Core/Session/JT809TcpSessionManager.cs → src/JT809.DotNetty.Core/Session/JT809SessionManager.cs Ver fichero

@@ -12,21 +12,21 @@ namespace JT809.DotNetty.Core
/// <summary>
/// JT809 Tcp会话管理
/// </summary>
public class JT809TcpSessionManager
public class JT809SessionManager
{
private readonly ILogger<JT809TcpSessionManager> logger;
private readonly ILogger<JT809SessionManager> logger;

private readonly IJT809SessionPublishing jT809SessionPublishing;

public JT809TcpSessionManager(
public JT809SessionManager(
IJT809SessionPublishing jT809SessionPublishing,
ILoggerFactory loggerFactory)
{
this.jT809SessionPublishing = jT809SessionPublishing;
logger = loggerFactory.CreateLogger<JT809TcpSessionManager>();
logger = loggerFactory.CreateLogger<JT809SessionManager>();
}

private ConcurrentDictionary<uint, JT809TcpSession> SessionIdDict = new ConcurrentDictionary<uint, JT809TcpSession>();
private ConcurrentDictionary<uint, JT809Session> SessionIdDict = new ConcurrentDictionary<uint, JT809Session>();

public int SessionCount
{
@@ -36,9 +36,9 @@ namespace JT809.DotNetty.Core
}
}

public JT809TcpSession GetSession(uint msgGNSSCENTERID)
public JT809Session GetSession(uint msgGNSSCENTERID)
{
if (SessionIdDict.TryGetValue(msgGNSSCENTERID, out JT809TcpSession targetSession))
if (SessionIdDict.TryGetValue(msgGNSSCENTERID, out JT809Session targetSession))
{
return targetSession;
}
@@ -50,29 +50,30 @@ namespace JT809.DotNetty.Core

public void Heartbeat(uint msgGNSSCENTERID)
{
if (SessionIdDict.TryGetValue(msgGNSSCENTERID, out JT809TcpSession oldjT808Session))
if (SessionIdDict.TryGetValue(msgGNSSCENTERID, out JT809Session oldjT808Session))
{
oldjT808Session.LastActiveTime = DateTime.Now;
SessionIdDict.TryUpdate(msgGNSSCENTERID, oldjT808Session, oldjT808Session);
}
}

public void TryAdd(JT809TcpSession appSession)
public void TryAdd(IChannel channel, uint msgGNSSCENTERID)
{
if (SessionIdDict.TryAdd(appSession.MsgGNSSCENTERID, appSession))
if (SessionIdDict.ContainsKey(msgGNSSCENTERID)) return;
if (SessionIdDict.TryAdd(msgGNSSCENTERID, new JT809Session(channel, msgGNSSCENTERID)))
{
jT809SessionPublishing.PublishAsync(JT809Constants.SessionOnline, appSession.MsgGNSSCENTERID.ToString());
jT809SessionPublishing.PublishAsync(JT809Constants.SessionOnline, msgGNSSCENTERID.ToString());
}
}

public JT809TcpSession RemoveSession(uint msgGNSSCENTERID)
public JT809Session RemoveSession(uint msgGNSSCENTERID)
{
//可以使用任意mq的发布订阅
if (!SessionIdDict.TryGetValue(msgGNSSCENTERID, out JT809TcpSession jT808Session))
if (!SessionIdDict.TryGetValue(msgGNSSCENTERID, out JT809Session jT808Session))
{
return default;
}
if (SessionIdDict.TryRemove(msgGNSSCENTERID, out JT809TcpSession jT808SessionRemove))
if (SessionIdDict.TryRemove(msgGNSSCENTERID, out JT809Session jT808SessionRemove))
{
logger.LogInformation($">>>{msgGNSSCENTERID} Session Remove.");
jT809SessionPublishing.PublishAsync(JT809Constants.SessionOffline, msgGNSSCENTERID.ToString());
@@ -91,7 +92,7 @@ namespace JT809.DotNetty.Core
{
foreach (var key in keys)
{
SessionIdDict.TryRemove(key, out JT809TcpSession jT808SessionRemove);
SessionIdDict.TryRemove(key, out JT809Session jT808SessionRemove);
}
string nos = string.Join(",", keys);
logger.LogInformation($">>>{nos} Channel Remove.");
@@ -99,7 +100,7 @@ namespace JT809.DotNetty.Core
}
}

public IEnumerable<JT809TcpSession> GetAll()
public IEnumerable<JT809Session> GetAll()
{
return SessionIdDict.Select(s => s.Value).ToList();
}

+ 6
- 2
src/JT809.DotNetty.Tcp/Handlers/JT809MsgIdDefaultTcpHandler.cs Ver fichero

@@ -1,5 +1,7 @@
using JT809.DotNetty.Core;
using JT809.DotNetty.Core.Handlers;
using JT809.DotNetty.Core.Interfaces;
using JT809.DotNetty.Core.Links;
using System;
using System.Collections.Generic;
using System.Text;
@@ -9,9 +11,11 @@ namespace JT809.DotNetty.Tcp.Handlers
/// <summary>
/// 默认消息处理业务实现
/// </summary>
internal class JT809MsgIdDefaultTcpHandler : JT809MsgIdTcpHandlerBase
internal class JT809MsgIdDefaultTcpHandler : JT809MainMsgIdHandlerBase
{
public JT809MsgIdDefaultTcpHandler(JT809TcpSessionManager sessionManager) : base(sessionManager)
public JT809MsgIdDefaultTcpHandler(IJT809VerifyCodeGenerator verifyCodeGenerator,
JT809SubordinateClient subordinateLinkClient, JT809SessionManager sessionManager)
: base(verifyCodeGenerator, subordinateLinkClient, sessionManager)
{
}
}


+ 2
- 2
src/JT809.DotNetty.Tcp/Handlers/JT809TcpConnectionHandler.cs Ver fichero

@@ -14,10 +14,10 @@ namespace JT809.DotNetty.Tcp.Handlers
{
private readonly ILogger<JT809TcpConnectionHandler> logger;

private readonly JT809TcpSessionManager jT809SessionManager;
private readonly JT809SessionManager jT809SessionManager;

public JT809TcpConnectionHandler(
JT809TcpSessionManager jT809SessionManager,
JT809SessionManager jT809SessionManager,
ILoggerFactory loggerFactory)
{
this.jT809SessionManager = jT809SessionManager;


+ 9
- 8
src/JT809.DotNetty.Tcp/Handlers/JT809TcpServerHandler.cs Ver fichero

@@ -8,6 +8,7 @@ using JT809.DotNetty.Core.Services;
using JT809.DotNetty.Core;
using JT809.DotNetty.Core.Handlers;
using JT809.DotNetty.Core.Metadata;
using JT809.DotNetty.Core.Enums;

namespace JT809.DotNetty.Tcp.Handlers
{
@@ -16,24 +17,24 @@ namespace JT809.DotNetty.Tcp.Handlers
/// </summary>
internal class JT809TcpServerHandler : SimpleChannelInboundHandler<byte[]>
{
private readonly JT809MsgIdTcpHandlerBase handler;
private readonly JT809MainMsgIdHandlerBase handler;
private readonly JT809TcpSessionManager jT809SessionManager;
private readonly JT809SessionManager jT809SessionManager;

private readonly JT809TcpAtomicCounterService jT809AtomicCounterService;
private readonly JT809AtomicCounterService jT809AtomicCounterService;

private readonly ILogger<JT809TcpServerHandler> logger;

public JT809TcpServerHandler(
ILoggerFactory loggerFactory,
JT809MsgIdTcpHandlerBase handler,
JT809TcpAtomicCounterService jT809AtomicCounterService,
JT809TcpSessionManager jT809SessionManager
JT809MainMsgIdHandlerBase handler,
JT809AtomicCounterServiceFactory jT809AtomicCounterServiceFactorty,
JT809SessionManager jT809SessionManager
)
{
this.handler = handler;
this.jT809SessionManager = jT809SessionManager;
this.jT809AtomicCounterService = jT809AtomicCounterService;
this.jT809AtomicCounterService = jT809AtomicCounterServiceFactorty.Create(JT809AtomicCounterType.ServerMain.ToString()); ;
logger = loggerFactory.CreateLogger<JT809TcpServerHandler>();
}

@@ -48,7 +49,7 @@ namespace JT809.DotNetty.Tcp.Handlers
{
logger.LogDebug("accept package success count<<<" + jT809AtomicCounterService.MsgSuccessCount.ToString());
}
jT809SessionManager.TryAdd(new JT809TcpSession(ctx.Channel, jT809Package.Header.MsgGNSSCENTERID));
jT809SessionManager.TryAdd(ctx.Channel, jT809Package.Header.MsgGNSSCENTERID);
Func<JT809Request, JT809Response> handlerFunc;
if (handler.HandlerDict.TryGetValue(jT809Package.Header.BusinessType, out handlerFunc))
{


+ 3
- 4
src/JT809.DotNetty.Tcp/JT809TcpDotnettyExtensions.cs Ver fichero

@@ -19,11 +19,10 @@ namespace JT809.DotNetty.Tcp
{
public static IServiceCollection AddJT809TcpHost(this IServiceCollection serviceDescriptors)
{
serviceDescriptors.TryAddSingleton<JT809TcpSessionManager>();
serviceDescriptors.TryAddSingleton<JT809TcpAtomicCounterService>();
serviceDescriptors.TryAddSingleton<JT809MsgIdTcpHandlerBase, JT809MsgIdDefaultTcpHandler>();
serviceDescriptors.TryAddSingleton<JT809SessionManager>();
serviceDescriptors.TryAddSingleton<JT809MainMsgIdHandlerBase, JT809MsgIdDefaultTcpHandler>();
serviceDescriptors.TryAddScoped<JT809TcpConnectionHandler>();
serviceDescriptors.TryAddScoped<JT809TcpDecoder>();
serviceDescriptors.TryAddScoped<JT809Decoder>();
serviceDescriptors.TryAddScoped<JT809TcpServerHandler>();
serviceDescriptors.AddHostedService<JT809TcpServerHost>();
return serviceDescriptors;


+ 1
- 1
src/JT809.DotNetty.Tcp/JT809TcpServerHost.cs Ver fichero

@@ -74,7 +74,7 @@ namespace JT809.DotNetty.Tcp
channel.Pipeline.AddLast("jt809TcpBuffer", new DelimiterBasedFrameDecoder(int.MaxValue,
Unpooled.CopiedBuffer(new byte[] { JT809Package.BEGINFLAG }),
Unpooled.CopiedBuffer(new byte[] { JT809Package.ENDFLAG })));
channel.Pipeline.AddLast("jt809TcpDecode", scope.ServiceProvider.GetRequiredService<JT809TcpDecoder>());
channel.Pipeline.AddLast("jt809TcpDecode", scope.ServiceProvider.GetRequiredService<JT809Decoder>());
channel.Pipeline.AddLast("jt809TcpService", scope.ServiceProvider.GetRequiredService<JT809TcpServerHandler>());
}
}));


+ 1
- 1
src/JT809.DotNetty.Tests/JT809.DotNetty.Host.Test/Program.cs Ver fichero

@@ -23,7 +23,7 @@ namespace JT809.DotNetty.Host.Test
});

//主链路登录请求消息
//5B000000480000008510010134140E010000000000270F0134140E32303139303232323132372E302E302E31000000000000000000000000000000000000000000000003297B8D5D
//5B00000048000000851001013353D5010000000000270F0133530D32303134303831333132372E302E302E3100000000000000000000000000000000000000000000001FA3275F5D
//主链路注销请求消息
//5B000000260000008510030134140E010000000000270F0001E24031323334353600003FE15D
//主链路连接保持请求消息


Cargando…
Cancelar
Guardar