@@ -14,7 +14,7 @@ | |||||
[玩一玩压力测试](https://github.com/SmallChi/JT808Gateway/blob/master/doc/README.md) | [玩一玩压力测试](https://github.com/SmallChi/JT808Gateway/blob/master/doc/README.md) | ||||
[](https://github.com/SmallChi/JT808Gateway/blob/master/LICENSE)[]() | |||||
[](https://github.com/SmallChi/JT808Gateway/blob/master/LICENSE) | |||||
## 新网关的优势 | ## 新网关的优势 | ||||
@@ -59,14 +59,22 @@ | |||||
[GRPC消息业务处理协议](https://github.com/SmallChi/JT808Gateway/blob/master/src/JT808.Gateway.Abstractions/Protos/JT808Gateway.proto) | [GRPC消息业务处理协议](https://github.com/SmallChi/JT808Gateway/blob/master/src/JT808.Gateway.Abstractions/Protos/JT808Gateway.proto) | ||||
## 基于core 3.1 Pipeline的NuGet安装 | |||||
## 基于core 3.1 Pipeline | |||||
Pipeline分为两种方式使用,一种是使用队列的方式,一种是网关集成的方式。 | |||||
| 使用方式 | 特性 |备注| | |||||
|:---|:---|:---|:---| | |||||
|使用队列|网关不需要重启,相当于透传数据,设备上来的数据直接入队列,通过服务去处理消息。|设备多的可以这样搞,这样关注点在业务上面。| | |||||
|使用网关集成|网关需要根据消息业务的变化去处理,也就意味着更改业务,需要重启网关,但是上手简单。|设备少的,开发能力弱的,允许设备丢点数据的。| | |||||
### Pipeline的NuGet安装 | |||||
| Package Name | Version | Downloads | | | Package Name | Version | Downloads | | ||||
| --------------------- | -------------------------------------------------- | --------------------------------------------------- | | | --------------------- | -------------------------------------------------- | --------------------------------------------------- | | ||||
| Install-Package JT808.Gateway.Abstractions|  |  | | | Install-Package JT808.Gateway.Abstractions|  |  | | ||||
| Install-Package JT808.Gateway |  |  | | | Install-Package JT808.Gateway |  |  | | ||||
| Install-Package JT808.Gateway.Client|  |  | | | Install-Package JT808.Gateway.Client|  |  | | ||||
| Install-Package JT808.Gateway.InMemoryMQ|  |  | | |||||
| Install-Package JT808.Gateway.Kafka|  |  | | | Install-Package JT808.Gateway.Kafka|  |  | | ||||
| Install-Package JT808.Gateway.Transmit |  |  | | | Install-Package JT808.Gateway.Transmit |  |  | | ||||
| Install-Package JT808.Gateway.Traffic |  | | | | Install-Package JT808.Gateway.Traffic |  | | | ||||
@@ -75,7 +83,9 @@ | |||||
| Install-Package JT808.Gateway.MsgLogging |  | | | | Install-Package JT808.Gateway.MsgLogging |  | | | ||||
| Install-Package JT808.Gateway.MsgIdHandler |  | | | | Install-Package JT808.Gateway.MsgIdHandler |  | | | ||||
## 基于DotNetty的NuGet安装 | |||||
## 基于DotNetty | |||||
### DotNetty的NuGet安装 | |||||
| Package Name | Version | Downloads | | | Package Name | Version | Downloads | | ||||
| --------------------- | -------------------------------------------------- | --------------------------------------------------- | | | --------------------- | -------------------------------------------------- | --------------------------------------------------- | | ||||
@@ -6,14 +6,14 @@ | |||||
> 注意1:连接数和并发数要区分开; | > 注意1:连接数和并发数要区分开; | ||||
> 注意2:阿里云的机器默认有连接数限制(5000),可以先创建一台,把该装的软件安装好,tcp参数内核调优后,在备份一个系统镜像在玩; | > 注意2:阿里云的机器默认有连接数限制(5000),可以先创建一台,把该装的软件安装好,tcp参数内核调优后,在备份一个系统镜像在玩; | ||||
> 注意3: 使用的是内存队列(InMemoryMQ)进行测试。 | |||||
> 注意3: 使用的是网关集成的方式进行测试。 | |||||
``` 1 | ``` 1 | ||||
//使用PM2托管 | //使用PM2托管 | ||||
//服务端 | //服务端 | ||||
cd /data/JT808.Gateway | cd /data/JT808.Gateway | ||||
pm2 start "dotnet JT808.Gateway.TestHosting.dll ASPNETCORE_ENVIRONMENT=Production" --max-restarts=1 -n "JT808.Gateway.808" -o "/data/pm2Logs/JT808.Gateway/out.log" -e "/data/pm2Logs/JT808.Gateway/error.log" | |||||
pm2 start "dotnet JT808.Gateway.ServerBenchmark.dll ASPNETCORE_ENVIRONMENT=Production" --max-restarts=1 -n "JT808.Gateway.808" -o "/data/pm2Logs/JT808.Gateway/out.log" -e "/data/pm2Logs/JT808.Gateway/error.log" | |||||
//客户端 | //客户端 | ||||
cd /data/JT808Client | cd /data/JT808Client | ||||
@@ -51,8 +51,8 @@ pm2 start "dotnet JT808.Gateway.CleintBenchmark.dll ASPNETCORE_ENVIRONMENT=Produ | |||||
"JT808Configuration": { | "JT808Configuration": { | ||||
"TcpPort": 808, | "TcpPort": 808, | ||||
"UdpPort": 808, | "UdpPort": 808, | ||||
"MiniNumBufferSize": 80960, | |||||
"SoBacklog": 102400 | |||||
"MiniNumBufferSize": 102400, | |||||
"SoBacklog": 204800 | |||||
} | } | ||||
``` | ``` | ||||
@@ -89,8 +89,8 @@ pm2 start "dotnet JT808.Gateway.CleintBenchmark.dll ASPNETCORE_ENVIRONMENT=Produ | |||||
"JT808Configuration": { | "JT808Configuration": { | ||||
"TcpPort": 808, | "TcpPort": 808, | ||||
"UdpPort": 808, | "UdpPort": 808, | ||||
"MiniNumBufferSize": 80960, | |||||
"SoBacklog": 102400 | |||||
"MiniNumBufferSize": 102400, | |||||
"SoBacklog": 204800 | |||||
} | } | ||||
``` | ``` | ||||
@@ -127,8 +127,8 @@ pm2 start "dotnet JT808.Gateway.CleintBenchmark.dll ASPNETCORE_ENVIRONMENT=Produ | |||||
"JT808Configuration": { | "JT808Configuration": { | ||||
"TcpPort": 808, | "TcpPort": 808, | ||||
"UdpPort": 808, | "UdpPort": 808, | ||||
"MiniNumBufferSize": 80960, | |||||
"SoBacklog": 102400 | |||||
"MiniNumBufferSize": 102400, | |||||
"SoBacklog": 204800 | |||||
} | } | ||||
``` | ``` | ||||
@@ -144,6 +144,7 @@ pm2 start "dotnet JT808.Gateway.CleintBenchmark.dll ASPNETCORE_ENVIRONMENT=Produ | |||||
|:-------:|:-------:|:-------:| | |:-------:|:-------:|:-------:| | ||||
| centos7 | 8c16g | JT808服务端 | | | centos7 | 8c16g | JT808服务端 | | ||||
| centos7 | 8c16g | JT808客户端 | | | centos7 | 8c16g | JT808客户端 | | ||||
| centos7 | 8c16g | JT808客户端 | | |||||
> 计算网络增强型 sn1ne ecs.sn1ne.2xlarge 8 vCPU 16 GiB Intel Xeon E5-2682v4 / Intel Xeon(Skylake) Platinum 8163 2.5 GHz 2 Gbps 100 万 PPS | > 计算网络增强型 sn1ne ecs.sn1ne.2xlarge 8 vCPU 16 GiB Intel Xeon E5-2682v4 / Intel Xeon(Skylake) Platinum 8163 2.5 GHz 2 Gbps 100 万 PPS | ||||
@@ -158,12 +159,10 @@ pm2 start "dotnet JT808.Gateway.CleintBenchmark.dll ASPNETCORE_ENVIRONMENT=Produ | |||||
"Interval": 1000, | "Interval": 1000, | ||||
"DeviceTemplate": 100000 //需要多台机器同时访问,那么可以根据这个避开重复终端号 100000-200000-300000 | "DeviceTemplate": 100000 //需要多台机器同时访问,那么可以根据这个避开重复终端号 100000-200000-300000 | ||||
} | } | ||||
修改wwwroot下index.html的webapi接口地址 | |||||
127.0.0.1:15004/index.html | |||||
``` | ``` | ||||
``` 2 | ``` 2 | ||||
"urls": "http://*:15005;", | |||||
"urls": "http://*:15004;", | |||||
"ClientBenchmarkOptions": { | "ClientBenchmarkOptions": { | ||||
"IP": "", | "IP": "", | ||||
"Port": 808, | "Port": 808, | ||||
@@ -171,8 +170,6 @@ pm2 start "dotnet JT808.Gateway.CleintBenchmark.dll ASPNETCORE_ENVIRONMENT=Produ | |||||
"Interval": 1000, | "Interval": 1000, | ||||
"DeviceTemplate": 200000 //需要多台机器同时访问,那么可以根据这个避开重复终端号 100000-200000-300000 | "DeviceTemplate": 200000 //需要多台机器同时访问,那么可以根据这个避开重复终端号 100000-200000-300000 | ||||
} | } | ||||
修改wwwroot下index.html的webapi接口地址 | |||||
127.0.0.1:15005/index.html | |||||
``` | ``` | ||||
> 一个进程的线程是有限的,所以分两个进程进行测试 | > 一个进程的线程是有限的,所以分两个进程进行测试 | ||||
@@ -183,7 +180,7 @@ pm2 start "dotnet JT808.Gateway.CleintBenchmark.dll ASPNETCORE_ENVIRONMENT=Produ | |||||
"JT808Configuration": { | "JT808Configuration": { | ||||
"TcpPort": 808, | "TcpPort": 808, | ||||
"UdpPort": 808, | "UdpPort": 808, | ||||
"MiniNumBufferSize": 80960, | |||||
"MiniNumBufferSize": 102400, | |||||
"SoBacklog": 102400 | "SoBacklog": 102400 | ||||
} | } | ||||
``` | ``` | ||||
@@ -192,7 +189,7 @@ pm2 start "dotnet JT808.Gateway.CleintBenchmark.dll ASPNETCORE_ENVIRONMENT=Produ | |||||
 |  | ||||
> 由于资源被占满了,所以客户端的界面访问不到,但是不影响总体。 | |||||
 | |||||
### 60K | ### 60K | ||||
@@ -0,0 +1,18 @@ | |||||
using System; | |||||
using System.Collections.Generic; | |||||
using System.Text; | |||||
namespace JT808.Gateway.Enums | |||||
{ | |||||
public enum JT808UseType : byte | |||||
{ | |||||
/// <summary> | |||||
/// 使用正常方式 | |||||
/// </summary> | |||||
Normal = 1, | |||||
/// <summary> | |||||
/// 使用队列方式 | |||||
/// </summary> | |||||
Queue = 2 | |||||
} | |||||
} |
@@ -0,0 +1,23 @@ | |||||
using JT808.Gateway.Abstractions.Enums; | |||||
using System; | |||||
using System.Collections.Generic; | |||||
using System.Net.Sockets; | |||||
using System.Text; | |||||
namespace JT808.Gateway.Abstractions | |||||
{ | |||||
public static class JT808SessionExtensions | |||||
{ | |||||
public static async void SendAsync(this IJT808Session session,byte[] data) | |||||
{ | |||||
if (session.TransportProtocolType == JT808TransportProtocolType.tcp) | |||||
{ | |||||
await session.Client.SendAsync(data, SocketFlags.None); | |||||
} | |||||
else | |||||
{ | |||||
await session.Client.SendToAsync(data, SocketFlags.None, session.RemoteEndPoint); | |||||
} | |||||
} | |||||
} | |||||
} |
@@ -1,12 +0,0 @@ | |||||
using JT808.Gateway.Abstractions.Enums; | |||||
using System; | |||||
using System.Collections.Generic; | |||||
using System.Text; | |||||
namespace JT808.Gateway.Abstractions | |||||
{ | |||||
public interface IJT808MsgConsumerFactory | |||||
{ | |||||
IJT808MsgConsumer Create(JT808ConsumerType consumerType); | |||||
} | |||||
} |
@@ -1,13 +0,0 @@ | |||||
using JT808.Gateway.Abstractions.Enums; | |||||
using System; | |||||
using System.Collections.Generic; | |||||
using System.Text; | |||||
using System.Threading; | |||||
namespace JT808.Gateway.Abstractions | |||||
{ | |||||
public interface IJT808MsgReplyConsumerFactory | |||||
{ | |||||
IJT808MsgReplyConsumer Create(JT808ConsumerType consumerType); | |||||
} | |||||
} |
@@ -0,0 +1,13 @@ | |||||
using JT808.Protocol; | |||||
using Microsoft.Extensions.DependencyInjection; | |||||
using System; | |||||
using System.Collections.Generic; | |||||
using System.Text; | |||||
namespace JT808.Gateway.Abstractions | |||||
{ | |||||
public interface IJT808NormalGatewayBuilder: IJT808GatewayBuilder | |||||
{ | |||||
} | |||||
} |
@@ -0,0 +1,13 @@ | |||||
using JT808.Protocol; | |||||
using Microsoft.Extensions.DependencyInjection; | |||||
using System; | |||||
using System.Collections.Generic; | |||||
using System.Text; | |||||
namespace JT808.Gateway.Abstractions | |||||
{ | |||||
public interface IJT808QueueGatewayBuilder: IJT808GatewayBuilder | |||||
{ | |||||
} | |||||
} |
@@ -6,7 +6,7 @@ using System.Net.Sockets; | |||||
using System.Text; | using System.Text; | ||||
using System.Threading; | using System.Threading; | ||||
namespace JT808.Gateway.Interfaces | |||||
namespace JT808.Gateway.Abstractions | |||||
{ | { | ||||
public interface IJT808Session | public interface IJT808Session | ||||
{ | { |
@@ -29,7 +29,7 @@ | |||||
<PrivateAssets>all</PrivateAssets> | <PrivateAssets>all</PrivateAssets> | ||||
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets> | <IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets> | ||||
</PackageReference> | </PackageReference> | ||||
<PackageReference Include="JT808" Version="2.2.6" /> | |||||
<PackageReference Include="JT808" Version="2.2.7" /> | |||||
<PackageReference Include="Microsoft.Extensions.Hosting.Abstractions" Version="3.1.1" /> | <PackageReference Include="Microsoft.Extensions.Hosting.Abstractions" Version="3.1.1" /> | ||||
</ItemGroup> | </ItemGroup> | ||||
<ItemGroup> | <ItemGroup> | ||||
@@ -0,0 +1,179 @@ | |||||
using JT808.Protocol; | |||||
using JT808.Protocol.Enums; | |||||
using JT808.Protocol.Extensions; | |||||
using JT808.Protocol.MessageBody; | |||||
using System; | |||||
using System.Collections.Generic; | |||||
using System.Text; | |||||
namespace JT808.Gateway.Abstractions | |||||
{ | |||||
public class JT808NormalReplyMessageHandler | |||||
{ | |||||
protected Dictionary<ushort, MsgIdMethodDelegate> HandlerDict { get; } | |||||
protected delegate byte[] MsgIdMethodDelegate(JT808HeaderPackage package, IJT808Session session); | |||||
protected JT808Serializer JT808Serializer { get; } | |||||
public JT808NormalReplyMessageHandler( | |||||
IJT808Config jT808Config) | |||||
{ | |||||
this.JT808Serializer = jT808Config.GetSerializer(); | |||||
HandlerDict = new Dictionary<ushort, MsgIdMethodDelegate> { | |||||
{JT808MsgId.终端通用应答.ToUInt16Value(), Msg0x0001}, | |||||
{JT808MsgId.终端鉴权.ToUInt16Value(), Msg0x0102}, | |||||
{JT808MsgId.终端心跳.ToUInt16Value(), Msg0x0002}, | |||||
{JT808MsgId.终端注销.ToUInt16Value(), Msg0x0003}, | |||||
{JT808MsgId.终端注册.ToUInt16Value(), Msg0x0100}, | |||||
{JT808MsgId.位置信息汇报.ToUInt16Value(),Msg0x0200 }, | |||||
{JT808MsgId.定位数据批量上传.ToUInt16Value(),Msg0x0704 }, | |||||
{JT808MsgId.数据上行透传.ToUInt16Value(),Msg0x0900 } | |||||
}; | |||||
} | |||||
/// <summary> | |||||
/// | |||||
/// </summary> | |||||
/// <param name="request">请求数据</param> | |||||
/// <param name="session">当前会话</param> | |||||
/// <returns>应答消息数据</returns> | |||||
public virtual byte[] Processor(JT808HeaderPackage request, IJT808Session session) | |||||
{ | |||||
if (HandlerDict.TryGetValue(request.Header.MsgId, out var func)) | |||||
{ | |||||
return func(request, session); | |||||
} | |||||
else | |||||
{ | |||||
//处理不了的消息统一回复通用应答 | |||||
return CommonReply(request, session); | |||||
} | |||||
} | |||||
/// <summary> | |||||
/// 平台通用应答 | |||||
/// </summary> | |||||
/// <param name="request"></param> | |||||
/// <param name="session"></param> | |||||
public virtual byte[] CommonReply(JT808HeaderPackage request, IJT808Session session) | |||||
{ | |||||
if (request.Version == JT808Version.JTT2019) | |||||
{ | |||||
byte[] data = JT808Serializer.Serialize(JT808MsgId.平台通用应答.Create_平台通用应答_2019(request.Header.TerminalPhoneNo, new JT808_0x8001() | |||||
{ | |||||
AckMsgId = request.Header.MsgId, | |||||
JT808PlatformResult = JT808PlatformResult.成功, | |||||
MsgNum = request.Header.MsgNum | |||||
})); | |||||
session.SendAsync(data); | |||||
return data; | |||||
} | |||||
else | |||||
{ | |||||
byte[] data = JT808Serializer.Serialize(JT808MsgId.平台通用应答.Create(request.Header.TerminalPhoneNo, new JT808_0x8001() | |||||
{ | |||||
AckMsgId = request.Header.MsgId, | |||||
JT808PlatformResult = JT808PlatformResult.成功, | |||||
MsgNum = request.Header.MsgNum | |||||
})); | |||||
session.SendAsync(data); | |||||
return data; | |||||
} | |||||
} | |||||
/// <summary> | |||||
/// 终端通用应答 | |||||
/// 平台无需回复 | |||||
/// 实现自己的业务 | |||||
/// </summary> | |||||
/// <param name="request"></param> | |||||
/// <returns></returns> | |||||
public virtual byte[] Msg0x0001(JT808HeaderPackage request, IJT808Session session) | |||||
{ | |||||
return default; | |||||
} | |||||
/// <summary> | |||||
/// 终端心跳 | |||||
/// </summary> | |||||
/// <param name="request"></param> | |||||
/// <returns></returns> | |||||
public virtual byte[] Msg0x0002(JT808HeaderPackage request, IJT808Session session) | |||||
{ | |||||
return CommonReply(request, session); | |||||
} | |||||
/// <summary> | |||||
/// 终端注销 | |||||
/// </summary> | |||||
/// <param name="request"></param> | |||||
/// <returns></returns> | |||||
public virtual byte[] Msg0x0003(JT808HeaderPackage request, IJT808Session session) | |||||
{ | |||||
return CommonReply(request, session); | |||||
} | |||||
/// <summary> | |||||
/// 终端注册 | |||||
/// </summary> | |||||
/// <param name="request"></param> | |||||
/// <returns></returns> | |||||
public virtual byte[] Msg0x0100(JT808HeaderPackage request, IJT808Session session) | |||||
{ | |||||
if (request.Version == JT808Version.JTT2019) | |||||
{ | |||||
byte[] data = JT808Serializer.Serialize(JT808MsgId.终端注册应答.Create_终端注册应答_2019(request.Header.TerminalPhoneNo, new JT808_0x8100() | |||||
{ | |||||
Code = "J" + request.Header.TerminalPhoneNo, | |||||
JT808TerminalRegisterResult = JT808TerminalRegisterResult.成功, | |||||
AckMsgNum = request.Header.MsgNum | |||||
})); | |||||
session.SendAsync(data); | |||||
return data; | |||||
} | |||||
else | |||||
{ | |||||
byte[] data = JT808Serializer.Serialize(JT808MsgId.终端注册应答.Create(request.Header.TerminalPhoneNo, new JT808_0x8100() | |||||
{ | |||||
Code = "J" + request.Header.TerminalPhoneNo, | |||||
JT808TerminalRegisterResult = JT808TerminalRegisterResult.成功, | |||||
AckMsgNum = request.Header.MsgNum | |||||
})); | |||||
session.SendAsync(data); | |||||
return data; | |||||
} | |||||
} | |||||
/// <summary> | |||||
/// 终端鉴权 | |||||
/// </summary> | |||||
/// <param name="request"></param> | |||||
/// <returns></returns> | |||||
public virtual byte[] Msg0x0102(JT808HeaderPackage request, IJT808Session session) | |||||
{ | |||||
return CommonReply(request, session); | |||||
} | |||||
/// <summary> | |||||
/// 位置信息汇报 | |||||
/// </summary> | |||||
/// <param name="request"></param> | |||||
/// <returns></returns> | |||||
public virtual byte[] Msg0x0200(JT808HeaderPackage request, IJT808Session session) | |||||
{ | |||||
return CommonReply(request, session); | |||||
} | |||||
/// <summary> | |||||
/// 定位数据批量上传 | |||||
/// </summary> | |||||
/// <param name="request"></param> | |||||
/// <returns></returns> | |||||
public virtual byte[] Msg0x0704(JT808HeaderPackage request, IJT808Session session) | |||||
{ | |||||
return CommonReply(request, session); | |||||
} | |||||
/// <summary> | |||||
/// 数据上行透传 | |||||
/// </summary> | |||||
/// <param name="request"></param> | |||||
/// <returns></returns> | |||||
public virtual byte[] Msg0x0900(JT808HeaderPackage request, IJT808Session session) | |||||
{ | |||||
return CommonReply(request, session); | |||||
} | |||||
} | |||||
} |
@@ -8,14 +8,14 @@ using System.Text; | |||||
namespace JT808.Gateway.Abstractions | namespace JT808.Gateway.Abstractions | ||||
{ | { | ||||
public class JT808ReplyMessageHandler | |||||
public class JT808QueueReplyMessageHandler | |||||
{ | { | ||||
protected Dictionary<ushort, MsgIdMethodDelegate> HandlerDict { get; } | protected Dictionary<ushort, MsgIdMethodDelegate> HandlerDict { get; } | ||||
protected delegate byte[] MsgIdMethodDelegate(JT808HeaderPackage package); | protected delegate byte[] MsgIdMethodDelegate(JT808HeaderPackage package); | ||||
protected JT808Serializer JT808Serializer { get; } | protected JT808Serializer JT808Serializer { get; } | ||||
protected IJT808MsgReplyProducer JT808MsgReplyProducer { get; } | protected IJT808MsgReplyProducer JT808MsgReplyProducer { get; } | ||||
public JT808ReplyMessageHandler( | |||||
public JT808QueueReplyMessageHandler( | |||||
IJT808Config jT808Config, | IJT808Config jT808Config, | ||||
IJT808MsgReplyProducer jT808MsgReplyProducer) | IJT808MsgReplyProducer jT808MsgReplyProducer) | ||||
{ | { | ||||
@@ -35,40 +35,23 @@ namespace JT808.Gateway.Abstractions | |||||
public virtual void Processor((string TerminalNo, byte[] Data) parameter) | public virtual void Processor((string TerminalNo, byte[] Data) parameter) | ||||
{ | { | ||||
try | |||||
var request = JT808Serializer.HeaderDeserialize(parameter.Data); | |||||
if (HandlerDict.TryGetValue(request.Header.MsgId, out var func)) | |||||
{ | { | ||||
var request = JT808Serializer.HeaderDeserialize(parameter.Data); | |||||
if (HandlerDict.TryGetValue(request.Header.MsgId, out var func)) | |||||
var buffer = func(request); | |||||
if (buffer != null) | |||||
{ | { | ||||
var buffer = func(request); | |||||
if (buffer != null) | |||||
{ | |||||
JT808MsgReplyProducer.ProduceAsync(parameter.TerminalNo, buffer); | |||||
} | |||||
JT808MsgReplyProducer.ProduceAsync(parameter.TerminalNo, buffer); | |||||
} | } | ||||
} | } | ||||
catch | |||||
{ | |||||
} | |||||
} | } | ||||
/// <summary> | /// <summary> | ||||
/// 终端通用应答 | |||||
/// 平台无需回复 | |||||
/// 实现自己的业务 | |||||
/// </summary> | |||||
/// <param name="request"></param> | |||||
/// <returns></returns> | |||||
public virtual byte[] Msg0x0001(JT808HeaderPackage request) | |||||
{ | |||||
return null; | |||||
} | |||||
/// <summary> | |||||
/// 终端心跳 | |||||
/// 平台通用应答 | |||||
/// </summary> | /// </summary> | ||||
/// <param name="request"></param> | /// <param name="request"></param> | ||||
/// <returns></returns> | |||||
public virtual byte[] Msg0x0002(JT808HeaderPackage request) | |||||
/// <param name="session"></param> | |||||
public virtual byte[] CommonReply(JT808HeaderPackage request) | |||||
{ | { | ||||
if (request.Version == JT808Version.JTT2019) | if (request.Version == JT808Version.JTT2019) | ||||
{ | { | ||||
@@ -89,6 +72,27 @@ namespace JT808.Gateway.Abstractions | |||||
})); | })); | ||||
} | } | ||||
} | } | ||||
/// <summary> | |||||
/// 终端通用应答 | |||||
/// 平台无需回复 | |||||
/// 实现自己的业务 | |||||
/// </summary> | |||||
/// <param name="request"></param> | |||||
/// <returns></returns> | |||||
public virtual byte[] Msg0x0001(JT808HeaderPackage request) | |||||
{ | |||||
return null; | |||||
} | |||||
/// <summary> | |||||
/// 终端心跳 | |||||
/// </summary> | |||||
/// <param name="request"></param> | |||||
/// <returns></returns> | |||||
public virtual byte[] Msg0x0002(JT808HeaderPackage request) | |||||
{ | |||||
return CommonReply(request); | |||||
} | |||||
/// <summary> | /// <summary> | ||||
/// 终端注销 | /// 终端注销 | ||||
/// </summary> | /// </summary> | ||||
@@ -96,24 +100,7 @@ namespace JT808.Gateway.Abstractions | |||||
/// <returns></returns> | /// <returns></returns> | ||||
public virtual byte[] Msg0x0003(JT808HeaderPackage request) | public virtual byte[] Msg0x0003(JT808HeaderPackage request) | ||||
{ | { | ||||
if (request.Version == JT808Version.JTT2019) | |||||
{ | |||||
return JT808Serializer.Serialize(JT808MsgId.平台通用应答.Create_平台通用应答_2019(request.Header.TerminalPhoneNo, new JT808_0x8001() | |||||
{ | |||||
AckMsgId = request.Header.MsgId, | |||||
JT808PlatformResult = JT808PlatformResult.成功, | |||||
MsgNum = request.Header.MsgNum | |||||
})); | |||||
} | |||||
else | |||||
{ | |||||
return JT808Serializer.Serialize(JT808MsgId.平台通用应答.Create(request.Header.TerminalPhoneNo, new JT808_0x8001() | |||||
{ | |||||
AckMsgId = request.Header.MsgId, | |||||
JT808PlatformResult = JT808PlatformResult.成功, | |||||
MsgNum = request.Header.MsgNum | |||||
})); | |||||
} | |||||
return CommonReply(request); | |||||
} | } | ||||
/// <summary> | /// <summary> | ||||
/// 终端注册 | /// 终端注册 | ||||
@@ -148,24 +135,7 @@ namespace JT808.Gateway.Abstractions | |||||
/// <returns></returns> | /// <returns></returns> | ||||
public virtual byte[] Msg0x0102(JT808HeaderPackage request) | public virtual byte[] Msg0x0102(JT808HeaderPackage request) | ||||
{ | { | ||||
if (request.Version == JT808Version.JTT2019) | |||||
{ | |||||
return JT808Serializer.Serialize(JT808MsgId.平台通用应答.Create_平台通用应答_2019(request.Header.TerminalPhoneNo, new JT808_0x8001() | |||||
{ | |||||
AckMsgId = request.Header.MsgId, | |||||
JT808PlatformResult = JT808PlatformResult.成功, | |||||
MsgNum = request.Header.MsgNum | |||||
})); | |||||
} | |||||
else | |||||
{ | |||||
return JT808Serializer.Serialize(JT808MsgId.平台通用应答.Create(request.Header.TerminalPhoneNo, new JT808_0x8001() | |||||
{ | |||||
AckMsgId = request.Header.MsgId, | |||||
JT808PlatformResult = JT808PlatformResult.成功, | |||||
MsgNum = request.Header.MsgNum | |||||
})); | |||||
} | |||||
return CommonReply(request); | |||||
} | } | ||||
/// <summary> | /// <summary> | ||||
/// 位置信息汇报 | /// 位置信息汇报 | ||||
@@ -174,24 +144,7 @@ namespace JT808.Gateway.Abstractions | |||||
/// <returns></returns> | /// <returns></returns> | ||||
public virtual byte[] Msg0x0200(JT808HeaderPackage request) | public virtual byte[] Msg0x0200(JT808HeaderPackage request) | ||||
{ | { | ||||
if (request.Version == JT808Version.JTT2019) | |||||
{ | |||||
return JT808Serializer.Serialize(JT808MsgId.平台通用应答.Create_平台通用应答_2019(request.Header.TerminalPhoneNo, new JT808_0x8001() | |||||
{ | |||||
AckMsgId = request.Header.MsgId, | |||||
JT808PlatformResult = JT808PlatformResult.成功, | |||||
MsgNum = request.Header.MsgNum | |||||
})); | |||||
} | |||||
else | |||||
{ | |||||
return JT808Serializer.Serialize(JT808MsgId.平台通用应答.Create(request.Header.TerminalPhoneNo, new JT808_0x8001() | |||||
{ | |||||
AckMsgId = request.Header.MsgId, | |||||
JT808PlatformResult = JT808PlatformResult.成功, | |||||
MsgNum = request.Header.MsgNum | |||||
})); | |||||
} | |||||
return CommonReply(request); | |||||
} | } | ||||
/// <summary> | /// <summary> | ||||
/// 定位数据批量上传 | /// 定位数据批量上传 | ||||
@@ -200,24 +153,7 @@ namespace JT808.Gateway.Abstractions | |||||
/// <returns></returns> | /// <returns></returns> | ||||
public virtual byte[] Msg0x0704(JT808HeaderPackage request) | public virtual byte[] Msg0x0704(JT808HeaderPackage request) | ||||
{ | { | ||||
if (request.Version == JT808Version.JTT2019) | |||||
{ | |||||
return JT808Serializer.Serialize(JT808MsgId.平台通用应答.Create_平台通用应答_2019(request.Header.TerminalPhoneNo, new JT808_0x8001() | |||||
{ | |||||
AckMsgId = request.Header.MsgId, | |||||
JT808PlatformResult = JT808PlatformResult.成功, | |||||
MsgNum = request.Header.MsgNum | |||||
})); | |||||
} | |||||
else | |||||
{ | |||||
return JT808Serializer.Serialize(JT808MsgId.平台通用应答.Create(request.Header.TerminalPhoneNo, new JT808_0x8001() | |||||
{ | |||||
AckMsgId = request.Header.MsgId, | |||||
JT808PlatformResult = JT808PlatformResult.成功, | |||||
MsgNum = request.Header.MsgNum | |||||
})); | |||||
} | |||||
return CommonReply(request); | |||||
} | } | ||||
/// <summary> | /// <summary> | ||||
/// 数据上行透传 | /// 数据上行透传 | ||||
@@ -226,24 +162,7 @@ namespace JT808.Gateway.Abstractions | |||||
/// <returns></returns> | /// <returns></returns> | ||||
public virtual byte[] Msg0x0900(JT808HeaderPackage request) | public virtual byte[] Msg0x0900(JT808HeaderPackage request) | ||||
{ | { | ||||
if (request.Version == JT808Version.JTT2019) | |||||
{ | |||||
return JT808Serializer.Serialize(JT808MsgId.平台通用应答.Create_平台通用应答_2019(request.Header.TerminalPhoneNo, new JT808_0x8001() | |||||
{ | |||||
AckMsgId = request.Header.MsgId, | |||||
JT808PlatformResult = JT808PlatformResult.成功, | |||||
MsgNum = request.Header.MsgNum | |||||
})); | |||||
} | |||||
else | |||||
{ | |||||
return JT808Serializer.Serialize(JT808MsgId.平台通用应答.Create(request.Header.TerminalPhoneNo, new JT808_0x8001() | |||||
{ | |||||
AckMsgId = request.Header.MsgId, | |||||
JT808PlatformResult = JT808PlatformResult.成功, | |||||
MsgNum = request.Header.MsgNum | |||||
})); | |||||
} | |||||
return CommonReply(request); | |||||
} | } | ||||
} | } | ||||
} | } |
@@ -20,7 +20,7 @@ | |||||
</ItemGroup> | </ItemGroup> | ||||
<ItemGroup> | <ItemGroup> | ||||
<ProjectReference Include="..\JT808.Gateway.Client\JT808.Gateway.Client.csproj" /> | |||||
<ProjectReference Include="..\..\JT808.Gateway.Client\JT808.Gateway.Client.csproj" /> | |||||
</ItemGroup> | </ItemGroup> | ||||
<ItemGroup> | <ItemGroup> | ||||
<None Include="..\..\LICENSE" Pack="true" PackagePath="" /> | <None Include="..\..\LICENSE" Pack="true" PackagePath="" /> |
@@ -0,0 +1,35 @@ | |||||
<Project Sdk="Microsoft.NET.Sdk"> | |||||
<PropertyGroup> | |||||
<OutputType>Exe</OutputType> | |||||
<TargetFramework>netcoreapp3.1</TargetFramework> | |||||
</PropertyGroup> | |||||
<ItemGroup> | |||||
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="3.1.1" /> | |||||
<PackageReference Include="Microsoft.Extensions.Hosting" Version="3.1.1" /> | |||||
<PackageReference Include="Microsoft.VisualStudio.Azure.Containers.Tools.Targets" Version="1.9.10" /> | |||||
<PackageReference Include="NLog.Extensions.Logging" Version="1.6.1" /> | |||||
</ItemGroup> | |||||
<ItemGroup> | |||||
<ProjectReference Include="..\..\JT808.Gateway\JT808.Gateway.csproj" /> | |||||
</ItemGroup> | |||||
<ItemGroup> | |||||
<None Update="appsettings.json"> | |||||
<CopyToOutputDirectory>Always</CopyToOutputDirectory> | |||||
</None> | |||||
<None Update="Configs\nlog.Unix.config"> | |||||
<CopyToOutputDirectory>Always</CopyToOutputDirectory> | |||||
</None> | |||||
<None Update="Configs\nlog.Win32NT.config"> | |||||
<CopyToOutputDirectory>Always</CopyToOutputDirectory> | |||||
</None> | |||||
<None Update="Configs\NLog.xsd"> | |||||
<CopyToOutputDirectory>Always</CopyToOutputDirectory> | |||||
</None> | |||||
</ItemGroup> | |||||
</Project> |
@@ -0,0 +1,43 @@ | |||||
using System; | |||||
using System.Threading.Tasks; | |||||
using Microsoft.Extensions.DependencyInjection; | |||||
using Microsoft.Extensions.Hosting; | |||||
using Microsoft.Extensions.Logging; | |||||
using JT808.Protocol; | |||||
using Microsoft.Extensions.Configuration; | |||||
using NLog.Extensions.Logging; | |||||
namespace JT808.Gateway.ServerBenchmark | |||||
{ | |||||
class Program | |||||
{ | |||||
static async Task Main(string[] args) | |||||
{ | |||||
var serverHostBuilder = new HostBuilder() | |||||
.ConfigureAppConfiguration((hostingContext, config) => | |||||
{ | |||||
config.SetBasePath(AppDomain.CurrentDomain.BaseDirectory) | |||||
.AddJsonFile("appsettings.json", optional: false, reloadOnChange: true) | |||||
.AddJsonFile($"appsettings.{ hostingContext.HostingEnvironment.EnvironmentName}.json", optional: true, reloadOnChange: true); | |||||
}) | |||||
.ConfigureLogging((context, logging) => | |||||
{ | |||||
Console.WriteLine($"Environment.OSVersion.Platform:{Environment.OSVersion.Platform.ToString()}"); | |||||
NLog.LogManager.LoadConfiguration($"Configs/nlog.{Environment.OSVersion.Platform.ToString()}.config"); | |||||
logging.AddNLog(new NLogProviderOptions { CaptureMessageTemplates = true, CaptureMessageProperties = true }); | |||||
logging.SetMinimumLevel(LogLevel.Trace); | |||||
}) | |||||
.ConfigureServices((hostContext, services) => | |||||
{ | |||||
services.AddSingleton<ILoggerFactory, LoggerFactory>(); | |||||
services.AddSingleton(typeof(ILogger<>), typeof(Logger<>)); | |||||
services.AddJT808Configure() | |||||
.AddNormalGateway(hostContext.Configuration) | |||||
.AddTcp() | |||||
.AddUdp(); | |||||
}); | |||||
await serverHostBuilder.RunConsoleAsync(); | |||||
} | |||||
} | |||||
} |
@@ -1,4 +1,4 @@ | |||||
pm2 start "dotnet JT808.Gateway.CleintBenchmark.dll ASPNETCORE_ENVIRONMENT=Production" --max-restarts=1 -n "JT808.Gateway.CleintBenchmark" -o "/data/pm2Logs/JT808.Gateway.CleintBenchmark/out.log" -e "/data/pm2Logs/JT808.Gateway.CleintBenchmark/error.log" | pm2 start "dotnet JT808.Gateway.CleintBenchmark.dll ASPNETCORE_ENVIRONMENT=Production" --max-restarts=1 -n "JT808.Gateway.CleintBenchmark" -o "/data/pm2Logs/JT808.Gateway.CleintBenchmark/out.log" -e "/data/pm2Logs/JT808.Gateway.CleintBenchmark/error.log" | ||||
pm2 start "dotnet JT808.Gateway.TestHosting.dll ASPNETCORE_ENVIRONMENT=Production" --max-restarts=1 -n "JT808.Gateway.808" -o "/data/pm2Logs/JT808.Gateway/out.log" -e "/data/pm2Logs/JT808.Gateway/error.log" | |||||
pm2 start "dotnet JT808.Gateway.ServerBenchmark.dll ASPNETCORE_ENVIRONMENT=Production" --max-restarts=1 -n "JT808.Gateway.808" -o "/data/pm2Logs/JT808.Gateway/out.log" -e "/data/pm2Logs/JT808.Gateway/error.log" |
@@ -1,60 +0,0 @@ | |||||
using JT808.Gateway.Abstractions; | |||||
using JT808.Gateway.InMemoryMQ.Services; | |||||
using Microsoft.Extensions.Logging; | |||||
using System; | |||||
using System.Collections.Generic; | |||||
using System.Text; | |||||
using System.Threading; | |||||
using System.Threading.Tasks; | |||||
namespace JT808.Gateway.InMemoryMQ | |||||
{ | |||||
public class JT808MsgIdHandlerConsumer : IJT808MsgConsumer | |||||
{ | |||||
private readonly JT808MsgIdHandlerService JT808MsgService; | |||||
public CancellationTokenSource Cts => new CancellationTokenSource(); | |||||
private readonly ILogger logger; | |||||
public string TopicName => JT808GatewayConstants.MsgTopic; | |||||
public JT808MsgIdHandlerConsumer( | |||||
JT808MsgIdHandlerService jT808MsgService, | |||||
ILoggerFactory loggerFactory) | |||||
{ | |||||
JT808MsgService = jT808MsgService; | |||||
logger = loggerFactory.CreateLogger("JT808MsgIdHandlerConsumer"); | |||||
} | |||||
public void OnMessage(Action<(string TerminalNo, byte[] Data)> callback) | |||||
{ | |||||
Task.Run(async() => | |||||
{ | |||||
while (!Cts.IsCancellationRequested) | |||||
{ | |||||
try | |||||
{ | |||||
var item = await JT808MsgService.ReadAsync(Cts.Token); | |||||
callback(item); | |||||
} | |||||
catch(Exception ex) | |||||
{ | |||||
logger.LogError(ex, ""); | |||||
} | |||||
} | |||||
}, Cts.Token); | |||||
} | |||||
public void Subscribe() | |||||
{ | |||||
} | |||||
public void Unsubscribe() | |||||
{ | |||||
Cts.Cancel(); | |||||
} | |||||
public void Dispose() | |||||
{ | |||||
Cts.Dispose(); | |||||
} | |||||
} | |||||
} |
@@ -1,60 +0,0 @@ | |||||
using JT808.Gateway.Abstractions; | |||||
using JT808.Gateway.InMemoryMQ.Services; | |||||
using Microsoft.Extensions.Logging; | |||||
using System; | |||||
using System.Collections.Generic; | |||||
using System.Text; | |||||
using System.Threading; | |||||
using System.Threading.Tasks; | |||||
namespace JT808.Gateway.InMemoryMQ | |||||
{ | |||||
public class JT808MsgLoggingConsumer : IJT808MsgConsumer | |||||
{ | |||||
private readonly JT808MsgLoggingService JT808MsgService; | |||||
public CancellationTokenSource Cts => new CancellationTokenSource(); | |||||
private readonly ILogger logger; | |||||
public string TopicName => JT808GatewayConstants.MsgTopic; | |||||
public JT808MsgLoggingConsumer( | |||||
JT808MsgLoggingService jT808MsgService, | |||||
ILoggerFactory loggerFactory) | |||||
{ | |||||
JT808MsgService = jT808MsgService; | |||||
logger = loggerFactory.CreateLogger("JT808MsgLoggingConsumer"); | |||||
} | |||||
public void OnMessage(Action<(string TerminalNo, byte[] Data)> callback) | |||||
{ | |||||
Task.Run(async() => | |||||
{ | |||||
while (!Cts.IsCancellationRequested) | |||||
{ | |||||
try | |||||
{ | |||||
var item = await JT808MsgService.ReadAsync(Cts.Token); | |||||
callback(item); | |||||
} | |||||
catch(Exception ex) | |||||
{ | |||||
logger.LogError(ex, ""); | |||||
} | |||||
} | |||||
}, Cts.Token); | |||||
} | |||||
public void Subscribe() | |||||
{ | |||||
} | |||||
public void Unsubscribe() | |||||
{ | |||||
Cts.Cancel(); | |||||
} | |||||
public void Dispose() | |||||
{ | |||||
Cts.Dispose(); | |||||
} | |||||
} | |||||
} |
@@ -1,60 +0,0 @@ | |||||
using JT808.Gateway.Abstractions; | |||||
using JT808.Gateway.InMemoryMQ.Services; | |||||
using Microsoft.Extensions.Logging; | |||||
using System; | |||||
using System.Collections.Generic; | |||||
using System.Text; | |||||
using System.Threading; | |||||
using System.Threading.Tasks; | |||||
namespace JT808.Gateway.InMemoryMQ | |||||
{ | |||||
public class JT808MsgReplyMessageConsumer : IJT808MsgConsumer | |||||
{ | |||||
private readonly JT808MsgReplyMessageService JT808MsgService; | |||||
public CancellationTokenSource Cts => new CancellationTokenSource(); | |||||
private readonly ILogger logger; | |||||
public string TopicName => JT808GatewayConstants.MsgTopic; | |||||
public JT808MsgReplyMessageConsumer( | |||||
JT808MsgReplyMessageService jT808MsgService, | |||||
ILoggerFactory loggerFactory) | |||||
{ | |||||
JT808MsgService = jT808MsgService; | |||||
logger = loggerFactory.CreateLogger("JT808MsgReplyMessageConsumer"); | |||||
} | |||||
public void OnMessage(Action<(string TerminalNo, byte[] Data)> callback) | |||||
{ | |||||
Task.Run(async() => | |||||
{ | |||||
while (!Cts.IsCancellationRequested) | |||||
{ | |||||
try | |||||
{ | |||||
var item = await JT808MsgService.ReadAsync(Cts.Token); | |||||
callback(item); | |||||
} | |||||
catch(Exception ex) | |||||
{ | |||||
logger.LogError(ex, ""); | |||||
} | |||||
} | |||||
}, Cts.Token); | |||||
} | |||||
public void Subscribe() | |||||
{ | |||||
} | |||||
public void Unsubscribe() | |||||
{ | |||||
Cts.Cancel(); | |||||
} | |||||
public void Dispose() | |||||
{ | |||||
Cts.Dispose(); | |||||
} | |||||
} | |||||
} |
@@ -1,60 +0,0 @@ | |||||
using JT808.Gateway.Abstractions; | |||||
using JT808.Gateway.InMemoryMQ.Services; | |||||
using Microsoft.Extensions.Logging; | |||||
using System; | |||||
using System.Collections.Generic; | |||||
using System.Text; | |||||
using System.Threading; | |||||
using System.Threading.Tasks; | |||||
namespace JT808.Gateway.InMemoryMQ | |||||
{ | |||||
public class JT808MsgReplyMessageLoggingConsumer : IJT808MsgReplyConsumer | |||||
{ | |||||
private readonly JT808MsgReplyMessageLoggingService JT808MsgService; | |||||
public CancellationTokenSource Cts => new CancellationTokenSource(); | |||||
private readonly ILogger logger; | |||||
public string TopicName => JT808GatewayConstants.MsgTopic; | |||||
public JT808MsgReplyMessageLoggingConsumer( | |||||
JT808MsgReplyMessageLoggingService jT808MsgService, | |||||
ILoggerFactory loggerFactory) | |||||
{ | |||||
JT808MsgService = jT808MsgService; | |||||
logger = loggerFactory.CreateLogger("JT808MsgReplyMessageLoggingConsumer"); | |||||
} | |||||
public void OnMessage(Action<(string TerminalNo, byte[] Data)> callback) | |||||
{ | |||||
Task.Run(async() => | |||||
{ | |||||
while (!Cts.IsCancellationRequested) | |||||
{ | |||||
try | |||||
{ | |||||
var item = await JT808MsgService.ReadAsync(Cts.Token); | |||||
callback(item); | |||||
} | |||||
catch(Exception ex) | |||||
{ | |||||
logger.LogError(ex, ""); | |||||
} | |||||
} | |||||
}, Cts.Token); | |||||
} | |||||
public void Subscribe() | |||||
{ | |||||
} | |||||
public void Unsubscribe() | |||||
{ | |||||
Cts.Cancel(); | |||||
} | |||||
public void Dispose() | |||||
{ | |||||
Cts.Dispose(); | |||||
} | |||||
} | |||||
} |
@@ -1,60 +0,0 @@ | |||||
using JT808.Gateway.Abstractions; | |||||
using JT808.Gateway.InMemoryMQ.Services; | |||||
using Microsoft.Extensions.Logging; | |||||
using System; | |||||
using System.Collections.Generic; | |||||
using System.Text; | |||||
using System.Threading; | |||||
using System.Threading.Tasks; | |||||
namespace JT808.Gateway.InMemoryMQ | |||||
{ | |||||
public class JT808MsgTrafficConsumer : IJT808MsgConsumer | |||||
{ | |||||
private readonly JT808MsgTrafficService JT808MsgService; | |||||
public CancellationTokenSource Cts => new CancellationTokenSource(); | |||||
private readonly ILogger logger; | |||||
public string TopicName => JT808GatewayConstants.MsgTopic; | |||||
public JT808MsgTrafficConsumer( | |||||
JT808MsgTrafficService jT808MsgService, | |||||
ILoggerFactory loggerFactory) | |||||
{ | |||||
JT808MsgService = jT808MsgService; | |||||
logger = loggerFactory.CreateLogger("JT808MsgTrafficConsumer"); | |||||
} | |||||
public void OnMessage(Action<(string TerminalNo, byte[] Data)> callback) | |||||
{ | |||||
Task.Run(async() => | |||||
{ | |||||
while (!Cts.IsCancellationRequested) | |||||
{ | |||||
try | |||||
{ | |||||
var item = await JT808MsgService.ReadAsync(Cts.Token); | |||||
callback(item); | |||||
} | |||||
catch(Exception ex) | |||||
{ | |||||
logger.LogError(ex, ""); | |||||
} | |||||
} | |||||
}, Cts.Token); | |||||
} | |||||
public void Subscribe() | |||||
{ | |||||
} | |||||
public void Unsubscribe() | |||||
{ | |||||
Cts.Cancel(); | |||||
} | |||||
public void Dispose() | |||||
{ | |||||
Cts.Dispose(); | |||||
} | |||||
} | |||||
} |
@@ -1,60 +0,0 @@ | |||||
using JT808.Gateway.Abstractions; | |||||
using JT808.Gateway.InMemoryMQ.Services; | |||||
using Microsoft.Extensions.Logging; | |||||
using System; | |||||
using System.Collections.Generic; | |||||
using System.Text; | |||||
using System.Threading; | |||||
using System.Threading.Tasks; | |||||
namespace JT808.Gateway.InMemoryMQ | |||||
{ | |||||
public class JT808MsgTransmitConsumer : IJT808MsgConsumer | |||||
{ | |||||
private readonly JT808MsgTransmitService JT808MsgService; | |||||
public CancellationTokenSource Cts => new CancellationTokenSource(); | |||||
private readonly ILogger logger; | |||||
public string TopicName => JT808GatewayConstants.MsgTopic; | |||||
public JT808MsgTransmitConsumer( | |||||
JT808MsgTransmitService jT808MsgService, | |||||
ILoggerFactory loggerFactory) | |||||
{ | |||||
JT808MsgService = jT808MsgService; | |||||
logger = loggerFactory.CreateLogger("JT808MsgTransmitConsumer"); | |||||
} | |||||
public void OnMessage(Action<(string TerminalNo, byte[] Data)> callback) | |||||
{ | |||||
Task.Run(async() => | |||||
{ | |||||
while (!Cts.IsCancellationRequested) | |||||
{ | |||||
try | |||||
{ | |||||
var item = await JT808MsgService.ReadAsync(Cts.Token); | |||||
callback(item); | |||||
} | |||||
catch(Exception ex) | |||||
{ | |||||
logger.LogError(ex, ""); | |||||
} | |||||
} | |||||
}, Cts.Token); | |||||
} | |||||
public void Subscribe() | |||||
{ | |||||
} | |||||
public void Unsubscribe() | |||||
{ | |||||
Cts.Cancel(); | |||||
} | |||||
public void Dispose() | |||||
{ | |||||
Cts.Dispose(); | |||||
} | |||||
} | |||||
} |
@@ -1,34 +0,0 @@ | |||||
<Project Sdk="Microsoft.NET.Sdk"> | |||||
<Import Project="..\Version.props" /> | |||||
<PropertyGroup> | |||||
<TargetFramework>netstandard2.1</TargetFramework> | |||||
<LangVersion>8.0</LangVersion> | |||||
<Copyright>Copyright 2019.</Copyright> | |||||
<Authors>SmallChi(Koike)</Authors> | |||||
<RepositoryUrl>https://github.com/SmallChi/JT808Gateway</RepositoryUrl> | |||||
<PackageProjectUrl>https://github.com/SmallChi/JT808Gateway</PackageProjectUrl> | |||||
<licenseUrl>https://github.com/SmallChi/JT808Gateway/blob/master/LICENSE</licenseUrl> | |||||
<license>https://github.com/SmallChi/JT808Gateway/blob/master/LICENSE</license> | |||||
<GeneratePackageOnBuild>false</GeneratePackageOnBuild> | |||||
<SignAssembly>false</SignAssembly> | |||||
<PackageLicenseFile>LICENSE</PackageLicenseFile> | |||||
<PackageRequireLicenseAcceptance>true</PackageRequireLicenseAcceptance> | |||||
<PackageId>JT808.Gateway.InMemoryMQ</PackageId> | |||||
<Product>JT808.Gateway.InMemoryMQ</Product> | |||||
<Description>基于InMemory的JT808消息发布与订阅</Description> | |||||
<PackageReleaseNotes>基于InMemory的JT808消息发布与订阅</PackageReleaseNotes> | |||||
<Version>$(JT808GatewayPackageVersion)</Version> | |||||
</PropertyGroup> | |||||
<ItemGroup> | |||||
<None Include="..\..\LICENSE" Pack="true" PackagePath="" /> | |||||
</ItemGroup> | |||||
<ItemGroup> | |||||
<PackageReference Include="System.Threading.Channels" Version="4.7.0" /> | |||||
</ItemGroup> | |||||
<ItemGroup> | |||||
<ProjectReference Include="..\JT808.Gateway.Abstractions\JT808.Gateway.Abstractions.csproj" /> | |||||
</ItemGroup> | |||||
</Project> |
@@ -1,72 +0,0 @@ | |||||
using JT808.Gateway.Abstractions; | |||||
using JT808.Gateway.Abstractions.Enums; | |||||
using JT808.Gateway.InMemoryMQ.Services; | |||||
using Microsoft.Extensions.Logging; | |||||
using System; | |||||
using System.Collections.Generic; | |||||
using System.Text; | |||||
using System.Threading; | |||||
using System.Threading.Tasks; | |||||
namespace JT808.Gateway.InMemoryMQ | |||||
{ | |||||
public class JT808MsgConsumer : IJT808MsgConsumer | |||||
{ | |||||
private readonly JT808MsgService JT808MsgService; | |||||
private readonly Func<JT808ConsumerType, JT808MsgServiceBase> func; | |||||
public CancellationTokenSource Cts => new CancellationTokenSource(); | |||||
private readonly ILogger logger; | |||||
public string TopicName => JT808GatewayConstants.MsgTopic; | |||||
public JT808MsgConsumer( | |||||
Func<JT808ConsumerType, JT808MsgServiceBase> func, | |||||
JT808MsgService jT808MsgService, | |||||
ILoggerFactory loggerFactory) | |||||
{ | |||||
JT808MsgService = jT808MsgService; | |||||
this.func = func; | |||||
logger = loggerFactory.CreateLogger("JT808MsgConsumer"); | |||||
} | |||||
public void OnMessage(Action<(string TerminalNo, byte[] Data)> callback) | |||||
{ | |||||
Task.Run(async() => | |||||
{ | |||||
while (!Cts.IsCancellationRequested) | |||||
{ | |||||
try | |||||
{ | |||||
var item = await JT808MsgService.ReadAsync(Cts.Token); | |||||
foreach(var type in JT808ServerInMemoryMQExtensions.ConsumerTypes) | |||||
{ | |||||
var method = func(type); | |||||
if (method != null) | |||||
{ | |||||
await method.WriteAsync(item.TerminalNo, item.Data); | |||||
} | |||||
} | |||||
//callback(item); | |||||
} | |||||
catch(Exception ex) | |||||
{ | |||||
logger.LogError(ex, ""); | |||||
} | |||||
} | |||||
}, Cts.Token); | |||||
} | |||||
public void Subscribe() | |||||
{ | |||||
} | |||||
public void Unsubscribe() | |||||
{ | |||||
Cts.Cancel(); | |||||
} | |||||
public void Dispose() | |||||
{ | |||||
Cts.Dispose(); | |||||
} | |||||
} | |||||
} |
@@ -1,23 +0,0 @@ | |||||
using JT808.Gateway.Abstractions; | |||||
using JT808.Gateway.Abstractions.Enums; | |||||
using System; | |||||
using System.Collections.Generic; | |||||
using System.Text; | |||||
namespace JT808.Gateway.InMemoryMQ | |||||
{ | |||||
public class JT808MsgConsumerFactory : IJT808MsgConsumerFactory | |||||
{ | |||||
private readonly Func<JT808ConsumerType, IJT808MsgConsumer> factory; | |||||
public JT808MsgConsumerFactory(Func<JT808ConsumerType, IJT808MsgConsumer> accesor) | |||||
{ | |||||
factory = accesor; | |||||
} | |||||
public IJT808MsgConsumer Create(JT808ConsumerType consumerType) | |||||
{ | |||||
return factory(consumerType); | |||||
} | |||||
} | |||||
} |
@@ -1,32 +0,0 @@ | |||||
using System.Threading.Tasks; | |||||
using Microsoft.Extensions.Hosting; | |||||
using System.Threading; | |||||
using JT808.Gateway.Abstractions; | |||||
using JT808.Gateway.Abstractions.Enums; | |||||
namespace JT808.Gateway.InMemoryMQ | |||||
{ | |||||
public class JT808MsgConsumerInMemoryHostedService : IHostedService | |||||
{ | |||||
private readonly IJT808MsgConsumer jT808MsgConsumer; | |||||
public JT808MsgConsumerInMemoryHostedService( | |||||
IJT808MsgConsumer jT808MsgConsumer) | |||||
{ | |||||
this.jT808MsgConsumer = jT808MsgConsumer; | |||||
} | |||||
public Task StartAsync(CancellationToken cancellationToken) | |||||
{ | |||||
jT808MsgConsumer.Subscribe(); | |||||
jT808MsgConsumer.OnMessage(null); | |||||
return Task.CompletedTask; | |||||
} | |||||
public Task StopAsync(CancellationToken cancellationToken) | |||||
{ | |||||
jT808MsgConsumer.Unsubscribe(); | |||||
return Task.CompletedTask; | |||||
} | |||||
} | |||||
} |
@@ -1,27 +0,0 @@ | |||||
using JT808.Gateway.Abstractions; | |||||
using JT808.Gateway.InMemoryMQ.Services; | |||||
using System; | |||||
using System.Collections.Generic; | |||||
using System.Text; | |||||
using System.Threading.Tasks; | |||||
namespace JT808.Gateway.InMemoryMQ | |||||
{ | |||||
public class JT808MsgProducer : IJT808MsgProducer | |||||
{ | |||||
private readonly JT808MsgService JT808MsgService; | |||||
public string TopicName => JT808GatewayConstants.MsgTopic; | |||||
public JT808MsgProducer(JT808MsgService jT808MsgService) | |||||
{ | |||||
JT808MsgService = jT808MsgService; | |||||
} | |||||
public async ValueTask ProduceAsync(string terminalNo, byte[] data) | |||||
{ | |||||
await JT808MsgService.WriteAsync(terminalNo, data); | |||||
} | |||||
public void Dispose() | |||||
{ | |||||
} | |||||
} | |||||
} |
@@ -1,63 +0,0 @@ | |||||
using JT808.Gateway.Abstractions; | |||||
using JT808.Gateway.InMemoryMQ.Services; | |||||
using Microsoft.Extensions.Logging; | |||||
using System; | |||||
using System.Collections.Generic; | |||||
using System.Text; | |||||
using System.Threading; | |||||
using System.Threading.Tasks; | |||||
namespace JT808.Gateway.InMemoryMQ | |||||
{ | |||||
public class JT808MsgReplyConsumer : IJT808MsgReplyConsumer | |||||
{ | |||||
private readonly JT808ReplyMsgService JT808ReplyMsgService; | |||||
public CancellationTokenSource Cts => new CancellationTokenSource(); | |||||
private readonly ILogger logger; | |||||
public string TopicName => JT808GatewayConstants.MsgReplyTopic; | |||||
public JT808MsgReplyConsumer( | |||||
JT808ReplyMsgService jT808ReplyMsgService, | |||||
ILoggerFactory loggerFactory) | |||||
{ | |||||
logger = loggerFactory.CreateLogger("JT808MsgReplyConsumer"); | |||||
JT808ReplyMsgService = jT808ReplyMsgService; | |||||
} | |||||
public void OnMessage(Action<(string TerminalNo, byte[] Data)> callback) | |||||
{ | |||||
Task.Run(async() => | |||||
{ | |||||
while (!Cts.IsCancellationRequested) | |||||
{ | |||||
try | |||||
{ | |||||
var item= await JT808ReplyMsgService.ReadAsync(Cts.Token); | |||||
callback(item); | |||||
} | |||||
catch (Exception ex) | |||||
{ | |||||
logger.LogError(ex, ""); | |||||
} | |||||
} | |||||
}, Cts.Token); | |||||
} | |||||
public void Subscribe() | |||||
{ | |||||
} | |||||
public void Unsubscribe() | |||||
{ | |||||
Cts.Cancel(); | |||||
} | |||||
public void Dispose() | |||||
{ | |||||
Cts.Dispose(); | |||||
} | |||||
} | |||||
} |
@@ -1,23 +0,0 @@ | |||||
using JT808.Gateway.Abstractions; | |||||
using JT808.Gateway.Abstractions.Enums; | |||||
using System; | |||||
using System.Collections.Generic; | |||||
using System.Text; | |||||
namespace JT808.Gateway.InMemoryMQ | |||||
{ | |||||
public class JT808MsgReplyConsumerFactory : IJT808MsgReplyConsumerFactory | |||||
{ | |||||
private readonly Func<JT808ConsumerType, IJT808MsgReplyConsumer> factory; | |||||
public JT808MsgReplyConsumerFactory(Func<JT808ConsumerType, IJT808MsgReplyConsumer> accesor) | |||||
{ | |||||
factory = accesor; | |||||
} | |||||
public IJT808MsgReplyConsumer Create(JT808ConsumerType consumerType) | |||||
{ | |||||
return factory(consumerType); | |||||
} | |||||
} | |||||
} |
@@ -1,43 +0,0 @@ | |||||
using JT808.Gateway.Abstractions; | |||||
using JT808.Gateway.Abstractions.Enums; | |||||
using JT808.Gateway.InMemoryMQ.Services; | |||||
using System; | |||||
using System.Collections.Generic; | |||||
using System.Text; | |||||
using System.Threading.Tasks; | |||||
namespace JT808.Gateway.InMemoryMQ | |||||
{ | |||||
public class JT808MsgReplyProducer : IJT808MsgReplyProducer | |||||
{ | |||||
public string TopicName => JT808GatewayConstants.MsgReplyTopic; | |||||
//JT808ServerInMemoryMQExtensions | |||||
private readonly JT808ReplyMsgService JT808ReplyMsgService; | |||||
private readonly Func<JT808ConsumerType, JT808MsgServiceBase> func; | |||||
public JT808MsgReplyProducer( | |||||
Func<JT808ConsumerType, JT808MsgServiceBase> func, | |||||
JT808ReplyMsgService jT808ReplyMsgService) | |||||
{ | |||||
this.func = func; | |||||
JT808ReplyMsgService = jT808ReplyMsgService; | |||||
} | |||||
public async ValueTask ProduceAsync(string terminalNo, byte[] data) | |||||
{ | |||||
await JT808ReplyMsgService.WriteAsync(terminalNo, data); | |||||
if (JT808ServerInMemoryMQExtensions.ReplyMessageLoggingConsumer.HasValue) | |||||
{ | |||||
var method = func(JT808ConsumerType.ReplyMessageLoggingConsumer); | |||||
if (method != null) | |||||
{ | |||||
await method.WriteAsync(terminalNo, data); | |||||
} | |||||
} | |||||
} | |||||
public void Dispose() | |||||
{ | |||||
} | |||||
} | |||||
} |
@@ -1,363 +0,0 @@ | |||||
using JT808.Gateway.Abstractions; | |||||
using JT808.Gateway.Abstractions.Enums; | |||||
using JT808.Gateway.InMemoryMQ.Services; | |||||
using Microsoft.Extensions.DependencyInjection; | |||||
using System; | |||||
using System.Collections.Generic; | |||||
using System.Linq; | |||||
using System.Runtime.CompilerServices; | |||||
[assembly: InternalsVisibleTo("JT808.Gateway.InMemoryMQ.Test")] | |||||
namespace JT808.Gateway.InMemoryMQ | |||||
{ | |||||
public static class JT808ServerInMemoryMQExtensions | |||||
{ | |||||
internal static List<JT808ConsumerType> ConsumerTypes { get; private set; } | |||||
static JT808ServerInMemoryMQExtensions() | |||||
{ | |||||
ConsumerTypes = new List<JT808ConsumerType>(); | |||||
} | |||||
internal static JT808ConsumerType? ReplyMessageLoggingConsumer { get; private set; } | |||||
/// <summary> | |||||
/// | |||||
/// </summary> | |||||
/// <param name="jT808GatewayBuilder"></param> | |||||
/// <returns></returns> | |||||
public static IJT808GatewayBuilder AddServerInMemoryMQ(this IJT808GatewayBuilder jT808GatewayBuilder, JT808ConsumerType consumerType) | |||||
{ | |||||
if ((consumerType & JT808ConsumerType.All) == JT808ConsumerType.All) | |||||
{ | |||||
ConsumerTypes.Add(JT808ConsumerType.MsgIdHandlerConsumer); | |||||
ConsumerTypes.Add(JT808ConsumerType.MsgLoggingConsumer); | |||||
ConsumerTypes.Add(JT808ConsumerType.ReplyMessageConsumer); | |||||
ConsumerTypes.Add(JT808ConsumerType.TrafficConsumer); | |||||
ConsumerTypes.Add(JT808ConsumerType.TransmitConsumer); | |||||
ConsumerTypes.Add(JT808ConsumerType.ReplyMessageLoggingConsumer); | |||||
} | |||||
else | |||||
{ | |||||
if ((consumerType & JT808ConsumerType.MsgLoggingConsumer) == JT808ConsumerType.MsgLoggingConsumer) | |||||
{ | |||||
ConsumerTypes.Add(JT808ConsumerType.MsgLoggingConsumer); | |||||
} | |||||
if ((consumerType & JT808ConsumerType.MsgIdHandlerConsumer) == JT808ConsumerType.MsgIdHandlerConsumer) | |||||
{ | |||||
ConsumerTypes.Add(JT808ConsumerType.MsgIdHandlerConsumer); | |||||
} | |||||
if ((consumerType & JT808ConsumerType.ReplyMessageConsumer) == JT808ConsumerType.ReplyMessageConsumer) | |||||
{ | |||||
ConsumerTypes.Add(JT808ConsumerType.ReplyMessageConsumer); | |||||
} | |||||
if ((consumerType & JT808ConsumerType.TrafficConsumer) == JT808ConsumerType.TrafficConsumer) | |||||
{ | |||||
ConsumerTypes.Add(JT808ConsumerType.TrafficConsumer); | |||||
} | |||||
if ((consumerType & JT808ConsumerType.TransmitConsumer) == JT808ConsumerType.TransmitConsumer) | |||||
{ | |||||
ConsumerTypes.Add(JT808ConsumerType.TransmitConsumer); | |||||
} | |||||
if ((consumerType & JT808ConsumerType.ReplyMessageLoggingConsumer) == JT808ConsumerType.ReplyMessageLoggingConsumer) | |||||
{ | |||||
// | |||||
ReplyMessageLoggingConsumer = JT808ConsumerType.ReplyMessageLoggingConsumer; | |||||
} | |||||
} | |||||
jT808GatewayBuilder.AddServerInMemoryConsumers(); | |||||
jT808GatewayBuilder.JT808Builder.Services.AddSingleton<JT808MsgService>(); | |||||
jT808GatewayBuilder.JT808Builder.Services.AddSingleton<JT808ReplyMsgService>(); | |||||
jT808GatewayBuilder.JT808Builder.Services.AddSingleton<JT808SessionService>(); | |||||
jT808GatewayBuilder.JT808Builder.Services.AddSingleton<IJT808MsgProducer, JT808MsgProducer>(); | |||||
jT808GatewayBuilder.JT808Builder.Services.AddSingleton<IJT808MsgConsumer, JT808MsgConsumer>(); | |||||
jT808GatewayBuilder.JT808Builder.Services.AddSingleton<IJT808MsgReplyProducer, JT808MsgReplyProducer>(); | |||||
jT808GatewayBuilder.JT808Builder.Services.AddSingleton<IJT808MsgReplyConsumer, JT808MsgReplyConsumer>(); | |||||
jT808GatewayBuilder.JT808Builder.Services.AddSingleton<IJT808SessionProducer, JT808SessionProducer>(); | |||||
jT808GatewayBuilder.JT808Builder.Services.AddSingleton<IJT808SessionConsumer, JT808SessionConsumer>(); | |||||
jT808GatewayBuilder.JT808Builder.Services.AddSingleton<IJT808MsgConsumerFactory, JT808MsgConsumerFactory>(); | |||||
jT808GatewayBuilder.JT808Builder.Services.AddSingleton<IJT808MsgReplyConsumerFactory, JT808MsgReplyConsumerFactory>(); | |||||
jT808GatewayBuilder.JT808Builder.Services.AddHostedService<JT808MsgConsumerInMemoryHostedService>(); | |||||
return jT808GatewayBuilder; | |||||
} | |||||
/// <summary> | |||||
/// | |||||
/// </summary> | |||||
/// <param name="jT808GatewayBuilder"></param> | |||||
/// <returns></returns> | |||||
public static IJT808GatewayBuilder AddServerInMemoryMQ(this IJT808GatewayBuilder jT808GatewayBuilder,params JT808ConsumerType[] consumerTypes) | |||||
{ | |||||
if (consumerTypes == null) | |||||
{ | |||||
throw new ArgumentNullException("消费类型不为空!"); | |||||
} | |||||
ConsumerTypes = consumerTypes.ToList(); | |||||
jT808GatewayBuilder.AddServerInMemoryConsumers(); | |||||
jT808GatewayBuilder.JT808Builder.Services.AddSingleton<JT808MsgService>(); | |||||
jT808GatewayBuilder.JT808Builder.Services.AddSingleton<JT808ReplyMsgService>(); | |||||
jT808GatewayBuilder.JT808Builder.Services.AddSingleton<JT808SessionService>(); | |||||
jT808GatewayBuilder.JT808Builder.Services.AddSingleton<IJT808MsgProducer, JT808MsgProducer>(); | |||||
jT808GatewayBuilder.JT808Builder.Services.AddSingleton<IJT808MsgConsumer, JT808MsgConsumer>(); | |||||
jT808GatewayBuilder.JT808Builder.Services.AddSingleton<IJT808MsgReplyProducer, JT808MsgReplyProducer>(); | |||||
jT808GatewayBuilder.JT808Builder.Services.AddSingleton<IJT808MsgReplyConsumer, JT808MsgReplyConsumer>(); | |||||
jT808GatewayBuilder.JT808Builder.Services.AddSingleton<IJT808SessionProducer, JT808SessionProducer>(); | |||||
jT808GatewayBuilder.JT808Builder.Services.AddSingleton<IJT808SessionConsumer, JT808SessionConsumer>(); | |||||
jT808GatewayBuilder.JT808Builder.Services.AddSingleton<IJT808MsgConsumerFactory, JT808MsgConsumerFactory>(); | |||||
jT808GatewayBuilder.JT808Builder.Services.AddSingleton<IJT808MsgReplyConsumerFactory, JT808MsgReplyConsumerFactory>(); | |||||
jT808GatewayBuilder.JT808Builder.Services.AddHostedService<JT808MsgConsumerInMemoryHostedService>(); | |||||
return jT808GatewayBuilder; | |||||
} | |||||
/// <summary> | |||||
/// | |||||
/// </summary> | |||||
/// <param name="serviceDescriptors"></param> | |||||
/// <returns></returns> | |||||
internal static IServiceCollection AddServerInMemoryMQ(this IServiceCollection serviceDescriptors, JT808ConsumerType consumerType) | |||||
{ | |||||
if ((consumerType & JT808ConsumerType.All) == JT808ConsumerType.All) | |||||
{ | |||||
ConsumerTypes.Add(JT808ConsumerType.MsgIdHandlerConsumer); | |||||
ConsumerTypes.Add(JT808ConsumerType.MsgLoggingConsumer); | |||||
ConsumerTypes.Add(JT808ConsumerType.ReplyMessageConsumer); | |||||
ConsumerTypes.Add(JT808ConsumerType.TrafficConsumer); | |||||
ConsumerTypes.Add(JT808ConsumerType.TransmitConsumer); | |||||
} | |||||
else | |||||
{ | |||||
if ((consumerType & JT808ConsumerType.MsgLoggingConsumer) == JT808ConsumerType.MsgLoggingConsumer) | |||||
{ | |||||
ConsumerTypes.Add(JT808ConsumerType.MsgLoggingConsumer); | |||||
} | |||||
if ((consumerType & JT808ConsumerType.MsgIdHandlerConsumer) == JT808ConsumerType.MsgIdHandlerConsumer) | |||||
{ | |||||
ConsumerTypes.Add(JT808ConsumerType.MsgIdHandlerConsumer); | |||||
} | |||||
if ((consumerType & JT808ConsumerType.ReplyMessageConsumer) == JT808ConsumerType.ReplyMessageConsumer) | |||||
{ | |||||
ConsumerTypes.Add(JT808ConsumerType.ReplyMessageConsumer); | |||||
} | |||||
if ((consumerType & JT808ConsumerType.TrafficConsumer) == JT808ConsumerType.TrafficConsumer) | |||||
{ | |||||
ConsumerTypes.Add(JT808ConsumerType.TrafficConsumer); | |||||
} | |||||
if ((consumerType & JT808ConsumerType.TransmitConsumer) == JT808ConsumerType.TransmitConsumer) | |||||
{ | |||||
ConsumerTypes.Add(JT808ConsumerType.TransmitConsumer); | |||||
} | |||||
if ((consumerType & JT808ConsumerType.ReplyMessageLoggingConsumer) == JT808ConsumerType.ReplyMessageLoggingConsumer) | |||||
{ | |||||
ReplyMessageLoggingConsumer = JT808ConsumerType.ReplyMessageLoggingConsumer; | |||||
} | |||||
} | |||||
serviceDescriptors.AddServerInMemoryConsumers(); | |||||
serviceDescriptors.AddSingleton<JT808MsgService>(); | |||||
serviceDescriptors.AddSingleton<JT808ReplyMsgService>(); | |||||
serviceDescriptors.AddSingleton<JT808SessionService>(); | |||||
serviceDescriptors.AddSingleton<IJT808MsgProducer, JT808MsgProducer>(); | |||||
serviceDescriptors.AddSingleton<IJT808MsgConsumer, JT808MsgConsumer>(); | |||||
serviceDescriptors.AddSingleton<IJT808MsgReplyProducer, JT808MsgReplyProducer>(); | |||||
serviceDescriptors.AddSingleton<IJT808MsgReplyConsumer, JT808MsgReplyConsumer>(); | |||||
serviceDescriptors.AddSingleton<IJT808SessionProducer, JT808SessionProducer>(); | |||||
serviceDescriptors.AddSingleton<IJT808SessionConsumer, JT808SessionConsumer>(); | |||||
serviceDescriptors.AddSingleton<IJT808MsgConsumerFactory, JT808MsgConsumerFactory>(); | |||||
serviceDescriptors.AddSingleton<IJT808MsgReplyConsumerFactory, JT808MsgReplyConsumerFactory>(); | |||||
serviceDescriptors.AddHostedService<JT808MsgConsumerInMemoryHostedService>(); | |||||
return serviceDescriptors; | |||||
} | |||||
/// <summary> | |||||
/// | |||||
/// </summary> | |||||
/// <param name="serviceDescriptors"></param> | |||||
/// <returns></returns> | |||||
internal static IServiceCollection AddServerInMemoryMQ(this IServiceCollection serviceDescriptors, params JT808ConsumerType[] consumerTypes) | |||||
{ | |||||
if (consumerTypes == null) | |||||
{ | |||||
throw new ArgumentNullException("消费类型不为空!"); | |||||
} | |||||
ConsumerTypes = consumerTypes.ToList(); | |||||
serviceDescriptors.AddServerInMemoryConsumers(); | |||||
serviceDescriptors.AddSingleton<JT808MsgService>(); | |||||
serviceDescriptors.AddSingleton<JT808ReplyMsgService>(); | |||||
serviceDescriptors.AddSingleton<JT808SessionService>(); | |||||
serviceDescriptors.AddSingleton<IJT808MsgProducer, JT808MsgProducer>(); | |||||
serviceDescriptors.AddSingleton<IJT808MsgConsumer, JT808MsgConsumer>(); | |||||
serviceDescriptors.AddSingleton<IJT808MsgReplyProducer, JT808MsgReplyProducer>(); | |||||
serviceDescriptors.AddSingleton<IJT808MsgReplyConsumer, JT808MsgReplyConsumer>(); | |||||
serviceDescriptors.AddSingleton<IJT808SessionProducer, JT808SessionProducer>(); | |||||
serviceDescriptors.AddSingleton<IJT808SessionConsumer, JT808SessionConsumer>(); | |||||
serviceDescriptors.AddSingleton<IJT808MsgConsumerFactory, JT808MsgConsumerFactory>(); | |||||
serviceDescriptors.AddSingleton<IJT808MsgReplyConsumerFactory, JT808MsgReplyConsumerFactory>(); | |||||
serviceDescriptors.AddHostedService<JT808MsgConsumerInMemoryHostedService>(); | |||||
return serviceDescriptors; | |||||
} | |||||
/// <summary> | |||||
/// | |||||
/// </summary> | |||||
/// <param name="jT808GatewayBuilder"></param> | |||||
/// <returns></returns> | |||||
private static IJT808GatewayBuilder AddServerInMemoryConsumers(this IJT808GatewayBuilder jT808GatewayBuilder) | |||||
{ | |||||
jT808GatewayBuilder.JT808Builder.Services.AddSingleton<JT808MsgIdHandlerService>(); | |||||
jT808GatewayBuilder.JT808Builder.Services.AddSingleton<JT808MsgLoggingService>(); | |||||
jT808GatewayBuilder.JT808Builder.Services.AddSingleton<JT808MsgReplyMessageService>(); | |||||
jT808GatewayBuilder.JT808Builder.Services.AddSingleton<JT808MsgReplyMessageLoggingService>(); | |||||
jT808GatewayBuilder.JT808Builder.Services.AddSingleton<JT808MsgTrafficService>(); | |||||
jT808GatewayBuilder.JT808Builder.Services.AddSingleton<JT808MsgTransmitService>(); | |||||
jT808GatewayBuilder.JT808Builder.Services.AddSingleton<JT808MsgIdHandlerConsumer>(); | |||||
jT808GatewayBuilder.JT808Builder.Services.AddSingleton<JT808MsgLoggingConsumer>(); | |||||
jT808GatewayBuilder.JT808Builder.Services.AddSingleton<JT808MsgReplyMessageConsumer>(); | |||||
jT808GatewayBuilder.JT808Builder.Services.AddSingleton<JT808MsgTrafficConsumer>(); | |||||
jT808GatewayBuilder.JT808Builder.Services.AddSingleton<JT808MsgTransmitConsumer>(); | |||||
jT808GatewayBuilder.JT808Builder.Services.AddSingleton<JT808MsgReplyMessageLoggingConsumer>(); | |||||
jT808GatewayBuilder.JT808Builder.Services.AddSingleton((factory) => | |||||
{ | |||||
Func<JT808ConsumerType, IJT808MsgConsumer> accesor = type => | |||||
{ | |||||
switch (type) | |||||
{ | |||||
case JT808ConsumerType.MsgIdHandlerConsumer: | |||||
return factory.GetRequiredService<JT808MsgIdHandlerConsumer>(); | |||||
case JT808ConsumerType.MsgLoggingConsumer: | |||||
return factory.GetRequiredService<JT808MsgLoggingConsumer>(); | |||||
case JT808ConsumerType.TrafficConsumer: | |||||
return factory.GetRequiredService<JT808MsgTrafficConsumer>(); | |||||
case JT808ConsumerType.TransmitConsumer: | |||||
return factory.GetRequiredService<JT808MsgTransmitConsumer>(); | |||||
case JT808ConsumerType.ReplyMessageConsumer: | |||||
return factory.GetRequiredService<JT808MsgReplyMessageConsumer>(); | |||||
default: | |||||
return default; | |||||
} | |||||
}; | |||||
return accesor; | |||||
}); | |||||
jT808GatewayBuilder.JT808Builder.Services.AddSingleton((factory) => | |||||
{ | |||||
Func<JT808ConsumerType, IJT808MsgReplyConsumer> accesor = type => | |||||
{ | |||||
switch (type) | |||||
{ | |||||
case JT808ConsumerType.ReplyMessageLoggingConsumer: | |||||
return factory.GetRequiredService<JT808MsgReplyMessageLoggingConsumer>(); | |||||
default: | |||||
return default; | |||||
} | |||||
}; | |||||
return accesor; | |||||
}); | |||||
jT808GatewayBuilder.JT808Builder.Services.AddSingleton((factory) => | |||||
{ | |||||
Func<JT808ConsumerType, JT808MsgServiceBase> accesor = type => | |||||
{ | |||||
switch (type) | |||||
{ | |||||
case JT808ConsumerType.MsgIdHandlerConsumer: | |||||
return factory.GetRequiredService<JT808MsgIdHandlerService>(); | |||||
case JT808ConsumerType.MsgLoggingConsumer: | |||||
return factory.GetRequiredService<JT808MsgLoggingService>(); | |||||
case JT808ConsumerType.TrafficConsumer: | |||||
return factory.GetRequiredService<JT808MsgTrafficService>(); | |||||
case JT808ConsumerType.TransmitConsumer: | |||||
return factory.GetRequiredService<JT808MsgTransmitService>(); | |||||
case JT808ConsumerType.ReplyMessageConsumer: | |||||
return factory.GetRequiredService<JT808MsgReplyMessageService>(); | |||||
case JT808ConsumerType.ReplyMessageLoggingConsumer: | |||||
return factory.GetRequiredService<JT808MsgReplyMessageLoggingService>(); | |||||
default: | |||||
return default; | |||||
} | |||||
}; | |||||
return accesor; | |||||
}); | |||||
return jT808GatewayBuilder; | |||||
} | |||||
/// <summary> | |||||
/// | |||||
/// </summary> | |||||
/// <param name="jT808GatewayBuilder"></param> | |||||
/// <returns></returns> | |||||
private static IServiceCollection AddServerInMemoryConsumers(this IServiceCollection serviceDescriptors) | |||||
{ | |||||
serviceDescriptors.AddSingleton<JT808MsgIdHandlerService>(); | |||||
serviceDescriptors.AddSingleton<JT808MsgLoggingService>(); | |||||
serviceDescriptors.AddSingleton<JT808MsgReplyMessageService>(); | |||||
serviceDescriptors.AddSingleton<JT808MsgReplyMessageLoggingService>(); | |||||
serviceDescriptors.AddSingleton<JT808MsgTrafficService>(); | |||||
serviceDescriptors.AddSingleton<JT808MsgTransmitService>(); | |||||
serviceDescriptors.AddSingleton<JT808MsgIdHandlerConsumer>(); | |||||
serviceDescriptors.AddSingleton<JT808MsgLoggingConsumer>(); | |||||
serviceDescriptors.AddSingleton<JT808MsgReplyMessageConsumer>(); | |||||
serviceDescriptors.AddSingleton<JT808MsgTrafficConsumer>(); | |||||
serviceDescriptors.AddSingleton<JT808MsgTransmitConsumer>(); | |||||
serviceDescriptors.AddSingleton<JT808MsgReplyMessageLoggingConsumer>(); | |||||
serviceDescriptors.AddSingleton((factory) => | |||||
{ | |||||
Func<JT808ConsumerType, IJT808MsgConsumer> accesor = type => | |||||
{ | |||||
switch (type) | |||||
{ | |||||
case JT808ConsumerType.MsgIdHandlerConsumer: | |||||
return factory.GetRequiredService<JT808MsgIdHandlerConsumer>(); | |||||
case JT808ConsumerType.MsgLoggingConsumer: | |||||
return factory.GetRequiredService<JT808MsgLoggingConsumer>(); | |||||
case JT808ConsumerType.TrafficConsumer: | |||||
return factory.GetRequiredService<JT808MsgTrafficConsumer>(); | |||||
case JT808ConsumerType.TransmitConsumer: | |||||
return factory.GetRequiredService<JT808MsgTransmitConsumer>(); | |||||
case JT808ConsumerType.ReplyMessageConsumer: | |||||
return factory.GetRequiredService<JT808MsgReplyMessageConsumer>(); | |||||
default: | |||||
return default; | |||||
} | |||||
}; | |||||
return accesor; | |||||
}); | |||||
serviceDescriptors.AddSingleton((factory) => | |||||
{ | |||||
Func<JT808ConsumerType, IJT808MsgReplyConsumer> accesor = type => | |||||
{ | |||||
switch (type) | |||||
{ | |||||
case JT808ConsumerType.ReplyMessageLoggingConsumer: | |||||
return factory.GetRequiredService<JT808MsgReplyMessageLoggingConsumer>(); | |||||
default: | |||||
return default; | |||||
} | |||||
}; | |||||
return accesor; | |||||
}); | |||||
serviceDescriptors.AddSingleton((factory) => | |||||
{ | |||||
Func<JT808ConsumerType, JT808MsgServiceBase> accesor = type => | |||||
{ | |||||
switch (type) | |||||
{ | |||||
case JT808ConsumerType.MsgIdHandlerConsumer: | |||||
return factory.GetRequiredService<JT808MsgIdHandlerService>(); | |||||
case JT808ConsumerType.MsgLoggingConsumer: | |||||
return factory.GetRequiredService<JT808MsgLoggingService>(); | |||||
case JT808ConsumerType.TrafficConsumer: | |||||
return factory.GetRequiredService<JT808MsgTrafficService>(); | |||||
case JT808ConsumerType.TransmitConsumer: | |||||
return factory.GetRequiredService<JT808MsgTransmitService>(); | |||||
case JT808ConsumerType.ReplyMessageConsumer: | |||||
return factory.GetRequiredService<JT808MsgReplyMessageService>(); | |||||
case JT808ConsumerType.ReplyMessageLoggingConsumer: | |||||
return factory.GetRequiredService<JT808MsgReplyMessageLoggingService>(); | |||||
default: | |||||
return default; | |||||
} | |||||
}; | |||||
return accesor; | |||||
}); | |||||
return serviceDescriptors; | |||||
} | |||||
} | |||||
} |
@@ -1,14 +0,0 @@ | |||||
using System; | |||||
using System.Collections.Generic; | |||||
using System.Text; | |||||
using System.Threading; | |||||
using System.Threading.Channels; | |||||
using System.Threading.Tasks; | |||||
namespace JT808.Gateway.InMemoryMQ.Services | |||||
{ | |||||
public class JT808MsgIdHandlerService: JT808MsgServiceBase | |||||
{ | |||||
} | |||||
} |
@@ -1,14 +0,0 @@ | |||||
using System; | |||||
using System.Collections.Generic; | |||||
using System.Text; | |||||
using System.Threading; | |||||
using System.Threading.Channels; | |||||
using System.Threading.Tasks; | |||||
namespace JT808.Gateway.InMemoryMQ.Services | |||||
{ | |||||
public class JT808MsgLoggingService: JT808MsgServiceBase | |||||
{ | |||||
} | |||||
} |
@@ -1,14 +0,0 @@ | |||||
using System; | |||||
using System.Collections.Generic; | |||||
using System.Text; | |||||
using System.Threading; | |||||
using System.Threading.Channels; | |||||
using System.Threading.Tasks; | |||||
namespace JT808.Gateway.InMemoryMQ.Services | |||||
{ | |||||
public class JT808MsgReplyMessageLoggingService : JT808MsgServiceBase | |||||
{ | |||||
} | |||||
} |
@@ -1,14 +0,0 @@ | |||||
using System; | |||||
using System.Collections.Generic; | |||||
using System.Text; | |||||
using System.Threading; | |||||
using System.Threading.Channels; | |||||
using System.Threading.Tasks; | |||||
namespace JT808.Gateway.InMemoryMQ.Services | |||||
{ | |||||
public class JT808MsgReplyMessageService: JT808MsgServiceBase | |||||
{ | |||||
} | |||||
} |
@@ -1,14 +0,0 @@ | |||||
using System; | |||||
using System.Collections.Generic; | |||||
using System.Text; | |||||
using System.Threading; | |||||
using System.Threading.Channels; | |||||
using System.Threading.Tasks; | |||||
namespace JT808.Gateway.InMemoryMQ.Services | |||||
{ | |||||
public class JT808MsgService: JT808MsgServiceBase | |||||
{ | |||||
} | |||||
} |
@@ -1,29 +0,0 @@ | |||||
using System; | |||||
using System.Collections.Generic; | |||||
using System.Text; | |||||
using System.Threading; | |||||
using System.Threading.Channels; | |||||
using System.Threading.Tasks; | |||||
namespace JT808.Gateway.InMemoryMQ.Services | |||||
{ | |||||
public class JT808MsgServiceBase | |||||
{ | |||||
private readonly Channel<(string TerminalNo, byte[] Data)> _channel; | |||||
public JT808MsgServiceBase() | |||||
{ | |||||
_channel = Channel.CreateUnbounded<(string TerminalNo, byte[] Data)>(); | |||||
} | |||||
public async ValueTask WriteAsync(string terminalNo, byte[] data) | |||||
{ | |||||
await _channel.Writer.WriteAsync((terminalNo, data)); | |||||
} | |||||
public async ValueTask<(string TerminalNo, byte[] Data)> ReadAsync(CancellationToken cancellationToken) | |||||
{ | |||||
return await _channel.Reader.ReadAsync(cancellationToken); | |||||
} | |||||
} | |||||
} |
@@ -1,14 +0,0 @@ | |||||
using System; | |||||
using System.Collections.Generic; | |||||
using System.Text; | |||||
using System.Threading; | |||||
using System.Threading.Channels; | |||||
using System.Threading.Tasks; | |||||
namespace JT808.Gateway.InMemoryMQ.Services | |||||
{ | |||||
public class JT808MsgTrafficService: JT808MsgServiceBase | |||||
{ | |||||
} | |||||
} |
@@ -1,14 +0,0 @@ | |||||
using System; | |||||
using System.Collections.Generic; | |||||
using System.Text; | |||||
using System.Threading; | |||||
using System.Threading.Channels; | |||||
using System.Threading.Tasks; | |||||
namespace JT808.Gateway.InMemoryMQ.Services | |||||
{ | |||||
public class JT808MsgTransmitService: JT808MsgServiceBase | |||||
{ | |||||
} | |||||
} |
@@ -1,14 +0,0 @@ | |||||
using System; | |||||
using System.Collections.Generic; | |||||
using System.Text; | |||||
using System.Threading; | |||||
using System.Threading.Channels; | |||||
using System.Threading.Tasks; | |||||
namespace JT808.Gateway.InMemoryMQ.Services | |||||
{ | |||||
public class JT808ReplyMsgService: JT808MsgServiceBase | |||||
{ | |||||
} | |||||
} |
@@ -14,7 +14,7 @@ namespace JT808.Gateway.Kafka | |||||
/// <param name="jT808GatewayBuilder"></param> | /// <param name="jT808GatewayBuilder"></param> | ||||
/// <param name="configuration">GetSection("JT808MsgProducerConfig")</param> | /// <param name="configuration">GetSection("JT808MsgProducerConfig")</param> | ||||
/// <returns></returns> | /// <returns></returns> | ||||
public static IJT808GatewayBuilder AddServerKafkaMsgProducer(this IJT808GatewayBuilder jT808GatewayBuilder, IConfiguration configuration) | |||||
public static IJT808QueueGatewayBuilder AddServerKafkaMsgProducer(this IJT808QueueGatewayBuilder jT808GatewayBuilder, IConfiguration configuration) | |||||
{ | { | ||||
jT808GatewayBuilder.JT808Builder.Services.Configure<JT808MsgProducerConfig>(configuration.GetSection("JT808MsgProducerConfig")); | jT808GatewayBuilder.JT808Builder.Services.Configure<JT808MsgProducerConfig>(configuration.GetSection("JT808MsgProducerConfig")); | ||||
jT808GatewayBuilder.JT808Builder.Services.Replace(new ServiceDescriptor(typeof(IJT808MsgProducer), typeof(JT808MsgProducer), ServiceLifetime.Singleton)); | jT808GatewayBuilder.JT808Builder.Services.Replace(new ServiceDescriptor(typeof(IJT808MsgProducer), typeof(JT808MsgProducer), ServiceLifetime.Singleton)); | ||||
@@ -26,7 +26,7 @@ namespace JT808.Gateway.Kafka | |||||
/// <param name="jT808GatewayBuilder"></param> | /// <param name="jT808GatewayBuilder"></param> | ||||
/// <param name="configuration">GetSection("JT808MsgReplyConsumerConfig")</param> | /// <param name="configuration">GetSection("JT808MsgReplyConsumerConfig")</param> | ||||
/// <returns></returns> | /// <returns></returns> | ||||
public static IJT808GatewayBuilder AddServerKafkaMsgReplyConsumer(this IJT808GatewayBuilder jT808GatewayBuilder, IConfiguration configuration) | |||||
public static IJT808QueueGatewayBuilder AddServerKafkaMsgReplyConsumer(this IJT808QueueGatewayBuilder jT808GatewayBuilder, IConfiguration configuration) | |||||
{ | { | ||||
jT808GatewayBuilder.JT808Builder.Services.Configure<JT808MsgReplyConsumerConfig>(configuration.GetSection("JT808MsgReplyConsumerConfig")); | jT808GatewayBuilder.JT808Builder.Services.Configure<JT808MsgReplyConsumerConfig>(configuration.GetSection("JT808MsgReplyConsumerConfig")); | ||||
jT808GatewayBuilder.JT808Builder.Services.Replace(new ServiceDescriptor(typeof(IJT808MsgReplyConsumer), typeof(JT808MsgReplyConsumer), ServiceLifetime.Singleton)); | jT808GatewayBuilder.JT808Builder.Services.Replace(new ServiceDescriptor(typeof(IJT808MsgReplyConsumer), typeof(JT808MsgReplyConsumer), ServiceLifetime.Singleton)); | ||||
@@ -38,7 +38,7 @@ namespace JT808.Gateway.Kafka | |||||
/// <param name="jT808GatewayBuilder"></param> | /// <param name="jT808GatewayBuilder"></param> | ||||
/// <param name="configuration">GetSection("JT808SessionProducerConfig")</param> | /// <param name="configuration">GetSection("JT808SessionProducerConfig")</param> | ||||
/// <returns></returns> | /// <returns></returns> | ||||
public static IJT808GatewayBuilder AddServerKafkaSessionProducer(this IJT808GatewayBuilder jT808GatewayBuilder, IConfiguration configuration) | |||||
public static IJT808QueueGatewayBuilder AddServerKafkaSessionProducer(this IJT808QueueGatewayBuilder jT808GatewayBuilder, IConfiguration configuration) | |||||
{ | { | ||||
jT808GatewayBuilder.JT808Builder.Services.Configure<JT808SessionProducerConfig>(configuration.GetSection("JT808SessionProducerConfig")); | jT808GatewayBuilder.JT808Builder.Services.Configure<JT808SessionProducerConfig>(configuration.GetSection("JT808SessionProducerConfig")); | ||||
jT808GatewayBuilder.JT808Builder.Services.Replace(new ServiceDescriptor(typeof(IJT808SessionProducer), typeof(JT808SessionProducer), ServiceLifetime.Singleton)); | jT808GatewayBuilder.JT808Builder.Services.Replace(new ServiceDescriptor(typeof(IJT808SessionProducer), typeof(JT808SessionProducer), ServiceLifetime.Singleton)); | ||||
@@ -15,13 +15,5 @@ namespace JT808.Gateway.MsgIdHandler | |||||
jT808ClientBuilder.JT808Builder.Services.AddHostedService<JT808MsgIdHandlerHostedService>(); | jT808ClientBuilder.JT808Builder.Services.AddHostedService<JT808MsgIdHandlerHostedService>(); | ||||
return jT808ClientBuilder; | return jT808ClientBuilder; | ||||
} | } | ||||
public static IJT808GatewayBuilder AddInMemoryMsgIdHandler<TJT808MsgIdHandler>(this IJT808GatewayBuilder jT808GatewayBuilder) | |||||
where TJT808MsgIdHandler : IJT808MsgIdHandler | |||||
{ | |||||
jT808GatewayBuilder.JT808Builder.Services.AddSingleton(typeof(IJT808MsgIdHandler), typeof(TJT808MsgIdHandler)); | |||||
jT808GatewayBuilder.JT808Builder.Services.AddHostedService<JT808MsgIdHandlerInMemoryHostedService>(); | |||||
return jT808GatewayBuilder; | |||||
} | |||||
} | } | ||||
} | } |
@@ -1,35 +0,0 @@ | |||||
using System.Threading.Tasks; | |||||
using Microsoft.Extensions.Hosting; | |||||
using System.Threading; | |||||
using JT808.Gateway.Abstractions; | |||||
using JT808.Gateway.Abstractions.Enums; | |||||
namespace JT808.Gateway.MsgIdHandler | |||||
{ | |||||
public class JT808MsgIdHandlerInMemoryHostedService : IHostedService | |||||
{ | |||||
private readonly IJT808MsgConsumer jT808MsgConsumer; | |||||
private readonly IJT808MsgIdHandler jT808MsgIdHandler; | |||||
public JT808MsgIdHandlerInMemoryHostedService( | |||||
IJT808MsgIdHandler jT808MsgIdHandler, | |||||
IJT808MsgConsumerFactory jT808MsgConsumerFactory) | |||||
{ | |||||
this.jT808MsgIdHandler = jT808MsgIdHandler; | |||||
this.jT808MsgConsumer = jT808MsgConsumerFactory.Create(JT808ConsumerType.MsgIdHandlerConsumer); | |||||
} | |||||
public Task StartAsync(CancellationToken cancellationToken) | |||||
{ | |||||
jT808MsgConsumer.Subscribe(); | |||||
jT808MsgConsumer.OnMessage(jT808MsgIdHandler.Processor); | |||||
return Task.CompletedTask; | |||||
} | |||||
public Task StopAsync(CancellationToken cancellationToken) | |||||
{ | |||||
jT808MsgConsumer.Unsubscribe(); | |||||
return Task.CompletedTask; | |||||
} | |||||
} | |||||
} |
@@ -1,37 +0,0 @@ | |||||
using System.Threading.Tasks; | |||||
using Microsoft.Extensions.Hosting; | |||||
using System.Threading; | |||||
using JT808.Gateway.Abstractions; | |||||
using JT808.Gateway.Abstractions.Enums; | |||||
namespace JT808.Gateway.MsgLogging | |||||
{ | |||||
public class JT808MsgDownLoggingInMemoryHostedService : IHostedService | |||||
{ | |||||
private readonly IJT808MsgReplyConsumer jT808MsgReplyConsumer; | |||||
private readonly IJT808MsgLogging jT808MsgLogging; | |||||
public JT808MsgDownLoggingInMemoryHostedService( | |||||
IJT808MsgLogging jT808MsgLogging, | |||||
IJT808MsgReplyConsumerFactory jT808MsgReplyConsumerFactory) | |||||
{ | |||||
this.jT808MsgReplyConsumer = jT808MsgReplyConsumerFactory.Create(JT808ConsumerType.ReplyMessageLoggingConsumer); | |||||
this.jT808MsgLogging = jT808MsgLogging; | |||||
} | |||||
public Task StartAsync(CancellationToken cancellationToken) | |||||
{ | |||||
jT808MsgReplyConsumer.Subscribe(); | |||||
jT808MsgReplyConsumer.OnMessage(item=> | |||||
{ | |||||
jT808MsgLogging.Processor(item, JT808MsgLoggingType.down); | |||||
}); | |||||
return Task.CompletedTask; | |||||
} | |||||
public Task StopAsync(CancellationToken cancellationToken) | |||||
{ | |||||
jT808MsgReplyConsumer.Unsubscribe(); | |||||
return Task.CompletedTask; | |||||
} | |||||
} | |||||
} |
@@ -17,13 +17,11 @@ namespace JT808.Gateway.MsgLogging | |||||
return jT808ClientBuilder; | return jT808ClientBuilder; | ||||
} | } | ||||
public static IJT808GatewayBuilder AddInMemoryMsgLogging<TJT808MsgLogging>(this IJT808GatewayBuilder jT808GatewayBuilder) | |||||
public static IJT808NormalGatewayBuilder AddMsgLogging<TJT808MsgLogging>(this IJT808NormalGatewayBuilder jT808NormalGatewayBuilder) | |||||
where TJT808MsgLogging : IJT808MsgLogging | where TJT808MsgLogging : IJT808MsgLogging | ||||
{ | { | ||||
jT808GatewayBuilder.JT808Builder.Services.AddSingleton(typeof(IJT808MsgLogging), typeof(TJT808MsgLogging)); | |||||
jT808GatewayBuilder.JT808Builder.Services.AddHostedService<JT808MsgDownLoggingInMemoryHostedService>(); | |||||
jT808GatewayBuilder.JT808Builder.Services.AddHostedService<JT808MsgUpLoggingInMemoryHostedService>(); | |||||
return jT808GatewayBuilder; | |||||
jT808NormalGatewayBuilder.JT808Builder.Services.AddSingleton(typeof(IJT808MsgLogging), typeof(TJT808MsgLogging)); | |||||
return jT808NormalGatewayBuilder; | |||||
} | } | ||||
} | } | ||||
} | } |
@@ -1,37 +0,0 @@ | |||||
using System.Threading.Tasks; | |||||
using Microsoft.Extensions.Hosting; | |||||
using System.Threading; | |||||
using JT808.Gateway.Abstractions; | |||||
using JT808.Gateway.Abstractions.Enums; | |||||
namespace JT808.Gateway.MsgLogging | |||||
{ | |||||
public class JT808MsgUpLoggingInMemoryHostedService : IHostedService | |||||
{ | |||||
private readonly IJT808MsgConsumer jT808MsgConsumer; | |||||
private readonly IJT808MsgLogging jT808MsgLogging; | |||||
public JT808MsgUpLoggingInMemoryHostedService( | |||||
IJT808MsgLogging jT808MsgLogging, | |||||
IJT808MsgConsumerFactory jT808MsgConsumerFactory) | |||||
{ | |||||
this.jT808MsgConsumer = jT808MsgConsumerFactory.Create(JT808ConsumerType.MsgLoggingConsumer); | |||||
this.jT808MsgLogging = jT808MsgLogging; | |||||
} | |||||
public Task StartAsync(CancellationToken cancellationToken) | |||||
{ | |||||
jT808MsgConsumer.Subscribe(); | |||||
jT808MsgConsumer.OnMessage(item=> | |||||
{ | |||||
jT808MsgLogging.Processor(item, JT808MsgLoggingType.up); | |||||
}); | |||||
return Task.CompletedTask; | |||||
} | |||||
public Task StopAsync(CancellationToken cancellationToken) | |||||
{ | |||||
jT808MsgConsumer.Unsubscribe(); | |||||
return Task.CompletedTask; | |||||
} | |||||
} | |||||
} |
@@ -14,9 +14,9 @@ namespace JT808.Gateway.ReplyMessage | |||||
/// </summary> | /// </summary> | ||||
/// <param name="jT808ClientBuilder"></param> | /// <param name="jT808ClientBuilder"></param> | ||||
/// <returns></returns> | /// <returns></returns> | ||||
public static IJT808ClientBuilder AddInPlugReplyMessage(this IJT808ClientBuilder jT808ClientBuilder) | |||||
public static IJT808ClientBuilder AddReplyMessage(this IJT808ClientBuilder jT808ClientBuilder) | |||||
{ | { | ||||
jT808ClientBuilder.JT808Builder.Services.AddSingleton<JT808ReplyMessageHandler>(); | |||||
jT808ClientBuilder.JT808Builder.Services.AddSingleton<JT808QueueReplyMessageHandler>(); | |||||
jT808ClientBuilder.JT808Builder.Services.AddHostedService<JT808ReplyMessageHostedService>(); | jT808ClientBuilder.JT808Builder.Services.AddHostedService<JT808ReplyMessageHostedService>(); | ||||
return jT808ClientBuilder; | return jT808ClientBuilder; | ||||
} | } | ||||
@@ -26,36 +26,12 @@ namespace JT808.Gateway.ReplyMessage | |||||
/// <typeparam name="TReplyMessageService">自定义消息回复服务</typeparam> | /// <typeparam name="TReplyMessageService">自定义消息回复服务</typeparam> | ||||
/// <param name="jT808ClientBuilder"></param> | /// <param name="jT808ClientBuilder"></param> | ||||
/// <returns></returns> | /// <returns></returns> | ||||
public static IJT808ClientBuilder AddInPlugReplyMessage<TReplyMessageHandler>(this IJT808ClientBuilder jT808ClientBuilder) | |||||
where TReplyMessageHandler : JT808ReplyMessageHandler | |||||
public static IJT808ClientBuilder AddReplyMessage<TReplyMessageHandler>(this IJT808ClientBuilder jT808ClientBuilder) | |||||
where TReplyMessageHandler : JT808QueueReplyMessageHandler | |||||
{ | { | ||||
jT808ClientBuilder.JT808Builder.Services.AddSingleton<JT808ReplyMessageHandler, TReplyMessageHandler>(); | |||||
jT808ClientBuilder.JT808Builder.Services.AddSingleton<JT808QueueReplyMessageHandler, TReplyMessageHandler>(); | |||||
jT808ClientBuilder.JT808Builder.Services.AddHostedService<JT808ReplyMessageHostedService>(); | jT808ClientBuilder.JT808Builder.Services.AddHostedService<JT808ReplyMessageHostedService>(); | ||||
return jT808ClientBuilder; | return jT808ClientBuilder; | ||||
} | } | ||||
/// <summary> | |||||
/// 消息应答服务(消费者单实例) | |||||
/// </summary> | |||||
/// <typeparam name="TReplyMessageService">自定义消息回复服务</typeparam> | |||||
/// <param name="jT808GatewayBuilder"></param> | |||||
/// <returns></returns> | |||||
public static IJT808GatewayBuilder AddInMemoryReplyMessage<TReplyMessageHandler>(this IJT808GatewayBuilder jT808GatewayBuilder) | |||||
where TReplyMessageHandler : JT808ReplyMessageHandler | |||||
{ | |||||
jT808GatewayBuilder.JT808Builder.Services.AddSingleton<JT808ReplyMessageHandler, TReplyMessageHandler>(); | |||||
jT808GatewayBuilder.JT808Builder.Services.AddHostedService<JT808ReplyMessageInMemoryHostedService>(); | |||||
return jT808GatewayBuilder; | |||||
} | |||||
/// <summary> | |||||
/// 消息应答服务(消费者单实例) | |||||
/// </summary> | |||||
/// <param name="jT808GatewayBuilder"></param> | |||||
/// <returns></returns> | |||||
public static IJT808GatewayBuilder AddInMemoryReplyMessage(this IJT808GatewayBuilder jT808GatewayBuilder) | |||||
{ | |||||
jT808GatewayBuilder.JT808Builder.Services.AddSingleton<JT808ReplyMessageHandler>(); | |||||
jT808GatewayBuilder.JT808Builder.Services.AddHostedService<JT808ReplyMessageInMemoryHostedService>(); | |||||
return jT808GatewayBuilder; | |||||
} | |||||
} | } | ||||
} | } |
@@ -8,10 +8,10 @@ namespace JT808.Gateway.ReplyMessage | |||||
public class JT808ReplyMessageHostedService : IHostedService | public class JT808ReplyMessageHostedService : IHostedService | ||||
{ | { | ||||
private readonly IJT808MsgConsumer jT808MsgConsumer; | private readonly IJT808MsgConsumer jT808MsgConsumer; | ||||
private readonly JT808ReplyMessageHandler jT808ReplyMessageHandler; | |||||
private readonly JT808QueueReplyMessageHandler jT808ReplyMessageHandler; | |||||
public JT808ReplyMessageHostedService( | public JT808ReplyMessageHostedService( | ||||
JT808ReplyMessageHandler jT808ReplyMessageHandler, | |||||
JT808QueueReplyMessageHandler jT808ReplyMessageHandler, | |||||
IJT808MsgConsumer jT808MsgConsumer) | IJT808MsgConsumer jT808MsgConsumer) | ||||
{ | { | ||||
this.jT808MsgConsumer = jT808MsgConsumer; | this.jT808MsgConsumer = jT808MsgConsumer; | ||||
@@ -1,35 +0,0 @@ | |||||
using System.Threading.Tasks; | |||||
using Microsoft.Extensions.Hosting; | |||||
using System.Threading; | |||||
using JT808.Gateway.Abstractions; | |||||
using JT808.Gateway.Abstractions.Enums; | |||||
namespace JT808.Gateway.ReplyMessage | |||||
{ | |||||
public class JT808ReplyMessageInMemoryHostedService : IHostedService | |||||
{ | |||||
private readonly IJT808MsgConsumer jT808MsgConsumer; | |||||
private readonly JT808ReplyMessageHandler jT808ReplyMessageHandler; | |||||
public JT808ReplyMessageInMemoryHostedService( | |||||
JT808ReplyMessageHandler jT808ReplyMessageHandler, | |||||
IJT808MsgConsumerFactory jT808MsgConsumerFactory) | |||||
{ | |||||
this.jT808MsgConsumer = jT808MsgConsumerFactory.Create(JT808ConsumerType.ReplyMessageConsumer); | |||||
this.jT808ReplyMessageHandler = jT808ReplyMessageHandler; | |||||
} | |||||
public Task StartAsync(CancellationToken cancellationToken) | |||||
{ | |||||
jT808MsgConsumer.Subscribe(); | |||||
jT808MsgConsumer.OnMessage(jT808ReplyMessageHandler.Processor); | |||||
return Task.CompletedTask; | |||||
} | |||||
public Task StopAsync(CancellationToken cancellationToken) | |||||
{ | |||||
jT808MsgConsumer.Unsubscribe(); | |||||
return Task.CompletedTask; | |||||
} | |||||
} | |||||
} |
@@ -14,7 +14,7 @@ namespace JT808.Gateway.SessionNotice | |||||
/// </summary> | /// </summary> | ||||
/// <param name="jT808ClientBuilder"></param> | /// <param name="jT808ClientBuilder"></param> | ||||
/// <returns></returns> | /// <returns></returns> | ||||
public static IJT808ClientBuilder AddInPlugSessionNotice(this IJT808ClientBuilder jT808ClientBuilder) | |||||
public static IJT808ClientBuilder AddSessionNotice(this IJT808ClientBuilder jT808ClientBuilder) | |||||
{ | { | ||||
jT808ClientBuilder.JT808Builder.Services.AddSingleton<JT808SessionNoticeService>(); | jT808ClientBuilder.JT808Builder.Services.AddSingleton<JT808SessionNoticeService>(); | ||||
jT808ClientBuilder.JT808Builder.Services.AddHostedService<JT808SessionNoticeHostedService>(); | jT808ClientBuilder.JT808Builder.Services.AddHostedService<JT808SessionNoticeHostedService>(); | ||||
@@ -27,7 +27,7 @@ namespace JT808.Gateway.SessionNotice | |||||
/// <typeparam name="TSessionNoticeService">自定义会话通知服务</typeparam> | /// <typeparam name="TSessionNoticeService">自定义会话通知服务</typeparam> | ||||
/// <param name="jT808ClientBuilder"></param> | /// <param name="jT808ClientBuilder"></param> | ||||
/// <returns></returns> | /// <returns></returns> | ||||
public static IJT808ClientBuilder AddInPlugSessionNotice<TSessionNoticeService>(this IJT808ClientBuilder jT808ClientBuilder) | |||||
public static IJT808ClientBuilder AddSessionNotice<TSessionNoticeService>(this IJT808ClientBuilder jT808ClientBuilder) | |||||
where TSessionNoticeService : JT808SessionNoticeService | where TSessionNoticeService : JT808SessionNoticeService | ||||
{ | { | ||||
jT808ClientBuilder.JT808Builder.Services.AddSingleton<JT808SessionNoticeService,TSessionNoticeService>(); | jT808ClientBuilder.JT808Builder.Services.AddSingleton<JT808SessionNoticeService,TSessionNoticeService>(); | ||||
@@ -36,29 +36,29 @@ namespace JT808.Gateway.SessionNotice | |||||
} | } | ||||
/// <summary> | /// <summary> | ||||
/// 消息会话通知服务(消费者单实例) | |||||
/// 会话通知服务(不同的消费者实例) | |||||
/// </summary> | /// </summary> | ||||
/// <typeparam name="TSessionNoticeService">自定义会话通知服务</typeparam> | |||||
/// <param name="jT808GatewayBuilder"></param> | |||||
/// <param name="jT808NormalGatewayBuilder"></param> | |||||
/// <returns></returns> | /// <returns></returns> | ||||
public static IJT808GatewayBuilder AddInMemorySessionNotice<TSessionNoticeService>(this IJT808GatewayBuilder jT808GatewayBuilder) | |||||
where TSessionNoticeService : JT808SessionNoticeService | |||||
public static IJT808NormalGatewayBuilder AddSessionNotice(this IJT808NormalGatewayBuilder jT808NormalGatewayBuilder) | |||||
{ | { | ||||
jT808GatewayBuilder.JT808Builder.Services.AddSingleton<JT808SessionNoticeService, TSessionNoticeService>(); | |||||
jT808GatewayBuilder.JT808Builder.Services.AddHostedService<JT808SessionNoticeHostedService>(); | |||||
return jT808GatewayBuilder; | |||||
jT808NormalGatewayBuilder.JT808Builder.Services.AddSingleton<JT808SessionNoticeService>(); | |||||
jT808NormalGatewayBuilder.JT808Builder.Services.AddHostedService<JT808SessionNoticeHostedService>(); | |||||
return jT808NormalGatewayBuilder; | |||||
} | } | ||||
/// <summary> | /// <summary> | ||||
/// 消息会话通知服务(消费者单实例) | |||||
/// 消息会话通知服务(不同的消费者实例) | |||||
/// </summary> | /// </summary> | ||||
/// <param name="jT808GatewayBuilder"></param> | |||||
/// <typeparam name="TSessionNoticeService">自定义会话通知服务</typeparam> | |||||
/// <param name="jT808NormalGatewayBuilder"></param> | |||||
/// <returns></returns> | /// <returns></returns> | ||||
public static IJT808GatewayBuilder AddInMemorySessionNotice(this IJT808GatewayBuilder jT808GatewayBuilder) | |||||
public static IJT808NormalGatewayBuilder AddSessionNotice<TSessionNoticeService>(this IJT808NormalGatewayBuilder jT808NormalGatewayBuilder) | |||||
where TSessionNoticeService : JT808SessionNoticeService | |||||
{ | { | ||||
jT808GatewayBuilder.JT808Builder.Services.AddSingleton<JT808SessionNoticeService>(); | |||||
jT808GatewayBuilder.JT808Builder.Services.AddHostedService<JT808SessionNoticeHostedService>(); | |||||
return jT808GatewayBuilder; | |||||
jT808NormalGatewayBuilder.JT808Builder.Services.AddSingleton<JT808SessionNoticeService, TSessionNoticeService>(); | |||||
jT808NormalGatewayBuilder.JT808Builder.Services.AddHostedService<JT808SessionNoticeHostedService>(); | |||||
return jT808NormalGatewayBuilder; | |||||
} | } | ||||
} | } | ||||
} | } |
@@ -13,7 +13,7 @@ namespace JT808.Gateway.Traffic | |||||
List<(string,long)> GetAll(); | List<(string,long)> GetAll(); | ||||
} | } | ||||
public class JT808TrafficDefault : IJT808Traffic | |||||
class JT808TrafficDefault : IJT808Traffic | |||||
{ | { | ||||
private ConcurrentDictionary<string, long> dict = new ConcurrentDictionary<string, long>(); | private ConcurrentDictionary<string, long> dict = new ConcurrentDictionary<string, long>(); | ||||
@@ -13,36 +13,48 @@ namespace JT808.Gateway.Traffic | |||||
/// </summary> | /// </summary> | ||||
/// <param name="jT808ClientBuilder"></param> | /// <param name="jT808ClientBuilder"></param> | ||||
/// <returns></returns> | /// <returns></returns> | ||||
public static IJT808ClientBuilder AddInPlugTraffic<TIJT808Traffic>(this IJT808ClientBuilder jT808ClientBuilder) | |||||
public static IJT808ClientBuilder AddTraffic<TIJT808Traffic>(this IJT808ClientBuilder jT808ClientBuilder) | |||||
where TIJT808Traffic:IJT808Traffic | where TIJT808Traffic:IJT808Traffic | ||||
{ | { | ||||
jT808ClientBuilder.JT808Builder.Services.AddSingleton(typeof(IJT808Traffic), typeof(TIJT808Traffic)); | jT808ClientBuilder.JT808Builder.Services.AddSingleton(typeof(IJT808Traffic), typeof(TIJT808Traffic)); | ||||
jT808ClientBuilder.JT808Builder.Services.AddHostedService<JT808TrafficServiceHostedService>(); | jT808ClientBuilder.JT808Builder.Services.AddHostedService<JT808TrafficServiceHostedService>(); | ||||
return jT808ClientBuilder; | return jT808ClientBuilder; | ||||
} | } | ||||
/// <summary> | /// <summary> | ||||
/// 消息流量统计服务(消费者单实例) | |||||
/// 消息流量统计服务(不同的消费者实例) | |||||
/// </summary> | /// </summary> | ||||
/// <typeparam name="TReplyMessageService"></typeparam> | |||||
/// <param name="jT808GatewayBuilder"></param> | |||||
/// <param name="jT808ClientBuilder"></param> | |||||
/// <returns></returns> | /// <returns></returns> | ||||
public static IJT808GatewayBuilder AddInMemoryTraffic(this IJT808GatewayBuilder jT808GatewayBuilder) | |||||
public static IJT808ClientBuilder AddTraffic(this IJT808ClientBuilder jT808ClientBuilder) | |||||
{ | { | ||||
jT808GatewayBuilder.JT808Builder.Services.AddSingleton(typeof(IJT808Traffic), typeof(JT808TrafficDefault)); | |||||
jT808GatewayBuilder.JT808Builder.Services.AddHostedService<JT808TrafficServiceInMemoryHostedService>(); | |||||
return jT808GatewayBuilder; | |||||
jT808ClientBuilder.JT808Builder.Services.AddSingleton(typeof(IJT808Traffic), typeof(JT808TrafficDefault)); | |||||
jT808ClientBuilder.JT808Builder.Services.AddHostedService<JT808TrafficServiceHostedService>(); | |||||
return jT808ClientBuilder; | |||||
} | } | ||||
/// <summary> | /// <summary> | ||||
/// 消息流量统计服务(消费者单实例) | |||||
/// 消息流量统计服务(不同的消费者实例) | |||||
/// </summary> | |||||
/// <param name="jT808NormalGatewayBuilder"></param> | |||||
/// <returns></returns> | |||||
public static IJT808NormalGatewayBuilder AddTraffic<TIJT808Traffic>(this IJT808NormalGatewayBuilder jT808NormalGatewayBuilder) | |||||
where TIJT808Traffic : IJT808Traffic | |||||
{ | |||||
jT808NormalGatewayBuilder.JT808Builder.Services.AddSingleton(typeof(IJT808Traffic), typeof(TIJT808Traffic)); | |||||
return jT808NormalGatewayBuilder; | |||||
} | |||||
/// <summary> | |||||
/// 消息流量统计服务(不同的消费者实例) | |||||
/// </summary> | /// </summary> | ||||
/// <typeparam name="TReplyMessageService"></typeparam> | |||||
/// <param name="jT808GatewayBuilder"></param> | |||||
/// <param name="jT808NormalGatewayBuilder"></param> | |||||
/// <returns></returns> | /// <returns></returns> | ||||
public static IJT808GatewayBuilder AddInMemoryTraffic<TIJT808Traffic>(this IJT808GatewayBuilder jT808GatewayBuilder) | |||||
public static IJT808NormalGatewayBuilder AddTraffic(this IJT808NormalGatewayBuilder jT808NormalGatewayBuilder) | |||||
{ | { | ||||
jT808GatewayBuilder.JT808Builder.Services.AddSingleton(typeof(IJT808Traffic), typeof(TIJT808Traffic)); | |||||
jT808GatewayBuilder.JT808Builder.Services.AddHostedService<JT808TrafficServiceInMemoryHostedService>(); | |||||
return jT808GatewayBuilder; | |||||
jT808NormalGatewayBuilder.JT808Builder.Services.AddSingleton(typeof(IJT808Traffic), typeof(JT808TrafficDefault)); | |||||
return jT808NormalGatewayBuilder; | |||||
} | } | ||||
} | } | ||||
} | } |
@@ -1,40 +0,0 @@ | |||||
using System.Threading.Tasks; | |||||
using Microsoft.Extensions.Hosting; | |||||
using System.Threading; | |||||
using JT808.Protocol.Extensions; | |||||
using JT808.Gateway.Abstractions; | |||||
using JT808.Gateway.Abstractions.Enums; | |||||
using System; | |||||
namespace JT808.Gateway.Traffic | |||||
{ | |||||
public class JT808TrafficServiceInMemoryHostedService : IHostedService | |||||
{ | |||||
private readonly IJT808MsgConsumer jT808MsgConsumer; | |||||
private readonly IJT808Traffic jT808Traffic; | |||||
public JT808TrafficServiceInMemoryHostedService( | |||||
IJT808Traffic jT808Traffic, | |||||
IJT808MsgConsumerFactory jT808MsgConsumerFactory) | |||||
{ | |||||
this.jT808MsgConsumer = jT808MsgConsumerFactory.Create(JT808ConsumerType.TrafficConsumer); | |||||
this.jT808Traffic = jT808Traffic; | |||||
} | |||||
public Task StartAsync(CancellationToken cancellationToken) | |||||
{ | |||||
jT808MsgConsumer.Subscribe(); | |||||
jT808MsgConsumer.OnMessage((item)=> { | |||||
//string str = item.Data.ToHexString(); | |||||
jT808Traffic.Increment(item.TerminalNo, DateTime.Now.ToString("yyyyMMdd"), item.Data.Length); | |||||
}); | |||||
return Task.CompletedTask; | |||||
} | |||||
public Task StopAsync(CancellationToken cancellationToken) | |||||
{ | |||||
jT808MsgConsumer.Unsubscribe(); | |||||
return Task.CompletedTask; | |||||
} | |||||
} | |||||
} |
@@ -17,25 +17,25 @@ namespace JT808.Gateway.Transmit | |||||
/// <param name="jT808ClientBuilder"></param> | /// <param name="jT808ClientBuilder"></param> | ||||
/// <param name="configuration"></param> | /// <param name="configuration"></param> | ||||
/// <returns></returns> | /// <returns></returns> | ||||
public static IJT808ClientBuilder AddInPlugTransmit(this IJT808ClientBuilder jT808ClientBuilder,IConfiguration configuration) | |||||
public static IJT808ClientBuilder AddTransmit(this IJT808ClientBuilder jT808ClientBuilder,IConfiguration configuration) | |||||
{ | { | ||||
jT808ClientBuilder.JT808Builder.Services.Configure<RemoteServerOptions>(configuration.GetSection("RemoteServerOptions")); | jT808ClientBuilder.JT808Builder.Services.Configure<RemoteServerOptions>(configuration.GetSection("RemoteServerOptions")); | ||||
jT808ClientBuilder.JT808Builder.Services.AddSingleton<JT808TransmitService>(); | jT808ClientBuilder.JT808Builder.Services.AddSingleton<JT808TransmitService>(); | ||||
jT808ClientBuilder.JT808Builder.Services.AddHostedService<JT808TransmitHostedService>(); | jT808ClientBuilder.JT808Builder.Services.AddHostedService<JT808TransmitHostedService>(); | ||||
return jT808ClientBuilder; | return jT808ClientBuilder; | ||||
} | } | ||||
/// <summary> | /// <summary> | ||||
/// 转发服务(消费者单实例) | |||||
/// 转发服务(不同的消费者实例) | |||||
/// </summary> | /// </summary> | ||||
/// <param name="jT808GatewayBuilder"></param> | |||||
/// <param name="jT808NormalGatewayBuilder"></param> | |||||
/// <param name="configuration"></param> | /// <param name="configuration"></param> | ||||
/// <returns></returns> | /// <returns></returns> | ||||
public static IJT808GatewayBuilder AddInMemoryTransmit(this IJT808GatewayBuilder jT808GatewayBuilder, IConfiguration configuration) | |||||
public static IJT808NormalGatewayBuilder AddTransmit(this IJT808NormalGatewayBuilder jT808NormalGatewayBuilder, IConfiguration configuration) | |||||
{ | { | ||||
jT808GatewayBuilder.JT808Builder.Services.Configure<RemoteServerOptions>(configuration.GetSection("RemoteServerOptions")); | |||||
jT808GatewayBuilder.JT808Builder.Services.AddSingleton<JT808TransmitService>(); | |||||
jT808GatewayBuilder.JT808Builder.Services.AddHostedService<JT808TransmitInMemoryHostedService>(); | |||||
return jT808GatewayBuilder; | |||||
jT808NormalGatewayBuilder.JT808Builder.Services.Configure<RemoteServerOptions>(configuration.GetSection("RemoteServerOptions")); | |||||
jT808NormalGatewayBuilder.JT808Builder.Services.AddSingleton<JT808TransmitService>(); | |||||
return jT808NormalGatewayBuilder; | |||||
} | } | ||||
} | } | ||||
} | } |
@@ -1,34 +0,0 @@ | |||||
using System.Threading.Tasks; | |||||
using Microsoft.Extensions.Hosting; | |||||
using System.Threading; | |||||
using JT808.Gateway.Abstractions; | |||||
using JT808.Gateway.Abstractions.Enums; | |||||
namespace JT808.Gateway.Transmit | |||||
{ | |||||
public class JT808TransmitInMemoryHostedService : IHostedService | |||||
{ | |||||
private readonly JT808TransmitService jT808TransmitService; | |||||
private readonly IJT808MsgConsumer jT808MsgConsumer; | |||||
public JT808TransmitInMemoryHostedService( | |||||
IJT808MsgConsumerFactory jT808MsgConsumerFactory, | |||||
JT808TransmitService jT808TransmitService) | |||||
{ | |||||
this.jT808TransmitService = jT808TransmitService; | |||||
this.jT808MsgConsumer = jT808MsgConsumerFactory.Create(JT808ConsumerType.TransmitConsumer); | |||||
} | |||||
public Task StartAsync(CancellationToken cancellationToken) | |||||
{ | |||||
jT808MsgConsumer.Subscribe(); | |||||
jT808MsgConsumer.OnMessage(jT808TransmitService.Send); | |||||
return Task.CompletedTask; | |||||
} | |||||
public Task StopAsync(CancellationToken cancellationToken) | |||||
{ | |||||
jT808MsgConsumer.Unsubscribe(); | |||||
return Task.CompletedTask; | |||||
} | |||||
} | |||||
} |
@@ -1,23 +0,0 @@ | |||||
<Project Sdk="Microsoft.NET.Sdk"> | |||||
<PropertyGroup> | |||||
<TargetFramework>netcoreapp3.1</TargetFramework> | |||||
<IsPackable>false</IsPackable> | |||||
</PropertyGroup> | |||||
<ItemGroup> | |||||
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="3.1.1" /> | |||||
<PackageReference Include="Microsoft.Extensions.Logging" Version="3.1.1" /> | |||||
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="16.2.0" /> | |||||
<PackageReference Include="xunit" Version="2.4.0" /> | |||||
<PackageReference Include="xunit.runner.visualstudio" Version="2.4.0" /> | |||||
<PackageReference Include="coverlet.collector" Version="1.0.1" /> | |||||
</ItemGroup> | |||||
<ItemGroup> | |||||
<ProjectReference Include="..\..\JT808.Gateway.InMemoryMQ\JT808.Gateway.InMemoryMQ.csproj" /> | |||||
<ProjectReference Include="..\..\JT808.Gateway\JT808.Gateway.csproj" /> | |||||
</ItemGroup> | |||||
</Project> |
@@ -1,124 +0,0 @@ | |||||
using JT808.Gateway.Abstractions; | |||||
using JT808.Gateway.Abstractions.Enums; | |||||
using JT808.Gateway.Internal; | |||||
using JT808.Protocol; | |||||
using Microsoft.Extensions.DependencyInjection; | |||||
using Microsoft.Extensions.Logging; | |||||
using System; | |||||
using System.Collections.Generic; | |||||
using System.Text; | |||||
using Xunit; | |||||
namespace JT808.Gateway.InMemoryMQ.Test | |||||
{ | |||||
public class JT808MsgProducerTest | |||||
{ | |||||
[Fact] | |||||
public void Test1() | |||||
{ | |||||
IServiceCollection serviceDescriptors = new ServiceCollection(); | |||||
serviceDescriptors.AddSingleton<ILoggerFactory, LoggerFactory>(); | |||||
serviceDescriptors.AddSingleton(typeof(ILogger<>), typeof(Logger<>)); | |||||
serviceDescriptors.AddServerInMemoryMQ(JT808ConsumerType.MsgIdHandlerConsumer | JT808ConsumerType.ReplyMessageConsumer); | |||||
IServiceProvider serviceProvider = serviceDescriptors.BuildServiceProvider(); | |||||
IJT808MsgProducer producer = serviceProvider.GetRequiredService<IJT808MsgProducer>(); | |||||
producer.ProduceAsync("123", new byte[] { 1, 2, 3, 4 }); | |||||
IJT808MsgConsumer consumer = serviceProvider.GetRequiredService<IJT808MsgConsumer>(); | |||||
consumer.OnMessage((item) => { | |||||
Assert.Equal("123", item.TerminalNo); | |||||
Assert.Equal(new byte[] { 1, 2, 3, 4 }, item.Data); | |||||
}); | |||||
IJT808MsgConsumerFactory consumerFactory = serviceProvider.GetRequiredService<IJT808MsgConsumerFactory>(); | |||||
var msgIdHandlerConsumer = consumerFactory.Create(JT808ConsumerType.MsgIdHandlerConsumer); | |||||
msgIdHandlerConsumer.OnMessage((item) => | |||||
{ | |||||
Assert.Equal("123", item.TerminalNo); | |||||
Assert.Equal(new byte[] { 1, 2, 3, 4 }, item.Data); | |||||
}); | |||||
var replyMessageConsumer = consumerFactory.Create(JT808ConsumerType.ReplyMessageConsumer); | |||||
replyMessageConsumer.OnMessage((item) => | |||||
{ | |||||
Assert.Equal("123", item.TerminalNo); | |||||
Assert.Equal(new byte[] { 1, 2, 3, 4 }, item.Data); | |||||
}); | |||||
} | |||||
[Fact] | |||||
public void Test2() | |||||
{ | |||||
IServiceCollection serviceDescriptors = new ServiceCollection(); | |||||
serviceDescriptors.AddSingleton<ILoggerFactory, LoggerFactory>(); | |||||
serviceDescriptors.AddSingleton(typeof(ILogger<>), typeof(Logger<>)); | |||||
serviceDescriptors.AddServerInMemoryMQ(JT808ConsumerType.MsgIdHandlerConsumer,JT808ConsumerType.ReplyMessageConsumer); | |||||
IServiceProvider serviceProvider = serviceDescriptors.BuildServiceProvider(); | |||||
IJT808MsgProducer producer = serviceProvider.GetRequiredService<IJT808MsgProducer>(); | |||||
producer.ProduceAsync("123", new byte[] { 1, 2, 3, 4 }); | |||||
IJT808MsgConsumer consumer = serviceProvider.GetRequiredService<IJT808MsgConsumer>(); | |||||
consumer.OnMessage((item) => { | |||||
Assert.Equal("123", item.TerminalNo); | |||||
Assert.Equal(new byte[] { 1, 2, 3, 4 }, item.Data); | |||||
}); | |||||
IJT808MsgConsumerFactory consumerFactory = serviceProvider.GetRequiredService<IJT808MsgConsumerFactory>(); | |||||
var msgIdHandlerConsumer = consumerFactory.Create(JT808ConsumerType.MsgIdHandlerConsumer); | |||||
msgIdHandlerConsumer.OnMessage((item) => | |||||
{ | |||||
Assert.Equal("123", item.TerminalNo); | |||||
Assert.Equal(new byte[] { 1, 2, 3, 4 }, item.Data); | |||||
}); | |||||
var replyMessageConsumer = consumerFactory.Create(JT808ConsumerType.ReplyMessageConsumer); | |||||
replyMessageConsumer.OnMessage((item) => | |||||
{ | |||||
Assert.Equal("123", item.TerminalNo); | |||||
Assert.Equal(new byte[] { 1, 2, 3, 4 }, item.Data); | |||||
}); | |||||
} | |||||
[Fact] | |||||
public void Test3() | |||||
{ | |||||
IServiceCollection serviceDescriptors = new ServiceCollection(); | |||||
serviceDescriptors.AddSingleton<ILoggerFactory, LoggerFactory>(); | |||||
serviceDescriptors.AddSingleton(typeof(ILogger<>), typeof(Logger<>)); | |||||
serviceDescriptors.AddServerInMemoryMQ(JT808ConsumerType.All); | |||||
IServiceProvider serviceProvider = serviceDescriptors.BuildServiceProvider(); | |||||
IJT808MsgProducer producer = serviceProvider.GetRequiredService<IJT808MsgProducer>(); | |||||
producer.ProduceAsync("123", new byte[] { 1, 2, 3, 4 }); | |||||
IJT808MsgConsumer consumer = serviceProvider.GetRequiredService<IJT808MsgConsumer>(); | |||||
consumer.OnMessage((item) => { | |||||
Assert.Equal("123", item.TerminalNo); | |||||
Assert.Equal(new byte[] { 1, 2, 3, 4 }, item.Data); | |||||
}); | |||||
IJT808MsgConsumerFactory consumerFactory = serviceProvider.GetRequiredService<IJT808MsgConsumerFactory>(); | |||||
var msgIdHandlerConsumer = consumerFactory.Create(JT808ConsumerType.MsgIdHandlerConsumer); | |||||
msgIdHandlerConsumer.OnMessage((item) => | |||||
{ | |||||
Assert.Equal("123", item.TerminalNo); | |||||
Assert.Equal(new byte[] { 1, 2, 3, 4 }, item.Data); | |||||
}); | |||||
var replyMessageConsumer = consumerFactory.Create(JT808ConsumerType.ReplyMessageConsumer); | |||||
replyMessageConsumer.OnMessage((item) => | |||||
{ | |||||
Assert.Equal("123", item.TerminalNo); | |||||
Assert.Equal(new byte[] { 1, 2, 3, 4 }, item.Data); | |||||
}); | |||||
var msgLoggingConsumer = consumerFactory.Create(JT808ConsumerType.MsgLoggingConsumer); | |||||
msgLoggingConsumer.OnMessage((item) => | |||||
{ | |||||
Assert.Equal("123", item.TerminalNo); | |||||
Assert.Equal(new byte[] { 1, 2, 3, 4 }, item.Data); | |||||
}); | |||||
var trafficConsumer = consumerFactory.Create(JT808ConsumerType.TrafficConsumer); | |||||
trafficConsumer.OnMessage((item) => | |||||
{ | |||||
Assert.Equal("123", item.TerminalNo); | |||||
Assert.Equal(new byte[] { 1, 2, 3, 4 }, item.Data); | |||||
}); | |||||
var transmitConsumer = consumerFactory.Create(JT808ConsumerType.TransmitConsumer); | |||||
transmitConsumer.OnMessage((item) => | |||||
{ | |||||
Assert.Equal("123", item.TerminalNo); | |||||
Assert.Equal(new byte[] { 1, 2, 3, 4 }, item.Data); | |||||
}); | |||||
} | |||||
} | |||||
} |
@@ -1,22 +0,0 @@ | |||||
using JT808.Gateway.InMemoryMQ.Services; | |||||
using System; | |||||
using System.Collections.Concurrent; | |||||
using System.Threading; | |||||
using System.Threading.Tasks; | |||||
using Xunit; | |||||
namespace JT808.Gateway.InMemoryMQ.Test.Services | |||||
{ | |||||
public class JT808MsgServiceTest | |||||
{ | |||||
[Fact] | |||||
public void Test1() | |||||
{ | |||||
JT808MsgService jT808MsgService = new JT808MsgService(); | |||||
jT808MsgService.WriteAsync("132", new byte[] { 1, 2, 3 }).GetAwaiter().GetResult(); | |||||
var result = jT808MsgService.ReadAsync(CancellationToken.None).GetAwaiter().GetResult(); | |||||
Assert.Equal("132", result.TerminalNo); | |||||
Assert.Equal(new byte[] { 1, 2, 3 }, result.Data); | |||||
} | |||||
} | |||||
} |
@@ -1,32 +0,0 @@ | |||||
using JT808.Gateway.Abstractions; | |||||
using JT808.Gateway.InMemoryMQ.Services; | |||||
using System; | |||||
using System.Collections.Generic; | |||||
using System.Text; | |||||
using System.Threading; | |||||
using Xunit; | |||||
namespace JT808.Gateway.InMemoryMQ.Test.Services | |||||
{ | |||||
public class JT808SessionServiceTest | |||||
{ | |||||
[Fact] | |||||
public void Test1() | |||||
{ | |||||
JT808SessionService jT808SessionService = new JT808SessionService(); | |||||
jT808SessionService.WriteAsync(JT808GatewayConstants.SessionOnline, "123456").GetAwaiter().GetResult(); | |||||
jT808SessionService.WriteAsync(JT808GatewayConstants.SessionOffline, "123457").GetAwaiter().GetResult(); | |||||
jT808SessionService.WriteAsync(JT808GatewayConstants.SessionOnline, "123456,123457").GetAwaiter().GetResult(); | |||||
var result1 = jT808SessionService.ReadAsync(CancellationToken.None).GetAwaiter().GetResult(); | |||||
var result2 = jT808SessionService.ReadAsync(CancellationToken.None).GetAwaiter().GetResult(); | |||||
var result3 = jT808SessionService.ReadAsync(CancellationToken.None).GetAwaiter().GetResult(); | |||||
Assert.Equal(JT808GatewayConstants.SessionOnline, result1.Notice); | |||||
Assert.Equal("123456", result1.TerminalNo); | |||||
Assert.Equal(JT808GatewayConstants.SessionOffline, result2.Notice); | |||||
Assert.Equal("123457", result2.TerminalNo); | |||||
//转发 | |||||
Assert.Equal(JT808GatewayConstants.SessionOnline, result3.Notice); | |||||
Assert.Equal("123456,123457", result3.TerminalNo); | |||||
} | |||||
} | |||||
} |
@@ -0,0 +1,36 @@ | |||||
<?xml version="1.0" encoding="utf-8" ?> | |||||
<!-- | |||||
参考:http://www.cnblogs.com/fuchongjundream/p/3936431.html | |||||
autoReload:自动再配置 | |||||
internalLogFile:可以让NLog把内部的调试和异常信息都写入指定文件里程序没问题了,日志却出了问题。这个该怎么办,到底是哪里不正确了?假如日志本身除了bug该如何解决?这就需要日志排错。把日志的错误信息写入日志。 | |||||
<nlog throwExceptions="true" /> | |||||
<nlog internalLogFile="file.txt" />- 设置internalLogFile属性可以让NLog把内部的调试和异常信息都写入指定文件里。 | |||||
<nlog internalLogLevel="Trace|Debug|Info|Warn|Error|Fatal" /> - 决定内部日志的级别,级别越高,输出的日志信息越简洁。 | |||||
<nlog internalLogToConsole="false|true" /> - 是否把内部日志输出到标准控制台。 | |||||
<nlog internalLogToConsoleError="false|true" /> - 是否把内部日志输出到标准错误控制台 (stderr)。 | |||||
设置throwExceptions属性为“true”可以让NLog不再阻挡这类异常,而是把它们抛给调用者。在部署是这样做可以帮我们快速定位问题。一旦应用程序已经正确配置了,我们建议把throwExceptions的值设为“false”,这样由于日志引发的问题不至于导致应用程序的崩溃。 | |||||
--> | |||||
<nlog xmlns="http://www.nlog-project.org/schemas/NLog.xsd" xsi:schemaLocation="NLog NLog.xsd" | |||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" | |||||
autoReload="true" | |||||
internalLogFile="/data/serviceslogs/JT808.Gateway/internalLog.txt" | |||||
internalLogLevel="Debug" > | |||||
<variable name="Directory" value="/data/serviceslogs/JT808.Gateway"/> | |||||
<targets> | |||||
<target name="Gateway" xsi:type="File" | |||||
fileName="${Directory}/Gateway.${shortdate}.log" | |||||
layout="${date:format=yyyyMMddHHmmss} ${callsite} ${level}:${message} ${onexception:${exception:format=tostring} ${newline} ${stacktrace} ${newline}"/> | |||||
<target name="console" xsi:type="ColoredConsole" | |||||
useDefaultRowHighlightingRules="false" | |||||
layout="${date:format=yyyyMMddHHmmss} ${callsite} ${level} ${message} ${onexception:${exception:format=tostring} ${newline} ${stacktrace} ${newline}"> | |||||
<highlight-row condition="level == LogLevel.Debug" foregroundColor="DarkGray" /> | |||||
<highlight-row condition="level == LogLevel.Info" foregroundColor="Gray" /> | |||||
<highlight-row condition="level == LogLevel.Warn" foregroundColor="Yellow" /> | |||||
<highlight-row condition="level == LogLevel.Error" foregroundColor="Red" /> | |||||
<highlight-row condition="level == LogLevel.Fatal" foregroundColor="Red" backgroundColor="White" /> | |||||
</target> | |||||
</targets> | |||||
<rules> | |||||
<logger name="*" minlevel="Info" maxlevel="Fatal" writeTo="Gateway"/> | |||||
</rules> | |||||
</nlog> |
@@ -0,0 +1,36 @@ | |||||
<?xml version="1.0" encoding="utf-8" ?> | |||||
<!-- | |||||
参考:http://www.cnblogs.com/fuchongjundream/p/3936431.html | |||||
autoReload:自动再配置 | |||||
internalLogFile:可以让NLog把内部的调试和异常信息都写入指定文件里程序没问题了,日志却出了问题。这个该怎么办,到底是哪里不正确了?假如日志本身除了bug该如何解决?这就需要日志排错。把日志的错误信息写入日志。 | |||||
<nlog throwExceptions="true" /> | |||||
<nlog internalLogFile="file.txt" />- 设置internalLogFile属性可以让NLog把内部的调试和异常信息都写入指定文件里。 | |||||
<nlog internalLogLevel="Trace|Debug|Info|Warn|Error|Fatal" /> - 决定内部日志的级别,级别越高,输出的日志信息越简洁。 | |||||
<nlog internalLogToConsole="false|true" /> - 是否把内部日志输出到标准控制台。 | |||||
<nlog internalLogToConsoleError="false|true" /> - 是否把内部日志输出到标准错误控制台 (stderr)。 | |||||
设置throwExceptions属性为“true”可以让NLog不再阻挡这类异常,而是把它们抛给调用者。在部署是这样做可以帮我们快速定位问题。一旦应用程序已经正确配置了,我们建议把throwExceptions的值设为“false”,这样由于日志引发的问题不至于导致应用程序的崩溃。 | |||||
--> | |||||
<nlog xmlns="http://www.nlog-project.org/schemas/NLog.xsd" xsi:schemaLocation="NLog NLog.xsd" | |||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" | |||||
autoReload="true" | |||||
internalLogFile="wwwroot/logs/JT808.Gateway/internalLog.txt" | |||||
internalLogLevel="Debug" > | |||||
<variable name="Directory" value="/data/logs/JT808.Gateway"/> | |||||
<targets> | |||||
<target name="Gateway" xsi:type="File" | |||||
fileName="${Directory}/Gateway.${shortdate}.log" | |||||
layout="${date:format=yyyyMMddHHmmss} ${callsite} ${level}:${message} ${onexception:${exception:format=tostring} ${newline} ${stacktrace} ${newline}"/> | |||||
<target name="console" xsi:type="ColoredConsole" | |||||
useDefaultRowHighlightingRules="false" | |||||
layout="${date:format=yyyyMMddHHmmss} ${callsite} ${level} ${message} ${onexception:${exception:format=tostring} ${newline} ${stacktrace} ${newline}"> | |||||
<highlight-row condition="level == LogLevel.Debug" foregroundColor="DarkGray" /> | |||||
<highlight-row condition="level == LogLevel.Info" foregroundColor="Gray" /> | |||||
<highlight-row condition="level == LogLevel.Warn" foregroundColor="Yellow" /> | |||||
<highlight-row condition="level == LogLevel.Error" foregroundColor="Red" /> | |||||
<highlight-row condition="level == LogLevel.Fatal" foregroundColor="Red" backgroundColor="White" /> | |||||
</target> | |||||
</targets> | |||||
<rules> | |||||
<logger name="*" minlevel="Trace" maxlevel="Fatal" writeTo="Gateway,console"/> | |||||
</rules> | |||||
</nlog> |
@@ -5,7 +5,7 @@ using System; | |||||
using System.Collections.Generic; | using System.Collections.Generic; | ||||
using System.Text; | using System.Text; | ||||
namespace JT808.Gateway.TestHosting.Impl | |||||
namespace JT808.Gateway.NormalHosting.Impl | |||||
{ | { | ||||
public class JT808MsgLogging : IJT808MsgLogging | public class JT808MsgLogging : IJT808MsgLogging | ||||
{ | { |
@@ -0,0 +1,79 @@ | |||||
using JT808.Gateway.Abstractions; | |||||
using JT808.Gateway.MsgLogging; | |||||
using JT808.Gateway.Traffic; | |||||
using JT808.Gateway.Transmit; | |||||
using JT808.Protocol; | |||||
using Microsoft.Extensions.Logging; | |||||
using System; | |||||
using System.Collections.Generic; | |||||
using System.Text; | |||||
namespace JT808.Gateway.NormalHosting.Impl | |||||
{ | |||||
public class JT808NormalReplyMessageHandlerImpl : JT808NormalReplyMessageHandler | |||||
{ | |||||
private readonly ILogger logger; | |||||
private readonly IJT808Traffic jT808Traffic; | |||||
private readonly IJT808MsgLogging jT808MsgLogging; | |||||
private readonly JT808TransmitService jT808TransmitService; | |||||
public JT808NormalReplyMessageHandlerImpl( | |||||
JT808TransmitService jT808TransmitService, | |||||
IJT808MsgLogging jT808MsgLogging, | |||||
IJT808Traffic jT808Traffic, | |||||
ILoggerFactory loggerFactory, | |||||
IJT808Config jT808Config) : base(jT808Config) | |||||
{ | |||||
this.jT808TransmitService = jT808TransmitService; | |||||
this.jT808Traffic = jT808Traffic; | |||||
this.jT808MsgLogging = jT808MsgLogging; | |||||
logger =loggerFactory.CreateLogger("JT808NormalReplyMessageHandlerImpl"); | |||||
//添加自定义消息 | |||||
HandlerDict.Add(0x9999, Msg0x9999); | |||||
} | |||||
/// <summary> | |||||
/// 重写消息处理器 | |||||
/// </summary> | |||||
/// <param name="request"></param> | |||||
/// <param name="session"></param> | |||||
public override byte[] Processor(JT808HeaderPackage request, IJT808Session session) | |||||
{ | |||||
//AOP 可以自定义添加一些东西:上下行日志、数据转发 | |||||
logger.LogDebug("可以自定义添加一些东西:上下行日志、数据转发"); | |||||
//流量 | |||||
jT808Traffic.Increment(request.Header.TerminalPhoneNo, DateTime.Now.ToString("yyyyMMdd"), request.OriginalData.Length); | |||||
var parameter = (request.Header.TerminalPhoneNo, request.OriginalData.ToArray()); | |||||
//转发数据(可同步也可以使用队列进行异步) | |||||
jT808TransmitService.Send(parameter); | |||||
//上行日志(可同步也可以使用队列进行异步) | |||||
jT808MsgLogging.Processor(parameter, JT808MsgLoggingType.up); | |||||
//处理上行消息 | |||||
var down= base.Processor(request, session); | |||||
//下行日志(可同步也可以使用队列进行异步) | |||||
jT808MsgLogging.Processor((request.Header.TerminalPhoneNo, down), JT808MsgLoggingType.down); | |||||
return down; | |||||
} | |||||
/// <summary> | |||||
/// 重写自带的消息 | |||||
/// </summary> | |||||
/// <param name="request"></param> | |||||
/// <param name="session"></param> | |||||
public override byte[] Msg0x0200(JT808HeaderPackage request, IJT808Session session) | |||||
{ | |||||
logger.LogDebug("重写自带Msg0x0200的消息"); | |||||
return base.Msg0x0200(request, session); | |||||
} | |||||
/// <summary> | |||||
/// 自定义消息 | |||||
/// </summary> | |||||
/// <param name="request"></param> | |||||
/// <returns></returns> | |||||
public byte[] Msg0x9999(JT808HeaderPackage request, IJT808Session session) | |||||
{ | |||||
logger.LogDebug("自定义消息"); | |||||
return default; | |||||
} | |||||
} | |||||
} |
@@ -5,9 +5,9 @@ using System.Collections.Generic; | |||||
using System.Text; | using System.Text; | ||||
using System.Threading; | using System.Threading; | ||||
using System.Threading.Tasks; | using System.Threading.Tasks; | ||||
using JT808.Gateway.InMemoryMQ.Services; | |||||
using JT808.Gateway.NormalHosting.Services; | |||||
namespace JT808.Gateway.InMemoryMQ | |||||
namespace JT808.Gateway.NormalHosting.Impl | |||||
{ | { | ||||
public class JT808SessionConsumer : IJT808SessionConsumer | public class JT808SessionConsumer : IJT808SessionConsumer | ||||
{ | { |
@@ -1,11 +1,11 @@ | |||||
using JT808.Gateway.Abstractions; | using JT808.Gateway.Abstractions; | ||||
using JT808.Gateway.InMemoryMQ.Services; | |||||
using JT808.Gateway.NormalHosting.Services; | |||||
using System; | using System; | ||||
using System.Collections.Generic; | using System.Collections.Generic; | ||||
using System.Text; | using System.Text; | ||||
using System.Threading.Tasks; | using System.Threading.Tasks; | ||||
namespace JT808.Gateway.InMemoryMQ | |||||
namespace JT808.Gateway.NormalHosting.Impl | |||||
{ | { | ||||
public class JT808SessionProducer : IJT808SessionProducer | public class JT808SessionProducer : IJT808SessionProducer | ||||
{ | { |
@@ -0,0 +1,40 @@ | |||||
<Project Sdk="Microsoft.NET.Sdk"> | |||||
<PropertyGroup> | |||||
<OutputType>Exe</OutputType> | |||||
<TargetFramework>netcoreapp3.1</TargetFramework> | |||||
</PropertyGroup> | |||||
<ItemGroup> | |||||
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="3.1.1" /> | |||||
<PackageReference Include="Microsoft.Extensions.Hosting" Version="3.1.1" /> | |||||
<PackageReference Include="Microsoft.VisualStudio.Azure.Containers.Tools.Targets" Version="1.9.10" /> | |||||
<PackageReference Include="NLog.Extensions.Logging" Version="1.6.1" /> | |||||
</ItemGroup> | |||||
<ItemGroup> | |||||
<ProjectReference Include="..\..\JT808.Gateway.Client\JT808.Gateway.Client.csproj" /> | |||||
<ProjectReference Include="..\..\JT808.Gateway.Services\JT808.Gateway.MsgLogging\JT808.Gateway.MsgLogging.csproj" /> | |||||
<ProjectReference Include="..\..\JT808.Gateway.Services\JT808.Gateway.SessionNotice\JT808.Gateway.SessionNotice.csproj" /> | |||||
<ProjectReference Include="..\..\JT808.Gateway.Services\JT808.Gateway.Traffic\JT808.Gateway.Traffic.csproj" /> | |||||
<ProjectReference Include="..\..\JT808.Gateway.Services\JT808.Gateway.Transmit\JT808.Gateway.Transmit.csproj" /> | |||||
<ProjectReference Include="..\..\JT808.Gateway\JT808.Gateway.csproj" /> | |||||
</ItemGroup> | |||||
<ItemGroup> | |||||
<None Update="appsettings.json"> | |||||
<CopyToOutputDirectory>Always</CopyToOutputDirectory> | |||||
</None> | |||||
<None Update="Configs\nlog.Unix.config"> | |||||
<CopyToOutputDirectory>Always</CopyToOutputDirectory> | |||||
</None> | |||||
<None Update="Configs\nlog.Win32NT.config"> | |||||
<CopyToOutputDirectory>Always</CopyToOutputDirectory> | |||||
</None> | |||||
<None Update="Configs\NLog.xsd"> | |||||
<CopyToOutputDirectory>Always</CopyToOutputDirectory> | |||||
</None> | |||||
</ItemGroup> | |||||
</Project> |
@@ -0,0 +1,69 @@ | |||||
using System; | |||||
using System.Collections.Generic; | |||||
using System.Text; | |||||
using System.Threading; | |||||
using System.Threading.Tasks; | |||||
using Grpc.Core; | |||||
using JT808.Gateway.Configurations; | |||||
using JT808.Gateway.GrpcService; | |||||
using Microsoft.Extensions.Hosting; | |||||
using Microsoft.Extensions.Logging; | |||||
using System.Text.Json; | |||||
namespace JT808.Gateway.NormalHosting.Jobs | |||||
{ | |||||
public class CallGrpcClientJob :IHostedService | |||||
{ | |||||
private Channel channel; | |||||
private readonly ILogger Logger; | |||||
private Grpc.Core.Metadata AuthMetadata; | |||||
public CallGrpcClientJob( | |||||
ILoggerFactory loggerFactory, | |||||
JT808Configuration configuration) | |||||
{ | |||||
Logger = loggerFactory.CreateLogger("CallGrpcClientJob"); | |||||
channel = new Channel($"{configuration.WebApiHost}:{configuration.WebApiPort}", | |||||
ChannelCredentials.Insecure); | |||||
AuthMetadata = new Grpc.Core.Metadata(); | |||||
AuthMetadata.Add("token", configuration.WebApiToken); | |||||
} | |||||
public Task StartAsync(CancellationToken cancellationToken) | |||||
{ | |||||
Task.Run(() => | |||||
{ | |||||
while (!cancellationToken.IsCancellationRequested) | |||||
{ | |||||
JT808Gateway.JT808GatewayClient jT808GatewayClient = new JT808Gateway.JT808GatewayClient(channel); | |||||
try | |||||
{ | |||||
var result1 = jT808GatewayClient.GetTcpAtomicCounter(new Empty(), AuthMetadata); | |||||
var result2 = jT808GatewayClient.GetTcpSessionAll(new Empty(), AuthMetadata); | |||||
Logger.LogInformation($"[GetTcpAtomicCounter]:{JsonSerializer.Serialize(result1)}"); | |||||
Logger.LogInformation($"[GetTcpSessionAll]:{JsonSerializer.Serialize(result2)}"); | |||||
} | |||||
catch (Exception ex) | |||||
{ | |||||
Logger.LogError(ex, "Call Grpc Error"); | |||||
} | |||||
try | |||||
{ | |||||
var result1 = jT808GatewayClient.GetTcpAtomicCounter(new Empty()); | |||||
} | |||||
catch (RpcException ex) | |||||
{ | |||||
Logger.LogError($"{ex.StatusCode.ToString()}-{ex.Message}"); | |||||
} | |||||
Thread.Sleep(3000); | |||||
} | |||||
}, cancellationToken); | |||||
return Task.CompletedTask; | |||||
} | |||||
public Task StopAsync(CancellationToken cancellationToken) | |||||
{ | |||||
channel.ShutdownAsync(); | |||||
return Task.CompletedTask; | |||||
} | |||||
} | |||||
} |
@@ -1,5 +1,4 @@ | |||||
using JT808.Gateway.Client; | |||||
using JT808.Gateway.Traffic; | |||||
using JT808.Gateway.Traffic; | |||||
using JT808.Protocol.Enums; | using JT808.Protocol.Enums; | ||||
using JT808.Protocol.Extensions; | using JT808.Protocol.Extensions; | ||||
using JT808.Protocol.MessageBody; | using JT808.Protocol.MessageBody; | ||||
@@ -11,7 +10,7 @@ using System.Text; | |||||
using System.Threading; | using System.Threading; | ||||
using System.Threading.Tasks; | using System.Threading.Tasks; | ||||
namespace JT808.Gateway.TestHosting.Jobs | |||||
namespace JT808.Gateway.NormalHosting.Jobs | |||||
{ | { | ||||
public class TrafficJob : IHostedService | public class TrafficJob : IHostedService | ||||
{ | { |
@@ -0,0 +1,70 @@ | |||||
using JT808.Gateway.Client; | |||||
using JT808.Protocol.Enums; | |||||
using JT808.Protocol.Extensions; | |||||
using JT808.Protocol.MessageBody; | |||||
using Microsoft.Extensions.Hosting; | |||||
using Microsoft.Extensions.Logging; | |||||
using System; | |||||
using System.Collections.Generic; | |||||
using System.Text; | |||||
using System.Threading; | |||||
using System.Threading.Tasks; | |||||
namespace JT808.Gateway.NormalHosting.Jobs | |||||
{ | |||||
public class UpJob : IHostedService | |||||
{ | |||||
private readonly IJT808TcpClientFactory jT808TcpClientFactory; | |||||
private readonly ILogger Logger; | |||||
public UpJob( | |||||
ILoggerFactory loggerFactory, | |||||
IJT808TcpClientFactory jT808TcpClientFactory) | |||||
{ | |||||
Logger = loggerFactory.CreateLogger("UpJob"); | |||||
this.jT808TcpClientFactory = jT808TcpClientFactory; | |||||
} | |||||
public Task StartAsync(CancellationToken cancellationToken) | |||||
{ | |||||
Task.Run(async () => | |||||
{ | |||||
await Task.Delay(2 * 1000); | |||||
var client = await jT808TcpClientFactory.Create(new JT808DeviceConfig("1234567890", "127.0.0.1", 808), cancellationToken); | |||||
if (client != null) | |||||
{ | |||||
while (!cancellationToken.IsCancellationRequested) | |||||
{ | |||||
try | |||||
{ | |||||
int lat = new Random(1000).Next(100000, 180000); | |||||
int Lng = new Random(1000).Next(100000, 180000); | |||||
await client.SendAsync(JT808MsgId.位置信息汇报.Create(client.DeviceConfig.TerminalPhoneNo, new JT808_0x0200() | |||||
{ | |||||
Lat = lat, | |||||
Lng = Lng, | |||||
GPSTime = DateTime.Now, | |||||
Speed = 50, | |||||
Direction = 30, | |||||
AlarmFlag = 5, | |||||
Altitude = 50, | |||||
StatusFlag = 10 | |||||
})); | |||||
} | |||||
catch (Exception ex) | |||||
{ | |||||
Logger.LogError(ex.Message); | |||||
} | |||||
await Task.Delay(3 * 1000); | |||||
} | |||||
} | |||||
}, cancellationToken); | |||||
return Task.CompletedTask; | |||||
} | |||||
public Task StopAsync(CancellationToken cancellationToken) | |||||
{ | |||||
jT808TcpClientFactory.Dispose(); | |||||
return Task.CompletedTask; | |||||
} | |||||
} | |||||
} |
@@ -6,20 +6,17 @@ using Microsoft.Extensions.Logging; | |||||
using JT808.Protocol; | using JT808.Protocol; | ||||
using Microsoft.Extensions.Configuration; | using Microsoft.Extensions.Configuration; | ||||
using NLog.Extensions.Logging; | using NLog.Extensions.Logging; | ||||
using JT808.Gateway.TestHosting.Jobs; | |||||
using JT808.Gateway.Kafka; | |||||
using JT808.Gateway.InMemoryMQ; | |||||
using JT808.Gateway.ReplyMessage; | |||||
using JT808.Gateway.Client; | |||||
using JT808.Gateway.SessionNotice; | |||||
using JT808.Gateway.Abstractions.Enums; | |||||
using JT808.Gateway.MsgIdHandler; | |||||
using JT808.Gateway.NormalHosting.Impl; | |||||
using JT808.Gateway.MsgLogging; | using JT808.Gateway.MsgLogging; | ||||
using JT808.Gateway.Traffic; | |||||
using JT808.Gateway.Transmit; | using JT808.Gateway.Transmit; | ||||
using JT808.Gateway.TestHosting.Impl; | |||||
using JT808.Gateway.Traffic; | |||||
using JT808.Gateway.NormalHosting.Services; | |||||
using JT808.Gateway.Abstractions; | |||||
using JT808.Gateway.SessionNotice; | |||||
using JT808.Gateway.Client; | |||||
using JT808.Gateway.NormalHosting.Jobs; | |||||
namespace JT808.Gateway.TestHosting | |||||
namespace JT808.Gateway.NormalHosting | |||||
{ | { | ||||
class Program | class Program | ||||
{ | { | ||||
@@ -43,38 +40,30 @@ namespace JT808.Gateway.TestHosting | |||||
{ | { | ||||
services.AddSingleton<ILoggerFactory, LoggerFactory>(); | services.AddSingleton<ILoggerFactory, LoggerFactory>(); | ||||
services.AddSingleton(typeof(ILogger<>), typeof(Logger<>)); | services.AddSingleton(typeof(ILogger<>), typeof(Logger<>)); | ||||
//使用内存队列实现会话通知 | |||||
services.AddSingleton<JT808SessionService>(); | |||||
services.AddSingleton<IJT808SessionProducer, JT808SessionProducer>(); | |||||
services.AddSingleton<IJT808SessionConsumer, JT808SessionConsumer>(); | |||||
services.AddJT808Configure() | services.AddJT808Configure() | ||||
//添加客户端工具 | //添加客户端工具 | ||||
//.AddClient() | //.AddClient() | ||||
//.AddGateway(options => | |||||
//.AddNormalGateway(options => | |||||
////{ | ////{ | ||||
//// options.TcpPort = 808; | //// options.TcpPort = 808; | ||||
//// options.UdpPort = 808; | //// options.UdpPort = 808; | ||||
////}) | ////}) | ||||
.AddGateway(hostContext.Configuration) | |||||
.AddNormalGateway(hostContext.Configuration) | |||||
.ReplaceNormalReplyMessageHandler<JT808NormalReplyMessageHandlerImpl>() | |||||
.AddMsgLogging<JT808MsgLogging>() | |||||
.AddTraffic() | |||||
.AddSessionNotice() | |||||
.AddTransmit(hostContext.Configuration) | |||||
.AddTcp() | .AddTcp() | ||||
.AddUdp() | .AddUdp() | ||||
.AddGrpc() | .AddGrpc() | ||||
//InMemoryMQ 按需要加载对应的服务 | |||||
//注意:不需要的就不用add进来了 | |||||
.AddServerInMemoryMQ(JT808ConsumerType.All) | |||||
//方式1 | |||||
//.AddServerInMemoryMQ(JT808ConsumerType.MsgIdHandlerConsumer| JT808ConsumerType.ReplyMessageConsumer) | |||||
//方式2 | |||||
//.AddServerInMemoryMQ(JT808ConsumerType.MsgIdHandlerConsumer,JT808ConsumerType.ReplyMessageConsumer) | |||||
.AddInMemoryTraffic() | |||||
.AddInMemoryTransmit(hostContext.Configuration) | |||||
.AddInMemoryMsgIdHandler<JT808MsgIdHandler>() | |||||
.AddInMemoryMsgLogging<JT808MsgLogging>() | |||||
.AddInMemorySessionNotice() | |||||
.AddInMemoryReplyMessage() | |||||
//kafka插件 | |||||
//.AddServerKafkaMsgProducer(hostContext.Configuration) | |||||
//.AddServerKafkaMsgReplyConsumer(hostContext.Configuration) | |||||
//.AddServerKafkaSessionProducer(hostContext.Configuration) | |||||
; | ; | ||||
//流量统计 | //流量统计 | ||||
//services.AddHostedService<TrafficJob>(); | |||||
services.AddHostedService<TrafficJob>(); | |||||
//grpc客户端调用 | //grpc客户端调用 | ||||
//services.AddHostedService<CallGrpcClientJob>(); | //services.AddHostedService<CallGrpcClientJob>(); | ||||
//客户端测试 | //客户端测试 |
@@ -5,7 +5,7 @@ using System.Threading; | |||||
using System.Threading.Channels; | using System.Threading.Channels; | ||||
using System.Threading.Tasks; | using System.Threading.Tasks; | ||||
namespace JT808.Gateway.InMemoryMQ.Services | |||||
namespace JT808.Gateway.NormalHosting.Services | |||||
{ | { | ||||
public class JT808SessionService | public class JT808SessionService | ||||
{ | { |