diff --git a/src/JT808.DotNetty.Abstractions/Dtos/JT808AtomicCounterDto.cs b/src/JT808.DotNetty.Abstractions/Dtos/JT808AtomicCounterDto.cs new file mode 100644 index 0000000..b61c2a8 --- /dev/null +++ b/src/JT808.DotNetty.Abstractions/Dtos/JT808AtomicCounterDto.cs @@ -0,0 +1,12 @@ +namespace JT808.DotNetty.Abstractions.Dtos +{ + /// + /// 包计数器服务 + /// + public class JT808AtomicCounterDto + { + public long MsgSuccessCount { get; set; } + + public long MsgFailCount { get; set; } + } +} diff --git a/src/JT808.DotNetty.Abstractions/Dtos/JT808DefaultResultDto.cs b/src/JT808.DotNetty.Abstractions/Dtos/JT808DefaultResultDto.cs new file mode 100644 index 0000000..7faf2ee --- /dev/null +++ b/src/JT808.DotNetty.Abstractions/Dtos/JT808DefaultResultDto.cs @@ -0,0 +1,15 @@ +using System; +using System.Collections.Generic; +using System.Text; + +namespace JT808.DotNetty.Abstractions.Dtos +{ + public class JT808DefaultResultDto: JT808ResultDto + { + public JT808DefaultResultDto() + { + Data = "Hello,JT808 WebAPI"; + Code = JT808ResultCode.Ok; + } + } +} diff --git a/src/JT808.DotNetty.Abstractions/Dtos/JT808IPAddressDto.cs b/src/JT808.DotNetty.Abstractions/Dtos/JT808IPAddressDto.cs new file mode 100644 index 0000000..6978e87 --- /dev/null +++ b/src/JT808.DotNetty.Abstractions/Dtos/JT808IPAddressDto.cs @@ -0,0 +1,35 @@ +using System; +using System.Collections.Generic; +using System.Net; +using System.Text; + +namespace JT808.DotNetty.Abstractions.Dtos +{ + public class JT808IPAddressDto + { + public string Host { get; set; } + + public int Port { get; set; } + + private EndPoint endPoint; + + public EndPoint EndPoint + { + get + { + if (endPoint == null) + { + if (IPAddress.TryParse(Host, out IPAddress ip)) + { + endPoint = new IPEndPoint(ip, Port); + } + else + { + endPoint = new DnsEndPoint(Host, Port); + } + } + return endPoint; + } + } + } +} diff --git a/src/JT808.DotNetty.Abstractions/Dtos/JT808ResultDto.cs b/src/JT808.DotNetty.Abstractions/Dtos/JT808ResultDto.cs new file mode 100644 index 0000000..97f00b9 --- /dev/null +++ b/src/JT808.DotNetty.Abstractions/Dtos/JT808ResultDto.cs @@ -0,0 +1,24 @@ +using System; +using System.Collections.Generic; +using System.Text; + +namespace JT808.DotNetty.Abstractions.Dtos +{ + public class JT808ResultDto + { + public string Message { get; set; } + + public int Code { get; set; } + + public T Data { get; set; } + } + + public class JT808ResultCode + { + public const int Ok = 200; + public const int Empty = 201; + public const int NotFound = 404; + public const int Fail = 400; + public const int Error = 500; + } +} diff --git a/src/JT808.DotNetty.Abstractions/Dtos/JT808SourcePackageChannelInfoDto.cs b/src/JT808.DotNetty.Abstractions/Dtos/JT808SourcePackageChannelInfoDto.cs new file mode 100644 index 0000000..6ff0c37 --- /dev/null +++ b/src/JT808.DotNetty.Abstractions/Dtos/JT808SourcePackageChannelInfoDto.cs @@ -0,0 +1,33 @@ +using System; +using System.Collections.Generic; +using System.Text; + +namespace JT808.DotNetty.Abstractions.Dtos +{ + /// + /// 原包通道信息 + /// + public class JT808SourcePackageChannelInfoDto + { + /// + /// 远程地址 + /// + public string RemoteAddress { get; set; } + /// + /// 本地地址 + /// + public string LocalAddress { get; set; } + /// + /// 是否注册 + /// + public bool Registered { get; set; } + /// + /// 是否活动 + /// + public bool Active { get; set; } + /// + /// 是否打开 + /// + public bool Open { get; set; } + } +} diff --git a/src/JT808.DotNetty.Abstractions/Dtos/JT808TcpSessionInfoDto.cs b/src/JT808.DotNetty.Abstractions/Dtos/JT808TcpSessionInfoDto.cs new file mode 100644 index 0000000..4a71a69 --- /dev/null +++ b/src/JT808.DotNetty.Abstractions/Dtos/JT808TcpSessionInfoDto.cs @@ -0,0 +1,24 @@ +using System; + +namespace JT808.DotNetty.Abstractions.Dtos +{ + public class JT808TcpSessionInfoDto + { + /// + /// 最后上线时间 + /// + public DateTime LastActiveTime { get; set; } + /// + /// 上线时间 + /// + public DateTime StartTime { get; set; } + /// + /// 终端手机号 + /// + public string TerminalPhoneNo { get; set; } + /// + /// 远程ip地址 + /// + public string RemoteAddressIP { get; set; } + } +} diff --git a/src/JT808.DotNetty.Abstractions/Dtos/JT808UnificationSendRequestDto.cs b/src/JT808.DotNetty.Abstractions/Dtos/JT808UnificationSendRequestDto.cs new file mode 100644 index 0000000..8f32473 --- /dev/null +++ b/src/JT808.DotNetty.Abstractions/Dtos/JT808UnificationSendRequestDto.cs @@ -0,0 +1,11 @@ +namespace JT808.DotNetty.Abstractions.Dtos +{ + /// + /// 统一下发请求参数 + /// + public class JT808UnificationSendRequestDto + { + public string TerminalPhoneNo { get; set; } + public byte[] Data { get; set; } + } +} diff --git a/src/JT808.DotNetty.Abstractions/IJT808SessionPublishing.cs b/src/JT808.DotNetty.Abstractions/IJT808SessionPublishing.cs new file mode 100644 index 0000000..13968c6 --- /dev/null +++ b/src/JT808.DotNetty.Abstractions/IJT808SessionPublishing.cs @@ -0,0 +1,12 @@ +using System.Threading.Tasks; + +namespace JT808.DotNetty.Abstractions +{ + /// + /// 会话通知(在线/离线) + /// + public interface IJT808SessionPublishing + { + Task PublishAsync(string topicName, string key, string value); + } +} diff --git a/src/JT808.DotNetty.Abstractions/IJT808SourcePackageDispatcher.cs b/src/JT808.DotNetty.Abstractions/IJT808SourcePackageDispatcher.cs new file mode 100644 index 0000000..f3fa1dd --- /dev/null +++ b/src/JT808.DotNetty.Abstractions/IJT808SourcePackageDispatcher.cs @@ -0,0 +1,15 @@ +using System.Threading.Tasks; + +namespace JT808.DotNetty.Abstractions +{ + /// + /// 源包分发器 + /// 自定义源包分发器业务 + /// ConfigureServices: + /// services.Replace(new ServiceDescriptor(typeof(IJT808SourcePackageDispatcher),typeof(JT808SourcePackageDispatcherDefaultImpl),ServiceLifetime.Singleton)); + /// + public interface IJT808SourcePackageDispatcher + { + Task SendAsync(byte[] data); + } +} diff --git a/src/JT808.DotNetty.Abstractions/JT808.DotNetty.Abstractions.csproj b/src/JT808.DotNetty.Abstractions/JT808.DotNetty.Abstractions.csproj new file mode 100644 index 0000000..3993216 --- /dev/null +++ b/src/JT808.DotNetty.Abstractions/JT808.DotNetty.Abstractions.csproj @@ -0,0 +1,8 @@ + + + + netstandard2.0 + 7.1 + + + diff --git a/src/JT808.DotNetty.Abstractions/JT808Constants.cs b/src/JT808.DotNetty.Abstractions/JT808Constants.cs new file mode 100644 index 0000000..4f591d5 --- /dev/null +++ b/src/JT808.DotNetty.Abstractions/JT808Constants.cs @@ -0,0 +1,9 @@ +namespace JT808.DotNetty.Abstractions +{ + public static class JT808Constants + { + public const string SessionOnline= "JT808SessionOnline"; + + public const string SessionOffline = "JT808SessionOffline"; + } +} diff --git a/src/JT808.DotNetty.Codecs/JT808.DotNetty.Codecs.csproj b/src/JT808.DotNetty.Codecs/JT808.DotNetty.Codecs.csproj new file mode 100644 index 0000000..19d837a --- /dev/null +++ b/src/JT808.DotNetty.Codecs/JT808.DotNetty.Codecs.csproj @@ -0,0 +1,14 @@ + + + + netstandard2.0 + 7.1 + + + + + + + + + diff --git a/src/JT808.DotNetty.Codecs/JT808TcpDecoder.cs b/src/JT808.DotNetty.Codecs/JT808TcpDecoder.cs new file mode 100644 index 0000000..376194e --- /dev/null +++ b/src/JT808.DotNetty.Codecs/JT808TcpDecoder.cs @@ -0,0 +1,20 @@ +using DotNetty.Buffers; +using DotNetty.Codecs; +using System.Collections.Generic; +using JT808.Protocol; +using DotNetty.Transport.Channels; + +namespace JT808.DotNetty.Codecs +{ + public class JT808TcpDecoder : ByteToMessageDecoder + { + protected override void Decode(IChannelHandlerContext context, IByteBuffer input, List output) + { + byte[] buffer = new byte[input.Capacity + 2]; + input.ReadBytes(buffer, 1, input.Capacity); + buffer[0] = JT808Package.BeginFlag; + buffer[input.Capacity + 1] = JT808Package.EndFlag; + output.Add(buffer); + } + } +} diff --git a/src/JT808.DotNetty.Codecs/JT808UdpDecoder.cs b/src/JT808.DotNetty.Codecs/JT808UdpDecoder.cs new file mode 100644 index 0000000..fcf3455 --- /dev/null +++ b/src/JT808.DotNetty.Codecs/JT808UdpDecoder.cs @@ -0,0 +1,20 @@ +using DotNetty.Buffers; +using DotNetty.Codecs; +using DotNetty.Transport.Channels; +using System.Collections.Generic; +using DotNetty.Transport.Channels.Sockets; +using JT808.DotNetty.Core.Metadata; + +namespace JT808.DotNetty.Codecs +{ + public class JT808UdpDecoder : MessageToMessageDecoder + { + protected override void Decode(IChannelHandlerContext context, DatagramPacket message, List output) + { + IByteBuffer byteBuffer = message.Content; + byte[] buffer = new byte[byteBuffer.ReadableBytes]; + byteBuffer.ReadBytes(buffer); + output.Add(new JT808UdpPackage(buffer, message.Sender)); + } + } +} diff --git a/src/JT808.DotNetty.Configurations/JT808.DotNetty.Configurations.csproj b/src/JT808.DotNetty.Configurations/JT808.DotNetty.Configurations.csproj new file mode 100644 index 0000000..9f5c4f4 --- /dev/null +++ b/src/JT808.DotNetty.Configurations/JT808.DotNetty.Configurations.csproj @@ -0,0 +1,7 @@ + + + + netstandard2.0 + + + diff --git a/src/JT808.DotNetty.Configurations/JT808ClientConfiguration.cs b/src/JT808.DotNetty.Configurations/JT808ClientConfiguration.cs new file mode 100644 index 0000000..e2243d2 --- /dev/null +++ b/src/JT808.DotNetty.Configurations/JT808ClientConfiguration.cs @@ -0,0 +1,32 @@ +using System.Net; + +namespace JT808.DotNetty.Configurations +{ + public class JT808ClientConfiguration + { + public string Host { get; set; } + + public int Port { get; set; } + + private EndPoint endPoint; + + public EndPoint EndPoint + { + get + { + if (endPoint == null) + { + if (IPAddress.TryParse(Host, out IPAddress ip)) + { + endPoint = new IPEndPoint(ip, Port); + } + else + { + endPoint = new DnsEndPoint(Host, Port); + } + } + return endPoint; + } + } + } +} diff --git a/src/JT808.DotNetty.Configurations/JT808Configuration.cs b/src/JT808.DotNetty.Configurations/JT808Configuration.cs new file mode 100644 index 0000000..3565e4b --- /dev/null +++ b/src/JT808.DotNetty.Configurations/JT808Configuration.cs @@ -0,0 +1,54 @@ +using System; +using System.Collections.Generic; +using System.Text; + +namespace JT808.DotNetty.Configurations +{ + public class JT808Configuration + { + public int Port { get; set; } = 808; + + public int UDPPort { get; set; } = 809; + + public int QuietPeriodSeconds { get; set; } = 1; + + public TimeSpan QuietPeriodTimeSpan => TimeSpan.FromSeconds(QuietPeriodSeconds); + + public int ShutdownTimeoutSeconds { get; set; } = 3; + + public TimeSpan ShutdownTimeoutTimeSpan => TimeSpan.FromSeconds(ShutdownTimeoutSeconds); + + public int SoBacklog { get; set; } = 8192; + + public int EventLoopCount { get; set; } = Environment.ProcessorCount; + + public int ReaderIdleTimeSeconds { get; set; } = 3600; + + public int WriterIdleTimeSeconds { get; set; } = 3600; + + public int AllIdleTimeSeconds { get; set; } = 3600; + + /// + /// WebApi服务 + /// 默认828端口 + /// + public int WebApiPort { get; set; } = 828; + + /// + /// 源包分发器配置 + /// + public List SourcePackageDispatcherClientConfigurations { get; set; } + + /// + /// 转发远程地址 (可选项)知道转发的地址有利于提升性能 + /// 按照808的消息,有些请求必须要应答,但是转发可以不需要有应答可以节省部分资源包括: + // 1.消息的序列化 + // 2.消息的下发 + // 都有一定的性能损耗,那么不需要判断写超时 IdleState.WriterIdle + // 就跟神兽貔貅一样。。。 + /// + public List ForwardingRemoteAddress { get; set; } + + public string RedisHost { get; set; } + } +} diff --git a/src/JT808.DotNetty.Configurations/JT808Constants.cs b/src/JT808.DotNetty.Configurations/JT808Constants.cs new file mode 100644 index 0000000..9d92e11 --- /dev/null +++ b/src/JT808.DotNetty.Configurations/JT808Constants.cs @@ -0,0 +1,13 @@ +using System; +using System.Collections.Generic; +using System.Text; + +namespace JT808.DotNetty.Configurations +{ + public static class JT808Constants + { + public const string SessionOnline= "JT808SessionOnline"; + + public const string SessionOffline = "JT808SessionOffline"; + } +} diff --git a/src/JT808.DotNetty.Core/Configurations/JT808ClientConfiguration.cs b/src/JT808.DotNetty.Core/Configurations/JT808ClientConfiguration.cs new file mode 100644 index 0000000..dcd9392 --- /dev/null +++ b/src/JT808.DotNetty.Core/Configurations/JT808ClientConfiguration.cs @@ -0,0 +1,35 @@ +using System; +using System.Collections.Generic; +using System.Net; +using System.Text; + +namespace JT808.DotNetty.Core.Configurations +{ + public class JT808ClientConfiguration + { + public string Host { get; set; } + + public int Port { get; set; } + + private EndPoint endPoint; + + public EndPoint EndPoint + { + get + { + if (endPoint == null) + { + if (IPAddress.TryParse(Host, out IPAddress ip)) + { + endPoint = new IPEndPoint(ip, Port); + } + else + { + endPoint = new DnsEndPoint(Host, Port); + } + } + return endPoint; + } + } + } +} diff --git a/src/JT808.DotNetty.Core/Configurations/JT808Configuration.cs b/src/JT808.DotNetty.Core/Configurations/JT808Configuration.cs new file mode 100644 index 0000000..8b1b163 --- /dev/null +++ b/src/JT808.DotNetty.Core/Configurations/JT808Configuration.cs @@ -0,0 +1,52 @@ +using System; +using System.Collections.Generic; +using System.Text; + +namespace JT808.DotNetty.Core.Configurations +{ + public class JT808Configuration + { + public int TcpPort { get; set; } = 808; + + public int UdpPort { get; set; } = 818; + + public int QuietPeriodSeconds { get; set; } = 1; + + public TimeSpan QuietPeriodTimeSpan => TimeSpan.FromSeconds(QuietPeriodSeconds); + + public int ShutdownTimeoutSeconds { get; set; } = 3; + + public TimeSpan ShutdownTimeoutTimeSpan => TimeSpan.FromSeconds(ShutdownTimeoutSeconds); + + public int SoBacklog { get; set; } = 8192; + + public int EventLoopCount { get; set; } = Environment.ProcessorCount; + + public int ReaderIdleTimeSeconds { get; set; } = 3600; + + public int WriterIdleTimeSeconds { get; set; } = 3600; + + public int AllIdleTimeSeconds { get; set; } = 3600; + + /// + /// WebApi服务 + /// 默认828端口 + /// + public int WebApiPort { get; set; } = 828; + + /// + /// 源包分发器配置 + /// + public List SourcePackageDispatcherClientConfigurations { get; set; } + + /// + /// 转发远程地址 (可选项)知道转发的地址有利于提升性能 + /// 按照808的消息,有些请求必须要应答,但是转发可以不需要有应答可以节省部分资源包括: + // 1.消息的序列化 + // 2.消息的下发 + // 都有一定的性能损耗,那么不需要判断写超时 IdleState.WriterIdle + // 就跟神兽貔貅一样。。。 + /// + public List ForwardingRemoteAddress { get; set; } + } +} diff --git a/src/JT808.DotNetty.Core/Handlers/JT808MsgIdHttpHandlerBase.cs b/src/JT808.DotNetty.Core/Handlers/JT808MsgIdHttpHandlerBase.cs new file mode 100644 index 0000000..88bd089 --- /dev/null +++ b/src/JT808.DotNetty.Core/Handlers/JT808MsgIdHttpHandlerBase.cs @@ -0,0 +1,84 @@ +using System; +using System.Collections.Generic; +using System.Text; +using JT808.DotNetty.Abstractions.Dtos; +using JT808.DotNetty.Core.Metadata; +using Newtonsoft.Json; + +namespace JT808.DotNetty.Core.Handlers +{ + /// + /// 基于webapi http模式抽象消息处理业务 + /// 自定义消息处理业务 + /// 注意: + /// 1.ConfigureServices: + /// services.Replace(new ServiceDescriptor(typeof(JT808MsgIdHttpHandlerBase),typeof(JT808MsgIdCustomHttpHandlerImpl),ServiceLifetime.Singleton)); + /// 2.解析具体的消息体,具体消息调用具体的JT808Serializer.Deserialize + /// + public abstract class JT808MsgIdHttpHandlerBase + { + private const string RouteTablePrefix = "/jt808api"; + /// + /// 初始化消息处理业务 + /// + protected JT808MsgIdHttpHandlerBase() + { + HandlerDict = new Dictionary>(); + } + + protected void CreateRoute(string url, Func func) + { + HandlerDict.Add($"{RouteTablePrefix}/{url}", func); + } + + public Dictionary> HandlerDict { get; } + + protected JT808HttpResponse CreateJT808HttpResponse(dynamic dynamicObject) + { + byte[] data = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(dynamicObject)); + return new JT808HttpResponse() + { + Data = data + }; + } + + public JT808HttpResponse DefaultHttpResponse() + { + byte[] json = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(new JT808DefaultResultDto())); + return new JT808HttpResponse(json); + } + + public JT808HttpResponse EmptyHttpResponse() + { + byte[] json = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(new JT808ResultDto() + { + Code = JT808ResultCode.Empty, + Message = "内容为空", + Data = "Content Empty" + })); + return new JT808HttpResponse(json); + } + + public JT808HttpResponse NotFoundHttpResponse() + { + byte[] json = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(new JT808ResultDto() + { + Code = JT808ResultCode.NotFound, + Message = "没有该服务", + Data = "没有该服务" + })); + return new JT808HttpResponse(json); + } + + public JT808HttpResponse ErrorHttpResponse(Exception ex) + { + byte[] json = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(new JT808ResultDto() + { + Code = JT808ResultCode.Error, + Message = JsonConvert.SerializeObject(ex), + Data = ex.Message + })); + return new JT808HttpResponse(json); + } + } +} diff --git a/src/JT808.DotNetty.Core/Handlers/JT808MsgIdTcpHandlerBase.cs b/src/JT808.DotNetty.Core/Handlers/JT808MsgIdTcpHandlerBase.cs new file mode 100644 index 0000000..67bca08 --- /dev/null +++ b/src/JT808.DotNetty.Core/Handlers/JT808MsgIdTcpHandlerBase.cs @@ -0,0 +1,160 @@ +using System; +using System.Collections.Generic; +using JT808.DotNetty.Core.Metadata; +using JT808.Protocol.Enums; +using JT808.Protocol.Extensions; +using JT808.Protocol.MessageBody; + +namespace JT808.DotNetty.Core.Handlers +{ + /// + /// 基于Tcp模式抽象消息处理业务 + /// 自定义消息处理业务 + /// 注意: + /// 1.ConfigureServices: + /// services.Replace(new ServiceDescriptor(typeof(JT808MsgIdTcpHandlerBase),typeof(JT808MsgIdCustomTcpHandlerImpl),ServiceLifetime.Singleton)); + /// 2.解析具体的消息体,具体消息调用具体的JT808Serializer.Deserialize + /// + public abstract class JT808MsgIdTcpHandlerBase + { + protected JT808TcpSessionManager sessionManager { get; } + /// + /// 初始化消息处理业务 + /// + protected JT808MsgIdTcpHandlerBase(JT808TcpSessionManager sessionManager) + { + this.sessionManager = sessionManager; + HandlerDict = new Dictionary> + { + {JT808MsgId.终端通用应答.ToUInt16Value(), Msg0x0001}, + {JT808MsgId.终端鉴权.ToUInt16Value(), Msg0x0102}, + {JT808MsgId.终端心跳.ToUInt16Value(), Msg0x0002}, + {JT808MsgId.终端注销.ToUInt16Value(), Msg0x0003}, + {JT808MsgId.终端注册.ToUInt16Value(), Msg0x0100}, + {JT808MsgId.位置信息汇报.ToUInt16Value(),Msg0x0200 }, + {JT808MsgId.定位数据批量上传.ToUInt16Value(),Msg0x0704 }, + {JT808MsgId.数据上行透传.ToUInt16Value(),Msg0x0900 } + }; + } + + public Dictionary> HandlerDict { get; protected set; } + /// + /// 终端通用应答 + /// 平台无需回复 + /// 实现自己的业务 + /// + /// + /// + /// + public virtual JT808Response Msg0x0001(JT808Request request) + { + return null; + } + /// + /// 终端心跳 + /// + /// + /// + /// + public virtual JT808Response Msg0x0002(JT808Request request) + { + sessionManager.Heartbeat(request.Package.Header.TerminalPhoneNo); + return new JT808Response(JT808MsgId.平台通用应答.Create(request.Package.Header.TerminalPhoneNo, new JT808_0x8001() + { + MsgId = request.Package.Header.MsgId, + JT808PlatformResult = JT808PlatformResult.成功, + MsgNum = request.Package.Header.MsgNum + })); + } + /// + /// 终端注销 + /// + /// + /// + /// + public virtual JT808Response Msg0x0003(JT808Request request) + { + return new JT808Response(JT808MsgId.平台通用应答.Create(request.Package.Header.TerminalPhoneNo, new JT808_0x8001() + { + MsgId = request.Package.Header.MsgId, + JT808PlatformResult = JT808PlatformResult.成功, + MsgNum = request.Package.Header.MsgNum + })); + } + /// + /// 终端注册 + /// + /// + /// + /// + public virtual JT808Response Msg0x0100(JT808Request request) + { + return new JT808Response(JT808MsgId.终端注册应答.Create(request.Package.Header.TerminalPhoneNo, new JT808_0x8100() + { + Code = "J" + request.Package.Header.TerminalPhoneNo, + JT808TerminalRegisterResult = JT808TerminalRegisterResult.成功, + MsgNum = request.Package.Header.MsgNum + })); + } + /// + /// 终端鉴权 + /// + /// + /// + /// + public virtual JT808Response Msg0x0102(JT808Request request) + { + return new JT808Response(JT808MsgId.平台通用应答.Create(request.Package.Header.TerminalPhoneNo, new JT808_0x8001() + { + MsgId = request.Package.Header.MsgId, + JT808PlatformResult = JT808PlatformResult.成功, + MsgNum = request.Package.Header.MsgNum + })); + } + /// + /// 位置信息汇报 + /// + /// + /// + /// + public virtual JT808Response Msg0x0200(JT808Request request) + { + return new JT808Response(JT808MsgId.平台通用应答.Create(request.Package.Header.TerminalPhoneNo, new JT808_0x8001() + { + MsgId = request.Package.Header.MsgId, + JT808PlatformResult = JT808PlatformResult.成功, + MsgNum = request.Package.Header.MsgNum + })); + } + /// + /// 定位数据批量上传 + /// + /// + /// + /// + public virtual JT808Response Msg0x0704(JT808Request request) + { + return new JT808Response(JT808MsgId.平台通用应答.Create(request.Package.Header.TerminalPhoneNo, new JT808_0x8001() + { + MsgId =request.Package.Header.MsgId, + JT808PlatformResult = JT808PlatformResult.成功, + MsgNum = request.Package.Header.MsgNum + })); + } + /// + /// 数据上行透传 + /// + /// + /// + /// + public virtual JT808Response Msg0x0900(JT808Request request) + { + return new JT808Response(JT808MsgId.平台通用应答.Create(request.Package.Header.TerminalPhoneNo, new JT808_0x8001() + { + MsgId =request.Package.Header.MsgId, + JT808PlatformResult = JT808PlatformResult.成功, + MsgNum = request.Package.Header.MsgNum + })); + } + } +} diff --git a/src/JT808.DotNetty.Core/Handlers/JT808MsgIdUdpHandlerBase.cs b/src/JT808.DotNetty.Core/Handlers/JT808MsgIdUdpHandlerBase.cs new file mode 100644 index 0000000..5012734 --- /dev/null +++ b/src/JT808.DotNetty.Core/Handlers/JT808MsgIdUdpHandlerBase.cs @@ -0,0 +1,160 @@ +using System; +using System.Collections.Generic; +using JT808.DotNetty.Core.Metadata; +using JT808.Protocol.Enums; +using JT808.Protocol.Extensions; +using JT808.Protocol.MessageBody; + +namespace JT808.DotNetty.Core.Handlers +{ + /// + /// 基于Udp模式的抽象消息处理业务 + /// 自定义消息处理业务 + /// 注意: + /// 1.ConfigureServices: + /// services.Replace(new ServiceDescriptor(typeof(JT808MsgIdUdpHandlerBase),typeof(JT808MsgIdCustomUdpHandlerImpl),ServiceLifetime.Singleton)); + /// 2.解析具体的消息体,具体消息调用具体的JT808Serializer.Deserialize + /// + public abstract class JT808MsgIdUdpHandlerBase + { + protected JT808UdpSessionManager sessionManager { get; } + /// + /// 初始化消息处理业务 + /// + protected JT808MsgIdUdpHandlerBase(JT808UdpSessionManager sessionManager) + { + this.sessionManager = sessionManager; + HandlerDict = new Dictionary> + { + {JT808MsgId.终端通用应答.ToUInt16Value(), Msg0x0001}, + {JT808MsgId.终端鉴权.ToUInt16Value(), Msg0x0102}, + {JT808MsgId.终端心跳.ToUInt16Value(), Msg0x0002}, + {JT808MsgId.终端注销.ToUInt16Value(), Msg0x0003}, + {JT808MsgId.终端注册.ToUInt16Value(), Msg0x0100}, + {JT808MsgId.位置信息汇报.ToUInt16Value(),Msg0x0200 }, + {JT808MsgId.定位数据批量上传.ToUInt16Value(),Msg0x0704 }, + {JT808MsgId.数据上行透传.ToUInt16Value(),Msg0x0900 } + }; + } + + public Dictionary> HandlerDict { get; protected set; } + /// + /// 终端通用应答 + /// 平台无需回复 + /// 实现自己的业务 + /// + /// + /// + /// + public virtual JT808Response Msg0x0001(JT808Request request) + { + return null; + } + /// + /// 终端心跳 + /// + /// + /// + /// + public virtual JT808Response Msg0x0002(JT808Request request) + { + sessionManager.Heartbeat(request.Package.Header.TerminalPhoneNo); + return new JT808Response(JT808MsgId.平台通用应答.Create(request.Package.Header.TerminalPhoneNo, new JT808_0x8001() + { + MsgId = request.Package.Header.MsgId, + JT808PlatformResult = JT808PlatformResult.成功, + MsgNum = request.Package.Header.MsgNum + })); + } + /// + /// 终端注销 + /// + /// + /// + /// + public virtual JT808Response Msg0x0003(JT808Request request) + { + return new JT808Response(JT808MsgId.平台通用应答.Create(request.Package.Header.TerminalPhoneNo, new JT808_0x8001() + { + MsgId = request.Package.Header.MsgId, + JT808PlatformResult = JT808PlatformResult.成功, + MsgNum = request.Package.Header.MsgNum + })); + } + /// + /// 终端注册 + /// + /// + /// + /// + public virtual JT808Response Msg0x0100(JT808Request request) + { + return new JT808Response(JT808MsgId.终端注册应答.Create(request.Package.Header.TerminalPhoneNo, new JT808_0x8100() + { + Code = "J" + request.Package.Header.TerminalPhoneNo, + JT808TerminalRegisterResult = JT808TerminalRegisterResult.成功, + MsgNum = request.Package.Header.MsgNum + })); + } + /// + /// 终端鉴权 + /// + /// + /// + /// + public virtual JT808Response Msg0x0102(JT808Request request) + { + return new JT808Response(JT808MsgId.平台通用应答.Create(request.Package.Header.TerminalPhoneNo, new JT808_0x8001() + { + MsgId = request.Package.Header.MsgId, + JT808PlatformResult = JT808PlatformResult.成功, + MsgNum = request.Package.Header.MsgNum + })); + } + /// + /// 位置信息汇报 + /// + /// + /// + /// + public virtual JT808Response Msg0x0200(JT808Request request) + { + return new JT808Response(JT808MsgId.平台通用应答.Create(request.Package.Header.TerminalPhoneNo, new JT808_0x8001() + { + MsgId = request.Package.Header.MsgId, + JT808PlatformResult = JT808PlatformResult.成功, + MsgNum = request.Package.Header.MsgNum + })); + } + /// + /// 定位数据批量上传 + /// + /// + /// + /// + public virtual JT808Response Msg0x0704(JT808Request request) + { + return new JT808Response(JT808MsgId.平台通用应答.Create(request.Package.Header.TerminalPhoneNo, new JT808_0x8001() + { + MsgId =request.Package.Header.MsgId, + JT808PlatformResult = JT808PlatformResult.成功, + MsgNum = request.Package.Header.MsgNum + })); + } + /// + /// 数据上行透传 + /// + /// + /// + /// + public virtual JT808Response Msg0x0900(JT808Request request) + { + return new JT808Response(JT808MsgId.平台通用应答.Create(request.Package.Header.TerminalPhoneNo, new JT808_0x8001() + { + MsgId =request.Package.Header.MsgId, + JT808PlatformResult = JT808PlatformResult.成功, + MsgNum = request.Package.Header.MsgNum + })); + } + } +} diff --git a/src/JT808.DotNetty.Core/Impls/JT808SessionPublishingEmptyImpl.cs b/src/JT808.DotNetty.Core/Impls/JT808SessionPublishingEmptyImpl.cs new file mode 100644 index 0000000..7c5d99b --- /dev/null +++ b/src/JT808.DotNetty.Core/Impls/JT808SessionPublishingEmptyImpl.cs @@ -0,0 +1,13 @@ +using JT808.DotNetty.Abstractions; +using System.Threading.Tasks; + +namespace JT808.DotNetty.Core +{ + internal class JT808SessionPublishingEmptyImpl : IJT808SessionPublishing + { + public Task PublishAsync(string topicName, string key, string value) + { + return Task.CompletedTask; + } + } +} diff --git a/src/JT808.DotNetty.Core/Impls/JT808SourcePackageDispatcherEmptyImpl.cs b/src/JT808.DotNetty.Core/Impls/JT808SourcePackageDispatcherEmptyImpl.cs new file mode 100644 index 0000000..f17ae75 --- /dev/null +++ b/src/JT808.DotNetty.Core/Impls/JT808SourcePackageDispatcherEmptyImpl.cs @@ -0,0 +1,16 @@ +using JT808.DotNetty.Abstractions; +using System.Threading.Tasks; + +namespace JT808.DotNetty.Core.Impls +{ + /// + /// 原包分发器默认空实现 + /// + public class JT808SourcePackageDispatcherEmptyImpl : IJT808SourcePackageDispatcher + { + public Task SendAsync(byte[] data) + { + return Task.CompletedTask; + } + } +} diff --git a/src/JT808.DotNetty.Core/Interfaces/IJT808TcpSessionService.cs b/src/JT808.DotNetty.Core/Interfaces/IJT808TcpSessionService.cs new file mode 100644 index 0000000..e18982b --- /dev/null +++ b/src/JT808.DotNetty.Core/Interfaces/IJT808TcpSessionService.cs @@ -0,0 +1,25 @@ +using JT808.DotNetty.Abstractions.Dtos; +using System; +using System.Collections.Generic; +using System.Text; + +namespace JT808.DotNetty.Core.Interfaces +{ + /// + /// JT808 Tcp会话服务 + /// + public interface IJT808TcpSessionService + { + /// + /// 获取会话集合 + /// + /// + JT808ResultDto> GetAll(); + /// + /// 通过设备终端号移除对应会话 + /// + /// + /// + JT808ResultDto RemoveByTerminalPhoneNo(string terminalPhoneNo); + } +} diff --git a/src/JT808.DotNetty.Core/Interfaces/IJT808UnificationTcpSendService.cs b/src/JT808.DotNetty.Core/Interfaces/IJT808UnificationTcpSendService.cs new file mode 100644 index 0000000..c4cc5db --- /dev/null +++ b/src/JT808.DotNetty.Core/Interfaces/IJT808UnificationTcpSendService.cs @@ -0,0 +1,12 @@ +using JT808.DotNetty.Abstractions.Dtos; + +namespace JT808.DotNetty.Core.Interfaces +{ + /// + /// JT808基于tcp的统一下发命令服务 + /// + internal interface IJT808UnificationTcpSendService + { + JT808ResultDto Send(string terminalPhoneNo, byte[] data); + } +} diff --git a/src/JT808.DotNetty.Core/Interfaces/IJT808UnificationUdpSendService.cs b/src/JT808.DotNetty.Core/Interfaces/IJT808UnificationUdpSendService.cs new file mode 100644 index 0000000..722f02d --- /dev/null +++ b/src/JT808.DotNetty.Core/Interfaces/IJT808UnificationUdpSendService.cs @@ -0,0 +1,12 @@ +using JT808.DotNetty.Abstractions.Dtos; + +namespace JT808.DotNetty.Core.Interfaces +{ + /// + /// JT808基于udp的统一下发命令服务 + /// + internal interface IJT808UnificationUdpSendService + { + JT808ResultDto Send(string terminalPhoneNo, byte[] data); + } +} diff --git a/src/JT808.DotNetty.Core/JT808.DotNetty.Core.csproj b/src/JT808.DotNetty.Core/JT808.DotNetty.Core.csproj new file mode 100644 index 0000000..351436a --- /dev/null +++ b/src/JT808.DotNetty.Core/JT808.DotNetty.Core.csproj @@ -0,0 +1,20 @@ + + + + netstandard2.0 + 7.1 + + + + + + + + + + + + + + + diff --git a/src/JT808.DotNetty.Core/JT808CoreDotnettyExtensions.cs b/src/JT808.DotNetty.Core/JT808CoreDotnettyExtensions.cs new file mode 100644 index 0000000..5e20969 --- /dev/null +++ b/src/JT808.DotNetty.Core/JT808CoreDotnettyExtensions.cs @@ -0,0 +1,27 @@ +using JT808.DotNetty.Abstractions; +using JT808.DotNetty.Core.Configurations; +using JT808.DotNetty.Core.Impls; +using JT808.DotNetty.Core.Interfaces; +using JT808.DotNetty.Internal; +using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.DependencyInjection.Extensions; +using System.Runtime.CompilerServices; + +[assembly: InternalsVisibleTo("JT808.DotNetty.Test")] + +namespace JT808.DotNetty.Core +{ + public static class JT808CoreDotnettyExtensions + { + public static IServiceCollection AddJT808Core(this IServiceCollection serviceDescriptors, IConfiguration configuration) + { + serviceDescriptors.Configure(configuration.GetSection("JT808Configuration")); + serviceDescriptors.TryAddSingleton(); + serviceDescriptors.TryAddSingleton(); + serviceDescriptors.TryAddSingleton(); + serviceDescriptors.TryAddSingleton(); + return serviceDescriptors; + } + } +} \ No newline at end of file diff --git a/src/JT808.DotNetty.Core/Metadata/JT808AtomicCounter.cs b/src/JT808.DotNetty.Core/Metadata/JT808AtomicCounter.cs new file mode 100644 index 0000000..c0d68c5 --- /dev/null +++ b/src/JT808.DotNetty.Core/Metadata/JT808AtomicCounter.cs @@ -0,0 +1,44 @@ +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading; + +namespace JT808.DotNetty.Core.Metadata +{ + /// + /// + /// + /// + public class JT808AtomicCounter + { + long counter = 0; + + public JT808AtomicCounter(long initialCount = 0) + { + this.counter = initialCount; + } + + public long Increment() + { + return Interlocked.Increment(ref counter); + } + + public long Add(long len) + { + return Interlocked.Add(ref counter,len); + } + + public long Decrement() + { + return Interlocked.Decrement(ref counter); + } + + public long Count + { + get + { + return Interlocked.Read(ref counter); + } + } + } +} diff --git a/src/JT808.DotNetty.Core/Metadata/JT808HttpRequest.cs b/src/JT808.DotNetty.Core/Metadata/JT808HttpRequest.cs new file mode 100644 index 0000000..274226b --- /dev/null +++ b/src/JT808.DotNetty.Core/Metadata/JT808HttpRequest.cs @@ -0,0 +1,22 @@ +using JT808.Protocol; +using System; +using System.Collections.Generic; +using System.Reflection; + +namespace JT808.DotNetty.Core.Metadata +{ + public class JT808HttpRequest + { + public string Json { get; set; } + + public JT808HttpRequest() + { + + } + + public JT808HttpRequest(string json) + { + Json = json; + } + } +} \ No newline at end of file diff --git a/src/JT808.DotNetty.Core/Metadata/JT808HttpResponse.cs b/src/JT808.DotNetty.Core/Metadata/JT808HttpResponse.cs new file mode 100644 index 0000000..3bd3379 --- /dev/null +++ b/src/JT808.DotNetty.Core/Metadata/JT808HttpResponse.cs @@ -0,0 +1,22 @@ +using JT808.Protocol; +using System; +using System.Collections.Generic; +using System.Reflection; + +namespace JT808.DotNetty.Core.Metadata +{ + public class JT808HttpResponse + { + public byte[] Data { get; set; } + + public JT808HttpResponse() + { + + } + + public JT808HttpResponse(byte[] data) + { + this.Data = data; + } + } +} \ No newline at end of file diff --git a/src/JT808.DotNetty.Core/Metadata/JT808Request.cs b/src/JT808.DotNetty.Core/Metadata/JT808Request.cs new file mode 100644 index 0000000..84aa29e --- /dev/null +++ b/src/JT808.DotNetty.Core/Metadata/JT808Request.cs @@ -0,0 +1,23 @@ +using JT808.Protocol; +using System; +using System.Collections.Generic; +using System.Reflection; + +namespace JT808.DotNetty.Core.Metadata +{ + public class JT808Request + { + public JT808HeaderPackage Package { get; } + + /// + /// 用于消息发送 + /// + public byte[] OriginalPackage { get;} + + public JT808Request(JT808HeaderPackage package, byte[] originalPackage) + { + Package = package; + OriginalPackage = originalPackage; + } + } +} \ No newline at end of file diff --git a/src/JT808.DotNetty.Core/Metadata/JT808Response.cs b/src/JT808.DotNetty.Core/Metadata/JT808Response.cs new file mode 100644 index 0000000..1322e74 --- /dev/null +++ b/src/JT808.DotNetty.Core/Metadata/JT808Response.cs @@ -0,0 +1,27 @@ +using JT808.Protocol; +using System; +using System.Collections.Generic; +using System.Reflection; + +namespace JT808.DotNetty.Core.Metadata +{ + public class JT808Response + { + public JT808Package Package { get; set; } + /// + /// 根据实际情况适当调整包的大小 + /// + public int MinBufferSize { get; set; } + + public JT808Response() + { + + } + + public JT808Response(JT808Package package, int minBufferSize = 1024) + { + Package = package; + MinBufferSize = minBufferSize; + } + } +} \ No newline at end of file diff --git a/src/JT808.DotNetty.Core/Metadata/JT808TcpSession.cs b/src/JT808.DotNetty.Core/Metadata/JT808TcpSession.cs new file mode 100644 index 0000000..6d0e651 --- /dev/null +++ b/src/JT808.DotNetty.Core/Metadata/JT808TcpSession.cs @@ -0,0 +1,29 @@ +using DotNetty.Transport.Channels; +using System; + +namespace JT808.DotNetty.Core.Metadata +{ + public class JT808TcpSession + { + public JT808TcpSession(IChannel channel, string terminalPhoneNo) + { + Channel = channel; + TerminalPhoneNo = terminalPhoneNo; + StartTime = DateTime.Now; + LastActiveTime = DateTime.Now; + } + + public JT808TcpSession() { } + + /// + /// 终端手机号 + /// + public string TerminalPhoneNo { get; set; } + + public IChannel Channel { get; set; } + + public DateTime LastActiveTime { get; set; } + + public DateTime StartTime { get; set; } + } +} diff --git a/src/JT808.DotNetty.Core/Metadata/JT808UdpPackage.cs b/src/JT808.DotNetty.Core/Metadata/JT808UdpPackage.cs new file mode 100644 index 0000000..939f93f --- /dev/null +++ b/src/JT808.DotNetty.Core/Metadata/JT808UdpPackage.cs @@ -0,0 +1,20 @@ +using System; +using System.Collections.Generic; +using System.Net; +using System.Text; + +namespace JT808.DotNetty.Core.Metadata +{ + public class JT808UdpPackage + { + public JT808UdpPackage(byte[] buffer, EndPoint sender) + { + Buffer = buffer; + Sender = sender; + } + + public byte[] Buffer { get; } + + public EndPoint Sender { get; } + } +} diff --git a/src/JT808.DotNetty.Core/Metadata/JT808UdpSession.cs b/src/JT808.DotNetty.Core/Metadata/JT808UdpSession.cs new file mode 100644 index 0000000..7984202 --- /dev/null +++ b/src/JT808.DotNetty.Core/Metadata/JT808UdpSession.cs @@ -0,0 +1,35 @@ +using DotNetty.Transport.Channels; +using System; +using System.Net; + +namespace JT808.DotNetty.Core.Metadata +{ + public class JT808UdpSession + { + public JT808UdpSession(IChannel channel, + EndPoint sender, + string terminalPhoneNo) + { + Channel = channel; + TerminalPhoneNo = terminalPhoneNo; + StartTime = DateTime.Now; + LastActiveTime = DateTime.Now; + Sender = sender; + } + + public EndPoint Sender { get; set; } + + public JT808UdpSession() { } + + /// + /// 终端手机号 + /// + public string TerminalPhoneNo { get; set; } + + public IChannel Channel { get; set; } + + public DateTime LastActiveTime { get; set; } + + public DateTime StartTime { get; set; } + } +} diff --git a/src/JT808.DotNetty.Core/Services/JT808TcpAtomicCounterService.cs b/src/JT808.DotNetty.Core/Services/JT808TcpAtomicCounterService.cs new file mode 100644 index 0000000..069fb51 --- /dev/null +++ b/src/JT808.DotNetty.Core/Services/JT808TcpAtomicCounterService.cs @@ -0,0 +1,45 @@ +using JT808.DotNetty.Core.Metadata; + +namespace JT808.DotNetty.Core.Services +{ + /// + /// Tcp计数包服务 + /// + public class JT808TcpAtomicCounterService + { + private readonly JT808AtomicCounter MsgSuccessCounter = new JT808AtomicCounter(); + + private readonly JT808AtomicCounter MsgFailCounter = new JT808AtomicCounter(); + + public JT808TcpAtomicCounterService() + { + + } + + public long MsgSuccessIncrement() + { + return MsgSuccessCounter.Increment(); + } + + public long MsgSuccessCount + { + get + { + return MsgSuccessCounter.Count; + } + } + + public long MsgFailIncrement() + { + return MsgFailCounter.Increment(); + } + + public long MsgFailCount + { + get + { + return MsgFailCounter.Count; + } + } + } +} diff --git a/src/JT808.DotNetty.Core/Services/JT808TcpSessionService.cs b/src/JT808.DotNetty.Core/Services/JT808TcpSessionService.cs new file mode 100644 index 0000000..7ff872d --- /dev/null +++ b/src/JT808.DotNetty.Core/Services/JT808TcpSessionService.cs @@ -0,0 +1,73 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using JT808.DotNetty.Abstractions.Dtos; +using JT808.DotNetty.Core.Interfaces; + +namespace JT808.DotNetty.Core.Services +{ + internal class JT808TcpSessionService : IJT808TcpSessionService + { + private readonly JT808TcpSessionManager jT808SessionManager; + + public JT808TcpSessionService( + JT808TcpSessionManager jT808SessionManager) + { + this.jT808SessionManager = jT808SessionManager; + } + + public JT808ResultDto> GetAll() + { + JT808ResultDto> resultDto = new JT808ResultDto>(); + try + { + resultDto.Data = jT808SessionManager.GetAll().Select(s => new JT808TcpSessionInfoDto + { + LastActiveTime = s.LastActiveTime, + StartTime = s.StartTime, + TerminalPhoneNo = s.TerminalPhoneNo, + RemoteAddressIP = s.Channel.RemoteAddress.ToString(), + }).ToList(); + resultDto.Code = JT808ResultCode.Ok; + } + catch (Exception ex) + { + resultDto.Data = null; + resultDto.Code = JT808ResultCode.Error; + resultDto.Message = Newtonsoft.Json.JsonConvert.SerializeObject(ex); + } + return resultDto; + } + + public JT808ResultDto RemoveByTerminalPhoneNo(string terminalPhoneNo) + { + JT808ResultDto resultDto = new JT808ResultDto(); + try + { + var session = jT808SessionManager.RemoveSession(terminalPhoneNo); + if (session != null) + { + if(session.Channel.Open) + { + session.Channel.CloseAsync(); + } + } + resultDto.Code = JT808ResultCode.Ok; + resultDto.Data = true; + } + catch (AggregateException ex) + { + resultDto.Data = false; + resultDto.Code = 500; + resultDto.Message = Newtonsoft.Json.JsonConvert.SerializeObject(ex); + } + catch (Exception ex) + { + resultDto.Data = false; + resultDto.Code = JT808ResultCode.Error; + resultDto.Message = Newtonsoft.Json.JsonConvert.SerializeObject(ex); + } + return resultDto; + } + } +} diff --git a/src/JT808.DotNetty.Core/Services/JT808TransmitAddressFilterService.cs b/src/JT808.DotNetty.Core/Services/JT808TransmitAddressFilterService.cs new file mode 100644 index 0000000..17b44c6 --- /dev/null +++ b/src/JT808.DotNetty.Core/Services/JT808TransmitAddressFilterService.cs @@ -0,0 +1,88 @@ +using JT808.DotNetty.Abstractions.Dtos; +using Microsoft.Extensions.Options; +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Linq; +using System.Net; +using JT808.DotNetty.Core.Configurations; + +namespace JT808.DotNetty.Core.Services +{ + /// + /// JT808转发地址过滤服务 + /// 按照808的消息,有些请求必须要应答,但是转发可以不需要有应答可以节省部分资源包括: + // 1.消息的序列化 + // 2.消息的下发 + // 都有一定的性能损耗,那么不需要判断写超时 IdleState.WriterIdle + // 就跟神兽貔貅一样。。。 + /// + public class JT808TransmitAddressFilterService : IDisposable + { + private readonly IOptionsMonitor jT808ConfigurationOptionsMonitor; + + private ConcurrentDictionary ForwardingRemoteAddresssDict; + + private IDisposable jT808ConfigurationOptionsMonitorDisposable; + + public JT808TransmitAddressFilterService( + IOptionsMonitor jT808ConfigurationOptionsMonitor) + { + this.jT808ConfigurationOptionsMonitor = jT808ConfigurationOptionsMonitor; + ForwardingRemoteAddresssDict = new ConcurrentDictionary(); + InitForwardingRemoteAddress(jT808ConfigurationOptionsMonitor.CurrentValue.ForwardingRemoteAddress); + //OnChange 源码多播委托 + jT808ConfigurationOptionsMonitorDisposable = this.jT808ConfigurationOptionsMonitor.OnChange(options => + { + InitForwardingRemoteAddress(options.ForwardingRemoteAddress); + }); + } + + private void InitForwardingRemoteAddress(List jT808ClientConfigurations) + { + if (jT808ClientConfigurations != null && jT808ClientConfigurations.Count > 0) + { + foreach (var item in jT808ClientConfigurations) + { + string host = item.EndPoint.ToString(); + ForwardingRemoteAddresssDict.TryAdd(host, 0); + } + } + } + + public bool ContainsKey(EndPoint endPoint) + { + return ForwardingRemoteAddresssDict.ContainsKey(endPoint.ToString()); + } + + public JT808ResultDto Add(JT808IPAddressDto jT808IPAddressDto) + { + string host = jT808IPAddressDto.EndPoint.ToString(); + return new JT808ResultDto() { Code = JT808ResultCode.Ok, Data = ForwardingRemoteAddresssDict.TryAdd(host,0) }; + } + + public JT808ResultDto Remove(JT808IPAddressDto jT808IPAddressDto) + { + string host = jT808IPAddressDto.EndPoint.ToString(); + if(jT808ConfigurationOptionsMonitor.CurrentValue.ForwardingRemoteAddress!=null && + jT808ConfigurationOptionsMonitor.CurrentValue.ForwardingRemoteAddress.Any(w=>w.EndPoint.ToString()== host)) + { + return new JT808ResultDto() { Code = JT808ResultCode.Ok, Data = false,Message="不能删除服务器配置的地址" }; + } + else + { + return new JT808ResultDto() { Code = JT808ResultCode.Ok, Data = ForwardingRemoteAddresssDict.TryRemove(host,out var temp) }; + } + } + + public JT808ResultDto> GetAll() + { + return new JT808ResultDto>(){ Code = JT808ResultCode.Ok, Data = ForwardingRemoteAddresssDict.Select(s=>s.Key).ToList() }; + } + + public void Dispose() + { + jT808ConfigurationOptionsMonitorDisposable.Dispose(); + } + } +} diff --git a/src/JT808.DotNetty.Core/Services/JT808UdpAtomicCounterService.cs b/src/JT808.DotNetty.Core/Services/JT808UdpAtomicCounterService.cs new file mode 100644 index 0000000..a5d0697 --- /dev/null +++ b/src/JT808.DotNetty.Core/Services/JT808UdpAtomicCounterService.cs @@ -0,0 +1,45 @@ +using JT808.DotNetty.Core.Metadata; + +namespace JT808.DotNetty.Core.Services +{ + /// + /// Udp计数包服务 + /// + public class JT808UdpAtomicCounterService + { + private readonly JT808AtomicCounter MsgSuccessCounter = new JT808AtomicCounter(); + + private readonly JT808AtomicCounter MsgFailCounter = new JT808AtomicCounter(); + + public JT808UdpAtomicCounterService() + { + + } + + public long MsgSuccessIncrement() + { + return MsgSuccessCounter.Increment(); + } + + public long MsgSuccessCount + { + get + { + return MsgSuccessCounter.Count; + } + } + + public long MsgFailIncrement() + { + return MsgFailCounter.Increment(); + } + + public long MsgFailCount + { + get + { + return MsgFailCounter.Count; + } + } + } +} diff --git a/src/JT808.DotNetty.Core/Services/JT808UnificationTcpSendService.cs b/src/JT808.DotNetty.Core/Services/JT808UnificationTcpSendService.cs new file mode 100644 index 0000000..7b50fdc --- /dev/null +++ b/src/JT808.DotNetty.Core/Services/JT808UnificationTcpSendService.cs @@ -0,0 +1,55 @@ +using DotNetty.Buffers; +using JT808.DotNetty.Abstractions.Dtos; +using JT808.DotNetty.Core; +using JT808.DotNetty.Core.Interfaces; +using System; + +namespace JT808.DotNetty.Internal +{ + internal class JT808UnificationTcpSendService : IJT808UnificationTcpSendService + { + private readonly JT808TcpSessionManager jT808SessionManager; + + public JT808UnificationTcpSendService(JT808TcpSessionManager jT808SessionManager) + { + this.jT808SessionManager = jT808SessionManager; + } + + public JT808ResultDto Send(string terminalPhoneNo, byte[] data) + { + JT808ResultDto resultDto = new JT808ResultDto(); + try + { + var session = jT808SessionManager.GetSession(terminalPhoneNo); + if (session != null) + { + if (session.Channel.Open) + { + session.Channel.WriteAndFlushAsync(Unpooled.WrappedBuffer(data)); + resultDto.Code = JT808ResultCode.Ok; + resultDto.Data = true; + } + else + { + resultDto.Code = JT808ResultCode.Ok; + resultDto.Data = false; + resultDto.Message = "offline"; + } + } + else + { + resultDto.Code = JT808ResultCode.Ok; + resultDto.Data = false; + resultDto.Message = "offline"; + } + } + catch (Exception ex) + { + resultDto.Data = false; + resultDto.Code = JT808ResultCode.Error; + resultDto.Message = Newtonsoft.Json.JsonConvert.SerializeObject(ex); + } + return resultDto; + } + } +} diff --git a/src/JT808.DotNetty.Core/Services/JT808UnificationUdpSendService.cs b/src/JT808.DotNetty.Core/Services/JT808UnificationUdpSendService.cs new file mode 100644 index 0000000..293e950 --- /dev/null +++ b/src/JT808.DotNetty.Core/Services/JT808UnificationUdpSendService.cs @@ -0,0 +1,55 @@ +using DotNetty.Buffers; +using JT808.DotNetty.Abstractions.Dtos; +using JT808.DotNetty.Core; +using JT808.DotNetty.Core.Interfaces; +using System; + +namespace JT808.DotNetty.Internal +{ + internal class JT808UnificationUdpSendService : IJT808UnificationUdpSendService + { + private readonly JT808UdpSessionManager jT808SessionManager; + + public JT808UnificationUdpSendService(JT808UdpSessionManager jT808SessionManager) + { + this.jT808SessionManager = jT808SessionManager; + } + + public JT808ResultDto Send(string terminalPhoneNo, byte[] data) + { + JT808ResultDto resultDto = new JT808ResultDto(); + try + { + var session = jT808SessionManager.GetSession(terminalPhoneNo); + if (session != null) + { + if (session.Channel.Open) + { + session.Channel.WriteAndFlushAsync(Unpooled.WrappedBuffer(data)); + resultDto.Code = JT808ResultCode.Ok; + resultDto.Data = true; + } + else + { + resultDto.Code = JT808ResultCode.Ok; + resultDto.Data = false; + resultDto.Message = "offline"; + } + } + else + { + resultDto.Code = JT808ResultCode.Ok; + resultDto.Data = false; + resultDto.Message = "offline"; + } + } + catch (Exception ex) + { + resultDto.Data = false; + resultDto.Code = JT808ResultCode.Error; + resultDto.Message = Newtonsoft.Json.JsonConvert.SerializeObject(ex); + } + return resultDto; + } + } +} diff --git a/src/JT808.DotNetty.Core/Session/JT808TcpSessionManager.cs b/src/JT808.DotNetty.Core/Session/JT808TcpSessionManager.cs new file mode 100644 index 0000000..d1c9308 --- /dev/null +++ b/src/JT808.DotNetty.Core/Session/JT808TcpSessionManager.cs @@ -0,0 +1,141 @@ +using Microsoft.Extensions.Logging; +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Linq; +using DotNetty.Transport.Channels; +using JT808.DotNetty.Abstractions; +using JT808.DotNetty.Core.Metadata; + +namespace JT808.DotNetty.Core +{ + /// + /// JT808 Tcp会话管理 + /// + public class JT808TcpSessionManager + { + private readonly ILogger logger; + + private readonly IJT808SessionPublishing jT808SessionPublishing; + + public JT808TcpSessionManager( + IJT808SessionPublishing jT808SessionPublishing, + ILoggerFactory loggerFactory) + { + this.jT808SessionPublishing = jT808SessionPublishing; + logger = loggerFactory.CreateLogger(); + } + + private ConcurrentDictionary SessionIdDict = new ConcurrentDictionary(StringComparer.OrdinalIgnoreCase); + + public int SessionCount + { + get + { + return SessionIdDict.Count; + } + } + + public JT808TcpSession GetSession(string terminalPhoneNo) + { + if (string.IsNullOrEmpty(terminalPhoneNo)) + return default; + if (SessionIdDict.TryGetValue(terminalPhoneNo, out JT808TcpSession targetSession)) + { + return targetSession; + } + else + { + return default; + } + } + + public void Heartbeat(string terminalPhoneNo) + { + if (string.IsNullOrEmpty(terminalPhoneNo)) return; + if (SessionIdDict.TryGetValue(terminalPhoneNo, out JT808TcpSession oldjT808Session)) + { + oldjT808Session.LastActiveTime = DateTime.Now; + SessionIdDict.TryUpdate(terminalPhoneNo, oldjT808Session, oldjT808Session); + } + } + + public void TryAdd(JT808TcpSession appSession) + { + // 解决了设备号跟通道绑定到一起,不需要用到通道本身的SessionId + // 不管设备下发更改了设备终端号,只要是没有在内存中就当是新的 + // 存在的问题: + // 1.原先老的如何销毁 + // 2.这时候用的通道是相同的,设备终端是不同的 + // 当设备主动或者服务器断开以后,可以释放,这点内存忽略不计,况且更改设备号不是很频繁。 + if (SessionIdDict.TryAdd(appSession.TerminalPhoneNo, appSession)) + { + //使用场景: + //部标的超长待机设备,不会像正常的设备一样一直连着,可能10几分钟连上了,然后发完就关闭连接, + //这时候想下发数据需要知道设备什么时候上线,在这边做通知最好不过了。 + //todo: 有设备关联上来可以进行通知 例如:使用Redis发布订阅 + jT808SessionPublishing.PublishAsync(JT808Constants.SessionOnline,null, appSession.TerminalPhoneNo); + } + } + + public JT808TcpSession RemoveSession(string terminalPhoneNo) + { + //todo: 设备离线可以进行通知 + //todo: 使用Redis 发布订阅 + if (string.IsNullOrEmpty(terminalPhoneNo)) return default; + if (!SessionIdDict.TryGetValue(terminalPhoneNo, out JT808TcpSession jT808Session)) + { + return default; + } + // 处理转发过来的是数据 这时候通道对设备是1对多关系,需要清理垃圾数据 + //1.用当前会话的通道Id找出通过转发过来的其他设备的终端号 + var terminalPhoneNos = SessionIdDict.Where(w => w.Value.Channel.Id == jT808Session.Channel.Id).Select(s => s.Key).ToList(); + //2.存在则一个个移除 + if (terminalPhoneNos.Count > 1) + { + //3.移除包括当前的设备号 + foreach (var key in terminalPhoneNos) + { + SessionIdDict.TryRemove(key, out JT808TcpSession jT808SessionRemove); + } + string nos = string.Join(",", terminalPhoneNos); + logger.LogInformation($">>>{terminalPhoneNo}-{nos} 1-n Session Remove."); + jT808SessionPublishing.PublishAsync(JT808Constants.SessionOffline, null, nos); + return jT808Session; + } + else + { + if (SessionIdDict.TryRemove(terminalPhoneNo, out JT808TcpSession jT808SessionRemove)) + { + logger.LogInformation($">>>{terminalPhoneNo} Session Remove."); + jT808SessionPublishing.PublishAsync(JT808Constants.SessionOffline, null, terminalPhoneNo); + return jT808SessionRemove; + } + else + { + return default; + } + } + } + + public void RemoveSessionByChannel(IChannel channel) + { + //todo: 设备离线可以进行通知 + //todo: 使用Redis 发布订阅 + var terminalPhoneNos = SessionIdDict.Where(w => w.Value.Channel.Id == channel.Id).Select(s => s.Key).ToList(); + foreach (var key in terminalPhoneNos) + { + SessionIdDict.TryRemove(key, out JT808TcpSession jT808SessionRemove); + } + string nos = string.Join(",", terminalPhoneNos); + logger.LogInformation($">>>{nos} Channel Remove."); + jT808SessionPublishing.PublishAsync(JT808Constants.SessionOffline, null, nos); + } + + public IEnumerable GetAll() + { + return SessionIdDict.Select(s => s.Value).ToList(); + } + } +} + diff --git a/src/JT808.DotNetty.Core/Session/JT808UdpSessionManager.cs b/src/JT808.DotNetty.Core/Session/JT808UdpSessionManager.cs new file mode 100644 index 0000000..84415ec --- /dev/null +++ b/src/JT808.DotNetty.Core/Session/JT808UdpSessionManager.cs @@ -0,0 +1,126 @@ +using Microsoft.Extensions.Logging; +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Linq; +using JT808.DotNetty.Abstractions; +using JT808.DotNetty.Core.Metadata; + +namespace JT808.DotNetty.Core +{ + /// + /// JT808 udp会话管理 + /// + public class JT808UdpSessionManager + { + private readonly ILogger logger; + + private readonly IJT808SessionPublishing jT808SessionPublishing; + + public JT808UdpSessionManager( + IJT808SessionPublishing jT808SessionPublishing, + ILoggerFactory loggerFactory) + { + this.jT808SessionPublishing = jT808SessionPublishing; + logger = loggerFactory.CreateLogger(); + } + + private ConcurrentDictionary SessionIdDict = new ConcurrentDictionary(StringComparer.OrdinalIgnoreCase); + + public int SessionCount + { + get + { + return SessionIdDict.Count; + } + } + + public JT808UdpSession GetSession(string terminalPhoneNo) + { + if (string.IsNullOrEmpty(terminalPhoneNo)) + return default; + if (SessionIdDict.TryGetValue(terminalPhoneNo, out JT808UdpSession targetSession)) + { + return targetSession; + } + else + { + return default; + } + } + + public void TryAdd(JT808UdpSession appSession) + { + // 解决了设备号跟通道绑定到一起,不需要用到通道本身的SessionId + // 不管设备下发更改了设备终端号,只要是没有在内存中就当是新的 + // 存在的问题: + // 1.原先老的如何销毁 + // 2.这时候用的通道是相同的,设备终端是不同的 + // 当设备主动或者服务器断开以后,可以释放,这点内存忽略不计,况且更改设备号不是很频繁。 + if (SessionIdDict.TryAdd(appSession.TerminalPhoneNo, appSession)) + { + //使用场景: + //部标的超长待机设备,不会像正常的设备一样一直连着,可能10几分钟连上了,然后发完就关闭连接, + //这时候想下发数据需要知道设备什么时候上线,在这边做通知最好不过了。 + //todo: 有设备关联上来可以进行通知 例如:使用Redis发布订阅 + jT808SessionPublishing.PublishAsync(JT808Constants.SessionOnline,null, appSession.TerminalPhoneNo); + } + } + + public void Heartbeat(string terminalPhoneNo) + { + if (string.IsNullOrEmpty(terminalPhoneNo)) return; + if (SessionIdDict.TryGetValue(terminalPhoneNo, out JT808UdpSession oldjT808Session)) + { + oldjT808Session.LastActiveTime = DateTime.Now; + SessionIdDict.TryUpdate(terminalPhoneNo, oldjT808Session, oldjT808Session); + } + } + + public JT808UdpSession RemoveSession(string terminalPhoneNo) + { + //todo: 设备离线可以进行通知 + //todo: 使用Redis 发布订阅 + if (string.IsNullOrEmpty(terminalPhoneNo)) return default; + if (!SessionIdDict.TryGetValue(terminalPhoneNo, out JT808UdpSession jT808Session)) + { + return default; + } + // 处理转发过来的是数据 这时候通道对设备是1对多关系,需要清理垃圾数据 + //1.用当前会话的通道Id找出通过转发过来的其他设备的终端号 + var terminalPhoneNos = SessionIdDict.Where(w => w.Value.Channel.Id == jT808Session.Channel.Id).Select(s => s.Key).ToList(); + //2.存在则一个个移除 + if (terminalPhoneNos.Count > 1) + { + //3.移除包括当前的设备号 + foreach (var key in terminalPhoneNos) + { + SessionIdDict.TryRemove(key, out JT808UdpSession jT808SessionRemove); + } + string nos = string.Join(",", terminalPhoneNos); + logger.LogInformation($">>>{terminalPhoneNo}-{nos} 1-n Session Remove."); + jT808SessionPublishing.PublishAsync(JT808Constants.SessionOffline, null, nos); + return jT808Session; + } + else + { + if (SessionIdDict.TryRemove(terminalPhoneNo, out JT808UdpSession jT808SessionRemove)) + { + logger.LogInformation($">>>{terminalPhoneNo} Session Remove."); + jT808SessionPublishing.PublishAsync(JT808Constants.SessionOffline, null, terminalPhoneNo); + return jT808SessionRemove; + } + else + { + return default; + } + } + } + + public IEnumerable GetAll() + { + return SessionIdDict.Select(s => s.Value).ToList(); + } + } +} + diff --git a/src/JT808.DotNetty.Http/Class1.cs b/src/JT808.DotNetty.Http/Class1.cs new file mode 100644 index 0000000..667f6a5 --- /dev/null +++ b/src/JT808.DotNetty.Http/Class1.cs @@ -0,0 +1,8 @@ +using System; + +namespace JT808.DotNetty.Http +{ + public class Class1 + { + } +} diff --git a/src/JT808.DotNetty.Http/JT808.DotNetty.Http.csproj b/src/JT808.DotNetty.Http/JT808.DotNetty.Http.csproj new file mode 100644 index 0000000..9f5c4f4 --- /dev/null +++ b/src/JT808.DotNetty.Http/JT808.DotNetty.Http.csproj @@ -0,0 +1,7 @@ + + + + netstandard2.0 + + + diff --git a/src/JT808.DotNetty.Tcp/Handlers/JT808MsgIdDefaultTcpHandler.cs b/src/JT808.DotNetty.Tcp/Handlers/JT808MsgIdDefaultTcpHandler.cs new file mode 100644 index 0000000..034d184 --- /dev/null +++ b/src/JT808.DotNetty.Tcp/Handlers/JT808MsgIdDefaultTcpHandler.cs @@ -0,0 +1,18 @@ +using JT808.DotNetty.Core; +using JT808.DotNetty.Core.Handlers; +using System; +using System.Collections.Generic; +using System.Text; + +namespace JT808.DotNetty.Tcp +{ + /// + /// 默认消息处理业务实现 + /// + internal class JT808MsgIdDefaultTcpHandler : JT808MsgIdTcpHandlerBase + { + public JT808MsgIdDefaultTcpHandler(JT808TcpSessionManager sessionManager) : base(sessionManager) + { + } + } +} diff --git a/src/JT808.DotNetty.Tcp/Handlers/JT808TcpConnectionHandler.cs b/src/JT808.DotNetty.Tcp/Handlers/JT808TcpConnectionHandler.cs new file mode 100644 index 0000000..0eb76c7 --- /dev/null +++ b/src/JT808.DotNetty.Tcp/Handlers/JT808TcpConnectionHandler.cs @@ -0,0 +1,104 @@ +using DotNetty.Handlers.Timeout; +using DotNetty.Transport.Channels; +using JT808.DotNetty.Core; +using Microsoft.Extensions.Logging; +using System; +using System.Threading.Tasks; + +namespace JT808.DotNetty.Handlers +{ + /// + /// JT808服务通道处理程序 + /// + internal class JT808TcpConnectionHandler : ChannelHandlerAdapter + { + private readonly ILogger logger; + + private readonly JT808TcpSessionManager jT808SessionManager; + + public JT808TcpConnectionHandler( + JT808TcpSessionManager jT808SessionManager, + ILoggerFactory loggerFactory) + { + this.jT808SessionManager = jT808SessionManager; + logger = loggerFactory.CreateLogger(); + } + + /// + /// 通道激活 + /// + /// + public override void ChannelActive(IChannelHandlerContext context) + { + string channelId = context.Channel.Id.AsShortText(); + if (logger.IsEnabled(LogLevel.Debug)) + logger.LogDebug($"<<<{ channelId } Successful client connection to server."); + base.ChannelActive(context); + } + + /// + /// 设备主动断开 + /// + /// + public override void ChannelInactive(IChannelHandlerContext context) + { + string channelId = context.Channel.Id.AsShortText(); + if (logger.IsEnabled(LogLevel.Debug)) + logger.LogDebug($">>>{ channelId } The client disconnects from the server."); + jT808SessionManager.RemoveSessionByChannel(context.Channel); + base.ChannelInactive(context); + } + + /// + /// 服务器主动断开 + /// + /// + /// + public override Task CloseAsync(IChannelHandlerContext context) + { + string channelId = context.Channel.Id.AsShortText(); + if (logger.IsEnabled(LogLevel.Debug)) + logger.LogDebug($"<<<{ channelId } The server disconnects from the client."); + jT808SessionManager.RemoveSessionByChannel(context.Channel); + return base.CloseAsync(context); + } + + public override void ChannelReadComplete(IChannelHandlerContext context)=> context.Flush(); + + /// + /// 超时策略 + /// + /// + /// + public override void UserEventTriggered(IChannelHandlerContext context, object evt) + { + IdleStateEvent idleStateEvent = evt as IdleStateEvent; + if (idleStateEvent != null) + { + if(idleStateEvent.State== IdleState.ReaderIdle) + { + string channelId = context.Channel.Id.AsShortText(); + logger.LogInformation($"{idleStateEvent.State.ToString()}>>>{channelId}"); + // 由于808是设备发心跳,如果很久没有上报数据,那么就由服务器主动关闭连接。 + jT808SessionManager.RemoveSessionByChannel(context.Channel); + context.CloseAsync(); + } + // 按照808的消息,有些请求必须要应答,但是转发可以不需要有应答可以节省部分资源包括: + // 1.消息的序列化 + // 2.消息的下发 + // 都有一定的性能损耗,那么不需要判断写超时 IdleState.WriterIdle + // 就跟神兽貔貅一样。。。 + } + base.UserEventTriggered(context, evt); + } + + public override void ExceptionCaught(IChannelHandlerContext context, Exception exception) + { + string channelId = context.Channel.Id.AsShortText(); + logger.LogError(exception,$"{channelId} {exception.Message}" ); + jT808SessionManager.RemoveSessionByChannel(context.Channel); + context.CloseAsync(); + } + } +} + diff --git a/src/JT808.DotNetty.Tcp/Handlers/JT808TcpServerHandler.cs b/src/JT808.DotNetty.Tcp/Handlers/JT808TcpServerHandler.cs new file mode 100644 index 0000000..a0fdcdc --- /dev/null +++ b/src/JT808.DotNetty.Tcp/Handlers/JT808TcpServerHandler.cs @@ -0,0 +1,96 @@ +using DotNetty.Buffers; +using DotNetty.Transport.Channels; +using JT808.Protocol; +using System; +using JT808.DotNetty.Core; +using JT808.DotNetty.Abstractions; +using Microsoft.Extensions.Logging; +using JT808.DotNetty.Core.Handlers; +using JT808.DotNetty.Core.Services; +using JT808.DotNetty.Core.Metadata; + +namespace JT808.DotNetty.Tcp.Handlers +{ + /// + /// JT808服务端处理程序 + /// + internal class JT808TcpServerHandler : SimpleChannelInboundHandler + { + private readonly JT808MsgIdTcpHandlerBase handler; + + private readonly JT808TcpSessionManager jT808SessionManager; + + private readonly JT808TransmitAddressFilterService jT808TransmitAddressFilterService; + + private readonly IJT808SourcePackageDispatcher jT808SourcePackageDispatcher; + + private readonly JT808TcpAtomicCounterService jT808AtomicCounterService; + + private readonly ILogger logger; + + public JT808TcpServerHandler( + ILoggerFactory loggerFactory, + JT808TransmitAddressFilterService jT808TransmitAddressFilterService, + IJT808SourcePackageDispatcher jT808SourcePackageDispatcher, + JT808MsgIdTcpHandlerBase handler, + JT808TcpAtomicCounterService jT808AtomicCounterService, + JT808TcpSessionManager jT808SessionManager) + { + this.jT808TransmitAddressFilterService = jT808TransmitAddressFilterService; + this.handler = handler; + this.jT808SessionManager = jT808SessionManager; + this.jT808SourcePackageDispatcher = jT808SourcePackageDispatcher; + this.jT808AtomicCounterService = jT808AtomicCounterService; + logger = loggerFactory.CreateLogger(); + } + + + protected override void ChannelRead0(IChannelHandlerContext ctx, byte[] msg) + { + try + { + jT808SourcePackageDispatcher?.SendAsync(msg); + //解析到头部,然后根据具体的消息Id通过队列去进行消费 + //要是一定要解析到数据体可以在JT808MsgIdHandlerBase类中根据具体的消息, + //解析具体的消息体,具体调用JT808Serializer.Deserialize + JT808HeaderPackage jT808HeaderPackage = JT808Serializer.Deserialize(msg); + jT808AtomicCounterService.MsgSuccessIncrement(); + if (logger.IsEnabled(LogLevel.Debug)) + { + logger.LogDebug("accept package success count<<<" + jT808AtomicCounterService.MsgSuccessCount.ToString()); + } + jT808SessionManager.TryAdd(new JT808TcpSession(ctx.Channel, jT808HeaderPackage.Header.TerminalPhoneNo)); + Func handlerFunc; + if (handler.HandlerDict.TryGetValue(jT808HeaderPackage.Header.MsgId, out handlerFunc)) + { + JT808Response jT808Response = handlerFunc(new JT808Request(jT808HeaderPackage, msg)); + if (jT808Response != null) + { + if (!jT808TransmitAddressFilterService.ContainsKey(ctx.Channel.RemoteAddress)) + { + ctx.WriteAndFlushAsync(Unpooled.WrappedBuffer(JT808Serializer.Serialize(jT808Response.Package, jT808Response.MinBufferSize))); + } + } + } + } + catch (JT808.Protocol.Exceptions.JT808Exception ex) + { + jT808AtomicCounterService.MsgFailIncrement(); + if (logger.IsEnabled(LogLevel.Error)) + { + logger.LogError("accept package fail count<<<" + jT808AtomicCounterService.MsgFailCount.ToString()); + logger.LogError(ex, "accept msg<<<" + ByteBufferUtil.HexDump(msg)); + } + } + catch (Exception ex) + { + jT808AtomicCounterService.MsgFailIncrement(); + if (logger.IsEnabled(LogLevel.Error)) + { + logger.LogError("accept package fail count<<<" + jT808AtomicCounterService.MsgFailCount.ToString()); + logger.LogError(ex, "accept msg<<<" + ByteBufferUtil.HexDump(msg)); + } + } + } + } +} diff --git a/src/JT808.DotNetty.Tcp/JT808.DotNetty.Tcp.csproj b/src/JT808.DotNetty.Tcp/JT808.DotNetty.Tcp.csproj new file mode 100644 index 0000000..0e326c3 --- /dev/null +++ b/src/JT808.DotNetty.Tcp/JT808.DotNetty.Tcp.csproj @@ -0,0 +1,18 @@ + + + + netstandard2.0 + + + + + + + + + + + + + + diff --git a/src/JT808.DotNetty.Tcp/JT808TcpDotnettyExtensions.cs b/src/JT808.DotNetty.Tcp/JT808TcpDotnettyExtensions.cs new file mode 100644 index 0000000..adbe54a --- /dev/null +++ b/src/JT808.DotNetty.Tcp/JT808TcpDotnettyExtensions.cs @@ -0,0 +1,34 @@ +using JT808.DotNetty.Codecs; +using JT808.DotNetty.Core; +using JT808.DotNetty.Core.Handlers; +using JT808.DotNetty.Core.Services; +using JT808.DotNetty.Handlers; +using JT808.DotNetty.Tcp.Handlers; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.DependencyInjection.Extensions; +using Microsoft.Extensions.Hosting; +using Newtonsoft.Json; +using System; +using System.Reflection; +using System.Runtime.CompilerServices; + +[assembly: InternalsVisibleTo("JT808.DotNetty.Test")] + +namespace JT808.DotNetty.Tcp +{ + public static class JT808TcpDotnettyExtensions + { + public static IServiceCollection AddJT808TcpHost(this IServiceCollection serviceDescriptors) + { + serviceDescriptors.TryAddSingleton(); + serviceDescriptors.TryAddSingleton(); + serviceDescriptors.TryAddSingleton(); + serviceDescriptors.TryAddSingleton(); + serviceDescriptors.TryAddScoped(); + serviceDescriptors.TryAddScoped(); + serviceDescriptors.TryAddScoped(); + serviceDescriptors.AddHostedService(); + return serviceDescriptors; + } + } +} \ No newline at end of file diff --git a/src/JT808.DotNetty.Tcp/JT808TcpServerHost.cs b/src/JT808.DotNetty.Tcp/JT808TcpServerHost.cs new file mode 100644 index 0000000..30c53cb --- /dev/null +++ b/src/JT808.DotNetty.Tcp/JT808TcpServerHost.cs @@ -0,0 +1,95 @@ +using DotNetty.Buffers; +using DotNetty.Codecs; +using DotNetty.Handlers.Timeout; +using DotNetty.Transport.Bootstrapping; +using DotNetty.Transport.Channels; +using DotNetty.Transport.Libuv; +using JT808.DotNetty.Codecs; +using JT808.DotNetty.Core.Configurations; +using JT808.DotNetty.Handlers; +using JT808.DotNetty.Tcp.Handlers; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using System; +using System.Net; +using System.Runtime.InteropServices; +using System.Threading; +using System.Threading.Tasks; + +namespace JT808.DotNetty.Tcp +{ + /// + /// JT808 Tcp网关服务 + /// + internal class JT808TcpServerHost : IHostedService + { + private readonly IServiceProvider serviceProvider; + private readonly JT808Configuration configuration; + private readonly ILogger logger; + private DispatcherEventLoopGroup bossGroup; + private WorkerEventLoopGroup workerGroup; + private IChannel bootstrapChannel; + private IByteBufferAllocator serverBufferAllocator; + + public JT808TcpServerHost( + IServiceProvider provider, + ILoggerFactory loggerFactory, + IOptions jT808ConfigurationAccessor) + { + serviceProvider = provider; + configuration = jT808ConfigurationAccessor.Value; + logger=loggerFactory.CreateLogger(); + } + + public Task StartAsync(CancellationToken cancellationToken) + { + bossGroup = new DispatcherEventLoopGroup(); + workerGroup = new WorkerEventLoopGroup(bossGroup, configuration.EventLoopCount); + serverBufferAllocator = new PooledByteBufferAllocator(); + ServerBootstrap bootstrap = new ServerBootstrap(); + bootstrap.Group(bossGroup, workerGroup); + bootstrap.Channel(); + if (RuntimeInformation.IsOSPlatform(OSPlatform.Linux) + || RuntimeInformation.IsOSPlatform(OSPlatform.OSX)) + { + bootstrap + .Option(ChannelOption.SoReuseport, true) + .ChildOption(ChannelOption.SoReuseaddr, true); + } + bootstrap + .Option(ChannelOption.SoBacklog, configuration.SoBacklog) + .ChildOption(ChannelOption.Allocator, serverBufferAllocator) + .ChildHandler(new ActionChannelInitializer(channel => + { + IChannelPipeline pipeline = channel.Pipeline; + using (var scope = serviceProvider.CreateScope()) + { + channel.Pipeline.AddLast("systemIdleState", new IdleStateHandler( + configuration.ReaderIdleTimeSeconds, + configuration.WriterIdleTimeSeconds, + configuration.AllIdleTimeSeconds)); + channel.Pipeline.AddLast("jt808TcpConnection", scope.ServiceProvider.GetRequiredService()); + channel.Pipeline.AddLast("jt808TcpBuffer", new DelimiterBasedFrameDecoder(int.MaxValue, + Unpooled.CopiedBuffer(new byte[] { JT808.Protocol.JT808Package.BeginFlag }), + Unpooled.CopiedBuffer(new byte[] { JT808.Protocol.JT808Package.EndFlag }))); + channel.Pipeline.AddLast("jt808TcpDecode", scope.ServiceProvider.GetRequiredService()); + channel.Pipeline.AddLast("jt808TcpService", scope.ServiceProvider.GetRequiredService()); + } + })); + logger.LogInformation($"JT808 TCP Server start at {IPAddress.Any}:{configuration.TcpPort}."); + return bootstrap.BindAsync(configuration.TcpPort) + .ContinueWith(i => bootstrapChannel = i.Result); + } + + public async Task StopAsync(CancellationToken cancellationToken) + { + await bootstrapChannel.CloseAsync(); + var quietPeriod = configuration.QuietPeriodTimeSpan; + var shutdownTimeout = configuration.ShutdownTimeoutTimeSpan; + await workerGroup.ShutdownGracefullyAsync(quietPeriod, shutdownTimeout); + await bossGroup.ShutdownGracefullyAsync(quietPeriod, shutdownTimeout); + } + } +} diff --git a/src/JT808.DotNetty.Test/JT808.DotNetty.Test.csproj b/src/JT808.DotNetty.Test/JT808.DotNetty.Test.csproj index 23d9e7c..993c60c 100644 --- a/src/JT808.DotNetty.Test/JT808.DotNetty.Test.csproj +++ b/src/JT808.DotNetty.Test/JT808.DotNetty.Test.csproj @@ -4,6 +4,8 @@ netcoreapp2.2 false + + 7.1 diff --git a/src/JT808.DotNetty.Udp/Handlers/JT808MsgIdDefaultUdpHandler.cs b/src/JT808.DotNetty.Udp/Handlers/JT808MsgIdDefaultUdpHandler.cs new file mode 100644 index 0000000..4b23ebd --- /dev/null +++ b/src/JT808.DotNetty.Udp/Handlers/JT808MsgIdDefaultUdpHandler.cs @@ -0,0 +1,15 @@ +using JT808.DotNetty.Core; +using JT808.DotNetty.Core.Handlers; + +namespace JT808.DotNetty.Udp +{ + /// + /// 默认消息处理业务实现 + /// + internal class JT808MsgIdDefaultUdpHandler : JT808MsgIdUdpHandlerBase + { + public JT808MsgIdDefaultUdpHandler(JT808UdpSessionManager sessionManager) : base(sessionManager) + { + } + } +} diff --git a/src/JT808.DotNetty.Udp/Handlers/JT808UdpServerHandler.cs b/src/JT808.DotNetty.Udp/Handlers/JT808UdpServerHandler.cs new file mode 100644 index 0000000..ba170c6 --- /dev/null +++ b/src/JT808.DotNetty.Udp/Handlers/JT808UdpServerHandler.cs @@ -0,0 +1,88 @@ +using DotNetty.Buffers; +using DotNetty.Transport.Channels; +using JT808.Protocol; +using System; +using Microsoft.Extensions.Logging; +using DotNetty.Transport.Channels.Sockets; +using JT808.DotNetty.Core.Metadata; +using JT808.DotNetty.Abstractions; +using JT808.DotNetty.Core.Services; +using JT808.DotNetty.Core; +using JT808.DotNetty.Core.Handlers; + +namespace JT808.DotNetty.Udp.Handlers +{ + /// + /// JT808 Udp服务端处理程序 + /// + internal class JT808UdpServerHandler : SimpleChannelInboundHandler + { + private readonly IJT808SourcePackageDispatcher jT808SourcePackageDispatcher; + + private readonly JT808UdpAtomicCounterService jT808UdpAtomicCounterService; + + private readonly ILogger logger; + + private readonly JT808UdpSessionManager jT808UdpSessionManager; + + private readonly JT808MsgIdUdpHandlerBase handler; + + public JT808UdpServerHandler( + ILoggerFactory loggerFactory, + IJT808SourcePackageDispatcher jT808SourcePackageDispatcher, + JT808MsgIdUdpHandlerBase handler, + JT808UdpAtomicCounterService jT808UdpAtomicCounterService, + JT808UdpSessionManager jT808UdpSessionManager) + { + this.handler = handler; + this.jT808SourcePackageDispatcher = jT808SourcePackageDispatcher; + this.jT808UdpAtomicCounterService = jT808UdpAtomicCounterService; + this.jT808UdpSessionManager = jT808UdpSessionManager; + logger = loggerFactory.CreateLogger(); + } + + protected override void ChannelRead0(IChannelHandlerContext ctx, JT808UdpPackage msg) + { + try + { + jT808SourcePackageDispatcher?.SendAsync(msg.Buffer); + //解析到头部,然后根据具体的消息Id通过队列去进行消费 + //要是一定要解析到数据体可以在JT808MsgIdHandlerBase类中根据具体的消息, + //解析具体的消息体,具体调用JT808Serializer.Deserialize + JT808HeaderPackage jT808HeaderPackage = JT808Serializer.Deserialize(msg.Buffer); + jT808UdpAtomicCounterService.MsgSuccessIncrement(); + if (logger.IsEnabled(LogLevel.Debug)) + { + logger.LogDebug("accept package success count<<<" + jT808UdpAtomicCounterService.MsgSuccessCount.ToString()); + } + Func handlerFunc; + if (handler.HandlerDict.TryGetValue(jT808HeaderPackage.Header.MsgId, out handlerFunc)) + { + JT808Response jT808Response = handlerFunc(new JT808Request(jT808HeaderPackage, msg.Buffer)); + if (jT808Response != null) + { + ctx.WriteAndFlushAsync(new DatagramPacket(Unpooled.WrappedBuffer(JT808Serializer.Serialize(jT808Response.Package, jT808Response.MinBufferSize)), msg.Sender)); + } + } + } + catch (JT808.Protocol.Exceptions.JT808Exception ex) + { + jT808UdpAtomicCounterService.MsgFailIncrement(); + if (logger.IsEnabled(LogLevel.Error)) + { + logger.LogError("accept package fail count<<<" + jT808UdpAtomicCounterService.MsgFailCount.ToString()); + logger.LogError(ex, "accept msg<<<" + ByteBufferUtil.HexDump(msg.Buffer)); + } + } + catch (Exception ex) + { + jT808UdpAtomicCounterService.MsgFailIncrement(); + if (logger.IsEnabled(LogLevel.Error)) + { + logger.LogError("accept package fail count<<<" + jT808UdpAtomicCounterService.MsgFailCount.ToString()); + logger.LogError(ex, "accept msg<<<" + ByteBufferUtil.HexDump(msg.Buffer)); + } + } + } + } +} diff --git a/src/JT808.DotNetty.Udp/JT808.DotNetty.Udp.csproj b/src/JT808.DotNetty.Udp/JT808.DotNetty.Udp.csproj new file mode 100644 index 0000000..e04aa62 --- /dev/null +++ b/src/JT808.DotNetty.Udp/JT808.DotNetty.Udp.csproj @@ -0,0 +1,22 @@ + + + + netstandard2.0 + 7.1 + + + + + + + + + + + + + + + + + diff --git a/src/JT808.DotNetty.Udp/JT808UdpDotnettyExtensions.cs b/src/JT808.DotNetty.Udp/JT808UdpDotnettyExtensions.cs new file mode 100644 index 0000000..b43117b --- /dev/null +++ b/src/JT808.DotNetty.Udp/JT808UdpDotnettyExtensions.cs @@ -0,0 +1,32 @@ +using JT808.DotNetty.Codecs; +using JT808.DotNetty.Core; +using JT808.DotNetty.Core.Handlers; +using JT808.DotNetty.Core.Services; +using JT808.DotNetty.Udp; +using JT808.DotNetty.Udp.Handlers; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.DependencyInjection.Extensions; +using Microsoft.Extensions.Hosting; +using Newtonsoft.Json; +using System; +using System.Reflection; +using System.Runtime.CompilerServices; + +[assembly: InternalsVisibleTo("JT808.DotNetty.Test")] + +namespace JT808.DotNetty.Udp +{ + public static class JT808UdpDotnettyExtensions + { + public static IServiceCollection AddJT808UdpHost(this IServiceCollection serviceDescriptors) + { + serviceDescriptors.TryAddSingleton(); + serviceDescriptors.TryAddSingleton(); + serviceDescriptors.TryAddSingleton(); + serviceDescriptors.TryAddScoped(); + serviceDescriptors.TryAddScoped(); + serviceDescriptors.AddHostedService(); + return serviceDescriptors; + } + } +} \ No newline at end of file diff --git a/src/JT808.DotNetty.Udp/JT808UdpServerHost.cs b/src/JT808.DotNetty.Udp/JT808UdpServerHost.cs new file mode 100644 index 0000000..dc35f8a --- /dev/null +++ b/src/JT808.DotNetty.Udp/JT808UdpServerHost.cs @@ -0,0 +1,76 @@ +using DotNetty.Transport.Bootstrapping; +using DotNetty.Transport.Channels; +using DotNetty.Transport.Channels.Sockets; +using JT808.DotNetty.Codecs; +using JT808.DotNetty.Core.Configurations; +using JT808.DotNetty.Udp.Handlers; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using System; +using System.Net; +using System.Runtime.InteropServices; +using System.Threading; +using System.Threading.Tasks; + +namespace JT808.DotNetty.Udp +{ + /// + /// JT808 Udp网关服务 + /// + internal class JT808UdpServerHost : IHostedService + { + private readonly IServiceProvider serviceProvider; + private readonly JT808Configuration configuration; + private readonly ILogger logger; + private MultithreadEventLoopGroup group; + private IChannel bootstrapChannel; + + public JT808UdpServerHost( + IServiceProvider provider, + ILoggerFactory loggerFactory, + IOptions jT808ConfigurationAccessor) + { + serviceProvider = provider; + configuration = jT808ConfigurationAccessor.Value; + logger=loggerFactory.CreateLogger(); + } + + public Task StartAsync(CancellationToken cancellationToken) + { + group = new MultithreadEventLoopGroup(); + Bootstrap bootstrap = new Bootstrap(); + bootstrap.Group(group); + bootstrap.Channel(); + if (RuntimeInformation.IsOSPlatform(OSPlatform.Linux) + || RuntimeInformation.IsOSPlatform(OSPlatform.OSX)) + { + bootstrap + .Option(ChannelOption.SoReuseport, true); + } + bootstrap + .Option(ChannelOption.SoBacklog, configuration.SoBacklog) + .Handler(new ActionChannelInitializer(channel => + { + IChannelPipeline pipeline = channel.Pipeline; + using (var scope = serviceProvider.CreateScope()) + { + pipeline.AddLast(new JT808UdpDecoder()); + pipeline.AddLast("jt808UdpService", scope.ServiceProvider.GetRequiredService()); + } + })); + logger.LogInformation($"JT808 Udp Server start at {IPAddress.Any}:{configuration.UdpPort}."); + return bootstrap.BindAsync(configuration.UdpPort) + .ContinueWith(i => bootstrapChannel = i.Result); + } + + public async Task StopAsync(CancellationToken cancellationToken) + { + await bootstrapChannel.CloseAsync(); + var quietPeriod = configuration.QuietPeriodTimeSpan; + var shutdownTimeout = configuration.ShutdownTimeoutTimeSpan; + await group.ShutdownGracefullyAsync(quietPeriod, shutdownTimeout); + } + } +} diff --git a/src/JT808.DotNetty.WebApi/Handlers/JT808MsgIdDefaultWebApiHandler.cs b/src/JT808.DotNetty.WebApi/Handlers/JT808MsgIdDefaultWebApiHandler.cs new file mode 100644 index 0000000..37cefef --- /dev/null +++ b/src/JT808.DotNetty.WebApi/Handlers/JT808MsgIdDefaultWebApiHandler.cs @@ -0,0 +1,27 @@ +using JT808.DotNetty.Core.Handlers; + +namespace JT808.DotNetty.WebApi.Handlers +{ + /// + /// 默认消息处理业务实现 + /// + internal class JT808MsgIdDefaultWebApiHandler : JT808MsgIdHttpHandlerBase + { + private const string sessionRoutePrefix = "Session"; + + private const string sourcePackagePrefix = "SourcePackage"; + + private const string transmitPrefix = "Transmit"; + + //1.TCP一套注入 + //2.UDP一套注入 + //3.统一的一套注入 + + public JT808MsgIdDefaultWebApiHandler( + + ) + { + + } + } +} diff --git a/src/JT808.DotNetty.WebApi/Handlers/JT808WebAPIServerHandler.cs b/src/JT808.DotNetty.WebApi/Handlers/JT808WebAPIServerHandler.cs new file mode 100644 index 0000000..159625d --- /dev/null +++ b/src/JT808.DotNetty.WebApi/Handlers/JT808WebAPIServerHandler.cs @@ -0,0 +1,82 @@ +using DotNetty.Buffers; +using DotNetty.Codecs.Http; +using DotNetty.Common.Utilities; +using DotNetty.Transport.Channels; +using JT808.DotNetty.Core.Handlers; +using JT808.DotNetty.Core.Metadata; +using Microsoft.Extensions.Logging; +using System; +using System.Text; + +namespace JT808.DotNetty.WebApi.Handlers +{ + /// + /// jt808 webapi服务 + /// 请求量不大,只支持JSON格式并且只支持post发数据 + /// ref: dotnetty HttpServer + /// + internal class JT808WebAPIServerHandler : SimpleChannelInboundHandler + { + private static readonly AsciiString TypeJson = AsciiString.Cached("application/json"); + private static readonly AsciiString ServerName = AsciiString.Cached("JT808WebAPINetty"); + private static readonly AsciiString ContentTypeEntity = HttpHeaderNames.ContentType; + private static readonly AsciiString DateEntity = HttpHeaderNames.Date; + private static readonly AsciiString ContentLengthEntity = HttpHeaderNames.ContentLength; + private static readonly AsciiString ServerEntity = HttpHeaderNames.Server; + private readonly JT808MsgIdHttpHandlerBase jT808MsgIdHttpHandlerBase; + private readonly ILogger logger; + + public JT808WebAPIServerHandler( + JT808MsgIdHttpHandlerBase jT808MsgIdHttpHandlerBase, + ILoggerFactory loggerFactory) + { + this.jT808MsgIdHttpHandlerBase = jT808MsgIdHttpHandlerBase; + logger = loggerFactory.CreateLogger(); + } + + protected override void ChannelRead0(IChannelHandlerContext ctx, IFullHttpRequest msg) + { + if (logger.IsEnabled(LogLevel.Debug)) + { + logger.LogDebug($"Uri:{msg.Uri}"); + logger.LogDebug($"Content:{msg.Content.ToString(Encoding.UTF8)}"); + } + JT808HttpResponse jT808HttpResponse = null; + if (jT808MsgIdHttpHandlerBase.HandlerDict.TryGetValue(msg.Uri,out var funcHandler)) + { + jT808HttpResponse = funcHandler( new JT808HttpRequest() { Json = msg.Content.ToString(Encoding.UTF8)}); + } + else + { + jT808HttpResponse = jT808MsgIdHttpHandlerBase.NotFoundHttpResponse(); + } + if (jT808HttpResponse != null) + { + WriteResponse(ctx, Unpooled.WrappedBuffer(jT808HttpResponse.Data), TypeJson, jT808HttpResponse.Data.Length); + } + } + + private void WriteResponse(IChannelHandlerContext ctx, IByteBuffer buf, ICharSequence contentType, int contentLength) + { + // Build the response object. + var response = new DefaultFullHttpResponse(HttpVersion.Http11, HttpResponseStatus.OK, buf, false); + HttpHeaders headers = response.Headers; + headers.Set(ContentTypeEntity, contentType); + headers.Set(ServerEntity, ServerName); + headers.Set(DateEntity, DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff")); + headers.Set(ContentLengthEntity, contentLength); + // Close the non-keep-alive connection after the write operation is done. + ctx.WriteAndFlushAsync(response); + ctx.CloseAsync(); + } + + public override void ExceptionCaught(IChannelHandlerContext context, Exception exception) + { + WriteResponse(context, Unpooled.WrappedBuffer(jT808MsgIdHttpHandlerBase.ErrorHttpResponse(exception).Data), TypeJson, jT808MsgIdHttpHandlerBase.ErrorHttpResponse(exception).Data.Length); + logger.LogError(exception, exception.Message); + context.CloseAsync(); + } + + public override void ChannelReadComplete(IChannelHandlerContext context) => context.Flush(); + } +} diff --git a/src/JT808.DotNetty.WebApi/JT808.DotNetty.WebApi.csproj b/src/JT808.DotNetty.WebApi/JT808.DotNetty.WebApi.csproj new file mode 100644 index 0000000..ccfddaa --- /dev/null +++ b/src/JT808.DotNetty.WebApi/JT808.DotNetty.WebApi.csproj @@ -0,0 +1,18 @@ + + + + netstandard2.0 + + + + + + + + + + + + + + diff --git a/src/JT808.DotNetty.WebApi/JT808WebAPIServerHost.cs b/src/JT808.DotNetty.WebApi/JT808WebAPIServerHost.cs new file mode 100644 index 0000000..8439ac1 --- /dev/null +++ b/src/JT808.DotNetty.WebApi/JT808WebAPIServerHost.cs @@ -0,0 +1,82 @@ +using DotNetty.Codecs.Http; +using DotNetty.Transport.Bootstrapping; +using DotNetty.Transport.Channels; +using DotNetty.Transport.Libuv; +using JT808.DotNetty.Core.Configurations; +using JT808.DotNetty.WebApi.Handlers; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using System; +using System.Net; +using System.Runtime.InteropServices; +using System.Threading; +using System.Threading.Tasks; + +namespace JT808.DotNetty.WebApi +{ + /// + /// JT808 集成一个webapi服务 + /// + internal class JT808WebAPIServerHost : IHostedService + { + private readonly IServiceProvider serviceProvider; + private readonly JT808Configuration configuration; + private readonly ILogger logger; + private DispatcherEventLoopGroup bossGroup; + private WorkerEventLoopGroup workerGroup; + private IChannel bootstrapChannel; + + public JT808WebAPIServerHost( + IServiceProvider provider, + ILoggerFactory loggerFactory, + IOptions jT808ConfigurationAccessor) + { + serviceProvider = provider; + configuration = jT808ConfigurationAccessor.Value; + logger = loggerFactory.CreateLogger(); + } + + public Task StartAsync(CancellationToken cancellationToken) + { + bossGroup = new DispatcherEventLoopGroup(); + workerGroup = new WorkerEventLoopGroup(bossGroup, 1); + ServerBootstrap bootstrap = new ServerBootstrap(); + bootstrap.Group(bossGroup, workerGroup); + bootstrap.Channel(); + if (RuntimeInformation.IsOSPlatform(OSPlatform.Linux) + || RuntimeInformation.IsOSPlatform(OSPlatform.OSX)) + { + bootstrap + .Option(ChannelOption.SoReuseport, true) + .ChildOption(ChannelOption.SoReuseaddr, true); + } + bootstrap + .Option(ChannelOption.SoBacklog, 8192) + .ChildHandler(new ActionChannelInitializer(channel => + { + IChannelPipeline pipeline = channel.Pipeline; + using (var scope = serviceProvider.CreateScope()) + { + pipeline.AddLast("http_encoder", new HttpResponseEncoder()); + pipeline.AddLast("http_decoder", new HttpRequestDecoder(4096, 8192, 8192, false)); + //将多个消息转换为单一的request或者response对象 =>IFullHttpRequest + pipeline.AddLast("http_aggregator", new HttpObjectAggregator(65536)); + pipeline.AddLast("http_jt808webapihandler", scope.ServiceProvider.GetRequiredService()); + } + })); + logger.LogInformation($"JT808 WebAPI Server start at {IPAddress.Any}:{configuration.WebApiPort}."); + return bootstrap.BindAsync(configuration.WebApiPort).ContinueWith(i => bootstrapChannel = i.Result); + } + + public async Task StopAsync(CancellationToken cancellationToken) + { + await bootstrapChannel.CloseAsync(); + var quietPeriod = configuration.QuietPeriodTimeSpan; + var shutdownTimeout = configuration.ShutdownTimeoutTimeSpan; + await workerGroup.ShutdownGracefullyAsync(quietPeriod, shutdownTimeout); + await bossGroup.ShutdownGracefullyAsync(quietPeriod, shutdownTimeout); + } + } +} diff --git a/src/JT808.DotNetty.WebApi/JT808WebApiDotnettyExtensions.cs b/src/JT808.DotNetty.WebApi/JT808WebApiDotnettyExtensions.cs new file mode 100644 index 0000000..e662f20 --- /dev/null +++ b/src/JT808.DotNetty.WebApi/JT808WebApiDotnettyExtensions.cs @@ -0,0 +1,21 @@ +using JT808.DotNetty.Core.Handlers; +using JT808.DotNetty.WebApi.Handlers; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.DependencyInjection.Extensions; +using System.Runtime.CompilerServices; + +[assembly: InternalsVisibleTo("JT808.DotNetty.Test")] + +namespace JT808.DotNetty.WebApi +{ + public static class JT808WebApiDotnettyExtensions + { + public static IServiceCollection AddJT808WebApiHost(this IServiceCollection serviceDescriptors) + { + serviceDescriptors.TryAddSingleton(); + serviceDescriptors.TryAddScoped(); + serviceDescriptors.AddHostedService(); + return serviceDescriptors; + } + } +} \ No newline at end of file diff --git a/src/JT808.DotNetty.sln b/src/JT808.DotNetty.sln index 9e88174..7183ebb 100644 --- a/src/JT808.DotNetty.sln +++ b/src/JT808.DotNetty.sln @@ -5,14 +5,26 @@ VisualStudioVersion = 15.0.28307.168 MinimumVisualStudioVersion = 10.0.40219.1 Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JT808.DotNetty", "JT808.DotNetty\JT808.DotNetty.csproj", "{80C7F67E-6B7C-4178-8726-ADD3695622DD}" EndProject -Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JT808.DotNetty.Hosting", "JT808.DotNetty.Hosting\JT808.DotNetty.Hosting.csproj", "{46772BD5-4132-48A7-856B-11D658F7ADDB}" -EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JT808.DotNetty.Test", "JT808.DotNetty.Test\JT808.DotNetty.Test.csproj", "{7315D030-16CA-4AC8-B892-720F3D78C651}" EndProject Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "src", "src", "{B5A80356-5AF6-449F-9D8B-3C1BBB9D2443}" EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JT808.Protocol", "JT808.Protocol\src\JT808.Protocol\JT808.Protocol.csproj", "{9FCA2EE9-8253-41AA-A64C-9883413864F9}" EndProject +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JT808.DotNetty.Udp", "JT808.DotNetty.Udp\JT808.DotNetty.Udp.csproj", "{C960084C-2CF4-4748-AD35-D2384285D6A3}" +EndProject +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JT808.DotNetty.Codecs", "JT808.DotNetty.Codecs\JT808.DotNetty.Codecs.csproj", "{42513FBA-1D8F-4F91-A74F-25E06C7BD027}" +EndProject +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JT808.DotNetty.Core", "JT808.DotNetty.Core\JT808.DotNetty.Core.csproj", "{67C5DC72-0004-48B3-BB5A-9CB7069B4F02}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "JT808.DotNetty.Abstractions", "JT808.DotNetty.Abstractions\JT808.DotNetty.Abstractions.csproj", "{4DCF33C0-67C5-4179-AF1E-4E919F9F856D}" +EndProject +Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Tests", "Tests", "{3BD7FF02-8516-4A77-A385-9FDCDD792E22}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "JT808.DotNetty.Tcp", "JT808.DotNetty.Tcp\JT808.DotNetty.Tcp.csproj", "{330CD783-5564-4083-ABFC-573CDC369F50}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "JT808.DotNetty.WebApi", "JT808.DotNetty.WebApi\JT808.DotNetty.WebApi.csproj", "{B783DE53-CE2A-4225-921F-04E5E57B28F3}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -23,10 +35,6 @@ Global {80C7F67E-6B7C-4178-8726-ADD3695622DD}.Debug|Any CPU.Build.0 = Debug|Any CPU {80C7F67E-6B7C-4178-8726-ADD3695622DD}.Release|Any CPU.ActiveCfg = Release|Any CPU {80C7F67E-6B7C-4178-8726-ADD3695622DD}.Release|Any CPU.Build.0 = Release|Any CPU - {46772BD5-4132-48A7-856B-11D658F7ADDB}.Debug|Any CPU.ActiveCfg = Debug|Any CPU - {46772BD5-4132-48A7-856B-11D658F7ADDB}.Debug|Any CPU.Build.0 = Debug|Any CPU - {46772BD5-4132-48A7-856B-11D658F7ADDB}.Release|Any CPU.ActiveCfg = Release|Any CPU - {46772BD5-4132-48A7-856B-11D658F7ADDB}.Release|Any CPU.Build.0 = Release|Any CPU {7315D030-16CA-4AC8-B892-720F3D78C651}.Debug|Any CPU.ActiveCfg = Debug|Any CPU {7315D030-16CA-4AC8-B892-720F3D78C651}.Debug|Any CPU.Build.0 = Debug|Any CPU {7315D030-16CA-4AC8-B892-720F3D78C651}.Release|Any CPU.ActiveCfg = Release|Any CPU @@ -35,11 +43,37 @@ Global {9FCA2EE9-8253-41AA-A64C-9883413864F9}.Debug|Any CPU.Build.0 = Debug|Any CPU {9FCA2EE9-8253-41AA-A64C-9883413864F9}.Release|Any CPU.ActiveCfg = Release|Any CPU {9FCA2EE9-8253-41AA-A64C-9883413864F9}.Release|Any CPU.Build.0 = Release|Any CPU + {C960084C-2CF4-4748-AD35-D2384285D6A3}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {C960084C-2CF4-4748-AD35-D2384285D6A3}.Debug|Any CPU.Build.0 = Debug|Any CPU + {C960084C-2CF4-4748-AD35-D2384285D6A3}.Release|Any CPU.ActiveCfg = Release|Any CPU + {C960084C-2CF4-4748-AD35-D2384285D6A3}.Release|Any CPU.Build.0 = Release|Any CPU + {42513FBA-1D8F-4F91-A74F-25E06C7BD027}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {42513FBA-1D8F-4F91-A74F-25E06C7BD027}.Debug|Any CPU.Build.0 = Debug|Any CPU + {42513FBA-1D8F-4F91-A74F-25E06C7BD027}.Release|Any CPU.ActiveCfg = Release|Any CPU + {42513FBA-1D8F-4F91-A74F-25E06C7BD027}.Release|Any CPU.Build.0 = Release|Any CPU + {67C5DC72-0004-48B3-BB5A-9CB7069B4F02}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {67C5DC72-0004-48B3-BB5A-9CB7069B4F02}.Debug|Any CPU.Build.0 = Debug|Any CPU + {67C5DC72-0004-48B3-BB5A-9CB7069B4F02}.Release|Any CPU.ActiveCfg = Release|Any CPU + {67C5DC72-0004-48B3-BB5A-9CB7069B4F02}.Release|Any CPU.Build.0 = Release|Any CPU + {4DCF33C0-67C5-4179-AF1E-4E919F9F856D}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {4DCF33C0-67C5-4179-AF1E-4E919F9F856D}.Debug|Any CPU.Build.0 = Debug|Any CPU + {4DCF33C0-67C5-4179-AF1E-4E919F9F856D}.Release|Any CPU.ActiveCfg = Release|Any CPU + {4DCF33C0-67C5-4179-AF1E-4E919F9F856D}.Release|Any CPU.Build.0 = Release|Any CPU + {330CD783-5564-4083-ABFC-573CDC369F50}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {330CD783-5564-4083-ABFC-573CDC369F50}.Debug|Any CPU.Build.0 = Debug|Any CPU + {330CD783-5564-4083-ABFC-573CDC369F50}.Release|Any CPU.ActiveCfg = Release|Any CPU + {330CD783-5564-4083-ABFC-573CDC369F50}.Release|Any CPU.Build.0 = Release|Any CPU + {B783DE53-CE2A-4225-921F-04E5E57B28F3}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {B783DE53-CE2A-4225-921F-04E5E57B28F3}.Debug|Any CPU.Build.0 = Debug|Any CPU + {B783DE53-CE2A-4225-921F-04E5E57B28F3}.Release|Any CPU.ActiveCfg = Release|Any CPU + {B783DE53-CE2A-4225-921F-04E5E57B28F3}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE EndGlobalSection GlobalSection(NestedProjects) = preSolution + {80C7F67E-6B7C-4178-8726-ADD3695622DD} = {B5A80356-5AF6-449F-9D8B-3C1BBB9D2443} + {7315D030-16CA-4AC8-B892-720F3D78C651} = {3BD7FF02-8516-4A77-A385-9FDCDD792E22} {9FCA2EE9-8253-41AA-A64C-9883413864F9} = {B5A80356-5AF6-449F-9D8B-3C1BBB9D2443} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution diff --git a/src/JT808.DotNetty/Codecs/JT808UDPDecoder.cs b/src/JT808.DotNetty/Codecs/JT808UDPDecoder.cs new file mode 100644 index 0000000..013f11d --- /dev/null +++ b/src/JT808.DotNetty/Codecs/JT808UDPDecoder.cs @@ -0,0 +1,31 @@ +using DotNetty.Buffers; +using DotNetty.Codecs; +using DotNetty.Transport.Channels; +using Microsoft.Extensions.Logging; +using System; +using System.Collections.Generic; +using System.Text; +using JT808.Protocol; +using JT808.DotNetty.Internal; +using JT808.DotNetty.Interfaces; +using DotNetty.Transport.Channels.Sockets; +using JT808.DotNetty.Metadata; + +namespace JT808.DotNetty.Codecs +{ + /// + /// JT808 UDP解码 + /// + internal class JT808UDPDecoder : MessageToMessageDecoder + { + + + protected override void Decode(IChannelHandlerContext context, DatagramPacket message, List output) + { + IByteBuffer byteBuffer = message.Content; + byte[] buffer = new byte[byteBuffer.ReadableBytes]; + byteBuffer.ReadBytes(buffer); + output.Add(new JT808UDPPackage(buffer, message.Sender)); + } + } +} diff --git a/src/JT808.DotNetty/Configurations/JT808Configuration.cs b/src/JT808.DotNetty/Configurations/JT808Configuration.cs index bbb324b..3565e4b 100644 --- a/src/JT808.DotNetty/Configurations/JT808Configuration.cs +++ b/src/JT808.DotNetty/Configurations/JT808Configuration.cs @@ -8,6 +8,8 @@ namespace JT808.DotNetty.Configurations { public int Port { get; set; } = 808; + public int UDPPort { get; set; } = 809; + public int QuietPeriodSeconds { get; set; } = 1; public TimeSpan QuietPeriodTimeSpan => TimeSpan.FromSeconds(QuietPeriodSeconds); diff --git a/src/JT808.DotNetty/Handlers/JT808UDPServerHandler.cs b/src/JT808.DotNetty/Handlers/JT808UDPServerHandler.cs new file mode 100644 index 0000000..8cfcd38 --- /dev/null +++ b/src/JT808.DotNetty/Handlers/JT808UDPServerHandler.cs @@ -0,0 +1,88 @@ +using DotNetty.Buffers; +using DotNetty.Transport.Channels; +using JT808.Protocol; +using System; +using System.Collections.Generic; +using System.Text; +using JT808.DotNetty.Metadata; +using JT808.DotNetty.Internal; +using JT808.DotNetty.Interfaces; +using Microsoft.Extensions.Logging; +using DotNetty.Transport.Channels.Sockets; + +namespace JT808.DotNetty.Handlers +{ + /// + /// JT808 UDP服务端处理程序 + /// + internal class JT808UDPServerHandler : SimpleChannelInboundHandler + { + private readonly JT808MsgIdHandlerBase handler; + + private readonly JT808SessionManager jT808SessionManager; + + private readonly IJT808SourcePackageDispatcher jT808SourcePackageDispatcher; + + private readonly JT808AtomicCounterService jT808AtomicCounterService; + + private readonly ILogger logger; + + public JT808UDPServerHandler( + ILoggerFactory loggerFactory, + IJT808SourcePackageDispatcher jT808SourcePackageDispatcher, + JT808MsgIdHandlerBase handler, + JT808AtomicCounterService jT808AtomicCounterService, + JT808SessionManager jT808SessionManager) + { + this.handler = handler; + this.jT808SessionManager = jT808SessionManager; + this.jT808SourcePackageDispatcher = jT808SourcePackageDispatcher; + this.jT808AtomicCounterService = jT808AtomicCounterService; + logger = loggerFactory.CreateLogger(); + } + + protected override void ChannelRead0(IChannelHandlerContext ctx, JT808UDPPackage msg) + { + try + { + jT808SourcePackageDispatcher?.SendAsync(msg.Buffer); + //解析到头部,然后根据具体的消息Id通过队列去进行消费 + //要是一定要解析到数据体可以在JT808MsgIdHandlerBase类中根据具体的消息, + //解析具体的消息体,具体调用JT808Serializer.Deserialize + JT808HeaderPackage jT808HeaderPackage = JT808Serializer.Deserialize(msg.Buffer); + jT808AtomicCounterService.MsgSuccessIncrement(); + if (logger.IsEnabled(LogLevel.Debug)) + { + logger.LogDebug("accept package success count<<<" + jT808AtomicCounterService.MsgSuccessCount.ToString()); + } + Func handlerFunc; + if (handler.HandlerDict.TryGetValue(jT808HeaderPackage.Header.MsgId, out handlerFunc)) + { + JT808Response jT808Response = handlerFunc(new JT808Request(jT808HeaderPackage, msg.Buffer)); + if (jT808Response != null) + { + ctx.WriteAndFlushAsync(new DatagramPacket(Unpooled.WrappedBuffer(JT808Serializer.Serialize(jT808Response.Package, jT808Response.MinBufferSize)), msg.Sender)); + } + } + } + catch (JT808.Protocol.Exceptions.JT808Exception ex) + { + jT808AtomicCounterService.MsgFailIncrement(); + if (logger.IsEnabled(LogLevel.Error)) + { + logger.LogError("accept package fail count<<<" + jT808AtomicCounterService.MsgFailCount.ToString()); + logger.LogError(ex, "accept msg<<<" + ByteBufferUtil.HexDump(msg.Buffer)); + } + } + catch (Exception ex) + { + jT808AtomicCounterService.MsgFailIncrement(); + if (logger.IsEnabled(LogLevel.Error)) + { + logger.LogError("accept package fail count<<<" + jT808AtomicCounterService.MsgFailCount.ToString()); + logger.LogError(ex, "accept msg<<<" + ByteBufferUtil.HexDump(msg.Buffer)); + } + } + } + } +} diff --git a/src/JT808.DotNetty/Interfaces/IJT808Publishing.cs b/src/JT808.DotNetty/Interfaces/IJT808Publishing.cs deleted file mode 100644 index 26d88b5..0000000 --- a/src/JT808.DotNetty/Interfaces/IJT808Publishing.cs +++ /dev/null @@ -1,12 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Text; -using System.Threading.Tasks; - -namespace JT808.DotNetty.Interfaces -{ - public interface IJT808Publishing - { - Task PublishAsync(string topicName,string key,string value); - } -} diff --git a/src/JT808.DotNetty/Interfaces/IJT808SessionPublishing.cs b/src/JT808.DotNetty/Interfaces/IJT808SessionPublishing.cs index 94a968e..62cde9f 100644 --- a/src/JT808.DotNetty/Interfaces/IJT808SessionPublishing.cs +++ b/src/JT808.DotNetty/Interfaces/IJT808SessionPublishing.cs @@ -5,8 +5,8 @@ using System.Threading.Tasks; namespace JT808.DotNetty.Interfaces { - public interface IJT808SessionPublishing : IJT808Publishing + public interface IJT808SessionPublishing { - + Task PublishAsync(string topicName, string key, string value); } } diff --git a/src/JT808.DotNetty/Internal/JT808TransmitAddressFilterService.cs b/src/JT808.DotNetty/Internal/JT808TransmitAddressFilterService.cs index 8390ed2..3395403 100644 --- a/src/JT808.DotNetty/Internal/JT808TransmitAddressFilterService.cs +++ b/src/JT808.DotNetty/Internal/JT808TransmitAddressFilterService.cs @@ -6,7 +6,6 @@ using System.Collections.Concurrent; using System.Collections.Generic; using System.Linq; using System.Net; -using System.Text; namespace JT808.DotNetty.Internal { diff --git a/src/JT808.DotNetty/JT808DotnettyExtensions.cs b/src/JT808.DotNetty/JT808DotnettyExtensions.cs index 82fa6e7..a62a177 100644 --- a/src/JT808.DotNetty/JT808DotnettyExtensions.cs +++ b/src/JT808.DotNetty/JT808DotnettyExtensions.cs @@ -45,6 +45,7 @@ namespace JT808.DotNetty services.TryAddScoped(); services.TryAddScoped(); services.TryAddScoped(); + services.TryAddScoped(); services.TryAddSingleton(); services.TryAddSingleton(); services.TryAddSingleton(); diff --git a/src/JT808.DotNetty/JT808UDPServerHost.cs b/src/JT808.DotNetty/JT808UDPServerHost.cs new file mode 100644 index 0000000..95e5a58 --- /dev/null +++ b/src/JT808.DotNetty/JT808UDPServerHost.cs @@ -0,0 +1,82 @@ +using DotNetty.Buffers; +using DotNetty.Codecs; +using DotNetty.Handlers.Timeout; +using DotNetty.Transport.Bootstrapping; +using DotNetty.Transport.Channels; +using DotNetty.Transport.Channels.Sockets; +using DotNetty.Transport.Libuv; +using JT808.DotNetty.Codecs; +using JT808.DotNetty.Configurations; +using JT808.DotNetty.Handlers; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using System; +using System.Collections.Generic; +using System.Net; +using System.Runtime.InteropServices; +using System.Text; +using System.Threading; +using System.Threading.Tasks; + +namespace JT808.DotNetty +{ + /// + /// JT808 Udp网关服务 + /// + internal class JT808UdpServerHost : IHostedService + { + private readonly IServiceProvider serviceProvider; + private readonly JT808Configuration configuration; + private readonly ILogger logger; + private MultithreadEventLoopGroup group; + private IChannel bootstrapChannel; + + public JT808UdpServerHost( + IServiceProvider provider, + ILoggerFactory loggerFactory, + IOptions jT808ConfigurationAccessor) + { + serviceProvider = provider; + configuration = jT808ConfigurationAccessor.Value; + logger=loggerFactory.CreateLogger(); + } + + public Task StartAsync(CancellationToken cancellationToken) + { + group = new MultithreadEventLoopGroup(); + Bootstrap bootstrap = new Bootstrap(); + bootstrap.Group(group); + bootstrap.Channel(); + if (RuntimeInformation.IsOSPlatform(OSPlatform.Linux) + || RuntimeInformation.IsOSPlatform(OSPlatform.OSX)) + { + bootstrap + .Option(ChannelOption.SoReuseport, true); + } + bootstrap + .Option(ChannelOption.SoBacklog, configuration.SoBacklog) + .Handler(new ActionChannelInitializer(channel => + { + IChannelPipeline pipeline = channel.Pipeline; + using (var scope = serviceProvider.CreateScope()) + { + pipeline.AddLast(new JT808UDPDecoder()); + pipeline.AddLast("jt808UDPService", scope.ServiceProvider.GetRequiredService()); + } + })); + logger.LogInformation($"Udp Server start at {IPAddress.Any}:{configuration.UDPPort}."); + return bootstrap.BindAsync(configuration.UDPPort) + .ContinueWith(i => bootstrapChannel = i.Result); + } + + public async Task StopAsync(CancellationToken cancellationToken) + { + await bootstrapChannel.CloseAsync(); + var quietPeriod = configuration.QuietPeriodTimeSpan; + var shutdownTimeout = configuration.ShutdownTimeoutTimeSpan; + await group.ShutdownGracefullyAsync(quietPeriod, shutdownTimeout); + } + } +} diff --git a/src/JT808.DotNetty/Metadata/JT808UDPPackage.cs b/src/JT808.DotNetty/Metadata/JT808UDPPackage.cs new file mode 100644 index 0000000..530a57c --- /dev/null +++ b/src/JT808.DotNetty/Metadata/JT808UDPPackage.cs @@ -0,0 +1,20 @@ +using System; +using System.Collections.Generic; +using System.Net; +using System.Text; + +namespace JT808.DotNetty.Metadata +{ + internal class JT808UDPPackage + { + public JT808UDPPackage(byte[] buffer, EndPoint sender) + { + Buffer = buffer; + Sender = sender; + } + + public byte[] Buffer { get; } + + public EndPoint Sender { get; } + } +}