Browse Source

完善客户端心跳发送

tags/pipeline-1.1.0
SmallChi(Koike) 4 years ago
parent
commit
2f2fd989cb
1 changed files with 52 additions and 2 deletions
  1. +52
    -2
      src/JT808.Gateway.Client/JT808TcpClient.cs

+ 52
- 2
src/JT808.Gateway.Client/JT808TcpClient.cs View File

@@ -13,13 +13,13 @@ using JT808.Gateway.Client.Services;
using JT808.Gateway.Client.Metadata; using JT808.Gateway.Client.Metadata;
using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection;
using JT808.Gateway.Client.Internal; using JT808.Gateway.Client.Internal;
using JT808.Protocol.Enums;


namespace JT808.Gateway.Client namespace JT808.Gateway.Client
{ {


public class JT808TcpClient:IDisposable public class JT808TcpClient:IDisposable
{ {
//todo: 客户端心跳时间
private bool disposed = false; private bool disposed = false;
private Socket clientSocket; private Socket clientSocket;
private readonly ILogger Logger; private readonly ILogger Logger;
@@ -30,11 +30,15 @@ namespace JT808.Gateway.Client
private bool socketState = true; private bool socketState = true;
public JT808DeviceConfig DeviceConfig { get; } public JT808DeviceConfig DeviceConfig { get; }
private IJT808MessageProducer producer; private IJT808MessageProducer producer;
private Task heartbeatTask;
private CancellationTokenSource heartbeatCTS;
public JT808TcpClient( public JT808TcpClient(
JT808DeviceConfig deviceConfig, JT808DeviceConfig deviceConfig,
IServiceProvider serviceProvider) IServiceProvider serviceProvider)
{ {
DeviceConfig = deviceConfig; DeviceConfig = deviceConfig;
WriteableTimeout = DateTime.UtcNow.AddSeconds(DeviceConfig.Heartbeat);
heartbeatCTS = new CancellationTokenSource();
SendAtomicCounterService = serviceProvider.GetRequiredService<JT808SendAtomicCounterService>(); SendAtomicCounterService = serviceProvider.GetRequiredService<JT808SendAtomicCounterService>();
ReceiveAtomicCounterService = serviceProvider.GetRequiredService<JT808ReceiveAtomicCounterService>(); ReceiveAtomicCounterService = serviceProvider.GetRequiredService<JT808ReceiveAtomicCounterService>();
JT808Serializer = serviceProvider.GetRequiredService<IJT808Config>().GetSerializer(); JT808Serializer = serviceProvider.GetRequiredService<IJT808Config>().GetSerializer();
@@ -48,6 +52,36 @@ namespace JT808.Gateway.Client
try try
{ {
await clientSocket.ConnectAsync(remoteEndPoint); await clientSocket.ConnectAsync(remoteEndPoint);
await Task.Factory.StartNew(async()=> {
while (!heartbeatCTS.IsCancellationRequested)
{
if (WriteableTimeout <= DateTime.UtcNow)
{
try
{
if (Logger.IsEnabled(LogLevel.Information))
{
Logger.LogInformation($"{DeviceConfig.Heartbeat}s send heartbeat:{DeviceConfig.TerminalPhoneNo}-{DeviceConfig.Version.ToString()}");
}
if(DeviceConfig.Version== Protocol.Enums.JT808Version.JTT2013)
{
var package = JT808.Protocol.Enums.JT808MsgId.终端心跳.Create(DeviceConfig.TerminalPhoneNo);
await SendAsync(new JT808ClientRequest(package));
}
else
{
var package = JT808.Protocol.Enums.JT808MsgId.终端心跳.Create2019(DeviceConfig.TerminalPhoneNo);
await SendAsync(new JT808ClientRequest(package));
}
}
catch (Exception ex)
{
Logger.LogError(ex, "");
}
}
await Task.Delay(TimeSpan.FromSeconds(DeviceConfig.Heartbeat));
}
}, heartbeatCTS.Token);
return true; return true;
} }
catch (Exception e) catch (Exception e)
@@ -208,6 +242,7 @@ namespace JT808.Gateway.Client
var sendData = JT808Serializer.Serialize(message.Package, minBufferSize: message.MinBufferSize); var sendData = JT808Serializer.Serialize(message.Package, minBufferSize: message.MinBufferSize);
await clientSocket.SendAsync(sendData, SocketFlags.None); await clientSocket.SendAsync(sendData, SocketFlags.None);
SendAtomicCounterService.MsgSuccessIncrement(); SendAtomicCounterService.MsgSuccessIncrement();
WriteableTimeout = DateTime.UtcNow.AddSeconds(DeviceConfig.Heartbeat);
return true; return true;
} }
catch (System.Net.Sockets.SocketException ex) catch (System.Net.Sockets.SocketException ex)
@@ -228,6 +263,7 @@ namespace JT808.Gateway.Client
{ {
await clientSocket.SendAsync(message.HexData, SocketFlags.None); await clientSocket.SendAsync(message.HexData, SocketFlags.None);
SendAtomicCounterService.MsgSuccessIncrement(); SendAtomicCounterService.MsgSuccessIncrement();
WriteableTimeout = DateTime.UtcNow.AddSeconds(DeviceConfig.Heartbeat);
return true; return true;
} }
catch (System.Net.Sockets.SocketException ex) catch (System.Net.Sockets.SocketException ex)
@@ -245,7 +281,6 @@ namespace JT808.Gateway.Client
} }
return false; return false;
} }

public void Close() public void Close()
{ {
if (disposed) return; if (disposed) return;
@@ -253,6 +288,7 @@ namespace JT808.Gateway.Client
try try
{ {
clientSocket?.Shutdown(SocketShutdown.Both); clientSocket?.Shutdown(SocketShutdown.Both);
heartbeatShutdown();
} }
finally finally
{ {
@@ -260,6 +296,17 @@ namespace JT808.Gateway.Client
} }
} }


private void heartbeatShutdown()
{
try
{
heartbeatCTS.Cancel();
}
catch (Exception ex)
{
Logger.LogError(ex, "");
}
}
private void Dispose(bool disposing) private void Dispose(bool disposing)
{ {
if (disposed) return; if (disposed) return;
@@ -267,10 +314,13 @@ namespace JT808.Gateway.Client
{ {
// 清理托管资源 // 清理托管资源
clientSocket.Dispose(); clientSocket.Dispose();
heartbeatCTS.Dispose();
} }
disposed = true; disposed = true;
} }


public DateTime WriteableTimeout { get; private set; }

~JT808TcpClient() ~JT808TcpClient()
{ {
//必须为false //必须为false


Loading…
Cancel
Save