From 025c182ddcc3e5170ba295620530225576e03272 Mon Sep 17 00:00:00 2001 From: "smallchi(Koike)" <564952747@qq.com> Date: Sun, 26 Jan 2020 23:30:23 +0800 Subject: [PATCH] =?UTF-8?q?1.=E4=BF=AE=E6=94=B9=E6=B6=88=E6=81=AF=E5=92=8C?= =?UTF-8?q?=E6=B6=88=E6=81=AF=E5=BA=94=E7=AD=94=E6=B6=88=E8=B4=B9=E6=9C=8D?= =?UTF-8?q?=E5=8A=A1=E4=B8=BA=E5=BC=82=E6=AD=A5=E6=96=B9=E5=BC=8F=202.?= =?UTF-8?q?=E4=BF=AE=E6=94=B9=E5=AE=A2=E6=88=B7=E7=AB=AF=E4=B8=BA=E4=B8=B2?= =?UTF-8?q?=E8=A1=8C=E4=B8=8B=E5=8F=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../Services/CleintBenchmarkHostedService.cs | 40 ++------------ src/JT808.Gateway.Client/JT808TcpClient.cs | 55 +++++++++++-------- .../JT808TcpClientFactory.cs | 2 +- .../JT808MsgConsumer.cs | 8 +-- .../JT808MsgReplyConsumer.cs | 8 +-- .../Services/JT808MsgService.cs | 5 ++ .../Services/JT808ReplyMsgService.cs | 5 ++ src/JT808.Gateway.TestHosting/startup.txt | 2 +- src/JT808.Gateway/JT808TcpServer.cs | 8 ++- src/JT808.Gateway/JT808UdpServer.cs | 4 +- 10 files changed, 62 insertions(+), 75 deletions(-) diff --git a/src/JT808.Gateway.CleintBenchmark/Services/CleintBenchmarkHostedService.cs b/src/JT808.Gateway.CleintBenchmark/Services/CleintBenchmarkHostedService.cs index 3e947df..2b7a016 100644 --- a/src/JT808.Gateway.CleintBenchmark/Services/CleintBenchmarkHostedService.cs +++ b/src/JT808.Gateway.CleintBenchmark/Services/CleintBenchmarkHostedService.cs @@ -9,6 +9,7 @@ using Microsoft.Extensions.Options; using System; using System.Collections.Concurrent; using System.Collections.Generic; +using System.Linq; using System.Text; using System.Threading; using System.Threading.Tasks; @@ -24,8 +25,6 @@ namespace JT808.Gateway.CleintBenchmark.Services private readonly IJT808TcpClientFactory jT808TcpClientFactory; - private ConcurrentQueue failDeviceNoQueue; - public CleintBenchmarkHostedService( ILoggerFactory loggerFactory, IJT808TcpClientFactory jT808TcpClientFactory, @@ -42,49 +41,18 @@ namespace JT808.Gateway.CleintBenchmark.Services ThreadPool.GetMaxThreads(out var maxWorkerThreads, out var maxCompletionPortThreads); logger.LogInformation($"GetMinThreads:{minWorkerThreads}-{minCompletionPortThreads}"); logger.LogInformation($"GetMaxThreads:{maxWorkerThreads}-{maxCompletionPortThreads}"); - //先建立连接 - failDeviceNoQueue = new ConcurrentQueue(); for (int i=0;i< clientBenchmarkOptions.DeviceCount; i++) { string deviceNo = (i + 1 + clientBenchmarkOptions.DeviceTemplate).ToString(); var client = jT808TcpClientFactory.Create(new JT808DeviceConfig(deviceNo, clientBenchmarkOptions.IP, clientBenchmarkOptions.Port), cancellationToken); - if (client == null) - { - failDeviceNoQueue.Enqueue(deviceNo); - } } - int successCount = clientBenchmarkOptions.DeviceCount - failDeviceNoQueue.Count; - logger.LogInformation($"总连接数:{clientBenchmarkOptions.DeviceCount}"); - logger.LogInformation($"已建立连接数:{successCount}"); - logger.LogInformation($"失败连接数:{failDeviceNoQueue.Count}"); - Task.Run(() => { - while (!cancellationToken.IsCancellationRequested) - { - if(failDeviceNoQueue.TryDequeue(out string deviceNo)) - { - logger.LogInformation($"尝试重连{deviceNo}..."); - var client = jT808TcpClientFactory.Create(new JT808DeviceConfig(deviceNo, - clientBenchmarkOptions.IP, - clientBenchmarkOptions.Port), cancellationToken); - if (client == null) - { - failDeviceNoQueue.Enqueue(deviceNo); - } - Thread.Sleep(1000); - } - else - { - Thread.Sleep(3000); - } - } - }, cancellationToken); ThreadPool.QueueUserWorkItem((state) => { while (!cancellationToken.IsCancellationRequested) { - Parallel.ForEach(jT808TcpClientFactory.GetAll(), new ParallelOptions { MaxDegreeOfParallelism = 100 }, (item) => + foreach (var item in jT808TcpClientFactory.GetAll()) { try { @@ -106,8 +74,8 @@ namespace JT808.Gateway.CleintBenchmark.Services { logger.LogError(ex.Message); } - }); - Thread.Sleep(clientBenchmarkOptions.Interval); + } + Thread.Sleep(100); } }); return Task.CompletedTask; diff --git a/src/JT808.Gateway.Client/JT808TcpClient.cs b/src/JT808.Gateway.Client/JT808TcpClient.cs index e404a5d..d24648d 100644 --- a/src/JT808.Gateway.Client/JT808TcpClient.cs +++ b/src/JT808.Gateway.Client/JT808TcpClient.cs @@ -23,6 +23,7 @@ namespace JT808.Gateway.Client private readonly JT808Serializer JT808Serializer; private readonly JT808SendAtomicCounterService SendAtomicCounterService; private readonly JT808ReceiveAtomicCounterService ReceiveAtomicCounterService; + private bool socketState = true; public JT808DeviceConfig DeviceConfig { get; } public JT808TcpClient( JT808DeviceConfig deviceConfig, @@ -47,32 +48,29 @@ namespace JT808.Gateway.Client return false; } } - public Task StartAsync(CancellationToken cancellationToken) + public async void StartAsync(CancellationToken cancellationToken) { - Task.Run(async () => { - await Task.Factory.StartNew(async (state) => + await Task.Factory.StartNew(async (state) => + { + var session = (Socket)state; + if (Logger.IsEnabled(LogLevel.Information)) { - var session = (Socket)state; - if (Logger.IsEnabled(LogLevel.Information)) - { - Logger.LogInformation($"[Connected]:{session.LocalEndPoint} to {session.RemoteEndPoint}"); - } - var pipe = new Pipe(); - Task writing = FillPipeAsync(session, pipe.Writer); - Task reading = ReadPipeAsync(session, pipe.Reader); - await Task.WhenAll(reading, writing); - }, clientSocket); - }, cancellationToken); - return Task.CompletedTask; + Logger.LogInformation($"[Connected]:{session.LocalEndPoint} to {session.RemoteEndPoint}"); + } + var pipe = new Pipe(); + Task writing = FillPipeAsync(session, pipe.Writer, cancellationToken); + Task reading = ReadPipeAsync(session, pipe.Reader); + await Task.WhenAll(reading, writing); + }, clientSocket); } - private async Task FillPipeAsync(Socket session, PipeWriter writer) + private async Task FillPipeAsync(Socket session, PipeWriter writer, CancellationToken cancellationToken) { while (true) { try { Memory memory = writer.GetMemory(10240); - int bytesRead = await session.ReceiveAsync(memory, SocketFlags.None); + int bytesRead = await session.ReceiveAsync(memory, SocketFlags.None, cancellationToken); if (bytesRead == 0) { break; @@ -181,7 +179,7 @@ namespace JT808.Gateway.Client public void Send(JT808ClientRequest message) { if (disposed) return; - if (IsOpen) + if (IsOpen && socketState) { if (message.Package != null) { @@ -193,7 +191,8 @@ namespace JT808.Gateway.Client } catch (System.Net.Sockets.SocketException ex) { - Logger.LogError($"[{ex.SocketErrorCode.ToString()},{ex.Message}]"); + socketState = false; + Logger.LogError($"[{ex.SocketErrorCode.ToString()},{ex.Message},{DeviceConfig.TerminalPhoneNo}]"); } catch (System.Exception ex) { @@ -202,8 +201,20 @@ namespace JT808.Gateway.Client } else if (message.HexData != null) { - clientSocket.Send(message.HexData); - SendAtomicCounterService.MsgSuccessIncrement(); + try + { + clientSocket.Send(message.HexData); + SendAtomicCounterService.MsgSuccessIncrement(); + } + catch (System.Net.Sockets.SocketException ex) + { + socketState = false; + Logger.LogError($"[{ex.SocketErrorCode.ToString()},{ex.Message},{DeviceConfig.TerminalPhoneNo}]"); + } + catch (System.Exception ex) + { + Logger.LogError(ex.Message); + } } } } @@ -260,7 +271,7 @@ namespace JT808.Gateway.Client if (disposed) return false; if (clientSocket != null) { - return clientSocket.Connected; + return clientSocket.Connected && socketState; } return false; } diff --git a/src/JT808.Gateway.Client/JT808TcpClientFactory.cs b/src/JT808.Gateway.Client/JT808TcpClientFactory.cs index 2cf8591..5d928c6 100644 --- a/src/JT808.Gateway.Client/JT808TcpClientFactory.cs +++ b/src/JT808.Gateway.Client/JT808TcpClientFactory.cs @@ -41,7 +41,7 @@ namespace JT808.Gateway.Client var successed= await jT808TcpClient.ConnectAsync(new IPEndPoint(IPAddress.Parse(deviceConfig.TcpHost), deviceConfig.TcpPort)); if (successed) { - await jT808TcpClient.StartAsync(cancellationToken); + jT808TcpClient.StartAsync(cancellationToken); dict.TryAdd(deviceConfig.TerminalPhoneNo, jT808TcpClient); return jT808TcpClient; } diff --git a/src/JT808.Gateway.InMemoryMQ/JT808MsgConsumer.cs b/src/JT808.Gateway.InMemoryMQ/JT808MsgConsumer.cs index 2eeb39c..4f890e5 100644 --- a/src/JT808.Gateway.InMemoryMQ/JT808MsgConsumer.cs +++ b/src/JT808.Gateway.InMemoryMQ/JT808MsgConsumer.cs @@ -25,16 +25,14 @@ namespace JT808.Gateway.InMemoryMQ public void OnMessage(Action<(string TerminalNo, byte[] Data)> callback) { - Task.Run(() => + Task.Run(async() => { while (!Cts.IsCancellationRequested) { try { - if (JT808MsgService.TryRead(out var item)) - { - callback(item); - } + var item = await JT808MsgService.ReadAsync(Cts.Token); + callback(item); } catch(Exception ex) { diff --git a/src/JT808.Gateway.InMemoryMQ/JT808MsgReplyConsumer.cs b/src/JT808.Gateway.InMemoryMQ/JT808MsgReplyConsumer.cs index 432e746..7eb172e 100644 --- a/src/JT808.Gateway.InMemoryMQ/JT808MsgReplyConsumer.cs +++ b/src/JT808.Gateway.InMemoryMQ/JT808MsgReplyConsumer.cs @@ -28,16 +28,14 @@ namespace JT808.Gateway.InMemoryMQ public void OnMessage(Action<(string TerminalNo, byte[] Data)> callback) { - Task.Run(() => + Task.Run(async() => { while (!Cts.IsCancellationRequested) { try { - if (JT808ReplyMsgService.TryRead(out var item)) - { - callback(item); - } + var item= await JT808ReplyMsgService.ReadAsync(Cts.Token); + callback(item); } catch (Exception ex) { diff --git a/src/JT808.Gateway.InMemoryMQ/Services/JT808MsgService.cs b/src/JT808.Gateway.InMemoryMQ/Services/JT808MsgService.cs index 765b58b..e58b34d 100644 --- a/src/JT808.Gateway.InMemoryMQ/Services/JT808MsgService.cs +++ b/src/JT808.Gateway.InMemoryMQ/Services/JT808MsgService.cs @@ -25,5 +25,10 @@ namespace JT808.Gateway.InMemoryMQ.Services { return _channel.Reader.TryRead(out item); } + + public async ValueTask<(string TerminalNo, byte[] Data)> ReadAsync(CancellationToken cancellationToken) + { + return await _channel.Reader.ReadAsync(cancellationToken); + } } } diff --git a/src/JT808.Gateway.InMemoryMQ/Services/JT808ReplyMsgService.cs b/src/JT808.Gateway.InMemoryMQ/Services/JT808ReplyMsgService.cs index 1497533..d6ef7fa 100644 --- a/src/JT808.Gateway.InMemoryMQ/Services/JT808ReplyMsgService.cs +++ b/src/JT808.Gateway.InMemoryMQ/Services/JT808ReplyMsgService.cs @@ -25,5 +25,10 @@ namespace JT808.Gateway.InMemoryMQ.Services { return _channel.Reader.TryRead(out item); } + + public async ValueTask<(string TerminalNo, byte[] Data)> ReadAsync(CancellationToken cancellationToken) + { + return await _channel.Reader.ReadAsync(cancellationToken); + } } } diff --git a/src/JT808.Gateway.TestHosting/startup.txt b/src/JT808.Gateway.TestHosting/startup.txt index 0a51e3b..5cc05f2 100644 --- a/src/JT808.Gateway.TestHosting/startup.txt +++ b/src/JT808.Gateway.TestHosting/startup.txt @@ -1,4 +1,4 @@ -pm2 start "dotnet JT808.DotNetty.CleintBenchmark.dll ASPNETCORE_ENVIRONMENT=Production" --max-restarts=1 -n "JT808.DotNetty.CleintBenchmark" -o "/data/pm2Logs/JT808.DotNetty.CleintBenchmark/out.log" -e "/data/pm2Logs/JT808.DotNetty.CleintBenchmark/error.log" +pm2 start "dotnet JT808.Gateway.CleintBenchmark.dll ASPNETCORE_ENVIRONMENT=Production" --max-restarts=1 -n "JT808.Gateway.CleintBenchmark" -o "/data/pm2Logs/JT808.Gateway.CleintBenchmark/out.log" -e "/data/pm2Logs/JT808.Gateway.CleintBenchmark/error.log" pm2 start "dotnet JT808.Gateway.TestHosting.dll ASPNETCORE_ENVIRONMENT=Production" --max-restarts=1 -n "JT808.Gateway.808" -o "/data/pm2Logs/JT808.Gateway/out.log" -e "/data/pm2Logs/JT808.Gateway/error.log" \ No newline at end of file diff --git a/src/JT808.Gateway/JT808TcpServer.cs b/src/JT808.Gateway/JT808TcpServer.cs index e4fdd99..87f877b 100644 --- a/src/JT808.Gateway/JT808TcpServer.cs +++ b/src/JT808.Gateway/JT808TcpServer.cs @@ -173,9 +173,11 @@ namespace JT808.Gateway { if (mark == 1) { + ReadOnlySpan contentSpan=ReadOnlySpan.Empty; try { - var package = Serializer.HeaderDeserialize(seqReader.Sequence.Slice(totalConsumed, seqReader.Consumed - totalConsumed).FirstSpan); + contentSpan = seqReader.Sequence.Slice(totalConsumed, seqReader.Consumed - totalConsumed).FirstSpan; + var package = Serializer.HeaderDeserialize(contentSpan,minBufferSize:10240); AtomicCounterService.MsgSuccessIncrement(); if (Logger.IsEnabled(LogLevel.Debug)) Logger.LogDebug($"[Atomic Success Counter]:{AtomicCounterService.MsgSuccessCount}"); if (Logger.IsEnabled(LogLevel.Trace)) Logger.LogTrace($"[Accept Hex {session.Client.RemoteEndPoint}]:{package.OriginalData.ToArray().ToHexString()}"); @@ -185,8 +187,8 @@ namespace JT808.Gateway catch (JT808Exception ex) { AtomicCounterService.MsgFailIncrement(); - if (Logger.IsEnabled(LogLevel.Debug)) Logger.LogDebug($"[Atomic Fail Counter]:{AtomicCounterService.MsgFailCount}"); - Logger.LogError(ex,$"[HeaderDeserialize ErrorCode]:{ ex.ErrorCode}"); + if (Logger.IsEnabled(LogLevel.Information)) Logger.LogInformation($"[Atomic Fail Counter]:{AtomicCounterService.MsgFailCount}"); + Logger.LogError($"[HeaderDeserialize ErrorCode]:{ ex.ErrorCode},[ReaderBuffer]:{contentSpan.ToArray().ToHexString()}"); } totalConsumed += (seqReader.Consumed - totalConsumed); if (seqReader.End) break; diff --git a/src/JT808.Gateway/JT808UdpServer.cs b/src/JT808.Gateway/JT808UdpServer.cs index a99e753..4bdbffd 100644 --- a/src/JT808.Gateway/JT808UdpServer.cs +++ b/src/JT808.Gateway/JT808UdpServer.cs @@ -105,8 +105,8 @@ namespace JT808.Gateway catch (JT808Exception ex) { AtomicCounterService.MsgFailIncrement(); - if (Logger.IsEnabled(LogLevel.Debug)) Logger.LogDebug($"[Atomic Fail Counter]:{AtomicCounterService.MsgFailCount}"); - Logger.LogError(ex, $"[HeaderDeserialize ErrorCode]:{ ex.ErrorCode}-{buffer.ToArray().ToHexString()}"); + if (Logger.IsEnabled(LogLevel.Information)) Logger.LogInformation($"[Atomic Fail Counter]:{AtomicCounterService.MsgFailCount}"); + Logger.LogError($"[HeaderDeserialize ErrorCode]:{ ex.ErrorCode},[ReaderBuffer]:{buffer.ToArray().ToHexString()}"); } catch (Exception ex) {