diff --git a/.github/workflows/dotnetcore.yml b/.github/workflows/dotnetcore.yml index da13394..5240dd8 100644 --- a/.github/workflows/dotnetcore.yml +++ b/.github/workflows/dotnetcore.yml @@ -12,7 +12,7 @@ jobs: - name: Setup .NET Core uses: actions/setup-dotnet@master with: - dotnet-version: 6.0.100 + dotnet-version: 6.0.400 - name: dotnet info run: dotnet --info - name: dotnet JT808.Gateway restore diff --git a/simples/global.json b/simples/global.json index 5395ff4..d769cd3 100644 --- a/simples/global.json +++ b/simples/global.json @@ -1,5 +1,5 @@ { "sdk": { - "version": "6.0.101" + "version": "6.0.400" } } \ No newline at end of file diff --git a/src/JT808.Gateway.Abstractions/JT808.Gateway.Abstractions.csproj b/src/JT808.Gateway.Abstractions/JT808.Gateway.Abstractions.csproj index 79375ed..57eb4f6 100644 --- a/src/JT808.Gateway.Abstractions/JT808.Gateway.Abstractions.csproj +++ b/src/JT808.Gateway.Abstractions/JT808.Gateway.Abstractions.csproj @@ -16,10 +16,10 @@ - + - + diff --git a/src/JT808.Gateway.Benchmark/JT808.Gateway.CleintBenchmark/JT808.Gateway.CleintBenchmark.csproj b/src/JT808.Gateway.Benchmark/JT808.Gateway.CleintBenchmark/JT808.Gateway.CleintBenchmark.csproj index 5ab1b93..6934ab3 100644 --- a/src/JT808.Gateway.Benchmark/JT808.Gateway.CleintBenchmark/JT808.Gateway.CleintBenchmark.csproj +++ b/src/JT808.Gateway.Benchmark/JT808.Gateway.CleintBenchmark/JT808.Gateway.CleintBenchmark.csproj @@ -18,7 +18,7 @@ - + diff --git a/src/JT808.Gateway.Benchmark/JT808.Gateway.ServerBenchmark/JT808.Gateway.ServerBenchmark.csproj b/src/JT808.Gateway.Benchmark/JT808.Gateway.ServerBenchmark/JT808.Gateway.ServerBenchmark.csproj index 9ce53d3..523a07e 100644 --- a/src/JT808.Gateway.Benchmark/JT808.Gateway.ServerBenchmark/JT808.Gateway.ServerBenchmark.csproj +++ b/src/JT808.Gateway.Benchmark/JT808.Gateway.ServerBenchmark/JT808.Gateway.ServerBenchmark.csproj @@ -6,7 +6,7 @@ - + diff --git a/src/JT808.Gateway.Client/JT808.Gateway.Client.csproj b/src/JT808.Gateway.Client/JT808.Gateway.Client.csproj index c41bbd2..b1279a3 100644 --- a/src/JT808.Gateway.Client/JT808.Gateway.Client.csproj +++ b/src/JT808.Gateway.Client/JT808.Gateway.Client.csproj @@ -10,10 +10,10 @@ - + - + diff --git a/src/JT808.Gateway.Client/JT808TcpClient.cs b/src/JT808.Gateway.Client/JT808TcpClient.cs index 422e13e..b7ad940 100644 --- a/src/JT808.Gateway.Client/JT808TcpClient.cs +++ b/src/JT808.Gateway.Client/JT808TcpClient.cs @@ -58,36 +58,44 @@ namespace JT808.Gateway.Client clientSocket.Bind(new IPEndPoint(localIPAddress, DeviceConfig.LocalPort)); } await clientSocket.ConnectAsync(remoteEndPoint); - await Task.Factory.StartNew(async()=> { - while (!heartbeatCTS.IsCancellationRequested) + try + { + await Task.Factory.StartNew(async () => { - if (WriteableTimeout <= DateTime.UtcNow) + while (!heartbeatCTS.IsCancellationRequested) { - try + if (WriteableTimeout <= DateTime.UtcNow) { - if (Logger.IsEnabled(LogLevel.Information)) + try { - Logger.LogInformation($"{DeviceConfig.Heartbeat}s send heartbeat:{DeviceConfig.TerminalPhoneNo}-{DeviceConfig.Version.ToString()}"); + if (Logger.IsEnabled(LogLevel.Information)) + { + Logger.LogInformation($"{DeviceConfig.Heartbeat}s send heartbeat:{DeviceConfig.TerminalPhoneNo}-{DeviceConfig.Version.ToString()}"); + } + if (DeviceConfig.Version == Protocol.Enums.JT808Version.JTT2013 || DeviceConfig.Version == Protocol.Enums.JT808Version.JTT2011) + { + var package = JT808.Protocol.Enums.JT808MsgId._0x0002.Create(DeviceConfig.TerminalPhoneNo); + await SendAsync(new JT808ClientRequest(package)); + } + else + { + var package = JT808.Protocol.Enums.JT808MsgId._0x0002.Create2019(DeviceConfig.TerminalPhoneNo); + await SendAsync(new JT808ClientRequest(package)); + } } - if(DeviceConfig.Version== Protocol.Enums.JT808Version.JTT2013 || DeviceConfig.Version == Protocol.Enums.JT808Version.JTT2011) + catch (Exception ex) { - var package = JT808.Protocol.Enums.JT808MsgId._0x0002.Create(DeviceConfig.TerminalPhoneNo); - await SendAsync(new JT808ClientRequest(package)); + Logger.LogError(ex, ""); } - else - { - var package = JT808.Protocol.Enums.JT808MsgId._0x0002.Create2019(DeviceConfig.TerminalPhoneNo); - await SendAsync(new JT808ClientRequest(package)); - } - } - catch (Exception ex) - { - Logger.LogError(ex, ""); } + await Task.Delay(TimeSpan.FromSeconds(DeviceConfig.Heartbeat), heartbeatCTS.Token); } - await Task.Delay(TimeSpan.FromSeconds(DeviceConfig.Heartbeat)); - } - }, heartbeatCTS.Token); + }, heartbeatCTS.Token, TaskCreationOptions.LongRunning, TaskScheduler.Default); + } + catch (Exception) + { + + } return true; } catch (Exception e) @@ -99,19 +107,26 @@ namespace JT808.Gateway.Client } public async void StartAsync(CancellationToken cancellationToken) { - await Task.Factory.StartNew(async (state) => + try { - var session = (Socket)state; - if (Logger.IsEnabled(LogLevel.Information)) + await Task.Factory.StartNew(async (state) => { - Logger.LogInformation($"[Connected]:{session.LocalEndPoint} to {session.RemoteEndPoint}"); - } - var pipe = new Pipe(); - Task writing = FillPipeAsync(session, pipe.Writer, cancellationToken); - Task reading = ReadPipeAsync(session, pipe.Reader); - await Task.WhenAll(reading, writing); - RetryBlockingCollection.RetryBlockingCollection.Add(DeviceConfig); - }, clientSocket); + var session = (Socket)state; + if (Logger.IsEnabled(LogLevel.Information)) + { + Logger.LogInformation($"[Connected]:{session.LocalEndPoint} to {session.RemoteEndPoint}"); + } + var pipe = new Pipe(); + Task writing = FillPipeAsync(session, pipe.Writer, cancellationToken); + Task reading = ReadPipeAsync(session, pipe.Reader); + await Task.WhenAll(reading, writing); + RetryBlockingCollection.RetryBlockingCollection.Add(DeviceConfig); + }, clientSocket, cancellationToken, TaskCreationOptions.PreferFairness, TaskScheduler.Default); + } + catch (Exception) + { + + } } private async Task FillPipeAsync(Socket session, PipeWriter writer, CancellationToken cancellationToken) { diff --git a/src/JT808.Gateway.Client/Services/JT808ReportHostedService.cs b/src/JT808.Gateway.Client/Services/JT808ReportHostedService.cs index 34e48c4..a6d7ec9 100644 --- a/src/JT808.Gateway.Client/Services/JT808ReportHostedService.cs +++ b/src/JT808.Gateway.Client/Services/JT808ReportHostedService.cs @@ -41,6 +41,7 @@ namespace JT808.Gateway.Client.Services { while (!stoppingToken.IsCancellationRequested) { + await Task.Delay(TimeSpan.FromSeconds(jT808ReportOptions.CurrentValue.Interval), stoppingToken); var clients = jT808TcpClientFactory.GetAll(); JT808Report report = new JT808Report() { @@ -60,7 +61,6 @@ namespace JT808.Gateway.Client.Services { sw.WriteLine(json); } - await Task.Delay(TimeSpan.FromSeconds(jT808ReportOptions.CurrentValue.Interval), stoppingToken); } } } diff --git a/src/JT808.Gateway.Client/Services/JT808RetryClientHostedService.cs b/src/JT808.Gateway.Client/Services/JT808RetryClientHostedService.cs index 7028a1b..76f51a2 100644 --- a/src/JT808.Gateway.Client/Services/JT808RetryClientHostedService.cs +++ b/src/JT808.Gateway.Client/Services/JT808RetryClientHostedService.cs @@ -37,35 +37,42 @@ namespace JT808.Gateway.Client.Services { new Thread(async () => { - foreach (var item in RetryBlockingCollection.RetryBlockingCollection.GetConsumingEnumerable(cancellationToken)) + try { - try + foreach (var item in RetryBlockingCollection.RetryBlockingCollection.GetConsumingEnumerable(cancellationToken)) { - jT808TcpClientFactory.Remove(item); - if (item.AutoReconnection) + try { - var result = await jT808TcpClientFactory.Create(item, cancellationToken); - if (result != null) + jT808TcpClientFactory.Remove(item); + if (item.AutoReconnection) { - if (logger.IsEnabled(LogLevel.Information)) + var result = await jT808TcpClientFactory.Create(item, cancellationToken); + if (result != null) { - logger.LogInformation($"Retry Success-{JsonSerializer.Serialize(item)}"); + if (logger.IsEnabled(LogLevel.Information)) + { + logger.LogInformation($"Retry Success-{JsonSerializer.Serialize(item)}"); + } } - } - else - { - if (logger.IsEnabled(LogLevel.Warning)) + else { - logger.LogWarning($"Retry Fail-{JsonSerializer.Serialize(item)}"); + if (logger.IsEnabled(LogLevel.Warning)) + { + logger.LogWarning($"Retry Fail-{JsonSerializer.Serialize(item)}"); + } } } } + catch (Exception ex) + { + logger.LogError(ex, $"Retry Error-{JsonSerializer.Serialize(item)}"); + await Task.Delay(TimeSpan.FromSeconds(5), cancellationToken); + } } - catch (Exception ex) - { - logger.LogError(ex, $"Retry Error-{JsonSerializer.Serialize(item)}"); - await Task.Delay(TimeSpan.FromSeconds(5)); - } + } + catch (Exception) + { + } }).Start(); return Task.CompletedTask; diff --git a/src/JT808.Gateway.Kafka/JT808.Gateway.Kafka.csproj b/src/JT808.Gateway.Kafka/JT808.Gateway.Kafka.csproj index e32a40b..41339f7 100644 --- a/src/JT808.Gateway.Kafka/JT808.Gateway.Kafka.csproj +++ b/src/JT808.Gateway.Kafka/JT808.Gateway.Kafka.csproj @@ -8,7 +8,7 @@ JT808.Gateway.Kafka.xml - + diff --git a/src/JT808.Gateway.Tests/JT808.Gateway.NormalHosting/JT808.Gateway.NormalHosting.csproj b/src/JT808.Gateway.Tests/JT808.Gateway.NormalHosting/JT808.Gateway.NormalHosting.csproj index 0fb98c9..8a49f41 100644 --- a/src/JT808.Gateway.Tests/JT808.Gateway.NormalHosting/JT808.Gateway.NormalHosting.csproj +++ b/src/JT808.Gateway.Tests/JT808.Gateway.NormalHosting/JT808.Gateway.NormalHosting.csproj @@ -23,7 +23,7 @@ - + diff --git a/src/JT808.Gateway.Tests/JT808.Gateway.NormalHosting/Program.cs b/src/JT808.Gateway.Tests/JT808.Gateway.NormalHosting/Program.cs index 8e3c76f..d29a9b2 100644 --- a/src/JT808.Gateway.Tests/JT808.Gateway.NormalHosting/Program.cs +++ b/src/JT808.Gateway.Tests/JT808.Gateway.NormalHosting/Program.cs @@ -75,7 +75,7 @@ namespace JT808.Gateway.NormalHosting //httpclient客户端调用 services.AddHostedService(); //客户端测试 依赖AddClient()服务 - //services.AddHostedService(); + services.AddHostedService(); //需要跨域的 services.AddCors(options => diff --git a/src/JT808.Gateway.Tests/JT808.Gateway.QueueHosting/JT808.Gateway.QueueHosting.csproj b/src/JT808.Gateway.Tests/JT808.Gateway.QueueHosting/JT808.Gateway.QueueHosting.csproj index 4b7008c..00befa3 100644 --- a/src/JT808.Gateway.Tests/JT808.Gateway.QueueHosting/JT808.Gateway.QueueHosting.csproj +++ b/src/JT808.Gateway.Tests/JT808.Gateway.QueueHosting/JT808.Gateway.QueueHosting.csproj @@ -10,7 +10,7 @@ - + diff --git a/src/JT808.Gateway.Tests/JT808.Gateway.Test/JT808.Gateway.Test.csproj b/src/JT808.Gateway.Tests/JT808.Gateway.Test/JT808.Gateway.Test.csproj index 9094a47..1f19d2e 100644 --- a/src/JT808.Gateway.Tests/JT808.Gateway.Test/JT808.Gateway.Test.csproj +++ b/src/JT808.Gateway.Tests/JT808.Gateway.Test/JT808.Gateway.Test.csproj @@ -7,9 +7,9 @@ - + - + all diff --git a/src/JT808.Gateway/JT808TcpServer.cs b/src/JT808.Gateway/JT808TcpServer.cs index 93972e4..e43c76a 100644 --- a/src/JT808.Gateway/JT808TcpServer.cs +++ b/src/JT808.Gateway/JT808TcpServer.cs @@ -95,28 +95,37 @@ namespace JT808.Gateway public Task StartAsync(CancellationToken cancellationToken) { Logger.LogInformation($"JT808 TCP Server start at {IPAddress.Any}:{ConfigurationMonitor.CurrentValue.TcpPort}."); - Task.Factory.StartNew(async () => + Task.Run(async () => { while (!cancellationToken.IsCancellationRequested) { - var socket = await server.AcceptAsync(); - JT808TcpSession jT808TcpSession = new JT808TcpSession(socket); - SessionManager.TryAdd(jT808TcpSession); - await Task.Factory.StartNew(async (state) => + try { - var session = (JT808TcpSession)state; - if (Logger.IsEnabled(LogLevel.Information)) + var socket = await server.AcceptAsync(); + JT808TcpSession jT808TcpSession = new JT808TcpSession(socket); + SessionManager.TryAdd(jT808TcpSession); + await Task.Factory.StartNew(async (state) => { - Logger.LogInformation($"[Connected]:{session.Client.RemoteEndPoint}"); - } - var pipe = new Pipe(); - Task writing = FillPipeAsync(session, pipe.Writer); - Task reading = ReadPipeAsync(session, pipe.Reader); - await Task.WhenAll(reading, writing); - SessionManager.RemoveBySessionId(session.SessionID); - }, jT808TcpSession); + var session = (JT808TcpSession)state; + if (Logger.IsEnabled(LogLevel.Information)) + { + Logger.LogInformation($"[Connected]:{session.Client.RemoteEndPoint}"); + } + var pipe = new Pipe(); + Task writing = FillPipeAsync(session, pipe.Writer); + Task reading = ReadPipeAsync(session, pipe.Reader); + await Task.WhenAll(reading, writing); + SessionManager.RemoveBySessionId(session.SessionID); + }, jT808TcpSession, cancellationToken, TaskCreationOptions.PreferFairness, TaskScheduler.Default); + } + catch (OperationCanceledException) + { + } + catch (Exception) + { + } } - }, cancellationToken); + }); return Task.CompletedTask; } private async Task FillPipeAsync(JT808TcpSession session, PipeWriter writer) diff --git a/src/JT808.Gateway/JT808UdpServer.cs b/src/JT808.Gateway/JT808UdpServer.cs index 63d1499..f00344c 100644 --- a/src/JT808.Gateway/JT808UdpServer.cs +++ b/src/JT808.Gateway/JT808UdpServer.cs @@ -62,7 +62,7 @@ namespace JT808.Gateway public Task StartAsync(CancellationToken cancellationToken) { Logger.LogInformation($"JT808 Udp Server start at {IPAddress.Any}:{ConfigurationMonitor.CurrentValue.UdpPort}."); - Task.Factory.StartNew(async() => { + Task.Run(async () => { while (!cancellationToken.IsCancellationRequested) { var buffer = ArrayPool.Shared.Rent(ConfigurationMonitor.CurrentValue.MiniNumBufferSize); @@ -93,7 +93,7 @@ namespace JT808.Gateway ArrayPool.Shared.Return(buffer); } } - }, cancellationToken); + }); return Task.CompletedTask; } private void ReaderBuffer(ReadOnlySpan buffer, Socket socket,SocketReceiveMessageFromResult receiveMessageFromResult) diff --git a/src/global.json b/src/global.json index 5395ff4..d769cd3 100644 --- a/src/global.json +++ b/src/global.json @@ -1,5 +1,5 @@ { "sdk": { - "version": "6.0.101" + "version": "6.0.400" } } \ No newline at end of file