@@ -0,0 +1,12 @@ | |||
namespace JT808.DotNetty.Abstractions.Dtos | |||
{ | |||
/// <summary> | |||
/// 包计数器服务 | |||
/// </summary> | |||
public class JT808AtomicCounterDto | |||
{ | |||
public long MsgSuccessCount { get; set; } | |||
public long MsgFailCount { get; set; } | |||
} | |||
} |
@@ -0,0 +1,15 @@ | |||
using System; | |||
using System.Collections.Generic; | |||
using System.Text; | |||
namespace JT808.DotNetty.Abstractions.Dtos | |||
{ | |||
public class JT808DefaultResultDto: JT808ResultDto<string> | |||
{ | |||
public JT808DefaultResultDto() | |||
{ | |||
Data = "Hello,JT808 WebAPI"; | |||
Code = JT808ResultCode.Ok; | |||
} | |||
} | |||
} |
@@ -0,0 +1,35 @@ | |||
using System; | |||
using System.Collections.Generic; | |||
using System.Net; | |||
using System.Text; | |||
namespace JT808.DotNetty.Abstractions.Dtos | |||
{ | |||
public class JT808IPAddressDto | |||
{ | |||
public string Host { get; set; } | |||
public int Port { get; set; } | |||
private EndPoint endPoint; | |||
public EndPoint EndPoint | |||
{ | |||
get | |||
{ | |||
if (endPoint == null) | |||
{ | |||
if (IPAddress.TryParse(Host, out IPAddress ip)) | |||
{ | |||
endPoint = new IPEndPoint(ip, Port); | |||
} | |||
else | |||
{ | |||
endPoint = new DnsEndPoint(Host, Port); | |||
} | |||
} | |||
return endPoint; | |||
} | |||
} | |||
} | |||
} |
@@ -0,0 +1,24 @@ | |||
using System; | |||
using System.Collections.Generic; | |||
using System.Text; | |||
namespace JT808.DotNetty.Abstractions.Dtos | |||
{ | |||
public class JT808ResultDto<T> | |||
{ | |||
public string Message { get; set; } | |||
public int Code { get; set; } | |||
public T Data { get; set; } | |||
} | |||
public class JT808ResultCode | |||
{ | |||
public const int Ok = 200; | |||
public const int Empty = 201; | |||
public const int NotFound = 404; | |||
public const int Fail = 400; | |||
public const int Error = 500; | |||
} | |||
} |
@@ -0,0 +1,33 @@ | |||
using System; | |||
using System.Collections.Generic; | |||
using System.Text; | |||
namespace JT808.DotNetty.Abstractions.Dtos | |||
{ | |||
/// <summary> | |||
/// 原包通道信息 | |||
/// </summary> | |||
public class JT808SourcePackageChannelInfoDto | |||
{ | |||
/// <summary> | |||
/// 远程地址 | |||
/// </summary> | |||
public string RemoteAddress { get; set; } | |||
/// <summary> | |||
/// 本地地址 | |||
/// </summary> | |||
public string LocalAddress { get; set; } | |||
/// <summary> | |||
/// 是否注册 | |||
/// </summary> | |||
public bool Registered { get; set; } | |||
/// <summary> | |||
/// 是否活动 | |||
/// </summary> | |||
public bool Active { get; set; } | |||
/// <summary> | |||
/// 是否打开 | |||
/// </summary> | |||
public bool Open { get; set; } | |||
} | |||
} |
@@ -0,0 +1,24 @@ | |||
using System; | |||
namespace JT808.DotNetty.Abstractions.Dtos | |||
{ | |||
public class JT808TcpSessionInfoDto | |||
{ | |||
/// <summary> | |||
/// 最后上线时间 | |||
/// </summary> | |||
public DateTime LastActiveTime { get; set; } | |||
/// <summary> | |||
/// 上线时间 | |||
/// </summary> | |||
public DateTime StartTime { get; set; } | |||
/// <summary> | |||
/// 终端手机号 | |||
/// </summary> | |||
public string TerminalPhoneNo { get; set; } | |||
/// <summary> | |||
/// 远程ip地址 | |||
/// </summary> | |||
public string RemoteAddressIP { get; set; } | |||
} | |||
} |
@@ -0,0 +1,11 @@ | |||
namespace JT808.DotNetty.Abstractions.Dtos | |||
{ | |||
/// <summary> | |||
/// 统一下发请求参数 | |||
/// </summary> | |||
public class JT808UnificationSendRequestDto | |||
{ | |||
public string TerminalPhoneNo { get; set; } | |||
public byte[] Data { get; set; } | |||
} | |||
} |
@@ -0,0 +1,12 @@ | |||
using System.Threading.Tasks; | |||
namespace JT808.DotNetty.Abstractions | |||
{ | |||
/// <summary> | |||
/// 会话通知(在线/离线) | |||
/// </summary> | |||
public interface IJT808SessionPublishing | |||
{ | |||
Task PublishAsync(string topicName, string key, string value); | |||
} | |||
} |
@@ -0,0 +1,15 @@ | |||
using System.Threading.Tasks; | |||
namespace JT808.DotNetty.Abstractions | |||
{ | |||
/// <summary> | |||
/// 源包分发器 | |||
/// 自定义源包分发器业务 | |||
/// ConfigureServices: | |||
/// services.Replace(new ServiceDescriptor(typeof(IJT808SourcePackageDispatcher),typeof(JT808SourcePackageDispatcherDefaultImpl),ServiceLifetime.Singleton)); | |||
/// </summary> | |||
public interface IJT808SourcePackageDispatcher | |||
{ | |||
Task SendAsync(byte[] data); | |||
} | |||
} |
@@ -0,0 +1,8 @@ | |||
<Project Sdk="Microsoft.NET.Sdk"> | |||
<PropertyGroup> | |||
<TargetFramework>netstandard2.0</TargetFramework> | |||
<LangVersion>7.1</LangVersion> | |||
</PropertyGroup> | |||
</Project> |
@@ -0,0 +1,9 @@ | |||
namespace JT808.DotNetty.Abstractions | |||
{ | |||
public static class JT808Constants | |||
{ | |||
public const string SessionOnline= "JT808SessionOnline"; | |||
public const string SessionOffline = "JT808SessionOffline"; | |||
} | |||
} |
@@ -0,0 +1,14 @@ | |||
<Project Sdk="Microsoft.NET.Sdk"> | |||
<PropertyGroup> | |||
<TargetFramework>netstandard2.0</TargetFramework> | |||
<LangVersion>7.1</LangVersion> | |||
</PropertyGroup> | |||
<ItemGroup> | |||
<PackageReference Include="DotNetty.Codecs" Version="0.6.0" /> | |||
</ItemGroup> | |||
<ItemGroup> | |||
<ProjectReference Include="..\JT808.DotNetty.Core\JT808.DotNetty.Core.csproj" /> | |||
<ProjectReference Include="..\JT808.Protocol\src\JT808.Protocol\JT808.Protocol.csproj" /> | |||
</ItemGroup> | |||
</Project> |
@@ -0,0 +1,20 @@ | |||
using DotNetty.Buffers; | |||
using DotNetty.Codecs; | |||
using System.Collections.Generic; | |||
using JT808.Protocol; | |||
using DotNetty.Transport.Channels; | |||
namespace JT808.DotNetty.Codecs | |||
{ | |||
public class JT808TcpDecoder : ByteToMessageDecoder | |||
{ | |||
protected override void Decode(IChannelHandlerContext context, IByteBuffer input, List<object> output) | |||
{ | |||
byte[] buffer = new byte[input.Capacity + 2]; | |||
input.ReadBytes(buffer, 1, input.Capacity); | |||
buffer[0] = JT808Package.BeginFlag; | |||
buffer[input.Capacity + 1] = JT808Package.EndFlag; | |||
output.Add(buffer); | |||
} | |||
} | |||
} |
@@ -0,0 +1,20 @@ | |||
using DotNetty.Buffers; | |||
using DotNetty.Codecs; | |||
using DotNetty.Transport.Channels; | |||
using System.Collections.Generic; | |||
using DotNetty.Transport.Channels.Sockets; | |||
using JT808.DotNetty.Core.Metadata; | |||
namespace JT808.DotNetty.Codecs | |||
{ | |||
public class JT808UdpDecoder : MessageToMessageDecoder<DatagramPacket> | |||
{ | |||
protected override void Decode(IChannelHandlerContext context, DatagramPacket message, List<object> output) | |||
{ | |||
IByteBuffer byteBuffer = message.Content; | |||
byte[] buffer = new byte[byteBuffer.ReadableBytes]; | |||
byteBuffer.ReadBytes(buffer); | |||
output.Add(new JT808UdpPackage(buffer, message.Sender)); | |||
} | |||
} | |||
} |
@@ -0,0 +1,7 @@ | |||
<Project Sdk="Microsoft.NET.Sdk"> | |||
<PropertyGroup> | |||
<TargetFramework>netstandard2.0</TargetFramework> | |||
</PropertyGroup> | |||
</Project> |
@@ -0,0 +1,32 @@ | |||
using System.Net; | |||
namespace JT808.DotNetty.Configurations | |||
{ | |||
public class JT808ClientConfiguration | |||
{ | |||
public string Host { get; set; } | |||
public int Port { get; set; } | |||
private EndPoint endPoint; | |||
public EndPoint EndPoint | |||
{ | |||
get | |||
{ | |||
if (endPoint == null) | |||
{ | |||
if (IPAddress.TryParse(Host, out IPAddress ip)) | |||
{ | |||
endPoint = new IPEndPoint(ip, Port); | |||
} | |||
else | |||
{ | |||
endPoint = new DnsEndPoint(Host, Port); | |||
} | |||
} | |||
return endPoint; | |||
} | |||
} | |||
} | |||
} |
@@ -0,0 +1,54 @@ | |||
using System; | |||
using System.Collections.Generic; | |||
using System.Text; | |||
namespace JT808.DotNetty.Configurations | |||
{ | |||
public class JT808Configuration | |||
{ | |||
public int Port { get; set; } = 808; | |||
public int UDPPort { get; set; } = 809; | |||
public int QuietPeriodSeconds { get; set; } = 1; | |||
public TimeSpan QuietPeriodTimeSpan => TimeSpan.FromSeconds(QuietPeriodSeconds); | |||
public int ShutdownTimeoutSeconds { get; set; } = 3; | |||
public TimeSpan ShutdownTimeoutTimeSpan => TimeSpan.FromSeconds(ShutdownTimeoutSeconds); | |||
public int SoBacklog { get; set; } = 8192; | |||
public int EventLoopCount { get; set; } = Environment.ProcessorCount; | |||
public int ReaderIdleTimeSeconds { get; set; } = 3600; | |||
public int WriterIdleTimeSeconds { get; set; } = 3600; | |||
public int AllIdleTimeSeconds { get; set; } = 3600; | |||
/// <summary> | |||
/// WebApi服务 | |||
/// 默认828端口 | |||
/// </summary> | |||
public int WebApiPort { get; set; } = 828; | |||
/// <summary> | |||
/// 源包分发器配置 | |||
/// </summary> | |||
public List<JT808ClientConfiguration> SourcePackageDispatcherClientConfigurations { get; set; } | |||
/// <summary> | |||
/// 转发远程地址 (可选项)知道转发的地址有利于提升性能 | |||
/// 按照808的消息,有些请求必须要应答,但是转发可以不需要有应答可以节省部分资源包括: | |||
// 1.消息的序列化 | |||
// 2.消息的下发 | |||
// 都有一定的性能损耗,那么不需要判断写超时 IdleState.WriterIdle | |||
// 就跟神兽貔貅一样。。。 | |||
/// </summary> | |||
public List<JT808ClientConfiguration> ForwardingRemoteAddress { get; set; } | |||
public string RedisHost { get; set; } | |||
} | |||
} |
@@ -0,0 +1,13 @@ | |||
using System; | |||
using System.Collections.Generic; | |||
using System.Text; | |||
namespace JT808.DotNetty.Configurations | |||
{ | |||
public static class JT808Constants | |||
{ | |||
public const string SessionOnline= "JT808SessionOnline"; | |||
public const string SessionOffline = "JT808SessionOffline"; | |||
} | |||
} |
@@ -0,0 +1,35 @@ | |||
using System; | |||
using System.Collections.Generic; | |||
using System.Net; | |||
using System.Text; | |||
namespace JT808.DotNetty.Core.Configurations | |||
{ | |||
public class JT808ClientConfiguration | |||
{ | |||
public string Host { get; set; } | |||
public int Port { get; set; } | |||
private EndPoint endPoint; | |||
public EndPoint EndPoint | |||
{ | |||
get | |||
{ | |||
if (endPoint == null) | |||
{ | |||
if (IPAddress.TryParse(Host, out IPAddress ip)) | |||
{ | |||
endPoint = new IPEndPoint(ip, Port); | |||
} | |||
else | |||
{ | |||
endPoint = new DnsEndPoint(Host, Port); | |||
} | |||
} | |||
return endPoint; | |||
} | |||
} | |||
} | |||
} |
@@ -0,0 +1,52 @@ | |||
using System; | |||
using System.Collections.Generic; | |||
using System.Text; | |||
namespace JT808.DotNetty.Core.Configurations | |||
{ | |||
public class JT808Configuration | |||
{ | |||
public int TcpPort { get; set; } = 808; | |||
public int UdpPort { get; set; } = 818; | |||
public int QuietPeriodSeconds { get; set; } = 1; | |||
public TimeSpan QuietPeriodTimeSpan => TimeSpan.FromSeconds(QuietPeriodSeconds); | |||
public int ShutdownTimeoutSeconds { get; set; } = 3; | |||
public TimeSpan ShutdownTimeoutTimeSpan => TimeSpan.FromSeconds(ShutdownTimeoutSeconds); | |||
public int SoBacklog { get; set; } = 8192; | |||
public int EventLoopCount { get; set; } = Environment.ProcessorCount; | |||
public int ReaderIdleTimeSeconds { get; set; } = 3600; | |||
public int WriterIdleTimeSeconds { get; set; } = 3600; | |||
public int AllIdleTimeSeconds { get; set; } = 3600; | |||
/// <summary> | |||
/// WebApi服务 | |||
/// 默认828端口 | |||
/// </summary> | |||
public int WebApiPort { get; set; } = 828; | |||
/// <summary> | |||
/// 源包分发器配置 | |||
/// </summary> | |||
public List<JT808ClientConfiguration> SourcePackageDispatcherClientConfigurations { get; set; } | |||
/// <summary> | |||
/// 转发远程地址 (可选项)知道转发的地址有利于提升性能 | |||
/// 按照808的消息,有些请求必须要应答,但是转发可以不需要有应答可以节省部分资源包括: | |||
// 1.消息的序列化 | |||
// 2.消息的下发 | |||
// 都有一定的性能损耗,那么不需要判断写超时 IdleState.WriterIdle | |||
// 就跟神兽貔貅一样。。。 | |||
/// </summary> | |||
public List<JT808ClientConfiguration> ForwardingRemoteAddress { get; set; } | |||
} | |||
} |
@@ -0,0 +1,84 @@ | |||
using System; | |||
using System.Collections.Generic; | |||
using System.Text; | |||
using JT808.DotNetty.Abstractions.Dtos; | |||
using JT808.DotNetty.Core.Metadata; | |||
using Newtonsoft.Json; | |||
namespace JT808.DotNetty.Core.Handlers | |||
{ | |||
/// <summary> | |||
/// 基于webapi http模式抽象消息处理业务 | |||
/// 自定义消息处理业务 | |||
/// 注意: | |||
/// 1.ConfigureServices: | |||
/// services.Replace(new ServiceDescriptor(typeof(JT808MsgIdHttpHandlerBase),typeof(JT808MsgIdCustomHttpHandlerImpl),ServiceLifetime.Singleton)); | |||
/// 2.解析具体的消息体,具体消息调用具体的JT808Serializer.Deserialize<T> | |||
/// </summary> | |||
public abstract class JT808MsgIdHttpHandlerBase | |||
{ | |||
private const string RouteTablePrefix = "/jt808api"; | |||
/// <summary> | |||
/// 初始化消息处理业务 | |||
/// </summary> | |||
protected JT808MsgIdHttpHandlerBase() | |||
{ | |||
HandlerDict = new Dictionary<string, Func<JT808HttpRequest, JT808HttpResponse>>(); | |||
} | |||
protected void CreateRoute(string url, Func<JT808HttpRequest, JT808HttpResponse> func) | |||
{ | |||
HandlerDict.Add($"{RouteTablePrefix}/{url}", func); | |||
} | |||
public Dictionary<string, Func<JT808HttpRequest, JT808HttpResponse>> HandlerDict { get; } | |||
protected JT808HttpResponse CreateJT808HttpResponse(dynamic dynamicObject) | |||
{ | |||
byte[] data = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(dynamicObject)); | |||
return new JT808HttpResponse() | |||
{ | |||
Data = data | |||
}; | |||
} | |||
public JT808HttpResponse DefaultHttpResponse() | |||
{ | |||
byte[] json = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(new JT808DefaultResultDto())); | |||
return new JT808HttpResponse(json); | |||
} | |||
public JT808HttpResponse EmptyHttpResponse() | |||
{ | |||
byte[] json = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(new JT808ResultDto<string>() | |||
{ | |||
Code = JT808ResultCode.Empty, | |||
Message = "内容为空", | |||
Data = "Content Empty" | |||
})); | |||
return new JT808HttpResponse(json); | |||
} | |||
public JT808HttpResponse NotFoundHttpResponse() | |||
{ | |||
byte[] json = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(new JT808ResultDto<string>() | |||
{ | |||
Code = JT808ResultCode.NotFound, | |||
Message = "没有该服务", | |||
Data = "没有该服务" | |||
})); | |||
return new JT808HttpResponse(json); | |||
} | |||
public JT808HttpResponse ErrorHttpResponse(Exception ex) | |||
{ | |||
byte[] json = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(new JT808ResultDto<string>() | |||
{ | |||
Code = JT808ResultCode.Error, | |||
Message = JsonConvert.SerializeObject(ex), | |||
Data = ex.Message | |||
})); | |||
return new JT808HttpResponse(json); | |||
} | |||
} | |||
} |
@@ -0,0 +1,160 @@ | |||
using System; | |||
using System.Collections.Generic; | |||
using JT808.DotNetty.Core.Metadata; | |||
using JT808.Protocol.Enums; | |||
using JT808.Protocol.Extensions; | |||
using JT808.Protocol.MessageBody; | |||
namespace JT808.DotNetty.Core.Handlers | |||
{ | |||
/// <summary> | |||
/// 基于Tcp模式抽象消息处理业务 | |||
/// 自定义消息处理业务 | |||
/// 注意: | |||
/// 1.ConfigureServices: | |||
/// services.Replace(new ServiceDescriptor(typeof(JT808MsgIdTcpHandlerBase),typeof(JT808MsgIdCustomTcpHandlerImpl),ServiceLifetime.Singleton)); | |||
/// 2.解析具体的消息体,具体消息调用具体的JT808Serializer.Deserialize<T> | |||
/// </summary> | |||
public abstract class JT808MsgIdTcpHandlerBase | |||
{ | |||
protected JT808TcpSessionManager sessionManager { get; } | |||
/// <summary> | |||
/// 初始化消息处理业务 | |||
/// </summary> | |||
protected JT808MsgIdTcpHandlerBase(JT808TcpSessionManager sessionManager) | |||
{ | |||
this.sessionManager = sessionManager; | |||
HandlerDict = new Dictionary<ushort, Func<JT808Request, JT808Response>> | |||
{ | |||
{JT808MsgId.终端通用应答.ToUInt16Value(), Msg0x0001}, | |||
{JT808MsgId.终端鉴权.ToUInt16Value(), Msg0x0102}, | |||
{JT808MsgId.终端心跳.ToUInt16Value(), Msg0x0002}, | |||
{JT808MsgId.终端注销.ToUInt16Value(), Msg0x0003}, | |||
{JT808MsgId.终端注册.ToUInt16Value(), Msg0x0100}, | |||
{JT808MsgId.位置信息汇报.ToUInt16Value(),Msg0x0200 }, | |||
{JT808MsgId.定位数据批量上传.ToUInt16Value(),Msg0x0704 }, | |||
{JT808MsgId.数据上行透传.ToUInt16Value(),Msg0x0900 } | |||
}; | |||
} | |||
public Dictionary<ushort, Func<JT808Request, JT808Response>> HandlerDict { get; protected set; } | |||
/// <summary> | |||
/// 终端通用应答 | |||
/// 平台无需回复 | |||
/// 实现自己的业务 | |||
/// </summary> | |||
/// <param name="reqJT808Package"></param> | |||
/// <param name="ctx"></param> | |||
/// <returns></returns> | |||
public virtual JT808Response Msg0x0001(JT808Request request) | |||
{ | |||
return null; | |||
} | |||
/// <summary> | |||
/// 终端心跳 | |||
/// </summary> | |||
/// <param name="reqJT808Package"></param> | |||
/// <param name="ctx"></param> | |||
/// <returns></returns> | |||
public virtual JT808Response Msg0x0002(JT808Request request) | |||
{ | |||
sessionManager.Heartbeat(request.Package.Header.TerminalPhoneNo); | |||
return new JT808Response(JT808MsgId.平台通用应答.Create(request.Package.Header.TerminalPhoneNo, new JT808_0x8001() | |||
{ | |||
MsgId = request.Package.Header.MsgId, | |||
JT808PlatformResult = JT808PlatformResult.成功, | |||
MsgNum = request.Package.Header.MsgNum | |||
})); | |||
} | |||
/// <summary> | |||
/// 终端注销 | |||
/// </summary> | |||
/// <param name="reqJT808Package"></param> | |||
/// <param name="ctx"></param> | |||
/// <returns></returns> | |||
public virtual JT808Response Msg0x0003(JT808Request request) | |||
{ | |||
return new JT808Response(JT808MsgId.平台通用应答.Create(request.Package.Header.TerminalPhoneNo, new JT808_0x8001() | |||
{ | |||
MsgId = request.Package.Header.MsgId, | |||
JT808PlatformResult = JT808PlatformResult.成功, | |||
MsgNum = request.Package.Header.MsgNum | |||
})); | |||
} | |||
/// <summary> | |||
/// 终端注册 | |||
/// </summary> | |||
/// <param name="reqJT808Package"></param> | |||
/// <param name="ctx"></param> | |||
/// <returns></returns> | |||
public virtual JT808Response Msg0x0100(JT808Request request) | |||
{ | |||
return new JT808Response(JT808MsgId.终端注册应答.Create(request.Package.Header.TerminalPhoneNo, new JT808_0x8100() | |||
{ | |||
Code = "J" + request.Package.Header.TerminalPhoneNo, | |||
JT808TerminalRegisterResult = JT808TerminalRegisterResult.成功, | |||
MsgNum = request.Package.Header.MsgNum | |||
})); | |||
} | |||
/// <summary> | |||
/// 终端鉴权 | |||
/// </summary> | |||
/// <param name="reqJT808Package"></param> | |||
/// <param name="ctx"></param> | |||
/// <returns></returns> | |||
public virtual JT808Response Msg0x0102(JT808Request request) | |||
{ | |||
return new JT808Response(JT808MsgId.平台通用应答.Create(request.Package.Header.TerminalPhoneNo, new JT808_0x8001() | |||
{ | |||
MsgId = request.Package.Header.MsgId, | |||
JT808PlatformResult = JT808PlatformResult.成功, | |||
MsgNum = request.Package.Header.MsgNum | |||
})); | |||
} | |||
/// <summary> | |||
/// 位置信息汇报 | |||
/// </summary> | |||
/// <param name="reqJT808Package"></param> | |||
/// <param name="ctx"></param> | |||
/// <returns></returns> | |||
public virtual JT808Response Msg0x0200(JT808Request request) | |||
{ | |||
return new JT808Response(JT808MsgId.平台通用应答.Create(request.Package.Header.TerminalPhoneNo, new JT808_0x8001() | |||
{ | |||
MsgId = request.Package.Header.MsgId, | |||
JT808PlatformResult = JT808PlatformResult.成功, | |||
MsgNum = request.Package.Header.MsgNum | |||
})); | |||
} | |||
/// <summary> | |||
/// 定位数据批量上传 | |||
/// </summary> | |||
/// <param name="reqJT808Package"></param> | |||
/// <param name="ctx"></param> | |||
/// <returns></returns> | |||
public virtual JT808Response Msg0x0704(JT808Request request) | |||
{ | |||
return new JT808Response(JT808MsgId.平台通用应答.Create(request.Package.Header.TerminalPhoneNo, new JT808_0x8001() | |||
{ | |||
MsgId =request.Package.Header.MsgId, | |||
JT808PlatformResult = JT808PlatformResult.成功, | |||
MsgNum = request.Package.Header.MsgNum | |||
})); | |||
} | |||
/// <summary> | |||
/// 数据上行透传 | |||
/// </summary> | |||
/// <param name="reqJT808Package"></param> | |||
/// <param name="ctx"></param> | |||
/// <returns></returns> | |||
public virtual JT808Response Msg0x0900(JT808Request request) | |||
{ | |||
return new JT808Response(JT808MsgId.平台通用应答.Create(request.Package.Header.TerminalPhoneNo, new JT808_0x8001() | |||
{ | |||
MsgId =request.Package.Header.MsgId, | |||
JT808PlatformResult = JT808PlatformResult.成功, | |||
MsgNum = request.Package.Header.MsgNum | |||
})); | |||
} | |||
} | |||
} |
@@ -0,0 +1,160 @@ | |||
using System; | |||
using System.Collections.Generic; | |||
using JT808.DotNetty.Core.Metadata; | |||
using JT808.Protocol.Enums; | |||
using JT808.Protocol.Extensions; | |||
using JT808.Protocol.MessageBody; | |||
namespace JT808.DotNetty.Core.Handlers | |||
{ | |||
/// <summary> | |||
/// 基于Udp模式的抽象消息处理业务 | |||
/// 自定义消息处理业务 | |||
/// 注意: | |||
/// 1.ConfigureServices: | |||
/// services.Replace(new ServiceDescriptor(typeof(JT808MsgIdUdpHandlerBase),typeof(JT808MsgIdCustomUdpHandlerImpl),ServiceLifetime.Singleton)); | |||
/// 2.解析具体的消息体,具体消息调用具体的JT808Serializer.Deserialize<T> | |||
/// </summary> | |||
public abstract class JT808MsgIdUdpHandlerBase | |||
{ | |||
protected JT808UdpSessionManager sessionManager { get; } | |||
/// <summary> | |||
/// 初始化消息处理业务 | |||
/// </summary> | |||
protected JT808MsgIdUdpHandlerBase(JT808UdpSessionManager sessionManager) | |||
{ | |||
this.sessionManager = sessionManager; | |||
HandlerDict = new Dictionary<ushort, Func<JT808Request, JT808Response>> | |||
{ | |||
{JT808MsgId.终端通用应答.ToUInt16Value(), Msg0x0001}, | |||
{JT808MsgId.终端鉴权.ToUInt16Value(), Msg0x0102}, | |||
{JT808MsgId.终端心跳.ToUInt16Value(), Msg0x0002}, | |||
{JT808MsgId.终端注销.ToUInt16Value(), Msg0x0003}, | |||
{JT808MsgId.终端注册.ToUInt16Value(), Msg0x0100}, | |||
{JT808MsgId.位置信息汇报.ToUInt16Value(),Msg0x0200 }, | |||
{JT808MsgId.定位数据批量上传.ToUInt16Value(),Msg0x0704 }, | |||
{JT808MsgId.数据上行透传.ToUInt16Value(),Msg0x0900 } | |||
}; | |||
} | |||
public Dictionary<ushort, Func<JT808Request, JT808Response>> HandlerDict { get; protected set; } | |||
/// <summary> | |||
/// 终端通用应答 | |||
/// 平台无需回复 | |||
/// 实现自己的业务 | |||
/// </summary> | |||
/// <param name="reqJT808Package"></param> | |||
/// <param name="ctx"></param> | |||
/// <returns></returns> | |||
public virtual JT808Response Msg0x0001(JT808Request request) | |||
{ | |||
return null; | |||
} | |||
/// <summary> | |||
/// 终端心跳 | |||
/// </summary> | |||
/// <param name="reqJT808Package"></param> | |||
/// <param name="ctx"></param> | |||
/// <returns></returns> | |||
public virtual JT808Response Msg0x0002(JT808Request request) | |||
{ | |||
sessionManager.Heartbeat(request.Package.Header.TerminalPhoneNo); | |||
return new JT808Response(JT808MsgId.平台通用应答.Create(request.Package.Header.TerminalPhoneNo, new JT808_0x8001() | |||
{ | |||
MsgId = request.Package.Header.MsgId, | |||
JT808PlatformResult = JT808PlatformResult.成功, | |||
MsgNum = request.Package.Header.MsgNum | |||
})); | |||
} | |||
/// <summary> | |||
/// 终端注销 | |||
/// </summary> | |||
/// <param name="reqJT808Package"></param> | |||
/// <param name="ctx"></param> | |||
/// <returns></returns> | |||
public virtual JT808Response Msg0x0003(JT808Request request) | |||
{ | |||
return new JT808Response(JT808MsgId.平台通用应答.Create(request.Package.Header.TerminalPhoneNo, new JT808_0x8001() | |||
{ | |||
MsgId = request.Package.Header.MsgId, | |||
JT808PlatformResult = JT808PlatformResult.成功, | |||
MsgNum = request.Package.Header.MsgNum | |||
})); | |||
} | |||
/// <summary> | |||
/// 终端注册 | |||
/// </summary> | |||
/// <param name="reqJT808Package"></param> | |||
/// <param name="ctx"></param> | |||
/// <returns></returns> | |||
public virtual JT808Response Msg0x0100(JT808Request request) | |||
{ | |||
return new JT808Response(JT808MsgId.终端注册应答.Create(request.Package.Header.TerminalPhoneNo, new JT808_0x8100() | |||
{ | |||
Code = "J" + request.Package.Header.TerminalPhoneNo, | |||
JT808TerminalRegisterResult = JT808TerminalRegisterResult.成功, | |||
MsgNum = request.Package.Header.MsgNum | |||
})); | |||
} | |||
/// <summary> | |||
/// 终端鉴权 | |||
/// </summary> | |||
/// <param name="reqJT808Package"></param> | |||
/// <param name="ctx"></param> | |||
/// <returns></returns> | |||
public virtual JT808Response Msg0x0102(JT808Request request) | |||
{ | |||
return new JT808Response(JT808MsgId.平台通用应答.Create(request.Package.Header.TerminalPhoneNo, new JT808_0x8001() | |||
{ | |||
MsgId = request.Package.Header.MsgId, | |||
JT808PlatformResult = JT808PlatformResult.成功, | |||
MsgNum = request.Package.Header.MsgNum | |||
})); | |||
} | |||
/// <summary> | |||
/// 位置信息汇报 | |||
/// </summary> | |||
/// <param name="reqJT808Package"></param> | |||
/// <param name="ctx"></param> | |||
/// <returns></returns> | |||
public virtual JT808Response Msg0x0200(JT808Request request) | |||
{ | |||
return new JT808Response(JT808MsgId.平台通用应答.Create(request.Package.Header.TerminalPhoneNo, new JT808_0x8001() | |||
{ | |||
MsgId = request.Package.Header.MsgId, | |||
JT808PlatformResult = JT808PlatformResult.成功, | |||
MsgNum = request.Package.Header.MsgNum | |||
})); | |||
} | |||
/// <summary> | |||
/// 定位数据批量上传 | |||
/// </summary> | |||
/// <param name="reqJT808Package"></param> | |||
/// <param name="ctx"></param> | |||
/// <returns></returns> | |||
public virtual JT808Response Msg0x0704(JT808Request request) | |||
{ | |||
return new JT808Response(JT808MsgId.平台通用应答.Create(request.Package.Header.TerminalPhoneNo, new JT808_0x8001() | |||
{ | |||
MsgId =request.Package.Header.MsgId, | |||
JT808PlatformResult = JT808PlatformResult.成功, | |||
MsgNum = request.Package.Header.MsgNum | |||
})); | |||
} | |||
/// <summary> | |||
/// 数据上行透传 | |||
/// </summary> | |||
/// <param name="reqJT808Package"></param> | |||
/// <param name="ctx"></param> | |||
/// <returns></returns> | |||
public virtual JT808Response Msg0x0900(JT808Request request) | |||
{ | |||
return new JT808Response(JT808MsgId.平台通用应答.Create(request.Package.Header.TerminalPhoneNo, new JT808_0x8001() | |||
{ | |||
MsgId =request.Package.Header.MsgId, | |||
JT808PlatformResult = JT808PlatformResult.成功, | |||
MsgNum = request.Package.Header.MsgNum | |||
})); | |||
} | |||
} | |||
} |
@@ -0,0 +1,13 @@ | |||
using JT808.DotNetty.Abstractions; | |||
using System.Threading.Tasks; | |||
namespace JT808.DotNetty.Core | |||
{ | |||
internal class JT808SessionPublishingEmptyImpl : IJT808SessionPublishing | |||
{ | |||
public Task PublishAsync(string topicName, string key, string value) | |||
{ | |||
return Task.CompletedTask; | |||
} | |||
} | |||
} |
@@ -0,0 +1,16 @@ | |||
using JT808.DotNetty.Abstractions; | |||
using System.Threading.Tasks; | |||
namespace JT808.DotNetty.Core.Impls | |||
{ | |||
/// <summary> | |||
/// 原包分发器默认空实现 | |||
/// </summary> | |||
public class JT808SourcePackageDispatcherEmptyImpl : IJT808SourcePackageDispatcher | |||
{ | |||
public Task SendAsync(byte[] data) | |||
{ | |||
return Task.CompletedTask; | |||
} | |||
} | |||
} |
@@ -0,0 +1,25 @@ | |||
using JT808.DotNetty.Abstractions.Dtos; | |||
using System; | |||
using System.Collections.Generic; | |||
using System.Text; | |||
namespace JT808.DotNetty.Core.Interfaces | |||
{ | |||
/// <summary> | |||
/// JT808 Tcp会话服务 | |||
/// </summary> | |||
public interface IJT808TcpSessionService | |||
{ | |||
/// <summary> | |||
/// 获取会话集合 | |||
/// </summary> | |||
/// <returns></returns> | |||
JT808ResultDto<List<JT808TcpSessionInfoDto>> GetAll(); | |||
/// <summary> | |||
/// 通过设备终端号移除对应会话 | |||
/// </summary> | |||
/// <param name="terminalPhoneNo"></param> | |||
/// <returns></returns> | |||
JT808ResultDto<bool> RemoveByTerminalPhoneNo(string terminalPhoneNo); | |||
} | |||
} |
@@ -0,0 +1,12 @@ | |||
using JT808.DotNetty.Abstractions.Dtos; | |||
namespace JT808.DotNetty.Core.Interfaces | |||
{ | |||
/// <summary> | |||
/// JT808基于tcp的统一下发命令服务 | |||
/// </summary> | |||
internal interface IJT808UnificationTcpSendService | |||
{ | |||
JT808ResultDto<bool> Send(string terminalPhoneNo, byte[] data); | |||
} | |||
} |
@@ -0,0 +1,12 @@ | |||
using JT808.DotNetty.Abstractions.Dtos; | |||
namespace JT808.DotNetty.Core.Interfaces | |||
{ | |||
/// <summary> | |||
/// JT808基于udp的统一下发命令服务 | |||
/// </summary> | |||
internal interface IJT808UnificationUdpSendService | |||
{ | |||
JT808ResultDto<bool> Send(string terminalPhoneNo, byte[] data); | |||
} | |||
} |
@@ -0,0 +1,20 @@ | |||
<Project Sdk="Microsoft.NET.Sdk"> | |||
<PropertyGroup> | |||
<TargetFramework>netstandard2.0</TargetFramework> | |||
<LangVersion>7.1</LangVersion> | |||
</PropertyGroup> | |||
<ItemGroup> | |||
<PackageReference Include="DotNetty.Transport.Libuv" Version="0.6.0" /> | |||
<PackageReference Include="Microsoft.Extensions.Options.ConfigurationExtensions" Version="2.2.0" /> | |||
<PackageReference Include="Newtonsoft.Json" Version="12.0.1" /> | |||
<PackageReference Include="Microsoft.Extensions.Options" Version="2.2.0" /> | |||
</ItemGroup> | |||
<ItemGroup> | |||
<ProjectReference Include="..\JT808.DotNetty.Abstractions\JT808.DotNetty.Abstractions.csproj" /> | |||
<ProjectReference Include="..\JT808.Protocol\src\JT808.Protocol\JT808.Protocol.csproj" /> | |||
</ItemGroup> | |||
</Project> |
@@ -0,0 +1,27 @@ | |||
using JT808.DotNetty.Abstractions; | |||
using JT808.DotNetty.Core.Configurations; | |||
using JT808.DotNetty.Core.Impls; | |||
using JT808.DotNetty.Core.Interfaces; | |||
using JT808.DotNetty.Internal; | |||
using Microsoft.Extensions.Configuration; | |||
using Microsoft.Extensions.DependencyInjection; | |||
using Microsoft.Extensions.DependencyInjection.Extensions; | |||
using System.Runtime.CompilerServices; | |||
[assembly: InternalsVisibleTo("JT808.DotNetty.Test")] | |||
namespace JT808.DotNetty.Core | |||
{ | |||
public static class JT808CoreDotnettyExtensions | |||
{ | |||
public static IServiceCollection AddJT808Core(this IServiceCollection serviceDescriptors, IConfiguration configuration) | |||
{ | |||
serviceDescriptors.Configure<JT808Configuration>(configuration.GetSection("JT808Configuration")); | |||
serviceDescriptors.TryAddSingleton<IJT808SessionPublishing, JT808SessionPublishingEmptyImpl>(); | |||
serviceDescriptors.TryAddSingleton<IJT808SourcePackageDispatcher, JT808SourcePackageDispatcherEmptyImpl>(); | |||
serviceDescriptors.TryAddSingleton<IJT808UnificationTcpSendService, JT808UnificationTcpSendService>(); | |||
serviceDescriptors.TryAddSingleton<IJT808UnificationUdpSendService, JT808UnificationUdpSendService>(); | |||
return serviceDescriptors; | |||
} | |||
} | |||
} |
@@ -0,0 +1,44 @@ | |||
using System; | |||
using System.Collections.Generic; | |||
using System.Text; | |||
using System.Threading; | |||
namespace JT808.DotNetty.Core.Metadata | |||
{ | |||
/// <summary> | |||
/// | |||
/// <see cref="Grpc.Core.Internal"/> | |||
/// </summary> | |||
public class JT808AtomicCounter | |||
{ | |||
long counter = 0; | |||
public JT808AtomicCounter(long initialCount = 0) | |||
{ | |||
this.counter = initialCount; | |||
} | |||
public long Increment() | |||
{ | |||
return Interlocked.Increment(ref counter); | |||
} | |||
public long Add(long len) | |||
{ | |||
return Interlocked.Add(ref counter,len); | |||
} | |||
public long Decrement() | |||
{ | |||
return Interlocked.Decrement(ref counter); | |||
} | |||
public long Count | |||
{ | |||
get | |||
{ | |||
return Interlocked.Read(ref counter); | |||
} | |||
} | |||
} | |||
} |
@@ -0,0 +1,22 @@ | |||
using JT808.Protocol; | |||
using System; | |||
using System.Collections.Generic; | |||
using System.Reflection; | |||
namespace JT808.DotNetty.Core.Metadata | |||
{ | |||
public class JT808HttpRequest | |||
{ | |||
public string Json { get; set; } | |||
public JT808HttpRequest() | |||
{ | |||
} | |||
public JT808HttpRequest(string json) | |||
{ | |||
Json = json; | |||
} | |||
} | |||
} |
@@ -0,0 +1,22 @@ | |||
using JT808.Protocol; | |||
using System; | |||
using System.Collections.Generic; | |||
using System.Reflection; | |||
namespace JT808.DotNetty.Core.Metadata | |||
{ | |||
public class JT808HttpResponse | |||
{ | |||
public byte[] Data { get; set; } | |||
public JT808HttpResponse() | |||
{ | |||
} | |||
public JT808HttpResponse(byte[] data) | |||
{ | |||
this.Data = data; | |||
} | |||
} | |||
} |
@@ -0,0 +1,23 @@ | |||
using JT808.Protocol; | |||
using System; | |||
using System.Collections.Generic; | |||
using System.Reflection; | |||
namespace JT808.DotNetty.Core.Metadata | |||
{ | |||
public class JT808Request | |||
{ | |||
public JT808HeaderPackage Package { get; } | |||
/// <summary> | |||
/// 用于消息发送 | |||
/// </summary> | |||
public byte[] OriginalPackage { get;} | |||
public JT808Request(JT808HeaderPackage package, byte[] originalPackage) | |||
{ | |||
Package = package; | |||
OriginalPackage = originalPackage; | |||
} | |||
} | |||
} |
@@ -0,0 +1,27 @@ | |||
using JT808.Protocol; | |||
using System; | |||
using System.Collections.Generic; | |||
using System.Reflection; | |||
namespace JT808.DotNetty.Core.Metadata | |||
{ | |||
public class JT808Response | |||
{ | |||
public JT808Package Package { get; set; } | |||
/// <summary> | |||
/// 根据实际情况适当调整包的大小 | |||
/// </summary> | |||
public int MinBufferSize { get; set; } | |||
public JT808Response() | |||
{ | |||
} | |||
public JT808Response(JT808Package package, int minBufferSize = 1024) | |||
{ | |||
Package = package; | |||
MinBufferSize = minBufferSize; | |||
} | |||
} | |||
} |
@@ -0,0 +1,29 @@ | |||
using DotNetty.Transport.Channels; | |||
using System; | |||
namespace JT808.DotNetty.Core.Metadata | |||
{ | |||
public class JT808TcpSession | |||
{ | |||
public JT808TcpSession(IChannel channel, string terminalPhoneNo) | |||
{ | |||
Channel = channel; | |||
TerminalPhoneNo = terminalPhoneNo; | |||
StartTime = DateTime.Now; | |||
LastActiveTime = DateTime.Now; | |||
} | |||
public JT808TcpSession() { } | |||
/// <summary> | |||
/// 终端手机号 | |||
/// </summary> | |||
public string TerminalPhoneNo { get; set; } | |||
public IChannel Channel { get; set; } | |||
public DateTime LastActiveTime { get; set; } | |||
public DateTime StartTime { get; set; } | |||
} | |||
} |
@@ -0,0 +1,20 @@ | |||
using System; | |||
using System.Collections.Generic; | |||
using System.Net; | |||
using System.Text; | |||
namespace JT808.DotNetty.Core.Metadata | |||
{ | |||
public class JT808UdpPackage | |||
{ | |||
public JT808UdpPackage(byte[] buffer, EndPoint sender) | |||
{ | |||
Buffer = buffer; | |||
Sender = sender; | |||
} | |||
public byte[] Buffer { get; } | |||
public EndPoint Sender { get; } | |||
} | |||
} |
@@ -0,0 +1,35 @@ | |||
using DotNetty.Transport.Channels; | |||
using System; | |||
using System.Net; | |||
namespace JT808.DotNetty.Core.Metadata | |||
{ | |||
public class JT808UdpSession | |||
{ | |||
public JT808UdpSession(IChannel channel, | |||
EndPoint sender, | |||
string terminalPhoneNo) | |||
{ | |||
Channel = channel; | |||
TerminalPhoneNo = terminalPhoneNo; | |||
StartTime = DateTime.Now; | |||
LastActiveTime = DateTime.Now; | |||
Sender = sender; | |||
} | |||
public EndPoint Sender { get; set; } | |||
public JT808UdpSession() { } | |||
/// <summary> | |||
/// 终端手机号 | |||
/// </summary> | |||
public string TerminalPhoneNo { get; set; } | |||
public IChannel Channel { get; set; } | |||
public DateTime LastActiveTime { get; set; } | |||
public DateTime StartTime { get; set; } | |||
} | |||
} |
@@ -0,0 +1,45 @@ | |||
using JT808.DotNetty.Core.Metadata; | |||
namespace JT808.DotNetty.Core.Services | |||
{ | |||
/// <summary> | |||
/// Tcp计数包服务 | |||
/// </summary> | |||
public class JT808TcpAtomicCounterService | |||
{ | |||
private readonly JT808AtomicCounter MsgSuccessCounter = new JT808AtomicCounter(); | |||
private readonly JT808AtomicCounter MsgFailCounter = new JT808AtomicCounter(); | |||
public JT808TcpAtomicCounterService() | |||
{ | |||
} | |||
public long MsgSuccessIncrement() | |||
{ | |||
return MsgSuccessCounter.Increment(); | |||
} | |||
public long MsgSuccessCount | |||
{ | |||
get | |||
{ | |||
return MsgSuccessCounter.Count; | |||
} | |||
} | |||
public long MsgFailIncrement() | |||
{ | |||
return MsgFailCounter.Increment(); | |||
} | |||
public long MsgFailCount | |||
{ | |||
get | |||
{ | |||
return MsgFailCounter.Count; | |||
} | |||
} | |||
} | |||
} |
@@ -0,0 +1,73 @@ | |||
using System; | |||
using System.Collections.Generic; | |||
using System.Linq; | |||
using JT808.DotNetty.Abstractions.Dtos; | |||
using JT808.DotNetty.Core.Interfaces; | |||
namespace JT808.DotNetty.Core.Services | |||
{ | |||
internal class JT808TcpSessionService : IJT808TcpSessionService | |||
{ | |||
private readonly JT808TcpSessionManager jT808SessionManager; | |||
public JT808TcpSessionService( | |||
JT808TcpSessionManager jT808SessionManager) | |||
{ | |||
this.jT808SessionManager = jT808SessionManager; | |||
} | |||
public JT808ResultDto<List<JT808TcpSessionInfoDto>> GetAll() | |||
{ | |||
JT808ResultDto<List<JT808TcpSessionInfoDto>> resultDto = new JT808ResultDto<List<JT808TcpSessionInfoDto>>(); | |||
try | |||
{ | |||
resultDto.Data = jT808SessionManager.GetAll().Select(s => new JT808TcpSessionInfoDto | |||
{ | |||
LastActiveTime = s.LastActiveTime, | |||
StartTime = s.StartTime, | |||
TerminalPhoneNo = s.TerminalPhoneNo, | |||
RemoteAddressIP = s.Channel.RemoteAddress.ToString(), | |||
}).ToList(); | |||
resultDto.Code = JT808ResultCode.Ok; | |||
} | |||
catch (Exception ex) | |||
{ | |||
resultDto.Data = null; | |||
resultDto.Code = JT808ResultCode.Error; | |||
resultDto.Message = Newtonsoft.Json.JsonConvert.SerializeObject(ex); | |||
} | |||
return resultDto; | |||
} | |||
public JT808ResultDto<bool> RemoveByTerminalPhoneNo(string terminalPhoneNo) | |||
{ | |||
JT808ResultDto<bool> resultDto = new JT808ResultDto<bool>(); | |||
try | |||
{ | |||
var session = jT808SessionManager.RemoveSession(terminalPhoneNo); | |||
if (session != null) | |||
{ | |||
if(session.Channel.Open) | |||
{ | |||
session.Channel.CloseAsync(); | |||
} | |||
} | |||
resultDto.Code = JT808ResultCode.Ok; | |||
resultDto.Data = true; | |||
} | |||
catch (AggregateException ex) | |||
{ | |||
resultDto.Data = false; | |||
resultDto.Code = 500; | |||
resultDto.Message = Newtonsoft.Json.JsonConvert.SerializeObject(ex); | |||
} | |||
catch (Exception ex) | |||
{ | |||
resultDto.Data = false; | |||
resultDto.Code = JT808ResultCode.Error; | |||
resultDto.Message = Newtonsoft.Json.JsonConvert.SerializeObject(ex); | |||
} | |||
return resultDto; | |||
} | |||
} | |||
} |
@@ -0,0 +1,88 @@ | |||
using JT808.DotNetty.Abstractions.Dtos; | |||
using Microsoft.Extensions.Options; | |||
using System; | |||
using System.Collections.Concurrent; | |||
using System.Collections.Generic; | |||
using System.Linq; | |||
using System.Net; | |||
using JT808.DotNetty.Core.Configurations; | |||
namespace JT808.DotNetty.Core.Services | |||
{ | |||
/// <summary> | |||
/// JT808转发地址过滤服务 | |||
/// 按照808的消息,有些请求必须要应答,但是转发可以不需要有应答可以节省部分资源包括: | |||
// 1.消息的序列化 | |||
// 2.消息的下发 | |||
// 都有一定的性能损耗,那么不需要判断写超时 IdleState.WriterIdle | |||
// 就跟神兽貔貅一样。。。 | |||
/// </summary> | |||
public class JT808TransmitAddressFilterService : IDisposable | |||
{ | |||
private readonly IOptionsMonitor<JT808Configuration> jT808ConfigurationOptionsMonitor; | |||
private ConcurrentDictionary<string,int> ForwardingRemoteAddresssDict; | |||
private IDisposable jT808ConfigurationOptionsMonitorDisposable; | |||
public JT808TransmitAddressFilterService( | |||
IOptionsMonitor<JT808Configuration> jT808ConfigurationOptionsMonitor) | |||
{ | |||
this.jT808ConfigurationOptionsMonitor = jT808ConfigurationOptionsMonitor; | |||
ForwardingRemoteAddresssDict = new ConcurrentDictionary<string, int>(); | |||
InitForwardingRemoteAddress(jT808ConfigurationOptionsMonitor.CurrentValue.ForwardingRemoteAddress); | |||
//OnChange 源码多播委托 | |||
jT808ConfigurationOptionsMonitorDisposable = this.jT808ConfigurationOptionsMonitor.OnChange(options => | |||
{ | |||
InitForwardingRemoteAddress(options.ForwardingRemoteAddress); | |||
}); | |||
} | |||
private void InitForwardingRemoteAddress(List<JT808ClientConfiguration> jT808ClientConfigurations) | |||
{ | |||
if (jT808ClientConfigurations != null && jT808ClientConfigurations.Count > 0) | |||
{ | |||
foreach (var item in jT808ClientConfigurations) | |||
{ | |||
string host = item.EndPoint.ToString(); | |||
ForwardingRemoteAddresssDict.TryAdd(host, 0); | |||
} | |||
} | |||
} | |||
public bool ContainsKey(EndPoint endPoint) | |||
{ | |||
return ForwardingRemoteAddresssDict.ContainsKey(endPoint.ToString()); | |||
} | |||
public JT808ResultDto<bool> Add(JT808IPAddressDto jT808IPAddressDto) | |||
{ | |||
string host = jT808IPAddressDto.EndPoint.ToString(); | |||
return new JT808ResultDto<bool>() { Code = JT808ResultCode.Ok, Data = ForwardingRemoteAddresssDict.TryAdd(host,0) }; | |||
} | |||
public JT808ResultDto<bool> Remove(JT808IPAddressDto jT808IPAddressDto) | |||
{ | |||
string host = jT808IPAddressDto.EndPoint.ToString(); | |||
if(jT808ConfigurationOptionsMonitor.CurrentValue.ForwardingRemoteAddress!=null && | |||
jT808ConfigurationOptionsMonitor.CurrentValue.ForwardingRemoteAddress.Any(w=>w.EndPoint.ToString()== host)) | |||
{ | |||
return new JT808ResultDto<bool>() { Code = JT808ResultCode.Ok, Data = false,Message="不能删除服务器配置的地址" }; | |||
} | |||
else | |||
{ | |||
return new JT808ResultDto<bool>() { Code = JT808ResultCode.Ok, Data = ForwardingRemoteAddresssDict.TryRemove(host,out var temp) }; | |||
} | |||
} | |||
public JT808ResultDto<List<string>> GetAll() | |||
{ | |||
return new JT808ResultDto<List<string>>(){ Code = JT808ResultCode.Ok, Data = ForwardingRemoteAddresssDict.Select(s=>s.Key).ToList() }; | |||
} | |||
public void Dispose() | |||
{ | |||
jT808ConfigurationOptionsMonitorDisposable.Dispose(); | |||
} | |||
} | |||
} |
@@ -0,0 +1,45 @@ | |||
using JT808.DotNetty.Core.Metadata; | |||
namespace JT808.DotNetty.Core.Services | |||
{ | |||
/// <summary> | |||
/// Udp计数包服务 | |||
/// </summary> | |||
public class JT808UdpAtomicCounterService | |||
{ | |||
private readonly JT808AtomicCounter MsgSuccessCounter = new JT808AtomicCounter(); | |||
private readonly JT808AtomicCounter MsgFailCounter = new JT808AtomicCounter(); | |||
public JT808UdpAtomicCounterService() | |||
{ | |||
} | |||
public long MsgSuccessIncrement() | |||
{ | |||
return MsgSuccessCounter.Increment(); | |||
} | |||
public long MsgSuccessCount | |||
{ | |||
get | |||
{ | |||
return MsgSuccessCounter.Count; | |||
} | |||
} | |||
public long MsgFailIncrement() | |||
{ | |||
return MsgFailCounter.Increment(); | |||
} | |||
public long MsgFailCount | |||
{ | |||
get | |||
{ | |||
return MsgFailCounter.Count; | |||
} | |||
} | |||
} | |||
} |
@@ -0,0 +1,55 @@ | |||
using DotNetty.Buffers; | |||
using JT808.DotNetty.Abstractions.Dtos; | |||
using JT808.DotNetty.Core; | |||
using JT808.DotNetty.Core.Interfaces; | |||
using System; | |||
namespace JT808.DotNetty.Internal | |||
{ | |||
internal class JT808UnificationTcpSendService : IJT808UnificationTcpSendService | |||
{ | |||
private readonly JT808TcpSessionManager jT808SessionManager; | |||
public JT808UnificationTcpSendService(JT808TcpSessionManager jT808SessionManager) | |||
{ | |||
this.jT808SessionManager = jT808SessionManager; | |||
} | |||
public JT808ResultDto<bool> Send(string terminalPhoneNo, byte[] data) | |||
{ | |||
JT808ResultDto<bool> resultDto = new JT808ResultDto<bool>(); | |||
try | |||
{ | |||
var session = jT808SessionManager.GetSession(terminalPhoneNo); | |||
if (session != null) | |||
{ | |||
if (session.Channel.Open) | |||
{ | |||
session.Channel.WriteAndFlushAsync(Unpooled.WrappedBuffer(data)); | |||
resultDto.Code = JT808ResultCode.Ok; | |||
resultDto.Data = true; | |||
} | |||
else | |||
{ | |||
resultDto.Code = JT808ResultCode.Ok; | |||
resultDto.Data = false; | |||
resultDto.Message = "offline"; | |||
} | |||
} | |||
else | |||
{ | |||
resultDto.Code = JT808ResultCode.Ok; | |||
resultDto.Data = false; | |||
resultDto.Message = "offline"; | |||
} | |||
} | |||
catch (Exception ex) | |||
{ | |||
resultDto.Data = false; | |||
resultDto.Code = JT808ResultCode.Error; | |||
resultDto.Message = Newtonsoft.Json.JsonConvert.SerializeObject(ex); | |||
} | |||
return resultDto; | |||
} | |||
} | |||
} |
@@ -0,0 +1,55 @@ | |||
using DotNetty.Buffers; | |||
using JT808.DotNetty.Abstractions.Dtos; | |||
using JT808.DotNetty.Core; | |||
using JT808.DotNetty.Core.Interfaces; | |||
using System; | |||
namespace JT808.DotNetty.Internal | |||
{ | |||
internal class JT808UnificationUdpSendService : IJT808UnificationUdpSendService | |||
{ | |||
private readonly JT808UdpSessionManager jT808SessionManager; | |||
public JT808UnificationUdpSendService(JT808UdpSessionManager jT808SessionManager) | |||
{ | |||
this.jT808SessionManager = jT808SessionManager; | |||
} | |||
public JT808ResultDto<bool> Send(string terminalPhoneNo, byte[] data) | |||
{ | |||
JT808ResultDto<bool> resultDto = new JT808ResultDto<bool>(); | |||
try | |||
{ | |||
var session = jT808SessionManager.GetSession(terminalPhoneNo); | |||
if (session != null) | |||
{ | |||
if (session.Channel.Open) | |||
{ | |||
session.Channel.WriteAndFlushAsync(Unpooled.WrappedBuffer(data)); | |||
resultDto.Code = JT808ResultCode.Ok; | |||
resultDto.Data = true; | |||
} | |||
else | |||
{ | |||
resultDto.Code = JT808ResultCode.Ok; | |||
resultDto.Data = false; | |||
resultDto.Message = "offline"; | |||
} | |||
} | |||
else | |||
{ | |||
resultDto.Code = JT808ResultCode.Ok; | |||
resultDto.Data = false; | |||
resultDto.Message = "offline"; | |||
} | |||
} | |||
catch (Exception ex) | |||
{ | |||
resultDto.Data = false; | |||
resultDto.Code = JT808ResultCode.Error; | |||
resultDto.Message = Newtonsoft.Json.JsonConvert.SerializeObject(ex); | |||
} | |||
return resultDto; | |||
} | |||
} | |||
} |
@@ -0,0 +1,141 @@ | |||
using Microsoft.Extensions.Logging; | |||
using System; | |||
using System.Collections.Concurrent; | |||
using System.Collections.Generic; | |||
using System.Linq; | |||
using DotNetty.Transport.Channels; | |||
using JT808.DotNetty.Abstractions; | |||
using JT808.DotNetty.Core.Metadata; | |||
namespace JT808.DotNetty.Core | |||
{ | |||
/// <summary> | |||
/// JT808 Tcp会话管理 | |||
/// </summary> | |||
public class JT808TcpSessionManager | |||
{ | |||
private readonly ILogger<JT808TcpSessionManager> logger; | |||
private readonly IJT808SessionPublishing jT808SessionPublishing; | |||
public JT808TcpSessionManager( | |||
IJT808SessionPublishing jT808SessionPublishing, | |||
ILoggerFactory loggerFactory) | |||
{ | |||
this.jT808SessionPublishing = jT808SessionPublishing; | |||
logger = loggerFactory.CreateLogger<JT808TcpSessionManager>(); | |||
} | |||
private ConcurrentDictionary<string, JT808TcpSession> SessionIdDict = new ConcurrentDictionary<string, JT808TcpSession>(StringComparer.OrdinalIgnoreCase); | |||
public int SessionCount | |||
{ | |||
get | |||
{ | |||
return SessionIdDict.Count; | |||
} | |||
} | |||
public JT808TcpSession GetSession(string terminalPhoneNo) | |||
{ | |||
if (string.IsNullOrEmpty(terminalPhoneNo)) | |||
return default; | |||
if (SessionIdDict.TryGetValue(terminalPhoneNo, out JT808TcpSession targetSession)) | |||
{ | |||
return targetSession; | |||
} | |||
else | |||
{ | |||
return default; | |||
} | |||
} | |||
public void Heartbeat(string terminalPhoneNo) | |||
{ | |||
if (string.IsNullOrEmpty(terminalPhoneNo)) return; | |||
if (SessionIdDict.TryGetValue(terminalPhoneNo, out JT808TcpSession oldjT808Session)) | |||
{ | |||
oldjT808Session.LastActiveTime = DateTime.Now; | |||
SessionIdDict.TryUpdate(terminalPhoneNo, oldjT808Session, oldjT808Session); | |||
} | |||
} | |||
public void TryAdd(JT808TcpSession appSession) | |||
{ | |||
// 解决了设备号跟通道绑定到一起,不需要用到通道本身的SessionId | |||
// 不管设备下发更改了设备终端号,只要是没有在内存中就当是新的 | |||
// 存在的问题: | |||
// 1.原先老的如何销毁 | |||
// 2.这时候用的通道是相同的,设备终端是不同的 | |||
// 当设备主动或者服务器断开以后,可以释放,这点内存忽略不计,况且更改设备号不是很频繁。 | |||
if (SessionIdDict.TryAdd(appSession.TerminalPhoneNo, appSession)) | |||
{ | |||
//使用场景: | |||
//部标的超长待机设备,不会像正常的设备一样一直连着,可能10几分钟连上了,然后发完就关闭连接, | |||
//这时候想下发数据需要知道设备什么时候上线,在这边做通知最好不过了。 | |||
//todo: 有设备关联上来可以进行通知 例如:使用Redis发布订阅 | |||
jT808SessionPublishing.PublishAsync(JT808Constants.SessionOnline,null, appSession.TerminalPhoneNo); | |||
} | |||
} | |||
public JT808TcpSession RemoveSession(string terminalPhoneNo) | |||
{ | |||
//todo: 设备离线可以进行通知 | |||
//todo: 使用Redis 发布订阅 | |||
if (string.IsNullOrEmpty(terminalPhoneNo)) return default; | |||
if (!SessionIdDict.TryGetValue(terminalPhoneNo, out JT808TcpSession jT808Session)) | |||
{ | |||
return default; | |||
} | |||
// 处理转发过来的是数据 这时候通道对设备是1对多关系,需要清理垃圾数据 | |||
//1.用当前会话的通道Id找出通过转发过来的其他设备的终端号 | |||
var terminalPhoneNos = SessionIdDict.Where(w => w.Value.Channel.Id == jT808Session.Channel.Id).Select(s => s.Key).ToList(); | |||
//2.存在则一个个移除 | |||
if (terminalPhoneNos.Count > 1) | |||
{ | |||
//3.移除包括当前的设备号 | |||
foreach (var key in terminalPhoneNos) | |||
{ | |||
SessionIdDict.TryRemove(key, out JT808TcpSession jT808SessionRemove); | |||
} | |||
string nos = string.Join(",", terminalPhoneNos); | |||
logger.LogInformation($">>>{terminalPhoneNo}-{nos} 1-n Session Remove."); | |||
jT808SessionPublishing.PublishAsync(JT808Constants.SessionOffline, null, nos); | |||
return jT808Session; | |||
} | |||
else | |||
{ | |||
if (SessionIdDict.TryRemove(terminalPhoneNo, out JT808TcpSession jT808SessionRemove)) | |||
{ | |||
logger.LogInformation($">>>{terminalPhoneNo} Session Remove."); | |||
jT808SessionPublishing.PublishAsync(JT808Constants.SessionOffline, null, terminalPhoneNo); | |||
return jT808SessionRemove; | |||
} | |||
else | |||
{ | |||
return default; | |||
} | |||
} | |||
} | |||
public void RemoveSessionByChannel(IChannel channel) | |||
{ | |||
//todo: 设备离线可以进行通知 | |||
//todo: 使用Redis 发布订阅 | |||
var terminalPhoneNos = SessionIdDict.Where(w => w.Value.Channel.Id == channel.Id).Select(s => s.Key).ToList(); | |||
foreach (var key in terminalPhoneNos) | |||
{ | |||
SessionIdDict.TryRemove(key, out JT808TcpSession jT808SessionRemove); | |||
} | |||
string nos = string.Join(",", terminalPhoneNos); | |||
logger.LogInformation($">>>{nos} Channel Remove."); | |||
jT808SessionPublishing.PublishAsync(JT808Constants.SessionOffline, null, nos); | |||
} | |||
public IEnumerable<JT808TcpSession> GetAll() | |||
{ | |||
return SessionIdDict.Select(s => s.Value).ToList(); | |||
} | |||
} | |||
} | |||
@@ -0,0 +1,126 @@ | |||
using Microsoft.Extensions.Logging; | |||
using System; | |||
using System.Collections.Concurrent; | |||
using System.Collections.Generic; | |||
using System.Linq; | |||
using JT808.DotNetty.Abstractions; | |||
using JT808.DotNetty.Core.Metadata; | |||
namespace JT808.DotNetty.Core | |||
{ | |||
/// <summary> | |||
/// JT808 udp会话管理 | |||
/// </summary> | |||
public class JT808UdpSessionManager | |||
{ | |||
private readonly ILogger<JT808UdpSessionManager> logger; | |||
private readonly IJT808SessionPublishing jT808SessionPublishing; | |||
public JT808UdpSessionManager( | |||
IJT808SessionPublishing jT808SessionPublishing, | |||
ILoggerFactory loggerFactory) | |||
{ | |||
this.jT808SessionPublishing = jT808SessionPublishing; | |||
logger = loggerFactory.CreateLogger<JT808UdpSessionManager>(); | |||
} | |||
private ConcurrentDictionary<string, JT808UdpSession> SessionIdDict = new ConcurrentDictionary<string, JT808UdpSession>(StringComparer.OrdinalIgnoreCase); | |||
public int SessionCount | |||
{ | |||
get | |||
{ | |||
return SessionIdDict.Count; | |||
} | |||
} | |||
public JT808UdpSession GetSession(string terminalPhoneNo) | |||
{ | |||
if (string.IsNullOrEmpty(terminalPhoneNo)) | |||
return default; | |||
if (SessionIdDict.TryGetValue(terminalPhoneNo, out JT808UdpSession targetSession)) | |||
{ | |||
return targetSession; | |||
} | |||
else | |||
{ | |||
return default; | |||
} | |||
} | |||
public void TryAdd(JT808UdpSession appSession) | |||
{ | |||
// 解决了设备号跟通道绑定到一起,不需要用到通道本身的SessionId | |||
// 不管设备下发更改了设备终端号,只要是没有在内存中就当是新的 | |||
// 存在的问题: | |||
// 1.原先老的如何销毁 | |||
// 2.这时候用的通道是相同的,设备终端是不同的 | |||
// 当设备主动或者服务器断开以后,可以释放,这点内存忽略不计,况且更改设备号不是很频繁。 | |||
if (SessionIdDict.TryAdd(appSession.TerminalPhoneNo, appSession)) | |||
{ | |||
//使用场景: | |||
//部标的超长待机设备,不会像正常的设备一样一直连着,可能10几分钟连上了,然后发完就关闭连接, | |||
//这时候想下发数据需要知道设备什么时候上线,在这边做通知最好不过了。 | |||
//todo: 有设备关联上来可以进行通知 例如:使用Redis发布订阅 | |||
jT808SessionPublishing.PublishAsync(JT808Constants.SessionOnline,null, appSession.TerminalPhoneNo); | |||
} | |||
} | |||
public void Heartbeat(string terminalPhoneNo) | |||
{ | |||
if (string.IsNullOrEmpty(terminalPhoneNo)) return; | |||
if (SessionIdDict.TryGetValue(terminalPhoneNo, out JT808UdpSession oldjT808Session)) | |||
{ | |||
oldjT808Session.LastActiveTime = DateTime.Now; | |||
SessionIdDict.TryUpdate(terminalPhoneNo, oldjT808Session, oldjT808Session); | |||
} | |||
} | |||
public JT808UdpSession RemoveSession(string terminalPhoneNo) | |||
{ | |||
//todo: 设备离线可以进行通知 | |||
//todo: 使用Redis 发布订阅 | |||
if (string.IsNullOrEmpty(terminalPhoneNo)) return default; | |||
if (!SessionIdDict.TryGetValue(terminalPhoneNo, out JT808UdpSession jT808Session)) | |||
{ | |||
return default; | |||
} | |||
// 处理转发过来的是数据 这时候通道对设备是1对多关系,需要清理垃圾数据 | |||
//1.用当前会话的通道Id找出通过转发过来的其他设备的终端号 | |||
var terminalPhoneNos = SessionIdDict.Where(w => w.Value.Channel.Id == jT808Session.Channel.Id).Select(s => s.Key).ToList(); | |||
//2.存在则一个个移除 | |||
if (terminalPhoneNos.Count > 1) | |||
{ | |||
//3.移除包括当前的设备号 | |||
foreach (var key in terminalPhoneNos) | |||
{ | |||
SessionIdDict.TryRemove(key, out JT808UdpSession jT808SessionRemove); | |||
} | |||
string nos = string.Join(",", terminalPhoneNos); | |||
logger.LogInformation($">>>{terminalPhoneNo}-{nos} 1-n Session Remove."); | |||
jT808SessionPublishing.PublishAsync(JT808Constants.SessionOffline, null, nos); | |||
return jT808Session; | |||
} | |||
else | |||
{ | |||
if (SessionIdDict.TryRemove(terminalPhoneNo, out JT808UdpSession jT808SessionRemove)) | |||
{ | |||
logger.LogInformation($">>>{terminalPhoneNo} Session Remove."); | |||
jT808SessionPublishing.PublishAsync(JT808Constants.SessionOffline, null, terminalPhoneNo); | |||
return jT808SessionRemove; | |||
} | |||
else | |||
{ | |||
return default; | |||
} | |||
} | |||
} | |||
public IEnumerable<JT808UdpSession> GetAll() | |||
{ | |||
return SessionIdDict.Select(s => s.Value).ToList(); | |||
} | |||
} | |||
} | |||
@@ -0,0 +1,8 @@ | |||
using System; | |||
namespace JT808.DotNetty.Http | |||
{ | |||
public class Class1 | |||
{ | |||
} | |||
} |
@@ -0,0 +1,7 @@ | |||
<Project Sdk="Microsoft.NET.Sdk"> | |||
<PropertyGroup> | |||
<TargetFramework>netstandard2.0</TargetFramework> | |||
</PropertyGroup> | |||
</Project> |
@@ -0,0 +1,18 @@ | |||
using JT808.DotNetty.Core; | |||
using JT808.DotNetty.Core.Handlers; | |||
using System; | |||
using System.Collections.Generic; | |||
using System.Text; | |||
namespace JT808.DotNetty.Tcp | |||
{ | |||
/// <summary> | |||
/// 默认消息处理业务实现 | |||
/// </summary> | |||
internal class JT808MsgIdDefaultTcpHandler : JT808MsgIdTcpHandlerBase | |||
{ | |||
public JT808MsgIdDefaultTcpHandler(JT808TcpSessionManager sessionManager) : base(sessionManager) | |||
{ | |||
} | |||
} | |||
} |
@@ -0,0 +1,104 @@ | |||
using DotNetty.Handlers.Timeout; | |||
using DotNetty.Transport.Channels; | |||
using JT808.DotNetty.Core; | |||
using Microsoft.Extensions.Logging; | |||
using System; | |||
using System.Threading.Tasks; | |||
namespace JT808.DotNetty.Handlers | |||
{ | |||
/// <summary> | |||
/// JT808服务通道处理程序 | |||
/// </summary> | |||
internal class JT808TcpConnectionHandler : ChannelHandlerAdapter | |||
{ | |||
private readonly ILogger<JT808TcpConnectionHandler> logger; | |||
private readonly JT808TcpSessionManager jT808SessionManager; | |||
public JT808TcpConnectionHandler( | |||
JT808TcpSessionManager jT808SessionManager, | |||
ILoggerFactory loggerFactory) | |||
{ | |||
this.jT808SessionManager = jT808SessionManager; | |||
logger = loggerFactory.CreateLogger<JT808TcpConnectionHandler>(); | |||
} | |||
/// <summary> | |||
/// 通道激活 | |||
/// </summary> | |||
/// <param name="context"></param> | |||
public override void ChannelActive(IChannelHandlerContext context) | |||
{ | |||
string channelId = context.Channel.Id.AsShortText(); | |||
if (logger.IsEnabled(LogLevel.Debug)) | |||
logger.LogDebug($"<<<{ channelId } Successful client connection to server."); | |||
base.ChannelActive(context); | |||
} | |||
/// <summary> | |||
/// 设备主动断开 | |||
/// </summary> | |||
/// <param name="context"></param> | |||
public override void ChannelInactive(IChannelHandlerContext context) | |||
{ | |||
string channelId = context.Channel.Id.AsShortText(); | |||
if (logger.IsEnabled(LogLevel.Debug)) | |||
logger.LogDebug($">>>{ channelId } The client disconnects from the server."); | |||
jT808SessionManager.RemoveSessionByChannel(context.Channel); | |||
base.ChannelInactive(context); | |||
} | |||
/// <summary> | |||
/// 服务器主动断开 | |||
/// </summary> | |||
/// <param name="context"></param> | |||
/// <returns></returns> | |||
public override Task CloseAsync(IChannelHandlerContext context) | |||
{ | |||
string channelId = context.Channel.Id.AsShortText(); | |||
if (logger.IsEnabled(LogLevel.Debug)) | |||
logger.LogDebug($"<<<{ channelId } The server disconnects from the client."); | |||
jT808SessionManager.RemoveSessionByChannel(context.Channel); | |||
return base.CloseAsync(context); | |||
} | |||
public override void ChannelReadComplete(IChannelHandlerContext context)=> context.Flush(); | |||
/// <summary> | |||
/// 超时策略 | |||
/// </summary> | |||
/// <param name="context"></param> | |||
/// <param name="evt"></param> | |||
public override void UserEventTriggered(IChannelHandlerContext context, object evt) | |||
{ | |||
IdleStateEvent idleStateEvent = evt as IdleStateEvent; | |||
if (idleStateEvent != null) | |||
{ | |||
if(idleStateEvent.State== IdleState.ReaderIdle) | |||
{ | |||
string channelId = context.Channel.Id.AsShortText(); | |||
logger.LogInformation($"{idleStateEvent.State.ToString()}>>>{channelId}"); | |||
// 由于808是设备发心跳,如果很久没有上报数据,那么就由服务器主动关闭连接。 | |||
jT808SessionManager.RemoveSessionByChannel(context.Channel); | |||
context.CloseAsync(); | |||
} | |||
// 按照808的消息,有些请求必须要应答,但是转发可以不需要有应答可以节省部分资源包括: | |||
// 1.消息的序列化 | |||
// 2.消息的下发 | |||
// 都有一定的性能损耗,那么不需要判断写超时 IdleState.WriterIdle | |||
// 就跟神兽貔貅一样。。。 | |||
} | |||
base.UserEventTriggered(context, evt); | |||
} | |||
public override void ExceptionCaught(IChannelHandlerContext context, Exception exception) | |||
{ | |||
string channelId = context.Channel.Id.AsShortText(); | |||
logger.LogError(exception,$"{channelId} {exception.Message}" ); | |||
jT808SessionManager.RemoveSessionByChannel(context.Channel); | |||
context.CloseAsync(); | |||
} | |||
} | |||
} | |||
@@ -0,0 +1,96 @@ | |||
using DotNetty.Buffers; | |||
using DotNetty.Transport.Channels; | |||
using JT808.Protocol; | |||
using System; | |||
using JT808.DotNetty.Core; | |||
using JT808.DotNetty.Abstractions; | |||
using Microsoft.Extensions.Logging; | |||
using JT808.DotNetty.Core.Handlers; | |||
using JT808.DotNetty.Core.Services; | |||
using JT808.DotNetty.Core.Metadata; | |||
namespace JT808.DotNetty.Tcp.Handlers | |||
{ | |||
/// <summary> | |||
/// JT808服务端处理程序 | |||
/// </summary> | |||
internal class JT808TcpServerHandler : SimpleChannelInboundHandler<byte[]> | |||
{ | |||
private readonly JT808MsgIdTcpHandlerBase handler; | |||
private readonly JT808TcpSessionManager jT808SessionManager; | |||
private readonly JT808TransmitAddressFilterService jT808TransmitAddressFilterService; | |||
private readonly IJT808SourcePackageDispatcher jT808SourcePackageDispatcher; | |||
private readonly JT808TcpAtomicCounterService jT808AtomicCounterService; | |||
private readonly ILogger<JT808TcpServerHandler> logger; | |||
public JT808TcpServerHandler( | |||
ILoggerFactory loggerFactory, | |||
JT808TransmitAddressFilterService jT808TransmitAddressFilterService, | |||
IJT808SourcePackageDispatcher jT808SourcePackageDispatcher, | |||
JT808MsgIdTcpHandlerBase handler, | |||
JT808TcpAtomicCounterService jT808AtomicCounterService, | |||
JT808TcpSessionManager jT808SessionManager) | |||
{ | |||
this.jT808TransmitAddressFilterService = jT808TransmitAddressFilterService; | |||
this.handler = handler; | |||
this.jT808SessionManager = jT808SessionManager; | |||
this.jT808SourcePackageDispatcher = jT808SourcePackageDispatcher; | |||
this.jT808AtomicCounterService = jT808AtomicCounterService; | |||
logger = loggerFactory.CreateLogger<JT808TcpServerHandler>(); | |||
} | |||
protected override void ChannelRead0(IChannelHandlerContext ctx, byte[] msg) | |||
{ | |||
try | |||
{ | |||
jT808SourcePackageDispatcher?.SendAsync(msg); | |||
//解析到头部,然后根据具体的消息Id通过队列去进行消费 | |||
//要是一定要解析到数据体可以在JT808MsgIdHandlerBase类中根据具体的消息, | |||
//解析具体的消息体,具体调用JT808Serializer.Deserialize<T> | |||
JT808HeaderPackage jT808HeaderPackage = JT808Serializer.Deserialize<JT808HeaderPackage>(msg); | |||
jT808AtomicCounterService.MsgSuccessIncrement(); | |||
if (logger.IsEnabled(LogLevel.Debug)) | |||
{ | |||
logger.LogDebug("accept package success count<<<" + jT808AtomicCounterService.MsgSuccessCount.ToString()); | |||
} | |||
jT808SessionManager.TryAdd(new JT808TcpSession(ctx.Channel, jT808HeaderPackage.Header.TerminalPhoneNo)); | |||
Func<JT808Request, JT808Response> handlerFunc; | |||
if (handler.HandlerDict.TryGetValue(jT808HeaderPackage.Header.MsgId, out handlerFunc)) | |||
{ | |||
JT808Response jT808Response = handlerFunc(new JT808Request(jT808HeaderPackage, msg)); | |||
if (jT808Response != null) | |||
{ | |||
if (!jT808TransmitAddressFilterService.ContainsKey(ctx.Channel.RemoteAddress)) | |||
{ | |||
ctx.WriteAndFlushAsync(Unpooled.WrappedBuffer(JT808Serializer.Serialize(jT808Response.Package, jT808Response.MinBufferSize))); | |||
} | |||
} | |||
} | |||
} | |||
catch (JT808.Protocol.Exceptions.JT808Exception ex) | |||
{ | |||
jT808AtomicCounterService.MsgFailIncrement(); | |||
if (logger.IsEnabled(LogLevel.Error)) | |||
{ | |||
logger.LogError("accept package fail count<<<" + jT808AtomicCounterService.MsgFailCount.ToString()); | |||
logger.LogError(ex, "accept msg<<<" + ByteBufferUtil.HexDump(msg)); | |||
} | |||
} | |||
catch (Exception ex) | |||
{ | |||
jT808AtomicCounterService.MsgFailIncrement(); | |||
if (logger.IsEnabled(LogLevel.Error)) | |||
{ | |||
logger.LogError("accept package fail count<<<" + jT808AtomicCounterService.MsgFailCount.ToString()); | |||
logger.LogError(ex, "accept msg<<<" + ByteBufferUtil.HexDump(msg)); | |||
} | |||
} | |||
} | |||
} | |||
} |
@@ -0,0 +1,18 @@ | |||
<Project Sdk="Microsoft.NET.Sdk"> | |||
<PropertyGroup> | |||
<TargetFramework>netstandard2.0</TargetFramework> | |||
</PropertyGroup> | |||
<ItemGroup> | |||
<PackageReference Include="DotNetty.Buffers" Version="0.6.0" /> | |||
<PackageReference Include="DotNetty.Codecs" Version="0.6.0" /> | |||
<PackageReference Include="DotNetty.Handlers" Version="0.6.0" /> | |||
<PackageReference Include="DotNetty.Transport.Libuv" Version="0.6.0" /> | |||
<PackageReference Include="Microsoft.Extensions.Hosting.Abstractions" Version="2.2.0" /> | |||
<PackageReference Include="Microsoft.Extensions.Options" Version="2.2.0" /> | |||
</ItemGroup> | |||
<ItemGroup> | |||
<ProjectReference Include="..\JT808.DotNetty.Codecs\JT808.DotNetty.Codecs.csproj" /> | |||
<ProjectReference Include="..\JT808.DotNetty.Core\JT808.DotNetty.Core.csproj" /> | |||
</ItemGroup> | |||
</Project> |
@@ -0,0 +1,34 @@ | |||
using JT808.DotNetty.Codecs; | |||
using JT808.DotNetty.Core; | |||
using JT808.DotNetty.Core.Handlers; | |||
using JT808.DotNetty.Core.Services; | |||
using JT808.DotNetty.Handlers; | |||
using JT808.DotNetty.Tcp.Handlers; | |||
using Microsoft.Extensions.DependencyInjection; | |||
using Microsoft.Extensions.DependencyInjection.Extensions; | |||
using Microsoft.Extensions.Hosting; | |||
using Newtonsoft.Json; | |||
using System; | |||
using System.Reflection; | |||
using System.Runtime.CompilerServices; | |||
[assembly: InternalsVisibleTo("JT808.DotNetty.Test")] | |||
namespace JT808.DotNetty.Tcp | |||
{ | |||
public static class JT808TcpDotnettyExtensions | |||
{ | |||
public static IServiceCollection AddJT808TcpHost(this IServiceCollection serviceDescriptors) | |||
{ | |||
serviceDescriptors.TryAddSingleton<JT808TcpSessionManager>(); | |||
serviceDescriptors.TryAddSingleton<JT808TcpAtomicCounterService>(); | |||
serviceDescriptors.TryAddSingleton<JT808TransmitAddressFilterService>(); | |||
serviceDescriptors.TryAddSingleton<JT808MsgIdTcpHandlerBase, JT808MsgIdDefaultTcpHandler>(); | |||
serviceDescriptors.TryAddScoped<JT808TcpConnectionHandler>(); | |||
serviceDescriptors.TryAddScoped<JT808TcpDecoder>(); | |||
serviceDescriptors.TryAddScoped<JT808TcpServerHandler>(); | |||
serviceDescriptors.AddHostedService<JT808TcpServerHost>(); | |||
return serviceDescriptors; | |||
} | |||
} | |||
} |
@@ -0,0 +1,95 @@ | |||
using DotNetty.Buffers; | |||
using DotNetty.Codecs; | |||
using DotNetty.Handlers.Timeout; | |||
using DotNetty.Transport.Bootstrapping; | |||
using DotNetty.Transport.Channels; | |||
using DotNetty.Transport.Libuv; | |||
using JT808.DotNetty.Codecs; | |||
using JT808.DotNetty.Core.Configurations; | |||
using JT808.DotNetty.Handlers; | |||
using JT808.DotNetty.Tcp.Handlers; | |||
using Microsoft.Extensions.DependencyInjection; | |||
using Microsoft.Extensions.Hosting; | |||
using Microsoft.Extensions.Logging; | |||
using Microsoft.Extensions.Options; | |||
using System; | |||
using System.Net; | |||
using System.Runtime.InteropServices; | |||
using System.Threading; | |||
using System.Threading.Tasks; | |||
namespace JT808.DotNetty.Tcp | |||
{ | |||
/// <summary> | |||
/// JT808 Tcp网关服务 | |||
/// </summary> | |||
internal class JT808TcpServerHost : IHostedService | |||
{ | |||
private readonly IServiceProvider serviceProvider; | |||
private readonly JT808Configuration configuration; | |||
private readonly ILogger<JT808TcpServerHost> logger; | |||
private DispatcherEventLoopGroup bossGroup; | |||
private WorkerEventLoopGroup workerGroup; | |||
private IChannel bootstrapChannel; | |||
private IByteBufferAllocator serverBufferAllocator; | |||
public JT808TcpServerHost( | |||
IServiceProvider provider, | |||
ILoggerFactory loggerFactory, | |||
IOptions<JT808Configuration> jT808ConfigurationAccessor) | |||
{ | |||
serviceProvider = provider; | |||
configuration = jT808ConfigurationAccessor.Value; | |||
logger=loggerFactory.CreateLogger<JT808TcpServerHost>(); | |||
} | |||
public Task StartAsync(CancellationToken cancellationToken) | |||
{ | |||
bossGroup = new DispatcherEventLoopGroup(); | |||
workerGroup = new WorkerEventLoopGroup(bossGroup, configuration.EventLoopCount); | |||
serverBufferAllocator = new PooledByteBufferAllocator(); | |||
ServerBootstrap bootstrap = new ServerBootstrap(); | |||
bootstrap.Group(bossGroup, workerGroup); | |||
bootstrap.Channel<TcpServerChannel>(); | |||
if (RuntimeInformation.IsOSPlatform(OSPlatform.Linux) | |||
|| RuntimeInformation.IsOSPlatform(OSPlatform.OSX)) | |||
{ | |||
bootstrap | |||
.Option(ChannelOption.SoReuseport, true) | |||
.ChildOption(ChannelOption.SoReuseaddr, true); | |||
} | |||
bootstrap | |||
.Option(ChannelOption.SoBacklog, configuration.SoBacklog) | |||
.ChildOption(ChannelOption.Allocator, serverBufferAllocator) | |||
.ChildHandler(new ActionChannelInitializer<IChannel>(channel => | |||
{ | |||
IChannelPipeline pipeline = channel.Pipeline; | |||
using (var scope = serviceProvider.CreateScope()) | |||
{ | |||
channel.Pipeline.AddLast("systemIdleState", new IdleStateHandler( | |||
configuration.ReaderIdleTimeSeconds, | |||
configuration.WriterIdleTimeSeconds, | |||
configuration.AllIdleTimeSeconds)); | |||
channel.Pipeline.AddLast("jt808TcpConnection", scope.ServiceProvider.GetRequiredService<JT808TcpConnectionHandler>()); | |||
channel.Pipeline.AddLast("jt808TcpBuffer", new DelimiterBasedFrameDecoder(int.MaxValue, | |||
Unpooled.CopiedBuffer(new byte[] { JT808.Protocol.JT808Package.BeginFlag }), | |||
Unpooled.CopiedBuffer(new byte[] { JT808.Protocol.JT808Package.EndFlag }))); | |||
channel.Pipeline.AddLast("jt808TcpDecode", scope.ServiceProvider.GetRequiredService<JT808TcpDecoder>()); | |||
channel.Pipeline.AddLast("jt808TcpService", scope.ServiceProvider.GetRequiredService<JT808TcpServerHandler>()); | |||
} | |||
})); | |||
logger.LogInformation($"JT808 TCP Server start at {IPAddress.Any}:{configuration.TcpPort}."); | |||
return bootstrap.BindAsync(configuration.TcpPort) | |||
.ContinueWith(i => bootstrapChannel = i.Result); | |||
} | |||
public async Task StopAsync(CancellationToken cancellationToken) | |||
{ | |||
await bootstrapChannel.CloseAsync(); | |||
var quietPeriod = configuration.QuietPeriodTimeSpan; | |||
var shutdownTimeout = configuration.ShutdownTimeoutTimeSpan; | |||
await workerGroup.ShutdownGracefullyAsync(quietPeriod, shutdownTimeout); | |||
await bossGroup.ShutdownGracefullyAsync(quietPeriod, shutdownTimeout); | |||
} | |||
} | |||
} |
@@ -4,6 +4,8 @@ | |||
<TargetFramework>netcoreapp2.2</TargetFramework> | |||
<IsPackable>false</IsPackable> | |||
<LangVersion>7.1</LangVersion> | |||
</PropertyGroup> | |||
<ItemGroup> | |||
@@ -0,0 +1,15 @@ | |||
using JT808.DotNetty.Core; | |||
using JT808.DotNetty.Core.Handlers; | |||
namespace JT808.DotNetty.Udp | |||
{ | |||
/// <summary> | |||
/// 默认消息处理业务实现 | |||
/// </summary> | |||
internal class JT808MsgIdDefaultUdpHandler : JT808MsgIdUdpHandlerBase | |||
{ | |||
public JT808MsgIdDefaultUdpHandler(JT808UdpSessionManager sessionManager) : base(sessionManager) | |||
{ | |||
} | |||
} | |||
} |
@@ -0,0 +1,88 @@ | |||
using DotNetty.Buffers; | |||
using DotNetty.Transport.Channels; | |||
using JT808.Protocol; | |||
using System; | |||
using Microsoft.Extensions.Logging; | |||
using DotNetty.Transport.Channels.Sockets; | |||
using JT808.DotNetty.Core.Metadata; | |||
using JT808.DotNetty.Abstractions; | |||
using JT808.DotNetty.Core.Services; | |||
using JT808.DotNetty.Core; | |||
using JT808.DotNetty.Core.Handlers; | |||
namespace JT808.DotNetty.Udp.Handlers | |||
{ | |||
/// <summary> | |||
/// JT808 Udp服务端处理程序 | |||
/// </summary> | |||
internal class JT808UdpServerHandler : SimpleChannelInboundHandler<JT808UdpPackage> | |||
{ | |||
private readonly IJT808SourcePackageDispatcher jT808SourcePackageDispatcher; | |||
private readonly JT808UdpAtomicCounterService jT808UdpAtomicCounterService; | |||
private readonly ILogger<JT808UdpServerHandler> logger; | |||
private readonly JT808UdpSessionManager jT808UdpSessionManager; | |||
private readonly JT808MsgIdUdpHandlerBase handler; | |||
public JT808UdpServerHandler( | |||
ILoggerFactory loggerFactory, | |||
IJT808SourcePackageDispatcher jT808SourcePackageDispatcher, | |||
JT808MsgIdUdpHandlerBase handler, | |||
JT808UdpAtomicCounterService jT808UdpAtomicCounterService, | |||
JT808UdpSessionManager jT808UdpSessionManager) | |||
{ | |||
this.handler = handler; | |||
this.jT808SourcePackageDispatcher = jT808SourcePackageDispatcher; | |||
this.jT808UdpAtomicCounterService = jT808UdpAtomicCounterService; | |||
this.jT808UdpSessionManager = jT808UdpSessionManager; | |||
logger = loggerFactory.CreateLogger<JT808UdpServerHandler>(); | |||
} | |||
protected override void ChannelRead0(IChannelHandlerContext ctx, JT808UdpPackage msg) | |||
{ | |||
try | |||
{ | |||
jT808SourcePackageDispatcher?.SendAsync(msg.Buffer); | |||
//解析到头部,然后根据具体的消息Id通过队列去进行消费 | |||
//要是一定要解析到数据体可以在JT808MsgIdHandlerBase类中根据具体的消息, | |||
//解析具体的消息体,具体调用JT808Serializer.Deserialize<T> | |||
JT808HeaderPackage jT808HeaderPackage = JT808Serializer.Deserialize<JT808HeaderPackage>(msg.Buffer); | |||
jT808UdpAtomicCounterService.MsgSuccessIncrement(); | |||
if (logger.IsEnabled(LogLevel.Debug)) | |||
{ | |||
logger.LogDebug("accept package success count<<<" + jT808UdpAtomicCounterService.MsgSuccessCount.ToString()); | |||
} | |||
Func<JT808Request, JT808Response> handlerFunc; | |||
if (handler.HandlerDict.TryGetValue(jT808HeaderPackage.Header.MsgId, out handlerFunc)) | |||
{ | |||
JT808Response jT808Response = handlerFunc(new JT808Request(jT808HeaderPackage, msg.Buffer)); | |||
if (jT808Response != null) | |||
{ | |||
ctx.WriteAndFlushAsync(new DatagramPacket(Unpooled.WrappedBuffer(JT808Serializer.Serialize(jT808Response.Package, jT808Response.MinBufferSize)), msg.Sender)); | |||
} | |||
} | |||
} | |||
catch (JT808.Protocol.Exceptions.JT808Exception ex) | |||
{ | |||
jT808UdpAtomicCounterService.MsgFailIncrement(); | |||
if (logger.IsEnabled(LogLevel.Error)) | |||
{ | |||
logger.LogError("accept package fail count<<<" + jT808UdpAtomicCounterService.MsgFailCount.ToString()); | |||
logger.LogError(ex, "accept msg<<<" + ByteBufferUtil.HexDump(msg.Buffer)); | |||
} | |||
} | |||
catch (Exception ex) | |||
{ | |||
jT808UdpAtomicCounterService.MsgFailIncrement(); | |||
if (logger.IsEnabled(LogLevel.Error)) | |||
{ | |||
logger.LogError("accept package fail count<<<" + jT808UdpAtomicCounterService.MsgFailCount.ToString()); | |||
logger.LogError(ex, "accept msg<<<" + ByteBufferUtil.HexDump(msg.Buffer)); | |||
} | |||
} | |||
} | |||
} | |||
} |
@@ -0,0 +1,22 @@ | |||
<Project Sdk="Microsoft.NET.Sdk"> | |||
<PropertyGroup> | |||
<TargetFramework>netstandard2.0</TargetFramework> | |||
<LangVersion>7.1</LangVersion> | |||
</PropertyGroup> | |||
<ItemGroup> | |||
<PackageReference Include="DotNetty.Buffers" Version="0.6.0" /> | |||
<PackageReference Include="DotNetty.Codecs" Version="0.6.0" /> | |||
<PackageReference Include="DotNetty.Handlers" Version="0.6.0" /> | |||
<PackageReference Include="DotNetty.Transport.Libuv" Version="0.6.0" /> | |||
<PackageReference Include="Microsoft.Extensions.Hosting.Abstractions" Version="2.2.0" /> | |||
<PackageReference Include="Microsoft.Extensions.Options" Version="2.2.0" /> | |||
</ItemGroup> | |||
<ItemGroup> | |||
<ProjectReference Include="..\JT808.DotNetty.Codecs\JT808.DotNetty.Codecs.csproj" /> | |||
<ProjectReference Include="..\JT808.DotNetty.Core\JT808.DotNetty.Core.csproj" /> | |||
</ItemGroup> | |||
</Project> |
@@ -0,0 +1,32 @@ | |||
using JT808.DotNetty.Codecs; | |||
using JT808.DotNetty.Core; | |||
using JT808.DotNetty.Core.Handlers; | |||
using JT808.DotNetty.Core.Services; | |||
using JT808.DotNetty.Udp; | |||
using JT808.DotNetty.Udp.Handlers; | |||
using Microsoft.Extensions.DependencyInjection; | |||
using Microsoft.Extensions.DependencyInjection.Extensions; | |||
using Microsoft.Extensions.Hosting; | |||
using Newtonsoft.Json; | |||
using System; | |||
using System.Reflection; | |||
using System.Runtime.CompilerServices; | |||
[assembly: InternalsVisibleTo("JT808.DotNetty.Test")] | |||
namespace JT808.DotNetty.Udp | |||
{ | |||
public static class JT808UdpDotnettyExtensions | |||
{ | |||
public static IServiceCollection AddJT808UdpHost(this IServiceCollection serviceDescriptors) | |||
{ | |||
serviceDescriptors.TryAddSingleton<JT808UdpSessionManager>(); | |||
serviceDescriptors.TryAddSingleton<JT808UdpAtomicCounterService>(); | |||
serviceDescriptors.TryAddSingleton<JT808MsgIdUdpHandlerBase, JT808MsgIdDefaultUdpHandler>(); | |||
serviceDescriptors.TryAddScoped<JT808UdpDecoder>(); | |||
serviceDescriptors.TryAddScoped<JT808UdpServerHandler>(); | |||
serviceDescriptors.AddHostedService<JT808UdpServerHost>(); | |||
return serviceDescriptors; | |||
} | |||
} | |||
} |
@@ -0,0 +1,76 @@ | |||
using DotNetty.Transport.Bootstrapping; | |||
using DotNetty.Transport.Channels; | |||
using DotNetty.Transport.Channels.Sockets; | |||
using JT808.DotNetty.Codecs; | |||
using JT808.DotNetty.Core.Configurations; | |||
using JT808.DotNetty.Udp.Handlers; | |||
using Microsoft.Extensions.DependencyInjection; | |||
using Microsoft.Extensions.Hosting; | |||
using Microsoft.Extensions.Logging; | |||
using Microsoft.Extensions.Options; | |||
using System; | |||
using System.Net; | |||
using System.Runtime.InteropServices; | |||
using System.Threading; | |||
using System.Threading.Tasks; | |||
namespace JT808.DotNetty.Udp | |||
{ | |||
/// <summary> | |||
/// JT808 Udp网关服务 | |||
/// </summary> | |||
internal class JT808UdpServerHost : IHostedService | |||
{ | |||
private readonly IServiceProvider serviceProvider; | |||
private readonly JT808Configuration configuration; | |||
private readonly ILogger<JT808UdpServerHost> logger; | |||
private MultithreadEventLoopGroup group; | |||
private IChannel bootstrapChannel; | |||
public JT808UdpServerHost( | |||
IServiceProvider provider, | |||
ILoggerFactory loggerFactory, | |||
IOptions<JT808Configuration> jT808ConfigurationAccessor) | |||
{ | |||
serviceProvider = provider; | |||
configuration = jT808ConfigurationAccessor.Value; | |||
logger=loggerFactory.CreateLogger<JT808UdpServerHost>(); | |||
} | |||
public Task StartAsync(CancellationToken cancellationToken) | |||
{ | |||
group = new MultithreadEventLoopGroup(); | |||
Bootstrap bootstrap = new Bootstrap(); | |||
bootstrap.Group(group); | |||
bootstrap.Channel<SocketDatagramChannel>(); | |||
if (RuntimeInformation.IsOSPlatform(OSPlatform.Linux) | |||
|| RuntimeInformation.IsOSPlatform(OSPlatform.OSX)) | |||
{ | |||
bootstrap | |||
.Option(ChannelOption.SoReuseport, true); | |||
} | |||
bootstrap | |||
.Option(ChannelOption.SoBacklog, configuration.SoBacklog) | |||
.Handler(new ActionChannelInitializer<IChannel>(channel => | |||
{ | |||
IChannelPipeline pipeline = channel.Pipeline; | |||
using (var scope = serviceProvider.CreateScope()) | |||
{ | |||
pipeline.AddLast(new JT808UdpDecoder()); | |||
pipeline.AddLast("jt808UdpService", scope.ServiceProvider.GetRequiredService<JT808UdpServerHandler>()); | |||
} | |||
})); | |||
logger.LogInformation($"JT808 Udp Server start at {IPAddress.Any}:{configuration.UdpPort}."); | |||
return bootstrap.BindAsync(configuration.UdpPort) | |||
.ContinueWith(i => bootstrapChannel = i.Result); | |||
} | |||
public async Task StopAsync(CancellationToken cancellationToken) | |||
{ | |||
await bootstrapChannel.CloseAsync(); | |||
var quietPeriod = configuration.QuietPeriodTimeSpan; | |||
var shutdownTimeout = configuration.ShutdownTimeoutTimeSpan; | |||
await group.ShutdownGracefullyAsync(quietPeriod, shutdownTimeout); | |||
} | |||
} | |||
} |
@@ -0,0 +1,27 @@ | |||
using JT808.DotNetty.Core.Handlers; | |||
namespace JT808.DotNetty.WebApi.Handlers | |||
{ | |||
/// <summary> | |||
/// 默认消息处理业务实现 | |||
/// </summary> | |||
internal class JT808MsgIdDefaultWebApiHandler : JT808MsgIdHttpHandlerBase | |||
{ | |||
private const string sessionRoutePrefix = "Session"; | |||
private const string sourcePackagePrefix = "SourcePackage"; | |||
private const string transmitPrefix = "Transmit"; | |||
//1.TCP一套注入 | |||
//2.UDP一套注入 | |||
//3.统一的一套注入 | |||
public JT808MsgIdDefaultWebApiHandler( | |||
) | |||
{ | |||
} | |||
} | |||
} |
@@ -0,0 +1,82 @@ | |||
using DotNetty.Buffers; | |||
using DotNetty.Codecs.Http; | |||
using DotNetty.Common.Utilities; | |||
using DotNetty.Transport.Channels; | |||
using JT808.DotNetty.Core.Handlers; | |||
using JT808.DotNetty.Core.Metadata; | |||
using Microsoft.Extensions.Logging; | |||
using System; | |||
using System.Text; | |||
namespace JT808.DotNetty.WebApi.Handlers | |||
{ | |||
/// <summary> | |||
/// jt808 webapi服务 | |||
/// 请求量不大,只支持JSON格式并且只支持post发数据 | |||
/// ref: dotnetty HttpServer | |||
/// </summary> | |||
internal class JT808WebAPIServerHandler : SimpleChannelInboundHandler<IFullHttpRequest> | |||
{ | |||
private static readonly AsciiString TypeJson = AsciiString.Cached("application/json"); | |||
private static readonly AsciiString ServerName = AsciiString.Cached("JT808WebAPINetty"); | |||
private static readonly AsciiString ContentTypeEntity = HttpHeaderNames.ContentType; | |||
private static readonly AsciiString DateEntity = HttpHeaderNames.Date; | |||
private static readonly AsciiString ContentLengthEntity = HttpHeaderNames.ContentLength; | |||
private static readonly AsciiString ServerEntity = HttpHeaderNames.Server; | |||
private readonly JT808MsgIdHttpHandlerBase jT808MsgIdHttpHandlerBase; | |||
private readonly ILogger<JT808WebAPIServerHandler> logger; | |||
public JT808WebAPIServerHandler( | |||
JT808MsgIdHttpHandlerBase jT808MsgIdHttpHandlerBase, | |||
ILoggerFactory loggerFactory) | |||
{ | |||
this.jT808MsgIdHttpHandlerBase = jT808MsgIdHttpHandlerBase; | |||
logger = loggerFactory.CreateLogger<JT808WebAPIServerHandler>(); | |||
} | |||
protected override void ChannelRead0(IChannelHandlerContext ctx, IFullHttpRequest msg) | |||
{ | |||
if (logger.IsEnabled(LogLevel.Debug)) | |||
{ | |||
logger.LogDebug($"Uri:{msg.Uri}"); | |||
logger.LogDebug($"Content:{msg.Content.ToString(Encoding.UTF8)}"); | |||
} | |||
JT808HttpResponse jT808HttpResponse = null; | |||
if (jT808MsgIdHttpHandlerBase.HandlerDict.TryGetValue(msg.Uri,out var funcHandler)) | |||
{ | |||
jT808HttpResponse = funcHandler( new JT808HttpRequest() { Json = msg.Content.ToString(Encoding.UTF8)}); | |||
} | |||
else | |||
{ | |||
jT808HttpResponse = jT808MsgIdHttpHandlerBase.NotFoundHttpResponse(); | |||
} | |||
if (jT808HttpResponse != null) | |||
{ | |||
WriteResponse(ctx, Unpooled.WrappedBuffer(jT808HttpResponse.Data), TypeJson, jT808HttpResponse.Data.Length); | |||
} | |||
} | |||
private void WriteResponse(IChannelHandlerContext ctx, IByteBuffer buf, ICharSequence contentType, int contentLength) | |||
{ | |||
// Build the response object. | |||
var response = new DefaultFullHttpResponse(HttpVersion.Http11, HttpResponseStatus.OK, buf, false); | |||
HttpHeaders headers = response.Headers; | |||
headers.Set(ContentTypeEntity, contentType); | |||
headers.Set(ServerEntity, ServerName); | |||
headers.Set(DateEntity, DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff")); | |||
headers.Set(ContentLengthEntity, contentLength); | |||
// Close the non-keep-alive connection after the write operation is done. | |||
ctx.WriteAndFlushAsync(response); | |||
ctx.CloseAsync(); | |||
} | |||
public override void ExceptionCaught(IChannelHandlerContext context, Exception exception) | |||
{ | |||
WriteResponse(context, Unpooled.WrappedBuffer(jT808MsgIdHttpHandlerBase.ErrorHttpResponse(exception).Data), TypeJson, jT808MsgIdHttpHandlerBase.ErrorHttpResponse(exception).Data.Length); | |||
logger.LogError(exception, exception.Message); | |||
context.CloseAsync(); | |||
} | |||
public override void ChannelReadComplete(IChannelHandlerContext context) => context.Flush(); | |||
} | |||
} |
@@ -0,0 +1,18 @@ | |||
<Project Sdk="Microsoft.NET.Sdk"> | |||
<PropertyGroup> | |||
<TargetFramework>netstandard2.0</TargetFramework> | |||
</PropertyGroup> | |||
<ItemGroup> | |||
<PackageReference Include="DotNetty.Buffers" Version="0.6.0" /> | |||
<PackageReference Include="DotNetty.Codecs" Version="0.6.0" /> | |||
<PackageReference Include="DotNetty.Codecs.Http" Version="0.6.0" /> | |||
<PackageReference Include="DotNetty.Handlers" Version="0.6.0" /> | |||
<PackageReference Include="DotNetty.Transport.Libuv" Version="0.6.0" /> | |||
<PackageReference Include="Microsoft.Extensions.Hosting.Abstractions" Version="2.2.0" /> | |||
<PackageReference Include="Microsoft.Extensions.Options" Version="2.2.0" /> | |||
</ItemGroup> | |||
<ItemGroup> | |||
<ProjectReference Include="..\JT808.DotNetty.Core\JT808.DotNetty.Core.csproj" /> | |||
</ItemGroup> | |||
</Project> |
@@ -0,0 +1,82 @@ | |||
using DotNetty.Codecs.Http; | |||
using DotNetty.Transport.Bootstrapping; | |||
using DotNetty.Transport.Channels; | |||
using DotNetty.Transport.Libuv; | |||
using JT808.DotNetty.Core.Configurations; | |||
using JT808.DotNetty.WebApi.Handlers; | |||
using Microsoft.Extensions.DependencyInjection; | |||
using Microsoft.Extensions.Hosting; | |||
using Microsoft.Extensions.Logging; | |||
using Microsoft.Extensions.Options; | |||
using System; | |||
using System.Net; | |||
using System.Runtime.InteropServices; | |||
using System.Threading; | |||
using System.Threading.Tasks; | |||
namespace JT808.DotNetty.WebApi | |||
{ | |||
/// <summary> | |||
/// JT808 集成一个webapi服务 | |||
/// </summary> | |||
internal class JT808WebAPIServerHost : IHostedService | |||
{ | |||
private readonly IServiceProvider serviceProvider; | |||
private readonly JT808Configuration configuration; | |||
private readonly ILogger<JT808WebAPIServerHost> logger; | |||
private DispatcherEventLoopGroup bossGroup; | |||
private WorkerEventLoopGroup workerGroup; | |||
private IChannel bootstrapChannel; | |||
public JT808WebAPIServerHost( | |||
IServiceProvider provider, | |||
ILoggerFactory loggerFactory, | |||
IOptions<JT808Configuration> jT808ConfigurationAccessor) | |||
{ | |||
serviceProvider = provider; | |||
configuration = jT808ConfigurationAccessor.Value; | |||
logger = loggerFactory.CreateLogger<JT808WebAPIServerHost>(); | |||
} | |||
public Task StartAsync(CancellationToken cancellationToken) | |||
{ | |||
bossGroup = new DispatcherEventLoopGroup(); | |||
workerGroup = new WorkerEventLoopGroup(bossGroup, 1); | |||
ServerBootstrap bootstrap = new ServerBootstrap(); | |||
bootstrap.Group(bossGroup, workerGroup); | |||
bootstrap.Channel<TcpServerChannel>(); | |||
if (RuntimeInformation.IsOSPlatform(OSPlatform.Linux) | |||
|| RuntimeInformation.IsOSPlatform(OSPlatform.OSX)) | |||
{ | |||
bootstrap | |||
.Option(ChannelOption.SoReuseport, true) | |||
.ChildOption(ChannelOption.SoReuseaddr, true); | |||
} | |||
bootstrap | |||
.Option(ChannelOption.SoBacklog, 8192) | |||
.ChildHandler(new ActionChannelInitializer<IChannel>(channel => | |||
{ | |||
IChannelPipeline pipeline = channel.Pipeline; | |||
using (var scope = serviceProvider.CreateScope()) | |||
{ | |||
pipeline.AddLast("http_encoder", new HttpResponseEncoder()); | |||
pipeline.AddLast("http_decoder", new HttpRequestDecoder(4096, 8192, 8192, false)); | |||
//将多个消息转换为单一的request或者response对象 =>IFullHttpRequest | |||
pipeline.AddLast("http_aggregator", new HttpObjectAggregator(65536)); | |||
pipeline.AddLast("http_jt808webapihandler", scope.ServiceProvider.GetRequiredService<JT808WebAPIServerHandler>()); | |||
} | |||
})); | |||
logger.LogInformation($"JT808 WebAPI Server start at {IPAddress.Any}:{configuration.WebApiPort}."); | |||
return bootstrap.BindAsync(configuration.WebApiPort).ContinueWith(i => bootstrapChannel = i.Result); | |||
} | |||
public async Task StopAsync(CancellationToken cancellationToken) | |||
{ | |||
await bootstrapChannel.CloseAsync(); | |||
var quietPeriod = configuration.QuietPeriodTimeSpan; | |||
var shutdownTimeout = configuration.ShutdownTimeoutTimeSpan; | |||
await workerGroup.ShutdownGracefullyAsync(quietPeriod, shutdownTimeout); | |||
await bossGroup.ShutdownGracefullyAsync(quietPeriod, shutdownTimeout); | |||
} | |||
} | |||
} |
@@ -0,0 +1,21 @@ | |||
using JT808.DotNetty.Core.Handlers; | |||
using JT808.DotNetty.WebApi.Handlers; | |||
using Microsoft.Extensions.DependencyInjection; | |||
using Microsoft.Extensions.DependencyInjection.Extensions; | |||
using System.Runtime.CompilerServices; | |||
[assembly: InternalsVisibleTo("JT808.DotNetty.Test")] | |||
namespace JT808.DotNetty.WebApi | |||
{ | |||
public static class JT808WebApiDotnettyExtensions | |||
{ | |||
public static IServiceCollection AddJT808WebApiHost(this IServiceCollection serviceDescriptors) | |||
{ | |||
serviceDescriptors.TryAddSingleton<JT808MsgIdHttpHandlerBase, JT808MsgIdDefaultWebApiHandler>(); | |||
serviceDescriptors.TryAddScoped<JT808WebAPIServerHandler>(); | |||
serviceDescriptors.AddHostedService<JT808WebAPIServerHost>(); | |||
return serviceDescriptors; | |||
} | |||
} | |||
} |
@@ -5,14 +5,26 @@ VisualStudioVersion = 15.0.28307.168 | |||
MinimumVisualStudioVersion = 10.0.40219.1 | |||
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JT808.DotNetty", "JT808.DotNetty\JT808.DotNetty.csproj", "{80C7F67E-6B7C-4178-8726-ADD3695622DD}" | |||
EndProject | |||
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JT808.DotNetty.Hosting", "JT808.DotNetty.Hosting\JT808.DotNetty.Hosting.csproj", "{46772BD5-4132-48A7-856B-11D658F7ADDB}" | |||
EndProject | |||
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JT808.DotNetty.Test", "JT808.DotNetty.Test\JT808.DotNetty.Test.csproj", "{7315D030-16CA-4AC8-B892-720F3D78C651}" | |||
EndProject | |||
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "src", "src", "{B5A80356-5AF6-449F-9D8B-3C1BBB9D2443}" | |||
EndProject | |||
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JT808.Protocol", "JT808.Protocol\src\JT808.Protocol\JT808.Protocol.csproj", "{9FCA2EE9-8253-41AA-A64C-9883413864F9}" | |||
EndProject | |||
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JT808.DotNetty.Udp", "JT808.DotNetty.Udp\JT808.DotNetty.Udp.csproj", "{C960084C-2CF4-4748-AD35-D2384285D6A3}" | |||
EndProject | |||
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JT808.DotNetty.Codecs", "JT808.DotNetty.Codecs\JT808.DotNetty.Codecs.csproj", "{42513FBA-1D8F-4F91-A74F-25E06C7BD027}" | |||
EndProject | |||
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JT808.DotNetty.Core", "JT808.DotNetty.Core\JT808.DotNetty.Core.csproj", "{67C5DC72-0004-48B3-BB5A-9CB7069B4F02}" | |||
EndProject | |||
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "JT808.DotNetty.Abstractions", "JT808.DotNetty.Abstractions\JT808.DotNetty.Abstractions.csproj", "{4DCF33C0-67C5-4179-AF1E-4E919F9F856D}" | |||
EndProject | |||
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Tests", "Tests", "{3BD7FF02-8516-4A77-A385-9FDCDD792E22}" | |||
EndProject | |||
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "JT808.DotNetty.Tcp", "JT808.DotNetty.Tcp\JT808.DotNetty.Tcp.csproj", "{330CD783-5564-4083-ABFC-573CDC369F50}" | |||
EndProject | |||
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "JT808.DotNetty.WebApi", "JT808.DotNetty.WebApi\JT808.DotNetty.WebApi.csproj", "{B783DE53-CE2A-4225-921F-04E5E57B28F3}" | |||
EndProject | |||
Global | |||
GlobalSection(SolutionConfigurationPlatforms) = preSolution | |||
Debug|Any CPU = Debug|Any CPU | |||
@@ -23,10 +35,6 @@ Global | |||
{80C7F67E-6B7C-4178-8726-ADD3695622DD}.Debug|Any CPU.Build.0 = Debug|Any CPU | |||
{80C7F67E-6B7C-4178-8726-ADD3695622DD}.Release|Any CPU.ActiveCfg = Release|Any CPU | |||
{80C7F67E-6B7C-4178-8726-ADD3695622DD}.Release|Any CPU.Build.0 = Release|Any CPU | |||
{46772BD5-4132-48A7-856B-11D658F7ADDB}.Debug|Any CPU.ActiveCfg = Debug|Any CPU | |||
{46772BD5-4132-48A7-856B-11D658F7ADDB}.Debug|Any CPU.Build.0 = Debug|Any CPU | |||
{46772BD5-4132-48A7-856B-11D658F7ADDB}.Release|Any CPU.ActiveCfg = Release|Any CPU | |||
{46772BD5-4132-48A7-856B-11D658F7ADDB}.Release|Any CPU.Build.0 = Release|Any CPU | |||
{7315D030-16CA-4AC8-B892-720F3D78C651}.Debug|Any CPU.ActiveCfg = Debug|Any CPU | |||
{7315D030-16CA-4AC8-B892-720F3D78C651}.Debug|Any CPU.Build.0 = Debug|Any CPU | |||
{7315D030-16CA-4AC8-B892-720F3D78C651}.Release|Any CPU.ActiveCfg = Release|Any CPU | |||
@@ -35,11 +43,37 @@ Global | |||
{9FCA2EE9-8253-41AA-A64C-9883413864F9}.Debug|Any CPU.Build.0 = Debug|Any CPU | |||
{9FCA2EE9-8253-41AA-A64C-9883413864F9}.Release|Any CPU.ActiveCfg = Release|Any CPU | |||
{9FCA2EE9-8253-41AA-A64C-9883413864F9}.Release|Any CPU.Build.0 = Release|Any CPU | |||
{C960084C-2CF4-4748-AD35-D2384285D6A3}.Debug|Any CPU.ActiveCfg = Debug|Any CPU | |||
{C960084C-2CF4-4748-AD35-D2384285D6A3}.Debug|Any CPU.Build.0 = Debug|Any CPU | |||
{C960084C-2CF4-4748-AD35-D2384285D6A3}.Release|Any CPU.ActiveCfg = Release|Any CPU | |||
{C960084C-2CF4-4748-AD35-D2384285D6A3}.Release|Any CPU.Build.0 = Release|Any CPU | |||
{42513FBA-1D8F-4F91-A74F-25E06C7BD027}.Debug|Any CPU.ActiveCfg = Debug|Any CPU | |||
{42513FBA-1D8F-4F91-A74F-25E06C7BD027}.Debug|Any CPU.Build.0 = Debug|Any CPU | |||
{42513FBA-1D8F-4F91-A74F-25E06C7BD027}.Release|Any CPU.ActiveCfg = Release|Any CPU | |||
{42513FBA-1D8F-4F91-A74F-25E06C7BD027}.Release|Any CPU.Build.0 = Release|Any CPU | |||
{67C5DC72-0004-48B3-BB5A-9CB7069B4F02}.Debug|Any CPU.ActiveCfg = Debug|Any CPU | |||
{67C5DC72-0004-48B3-BB5A-9CB7069B4F02}.Debug|Any CPU.Build.0 = Debug|Any CPU | |||
{67C5DC72-0004-48B3-BB5A-9CB7069B4F02}.Release|Any CPU.ActiveCfg = Release|Any CPU | |||
{67C5DC72-0004-48B3-BB5A-9CB7069B4F02}.Release|Any CPU.Build.0 = Release|Any CPU | |||
{4DCF33C0-67C5-4179-AF1E-4E919F9F856D}.Debug|Any CPU.ActiveCfg = Debug|Any CPU | |||
{4DCF33C0-67C5-4179-AF1E-4E919F9F856D}.Debug|Any CPU.Build.0 = Debug|Any CPU | |||
{4DCF33C0-67C5-4179-AF1E-4E919F9F856D}.Release|Any CPU.ActiveCfg = Release|Any CPU | |||
{4DCF33C0-67C5-4179-AF1E-4E919F9F856D}.Release|Any CPU.Build.0 = Release|Any CPU | |||
{330CD783-5564-4083-ABFC-573CDC369F50}.Debug|Any CPU.ActiveCfg = Debug|Any CPU | |||
{330CD783-5564-4083-ABFC-573CDC369F50}.Debug|Any CPU.Build.0 = Debug|Any CPU | |||
{330CD783-5564-4083-ABFC-573CDC369F50}.Release|Any CPU.ActiveCfg = Release|Any CPU | |||
{330CD783-5564-4083-ABFC-573CDC369F50}.Release|Any CPU.Build.0 = Release|Any CPU | |||
{B783DE53-CE2A-4225-921F-04E5E57B28F3}.Debug|Any CPU.ActiveCfg = Debug|Any CPU | |||
{B783DE53-CE2A-4225-921F-04E5E57B28F3}.Debug|Any CPU.Build.0 = Debug|Any CPU | |||
{B783DE53-CE2A-4225-921F-04E5E57B28F3}.Release|Any CPU.ActiveCfg = Release|Any CPU | |||
{B783DE53-CE2A-4225-921F-04E5E57B28F3}.Release|Any CPU.Build.0 = Release|Any CPU | |||
EndGlobalSection | |||
GlobalSection(SolutionProperties) = preSolution | |||
HideSolutionNode = FALSE | |||
EndGlobalSection | |||
GlobalSection(NestedProjects) = preSolution | |||
{80C7F67E-6B7C-4178-8726-ADD3695622DD} = {B5A80356-5AF6-449F-9D8B-3C1BBB9D2443} | |||
{7315D030-16CA-4AC8-B892-720F3D78C651} = {3BD7FF02-8516-4A77-A385-9FDCDD792E22} | |||
{9FCA2EE9-8253-41AA-A64C-9883413864F9} = {B5A80356-5AF6-449F-9D8B-3C1BBB9D2443} | |||
EndGlobalSection | |||
GlobalSection(ExtensibilityGlobals) = postSolution | |||
@@ -0,0 +1,31 @@ | |||
using DotNetty.Buffers; | |||
using DotNetty.Codecs; | |||
using DotNetty.Transport.Channels; | |||
using Microsoft.Extensions.Logging; | |||
using System; | |||
using System.Collections.Generic; | |||
using System.Text; | |||
using JT808.Protocol; | |||
using JT808.DotNetty.Internal; | |||
using JT808.DotNetty.Interfaces; | |||
using DotNetty.Transport.Channels.Sockets; | |||
using JT808.DotNetty.Metadata; | |||
namespace JT808.DotNetty.Codecs | |||
{ | |||
/// <summary> | |||
/// JT808 UDP解码 | |||
/// </summary> | |||
internal class JT808UDPDecoder : MessageToMessageDecoder<DatagramPacket> | |||
{ | |||
protected override void Decode(IChannelHandlerContext context, DatagramPacket message, List<object> output) | |||
{ | |||
IByteBuffer byteBuffer = message.Content; | |||
byte[] buffer = new byte[byteBuffer.ReadableBytes]; | |||
byteBuffer.ReadBytes(buffer); | |||
output.Add(new JT808UDPPackage(buffer, message.Sender)); | |||
} | |||
} | |||
} |
@@ -8,6 +8,8 @@ namespace JT808.DotNetty.Configurations | |||
{ | |||
public int Port { get; set; } = 808; | |||
public int UDPPort { get; set; } = 809; | |||
public int QuietPeriodSeconds { get; set; } = 1; | |||
public TimeSpan QuietPeriodTimeSpan => TimeSpan.FromSeconds(QuietPeriodSeconds); | |||
@@ -0,0 +1,88 @@ | |||
using DotNetty.Buffers; | |||
using DotNetty.Transport.Channels; | |||
using JT808.Protocol; | |||
using System; | |||
using System.Collections.Generic; | |||
using System.Text; | |||
using JT808.DotNetty.Metadata; | |||
using JT808.DotNetty.Internal; | |||
using JT808.DotNetty.Interfaces; | |||
using Microsoft.Extensions.Logging; | |||
using DotNetty.Transport.Channels.Sockets; | |||
namespace JT808.DotNetty.Handlers | |||
{ | |||
/// <summary> | |||
/// JT808 UDP服务端处理程序 | |||
/// </summary> | |||
internal class JT808UDPServerHandler : SimpleChannelInboundHandler<JT808UDPPackage> | |||
{ | |||
private readonly JT808MsgIdHandlerBase handler; | |||
private readonly JT808SessionManager jT808SessionManager; | |||
private readonly IJT808SourcePackageDispatcher jT808SourcePackageDispatcher; | |||
private readonly JT808AtomicCounterService jT808AtomicCounterService; | |||
private readonly ILogger<JT808UDPServerHandler> logger; | |||
public JT808UDPServerHandler( | |||
ILoggerFactory loggerFactory, | |||
IJT808SourcePackageDispatcher jT808SourcePackageDispatcher, | |||
JT808MsgIdHandlerBase handler, | |||
JT808AtomicCounterService jT808AtomicCounterService, | |||
JT808SessionManager jT808SessionManager) | |||
{ | |||
this.handler = handler; | |||
this.jT808SessionManager = jT808SessionManager; | |||
this.jT808SourcePackageDispatcher = jT808SourcePackageDispatcher; | |||
this.jT808AtomicCounterService = jT808AtomicCounterService; | |||
logger = loggerFactory.CreateLogger<JT808UDPServerHandler>(); | |||
} | |||
protected override void ChannelRead0(IChannelHandlerContext ctx, JT808UDPPackage msg) | |||
{ | |||
try | |||
{ | |||
jT808SourcePackageDispatcher?.SendAsync(msg.Buffer); | |||
//解析到头部,然后根据具体的消息Id通过队列去进行消费 | |||
//要是一定要解析到数据体可以在JT808MsgIdHandlerBase类中根据具体的消息, | |||
//解析具体的消息体,具体调用JT808Serializer.Deserialize<T> | |||
JT808HeaderPackage jT808HeaderPackage = JT808Serializer.Deserialize<JT808HeaderPackage>(msg.Buffer); | |||
jT808AtomicCounterService.MsgSuccessIncrement(); | |||
if (logger.IsEnabled(LogLevel.Debug)) | |||
{ | |||
logger.LogDebug("accept package success count<<<" + jT808AtomicCounterService.MsgSuccessCount.ToString()); | |||
} | |||
Func<JT808Request, JT808Response> handlerFunc; | |||
if (handler.HandlerDict.TryGetValue(jT808HeaderPackage.Header.MsgId, out handlerFunc)) | |||
{ | |||
JT808Response jT808Response = handlerFunc(new JT808Request(jT808HeaderPackage, msg.Buffer)); | |||
if (jT808Response != null) | |||
{ | |||
ctx.WriteAndFlushAsync(new DatagramPacket(Unpooled.WrappedBuffer(JT808Serializer.Serialize(jT808Response.Package, jT808Response.MinBufferSize)), msg.Sender)); | |||
} | |||
} | |||
} | |||
catch (JT808.Protocol.Exceptions.JT808Exception ex) | |||
{ | |||
jT808AtomicCounterService.MsgFailIncrement(); | |||
if (logger.IsEnabled(LogLevel.Error)) | |||
{ | |||
logger.LogError("accept package fail count<<<" + jT808AtomicCounterService.MsgFailCount.ToString()); | |||
logger.LogError(ex, "accept msg<<<" + ByteBufferUtil.HexDump(msg.Buffer)); | |||
} | |||
} | |||
catch (Exception ex) | |||
{ | |||
jT808AtomicCounterService.MsgFailIncrement(); | |||
if (logger.IsEnabled(LogLevel.Error)) | |||
{ | |||
logger.LogError("accept package fail count<<<" + jT808AtomicCounterService.MsgFailCount.ToString()); | |||
logger.LogError(ex, "accept msg<<<" + ByteBufferUtil.HexDump(msg.Buffer)); | |||
} | |||
} | |||
} | |||
} | |||
} |
@@ -1,12 +0,0 @@ | |||
using System; | |||
using System.Collections.Generic; | |||
using System.Text; | |||
using System.Threading.Tasks; | |||
namespace JT808.DotNetty.Interfaces | |||
{ | |||
public interface IJT808Publishing | |||
{ | |||
Task PublishAsync(string topicName,string key,string value); | |||
} | |||
} |
@@ -5,8 +5,8 @@ using System.Threading.Tasks; | |||
namespace JT808.DotNetty.Interfaces | |||
{ | |||
public interface IJT808SessionPublishing : IJT808Publishing | |||
public interface IJT808SessionPublishing | |||
{ | |||
Task PublishAsync(string topicName, string key, string value); | |||
} | |||
} |
@@ -6,7 +6,6 @@ using System.Collections.Concurrent; | |||
using System.Collections.Generic; | |||
using System.Linq; | |||
using System.Net; | |||
using System.Text; | |||
namespace JT808.DotNetty.Internal | |||
{ | |||
@@ -45,6 +45,7 @@ namespace JT808.DotNetty | |||
services.TryAddScoped<JT808ConnectionHandler>(); | |||
services.TryAddScoped<JT808Decoder>(); | |||
services.TryAddScoped<JT808ServerHandler>(); | |||
services.TryAddScoped<JT808UDPServerHandler>(); | |||
services.TryAddSingleton<IJT808SessionService, JT808SessionServiceDefaultImpl>(); | |||
services.TryAddSingleton<IJT808UnificationSendService, JT808UnificationSendServiceDefaultImpl>(); | |||
services.TryAddSingleton<JT808WebAPIService>(); | |||
@@ -0,0 +1,82 @@ | |||
using DotNetty.Buffers; | |||
using DotNetty.Codecs; | |||
using DotNetty.Handlers.Timeout; | |||
using DotNetty.Transport.Bootstrapping; | |||
using DotNetty.Transport.Channels; | |||
using DotNetty.Transport.Channels.Sockets; | |||
using DotNetty.Transport.Libuv; | |||
using JT808.DotNetty.Codecs; | |||
using JT808.DotNetty.Configurations; | |||
using JT808.DotNetty.Handlers; | |||
using Microsoft.Extensions.DependencyInjection; | |||
using Microsoft.Extensions.Hosting; | |||
using Microsoft.Extensions.Logging; | |||
using Microsoft.Extensions.Options; | |||
using System; | |||
using System.Collections.Generic; | |||
using System.Net; | |||
using System.Runtime.InteropServices; | |||
using System.Text; | |||
using System.Threading; | |||
using System.Threading.Tasks; | |||
namespace JT808.DotNetty | |||
{ | |||
/// <summary> | |||
/// JT808 Udp网关服务 | |||
/// </summary> | |||
internal class JT808UdpServerHost : IHostedService | |||
{ | |||
private readonly IServiceProvider serviceProvider; | |||
private readonly JT808Configuration configuration; | |||
private readonly ILogger<JT808UdpServerHost> logger; | |||
private MultithreadEventLoopGroup group; | |||
private IChannel bootstrapChannel; | |||
public JT808UdpServerHost( | |||
IServiceProvider provider, | |||
ILoggerFactory loggerFactory, | |||
IOptions<JT808Configuration> jT808ConfigurationAccessor) | |||
{ | |||
serviceProvider = provider; | |||
configuration = jT808ConfigurationAccessor.Value; | |||
logger=loggerFactory.CreateLogger<JT808UdpServerHost>(); | |||
} | |||
public Task StartAsync(CancellationToken cancellationToken) | |||
{ | |||
group = new MultithreadEventLoopGroup(); | |||
Bootstrap bootstrap = new Bootstrap(); | |||
bootstrap.Group(group); | |||
bootstrap.Channel<SocketDatagramChannel>(); | |||
if (RuntimeInformation.IsOSPlatform(OSPlatform.Linux) | |||
|| RuntimeInformation.IsOSPlatform(OSPlatform.OSX)) | |||
{ | |||
bootstrap | |||
.Option(ChannelOption.SoReuseport, true); | |||
} | |||
bootstrap | |||
.Option(ChannelOption.SoBacklog, configuration.SoBacklog) | |||
.Handler(new ActionChannelInitializer<IChannel>(channel => | |||
{ | |||
IChannelPipeline pipeline = channel.Pipeline; | |||
using (var scope = serviceProvider.CreateScope()) | |||
{ | |||
pipeline.AddLast(new JT808UDPDecoder()); | |||
pipeline.AddLast("jt808UDPService", scope.ServiceProvider.GetRequiredService<JT808UDPServerHandler>()); | |||
} | |||
})); | |||
logger.LogInformation($"Udp Server start at {IPAddress.Any}:{configuration.UDPPort}."); | |||
return bootstrap.BindAsync(configuration.UDPPort) | |||
.ContinueWith(i => bootstrapChannel = i.Result); | |||
} | |||
public async Task StopAsync(CancellationToken cancellationToken) | |||
{ | |||
await bootstrapChannel.CloseAsync(); | |||
var quietPeriod = configuration.QuietPeriodTimeSpan; | |||
var shutdownTimeout = configuration.ShutdownTimeoutTimeSpan; | |||
await group.ShutdownGracefullyAsync(quietPeriod, shutdownTimeout); | |||
} | |||
} | |||
} |
@@ -0,0 +1,20 @@ | |||
using System; | |||
using System.Collections.Generic; | |||
using System.Net; | |||
using System.Text; | |||
namespace JT808.DotNetty.Metadata | |||
{ | |||
internal class JT808UDPPackage | |||
{ | |||
public JT808UDPPackage(byte[] buffer, EndPoint sender) | |||
{ | |||
Buffer = buffer; | |||
Sender = sender; | |||
} | |||
public byte[] Buffer { get; } | |||
public EndPoint Sender { get; } | |||
} | |||
} |