Procházet zdrojové kódy

1.添加SIM卡黑名单管理及对应的WebApi

2.修复tcp下发未判断连接状态是否异常导致程序异常
3.升级kafka版本库
tags/v2.3.3-dotnetty_v1.1.1-pipeline
SmallChi(Koike) před 4 roky
rodič
revize
3e8161d264
20 změnil soubory, kde provedl 727 přidání a 73 odebrání
  1. +20
    -2
      src/JT808.Gateway.Abstractions/Configurations/JT808Configuration.cs
  2. +18
    -4
      src/JT808.Gateway.Abstractions/Extensions/JT808SessionExtensions.cs
  3. +5
    -0
      src/JT808.Gateway.Abstractions/IJT808SessionProducer.cs
  4. +71
    -0
      src/JT808.Gateway.Abstractions/JT808.Gateway.Abstractions.xml
  5. +12
    -0
      src/JT808.Gateway.Abstractions/JT808GatewayConstants.cs
  6. +1
    -1
      src/JT808.Gateway.Benchmark/JT808.Gateway.CleintBenchmark/JT808.Gateway.CleintBenchmark.csproj
  7. +1
    -1
      src/JT808.Gateway.Benchmark/JT808.Gateway.ServerBenchmark/JT808.Gateway.ServerBenchmark.csproj
  8. +1
    -1
      src/JT808.Gateway.Kafka/JT808.Gateway.Kafka.csproj
  9. +2
    -2
      src/JT808.Gateway.Tests/JT808.Gateway.NormalHosting/JT808.Gateway.NormalHosting.csproj
  10. +2
    -2
      src/JT808.Gateway.Tests/JT808.Gateway.NormalHosting/Program.cs
  11. +2
    -2
      src/JT808.Gateway.Tests/JT808.Gateway.QueueHosting/JT808.Gateway.QueueHosting.csproj
  12. +36
    -0
      src/JT808.Gateway.Tests/JT808.Gateway.Test/Services/JT808BlacklistManagerTest.cs
  13. +20
    -0
      src/JT808.Gateway.WebApiClientTool/JT808.Gateway.WebApiClientTool.xml
  14. +46
    -0
      src/JT808.Gateway.WebApiClientTool/JT808HttpClient.cs
  15. +108
    -10
      src/JT808.Gateway/Handlers/JT808MsgIdDefaultWebApiHandler.cs
  16. +166
    -2
      src/JT808.Gateway/JT808.Gateway.xml
  17. +4
    -0
      src/JT808.Gateway/JT808GatewayExtensions.cs
  18. +58
    -27
      src/JT808.Gateway/JT808TcpServer.cs
  19. +101
    -0
      src/JT808.Gateway/Services/JT808BlacklistManager.cs
  20. +53
    -19
      src/JT808.Gateway/Session/JT808SessionManager.cs

+ 20
- 2
src/JT808.Gateway.Abstractions/Configurations/JT808Configuration.cs Zobrazit soubor

@@ -4,17 +4,35 @@ using System.Text;

namespace JT808.Gateway.Abstractions.Configurations
{
/// <summary>
/// JT808网关配置
/// </summary>
public class JT808Configuration
{
/// <summary>
/// tcp端口
/// </summary>
public int TcpPort { get; set; } = 808;
/// <summary>
/// udp端口
/// </summary>
public int UdpPort { get; set; } = 808;
/// <summary>
/// http webapi端口
/// </summary>
public int WebApiPort { get; set; } = 828;
/// <summary>
/// WebApi 默认token 123456
/// </summary>
public string WebApiToken { get; set; } = "123456";
public int SoBacklog { get; set; } = 8192;
public int MiniNumBufferSize { get; set; } = 8096;
/// <summary>
/// tcp连接能够成功连接上的数量
/// </summary>
public int SoBacklog { get; set; } = 10000;
/// <summary>
/// 默认4k
/// </summary>
public int MiniNumBufferSize { get; set; } = 4096;
/// <summary>
/// Tcp读超时
/// 默认10分钟检查一次


+ 18
- 4
src/JT808.Gateway.Abstractions/Extensions/JT808SessionExtensions.cs Zobrazit soubor

@@ -7,31 +7,45 @@ using System.Threading.Tasks;

namespace JT808.Gateway.Abstractions
{
/// <summary>
/// JT808会话扩展
/// </summary>
public static class JT808SessionExtensions
{
/// <summary>
/// 下发消息
/// </summary>
/// <param name="session"></param>
/// <param name="data"></param>
public static async void SendAsync(this IJT808Session session,byte[] data)
{
if (data == null) return;
if (session.TransportProtocolType == JT808TransportProtocolType.tcp)
{
await session.Client.SendAsync(data, SocketFlags.None);
if (session.Client.Connected)
await session.Client.SendAsync(data, SocketFlags.None);
}
else
{
await session.Client.SendToAsync(data, SocketFlags.None, session.RemoteEndPoint);
}
}

/// <summary>
/// 下发消息
/// </summary>
/// <param name="session"></param>
/// <param name="data"></param>
public static void Send(this IJT808Session session, byte[] data)
{
if (data == null) return;
if (session.TransportProtocolType == JT808TransportProtocolType.tcp)
{
session.Client.Send(data, SocketFlags.None);
if (session.Client.Connected)
session.Client.Send(data, SocketFlags.None);
}
else
{
session.Client.SendTo(data, SocketFlags.None, session.RemoteEndPoint);
session.Client.SendTo(data, SocketFlags.None, session.RemoteEndPoint);
}
}
}


+ 5
- 0
src/JT808.Gateway.Abstractions/IJT808SessionProducer.cs Zobrazit soubor

@@ -8,6 +8,11 @@ namespace JT808.Gateway.Abstractions
/// </summary>
public interface IJT808SessionProducer : IJT808PubSub, IDisposable
{
/// <summary>
///
/// </summary>
/// <param name="notice"></param>
/// <param name="terminalNo"></param>
void ProduceAsync(string notice,string terminalNo);
}
}

+ 71
- 0
src/JT808.Gateway.Abstractions/JT808.Gateway.Abstractions.xml Zobrazit soubor

@@ -4,11 +4,41 @@
<name>JT808.Gateway.Abstractions</name>
</assembly>
<members>
<member name="T:JT808.Gateway.Abstractions.Configurations.JT808Configuration">
<summary>
JT808网关配置
</summary>
</member>
<member name="P:JT808.Gateway.Abstractions.Configurations.JT808Configuration.TcpPort">
<summary>
tcp端口
</summary>
</member>
<member name="P:JT808.Gateway.Abstractions.Configurations.JT808Configuration.UdpPort">
<summary>
udp端口
</summary>
</member>
<member name="P:JT808.Gateway.Abstractions.Configurations.JT808Configuration.WebApiPort">
<summary>
http webapi端口
</summary>
</member>
<member name="P:JT808.Gateway.Abstractions.Configurations.JT808Configuration.WebApiToken">
<summary>
WebApi 默认token 123456
</summary>
</member>
<member name="P:JT808.Gateway.Abstractions.Configurations.JT808Configuration.SoBacklog">
<summary>
tcp连接能够成功连接上的数量
</summary>
</member>
<member name="P:JT808.Gateway.Abstractions.Configurations.JT808Configuration.MiniNumBufferSize">
<summary>
默认4k
</summary>
</member>
<member name="P:JT808.Gateway.Abstractions.Configurations.JT808Configuration.TcpReaderIdleTimeSeconds">
<summary>
Tcp读超时
@@ -85,6 +115,25 @@
传输协议类型
</summary>
</member>
<member name="T:JT808.Gateway.Abstractions.JT808SessionExtensions">
<summary>
JT808会话扩展
</summary>
</member>
<member name="M:JT808.Gateway.Abstractions.JT808SessionExtensions.SendAsync(JT808.Gateway.Abstractions.IJT808Session,System.Byte[])">
<summary>
下发消息
</summary>
<param name="session"></param>
<param name="data"></param>
</member>
<member name="M:JT808.Gateway.Abstractions.JT808SessionExtensions.Send(JT808.Gateway.Abstractions.IJT808Session,System.Byte[])">
<summary>
下发消息
</summary>
<param name="session"></param>
<param name="data"></param>
</member>
<member name="M:JT808.Gateway.Abstractions.IJT808MsgProducer.ProduceAsync(System.String,System.Byte[])">
<summary>
@@ -129,6 +178,13 @@
会话通知(在线/离线)
</summary>
</member>
<member name="M:JT808.Gateway.Abstractions.IJT808SessionProducer.ProduceAsync(System.String,System.String)">
<summary>
</summary>
<param name="notice"></param>
<param name="terminalNo"></param>
</member>
<member name="F:JT808.Gateway.Abstractions.JT808GatewayConstants.JT808WebApiRouteTable.SessionTcpGetAll">
<summary>
基于Tcp的会话服务集合
@@ -164,6 +220,21 @@
会话服务-通过设备终端号查询对应会话
</summary>
</member>
<member name="F:JT808.Gateway.Abstractions.JT808GatewayConstants.JT808WebApiRouteTable.BlacklistAdd">
<summary>
黑名单添加
</summary>
</member>
<member name="F:JT808.Gateway.Abstractions.JT808GatewayConstants.JT808WebApiRouteTable.BlacklistRemove">
<summary>
黑名单删除
</summary>
</member>
<member name="F:JT808.Gateway.Abstractions.JT808GatewayConstants.JT808WebApiRouteTable.BlacklistGet">
<summary>
黑名单查询
</summary>
</member>
<member name="T:JT808.Gateway.Abstractions.JT808MessageHandler">
<summary>
通用消息处理程序


+ 12
- 0
src/JT808.Gateway.Abstractions/JT808GatewayConstants.cs Zobrazit soubor

@@ -45,6 +45,18 @@
/// 会话服务-通过设备终端号查询对应会话
/// </summary>
public static string QueryUdpSessionByTerminalPhoneNo = $"{RouteTablePrefix}/{UdpPrefix}/{SessionPrefix}/QueryUdpSessionByTerminalPhoneNo";
/// <summary>
/// 黑名单添加
/// </summary>
public static string BlacklistAdd = $"{RouteTablePrefix}/Blacklist/Add";
/// <summary>
/// 黑名单删除
/// </summary>
public static string BlacklistRemove = $"{RouteTablePrefix}/Blacklist/Remove";
/// <summary>
/// 黑名单查询
/// </summary>
public static string BlacklistGet = $"{RouteTablePrefix}/Blacklist/Get";
}
}
}

+ 1
- 1
src/JT808.Gateway.Benchmark/JT808.Gateway.CleintBenchmark/JT808.Gateway.CleintBenchmark.csproj Zobrazit soubor

@@ -20,7 +20,7 @@
<PackageReference Include="Microsoft.Extensions.Configuration.Json" Version="5.0.0" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="5.0.1" />
<PackageReference Include="Microsoft.Extensions.Hosting" Version="5.0.0" />
<PackageReference Include="NLog.Extensions.Logging" Version="1.7.0" />
<PackageReference Include="NLog.Extensions.Logging" Version="1.7.1" />
</ItemGroup>

<ItemGroup>


+ 1
- 1
src/JT808.Gateway.Benchmark/JT808.Gateway.ServerBenchmark/JT808.Gateway.ServerBenchmark.csproj Zobrazit soubor

@@ -9,7 +9,7 @@
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="5.0.1" />
<PackageReference Include="Microsoft.Extensions.Hosting" Version="5.0.0" />
<PackageReference Include="Microsoft.VisualStudio.Azure.Containers.Tools.Targets" Version="1.10.8" />
<PackageReference Include="NLog.Extensions.Logging" Version="1.7.0" />
<PackageReference Include="NLog.Extensions.Logging" Version="1.7.1" />
</ItemGroup>

<ItemGroup>


+ 1
- 1
src/JT808.Gateway.Kafka/JT808.Gateway.Kafka.csproj Zobrazit soubor

@@ -21,7 +21,7 @@
<Version>$(JT808GatewayPackageVersion)</Version>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Confluent.Kafka" Version="1.5.3" />
<PackageReference Include="Confluent.Kafka" Version="1.6.2" />
</ItemGroup>

<ItemGroup>


+ 2
- 2
src/JT808.Gateway.Tests/JT808.Gateway.NormalHosting/JT808.Gateway.NormalHosting.csproj Zobrazit soubor

@@ -7,10 +7,10 @@


<ItemGroup>
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="5.0.0" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="5.0.1" />
<PackageReference Include="Microsoft.Extensions.Hosting" Version="5.0.0" />
<PackageReference Include="Microsoft.VisualStudio.Azure.Containers.Tools.Targets" Version="1.10.8" />
<PackageReference Include="NLog.Extensions.Logging" Version="1.6.5" />
<PackageReference Include="NLog.Extensions.Logging" Version="1.7.1" />
</ItemGroup>
<ItemGroup>


+ 2
- 2
src/JT808.Gateway.Tests/JT808.Gateway.NormalHosting/Program.cs Zobrazit soubor

@@ -65,9 +65,9 @@ namespace JT808.Gateway.NormalHosting
//方式2:客户端webapi调用
//services.AddJT808WebApiClientTool(hostContext.Configuration);
//httpclient客户端调用
services.AddHostedService<CallHttpClientJob>();
//services.AddHostedService<CallHttpClientJob>();
//客户端测试 依赖AddClient()服务
services.AddHostedService<UpJob>();
//services.AddHostedService<UpJob>();
});

await serverHostBuilder.RunConsoleAsync();


+ 2
- 2
src/JT808.Gateway.Tests/JT808.Gateway.QueueHosting/JT808.Gateway.QueueHosting.csproj Zobrazit soubor

@@ -10,10 +10,10 @@
</ItemGroup>

<ItemGroup>
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="5.0.0" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="5.0.1" />
<PackageReference Include="Microsoft.Extensions.Hosting" Version="5.0.0" />
<PackageReference Include="Microsoft.VisualStudio.Azure.Containers.Tools.Targets" Version="1.10.8" />
<PackageReference Include="NLog.Extensions.Logging" Version="1.6.5" />
<PackageReference Include="NLog.Extensions.Logging" Version="1.7.1" />
</ItemGroup>

<ItemGroup>


+ 36
- 0
src/JT808.Gateway.Tests/JT808.Gateway.Test/Services/JT808BlacklistManagerTest.cs Zobrazit soubor

@@ -0,0 +1,36 @@
using JT808.Gateway.Services;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Xunit;

namespace JT808.Gateway.Test.Services
{
public class JT808BlacklistManagerTest
{
[Fact]
public void Test1()
{
JT808BlacklistManager jT808BlacklistManager = new JT808BlacklistManager();
jT808BlacklistManager.Add("1");
Assert.True(jT808BlacklistManager.Contains("1"));
jT808BlacklistManager.Add("2");
Assert.True(jT808BlacklistManager.Contains("2"));
jT808BlacklistManager.Remove("1");
Assert.False(jT808BlacklistManager.Contains("1"));
}

[Fact]
public void Test2()
{
JT808BlacklistManager jT808BlacklistManager = new JT808BlacklistManager();
jT808BlacklistManager.Add("1");
jT808BlacklistManager.Add("2");
Assert.True(jT808BlacklistManager.Contains("1"));
Assert.True(jT808BlacklistManager.Contains("2"));
}
}
}

+ 20
- 0
src/JT808.Gateway.WebApiClientTool/JT808.Gateway.WebApiClientTool.xml Zobrazit soubor

@@ -67,6 +67,26 @@
<param name="terminalPhoneNo"></param>
<returns></returns>
</member>
<member name="M:JT808.Gateway.WebApiClientTool.JT808HttpClient.BlacklistAdd(System.String)">
<summary>
SIM卡黑名单服务-将对应SIM号加入黑名单
</summary>
<param name="terminalPhoneNo"></param>
<returns></returns>
</member>
<member name="M:JT808.Gateway.WebApiClientTool.JT808HttpClient.BlacklistRemove(System.String)">
<summary>
SIM卡黑名单服务-将对应SIM号移除黑名单
</summary>
<param name="terminalPhoneNo"></param>
<returns></returns>
</member>
<member name="M:JT808.Gateway.WebApiClientTool.JT808HttpClient.GetBlacklistAll">
<summary>
SIM卡黑名单服务-获取所有sim的黑名单列表
</summary>
<returns></returns>
</member>
<member name="T:JT808.Gateway.WebApiClientTool.JT808HttpClientExtensions">
<summary>


+ 46
- 0
src/JT808.Gateway.WebApiClientTool/JT808HttpClient.cs Zobrazit soubor

@@ -136,5 +136,51 @@ namespace JT808.Gateway.WebApiClientTool
var value = await JsonSerializer.DeserializeAsync<JT808ResultDto<bool>>(data);
return value;
}

/// <summary>
/// SIM卡黑名单服务-将对应SIM号加入黑名单
/// </summary>
/// <param name="terminalPhoneNo"></param>
/// <returns></returns>
public async ValueTask<JT808ResultDto<bool>> BlacklistAdd(string terminalPhoneNo)
{
var request = new HttpRequestMessage(HttpMethod.Post, JT808GatewayConstants.JT808WebApiRouteTable.BlacklistAdd);
request.Content = new StringContent(terminalPhoneNo);
var response = await HttpClient.SendAsync(request);
response.EnsureSuccessStatusCode();
var data = await response.Content.ReadAsStreamAsync();
var value = await JsonSerializer.DeserializeAsync<JT808ResultDto<bool>>(data);
return value;
}

/// <summary>
/// SIM卡黑名单服务-将对应SIM号移除黑名单
/// </summary>
/// <param name="terminalPhoneNo"></param>
/// <returns></returns>
public async ValueTask<JT808ResultDto<bool>> BlacklistRemove(string terminalPhoneNo)
{
var request = new HttpRequestMessage(HttpMethod.Post, JT808GatewayConstants.JT808WebApiRouteTable.BlacklistRemove);
request.Content = new StringContent(terminalPhoneNo);
var response = await HttpClient.SendAsync(request);
response.EnsureSuccessStatusCode();
var data = await response.Content.ReadAsStreamAsync();
var value = await JsonSerializer.DeserializeAsync<JT808ResultDto<bool>>(data);
return value;
}

/// <summary>
/// SIM卡黑名单服务-获取所有sim的黑名单列表
/// </summary>
/// <returns></returns>
public async ValueTask<JT808ResultDto<List<string>>> GetBlacklistAll()
{
var request = new HttpRequestMessage(HttpMethod.Post, JT808GatewayConstants.JT808WebApiRouteTable.BlacklistGet);
var response = await HttpClient.SendAsync(request);
response.EnsureSuccessStatusCode();
var data = await response.Content.ReadAsStreamAsync();
var value = await JsonSerializer.DeserializeAsync<JT808ResultDto<List<string>>>(data);
return value;
}
}
}

+ 108
- 10
src/JT808.Gateway/Handlers/JT808MsgIdDefaultWebApiHandler.cs Zobrazit soubor

@@ -1,5 +1,6 @@
using JT808.Gateway.Abstractions;
using JT808.Gateway.Abstractions.Dtos;
using JT808.Gateway.Services;
using JT808.Gateway.Session;
using JT808.Protocol.Extensions;
using System;
@@ -15,10 +16,19 @@ namespace JT808.Gateway.Handlers
/// </summary>
public class JT808MsgIdDefaultWebApiHandler : JT808MsgIdHttpHandlerBase
{
private JT808SessionManager JT808SessionManager;
public JT808MsgIdDefaultWebApiHandler(JT808SessionManager jT808SessionManager)
private JT808SessionManager SessionManager;
private JT808BlacklistManager BlacklistManager;
/// <summary>
///
/// </summary>
/// <param name="jT808SessionManager"></param>
/// <param name="jT808BlacklistManager"></param>
public JT808MsgIdDefaultWebApiHandler(
JT808SessionManager jT808SessionManager,
JT808BlacklistManager jT808BlacklistManager)
{
this.JT808SessionManager = jT808SessionManager;
this.SessionManager = jT808SessionManager;
this.BlacklistManager = jT808BlacklistManager;
InitTcpRoute();
InitUdpRoute();
InitCommontRoute();
@@ -34,7 +44,7 @@ namespace JT808.Gateway.Handlers
JT808ResultDto<List<JT808TcpSessionInfoDto>> resultDto = new JT808ResultDto<List<JT808TcpSessionInfoDto>>();
try
{
resultDto.Data = JT808SessionManager.GetTcpAll().Select(s => new JT808TcpSessionInfoDto
resultDto.Data = SessionManager.GetTcpAll().Select(s => new JT808TcpSessionInfoDto
{
LastActiveTime = s.ActiveTime,
StartTime = s.StartTime,
@@ -66,7 +76,7 @@ namespace JT808.Gateway.Handlers
JT808ResultDto<JT808TcpSessionInfoDto> resultDto = new JT808ResultDto<JT808TcpSessionInfoDto>();
try
{
resultDto.Data = JT808SessionManager.GetTcpAll().Where(w => w.TerminalPhoneNo == json).Select(s => new JT808TcpSessionInfoDto
resultDto.Data = SessionManager.GetTcpAll().Where(w => w.TerminalPhoneNo == json).Select(s => new JT808TcpSessionInfoDto
{
LastActiveTime = s.ActiveTime,
StartTime = s.StartTime,
@@ -98,7 +108,7 @@ namespace JT808.Gateway.Handlers
JT808ResultDto<bool> resultDto = new JT808ResultDto<bool>();
try
{
JT808SessionManager.RemoveByTerminalPhoneNo(json);
SessionManager.RemoveByTerminalPhoneNo(json);
resultDto.Code = JT808ResultCode.Ok;
resultDto.Data = true;
}
@@ -127,7 +137,7 @@ namespace JT808.Gateway.Handlers
JT808ResultDto<List<JT808UdpSessionInfoDto>> resultDto = new JT808ResultDto<List<JT808UdpSessionInfoDto>>();
try
{
resultDto.Data = JT808SessionManager.GetUdpAll().Select(s => new JT808UdpSessionInfoDto
resultDto.Data = SessionManager.GetUdpAll().Select(s => new JT808UdpSessionInfoDto
{
LastActiveTime = s.ActiveTime,
StartTime = s.StartTime,
@@ -159,7 +169,7 @@ namespace JT808.Gateway.Handlers
JT808ResultDto<JT808UdpSessionInfoDto> resultDto = new JT808ResultDto<JT808UdpSessionInfoDto>();
try
{
resultDto.Data = JT808SessionManager.GetUdpAll().Where(w => w.TerminalPhoneNo == json).Select(s => new JT808UdpSessionInfoDto
resultDto.Data = SessionManager.GetUdpAll().Where(w => w.TerminalPhoneNo == json).Select(s => new JT808UdpSessionInfoDto
{
LastActiveTime = s.ActiveTime,
StartTime = s.StartTime,
@@ -191,7 +201,7 @@ namespace JT808.Gateway.Handlers
JT808ResultDto<bool> resultDto = new JT808ResultDto<bool>();
try
{
JT808SessionManager.RemoveByTerminalPhoneNo(json);
SessionManager.RemoveByTerminalPhoneNo(json);
resultDto.Code = JT808ResultCode.Ok;
resultDto.Data = true;
}
@@ -225,7 +235,7 @@ namespace JT808.Gateway.Handlers
try
{
JT808UnificationSendRequestDto jT808UnificationSendRequestDto = JsonSerializer.Deserialize<JT808UnificationSendRequestDto>(json);
resultDto.Data = JT808SessionManager.TrySendByTerminalPhoneNoAsync(jT808UnificationSendRequestDto.TerminalPhoneNo, jT808UnificationSendRequestDto.HexData.ToHexBytes())
resultDto.Data = SessionManager.TrySendByTerminalPhoneNoAsync(jT808UnificationSendRequestDto.TerminalPhoneNo, jT808UnificationSendRequestDto.HexData.ToHexBytes())
.GetAwaiter()
.GetResult();
resultDto.Code = JT808ResultCode.Ok;
@@ -239,11 +249,96 @@ namespace JT808.Gateway.Handlers
return CreateHttpResponse(resultDto);
}

/// <summary>
/// 添加sim卡黑名单
/// </summary>
/// <param name="json"></param>
/// <returns></returns>
public byte[] BlacklistAdd(string json)
{
if (string.IsNullOrEmpty(json))
{
return EmptyHttpResponse();
}
JT808ResultDto<bool> resultDto = new JT808ResultDto<bool>();
try
{
BlacklistManager.Add(json);
resultDto.Data = true;
resultDto.Code = JT808ResultCode.Ok;
}
catch (Exception ex)
{
resultDto.Data = false;
resultDto.Code = JT808ResultCode.Error;
resultDto.Message = ex.StackTrace;
}
return CreateHttpResponse(resultDto);
}

/// <summary>
/// 移除sim卡黑名单
/// </summary>
/// <param name="json"></param>
/// <returns></returns>
public byte[] BlacklistRemove(string json)
{
if (string.IsNullOrEmpty(json))
{
return EmptyHttpResponse();
}
JT808ResultDto<bool> resultDto = new JT808ResultDto<bool>();
try
{
BlacklistManager.Remove(json);
resultDto.Data = true;
resultDto.Code = JT808ResultCode.Ok;
}
catch (Exception ex)
{
resultDto.Data = false;
resultDto.Code = JT808ResultCode.Error;
resultDto.Message = ex.StackTrace;
}
return CreateHttpResponse(resultDto);
}

/// <summary>
/// 移除sim卡黑名单
/// </summary>
/// <param name="json"></param>
/// <returns></returns>
public byte[] QueryBlacklist(string json)
{
JT808ResultDto<List<string>> resultDto = new JT808ResultDto<List<string>>();
try
{
resultDto.Data = BlacklistManager.GetAll();
resultDto.Code = JT808ResultCode.Ok;
}
catch (Exception ex)
{
resultDto.Data = null;
resultDto.Code = JT808ResultCode.Error;
resultDto.Message = ex.StackTrace;
}
return CreateHttpResponse(resultDto);
}

/// <summary>
///
/// </summary>
protected virtual void InitCommontRoute()
{
CreateRoute(JT808GatewayConstants.JT808WebApiRouteTable.UnificationSend, UnificationSend);
CreateRoute(JT808GatewayConstants.JT808WebApiRouteTable.BlacklistAdd, BlacklistAdd);
CreateRoute(JT808GatewayConstants.JT808WebApiRouteTable.BlacklistRemove, BlacklistRemove);
CreateRoute(JT808GatewayConstants.JT808WebApiRouteTable.BlacklistGet, QueryBlacklist);
}

/// <summary>
///
/// </summary>
protected virtual void InitTcpRoute()
{
CreateRoute(JT808GatewayConstants.JT808WebApiRouteTable.SessionTcpGetAll, GetTcpSessionAll);
@@ -251,6 +346,9 @@ namespace JT808.Gateway.Handlers
CreateRoute(JT808GatewayConstants.JT808WebApiRouteTable.SessionRemoveByTerminalPhoneNo, RemoveSessionByTerminalPhoneNo);
}

/// <summary>
///
/// </summary>
protected virtual void InitUdpRoute()
{
CreateRoute(JT808GatewayConstants.JT808WebApiRouteTable.SessionUdpGetAll, GetUdpSessionAll);


+ 166
- 2
src/JT808.Gateway/JT808.Gateway.xml Zobrazit soubor

@@ -9,6 +9,13 @@
默认消息处理业务实现
</summary>
</member>
<member name="M:JT808.Gateway.Handlers.JT808MsgIdDefaultWebApiHandler.#ctor(JT808.Gateway.Session.JT808SessionManager,JT808.Gateway.Services.JT808BlacklistManager)">
<summary>
</summary>
<param name="jT808SessionManager"></param>
<param name="jT808BlacklistManager"></param>
</member>
<member name="M:JT808.Gateway.Handlers.JT808MsgIdDefaultWebApiHandler.GetTcpSessionAll(System.String)">
<summary>
会话服务集合
@@ -58,6 +65,47 @@
<param name="json"></param>
<returns></returns>
</member>
<member name="M:JT808.Gateway.Handlers.JT808MsgIdDefaultWebApiHandler.BlacklistAdd(System.String)">
<summary>
添加sim卡黑名单
</summary>
<param name="json"></param>
<returns></returns>
</member>
<member name="M:JT808.Gateway.Handlers.JT808MsgIdDefaultWebApiHandler.BlacklistRemove(System.String)">
<summary>
移除sim卡黑名单
</summary>
<param name="json"></param>
<returns></returns>
</member>
<member name="M:JT808.Gateway.Handlers.JT808MsgIdDefaultWebApiHandler.QueryBlacklist(System.String)">
<summary>
移除sim卡黑名单
</summary>
<param name="json"></param>
<returns></returns>
</member>
<member name="M:JT808.Gateway.Handlers.JT808MsgIdDefaultWebApiHandler.InitCommontRoute">
<summary>
</summary>
</member>
<member name="M:JT808.Gateway.Handlers.JT808MsgIdDefaultWebApiHandler.InitTcpRoute">
<summary>
</summary>
</member>
<member name="M:JT808.Gateway.Handlers.JT808MsgIdDefaultWebApiHandler.InitUdpRoute">
<summary>
</summary>
</member>
<member name="T:JT808.Gateway.JT808GatewayExtensions">
<summary>
JT808网关注册扩展
</summary>
</member>
<member name="M:JT808.Gateway.JT808GatewayExtensions.AddGateway(JT808.Protocol.IJT808Builder,System.Action{JT808.Gateway.Abstractions.Configurations.JT808Configuration})">
<summary>
添加808网关
@@ -150,9 +198,14 @@
<param name="config"></param>
<returns></returns>
</member>
<member name="M:JT808.Gateway.JT808TcpServer.#ctor(Microsoft.Extensions.Options.IOptionsMonitor{JT808.Gateway.Abstractions.Configurations.JT808Configuration},JT808.Gateway.Abstractions.IJT808MsgProducer,JT808.Gateway.Abstractions.IJT808MsgReplyLoggingProducer,JT808.Gateway.Abstractions.JT808MessageHandler,JT808.Protocol.IJT808Config,Microsoft.Extensions.Logging.ILoggerFactory,JT808.Gateway.Session.JT808SessionManager)">
<member name="T:JT808.Gateway.JT808TcpServer">
<summary>
808 tcp服务器
</summary>
</member>
<member name="M:JT808.Gateway.JT808TcpServer.#ctor(Microsoft.Extensions.Options.IOptionsMonitor{JT808.Gateway.Abstractions.Configurations.JT808Configuration},JT808.Gateway.Abstractions.IJT808MsgProducer,JT808.Gateway.Abstractions.IJT808MsgReplyLoggingProducer,JT808.Gateway.Abstractions.JT808MessageHandler,JT808.Protocol.IJT808Config,Microsoft.Extensions.Logging.ILoggerFactory,JT808.Gateway.Session.JT808SessionManager,JT808.Gateway.Services.JT808BlacklistManager)">
<summary>
使用队列方式
初始化服务注册
</summary>
<param name="configurationMonitor"></param>
<param name="msgProducer"></param>
@@ -161,6 +214,56 @@
<param name="jT808Config"></param>
<param name="loggerFactory"></param>
<param name="jT808SessionManager"></param>
<param name="jT808BlacklistManager"></param>
</member>
<member name="M:JT808.Gateway.JT808TcpServer.StartAsync(System.Threading.CancellationToken)">
<summary>
</summary>
<param name="cancellationToken"></param>
<returns></returns>
</member>
<member name="M:JT808.Gateway.JT808TcpServer.StopAsync(System.Threading.CancellationToken)">
<summary>
</summary>
<param name="cancellationToken"></param>
<returns></returns>
</member>
<member name="T:JT808.Gateway.Services.JT808BlacklistManager">
<summary>
SIM黑名单管理
</summary>
</member>
<member name="M:JT808.Gateway.Services.JT808BlacklistManager.#ctor">
<summary>
</summary>
</member>
<member name="M:JT808.Gateway.Services.JT808BlacklistManager.Contains(System.String)">
<summary>
是否包含
</summary>
<param name="sim"></param>
<returns></returns>
</member>
<member name="M:JT808.Gateway.Services.JT808BlacklistManager.Add(System.String)">
<summary>
添加
</summary>
<param name="sim"></param>
</member>
<member name="M:JT808.Gateway.Services.JT808BlacklistManager.Remove(System.String)">
<summary>
移除
</summary>
<param name="sim"></param>
</member>
<member name="M:JT808.Gateway.Services.JT808BlacklistManager.GetAll">
<summary>
查询所有黑名单
</summary>
<returns></returns>
</member>
<member name="T:JT808.Gateway.Session.JT808SessionManager">
<summary>
@@ -168,6 +271,67 @@
<remark>不支持变态类型:既发TCP和UDP</remark>
</summary>
</member>
<member name="P:JT808.Gateway.Session.JT808SessionManager.Sessions">
<summary>
socket连接会话
</summary>
</member>
<member name="P:JT808.Gateway.Session.JT808SessionManager.TerminalPhoneNoSessions">
<summary>
socket绑定的终端SIM连接会话
</summary>
</member>
<member name="M:JT808.Gateway.Session.JT808SessionManager.#ctor(JT808.Gateway.Abstractions.IJT808SessionProducer,Microsoft.Extensions.Logging.ILoggerFactory)">
<summary>
</summary>
<param name="jT808SessionProducer"></param>
<param name="loggerFactory"></param>
</member>
<member name="M:JT808.Gateway.Session.JT808SessionManager.#ctor(Microsoft.Extensions.Logging.ILoggerFactory)">
<summary>
</summary>
<param name="loggerFactory"></param>
</member>
<member name="P:JT808.Gateway.Session.JT808SessionManager.TotalSessionCount">
<summary>
获取会话总数量
</summary>
</member>
<member name="P:JT808.Gateway.Session.JT808SessionManager.TcpSessionCount">
<summary>
获取tcp会话数量
</summary>
</member>
<member name="P:JT808.Gateway.Session.JT808SessionManager.UdpSessionCount">
<summary>
获取udp会话数量
</summary>
</member>
<member name="M:JT808.Gateway.Session.JT808SessionManager.TryLink(System.String,JT808.Gateway.Abstractions.IJT808Session)">
<summary>
</summary>
<param name="terminalPhoneNo"></param>
<param name="session"></param>
</member>
<member name="M:JT808.Gateway.Session.JT808SessionManager.TryLink(System.String,System.Net.Sockets.Socket,System.Net.EndPoint)">
<summary>
</summary>
<param name="terminalPhoneNo"></param>
<param name="socket"></param>
<param name="remoteEndPoint"></param>
<returns></returns>
</member>
<member name="M:JT808.Gateway.Session.JT808SessionManager.TryAdd(JT808.Gateway.Abstractions.IJT808Session)">
<summary>
</summary>
<param name="session"></param>
<returns></returns>
</member>
<member name="P:JT808.Gateway.Session.JT808TcpSession.TerminalPhoneNo">
<summary>
终端手机号


+ 4
- 0
src/JT808.Gateway/JT808GatewayExtensions.cs Zobrazit soubor

@@ -17,6 +17,9 @@ using System.Linq;
[assembly: InternalsVisibleTo("JT808.Gateway.Test")]
namespace JT808.Gateway
{
/// <summary>
/// JT808网关注册扩展
/// </summary>
public static partial class JT808GatewayExtensions
{
/// <summary>
@@ -161,6 +164,7 @@ namespace JT808.Gateway
private static IJT808GatewayBuilder AddJT808Core(this IJT808GatewayBuilder config)
{
config.JT808Builder.Services.AddSingleton<JT808MessageHandler>();
config.JT808Builder.Services.AddSingleton<JT808BlacklistManager>();
config.JT808Builder.Services.AddSingleton<JT808SessionManager>();
config.JT808Builder.Services.AddSingleton<IJT808MsgProducer, JT808MsgProducer_Empty>();
config.JT808Builder.Services.AddSingleton<IJT808MsgReplyLoggingProducer, JT808MsgReplyLoggingProducer_Empty>();


+ 58
- 27
src/JT808.Gateway/JT808TcpServer.cs Zobrazit soubor

@@ -7,6 +7,7 @@ using System.Threading;
using System.Threading.Tasks;
using JT808.Gateway.Abstractions;
using JT808.Gateway.Abstractions.Configurations;
using JT808.Gateway.Services;
using JT808.Gateway.Session;
using JT808.Protocol;
using JT808.Protocol.Exceptions;
@@ -17,6 +18,9 @@ using Microsoft.Extensions.Options;

namespace JT808.Gateway
{
/// <summary>
/// 808 tcp服务器
/// </summary>
public class JT808TcpServer : IHostedService
{
private Socket server;
@@ -25,6 +29,8 @@ namespace JT808.Gateway

private readonly JT808SessionManager SessionManager;

private readonly JT808BlacklistManager BlacklistManager;

private readonly JT808Serializer Serializer;

private readonly JT808MessageHandler MessageHandler;
@@ -35,8 +41,10 @@ namespace JT808.Gateway

private readonly IOptionsMonitor<JT808Configuration> ConfigurationMonitor;

private long MessageReceiveCounter = 0;

/// <summary>
/// 使用队列方式
/// 初始化服务注册
/// </summary>
/// <param name="configurationMonitor"></param>
/// <param name="msgProducer"></param>
@@ -45,6 +53,7 @@ namespace JT808.Gateway
/// <param name="jT808Config"></param>
/// <param name="loggerFactory"></param>
/// <param name="jT808SessionManager"></param>
/// <param name="jT808BlacklistManager"></param>
public JT808TcpServer(
IOptionsMonitor<JT808Configuration> configurationMonitor,
IJT808MsgProducer msgProducer,
@@ -52,13 +61,15 @@ namespace JT808.Gateway
JT808MessageHandler messageHandler,
IJT808Config jT808Config,
ILoggerFactory loggerFactory,
JT808SessionManager jT808SessionManager)
JT808SessionManager jT808SessionManager,
JT808BlacklistManager jT808BlacklistManager)
{
MessageHandler = messageHandler;
MsgProducer = msgProducer;
MsgReplyLoggingProducer = msgReplyLoggingProducer;
ConfigurationMonitor = configurationMonitor;
SessionManager = jT808SessionManager;
BlacklistManager = jT808BlacklistManager;
Logger = loggerFactory.CreateLogger<JT808TcpServer>();
Serializer = jT808Config.GetSerializer();
InitServer();
@@ -76,7 +87,11 @@ namespace JT808.Gateway
server.Bind(IPEndPoint);
server.Listen(ConfigurationMonitor.CurrentValue.SoBacklog);
}

/// <summary>
///
/// </summary>
/// <param name="cancellationToken"></param>
/// <returns></returns>
public Task StartAsync(CancellationToken cancellationToken)
{
Logger.LogInformation($"JT808 TCP Server start at {IPAddress.Any}:{ConfigurationMonitor.CurrentValue.TcpPort}.");
@@ -118,15 +133,20 @@ namespace JT808.Gateway
break;
}
writer.Advance(bytesRead);
FlushResult result = await writer.FlushAsync(session.ReceiveTimeout.Token);
if (result.IsCompleted)
{
break;
}
}
catch (OperationCanceledException ex)
catch (OperationCanceledException)
{
Logger.LogError($"[Receive Timeout]:{session.Client.RemoteEndPoint},{session.TerminalPhoneNo}");
Logger.LogError($"[Receive Timeout Or Operation Canceled]:{session.Client.RemoteEndPoint},{session.TerminalPhoneNo}");
break;
}
catch (System.Net.Sockets.SocketException ex)
{
Logger.LogError($"[{ex.SocketErrorCode.ToString()},{ex.Message}]:{session.Client.RemoteEndPoint},{session.TerminalPhoneNo}");
Logger.LogError($"[{ex.SocketErrorCode},{ex.Message}]:{session.Client.RemoteEndPoint},{session.TerminalPhoneNo}");
break;
}
#pragma warning disable CA1031 // Do not catch general exception types
@@ -136,11 +156,6 @@ namespace JT808.Gateway
break;
}
#pragma warning restore CA1031 // Do not catch general exception types
FlushResult result = await writer.FlushAsync();
if (result.IsCompleted)
{
break;
}
}
writer.Complete();
}
@@ -161,16 +176,14 @@ namespace JT808.Gateway
if (result.IsCanceled) break;
if (buffer.Length > 0)
{
ReaderBuffer(ref buffer, session, out consumed, out examined);
ReaderBuffer(ref buffer, session, out consumed);
}
}
#pragma warning disable CA1031 // Do not catch general exception types
catch (Exception ex)
{
Logger.LogError(ex, $"[ReadPipe Error]:{session.Client.RemoteEndPoint},{session.TerminalPhoneNo}");
break;
}
#pragma warning restore CA1031 // Do not catch general exception types
finally
{
reader.AdvanceTo(consumed, examined);
@@ -178,10 +191,8 @@ namespace JT808.Gateway
}
reader.Complete();
}
private void ReaderBuffer(ref ReadOnlySequence<byte> buffer, JT808TcpSession session, out SequencePosition consumed, out SequencePosition examined)
private void ReaderBuffer(ref ReadOnlySequence<byte> buffer, JT808TcpSession session, out SequencePosition consumed)
{
consumed = buffer.Start;
examined = buffer.End;
SequenceReader<byte> seqReader = new SequenceReader<byte>(buffer);
if (seqReader.TryPeek(out byte beginMark))
{
@@ -195,29 +206,43 @@ namespace JT808.Gateway
{
if (mark == 1)
{
ReadOnlySpan<byte> contentSpan = ReadOnlySpan<byte>.Empty;
byte[] data = null;
try
{
contentSpan = seqReader.Sequence.Slice(totalConsumed, seqReader.Consumed - totalConsumed).FirstSpan;
data = seqReader.Sequence.Slice(totalConsumed, seqReader.Consumed - totalConsumed).ToArray();
//过滤掉不是808标准包(14)
//(头)1+(消息 ID )2+(消息体属性)2+(终端手机号)6+(消息流水号)2+(检验码 )1+(尾)1
if (contentSpan.Length > 14)
if (data != null && data.Length > 14)
{
var package = Serializer.HeaderDeserialize(contentSpan, minBufferSize: 10240);
if (Logger.IsEnabled(LogLevel.Trace)) Logger.LogTrace($"[Accept Hex {session.Client.RemoteEndPoint}-{session.TerminalPhoneNo}]:{package.OriginalData.ToHexString()}");
var package = Serializer.HeaderDeserialize(data);
if (BlacklistManager.Contains(package.Header.TerminalPhoneNo))
{
if (Logger.IsEnabled(LogLevel.Warning))
Logger.LogWarning($"[Blacklist {session.Client.RemoteEndPoint}-{session.TerminalPhoneNo}]:{package.OriginalData.ToHexString()}");
session.ReceiveTimeout.Cancel();
break;
}
# if DEBUG
Interlocked.Increment(ref MessageReceiveCounter);
if (Logger.IsEnabled(LogLevel.Trace))
Logger.LogTrace($"[Accept Hex {session.Client.RemoteEndPoint}-{session.TerminalPhoneNo}]:{package.OriginalData.ToHexString()},Counter:{MessageReceiveCounter}");
#else
if (Logger.IsEnabled(LogLevel.Trace))
Logger.LogTrace($"[Accept Hex {session.Client.RemoteEndPoint}-{session.TerminalPhoneNo}]:{package.OriginalData.ToHexString()}");
#endif
SessionManager.TryLink(package.Header.TerminalPhoneNo, session);
Processor(session, package);
}
}
catch (NotImplementedException ex)
{
Logger.LogError(ex.Message,$"{session.Client.RemoteEndPoint},{session.TerminalPhoneNo}");
Logger.LogError(ex.Message, $"[ReaderBuffer]:{data?.ToHexString()},{session.Client.RemoteEndPoint},{session.TerminalPhoneNo}");
}
catch (JT808Exception ex)
{
Logger.LogError($"[HeaderDeserialize ErrorCode]:{ ex.ErrorCode},[ReaderBuffer]:{contentSpan.ToArray().ToHexString()},{session.Client.RemoteEndPoint},{session.TerminalPhoneNo}");
Logger.LogError($"[HeaderDeserialize ErrorCode]:{ ex.ErrorCode},[ReaderBuffer]:{data?.ToHexString()},{session.Client.RemoteEndPoint},{session.TerminalPhoneNo}");
}
totalConsumed += (seqReader.Consumed - totalConsumed);
totalConsumed += seqReader.Consumed - totalConsumed;
if (seqReader.End) break;
seqReader.Advance(1);
mark = 0;
@@ -231,14 +256,14 @@ namespace JT808.Gateway
}
if (seqReader.Length == totalConsumed)
{
examined = consumed = buffer.End;
consumed = buffer.End;
}
else
{
consumed = buffer.GetPosition(totalConsumed);
}
}
private void Processor(in IJT808Session session,in JT808HeaderPackage package)
private void Processor(in IJT808Session session, in JT808HeaderPackage package)
{
try
{
@@ -266,6 +291,12 @@ namespace JT808.Gateway
Logger.LogError(ex, $"[Processor]:{package.OriginalData.ToHexString()},{session.Client.RemoteEndPoint},{session.TerminalPhoneNo}");
}
}

/// <summary>
///
/// </summary>
/// <param name="cancellationToken"></param>
/// <returns></returns>
public Task StopAsync(CancellationToken cancellationToken)
{
Logger.LogInformation("JT808 Tcp Server Stop");


+ 101
- 0
src/JT808.Gateway/Services/JT808BlacklistManager.cs Zobrazit soubor

@@ -0,0 +1,101 @@
using System;
using System.Collections.Generic;
using System.Text;
using System.Collections.Concurrent;
using System.IO;
using Microsoft.Extensions.Options;
using System.Linq;

namespace JT808.Gateway.Services
{
/// <summary>
/// SIM黑名单管理
/// </summary>
public class JT808BlacklistManager
{
private ConcurrentDictionary<string, byte> Blacklist;

private const string BlacklistFileName = "blacklist.ini";

private FileSystemWatcher fileSystemWatcher;

private string FullPath;

/// <summary>
///
/// </summary>
public JT808BlacklistManager()
{
Blacklist = new ConcurrentDictionary<string, byte>();
FullPath = Path.Combine(AppDomain.CurrentDomain.BaseDirectory, BlacklistFileName);
if (!File.Exists(FullPath))
{
File.Create(FullPath).Close();
}
else
{
Init(FullPath);
}
fileSystemWatcher = new FileSystemWatcher(AppDomain.CurrentDomain.BaseDirectory, BlacklistFileName);
fileSystemWatcher.NotifyFilter = NotifyFilters.LastWrite;
fileSystemWatcher.EnableRaisingEvents = true;
fileSystemWatcher.Changed += (sender, e) =>
{
Init(e.FullPath);
};
}

private void Init(string fullPath)
{
var values = File.ReadAllLines(fullPath);
foreach (var item in values)
{
if (!string.IsNullOrEmpty(item))
{
Blacklist.TryAdd(item, 0);
}
}
}

/// <summary>
/// 是否包含
/// </summary>
/// <param name="sim"></param>
/// <returns></returns>
public bool Contains(string sim)
{
return Blacklist.ContainsKey(sim);
}

/// <summary>
/// 添加
/// </summary>
/// <param name="sim"></param>
public void Add(string sim)
{
if(Blacklist.TryAdd(sim, 0))
{
File.AppendAllLines(FullPath, new List<string> { sim });
}
}

/// <summary>
/// 移除
/// </summary>
/// <param name="sim"></param>
public void Remove(string sim)
{
Blacklist.TryRemove(sim, out _);
File.WriteAllLines(FullPath, Blacklist.Select(s => s.Key).OrderBy(o=>o).ToList());
}

/// <summary>
/// 查询所有黑名单
/// </summary>
/// <returns></returns>
public List<string> GetAll()
{
return Blacklist.Select(s => s.Key).ToList();
}
}
}

+ 53
- 19
src/JT808.Gateway/Session/JT808SessionManager.cs Zobrazit soubor

@@ -18,27 +18,43 @@ namespace JT808.Gateway.Session
public class JT808SessionManager
{
private readonly ILogger logger;
private readonly IJT808SessionProducer JT808SessionProducer;
private readonly IJT808SessionProducer SessionProducer;
/// <summary>
/// socket连接会话
/// </summary>
public ConcurrentDictionary<string, IJT808Session> Sessions { get; }
/// <summary>
/// socket绑定的终端SIM连接会话
/// </summary>
public ConcurrentDictionary<string, IJT808Session> TerminalPhoneNoSessions { get; }
/// <summary>
///
/// </summary>
/// <param name="jT808SessionProducer"></param>
/// <param name="loggerFactory"></param>
public JT808SessionManager(
IJT808SessionProducer jT808SessionProducer,
ILoggerFactory loggerFactory
)
{
JT808SessionProducer = jT808SessionProducer;
SessionProducer = jT808SessionProducer;
Sessions = new ConcurrentDictionary<string, IJT808Session>(StringComparer.OrdinalIgnoreCase);
TerminalPhoneNoSessions = new ConcurrentDictionary<string, IJT808Session>(StringComparer.OrdinalIgnoreCase);
logger = loggerFactory.CreateLogger<JT808SessionManager>();
}

/// <summary>
///
/// </summary>
/// <param name="loggerFactory"></param>
public JT808SessionManager(ILoggerFactory loggerFactory)
{
Sessions = new ConcurrentDictionary<string, IJT808Session>(StringComparer.OrdinalIgnoreCase);
TerminalPhoneNoSessions = new ConcurrentDictionary<string, IJT808Session>(StringComparer.OrdinalIgnoreCase);
logger = loggerFactory.CreateLogger<JT808SessionManager>();
}

/// <summary>
/// 获取会话总数量
/// </summary>
public int TotalSessionCount
{
get
@@ -46,7 +62,9 @@ namespace JT808.Gateway.Session
return Sessions.Count;
}
}

/// <summary>
/// 获取tcp会话数量
/// </summary>
public int TcpSessionCount
{
get
@@ -54,7 +72,9 @@ namespace JT808.Gateway.Session
return Sessions.Where(w => w.Value.TransportProtocolType == JT808TransportProtocolType.tcp).Count();
}
}

/// <summary>
/// 获取udp会话数量
/// </summary>
public int UdpSessionCount
{
get
@@ -62,7 +82,11 @@ namespace JT808.Gateway.Session
return Sessions.Where(w => w.Value.TransportProtocolType == JT808TransportProtocolType.udp).Count();
}
}

/// <summary>
///
/// </summary>
/// <param name="terminalPhoneNo"></param>
/// <param name="session"></param>
internal void TryLink(string terminalPhoneNo, IJT808Session session)
{
DateTime curretDatetime= DateTime.Now;
@@ -74,9 +98,9 @@ namespace JT808.Gateway.Session
session.ActiveTime = curretDatetime;
TerminalPhoneNoSessions.TryUpdate(terminalPhoneNo, session, cacheSession);
//会话通知
if(JT808SessionProducer != null)
if(SessionProducer != null)
{
JT808SessionProducer.ProduceAsync(JT808GatewayConstants.SessionOnline, terminalPhoneNo);
SessionProducer.ProduceAsync(JT808GatewayConstants.SessionOnline, terminalPhoneNo);
}
}
else
@@ -91,14 +115,20 @@ namespace JT808.Gateway.Session
if (TerminalPhoneNoSessions.TryAdd(terminalPhoneNo, session))
{
//会话通知
if (JT808SessionProducer != null)
if (SessionProducer != null)
{
JT808SessionProducer.ProduceAsync(JT808GatewayConstants.SessionOnline, terminalPhoneNo);
SessionProducer.ProduceAsync(JT808GatewayConstants.SessionOnline, terminalPhoneNo);
}
}
}
}

/// <summary>
///
/// </summary>
/// <param name="terminalPhoneNo"></param>
/// <param name="socket"></param>
/// <param name="remoteEndPoint"></param>
/// <returns></returns>
public IJT808Session TryLink(string terminalPhoneNo, Socket socket, EndPoint remoteEndPoint)
{
if (TerminalPhoneNoSessions.TryGetValue(terminalPhoneNo, out IJT808Session currentSession))
@@ -121,13 +151,17 @@ namespace JT808.Gateway.Session
//部标的超长待机设备,不会像正常的设备一样一直连着,可能10几分钟连上了,然后发完就关闭连接,
//这时候想下发数据需要知道设备什么时候上线,在这边做通知最好不过了。
//有设备关联上来可以进行通知 例如:使用Redis发布订阅
if (JT808SessionProducer != null)
if (SessionProducer != null)
{
JT808SessionProducer.ProduceAsync(JT808GatewayConstants.SessionOnline, terminalPhoneNo);
SessionProducer.ProduceAsync(JT808GatewayConstants.SessionOnline, terminalPhoneNo);
}
return currentSession;
}

/// <summary>
///
/// </summary>
/// <param name="session"></param>
/// <returns></returns>
internal bool TryAdd(IJT808Session session)
{
return Sessions.TryAdd(session.SessionID, session);
@@ -196,9 +230,9 @@ namespace JT808.Gateway.Session
removeSession.Close();
if (logger.IsEnabled(LogLevel.Information))
logger.LogInformation($"[Session Remove]:{terminalPhoneNo}-{tmpTerminalPhoneNo}");
if (JT808SessionProducer != null)
if (SessionProducer != null)
{
JT808SessionProducer.ProduceAsync(JT808GatewayConstants.SessionOffline, tmpTerminalPhoneNo);
SessionProducer.ProduceAsync(JT808GatewayConstants.SessionOffline, tmpTerminalPhoneNo);
}
}
}
@@ -216,9 +250,9 @@ namespace JT808.Gateway.Session
TerminalPhoneNoSessions.TryRemove(item, out _);
}
var tmpTerminalPhoneNo = string.Join(",", terminalPhoneNos);
if (JT808SessionProducer != null)
if (SessionProducer != null)
{
JT808SessionProducer.ProduceAsync(JT808GatewayConstants.SessionOffline, tmpTerminalPhoneNo);
SessionProducer.ProduceAsync(JT808GatewayConstants.SessionOffline, tmpTerminalPhoneNo);
}
if (logger.IsEnabled(LogLevel.Information))
logger.LogInformation($"[Session Remove]:{tmpTerminalPhoneNo}");


Načítá se…
Zrušit
Uložit