From d750d41625680e6e9f4637bcab2a5a1d1b48f196 Mon Sep 17 00:00:00 2001
From: "SmallChi(Koike)" <564952747@qq.com>
Date: Wed, 26 Oct 2022 09:31:29 +0800
Subject: [PATCH] =?UTF-8?q?1.=E5=8D=87=E7=BA=A7808=E5=BA=93=202.=E8=B0=83?=
=?UTF-8?q?=E6=95=B4Task=E7=9A=84=E7=94=A8=E6=B3=95?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
.github/workflows/dotnetcore.yml | 2 +-
simples/global.json | 2 +-
.../JT808.Gateway.Abstractions.csproj | 4 +-
.../JT808.Gateway.CleintBenchmark.csproj | 2 +-
.../JT808.Gateway.ServerBenchmark.csproj | 2 +-
.../JT808.Gateway.Client.csproj | 4 +-
src/JT808.Gateway.Client/JT808TcpClient.cs | 79 +++++++++++--------
.../Services/JT808ReportHostedService.cs | 2 +-
.../Services/JT808RetryClientHostedService.cs | 43 +++++-----
.../JT808.Gateway.Kafka.csproj | 2 +-
.../JT808.Gateway.NormalHosting.csproj | 2 +-
.../JT808.Gateway.NormalHosting/Program.cs | 2 +-
.../JT808.Gateway.QueueHosting.csproj | 2 +-
.../JT808.Gateway.Test.csproj | 4 +-
src/JT808.Gateway/JT808TcpServer.cs | 41 ++++++----
src/JT808.Gateway/JT808UdpServer.cs | 4 +-
src/global.json | 2 +-
17 files changed, 115 insertions(+), 84 deletions(-)
diff --git a/.github/workflows/dotnetcore.yml b/.github/workflows/dotnetcore.yml
index da13394..5240dd8 100644
--- a/.github/workflows/dotnetcore.yml
+++ b/.github/workflows/dotnetcore.yml
@@ -12,7 +12,7 @@ jobs:
- name: Setup .NET Core
uses: actions/setup-dotnet@master
with:
- dotnet-version: 6.0.100
+ dotnet-version: 6.0.400
- name: dotnet info
run: dotnet --info
- name: dotnet JT808.Gateway restore
diff --git a/simples/global.json b/simples/global.json
index 5395ff4..d769cd3 100644
--- a/simples/global.json
+++ b/simples/global.json
@@ -1,5 +1,5 @@
{
"sdk": {
- "version": "6.0.101"
+ "version": "6.0.400"
}
}
\ No newline at end of file
diff --git a/src/JT808.Gateway.Abstractions/JT808.Gateway.Abstractions.csproj b/src/JT808.Gateway.Abstractions/JT808.Gateway.Abstractions.csproj
index 79375ed..57eb4f6 100644
--- a/src/JT808.Gateway.Abstractions/JT808.Gateway.Abstractions.csproj
+++ b/src/JT808.Gateway.Abstractions/JT808.Gateway.Abstractions.csproj
@@ -16,10 +16,10 @@
-
+
-
+
diff --git a/src/JT808.Gateway.Benchmark/JT808.Gateway.CleintBenchmark/JT808.Gateway.CleintBenchmark.csproj b/src/JT808.Gateway.Benchmark/JT808.Gateway.CleintBenchmark/JT808.Gateway.CleintBenchmark.csproj
index 5ab1b93..6934ab3 100644
--- a/src/JT808.Gateway.Benchmark/JT808.Gateway.CleintBenchmark/JT808.Gateway.CleintBenchmark.csproj
+++ b/src/JT808.Gateway.Benchmark/JT808.Gateway.CleintBenchmark/JT808.Gateway.CleintBenchmark.csproj
@@ -18,7 +18,7 @@
-
+
diff --git a/src/JT808.Gateway.Benchmark/JT808.Gateway.ServerBenchmark/JT808.Gateway.ServerBenchmark.csproj b/src/JT808.Gateway.Benchmark/JT808.Gateway.ServerBenchmark/JT808.Gateway.ServerBenchmark.csproj
index 9ce53d3..523a07e 100644
--- a/src/JT808.Gateway.Benchmark/JT808.Gateway.ServerBenchmark/JT808.Gateway.ServerBenchmark.csproj
+++ b/src/JT808.Gateway.Benchmark/JT808.Gateway.ServerBenchmark/JT808.Gateway.ServerBenchmark.csproj
@@ -6,7 +6,7 @@
-
+
diff --git a/src/JT808.Gateway.Client/JT808.Gateway.Client.csproj b/src/JT808.Gateway.Client/JT808.Gateway.Client.csproj
index c41bbd2..b1279a3 100644
--- a/src/JT808.Gateway.Client/JT808.Gateway.Client.csproj
+++ b/src/JT808.Gateway.Client/JT808.Gateway.Client.csproj
@@ -10,10 +10,10 @@
-
+
-
+
diff --git a/src/JT808.Gateway.Client/JT808TcpClient.cs b/src/JT808.Gateway.Client/JT808TcpClient.cs
index 422e13e..b7ad940 100644
--- a/src/JT808.Gateway.Client/JT808TcpClient.cs
+++ b/src/JT808.Gateway.Client/JT808TcpClient.cs
@@ -58,36 +58,44 @@ namespace JT808.Gateway.Client
clientSocket.Bind(new IPEndPoint(localIPAddress, DeviceConfig.LocalPort));
}
await clientSocket.ConnectAsync(remoteEndPoint);
- await Task.Factory.StartNew(async()=> {
- while (!heartbeatCTS.IsCancellationRequested)
+ try
+ {
+ await Task.Factory.StartNew(async () =>
{
- if (WriteableTimeout <= DateTime.UtcNow)
+ while (!heartbeatCTS.IsCancellationRequested)
{
- try
+ if (WriteableTimeout <= DateTime.UtcNow)
{
- if (Logger.IsEnabled(LogLevel.Information))
+ try
{
- Logger.LogInformation($"{DeviceConfig.Heartbeat}s send heartbeat:{DeviceConfig.TerminalPhoneNo}-{DeviceConfig.Version.ToString()}");
+ if (Logger.IsEnabled(LogLevel.Information))
+ {
+ Logger.LogInformation($"{DeviceConfig.Heartbeat}s send heartbeat:{DeviceConfig.TerminalPhoneNo}-{DeviceConfig.Version.ToString()}");
+ }
+ if (DeviceConfig.Version == Protocol.Enums.JT808Version.JTT2013 || DeviceConfig.Version == Protocol.Enums.JT808Version.JTT2011)
+ {
+ var package = JT808.Protocol.Enums.JT808MsgId._0x0002.Create(DeviceConfig.TerminalPhoneNo);
+ await SendAsync(new JT808ClientRequest(package));
+ }
+ else
+ {
+ var package = JT808.Protocol.Enums.JT808MsgId._0x0002.Create2019(DeviceConfig.TerminalPhoneNo);
+ await SendAsync(new JT808ClientRequest(package));
+ }
}
- if(DeviceConfig.Version== Protocol.Enums.JT808Version.JTT2013 || DeviceConfig.Version == Protocol.Enums.JT808Version.JTT2011)
+ catch (Exception ex)
{
- var package = JT808.Protocol.Enums.JT808MsgId._0x0002.Create(DeviceConfig.TerminalPhoneNo);
- await SendAsync(new JT808ClientRequest(package));
+ Logger.LogError(ex, "");
}
- else
- {
- var package = JT808.Protocol.Enums.JT808MsgId._0x0002.Create2019(DeviceConfig.TerminalPhoneNo);
- await SendAsync(new JT808ClientRequest(package));
- }
- }
- catch (Exception ex)
- {
- Logger.LogError(ex, "");
}
+ await Task.Delay(TimeSpan.FromSeconds(DeviceConfig.Heartbeat), heartbeatCTS.Token);
}
- await Task.Delay(TimeSpan.FromSeconds(DeviceConfig.Heartbeat));
- }
- }, heartbeatCTS.Token);
+ }, heartbeatCTS.Token, TaskCreationOptions.LongRunning, TaskScheduler.Default);
+ }
+ catch (Exception)
+ {
+
+ }
return true;
}
catch (Exception e)
@@ -99,19 +107,26 @@ namespace JT808.Gateway.Client
}
public async void StartAsync(CancellationToken cancellationToken)
{
- await Task.Factory.StartNew(async (state) =>
+ try
{
- var session = (Socket)state;
- if (Logger.IsEnabled(LogLevel.Information))
+ await Task.Factory.StartNew(async (state) =>
{
- Logger.LogInformation($"[Connected]:{session.LocalEndPoint} to {session.RemoteEndPoint}");
- }
- var pipe = new Pipe();
- Task writing = FillPipeAsync(session, pipe.Writer, cancellationToken);
- Task reading = ReadPipeAsync(session, pipe.Reader);
- await Task.WhenAll(reading, writing);
- RetryBlockingCollection.RetryBlockingCollection.Add(DeviceConfig);
- }, clientSocket);
+ var session = (Socket)state;
+ if (Logger.IsEnabled(LogLevel.Information))
+ {
+ Logger.LogInformation($"[Connected]:{session.LocalEndPoint} to {session.RemoteEndPoint}");
+ }
+ var pipe = new Pipe();
+ Task writing = FillPipeAsync(session, pipe.Writer, cancellationToken);
+ Task reading = ReadPipeAsync(session, pipe.Reader);
+ await Task.WhenAll(reading, writing);
+ RetryBlockingCollection.RetryBlockingCollection.Add(DeviceConfig);
+ }, clientSocket, cancellationToken, TaskCreationOptions.PreferFairness, TaskScheduler.Default);
+ }
+ catch (Exception)
+ {
+
+ }
}
private async Task FillPipeAsync(Socket session, PipeWriter writer, CancellationToken cancellationToken)
{
diff --git a/src/JT808.Gateway.Client/Services/JT808ReportHostedService.cs b/src/JT808.Gateway.Client/Services/JT808ReportHostedService.cs
index 34e48c4..a6d7ec9 100644
--- a/src/JT808.Gateway.Client/Services/JT808ReportHostedService.cs
+++ b/src/JT808.Gateway.Client/Services/JT808ReportHostedService.cs
@@ -41,6 +41,7 @@ namespace JT808.Gateway.Client.Services
{
while (!stoppingToken.IsCancellationRequested)
{
+ await Task.Delay(TimeSpan.FromSeconds(jT808ReportOptions.CurrentValue.Interval), stoppingToken);
var clients = jT808TcpClientFactory.GetAll();
JT808Report report = new JT808Report()
{
@@ -60,7 +61,6 @@ namespace JT808.Gateway.Client.Services
{
sw.WriteLine(json);
}
- await Task.Delay(TimeSpan.FromSeconds(jT808ReportOptions.CurrentValue.Interval), stoppingToken);
}
}
}
diff --git a/src/JT808.Gateway.Client/Services/JT808RetryClientHostedService.cs b/src/JT808.Gateway.Client/Services/JT808RetryClientHostedService.cs
index 7028a1b..76f51a2 100644
--- a/src/JT808.Gateway.Client/Services/JT808RetryClientHostedService.cs
+++ b/src/JT808.Gateway.Client/Services/JT808RetryClientHostedService.cs
@@ -37,35 +37,42 @@ namespace JT808.Gateway.Client.Services
{
new Thread(async () =>
{
- foreach (var item in RetryBlockingCollection.RetryBlockingCollection.GetConsumingEnumerable(cancellationToken))
+ try
{
- try
+ foreach (var item in RetryBlockingCollection.RetryBlockingCollection.GetConsumingEnumerable(cancellationToken))
{
- jT808TcpClientFactory.Remove(item);
- if (item.AutoReconnection)
+ try
{
- var result = await jT808TcpClientFactory.Create(item, cancellationToken);
- if (result != null)
+ jT808TcpClientFactory.Remove(item);
+ if (item.AutoReconnection)
{
- if (logger.IsEnabled(LogLevel.Information))
+ var result = await jT808TcpClientFactory.Create(item, cancellationToken);
+ if (result != null)
{
- logger.LogInformation($"Retry Success-{JsonSerializer.Serialize(item)}");
+ if (logger.IsEnabled(LogLevel.Information))
+ {
+ logger.LogInformation($"Retry Success-{JsonSerializer.Serialize(item)}");
+ }
}
- }
- else
- {
- if (logger.IsEnabled(LogLevel.Warning))
+ else
{
- logger.LogWarning($"Retry Fail-{JsonSerializer.Serialize(item)}");
+ if (logger.IsEnabled(LogLevel.Warning))
+ {
+ logger.LogWarning($"Retry Fail-{JsonSerializer.Serialize(item)}");
+ }
}
}
}
+ catch (Exception ex)
+ {
+ logger.LogError(ex, $"Retry Error-{JsonSerializer.Serialize(item)}");
+ await Task.Delay(TimeSpan.FromSeconds(5), cancellationToken);
+ }
}
- catch (Exception ex)
- {
- logger.LogError(ex, $"Retry Error-{JsonSerializer.Serialize(item)}");
- await Task.Delay(TimeSpan.FromSeconds(5));
- }
+ }
+ catch (Exception)
+ {
+
}
}).Start();
return Task.CompletedTask;
diff --git a/src/JT808.Gateway.Kafka/JT808.Gateway.Kafka.csproj b/src/JT808.Gateway.Kafka/JT808.Gateway.Kafka.csproj
index e32a40b..41339f7 100644
--- a/src/JT808.Gateway.Kafka/JT808.Gateway.Kafka.csproj
+++ b/src/JT808.Gateway.Kafka/JT808.Gateway.Kafka.csproj
@@ -8,7 +8,7 @@
JT808.Gateway.Kafka.xml
-
+
diff --git a/src/JT808.Gateway.Tests/JT808.Gateway.NormalHosting/JT808.Gateway.NormalHosting.csproj b/src/JT808.Gateway.Tests/JT808.Gateway.NormalHosting/JT808.Gateway.NormalHosting.csproj
index 0fb98c9..8a49f41 100644
--- a/src/JT808.Gateway.Tests/JT808.Gateway.NormalHosting/JT808.Gateway.NormalHosting.csproj
+++ b/src/JT808.Gateway.Tests/JT808.Gateway.NormalHosting/JT808.Gateway.NormalHosting.csproj
@@ -23,7 +23,7 @@
-
+
diff --git a/src/JT808.Gateway.Tests/JT808.Gateway.NormalHosting/Program.cs b/src/JT808.Gateway.Tests/JT808.Gateway.NormalHosting/Program.cs
index 8e3c76f..d29a9b2 100644
--- a/src/JT808.Gateway.Tests/JT808.Gateway.NormalHosting/Program.cs
+++ b/src/JT808.Gateway.Tests/JT808.Gateway.NormalHosting/Program.cs
@@ -75,7 +75,7 @@ namespace JT808.Gateway.NormalHosting
//httpclient客户端调用
services.AddHostedService();
//客户端测试 依赖AddClient()服务
- //services.AddHostedService();
+ services.AddHostedService();
//需要跨域的
services.AddCors(options =>
diff --git a/src/JT808.Gateway.Tests/JT808.Gateway.QueueHosting/JT808.Gateway.QueueHosting.csproj b/src/JT808.Gateway.Tests/JT808.Gateway.QueueHosting/JT808.Gateway.QueueHosting.csproj
index 4b7008c..00befa3 100644
--- a/src/JT808.Gateway.Tests/JT808.Gateway.QueueHosting/JT808.Gateway.QueueHosting.csproj
+++ b/src/JT808.Gateway.Tests/JT808.Gateway.QueueHosting/JT808.Gateway.QueueHosting.csproj
@@ -10,7 +10,7 @@
-
+
diff --git a/src/JT808.Gateway.Tests/JT808.Gateway.Test/JT808.Gateway.Test.csproj b/src/JT808.Gateway.Tests/JT808.Gateway.Test/JT808.Gateway.Test.csproj
index 9094a47..1f19d2e 100644
--- a/src/JT808.Gateway.Tests/JT808.Gateway.Test/JT808.Gateway.Test.csproj
+++ b/src/JT808.Gateway.Tests/JT808.Gateway.Test/JT808.Gateway.Test.csproj
@@ -7,9 +7,9 @@
-
+
-
+
all
diff --git a/src/JT808.Gateway/JT808TcpServer.cs b/src/JT808.Gateway/JT808TcpServer.cs
index 93972e4..e43c76a 100644
--- a/src/JT808.Gateway/JT808TcpServer.cs
+++ b/src/JT808.Gateway/JT808TcpServer.cs
@@ -95,28 +95,37 @@ namespace JT808.Gateway
public Task StartAsync(CancellationToken cancellationToken)
{
Logger.LogInformation($"JT808 TCP Server start at {IPAddress.Any}:{ConfigurationMonitor.CurrentValue.TcpPort}.");
- Task.Factory.StartNew(async () =>
+ Task.Run(async () =>
{
while (!cancellationToken.IsCancellationRequested)
{
- var socket = await server.AcceptAsync();
- JT808TcpSession jT808TcpSession = new JT808TcpSession(socket);
- SessionManager.TryAdd(jT808TcpSession);
- await Task.Factory.StartNew(async (state) =>
+ try
{
- var session = (JT808TcpSession)state;
- if (Logger.IsEnabled(LogLevel.Information))
+ var socket = await server.AcceptAsync();
+ JT808TcpSession jT808TcpSession = new JT808TcpSession(socket);
+ SessionManager.TryAdd(jT808TcpSession);
+ await Task.Factory.StartNew(async (state) =>
{
- Logger.LogInformation($"[Connected]:{session.Client.RemoteEndPoint}");
- }
- var pipe = new Pipe();
- Task writing = FillPipeAsync(session, pipe.Writer);
- Task reading = ReadPipeAsync(session, pipe.Reader);
- await Task.WhenAll(reading, writing);
- SessionManager.RemoveBySessionId(session.SessionID);
- }, jT808TcpSession);
+ var session = (JT808TcpSession)state;
+ if (Logger.IsEnabled(LogLevel.Information))
+ {
+ Logger.LogInformation($"[Connected]:{session.Client.RemoteEndPoint}");
+ }
+ var pipe = new Pipe();
+ Task writing = FillPipeAsync(session, pipe.Writer);
+ Task reading = ReadPipeAsync(session, pipe.Reader);
+ await Task.WhenAll(reading, writing);
+ SessionManager.RemoveBySessionId(session.SessionID);
+ }, jT808TcpSession, cancellationToken, TaskCreationOptions.PreferFairness, TaskScheduler.Default);
+ }
+ catch (OperationCanceledException)
+ {
+ }
+ catch (Exception)
+ {
+ }
}
- }, cancellationToken);
+ });
return Task.CompletedTask;
}
private async Task FillPipeAsync(JT808TcpSession session, PipeWriter writer)
diff --git a/src/JT808.Gateway/JT808UdpServer.cs b/src/JT808.Gateway/JT808UdpServer.cs
index 63d1499..f00344c 100644
--- a/src/JT808.Gateway/JT808UdpServer.cs
+++ b/src/JT808.Gateway/JT808UdpServer.cs
@@ -62,7 +62,7 @@ namespace JT808.Gateway
public Task StartAsync(CancellationToken cancellationToken)
{
Logger.LogInformation($"JT808 Udp Server start at {IPAddress.Any}:{ConfigurationMonitor.CurrentValue.UdpPort}.");
- Task.Factory.StartNew(async() => {
+ Task.Run(async () => {
while (!cancellationToken.IsCancellationRequested)
{
var buffer = ArrayPool.Shared.Rent(ConfigurationMonitor.CurrentValue.MiniNumBufferSize);
@@ -93,7 +93,7 @@ namespace JT808.Gateway
ArrayPool.Shared.Return(buffer);
}
}
- }, cancellationToken);
+ });
return Task.CompletedTask;
}
private void ReaderBuffer(ReadOnlySpan buffer, Socket socket,SocketReceiveMessageFromResult receiveMessageFromResult)
diff --git a/src/global.json b/src/global.json
index 5395ff4..d769cd3 100644
--- a/src/global.json
+++ b/src/global.json
@@ -1,5 +1,5 @@
{
"sdk": {
- "version": "6.0.101"
+ "version": "6.0.400"
}
}
\ No newline at end of file