Просмотр исходного кода

1.升级808库

2.调整Task的用法
pull/25/head
SmallChi(Koike) 2 лет назад
Родитель
Сommit
d750d41625
17 измененных файлов: 115 добавлений и 84 удалений
  1. +1
    -1
      .github/workflows/dotnetcore.yml
  2. +1
    -1
      simples/global.json
  3. +2
    -2
      src/JT808.Gateway.Abstractions/JT808.Gateway.Abstractions.csproj
  4. +1
    -1
      src/JT808.Gateway.Benchmark/JT808.Gateway.CleintBenchmark/JT808.Gateway.CleintBenchmark.csproj
  5. +1
    -1
      src/JT808.Gateway.Benchmark/JT808.Gateway.ServerBenchmark/JT808.Gateway.ServerBenchmark.csproj
  6. +2
    -2
      src/JT808.Gateway.Client/JT808.Gateway.Client.csproj
  7. +47
    -32
      src/JT808.Gateway.Client/JT808TcpClient.cs
  8. +1
    -1
      src/JT808.Gateway.Client/Services/JT808ReportHostedService.cs
  9. +25
    -18
      src/JT808.Gateway.Client/Services/JT808RetryClientHostedService.cs
  10. +1
    -1
      src/JT808.Gateway.Kafka/JT808.Gateway.Kafka.csproj
  11. +1
    -1
      src/JT808.Gateway.Tests/JT808.Gateway.NormalHosting/JT808.Gateway.NormalHosting.csproj
  12. +1
    -1
      src/JT808.Gateway.Tests/JT808.Gateway.NormalHosting/Program.cs
  13. +1
    -1
      src/JT808.Gateway.Tests/JT808.Gateway.QueueHosting/JT808.Gateway.QueueHosting.csproj
  14. +2
    -2
      src/JT808.Gateway.Tests/JT808.Gateway.Test/JT808.Gateway.Test.csproj
  15. +25
    -16
      src/JT808.Gateway/JT808TcpServer.cs
  16. +2
    -2
      src/JT808.Gateway/JT808UdpServer.cs
  17. +1
    -1
      src/global.json

+ 1
- 1
.github/workflows/dotnetcore.yml Просмотреть файл

@@ -12,7 +12,7 @@ jobs:
- name: Setup .NET Core
uses: actions/setup-dotnet@master
with:
dotnet-version: 6.0.100
dotnet-version: 6.0.400
- name: dotnet info
run: dotnet --info
- name: dotnet JT808.Gateway restore


+ 1
- 1
simples/global.json Просмотреть файл

@@ -1,5 +1,5 @@
{
"sdk": {
"version": "6.0.101"
"version": "6.0.400"
}
}

+ 2
- 2
src/JT808.Gateway.Abstractions/JT808.Gateway.Abstractions.csproj Просмотреть файл

@@ -16,10 +16,10 @@
</ItemGroup>
<ItemGroup>
<PackageReference Include="JT808" Version="2.5.0-preview3" />
<PackageReference Include="JT808" Version="2.5.0" />
<PackageReference Include="Microsoft.Extensions.Hosting.Abstractions" Version="6.0.0" />
<PackageReference Include="Microsoft.Extensions.Options" Version="6.0.0" />
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="6.0.1" />
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="6.0.2" />
<PackageReference Include="Microsoft.Extensions.Options.ConfigurationExtensions" Version="6.0.0" />
</ItemGroup>
<ItemGroup>


+ 1
- 1
src/JT808.Gateway.Benchmark/JT808.Gateway.CleintBenchmark/JT808.Gateway.CleintBenchmark.csproj Просмотреть файл

@@ -18,7 +18,7 @@
<ItemGroup>
<PackageReference Include="Microsoft.Extensions.Configuration.FileExtensions" Version="6.0.0" />
<PackageReference Include="Microsoft.Extensions.Configuration.Json" Version="6.0.0" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="6.0.0" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="6.0.1" />
<PackageReference Include="Microsoft.Extensions.Hosting" Version="6.0.1" />
<PackageReference Include="NLog.Extensions.Logging" Version="5.0.4" />
</ItemGroup>


+ 1
- 1
src/JT808.Gateway.Benchmark/JT808.Gateway.ServerBenchmark/JT808.Gateway.ServerBenchmark.csproj Просмотреть файл

@@ -6,7 +6,7 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="6.0.0" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="6.0.1" />
<PackageReference Include="Microsoft.Extensions.Hosting" Version="6.0.1" />
<PackageReference Include="Microsoft.VisualStudio.Azure.Containers.Tools.Targets" Version="1.17.0" />
<PackageReference Include="NLog.Extensions.Logging" Version="5.0.4" />


+ 2
- 2
src/JT808.Gateway.Client/JT808.Gateway.Client.csproj Просмотреть файл

@@ -10,10 +10,10 @@
</PropertyGroup>
<ItemGroup>
<PackageReference Include="JT808" Version="2.5.0-preview3" />
<PackageReference Include="JT808" Version="2.5.0" />
<PackageReference Include="Microsoft.Extensions.Hosting.Abstractions" Version="6.0.0" />
<PackageReference Include="System.IO.Pipelines" Version="6.0.3" />
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="6.0.1" />
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="6.0.2" />
<PackageReference Include="Microsoft.Extensions.Options.ConfigurationExtensions" Version="6.0.0" />
</ItemGroup>
<ItemGroup>


+ 47
- 32
src/JT808.Gateway.Client/JT808TcpClient.cs Просмотреть файл

@@ -58,36 +58,44 @@ namespace JT808.Gateway.Client
clientSocket.Bind(new IPEndPoint(localIPAddress, DeviceConfig.LocalPort));
}
await clientSocket.ConnectAsync(remoteEndPoint);
await Task.Factory.StartNew(async()=> {
while (!heartbeatCTS.IsCancellationRequested)
try
{
await Task.Factory.StartNew(async () =>
{
if (WriteableTimeout <= DateTime.UtcNow)
while (!heartbeatCTS.IsCancellationRequested)
{
try
if (WriteableTimeout <= DateTime.UtcNow)
{
if (Logger.IsEnabled(LogLevel.Information))
try
{
Logger.LogInformation($"{DeviceConfig.Heartbeat}s send heartbeat:{DeviceConfig.TerminalPhoneNo}-{DeviceConfig.Version.ToString()}");
if (Logger.IsEnabled(LogLevel.Information))
{
Logger.LogInformation($"{DeviceConfig.Heartbeat}s send heartbeat:{DeviceConfig.TerminalPhoneNo}-{DeviceConfig.Version.ToString()}");
}
if (DeviceConfig.Version == Protocol.Enums.JT808Version.JTT2013 || DeviceConfig.Version == Protocol.Enums.JT808Version.JTT2011)
{
var package = JT808.Protocol.Enums.JT808MsgId._0x0002.Create(DeviceConfig.TerminalPhoneNo);
await SendAsync(new JT808ClientRequest(package));
}
else
{
var package = JT808.Protocol.Enums.JT808MsgId._0x0002.Create2019(DeviceConfig.TerminalPhoneNo);
await SendAsync(new JT808ClientRequest(package));
}
}
if(DeviceConfig.Version== Protocol.Enums.JT808Version.JTT2013 || DeviceConfig.Version == Protocol.Enums.JT808Version.JTT2011)
catch (Exception ex)
{
var package = JT808.Protocol.Enums.JT808MsgId._0x0002.Create(DeviceConfig.TerminalPhoneNo);
await SendAsync(new JT808ClientRequest(package));
Logger.LogError(ex, "");
}
else
{
var package = JT808.Protocol.Enums.JT808MsgId._0x0002.Create2019(DeviceConfig.TerminalPhoneNo);
await SendAsync(new JT808ClientRequest(package));
}
}
catch (Exception ex)
{
Logger.LogError(ex, "");
}
await Task.Delay(TimeSpan.FromSeconds(DeviceConfig.Heartbeat), heartbeatCTS.Token);
}
await Task.Delay(TimeSpan.FromSeconds(DeviceConfig.Heartbeat));
}
}, heartbeatCTS.Token);
}, heartbeatCTS.Token, TaskCreationOptions.LongRunning, TaskScheduler.Default);
}
catch (Exception)
{

}
return true;
}
catch (Exception e)
@@ -99,19 +107,26 @@ namespace JT808.Gateway.Client
}
public async void StartAsync(CancellationToken cancellationToken)
{
await Task.Factory.StartNew(async (state) =>
try
{
var session = (Socket)state;
if (Logger.IsEnabled(LogLevel.Information))
await Task.Factory.StartNew(async (state) =>
{
Logger.LogInformation($"[Connected]:{session.LocalEndPoint} to {session.RemoteEndPoint}");
}
var pipe = new Pipe();
Task writing = FillPipeAsync(session, pipe.Writer, cancellationToken);
Task reading = ReadPipeAsync(session, pipe.Reader);
await Task.WhenAll(reading, writing);
RetryBlockingCollection.RetryBlockingCollection.Add(DeviceConfig);
}, clientSocket);
var session = (Socket)state;
if (Logger.IsEnabled(LogLevel.Information))
{
Logger.LogInformation($"[Connected]:{session.LocalEndPoint} to {session.RemoteEndPoint}");
}
var pipe = new Pipe();
Task writing = FillPipeAsync(session, pipe.Writer, cancellationToken);
Task reading = ReadPipeAsync(session, pipe.Reader);
await Task.WhenAll(reading, writing);
RetryBlockingCollection.RetryBlockingCollection.Add(DeviceConfig);
}, clientSocket, cancellationToken, TaskCreationOptions.PreferFairness, TaskScheduler.Default);
}
catch (Exception)
{

}
}
private async Task FillPipeAsync(Socket session, PipeWriter writer, CancellationToken cancellationToken)
{


+ 1
- 1
src/JT808.Gateway.Client/Services/JT808ReportHostedService.cs Просмотреть файл

@@ -41,6 +41,7 @@ namespace JT808.Gateway.Client.Services
{
while (!stoppingToken.IsCancellationRequested)
{
await Task.Delay(TimeSpan.FromSeconds(jT808ReportOptions.CurrentValue.Interval), stoppingToken);
var clients = jT808TcpClientFactory.GetAll();
JT808Report report = new JT808Report()
{
@@ -60,7 +61,6 @@ namespace JT808.Gateway.Client.Services
{
sw.WriteLine(json);
}
await Task.Delay(TimeSpan.FromSeconds(jT808ReportOptions.CurrentValue.Interval), stoppingToken);
}
}
}


+ 25
- 18
src/JT808.Gateway.Client/Services/JT808RetryClientHostedService.cs Просмотреть файл

@@ -37,35 +37,42 @@ namespace JT808.Gateway.Client.Services
{
new Thread(async () =>
{
foreach (var item in RetryBlockingCollection.RetryBlockingCollection.GetConsumingEnumerable(cancellationToken))
try
{
try
foreach (var item in RetryBlockingCollection.RetryBlockingCollection.GetConsumingEnumerable(cancellationToken))
{
jT808TcpClientFactory.Remove(item);
if (item.AutoReconnection)
try
{
var result = await jT808TcpClientFactory.Create(item, cancellationToken);
if (result != null)
jT808TcpClientFactory.Remove(item);
if (item.AutoReconnection)
{
if (logger.IsEnabled(LogLevel.Information))
var result = await jT808TcpClientFactory.Create(item, cancellationToken);
if (result != null)
{
logger.LogInformation($"Retry Success-{JsonSerializer.Serialize(item)}");
if (logger.IsEnabled(LogLevel.Information))
{
logger.LogInformation($"Retry Success-{JsonSerializer.Serialize(item)}");
}
}
}
else
{
if (logger.IsEnabled(LogLevel.Warning))
else
{
logger.LogWarning($"Retry Fail-{JsonSerializer.Serialize(item)}");
if (logger.IsEnabled(LogLevel.Warning))
{
logger.LogWarning($"Retry Fail-{JsonSerializer.Serialize(item)}");
}
}
}
}
catch (Exception ex)
{
logger.LogError(ex, $"Retry Error-{JsonSerializer.Serialize(item)}");
await Task.Delay(TimeSpan.FromSeconds(5), cancellationToken);
}
}
catch (Exception ex)
{
logger.LogError(ex, $"Retry Error-{JsonSerializer.Serialize(item)}");
await Task.Delay(TimeSpan.FromSeconds(5));
}
}
catch (Exception)
{

}
}).Start();
return Task.CompletedTask;


+ 1
- 1
src/JT808.Gateway.Kafka/JT808.Gateway.Kafka.csproj Просмотреть файл

@@ -8,7 +8,7 @@
<DocumentationFile>JT808.Gateway.Kafka.xml</DocumentationFile>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Confluent.Kafka" Version="1.9.2" />
<PackageReference Include="Confluent.Kafka" Version="1.9.3" />
</ItemGroup>

<ItemGroup>


+ 1
- 1
src/JT808.Gateway.Tests/JT808.Gateway.NormalHosting/JT808.Gateway.NormalHosting.csproj Просмотреть файл

@@ -23,7 +23,7 @@

<ItemGroup>
<PackageReference Include="Microsoft.Extensions.Configuration.Json" Version="6.0.0" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="6.0.0" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="6.0.1" />
<PackageReference Include="Microsoft.Extensions.Hosting" Version="6.0.1" />
<PackageReference Include="Microsoft.VisualStudio.Azure.Containers.Tools.Targets" Version="1.17.0" />
<PackageReference Include="NLog.Extensions.Logging" Version="5.0.4" />


+ 1
- 1
src/JT808.Gateway.Tests/JT808.Gateway.NormalHosting/Program.cs Просмотреть файл

@@ -75,7 +75,7 @@ namespace JT808.Gateway.NormalHosting
//httpclient客户端调用
services.AddHostedService<CallHttpClientJob>();
//客户端测试 依赖AddClient()服务
//services.AddHostedService<UpJob>();
services.AddHostedService<UpJob>();
//需要跨域的
services.AddCors(options =>


+ 1
- 1
src/JT808.Gateway.Tests/JT808.Gateway.QueueHosting/JT808.Gateway.QueueHosting.csproj Просмотреть файл

@@ -10,7 +10,7 @@
</ItemGroup>

<ItemGroup>
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="6.0.0" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="6.0.1" />
<PackageReference Include="Microsoft.Extensions.Hosting" Version="6.0.1" />
<PackageReference Include="Microsoft.VisualStudio.Azure.Containers.Tools.Targets" Version="1.17.0" />
<PackageReference Include="NLog.Extensions.Logging" Version="5.0.4" />


+ 2
- 2
src/JT808.Gateway.Tests/JT808.Gateway.Test/JT808.Gateway.Test.csproj Просмотреть файл

@@ -7,9 +7,9 @@

<ItemGroup>
<PackageReference Include="Microsoft.Extensions.Configuration.Json" Version="6.0.0" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="6.0.0" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="6.0.1" />
<PackageReference Include="Microsoft.Extensions.Logging.Debug" Version="6.0.0" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.3.1" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="17.3.2" />
<PackageReference Include="xunit" Version="2.4.2" />
<PackageReference Include="xunit.runner.visualstudio" Version="2.4.5">
<PrivateAssets>all</PrivateAssets>


+ 25
- 16
src/JT808.Gateway/JT808TcpServer.cs Просмотреть файл

@@ -95,28 +95,37 @@ namespace JT808.Gateway
public Task StartAsync(CancellationToken cancellationToken)
{
Logger.LogInformation($"JT808 TCP Server start at {IPAddress.Any}:{ConfigurationMonitor.CurrentValue.TcpPort}.");
Task.Factory.StartNew(async () =>
Task.Run(async () =>
{
while (!cancellationToken.IsCancellationRequested)
{
var socket = await server.AcceptAsync();
JT808TcpSession jT808TcpSession = new JT808TcpSession(socket);
SessionManager.TryAdd(jT808TcpSession);
await Task.Factory.StartNew(async (state) =>
try
{
var session = (JT808TcpSession)state;
if (Logger.IsEnabled(LogLevel.Information))
var socket = await server.AcceptAsync();
JT808TcpSession jT808TcpSession = new JT808TcpSession(socket);
SessionManager.TryAdd(jT808TcpSession);
await Task.Factory.StartNew(async (state) =>
{
Logger.LogInformation($"[Connected]:{session.Client.RemoteEndPoint}");
}
var pipe = new Pipe();
Task writing = FillPipeAsync(session, pipe.Writer);
Task reading = ReadPipeAsync(session, pipe.Reader);
await Task.WhenAll(reading, writing);
SessionManager.RemoveBySessionId(session.SessionID);
}, jT808TcpSession);
var session = (JT808TcpSession)state;
if (Logger.IsEnabled(LogLevel.Information))
{
Logger.LogInformation($"[Connected]:{session.Client.RemoteEndPoint}");
}
var pipe = new Pipe();
Task writing = FillPipeAsync(session, pipe.Writer);
Task reading = ReadPipeAsync(session, pipe.Reader);
await Task.WhenAll(reading, writing);
SessionManager.RemoveBySessionId(session.SessionID);
}, jT808TcpSession, cancellationToken, TaskCreationOptions.PreferFairness, TaskScheduler.Default);
}
catch (OperationCanceledException)
{
}
catch (Exception)
{
}
}
}, cancellationToken);
});
return Task.CompletedTask;
}
private async Task FillPipeAsync(JT808TcpSession session, PipeWriter writer)


+ 2
- 2
src/JT808.Gateway/JT808UdpServer.cs Просмотреть файл

@@ -62,7 +62,7 @@ namespace JT808.Gateway
public Task StartAsync(CancellationToken cancellationToken)
{
Logger.LogInformation($"JT808 Udp Server start at {IPAddress.Any}:{ConfigurationMonitor.CurrentValue.UdpPort}.");
Task.Factory.StartNew(async() => {
Task.Run(async () => {
while (!cancellationToken.IsCancellationRequested)
{
var buffer = ArrayPool<byte>.Shared.Rent(ConfigurationMonitor.CurrentValue.MiniNumBufferSize);
@@ -93,7 +93,7 @@ namespace JT808.Gateway
ArrayPool<byte>.Shared.Return(buffer);
}
}
}, cancellationToken);
});
return Task.CompletedTask;
}
private void ReaderBuffer(ReadOnlySpan<byte> buffer, Socket socket,SocketReceiveMessageFromResult receiveMessageFromResult)


+ 1
- 1
src/global.json Просмотреть файл

@@ -1,5 +1,5 @@
{
"sdk": {
"version": "6.0.101"
"version": "6.0.400"
}
}

Загрузка…
Отмена
Сохранить