浏览代码

1.完善各个模块的注册

2.添加测试的各个服务模块
3.去掉无用的文件
tags/pipeline-1.1.0
SmallChi(Koike) 4 年前
父节点
当前提交
d65fffce91
共有 20 个文件被更改,包括 531 次插入167 次删除
  1. +12
    -8
      src/JT808.Gateway.Abstractions/JT808MessageHandler.cs
  2. +1
    -1
      src/JT808.Gateway.Kafka/JT808MsgReplyConsumer.cs
  3. +20
    -8
      src/JT808.Gateway.Tests/JT808.Gateway.NormalHosting/Impl/JT808CustomMessageHandlerImpl.cs
  4. +64
    -0
      src/JT808.Gateway.Tests/JT808.Gateway.NormalHosting/Impl/JT808MsgReplyConsumer.cs
  5. +30
    -0
      src/JT808.Gateway.Tests/JT808.Gateway.NormalHosting/Impl/JT808MsgReplyProducer.cs
  6. +1
    -1
      src/JT808.Gateway.Tests/JT808.Gateway.NormalHosting/Impl/JT808SessionConsumer.cs
  7. +6
    -2
      src/JT808.Gateway.Tests/JT808.Gateway.NormalHosting/Program.cs
  8. +28
    -0
      src/JT808.Gateway.Tests/JT808.Gateway.NormalHosting/Services/JT808MsgReplyDataService.cs
  9. +23
    -0
      src/JT808.Gateway/Internal/JT808MsgProducer_Empty.cs
  10. +23
    -0
      src/JT808.Gateway/Internal/JT808MsgReplyLoggingProducer_Empty.cs
  11. +0
    -20
      src/JT808.Gateway/Internal/JT808NormalGatewayBuilderDefault.cs
  12. +0
    -20
      src/JT808.Gateway/Internal/JT808QueueGatewayBuilderDefault.cs
  13. +6
    -0
      src/JT808.Gateway/JT808.Gateway.csproj
  14. +186
    -0
      src/JT808.Gateway/JT808.Gateway.xml
  15. +112
    -51
      src/JT808.Gateway/JT808GatewayExtensions.cs
  16. +1
    -1
      src/JT808.Gateway/JT808TcpServer.cs
  17. +2
    -2
      src/JT808.Gateway/JT808UdpServer.cs
  18. +0
    -49
      src/JT808.Gateway/Metadata/JT808AtomicCounter.cs
  19. +13
    -1
      src/JT808.Gateway/Services/JT808MsgReplyHostedService.cs
  20. +3
    -3
      src/JT808.Gateway/Session/JT808SessionManager.cs

+ 12
- 8
src/JT808.Gateway.Abstractions/JT808MessageHandler.cs 查看文件

@@ -23,15 +23,21 @@ namespace JT808.Gateway.Abstractions

protected IJT808MsgProducer MsgProducer;

protected IJT808MsgReplyLoggingProducer MsgReplyLoggingProducer;

protected IOptionsMonitor<JT808Configuration> JT808ConfigurationOptionsMonitor;

protected IJT808Config JT808Config;

public JT808MessageHandler(
IOptionsMonitor<JT808Configuration> jT808ConfigurationOptionsMonitor,
IJT808MsgProducer msgProducer,
IJT808MsgReplyLoggingProducer msgReplyLoggingProducer,
IJT808Config jT808Config)
{
this.JT808Serializer = jT808Config.GetSerializer();
this.MsgProducer = msgProducer;
this.MsgReplyLoggingProducer = msgReplyLoggingProducer;
this.JT808ConfigurationOptionsMonitor = jT808ConfigurationOptionsMonitor;
HandlerDict = new Dictionary<ushort, MsgIdMethodDelegate> {
{JT808MsgId.终端通用应答.ToUInt16Value(), Msg0x0001},
@@ -56,12 +62,6 @@ namespace JT808.Gateway.Abstractions
};
}

public JT808MessageHandler(IOptionsMonitor<JT808Configuration> jT808ConfigurationOptionsMonitor
, IJT808Config jT808Config) : this(jT808ConfigurationOptionsMonitor, null, jT808Config)
{

}

/// <summary>
/// 消息处理
/// </summary>
@@ -85,12 +85,16 @@ namespace JT808.Gateway.Abstractions
}
else
{
return func(request, session);
var data=func(request, session);
MsgReplyLoggingProducer.ProduceAsync(request.Header.TerminalPhoneNo, data);
return data;
}
}
else
{
return func(request, session);
var data = func(request, session);
MsgReplyLoggingProducer.ProduceAsync(request.Header.TerminalPhoneNo, data);
return data;
}
}
else


+ 1
- 1
src/JT808.Gateway.Kafka/JT808MsgReplyConsumer.cs 查看文件

@@ -28,7 +28,7 @@ namespace JT808.Gateway.Kafka
{
consumer = new ConsumerBuilder<string, byte[]>(consumerConfigAccessor.Value).Build();
TopicName = consumerConfigAccessor.Value.TopicName;
logger = loggerFactory.CreateLogger("JT808MsgReplyConsumer");
logger = loggerFactory.CreateLogger<JT808MsgReplyConsumer>();
}

public void OnMessage(Action<(string TerminalNo, byte[] Data)> callback)


+ 20
- 8
src/JT808.Gateway.Tests/JT808.Gateway.NormalHosting/Impl/JT808CustomMessageHandlerImpl.cs 查看文件

@@ -16,24 +16,33 @@ namespace JT808.Gateway.NormalHosting.Impl
{
private readonly ILogger logger;
//private readonly IJT808Traffic jT808Traffic;
private readonly IJT808MsgLogging jT808MsgLogging;
//private readonly JT808TransmitService jT808TransmitService;
private readonly IJT808MsgLogging jT808MsgLogging;
private readonly IJT808MsgReplyProducer MsgReplyProducer;

public JT808CustomMessageHandlerImpl(
IOptionsMonitor<JT808Configuration> jT808ConfigurationOptionsMonitor,
//JT808TransmitService jT808TransmitService,
ILoggerFactory loggerFactory,
IJT808MsgReplyProducer msgReplyProducer,
IJT808MsgLogging jT808MsgLogging,
//IJT808Traffic jT808Traffic,
ILoggerFactory loggerFactory,
IJT808Config jT808Config) : base(jT808ConfigurationOptionsMonitor, jT808Config)
IOptionsMonitor<JT808Configuration> jT808ConfigurationOptionsMonitor,
IJT808MsgProducer msgProducer,
IJT808MsgReplyLoggingProducer msgReplyLoggingProducer,
IJT808Config jT808Config) : base(jT808ConfigurationOptionsMonitor,
msgProducer,
msgReplyLoggingProducer,
jT808Config)
{
MsgReplyProducer = msgReplyProducer;
//this.jT808TransmitService = jT808TransmitService;
//this.jT808Traffic = jT808Traffic;
this.jT808MsgLogging = jT808MsgLogging;
logger =loggerFactory.CreateLogger<JT808CustomMessageHandlerImpl>();
logger = loggerFactory.CreateLogger<JT808CustomMessageHandlerImpl>();
//添加自定义消息
HandlerDict.Add(0x9999, Msg0x9999);
}



/// <summary>
/// 重写消息处理器
/// </summary>
@@ -72,7 +81,10 @@ namespace JT808.Gateway.NormalHosting.Impl
public override byte[] Msg0x0200(JT808HeaderPackage request, IJT808Session session)
{
logger.LogDebug("重写自带Msg0x0200的消息");
return base.Msg0x0200(request, session);
var data = base.Msg0x0200(request, session);
logger.LogDebug("往应答服务发送相同数据进行测试");
MsgReplyProducer.ProduceAsync(request.Header.TerminalPhoneNo, data).ConfigureAwait(false);
return data;
}

/// <summary>


+ 64
- 0
src/JT808.Gateway.Tests/JT808.Gateway.NormalHosting/Impl/JT808MsgReplyConsumer.cs 查看文件

@@ -0,0 +1,64 @@
using JT808.Gateway.Abstractions;
using JT808.Gateway.NormalHosting.Services;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace JT808.Gateway.NormalHosting.Impl
{
public class JT808MsgReplyConsumer : IJT808MsgReplyConsumer
{
public CancellationTokenSource Cts { get; } = new CancellationTokenSource();

public string TopicName { get; } = JT808GatewayConstants.MsgReplyTopic;

private readonly JT808MsgReplyDataService MsgReplyDataService;

private ILogger logger;

public JT808MsgReplyConsumer(
ILoggerFactory loggerFactory,
JT808MsgReplyDataService msgReplyDataService)
{
MsgReplyDataService = msgReplyDataService;
logger = loggerFactory.CreateLogger<JT808MsgReplyConsumer>();
}

public void Dispose()
{
}

public void OnMessage(Action<(string TerminalNo, byte[] Data)> callback)
{
Task.Run(async () =>
{
while (!Cts.IsCancellationRequested)
{
try
{
var item = await MsgReplyDataService.ReadAsync(Cts.Token);
callback(item);
}
catch (Exception ex)
{
logger.LogError(ex, "");
}
}
}, Cts.Token);
}

public void Subscribe()
{
}

public void Unsubscribe()
{
}
}
}

+ 30
- 0
src/JT808.Gateway.Tests/JT808.Gateway.NormalHosting/Impl/JT808MsgReplyProducer.cs 查看文件

@@ -0,0 +1,30 @@
using JT808.Gateway.Abstractions;
using JT808.Gateway.NormalHosting.Services;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;

namespace JT808.Gateway.NormalHosting.Impl
{
public class JT808MsgReplyProducer : IJT808MsgReplyProducer
{
public string TopicName { get; } = JT808GatewayConstants.MsgReplyTopic;

private readonly JT808MsgReplyDataService MsgReplyDataService;

public JT808MsgReplyProducer(JT808MsgReplyDataService msgReplyDataService)
{
MsgReplyDataService = msgReplyDataService;
}

public async ValueTask ProduceAsync(string terminalNo, byte[] data)
{
await MsgReplyDataService.WriteAsync(terminalNo, data);
}

public void Dispose()
{
}
}
}

+ 1
- 1
src/JT808.Gateway.Tests/JT808.Gateway.NormalHosting/Impl/JT808SessionConsumer.cs 查看文件

@@ -22,7 +22,7 @@ namespace JT808.Gateway.NormalHosting.Impl
JT808SessionService jT808SessionService,
ILoggerFactory loggerFactory)
{
logger = loggerFactory.CreateLogger("JT808SessionConsumer");
logger = loggerFactory.CreateLogger<JT808SessionConsumer>();
JT808SessionService = jT808SessionService;
}



+ 6
- 2
src/JT808.Gateway.Tests/JT808.Gateway.NormalHosting/Program.cs 查看文件

@@ -44,11 +44,15 @@ namespace JT808.Gateway.NormalHosting
services.AddSingleton<JT808SessionService>();
services.AddSingleton<IJT808SessionProducer, JT808SessionProducer>();
services.AddSingleton<IJT808SessionConsumer, JT808SessionConsumer>();
//使用内存队列实现应答生产消费
services.AddSingleton<JT808MsgReplyDataService>();
services.AddSingleton<IJT808MsgReplyProducer, JT808MsgReplyProducer>();
services.AddJT808Configure()
//添加客户端工具
//.AddClient()
.AddGateway(hostContext.Configuration)
.ReplaceMessageHandler<JT808CustomMessageHandlerImpl>()
.AddMessageHandler<JT808CustomMessageHandlerImpl>()
.AddMsgReplyConsumer<JT808MsgReplyConsumer>()
.AddMsgLogging<JT808MsgLogging>()
//.AddTraffic()
//.AddSessionNotice()
@@ -56,7 +60,7 @@ namespace JT808.Gateway.NormalHosting
.AddTcp()
//.AddUdp()
.AddHttp()
;
.Register();//必须注册的
//流量统计
//services.AddHostedService<TrafficJob>();
//grpc客户端调用


+ 28
- 0
src/JT808.Gateway.Tests/JT808.Gateway.NormalHosting/Services/JT808MsgReplyDataService.cs 查看文件

@@ -0,0 +1,28 @@
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

namespace JT808.Gateway.NormalHosting.Services
{
public class JT808MsgReplyDataService
{
private readonly Channel<(string TerminalNo, byte[] Data)> _channel;

public JT808MsgReplyDataService()
{
_channel = Channel.CreateUnbounded<(string TerminalNo, byte[] Data)>();
}

public async ValueTask WriteAsync(string terminalNo, byte[] Data)
{
await _channel.Writer.WriteAsync((terminalNo, Data));
}
public async ValueTask<(string TerminalNo, byte[] Data)> ReadAsync(CancellationToken cancellationToken)
{
return await _channel.Reader.ReadAsync(cancellationToken);
}
}
}

+ 23
- 0
src/JT808.Gateway/Internal/JT808MsgProducer_Empty.cs 查看文件

@@ -0,0 +1,23 @@
using JT808.Gateway.Abstractions;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;

namespace JT808.Gateway.Internal
{
class JT808MsgProducer_Empty : IJT808MsgProducer
{
public string TopicName { get; }

public void Dispose()
{

}

public ValueTask ProduceAsync(string terminalNo, byte[] data)
{
return default;
}
}
}

+ 23
- 0
src/JT808.Gateway/Internal/JT808MsgReplyLoggingProducer_Empty.cs 查看文件

@@ -0,0 +1,23 @@
using JT808.Gateway.Abstractions;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;

namespace JT808.Gateway.Internal
{
class JT808MsgReplyLoggingProducer_Empty : IJT808MsgReplyLoggingProducer
{
public string TopicName { get; }

public void Dispose()
{

}

public ValueTask ProduceAsync(string terminalNo, byte[] data)
{
return default;
}
}
}

+ 0
- 20
src/JT808.Gateway/Internal/JT808NormalGatewayBuilderDefault.cs 查看文件

@@ -1,20 +0,0 @@
using JT808.Gateway.Abstractions;
using JT808.Protocol;

namespace JT808.Gateway.Internal
{
public class JT808NormalGatewayBuilderDefault : IJT808NormalGatewayBuilder
{
public IJT808Builder JT808Builder { get; }

public JT808NormalGatewayBuilderDefault(IJT808Builder builder)
{
JT808Builder = builder;
}

public IJT808Builder Builder()
{
return JT808Builder;
}
}
}

+ 0
- 20
src/JT808.Gateway/Internal/JT808QueueGatewayBuilderDefault.cs 查看文件

@@ -1,20 +0,0 @@
using JT808.Gateway.Abstractions;
using JT808.Protocol;

namespace JT808.Gateway.Internal
{
public class JT808QueueGatewayBuilderDefault : IJT808QueueGatewayBuilder
{
public IJT808Builder JT808Builder { get; }

public JT808QueueGatewayBuilderDefault(IJT808Builder builder)
{
JT808Builder = builder;
}

public IJT808Builder Builder()
{
return JT808Builder;
}
}
}

+ 6
- 0
src/JT808.Gateway/JT808.Gateway.csproj 查看文件

@@ -19,6 +19,12 @@
<Product>JT808.Gateway</Product>
<Version>$(JT808GatewayPackageVersion)</Version>
</PropertyGroup>
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|AnyCPU'">
<DocumentationFile>JT808.Gateway.xml</DocumentationFile>
</PropertyGroup>
<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Release|AnyCPU'">
<DocumentationFile>JT808.Gateway.xml</DocumentationFile>
</PropertyGroup>
<ItemGroup>
<Compile Remove="Internal\JT808NormalGatewayBuilderDefault.cs" />
<Compile Remove="Internal\JT808QueueGatewayBuilderDefault.cs" />


+ 186
- 0
src/JT808.Gateway/JT808.Gateway.xml 查看文件

@@ -0,0 +1,186 @@
<?xml version="1.0"?>
<doc>
<assembly>
<name>JT808.Gateway</name>
</assembly>
<members>
<member name="T:JT808.Gateway.Handlers.JT808MsgIdDefaultWebApiHandler">
<summary>
默认消息处理业务实现
</summary>
</member>
<member name="M:JT808.Gateway.Handlers.JT808MsgIdDefaultWebApiHandler.GetTcpSessionAll(System.String)">
<summary>
会话服务集合
</summary>
<param name="json"></param>
<returns></returns>
</member>
<member name="M:JT808.Gateway.Handlers.JT808MsgIdDefaultWebApiHandler.QueryTcpSessionByTerminalPhoneNo(System.String)">
<summary>
通过终端手机号查询对应会话
</summary>
<param name="json"></param>
<returns></returns>
</member>
<member name="M:JT808.Gateway.Handlers.JT808MsgIdDefaultWebApiHandler.RemoveSessionByTerminalPhoneNo(System.String)">
<summary>
会话服务-通过设备终端号移除对应会话
</summary>
<param name="json"></param>
<returns></returns>
</member>
<member name="M:JT808.Gateway.Handlers.JT808MsgIdDefaultWebApiHandler.GetUdpSessionAll(System.String)">
<summary>
会话服务集合
</summary>
<param name="json"></param>
<returns></returns>
</member>
<member name="M:JT808.Gateway.Handlers.JT808MsgIdDefaultWebApiHandler.QueryUdpSessionByTerminalPhoneNo(System.String)">
<summary>
通过终端手机号查询对应会话
</summary>
<param name="json"></param>
<returns></returns>
</member>
<member name="M:JT808.Gateway.Handlers.JT808MsgIdDefaultWebApiHandler.RemoveUdpByTerminalPhoneNo(System.String)">
<summary>
会话服务-通过设备终端号移除对应会话
</summary>
<param name="json"></param>
<returns></returns>
</member>
<member name="M:JT808.Gateway.Handlers.JT808MsgIdDefaultWebApiHandler.UnificationSend(System.String)">
<summary>
统一下发信息
</summary>
<param name="json"></param>
<returns></returns>
</member>
<member name="M:JT808.Gateway.JT808GatewayExtensions.AddGateway(JT808.Protocol.IJT808Builder,System.Action{JT808.Gateway.Abstractions.Configurations.JT808Configuration})">
<summary>
添加808网关
</summary>
<param name="jT808Builder"></param>
<param name="config"></param>
<returns></returns>
</member>
<member name="M:JT808.Gateway.JT808GatewayExtensions.AddGateway(JT808.Protocol.IJT808Builder,Microsoft.Extensions.Configuration.IConfiguration)">
<summary>
添加808网关
</summary>
<param name="jT808Builder"></param>
<param name="configuration"></param>
<returns></returns>
</member>
<member name="M:JT808.Gateway.JT808GatewayExtensions.AddTcp(JT808.Gateway.Abstractions.IJT808GatewayBuilder)">
<summary>
添加tcp服务器
</summary>
<param name="config"></param>
<returns></returns>
</member>
<member name="M:JT808.Gateway.JT808GatewayExtensions.AddUdp(JT808.Gateway.Abstractions.IJT808GatewayBuilder)">
<summary>
添加udp服务器
</summary>
<param name="config"></param>
<returns></returns>
</member>
<member name="M:JT808.Gateway.JT808GatewayExtensions.AddHttp(JT808.Gateway.Abstractions.IJT808GatewayBuilder)">
<summary>
添加http服务器
</summary>
<param name="config"></param>
<returns></returns>
</member>
<member name="M:JT808.Gateway.JT808GatewayExtensions.AddHttp``1(JT808.Gateway.Abstractions.IJT808GatewayBuilder)">
<summary>
添加http服务器
</summary>
<typeparam name="TJT808MsgIdDefaultWebApiHandler"></typeparam>
<param name="config"></param>
<returns></returns>
</member>
<member name="M:JT808.Gateway.JT808GatewayExtensions.AddMessageHandler``1(JT808.Gateway.Abstractions.IJT808GatewayBuilder)">
<summary>
添加消息业务处理程序
</summary>
<typeparam name="TJT808MessageHandler"></typeparam>
<param name="config"></param>
<returns></returns>
</member>
<member name="M:JT808.Gateway.JT808GatewayExtensions.AddHttpAuthorization``1(JT808.Gateway.Abstractions.IJT808GatewayBuilder)">
<summary>
添加Http服务认证机制
</summary>
<typeparam name="TJT808Authorization"></typeparam>
<param name="config"></param>
<returns></returns>
</member>
<member name="M:JT808.Gateway.JT808GatewayExtensions.AddMsgProducer``1(JT808.Gateway.Abstractions.IJT808GatewayBuilder)">
<summary>
添加消息生产者
</summary>
<typeparam name="TJT808MsgProducer"></typeparam>
<param name="config"></param>
<returns></returns>
</member>
<member name="M:JT808.Gateway.JT808GatewayExtensions.AddMsgReplyLoggingProducer``1(JT808.Gateway.Abstractions.IJT808GatewayBuilder)">
<summary>
添加消息应答后的应答生产者
</summary>
<typeparam name="TJT808MsgReplyLoggingProducer"></typeparam>
<param name="config"></param>
<returns></returns>
</member>
<member name="M:JT808.Gateway.JT808GatewayExtensions.AddMsgReplyConsumer``1(JT808.Gateway.Abstractions.IJT808GatewayBuilder)">
<summary>
添加消息应答消费者
</summary>
<typeparam name="TJT808MsgReplyConsumer"></typeparam>
<param name="config"></param>
<returns></returns>
</member>
<member name="M:JT808.Gateway.JT808GatewayExtensions.Register(JT808.Gateway.Abstractions.IJT808GatewayBuilder)">
<summary>
必须注册的
</summary>
<param name="config"></param>
</member>
<member name="M:JT808.Gateway.JT808GatewayExtensions.AddJT808Core(JT808.Gateway.Abstractions.IJT808GatewayBuilder)">
<summary>
添加公共模块
</summary>
<param name="config"></param>
<returns></returns>
</member>
<member name="M:JT808.Gateway.JT808TcpServer.#ctor(JT808.Gateway.Abstractions.JT808MessageHandler,Microsoft.Extensions.Options.IOptions{JT808.Gateway.Abstractions.Configurations.JT808Configuration},JT808.Protocol.IJT808Config,Microsoft.Extensions.Logging.ILoggerFactory,JT808.Gateway.Session.JT808SessionManager)">
<summary>
使用队列方式
</summary>
<param name="messageHandler"></param>
<param name="jT808ConfigurationAccessor"></param>
<param name="jT808Config"></param>
<param name="loggerFactory"></param>
<param name="jT808SessionManager"></param>
</member>
<member name="T:JT808.Gateway.Session.JT808SessionManager">
<summary>
<remark>不支持变态类型:既发TCP和UDP</remark>
</summary>
</member>
<member name="P:JT808.Gateway.Session.JT808TcpSession.TerminalPhoneNo">
<summary>
终端手机号
</summary>
</member>
<member name="P:JT808.Gateway.Session.JT808UdpSession.TerminalPhoneNo">
<summary>
终端手机号
</summary>
</member>
</members>
</doc>

+ 112
- 51
src/JT808.Gateway/JT808GatewayExtensions.cs 查看文件

@@ -11,6 +11,7 @@ using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection.Extensions;
using System;
using System.Runtime.CompilerServices;
using System.Linq;

[assembly: InternalsVisibleTo("JT808.Gateway.TestHosting")]
[assembly: InternalsVisibleTo("JT808.Gateway.Test")]
@@ -18,44 +19,12 @@ namespace JT808.Gateway
{
public static partial class JT808GatewayExtensions
{
//public static IJT808QueueGatewayBuilder AddQueueGateway(this IJT808Builder jT808Builder, Action<JT808Configuration> config)
//{
// IJT808QueueGatewayBuilder server = new JT808QueueGatewayBuilderDefault(jT808Builder);
// server.JT808Builder.Services.Configure(config);
// server.AddJT808Core();
// server.JT808Builder.Services.AddHostedService<JT808MsgReplyHostedService>();
// return server;
//}

//public static IJT808NormalGatewayBuilder AddNormalGateway(this IJT808Builder jT808Builder, Action<JT808Configuration> config)
//{
// IJT808NormalGatewayBuilder server = new JT808NormalGatewayBuilderDefault(jT808Builder);
// server.JT808Builder.Services.AddSingleton<JT808NormalReplyMessageHandler>();
// server.JT808Builder.Services.Configure(config);
// server.AddJT808Core();
// return server;
//}

//public static IJT808QueueGatewayBuilder AddQueueGateway(this IJT808Builder jT808Builder, IConfiguration configuration)
//{
// IJT808QueueGatewayBuilder server = new JT808QueueGatewayBuilderDefault(jT808Builder);
// server.JT808Builder.Services.Configure<JT808Configuration>(configuration.GetSection("JT808Configuration"));
// server.AddJT808Core();
// server.JT808Builder.Services.AddHostedService<JT808MsgReplyHostedService>();
// return server;
//}

//public static IJT808NormalGatewayBuilder AddNormalGateway(this IJT808Builder jT808Builder, IConfiguration configuration)
//{
// IJT808NormalGatewayBuilder server = new JT808NormalGatewayBuilderDefault(jT808Builder);
// server.JT808Builder.Services.AddSingleton<JT808NormalReplyMessageHandler>();
// server.JT808Builder.Services.Configure<JT808Configuration>(configuration.GetSection("JT808Configuration"));
// server.AddJT808Core();
// return server;
//}



/// <summary>
/// 添加808网关
/// </summary>
/// <param name="jT808Builder"></param>
/// <param name="config"></param>
/// <returns></returns>
public static IJT808GatewayBuilder AddGateway(this IJT808Builder jT808Builder, Action<JT808Configuration> config)
{
JT808GatewayBuilderDefault jT808GatewayBuilderDefault = new JT808GatewayBuilderDefault(jT808Builder);
@@ -63,7 +32,12 @@ namespace JT808.Gateway
jT808GatewayBuilderDefault.AddJT808Core();
return jT808GatewayBuilderDefault;
}

/// <summary>
/// 添加808网关
/// </summary>
/// <param name="jT808Builder"></param>
/// <param name="configuration"></param>
/// <returns></returns>
public static IJT808GatewayBuilder AddGateway(this IJT808Builder jT808Builder, IConfiguration configuration)
{
JT808GatewayBuilderDefault jT808GatewayBuilderDefault = new JT808GatewayBuilderDefault(jT808Builder);
@@ -71,28 +45,33 @@ namespace JT808.Gateway
jT808GatewayBuilderDefault.AddJT808Core();
return jT808GatewayBuilderDefault;
}

public static IJT808GatewayBuilder ReplaceMessageHandler<TJT808MessageHandler>(this IJT808GatewayBuilder config)
where TJT808MessageHandler : JT808MessageHandler
{
config.JT808Builder.Services.Replace(new ServiceDescriptor(typeof(JT808MessageHandler), typeof(TJT808MessageHandler), ServiceLifetime.Singleton));
return config;
}

/// <summary>
/// 添加tcp服务器
/// </summary>
/// <param name="config"></param>
/// <returns></returns>
public static IJT808GatewayBuilder AddTcp(this IJT808GatewayBuilder config)
{
config.JT808Builder.Services.AddHostedService<JT808TcpServer>();
config.JT808Builder.Services.AddHostedService<JT808TcpReceiveTimeoutHostedService>();
return config;
}

/// <summary>
/// 添加udp服务器
/// </summary>
/// <param name="config"></param>
/// <returns></returns>
public static IJT808GatewayBuilder AddUdp(this IJT808GatewayBuilder config)
{
config.JT808Builder.Services.AddHostedService<JT808UdpServer>();
config.JT808Builder.Services.AddHostedService<JT808UdpReceiveTimeoutHostedService>();
return config;
}

/// <summary>
/// 添加http服务器
/// </summary>
/// <param name="config"></param>
/// <returns></returns>
public static IJT808GatewayBuilder AddHttp(this IJT808GatewayBuilder config)
{
config.JT808Builder.Services.AddSingleton<IJT808Authorization, JT808AuthorizationDefault>();
@@ -100,7 +79,12 @@ namespace JT808.Gateway
config.JT808Builder.Services.AddHostedService<JT808HttpServer>();
return config;
}

/// <summary>
/// 添加http服务器
/// </summary>
/// <typeparam name="TJT808MsgIdDefaultWebApiHandler"></typeparam>
/// <param name="config"></param>
/// <returns></returns>
public static IJT808GatewayBuilder AddHttp<TJT808MsgIdDefaultWebApiHandler>(this IJT808GatewayBuilder config)
where TJT808MsgIdDefaultWebApiHandler: JT808MsgIdDefaultWebApiHandler
{
@@ -109,11 +93,88 @@ namespace JT808.Gateway
config.JT808Builder.Services.AddHostedService<JT808HttpServer>();
return config;
}

/// <summary>
/// 添加消息业务处理程序
/// </summary>
/// <typeparam name="TJT808MessageHandler"></typeparam>
/// <param name="config"></param>
/// <returns></returns>
public static IJT808GatewayBuilder AddMessageHandler<TJT808MessageHandler>(this IJT808GatewayBuilder config)
where TJT808MessageHandler : JT808MessageHandler
{
config.JT808Builder.Services.Replace(new ServiceDescriptor(typeof(JT808MessageHandler), typeof(TJT808MessageHandler), ServiceLifetime.Singleton));
return config;
}
/// <summary>
/// 添加Http服务认证机制
/// </summary>
/// <typeparam name="TJT808Authorization"></typeparam>
/// <param name="config"></param>
/// <returns></returns>
public static IJT808GatewayBuilder AddHttpAuthorization<TJT808Authorization>(this IJT808GatewayBuilder config)
where TJT808Authorization : IJT808Authorization
{
config.JT808Builder.Services.Replace(new ServiceDescriptor(typeof(IJT808Authorization), typeof(TJT808Authorization), ServiceLifetime.Singleton));
return config;
}
/// <summary>
/// 添加消息生产者
/// </summary>
/// <typeparam name="TJT808MsgProducer"></typeparam>
/// <param name="config"></param>
/// <returns></returns>
public static IJT808GatewayBuilder AddMsgProducer<TJT808MsgProducer>(this IJT808GatewayBuilder config)
where TJT808MsgProducer : IJT808MsgProducer
{
config.JT808Builder.Services.Replace(new ServiceDescriptor(typeof(IJT808MsgProducer), typeof(TJT808MsgProducer), ServiceLifetime.Singleton));
return config;
}
/// <summary>
/// 添加消息应答后的应答生产者
/// </summary>
/// <typeparam name="TJT808MsgReplyLoggingProducer"></typeparam>
/// <param name="config"></param>
/// <returns></returns>
public static IJT808GatewayBuilder AddMsgReplyLoggingProducer<TJT808MsgReplyLoggingProducer>(this IJT808GatewayBuilder config)
where TJT808MsgReplyLoggingProducer : IJT808MsgReplyLoggingProducer
{
config.JT808Builder.Services.Replace(new ServiceDescriptor(typeof(IJT808MsgReplyLoggingProducer), typeof(TJT808MsgReplyLoggingProducer), ServiceLifetime.Singleton));
return config;
}
/// <summary>
/// 添加消息应答消费者
/// </summary>
/// <typeparam name="TJT808MsgReplyConsumer"></typeparam>
/// <param name="config"></param>
/// <returns></returns>
public static IJT808GatewayBuilder AddMsgReplyConsumer<TJT808MsgReplyConsumer>(this IJT808GatewayBuilder config)
where TJT808MsgReplyConsumer : IJT808MsgReplyConsumer
{
config.JT808Builder.Services.Add(new ServiceDescriptor(typeof(IJT808MsgReplyConsumer), typeof(TJT808MsgReplyConsumer), ServiceLifetime.Singleton));
return config;
}
/// <summary>
/// 必须注册的
/// </summary>
/// <param name="config"></param>
public static void Register(this IJT808GatewayBuilder config)
{
if(config.JT808Builder.Services.Where(s => s.ServiceType == typeof(IJT808MsgReplyConsumer)).Count() > 0)
{
config.JT808Builder.Services.AddHostedService<JT808MsgReplyHostedService>();
}
}
/// <summary>
/// 添加公共模块
/// </summary>
/// <param name="config"></param>
/// <returns></returns>
private static IJT808GatewayBuilder AddJT808Core(this IJT808GatewayBuilder config)
{
config.JT808Builder.Services.AddSingleton<JT808MessageHandler>();
config.JT808Builder.Services.AddSingleton<JT808SessionManager>();
config.JT808Builder.Services.AddSingleton<IJT808MsgProducer, JT808MsgProducer_Empty>();
config.JT808Builder.Services.AddSingleton<IJT808MsgReplyLoggingProducer, JT808MsgReplyLoggingProducer_Empty>();
return config;
}
}

+ 1
- 1
src/JT808.Gateway/JT808TcpServer.cs 查看文件

@@ -230,7 +230,7 @@ namespace JT808.Gateway
}
public Task StopAsync(CancellationToken cancellationToken)
{
Logger.LogInformation("808 Tcp Server Stop");
Logger.LogInformation("JT808 Tcp Server Stop");
if (server?.Connected ?? false)
server.Shutdown(SocketShutdown.Both);
server?.Close();


+ 2
- 2
src/JT808.Gateway/JT808UdpServer.cs 查看文件

@@ -43,7 +43,7 @@ namespace JT808.Gateway
JT808MessageHandler messageHandler)
{
SessionManager = jT808SessionManager;
Logger = loggerFactory.CreateLogger("JT808UdpServer");
Logger = loggerFactory.CreateLogger<JT808UdpServer>();
Serializer = jT808Config.GetSerializer();
Configuration = jT808ConfigurationAccessor.Value;
MessageHandler = messageHandler;
@@ -113,7 +113,7 @@ namespace JT808.Gateway
}
public Task StopAsync(CancellationToken cancellationToken)
{
Logger.LogInformation("808 Udp Server Stop");
Logger.LogInformation("JT808 Udp Server Stop");
if (server?.Connected ?? false)
server.Shutdown(SocketShutdown.Both);
server?.Close();


+ 0
- 49
src/JT808.Gateway/Metadata/JT808AtomicCounter.cs 查看文件

@@ -1,49 +0,0 @@
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;

namespace JT808.Gateway.Metadata
{
/// <summary>
///
/// <see cref="Grpc.Core.Internal"/>
/// </summary>
internal class JT808AtomicCounter
{
long counter = 0;

public JT808AtomicCounter(long initialCount = 0)
{
this.counter = initialCount;
}

public void Reset()
{
Interlocked.Exchange(ref counter, 0);
}

public long Increment()
{
return Interlocked.Increment(ref counter);
}

public long Add(long len)
{
return Interlocked.Add(ref counter,len);
}

public long Decrement()
{
return Interlocked.Decrement(ref counter);
}

public long Count
{
get
{
return Interlocked.Read(ref counter);
}
}
}
}

+ 13
- 1
src/JT808.Gateway/Services/JT808MsgReplyHostedService.cs 查看文件

@@ -1,6 +1,7 @@
using JT808.Gateway.Abstractions;
using JT808.Gateway.Abstractions.Configurations;
using JT808.Gateway.Session;
using JT808.Protocol.Extensions;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using System;
@@ -16,19 +17,30 @@ namespace JT808.Gateway.Services

private readonly IJT808MsgReplyConsumer JT808MsgReplyConsumer;

private ILogger logger;

public JT808MsgReplyHostedService(
ILoggerFactory loggerFactory,
IJT808MsgReplyConsumer jT808MsgReplyConsumer,
JT808SessionManager jT808SessionManager)
{
JT808MsgReplyConsumer = jT808MsgReplyConsumer;
JT808SessionManager = jT808SessionManager;
logger = loggerFactory.CreateLogger<JT808MsgReplyHostedService>();
}

public Task StartAsync(CancellationToken cancellationToken)
{
JT808MsgReplyConsumer.OnMessage(async(item) =>
{
await JT808SessionManager.TrySendByTerminalPhoneNoAsync(item.TerminalNo, item.Data);
try
{
await JT808SessionManager.TrySendByTerminalPhoneNoAsync(item.TerminalNo, item.Data);
}
catch (Exception ex)
{
logger.LogError(ex, $"{item.TerminalNo}-{item.Data.ToHexString()}");
}
});
JT808MsgReplyConsumer.Subscribe();
return Task.CompletedTask;


+ 3
- 3
src/JT808.Gateway/Session/JT808SessionManager.cs 查看文件

@@ -29,14 +29,14 @@ namespace JT808.Gateway.Session
JT808SessionProducer = jT808SessionProducer;
Sessions = new ConcurrentDictionary<string, IJT808Session>(StringComparer.OrdinalIgnoreCase);
TerminalPhoneNoSessions = new ConcurrentDictionary<string, IJT808Session>(StringComparer.OrdinalIgnoreCase);
logger = loggerFactory.CreateLogger("JT808SessionManager");
logger = loggerFactory.CreateLogger<JT808SessionManager>();
}

public JT808SessionManager(ILoggerFactory loggerFactory)
{
Sessions = new ConcurrentDictionary<string, IJT808Session>(StringComparer.OrdinalIgnoreCase);
TerminalPhoneNoSessions = new ConcurrentDictionary<string, IJT808Session>(StringComparer.OrdinalIgnoreCase);
logger = loggerFactory.CreateLogger("JT808SessionManager");
logger = loggerFactory.CreateLogger<JT808SessionManager>();
}

public int TotalSessionCount
@@ -126,7 +126,7 @@ namespace JT808.Gateway.Session

public async ValueTask<bool> TrySendByTerminalPhoneNoAsync(string terminalPhoneNo, byte[] data)
{
if(TerminalPhoneNoSessions.TryGetValue(terminalPhoneNo,out var session))
if (TerminalPhoneNoSessions.TryGetValue(terminalPhoneNo, out var session))
{
if (session.TransportProtocolType == JT808TransportProtocolType.tcp)
{


正在加载...
取消
保存