Browse Source

1.修改客户端初始化连接将同步改为异步

2.修改先建立连接在批量请求
tags/v2.3.1
SmallChi(Koike) 5 years ago
parent
commit
b6615de779
3 changed files with 41 additions and 30 deletions
  1. +34
    -28
      src/JT808.DotNetty.CleintBenchmark/Services/CleintBenchmarkHostedService.cs
  2. +2
    -1
      src/JT808.DotNetty.CleintBenchmark/appsettings.json
  3. +5
    -1
      src/JT808.DotNetty.Client/JT808TcpClient.cs

+ 34
- 28
src/JT808.DotNetty.CleintBenchmark/Services/CleintBenchmarkHostedService.cs View File

@@ -23,10 +23,6 @@ namespace JT808.DotNetty.CleintBenchmark.Services


private readonly IJT808TcpClientFactory jT808TcpClientFactory; private readonly IJT808TcpClientFactory jT808TcpClientFactory;


private CancellationTokenSource cts=new CancellationTokenSource();

private TaskFactory taskFactory;

public CleintBenchmarkHostedService( public CleintBenchmarkHostedService(
ILoggerFactory loggerFactory, ILoggerFactory loggerFactory,
IJT808TcpClientFactory jT808TcpClientFactory, IJT808TcpClientFactory jT808TcpClientFactory,
@@ -35,7 +31,6 @@ namespace JT808.DotNetty.CleintBenchmark.Services
this.jT808TcpClientFactory = jT808TcpClientFactory; this.jT808TcpClientFactory = jT808TcpClientFactory;
clientBenchmarkOptions = clientBenchmarkOptionsAccessor.Value; clientBenchmarkOptions = clientBenchmarkOptionsAccessor.Value;
logger = loggerFactory.CreateLogger("CleintBenchmarkHostedService"); logger = loggerFactory.CreateLogger("CleintBenchmarkHostedService");
taskFactory = new TaskFactory();
} }
public Task StartAsync(CancellationToken cancellationToken) public Task StartAsync(CancellationToken cancellationToken)
{ {
@@ -44,39 +39,50 @@ namespace JT808.DotNetty.CleintBenchmark.Services
ThreadPool.GetMaxThreads(out var maxWorkerThreads, out var maxCompletionPortThreads); ThreadPool.GetMaxThreads(out var maxWorkerThreads, out var maxCompletionPortThreads);
logger.LogInformation($"GetMinThreads:{minWorkerThreads}-{minCompletionPortThreads}"); logger.LogInformation($"GetMinThreads:{minWorkerThreads}-{minCompletionPortThreads}");
logger.LogInformation($"GetMaxThreads:{maxWorkerThreads}-{maxCompletionPortThreads}"); logger.LogInformation($"GetMaxThreads:{maxWorkerThreads}-{maxCompletionPortThreads}");
//ThreadPool.SetMaxThreads(20, 20);
//ThreadPool.GetMaxThreads(out var setMaxWorkerThreads, out var setMaxCompletionPortThreads);
//logger.LogInformation($"SetMaxThreads:{setMaxWorkerThreads}-{setMaxCompletionPortThreads}");
//先建立连接
for (int i=0;i< clientBenchmarkOptions.DeviceCount; i++) for (int i=0;i< clientBenchmarkOptions.DeviceCount; i++)
{ {
taskFactory.StartNew((item) =>
var client = jT808TcpClientFactory.Create(new JT808DeviceConfig((i+1+ clientBenchmarkOptions.DeviceTemplate).ToString(),
clientBenchmarkOptions.IP,
clientBenchmarkOptions.Port));
}

ThreadPool.QueueUserWorkItem((state) =>
{
while (!cancellationToken.IsCancellationRequested)
{ {
var client = jT808TcpClientFactory.Create(new JT808DeviceConfig(((int)item+1+ clientBenchmarkOptions.DeviceTemplate).ToString(), clientBenchmarkOptions.IP, clientBenchmarkOptions.Port));
int lat = new Random(1000).Next(100000, 180000);
int Lng = new Random(1000).Next(100000, 180000);
while (!cts.IsCancellationRequested)
Parallel.ForEach(jT808TcpClientFactory.GetAll(), new ParallelOptions { MaxDegreeOfParallelism = 100 }, (item) =>
{ {
client.Send(JT808MsgId.位置信息汇报.Create(client.DeviceConfig.TerminalPhoneNo,new JT808_0x0200()
try
{ {
Lat = lat,
Lng = Lng,
GPSTime = DateTime.Now,
Speed = 50,
Direction = 30,
AlarmFlag = 5,
Altitude = 50,
StatusFlag = 10
}));
Thread.Sleep(clientBenchmarkOptions.Interval);
}
}, i,cts.Token);
}
int lat = new Random(1000).Next(100000, 180000);
int Lng = new Random(1000).Next(100000, 180000);
item.Send(JT808MsgId.位置信息汇报.Create(item.DeviceConfig.TerminalPhoneNo, new JT808_0x0200()
{
Lat = lat,
Lng = Lng,
GPSTime = DateTime.Now,
Speed = 50,
Direction = 30,
AlarmFlag = 5,
Altitude = 50,
StatusFlag = 10
}));
}
catch (Exception ex)
{
logger.LogError(ex, "");
}
});
Thread.Sleep(clientBenchmarkOptions.Interval);
}
});
return Task.CompletedTask; return Task.CompletedTask;
} }


public Task StopAsync(CancellationToken cancellationToken) public Task StopAsync(CancellationToken cancellationToken)
{ {
cts.Cancel();
jT808TcpClientFactory.Dispose();
logger.LogInformation("StopAsync..."); logger.LogInformation("StopAsync...");
return Task.CompletedTask; return Task.CompletedTask;
} }


+ 2
- 1
src/JT808.DotNetty.CleintBenchmark/appsettings.json View File

@@ -13,9 +13,10 @@
} }
}, },
"ClientBenchmarkOptions": { "ClientBenchmarkOptions": {
"IP": "",
"IP": "127.0.0.1",
"Port": 808, "Port": 808,
"DeviceCount": 100, "DeviceCount": 100,
"Interval": 1000,
"DeviceTemplate": 300000 //需要多台机器同时访问,那么可以根据这个避开重复终端号 100000-200000-300000 "DeviceTemplate": 300000 //需要多台机器同时访问,那么可以根据这个避开重复终端号 100000-200000-300000
} }
} }

+ 5
- 1
src/JT808.DotNetty.Client/JT808TcpClient.cs View File

@@ -14,6 +14,7 @@ using JT808.DotNetty.Client.Metadata;
using JT808.DotNetty.Client.Codecs; using JT808.DotNetty.Client.Codecs;
using JT808.DotNetty.Client.Services; using JT808.DotNetty.Client.Services;
using JT808.Protocol; using JT808.Protocol;
using System.Threading.Tasks;


namespace JT808.DotNetty.Client namespace JT808.DotNetty.Client
{ {
@@ -57,7 +58,10 @@ namespace JT808.DotNetty.Client
channel.Pipeline.AddLast("jt808TcpClientConnection", new JT808TcpClientConnectionHandler(this)); channel.Pipeline.AddLast("jt808TcpClientConnection", new JT808TcpClientConnectionHandler(this));
channel.Pipeline.AddLast("jt808TcpService", new JT808TcpClientHandler(jT808ReceiveAtomicCounterService,this)); channel.Pipeline.AddLast("jt808TcpService", new JT808TcpClientHandler(jT808ReceiveAtomicCounterService,this));
})); }));
clientChannel = bootstrap.ConnectAsync(IPAddress.Parse(DeviceConfig.TcpHost), DeviceConfig.TcpPort).Result;
Task.Run(async () =>
{
clientChannel = await bootstrap.ConnectAsync(IPAddress.Parse(DeviceConfig.TcpHost), DeviceConfig.TcpPort);
});
} }


public async void Send(JT808ClientRequest request) public async void Send(JT808ClientRequest request)


Loading…
Cancel
Save