Bladeren bron

1.调整session的关联,避免下发多查询一次

2.随便玩下压测,感觉够用。
tags/pipeline-1.0.0
smallchi(Koike) 5 jaren geleden
bovenliggende
commit
8aab104134
24 gewijzigde bestanden met toevoegingen van 217 en 59 verwijderingen
  1. +165
    -10
      doc/README.md
  2. BIN
      doc/pipeline/client_10k.png
  3. BIN
      doc/pipeline/client_20k.png
  4. BIN
      doc/pipeline/client_40k.png
  5. BIN
      doc/pipeline/client_60k.png
  6. BIN
      doc/pipeline/client_80k.png
  7. BIN
      doc/pipeline/server_network_10k.png
  8. BIN
      doc/pipeline/server_network_20k.png
  9. BIN
      doc/pipeline/server_network_40k.png
  10. BIN
      doc/pipeline/server_network_60k.png
  11. BIN
      doc/pipeline/server_network_80k.png
  12. BIN
      doc/pipeline/server_proccess_10k.png
  13. BIN
      doc/pipeline/server_proccess_20k.png
  14. BIN
      doc/pipeline/server_proccess_40k.png
  15. BIN
      doc/pipeline/server_proccess_60k.png
  16. BIN
      doc/pipeline/server_proccess_80k.png
  17. +1
    -1
      src/JT808.Gateway.CleintBenchmark/Configs/nlog.unix.config
  18. +1
    -1
      src/JT808.Gateway.CleintBenchmark/Services/CleintBenchmarkHostedService.cs
  19. +2
    -2
      src/JT808.Gateway.Client/JT808TcpClient.cs
  20. +2
    -2
      src/JT808.Gateway.Test/Session/JT808SessionManagerTest.cs
  21. +1
    -1
      src/JT808.Gateway.TestHosting/Configs/nlog.Unix.config
  22. +6
    -2
      src/JT808.Gateway/JT808TcpServer.cs
  23. +9
    -5
      src/JT808.Gateway/JT808UdpServer.cs
  24. +30
    -35
      src/JT808.Gateway/Session/JT808SessionManager.cs

+ 165
- 10
doc/README.md Bestand weergeven

@@ -9,25 +9,180 @@
| win server 2016 | 4c8g | 压力测试客户端 | | win server 2016 | 4c8g | 压力测试客户端 |
| centos7 | 4c8g | JT808服务端 | | centos7 | 4c8g | JT808服务端 |


![performance_1000](https://github.com/SmallChi/JT808DotNetty/blob/master/doc/dotnetty/performance_1000.png)
![performance_1000](https://github.com/SmallChi/JT808Gateway/blob/master/doc/dotnetty/performance_1000.png)


![performance_2000](https://github.com/SmallChi/JT808DotNetty/blob/master/doc/dotnetty/performance_2000.png)
![performance_2000](https://github.com/SmallChi/JT808Gateway/blob/master/doc/dotnetty/performance_2000.png)


![performance_5000](https://github.com/SmallChi/JT808DotNetty/blob/master/doc/dotnetty/performance_5000.png)
![performance_5000](https://github.com/SmallChi/JT808Gateway/blob/master/doc/dotnetty/performance_5000.png)


![performance_10000](https://github.com/SmallChi/JT808DotNetty/blob/master/doc/dotnetty/performance_10000.png)
![performance_10000](https://github.com/SmallChi/JT808Gateway/blob/master/doc/dotnetty/performance_10000.png)


## 基于pipeline ## 基于pipeline


**由于【参数配置】不同导致测试的效果可能不同,只是谁便测试玩玩,反正机器便宜。**

> 注意1:连接数和并发数要区分开;
> 注意2:阿里云的机器默认有连接数限制(5000),可以先创建一台,把该装的软件安装好,tcp参数内核调优后,在备份一个系统镜像在玩。

``` 1
使用PM2托管

//服务端
cd /data/JT808.Gateway
pm2 start "dotnet JT808.Gateway.TestHosting.dll ASPNETCORE_ENVIRONMENT=Production" --max-restarts=1 -n "JT808.Gateway.808" -o "/data/pm2Logs/JT808.Gateway/out.log" -e "/data/pm2Logs/JT808.Gateway/error.log"

//客户端
cd /data/JT808Client
pm2 start "dotnet JT808.Gateway.CleintBenchmark.dll ASPNETCORE_ENVIRONMENT=Production" --max-restarts=1 -n "JT808.Gateway.CleintBenchmark" -o "/data/pm2Logs/JT808.Gateway.CleintBenchmark/out.log" -e "/data/pm2Logs/JT808.Gateway.CleintBenchmark/error.log"

修改wwwroot下index.html的webapi接口地址
127.0.0.1:15004/index.html
```

### 10K

| 操作系统 | 配置 | 使用 | | 操作系统 | 配置 | 使用 |
|:-------:|:-------:|:-------:| |:-------:|:-------:|:-------:|
| centos7 | 4c8g | JT808服务端 |
| centos7 | 4c8g | JT808客户端 |
| centos7 | 8c16g | JT808服务端 |
| centos7 | 8c16g | JT808客户端 |

> 计算网络增强型 sn1ne ecs.sn1ne.2xlarge 8 vCPU 16 GiB Intel Xeon E5-2682v4 / Intel Xeon(Skylake) Platinum 8163 2.5 GHz 2 Gbps 100 万 PPS

客户端参数配置appsettings.json

``` 1
"ClientBenchmarkOptions": {
"IP": "",
"Port": 808,
"DeviceCount": 10000,
"Interval": 10,
"DeviceTemplate": 100000 //需要多台机器同时访问,那么可以根据这个避开重复终端号 100000-200000-300000
}
```

![server_proccess_10k](https://github.com/SmallChi/JT808Gateway/blob/master/doc/pipeline/server_proccess_10k.png)

![server_network_10k](https://github.com/SmallChi/JT808Gateway/blob/master/doc/pipeline/server_network_10k.png)

![client_10k](https://github.com/SmallChi/JT808Gateway/blob/master/doc/pipeline/client_10k.png)

### 20K

| 操作系统 | 配置 | 使用 |
|:-------:|:-------:|:-------:|
| centos7 | 8c16g | JT808服务端 |
| centos7 | 8c16g | JT808客户端 |

> 计算网络增强型 sn1ne ecs.sn1ne.2xlarge 8 vCPU 16 GiB Intel Xeon E5-2682v4 / Intel Xeon(Skylake) Platinum 8163 2.5 GHz 2 Gbps 100 万 PPS

客户端参数配置appsettings.json

``` 1
"ClientBenchmarkOptions": {
"IP": "",
"Port": 808,
"DeviceCount": 20000,
"Interval": 300,
"DeviceTemplate": 100000 //需要多台机器同时访问,那么可以根据这个避开重复终端号 100000-200000-300000
}
```

![server_proccess_20k](https://github.com/SmallChi/JT808Gateway/blob/master/doc/pipeline/server_proccess_20k.png)

![server_network_20k](https://github.com/SmallChi/JT808Gateway/blob/master/doc/pipeline/server_network_20k.png)

![client_20k](https://github.com/SmallChi/JT808Gateway/blob/master/doc/pipeline/client_20k.png)

### 40K

| 操作系统 | 配置 | 使用 |
|:-------:|:-------:|:-------:|
| centos7 | 8c16g | JT808服务端 |
| centos7 | 8c16g | JT808客户端 |

> 计算网络增强型 sn1ne ecs.sn1ne.2xlarge 8 vCPU 16 GiB Intel Xeon E5-2682v4 / Intel Xeon(Skylake) Platinum 8163 2.5 GHz 2 Gbps 100 万 PPS

客户端参数配置appsettings.json

``` 1
"ClientBenchmarkOptions": {
"IP": "",
"Port": 808,
"DeviceCount": 40000,
"Interval": 1000,
"DeviceTemplate": 100000 //需要多台机器同时访问,那么可以根据这个避开重复终端号 100000-200000-300000
}
```

![server_proccess_40k](https://github.com/SmallChi/JT808Gateway/blob/master/doc/pipeline/server_proccess_40k.png)

![server_network_40k](https://github.com/SmallChi/JT808Gateway/blob/master/doc/pipeline/server_network_40k.png)

![client_40k](https://github.com/SmallChi/JT808Gateway/blob/master/doc/pipeline/client_40k.png)

### 60K

| 操作系统 | 配置 | 使用 |
|:-------:|:-------:|:-------:|
| centos7 | 8c16g | JT808服务端 |
| centos7 | 8c16g | JT808客户端 |

> 计算网络增强型 sn1ne ecs.sn1ne.2xlarge 8 vCPU 16 GiB Intel Xeon E5-2682v4 / Intel Xeon(Skylake) Platinum 8163 2.5 GHz 2 Gbps 100 万 PPS

客户端参数配置appsettings.json

``` 1
"ClientBenchmarkOptions": {
"IP": "",
"Port": 808,
"DeviceCount": 60000,
"Interval": 1000,
"DeviceTemplate": 100000 //需要多台机器同时访问,那么可以根据这个避开重复终端号 100000-200000-300000
}
```

![server_proccess_60k](https://github.com/SmallChi/JT808Gateway/blob/master/doc/pipeline/server_proccess_60k.png)

![server_network_60k](https://github.com/SmallChi/JT808Gateway/blob/master/doc/pipeline/server_network_60k.png)

![client_60k](https://github.com/SmallChi/JT808Gateway/blob/master/doc/pipeline/client_60k.png)

### 80K

| 操作系统 | 配置 | 使用 |
|:-------:|:-------:|:-------:|
| centos7 | 8c16g | JT808服务端 |
| centos7 | 8c16g | JT808客户端 |
| centos7 | 8c16g | JT808客户端 |

> 计算网络增强型 sn1ne ecs.sn1ne.2xlarge 8 vCPU 16 GiB Intel Xeon E5-2682v4 / Intel Xeon(Skylake) Platinum 8163 2.5 GHz 2 Gbps 100 万 PPS

客户端1的参数配置appsettings.json

``` 1
"ClientBenchmarkOptions": {
"IP": "",
"Port": 808,
"DeviceCount": 40000,
"Interval": 3000,
"DeviceTemplate": 100000 //需要多台机器同时访问,那么可以根据这个避开重复终端号 100000-200000-300000
}
```

客户端2的参数配置appsettings.json


> 计算网络增强型 sn1ne ecs.sn1ne.xlarge 4 vCPU 8 GiB Intel Xeon E5-2682v4 / Intel Xeon(Skylake) Platinum 8163 2.5 GHz 1.5 Gbps 50 万 PPS
``` 2
"ClientBenchmarkOptions": {
"IP": "",
"Port": 808,
"DeviceCount": 40000,
"Interval": 3000,
"DeviceTemplate": 200000 //需要多台机器同时访问,那么可以根据这个避开重复终端号 100000-200000-300000
}
```


![server_proccess_10k](https://github.com/SmallChi/JT808DotNetty/blob/master/doc/pipeline/server_proccess_10k.png)
![server_proccess_80k](https://github.com/SmallChi/JT808Gateway/blob/master/doc/pipeline/server_proccess_80k.png)


![server_network_10k](https://github.com/SmallChi/JT808DotNetty/blob/master/doc/pipeline/server_network_10k.png)
![server_network_80k](https://github.com/SmallChi/JT808Gateway/blob/master/doc/pipeline/server_network_80k.png)


![client_10k](https://github.com/SmallChi/JT808DotNetty/blob/master/doc/pipeline/client_10k.png)
![client_80k](https://github.com/SmallChi/JT808Gateway/blob/master/doc/pipeline/client_80k.png)

BIN
doc/pipeline/client_10k.png Bestand weergeven

Voor Na
Breedte: 1035  |  Hoogte: 612  |  Grootte: 22 KiB Breedte: 1418  |  Hoogte: 740  |  Grootte: 53 KiB

BIN
doc/pipeline/client_20k.png Bestand weergeven

Voor Na
Breedte: 1401  |  Hoogte: 737  |  Grootte: 53 KiB

BIN
doc/pipeline/client_40k.png Bestand weergeven

Voor Na
Breedte: 1412  |  Hoogte: 742  |  Grootte: 54 KiB

BIN
doc/pipeline/client_60k.png Bestand weergeven

Voor Na
Breedte: 1398  |  Hoogte: 745  |  Grootte: 56 KiB

BIN
doc/pipeline/client_80k.png Bestand weergeven

Voor Na
Breedte: 1916  |  Hoogte: 752  |  Grootte: 103 KiB

BIN
doc/pipeline/server_network_10k.png Bestand weergeven

Voor Na
Breedte: 1694  |  Hoogte: 672  |  Grootte: 62 KiB Breedte: 1678  |  Hoogte: 818  |  Grootte: 110 KiB

BIN
doc/pipeline/server_network_20k.png Bestand weergeven

Voor Na
Breedte: 1661  |  Hoogte: 818  |  Grootte: 101 KiB

BIN
doc/pipeline/server_network_40k.png Bestand weergeven

Voor Na
Breedte: 1652  |  Hoogte: 820  |  Grootte: 96 KiB

BIN
doc/pipeline/server_network_60k.png Bestand weergeven

Voor Na
Breedte: 1655  |  Hoogte: 828  |  Grootte: 110 KiB

BIN
doc/pipeline/server_network_80k.png Bestand weergeven

Voor Na
Breedte: 1658  |  Hoogte: 816  |  Grootte: 132 KiB

BIN
doc/pipeline/server_proccess_10k.png Bestand weergeven

Voor Na
Breedte: 1708  |  Hoogte: 629  |  Grootte: 59 KiB Breedte: 1658  |  Hoogte: 768  |  Grootte: 85 KiB

BIN
doc/pipeline/server_proccess_20k.png Bestand weergeven

Voor Na
Breedte: 1653  |  Hoogte: 792  |  Grootte: 90 KiB

BIN
doc/pipeline/server_proccess_40k.png Bestand weergeven

Voor Na
Breedte: 1652  |  Hoogte: 799  |  Grootte: 82 KiB

BIN
doc/pipeline/server_proccess_60k.png Bestand weergeven

Voor Na
Breedte: 1645  |  Hoogte: 825  |  Grootte: 87 KiB

BIN
doc/pipeline/server_proccess_80k.png Bestand weergeven

Voor Na
Breedte: 1663  |  Hoogte: 798  |  Grootte: 93 KiB

+ 1
- 1
src/JT808.Gateway.CleintBenchmark/Configs/nlog.unix.config Bestand weergeven

@@ -31,6 +31,6 @@
</target> </target>
</targets> </targets>
<rules> <rules>
<logger name="*" minlevel="Debug" maxlevel="Fatal" writeTo="all,console"/>
<logger name="*" minlevel="Info" maxlevel="Fatal" writeTo="all,console"/>
</rules> </rules>
</nlog> </nlog>

+ 1
- 1
src/JT808.Gateway.CleintBenchmark/Services/CleintBenchmarkHostedService.cs Bestand weergeven

@@ -75,7 +75,7 @@ namespace JT808.Gateway.CleintBenchmark.Services
logger.LogError(ex.Message); logger.LogError(ex.Message);
} }
} }
Thread.Sleep(100);
Thread.Sleep(clientBenchmarkOptions.Interval);
} }
}); });
return Task.CompletedTask; return Task.CompletedTask;


+ 2
- 2
src/JT808.Gateway.Client/JT808TcpClient.cs Bestand weergeven

@@ -69,7 +69,7 @@ namespace JT808.Gateway.Client
{ {
try try
{ {
Memory<byte> memory = writer.GetMemory(10240);
Memory<byte> memory = writer.GetMemory(80960);
int bytesRead = await session.ReceiveAsync(memory, SocketFlags.None, cancellationToken); int bytesRead = await session.ReceiveAsync(memory, SocketFlags.None, cancellationToken);
if (bytesRead == 0) if (bytesRead == 0)
{ {
@@ -146,7 +146,7 @@ namespace JT808.Gateway.Client
{ {
try try
{ {
var package = JT808Serializer.HeaderDeserialize(seqReader.Sequence.Slice(totalConsumed, seqReader.Consumed - totalConsumed).FirstSpan);
var package = JT808Serializer.HeaderDeserialize(seqReader.Sequence.Slice(totalConsumed, seqReader.Consumed - totalConsumed).FirstSpan,minBufferSize:10240);
ReceiveAtomicCounterService.MsgSuccessIncrement(); ReceiveAtomicCounterService.MsgSuccessIncrement();
if (Logger.IsEnabled(LogLevel.Debug)) Logger.LogDebug($"[Atomic Success Counter]:{ReceiveAtomicCounterService.MsgSuccessCount}"); if (Logger.IsEnabled(LogLevel.Debug)) Logger.LogDebug($"[Atomic Success Counter]:{ReceiveAtomicCounterService.MsgSuccessCount}");
if (Logger.IsEnabled(LogLevel.Trace)) Logger.LogTrace($"[Accept Hex {session.RemoteEndPoint}]:{package.OriginalData.ToArray().ToHexString()}"); if (Logger.IsEnabled(LogLevel.Trace)) Logger.LogTrace($"[Accept Hex {session.RemoteEndPoint}]:{package.OriginalData.ToArray().ToHexString()}");


+ 2
- 2
src/JT808.Gateway.Test/Session/JT808SessionManagerTest.cs Bestand weergeven

@@ -108,10 +108,10 @@ namespace JT808.Gateway.Test.Session
var result3 = jT808SessionManager.TryAdd(session3); var result3 = jT808SessionManager.TryAdd(session3);
jT808SessionManager.TryLink(tno2, session3); jT808SessionManager.TryLink(tno2, session3);
Assert.True(result3); Assert.True(result3);
if (jT808SessionManager.TerminalPhoneNoSessions.TryGetValue(tno2,out string sessionid))
if (jT808SessionManager.TerminalPhoneNoSessions.TryGetValue(tno2,out var sessionInfo))
{ {
//实际的通道Id //实际的通道Id
Assert.Equal(session3.SessionID, sessionid);
Assert.Equal(session3.SessionID, sessionInfo.SessionID);
} }
Assert.Equal(3, jT808SessionManager.TotalSessionCount); Assert.Equal(3, jT808SessionManager.TotalSessionCount);
Assert.Equal(3, jT808SessionManager.TerminalPhoneNoSessions.Count); Assert.Equal(3, jT808SessionManager.TerminalPhoneNoSessions.Count);


+ 1
- 1
src/JT808.Gateway.TestHosting/Configs/nlog.Unix.config Bestand weergeven

@@ -31,6 +31,6 @@
</target> </target>
</targets> </targets>
<rules> <rules>
<logger name="*" minlevel="Debug" maxlevel="Fatal" writeTo="Gateway"/>
<logger name="*" minlevel="Info" maxlevel="Fatal" writeTo="Gateway"/>
</rules> </rules>
</nlog> </nlog>

+ 6
- 2
src/JT808.Gateway/JT808TcpServer.cs Bestand weergeven

@@ -23,7 +23,7 @@ namespace JT808.Gateway
{ {
public class JT808TcpServer:IHostedService public class JT808TcpServer:IHostedService
{ {
private Socket server;
private readonly Socket server;


private readonly ILogger Logger; private readonly ILogger Logger;


@@ -101,7 +101,7 @@ namespace JT808.Gateway
} }
writer.Advance(bytesRead); writer.Advance(bytesRead);
} }
catch(OperationCanceledException)
catch(OperationCanceledException ex)
{ {
Logger.LogError($"[Receive Timeout]:{session.Client.RemoteEndPoint}"); Logger.LogError($"[Receive Timeout]:{session.Client.RemoteEndPoint}");
break; break;
@@ -111,11 +111,13 @@ namespace JT808.Gateway
Logger.LogError($"[{ex.SocketErrorCode.ToString()},{ex.Message}]:{session.Client.RemoteEndPoint}"); Logger.LogError($"[{ex.SocketErrorCode.ToString()},{ex.Message}]:{session.Client.RemoteEndPoint}");
break; break;
} }
#pragma warning disable CA1031 // Do not catch general exception types
catch (Exception ex) catch (Exception ex)
{ {
Logger.LogError(ex, $"[Receive Error]:{session.Client.RemoteEndPoint}"); Logger.LogError(ex, $"[Receive Error]:{session.Client.RemoteEndPoint}");
break; break;
} }
#pragma warning restore CA1031 // Do not catch general exception types
FlushResult result = await writer.FlushAsync(); FlushResult result = await writer.FlushAsync();
if (result.IsCompleted) if (result.IsCompleted)
{ {
@@ -144,11 +146,13 @@ namespace JT808.Gateway
ReaderBuffer(ref buffer, session,out consumed, out examined); ReaderBuffer(ref buffer, session,out consumed, out examined);
} }
} }
#pragma warning disable CA1031 // Do not catch general exception types
catch (Exception ex) catch (Exception ex)
{ {
SessionManager.RemoveBySessionId(session.SessionID); SessionManager.RemoveBySessionId(session.SessionID);
break; break;
} }
#pragma warning restore CA1031 // Do not catch general exception types
finally finally
{ {
reader.AdvanceTo(consumed, examined); reader.AdvanceTo(consumed, examined);


+ 9
- 5
src/JT808.Gateway/JT808UdpServer.cs Bestand weergeven

@@ -23,7 +23,7 @@ namespace JT808.Gateway
{ {
public class JT808UdpServer : IHostedService public class JT808UdpServer : IHostedService
{ {
private Socket server;
private readonly Socket server;


private readonly ILogger Logger; private readonly ILogger Logger;


@@ -37,7 +37,7 @@ namespace JT808.Gateway


private readonly JT808Configuration Configuration; private readonly JT808Configuration Configuration;


private IPEndPoint LocalIPEndPoint;
private readonly IPEndPoint LocalIPEndPoint;


public JT808UdpServer( public JT808UdpServer(
IOptions<JT808Configuration> jT808ConfigurationAccessor, IOptions<JT808Configuration> jT808ConfigurationAccessor,
@@ -75,10 +75,12 @@ namespace JT808.Gateway
{ {
Logger.LogError(ex, "Receive MessageFrom Async"); Logger.LogError(ex, "Receive MessageFrom Async");
} }
catch(Exception ex)
#pragma warning disable CA1031 // Do not catch general exception types
catch (Exception ex)
{ {
Logger.LogError(ex, $"Received Bytes"); Logger.LogError(ex, $"Received Bytes");
} }
#pragma warning restore CA1031 // Do not catch general exception types
finally finally
{ {
ArrayPool<byte>.Shared.Return(buffer); ArrayPool<byte>.Shared.Return(buffer);
@@ -91,11 +93,11 @@ namespace JT808.Gateway
{ {
try try
{ {
var package = Serializer.HeaderDeserialize(buffer);
var package = Serializer.HeaderDeserialize(buffer, minBufferSize: 10240);
AtomicCounterService.MsgSuccessIncrement(); AtomicCounterService.MsgSuccessIncrement();
if (Logger.IsEnabled(LogLevel.Debug)) Logger.LogDebug($"[Atomic Success Counter]:{AtomicCounterService.MsgSuccessCount}"); if (Logger.IsEnabled(LogLevel.Debug)) Logger.LogDebug($"[Atomic Success Counter]:{AtomicCounterService.MsgSuccessCount}");
if (Logger.IsEnabled(LogLevel.Trace)) Logger.LogTrace($"[Accept Hex {receiveMessageFromResult.RemoteEndPoint}]:{package.OriginalData.ToArray().ToHexString()}"); if (Logger.IsEnabled(LogLevel.Trace)) Logger.LogTrace($"[Accept Hex {receiveMessageFromResult.RemoteEndPoint}]:{package.OriginalData.ToArray().ToHexString()}");
string sessionId= SessionManager.TryLink(package.Header.TerminalPhoneNo, socket, receiveMessageFromResult.RemoteEndPoint);
SessionManager.TryLink(package.Header.TerminalPhoneNo, socket, receiveMessageFromResult.RemoteEndPoint);
if (Logger.IsEnabled(LogLevel.Information)) if (Logger.IsEnabled(LogLevel.Information))
{ {
Logger.LogInformation($"[Connected]:{receiveMessageFromResult.RemoteEndPoint}"); Logger.LogInformation($"[Connected]:{receiveMessageFromResult.RemoteEndPoint}");
@@ -108,11 +110,13 @@ namespace JT808.Gateway
if (Logger.IsEnabled(LogLevel.Information)) Logger.LogInformation($"[Atomic Fail Counter]:{AtomicCounterService.MsgFailCount}"); if (Logger.IsEnabled(LogLevel.Information)) Logger.LogInformation($"[Atomic Fail Counter]:{AtomicCounterService.MsgFailCount}");
Logger.LogError($"[HeaderDeserialize ErrorCode]:{ ex.ErrorCode},[ReaderBuffer]:{buffer.ToArray().ToHexString()}"); Logger.LogError($"[HeaderDeserialize ErrorCode]:{ ex.ErrorCode},[ReaderBuffer]:{buffer.ToArray().ToHexString()}");
} }
#pragma warning disable CA1031 // Do not catch general exception types
catch (Exception ex) catch (Exception ex)
{ {
if (Logger.IsEnabled(LogLevel.Debug)) Logger.LogDebug($"[Atomic Fail Counter]:{AtomicCounterService.MsgFailCount}"); if (Logger.IsEnabled(LogLevel.Debug)) Logger.LogDebug($"[Atomic Fail Counter]:{AtomicCounterService.MsgFailCount}");
Logger.LogError(ex, $"[ReaderBuffer]:{ buffer.ToArray().ToHexString()}"); Logger.LogError(ex, $"[ReaderBuffer]:{ buffer.ToArray().ToHexString()}");
} }
#pragma warning restore CA1031 // Do not catch general exception types
} }
public Task StopAsync(CancellationToken cancellationToken) public Task StopAsync(CancellationToken cancellationToken)
{ {


+ 30
- 35
src/JT808.Gateway/Session/JT808SessionManager.cs Bestand weergeven

@@ -8,8 +8,6 @@ using System.Collections.Generic;
using System.Linq; using System.Linq;
using System.Net; using System.Net;
using System.Net.Sockets; using System.Net.Sockets;
using System.Text;
using System.Threading.Tasks;


namespace JT808.Gateway.Session namespace JT808.Gateway.Session
{ {
@@ -22,7 +20,7 @@ namespace JT808.Gateway.Session
private readonly ILogger logger; private readonly ILogger logger;
private readonly IJT808SessionProducer JT808SessionProducer; private readonly IJT808SessionProducer JT808SessionProducer;
public ConcurrentDictionary<string, IJT808Session> Sessions { get; } public ConcurrentDictionary<string, IJT808Session> Sessions { get; }
public ConcurrentDictionary<string, string> TerminalPhoneNoSessions { get; }
public ConcurrentDictionary<string, IJT808Session> TerminalPhoneNoSessions { get; }
public JT808SessionManager( public JT808SessionManager(
IJT808SessionProducer jT808SessionProducer, IJT808SessionProducer jT808SessionProducer,
ILoggerFactory loggerFactory ILoggerFactory loggerFactory
@@ -30,14 +28,14 @@ namespace JT808.Gateway.Session
{ {
JT808SessionProducer = jT808SessionProducer; JT808SessionProducer = jT808SessionProducer;
Sessions = new ConcurrentDictionary<string, IJT808Session>(StringComparer.OrdinalIgnoreCase); Sessions = new ConcurrentDictionary<string, IJT808Session>(StringComparer.OrdinalIgnoreCase);
TerminalPhoneNoSessions = new ConcurrentDictionary<string, string>(StringComparer.OrdinalIgnoreCase);
TerminalPhoneNoSessions = new ConcurrentDictionary<string, IJT808Session>(StringComparer.OrdinalIgnoreCase);
logger = loggerFactory.CreateLogger("JT808SessionManager"); logger = loggerFactory.CreateLogger("JT808SessionManager");
} }


public JT808SessionManager(ILoggerFactory loggerFactory) public JT808SessionManager(ILoggerFactory loggerFactory)
{ {
Sessions = new ConcurrentDictionary<string, IJT808Session>(StringComparer.OrdinalIgnoreCase); Sessions = new ConcurrentDictionary<string, IJT808Session>(StringComparer.OrdinalIgnoreCase);
TerminalPhoneNoSessions = new ConcurrentDictionary<string, string>(StringComparer.OrdinalIgnoreCase);
TerminalPhoneNoSessions = new ConcurrentDictionary<string, IJT808Session>(StringComparer.OrdinalIgnoreCase);
logger = loggerFactory.CreateLogger("JT808SessionManager"); logger = loggerFactory.CreateLogger("JT808SessionManager");
} }


@@ -67,40 +65,44 @@ namespace JT808.Gateway.Session


internal void TryLink(string terminalPhoneNo, IJT808Session session) internal void TryLink(string terminalPhoneNo, IJT808Session session)
{ {
session.ActiveTime = DateTime.Now;
DateTime curretDatetime= DateTime.Now;
session.ActiveTime = curretDatetime;
session.TerminalPhoneNo = terminalPhoneNo; session.TerminalPhoneNo = terminalPhoneNo;
Sessions.TryUpdate(session.SessionID, session, session); Sessions.TryUpdate(session.SessionID, session, session);
TerminalPhoneNoSessions.AddOrUpdate(terminalPhoneNo, session.SessionID, (key, oldValue)=>
TerminalPhoneNoSessions.AddOrUpdate(terminalPhoneNo, session, (key, oldValue)=>
{ {
if(session.SessionID!= oldValue)
if(session.SessionID!= oldValue.SessionID)
{ {
//会话通知 //会话通知
JT808SessionProducer?.ProduceAsync(JT808GatewayConstants.SessionOnline, key); JT808SessionProducer?.ProduceAsync(JT808GatewayConstants.SessionOnline, key);
return session.SessionID;
return session;
}
else
{
oldValue.StartTime = curretDatetime;
} }
return oldValue; return oldValue;
}); });
} }


public string TryLink(string terminalPhoneNo, Socket socket, EndPoint remoteEndPoint)
public IJT808Session TryLink(string terminalPhoneNo, Socket socket, EndPoint remoteEndPoint)
{ {
string sessionId = string.Empty;
if(TerminalPhoneNoSessions.TryGetValue(terminalPhoneNo,out sessionId))
if (TerminalPhoneNoSessions.TryGetValue(terminalPhoneNo, out IJT808Session currentSession))
{ {
if (Sessions.TryGetValue(sessionId, out IJT808Session sessionInfo))
if (Sessions.TryGetValue(currentSession.SessionID, out IJT808Session sessionInfo))
{ {
sessionInfo.ActiveTime = DateTime.Now; sessionInfo.ActiveTime = DateTime.Now;
sessionInfo.TerminalPhoneNo = terminalPhoneNo; sessionInfo.TerminalPhoneNo = terminalPhoneNo;
sessionInfo.RemoteEndPoint = remoteEndPoint; sessionInfo.RemoteEndPoint = remoteEndPoint;
Sessions.TryUpdate(sessionId, sessionInfo, sessionInfo);
Sessions.TryUpdate(currentSession.SessionID, sessionInfo, sessionInfo);
} }
} }
else else
{ {
JT808UdpSession session = new JT808UdpSession(socket, remoteEndPoint); JT808UdpSession session = new JT808UdpSession(socket, remoteEndPoint);
Sessions.TryAdd(session.SessionID, session); Sessions.TryAdd(session.SessionID, session);
TerminalPhoneNoSessions.TryAdd(terminalPhoneNo, session.SessionID);
sessionId = session.SessionID;
TerminalPhoneNoSessions.TryAdd(terminalPhoneNo, session);
currentSession = session;
} }
//会话通知 //会话通知
//使用场景: //使用场景:
@@ -108,7 +110,7 @@ namespace JT808.Gateway.Session
//这时候想下发数据需要知道设备什么时候上线,在这边做通知最好不过了。 //这时候想下发数据需要知道设备什么时候上线,在这边做通知最好不过了。
//有设备关联上来可以进行通知 例如:使用Redis发布订阅 //有设备关联上来可以进行通知 例如:使用Redis发布订阅
JT808SessionProducer?.ProduceAsync(JT808GatewayConstants.SessionOnline, terminalPhoneNo); JT808SessionProducer?.ProduceAsync(JT808GatewayConstants.SessionOnline, terminalPhoneNo);
return sessionId;
return currentSession;
} }


internal bool TryAdd(IJT808Session session) internal bool TryAdd(IJT808Session session)
@@ -118,24 +120,17 @@ namespace JT808.Gateway.Session


public bool TrySendByTerminalPhoneNo(string terminalPhoneNo, byte[] data) public bool TrySendByTerminalPhoneNo(string terminalPhoneNo, byte[] data)
{ {
if(TerminalPhoneNoSessions.TryGetValue(terminalPhoneNo,out var sessionid))
if(TerminalPhoneNoSessions.TryGetValue(terminalPhoneNo,out var session))
{ {
if (Sessions.TryGetValue(sessionid, out var session))
if (session.TransportProtocolType == JT808TransportProtocolType.tcp)
{ {
if (session.TransportProtocolType == JT808TransportProtocolType.tcp)
{
session.Client.Send(data, SocketFlags.None);
}
else
{
session.Client.SendTo(data, SocketFlags.None, session.RemoteEndPoint);
}
return true;
session.Client.Send(data, SocketFlags.None);
} }
else else
{ {
return false;
session.Client.SendTo(data, SocketFlags.None, session.RemoteEndPoint);
} }
return true;
} }
else else
{ {
@@ -165,11 +160,11 @@ namespace JT808.Gateway.Session


public void RemoveByTerminalPhoneNo(string terminalPhoneNo) public void RemoveByTerminalPhoneNo(string terminalPhoneNo)
{ {
if (TerminalPhoneNoSessions.TryGetValue(terminalPhoneNo, out var removeSessionId))
if (TerminalPhoneNoSessions.TryGetValue(terminalPhoneNo, out var removeTerminalPhoneNoSessions))
{ {
// 处理转发过来的是数据 这时候通道对设备是1对多关系,需要清理垃圾数据 // 处理转发过来的是数据 这时候通道对设备是1对多关系,需要清理垃圾数据
//1.用当前会话的通道Id找出通过转发过来的其他设备的终端号 //1.用当前会话的通道Id找出通过转发过来的其他设备的终端号
var terminalPhoneNos = TerminalPhoneNoSessions.Where(w => w.Value == removeSessionId).Select(s => s.Key).ToList();
var terminalPhoneNos = TerminalPhoneNoSessions.Where(w => w.Value.SessionID == removeTerminalPhoneNoSessions.SessionID).Select(s => s.Key).ToList();
//2.存在则一个个移除 //2.存在则一个个移除
string tmpTerminalPhoneNo = terminalPhoneNo; string tmpTerminalPhoneNo = terminalPhoneNo;
if (terminalPhoneNos.Count > 0) if (terminalPhoneNos.Count > 0)
@@ -181,10 +176,10 @@ namespace JT808.Gateway.Session
} }
tmpTerminalPhoneNo = string.Join(",", terminalPhoneNos); tmpTerminalPhoneNo = string.Join(",", terminalPhoneNos);
} }
if (Sessions.TryRemove(removeSessionId, out var removeSession))
if (Sessions.TryRemove(removeTerminalPhoneNoSessions.SessionID, out var removeSession))
{ {
removeSession.Close(); removeSession.Close();
if(logger.IsEnabled(LogLevel.Information))
if (logger.IsEnabled(LogLevel.Information))
logger.LogInformation($"[Session Remove]:{terminalPhoneNo}-{tmpTerminalPhoneNo}"); logger.LogInformation($"[Session Remove]:{terminalPhoneNo}-{tmpTerminalPhoneNo}");
JT808SessionProducer?.ProduceAsync(JT808GatewayConstants.SessionOffline, tmpTerminalPhoneNo); JT808SessionProducer?.ProduceAsync(JT808GatewayConstants.SessionOffline, tmpTerminalPhoneNo);
} }
@@ -193,9 +188,9 @@ namespace JT808.Gateway.Session


public void RemoveBySessionId(string sessionId) public void RemoveBySessionId(string sessionId)
{ {
if(Sessions.TryRemove(sessionId,out var removeSession))
if (Sessions.TryRemove(sessionId, out var removeSession))
{ {
var terminalPhoneNos = TerminalPhoneNoSessions.Where(w => w.Value == sessionId).Select(s => s.Key).ToList();
var terminalPhoneNos = TerminalPhoneNoSessions.Where(w => w.Value.SessionID == sessionId).Select(s => s.Key).ToList();
if (terminalPhoneNos.Count > 0) if (terminalPhoneNos.Count > 0)
{ {
foreach (var item in terminalPhoneNos) foreach (var item in terminalPhoneNos)


Laden…
Annuleren
Opslaan