Browse Source

1.修改消息和消息应答消费服务为异步方式

2.修改客户端为串行下发
tags/pipeline-1.0.0
smallchi(Koike) 5 years ago
parent
commit
025c182ddc
10 changed files with 62 additions and 75 deletions
  1. +4
    -36
      src/JT808.Gateway.CleintBenchmark/Services/CleintBenchmarkHostedService.cs
  2. +33
    -22
      src/JT808.Gateway.Client/JT808TcpClient.cs
  3. +1
    -1
      src/JT808.Gateway.Client/JT808TcpClientFactory.cs
  4. +3
    -5
      src/JT808.Gateway.InMemoryMQ/JT808MsgConsumer.cs
  5. +3
    -5
      src/JT808.Gateway.InMemoryMQ/JT808MsgReplyConsumer.cs
  6. +5
    -0
      src/JT808.Gateway.InMemoryMQ/Services/JT808MsgService.cs
  7. +5
    -0
      src/JT808.Gateway.InMemoryMQ/Services/JT808ReplyMsgService.cs
  8. +1
    -1
      src/JT808.Gateway.TestHosting/startup.txt
  9. +5
    -3
      src/JT808.Gateway/JT808TcpServer.cs
  10. +2
    -2
      src/JT808.Gateway/JT808UdpServer.cs

+ 4
- 36
src/JT808.Gateway.CleintBenchmark/Services/CleintBenchmarkHostedService.cs View File

@@ -9,6 +9,7 @@ using Microsoft.Extensions.Options;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
@@ -24,8 +25,6 @@ namespace JT808.Gateway.CleintBenchmark.Services

private readonly IJT808TcpClientFactory jT808TcpClientFactory;

private ConcurrentQueue<string> failDeviceNoQueue;

public CleintBenchmarkHostedService(
ILoggerFactory loggerFactory,
IJT808TcpClientFactory jT808TcpClientFactory,
@@ -42,49 +41,18 @@ namespace JT808.Gateway.CleintBenchmark.Services
ThreadPool.GetMaxThreads(out var maxWorkerThreads, out var maxCompletionPortThreads);
logger.LogInformation($"GetMinThreads:{minWorkerThreads}-{minCompletionPortThreads}");
logger.LogInformation($"GetMaxThreads:{maxWorkerThreads}-{maxCompletionPortThreads}");
//先建立连接
failDeviceNoQueue = new ConcurrentQueue<string>();
for (int i=0;i< clientBenchmarkOptions.DeviceCount; i++)
{
string deviceNo = (i + 1 + clientBenchmarkOptions.DeviceTemplate).ToString();
var client = jT808TcpClientFactory.Create(new JT808DeviceConfig(deviceNo,
clientBenchmarkOptions.IP,
clientBenchmarkOptions.Port), cancellationToken);
if (client == null)
{
failDeviceNoQueue.Enqueue(deviceNo);
}
}
int successCount = clientBenchmarkOptions.DeviceCount - failDeviceNoQueue.Count;
logger.LogInformation($"总连接数:{clientBenchmarkOptions.DeviceCount}");
logger.LogInformation($"已建立连接数:{successCount}");
logger.LogInformation($"失败连接数:{failDeviceNoQueue.Count}");
Task.Run(() => {
while (!cancellationToken.IsCancellationRequested)
{
if(failDeviceNoQueue.TryDequeue(out string deviceNo))
{
logger.LogInformation($"尝试重连{deviceNo}...");
var client = jT808TcpClientFactory.Create(new JT808DeviceConfig(deviceNo,
clientBenchmarkOptions.IP,
clientBenchmarkOptions.Port), cancellationToken);
if (client == null)
{
failDeviceNoQueue.Enqueue(deviceNo);
}
Thread.Sleep(1000);
}
else
{
Thread.Sleep(3000);
}
}
}, cancellationToken);
ThreadPool.QueueUserWorkItem((state) =>
{
while (!cancellationToken.IsCancellationRequested)
{
Parallel.ForEach(jT808TcpClientFactory.GetAll(), new ParallelOptions { MaxDegreeOfParallelism = 100 }, (item) =>
foreach (var item in jT808TcpClientFactory.GetAll())
{
try
{
@@ -106,8 +74,8 @@ namespace JT808.Gateway.CleintBenchmark.Services
{
logger.LogError(ex.Message);
}
});
Thread.Sleep(clientBenchmarkOptions.Interval);
}
Thread.Sleep(100);
}
});
return Task.CompletedTask;


+ 33
- 22
src/JT808.Gateway.Client/JT808TcpClient.cs View File

@@ -23,6 +23,7 @@ namespace JT808.Gateway.Client
private readonly JT808Serializer JT808Serializer;
private readonly JT808SendAtomicCounterService SendAtomicCounterService;
private readonly JT808ReceiveAtomicCounterService ReceiveAtomicCounterService;
private bool socketState = true;
public JT808DeviceConfig DeviceConfig { get; }
public JT808TcpClient(
JT808DeviceConfig deviceConfig,
@@ -47,32 +48,29 @@ namespace JT808.Gateway.Client
return false;
}
}
public Task StartAsync(CancellationToken cancellationToken)
public async void StartAsync(CancellationToken cancellationToken)
{
Task.Run(async () => {
await Task.Factory.StartNew(async (state) =>
await Task.Factory.StartNew(async (state) =>
{
var session = (Socket)state;
if (Logger.IsEnabled(LogLevel.Information))
{
var session = (Socket)state;
if (Logger.IsEnabled(LogLevel.Information))
{
Logger.LogInformation($"[Connected]:{session.LocalEndPoint} to {session.RemoteEndPoint}");
}
var pipe = new Pipe();
Task writing = FillPipeAsync(session, pipe.Writer);
Task reading = ReadPipeAsync(session, pipe.Reader);
await Task.WhenAll(reading, writing);
}, clientSocket);
}, cancellationToken);
return Task.CompletedTask;
Logger.LogInformation($"[Connected]:{session.LocalEndPoint} to {session.RemoteEndPoint}");
}
var pipe = new Pipe();
Task writing = FillPipeAsync(session, pipe.Writer, cancellationToken);
Task reading = ReadPipeAsync(session, pipe.Reader);
await Task.WhenAll(reading, writing);
}, clientSocket);
}
private async Task FillPipeAsync(Socket session, PipeWriter writer)
private async Task FillPipeAsync(Socket session, PipeWriter writer, CancellationToken cancellationToken)
{
while (true)
{
try
{
Memory<byte> memory = writer.GetMemory(10240);
int bytesRead = await session.ReceiveAsync(memory, SocketFlags.None);
int bytesRead = await session.ReceiveAsync(memory, SocketFlags.None, cancellationToken);
if (bytesRead == 0)
{
break;
@@ -181,7 +179,7 @@ namespace JT808.Gateway.Client
public void Send(JT808ClientRequest message)
{
if (disposed) return;
if (IsOpen)
if (IsOpen && socketState)
{
if (message.Package != null)
{
@@ -193,7 +191,8 @@ namespace JT808.Gateway.Client
}
catch (System.Net.Sockets.SocketException ex)
{
Logger.LogError($"[{ex.SocketErrorCode.ToString()},{ex.Message}]");
socketState = false;
Logger.LogError($"[{ex.SocketErrorCode.ToString()},{ex.Message},{DeviceConfig.TerminalPhoneNo}]");
}
catch (System.Exception ex)
{
@@ -202,8 +201,20 @@ namespace JT808.Gateway.Client
}
else if (message.HexData != null)
{
clientSocket.Send(message.HexData);
SendAtomicCounterService.MsgSuccessIncrement();
try
{
clientSocket.Send(message.HexData);
SendAtomicCounterService.MsgSuccessIncrement();
}
catch (System.Net.Sockets.SocketException ex)
{
socketState = false;
Logger.LogError($"[{ex.SocketErrorCode.ToString()},{ex.Message},{DeviceConfig.TerminalPhoneNo}]");
}
catch (System.Exception ex)
{
Logger.LogError(ex.Message);
}
}
}
}
@@ -260,7 +271,7 @@ namespace JT808.Gateway.Client
if (disposed) return false;
if (clientSocket != null)
{
return clientSocket.Connected;
return clientSocket.Connected && socketState;
}
return false;
}


+ 1
- 1
src/JT808.Gateway.Client/JT808TcpClientFactory.cs View File

@@ -41,7 +41,7 @@ namespace JT808.Gateway.Client
var successed= await jT808TcpClient.ConnectAsync(new IPEndPoint(IPAddress.Parse(deviceConfig.TcpHost), deviceConfig.TcpPort));
if (successed)
{
await jT808TcpClient.StartAsync(cancellationToken);
jT808TcpClient.StartAsync(cancellationToken);
dict.TryAdd(deviceConfig.TerminalPhoneNo, jT808TcpClient);
return jT808TcpClient;
}


+ 3
- 5
src/JT808.Gateway.InMemoryMQ/JT808MsgConsumer.cs View File

@@ -25,16 +25,14 @@ namespace JT808.Gateway.InMemoryMQ

public void OnMessage(Action<(string TerminalNo, byte[] Data)> callback)
{
Task.Run(() =>
Task.Run(async() =>
{
while (!Cts.IsCancellationRequested)
{
try
{
if (JT808MsgService.TryRead(out var item))
{
callback(item);
}
var item = await JT808MsgService.ReadAsync(Cts.Token);
callback(item);
}
catch(Exception ex)
{


+ 3
- 5
src/JT808.Gateway.InMemoryMQ/JT808MsgReplyConsumer.cs View File

@@ -28,16 +28,14 @@ namespace JT808.Gateway.InMemoryMQ

public void OnMessage(Action<(string TerminalNo, byte[] Data)> callback)
{
Task.Run(() =>
Task.Run(async() =>
{
while (!Cts.IsCancellationRequested)
{
try
{
if (JT808ReplyMsgService.TryRead(out var item))
{
callback(item);
}
var item= await JT808ReplyMsgService.ReadAsync(Cts.Token);
callback(item);
}
catch (Exception ex)
{


+ 5
- 0
src/JT808.Gateway.InMemoryMQ/Services/JT808MsgService.cs View File

@@ -25,5 +25,10 @@ namespace JT808.Gateway.InMemoryMQ.Services
{
return _channel.Reader.TryRead(out item);
}

public async ValueTask<(string TerminalNo, byte[] Data)> ReadAsync(CancellationToken cancellationToken)
{
return await _channel.Reader.ReadAsync(cancellationToken);
}
}
}

+ 5
- 0
src/JT808.Gateway.InMemoryMQ/Services/JT808ReplyMsgService.cs View File

@@ -25,5 +25,10 @@ namespace JT808.Gateway.InMemoryMQ.Services
{
return _channel.Reader.TryRead(out item);
}

public async ValueTask<(string TerminalNo, byte[] Data)> ReadAsync(CancellationToken cancellationToken)
{
return await _channel.Reader.ReadAsync(cancellationToken);
}
}
}

+ 1
- 1
src/JT808.Gateway.TestHosting/startup.txt View File

@@ -1,4 +1,4 @@
pm2 start "dotnet JT808.DotNetty.CleintBenchmark.dll ASPNETCORE_ENVIRONMENT=Production" --max-restarts=1 -n "JT808.DotNetty.CleintBenchmark" -o "/data/pm2Logs/JT808.DotNetty.CleintBenchmark/out.log" -e "/data/pm2Logs/JT808.DotNetty.CleintBenchmark/error.log"
pm2 start "dotnet JT808.Gateway.CleintBenchmark.dll ASPNETCORE_ENVIRONMENT=Production" --max-restarts=1 -n "JT808.Gateway.CleintBenchmark" -o "/data/pm2Logs/JT808.Gateway.CleintBenchmark/out.log" -e "/data/pm2Logs/JT808.Gateway.CleintBenchmark/error.log"


pm2 start "dotnet JT808.Gateway.TestHosting.dll ASPNETCORE_ENVIRONMENT=Production" --max-restarts=1 -n "JT808.Gateway.808" -o "/data/pm2Logs/JT808.Gateway/out.log" -e "/data/pm2Logs/JT808.Gateway/error.log"

+ 5
- 3
src/JT808.Gateway/JT808TcpServer.cs View File

@@ -173,9 +173,11 @@ namespace JT808.Gateway
{
if (mark == 1)
{
ReadOnlySpan<byte> contentSpan=ReadOnlySpan<byte>.Empty;
try
{
var package = Serializer.HeaderDeserialize(seqReader.Sequence.Slice(totalConsumed, seqReader.Consumed - totalConsumed).FirstSpan);
contentSpan = seqReader.Sequence.Slice(totalConsumed, seqReader.Consumed - totalConsumed).FirstSpan;
var package = Serializer.HeaderDeserialize(contentSpan,minBufferSize:10240);
AtomicCounterService.MsgSuccessIncrement();
if (Logger.IsEnabled(LogLevel.Debug)) Logger.LogDebug($"[Atomic Success Counter]:{AtomicCounterService.MsgSuccessCount}");
if (Logger.IsEnabled(LogLevel.Trace)) Logger.LogTrace($"[Accept Hex {session.Client.RemoteEndPoint}]:{package.OriginalData.ToArray().ToHexString()}");
@@ -185,8 +187,8 @@ namespace JT808.Gateway
catch (JT808Exception ex)
{
AtomicCounterService.MsgFailIncrement();
if (Logger.IsEnabled(LogLevel.Debug)) Logger.LogDebug($"[Atomic Fail Counter]:{AtomicCounterService.MsgFailCount}");
Logger.LogError(ex,$"[HeaderDeserialize ErrorCode]:{ ex.ErrorCode}");
if (Logger.IsEnabled(LogLevel.Information)) Logger.LogInformation($"[Atomic Fail Counter]:{AtomicCounterService.MsgFailCount}");
Logger.LogError($"[HeaderDeserialize ErrorCode]:{ ex.ErrorCode},[ReaderBuffer]:{contentSpan.ToArray().ToHexString()}");
}
totalConsumed += (seqReader.Consumed - totalConsumed);
if (seqReader.End) break;


+ 2
- 2
src/JT808.Gateway/JT808UdpServer.cs View File

@@ -105,8 +105,8 @@ namespace JT808.Gateway
catch (JT808Exception ex)
{
AtomicCounterService.MsgFailIncrement();
if (Logger.IsEnabled(LogLevel.Debug)) Logger.LogDebug($"[Atomic Fail Counter]:{AtomicCounterService.MsgFailCount}");
Logger.LogError(ex, $"[HeaderDeserialize ErrorCode]:{ ex.ErrorCode}-{buffer.ToArray().ToHexString()}");
if (Logger.IsEnabled(LogLevel.Information)) Logger.LogInformation($"[Atomic Fail Counter]:{AtomicCounterService.MsgFailCount}");
Logger.LogError($"[HeaderDeserialize ErrorCode]:{ ex.ErrorCode},[ReaderBuffer]:{buffer.ToArray().ToHexString()}");
}
catch (Exception ex)
{


Loading…
Cancel
Save