@@ -12,7 +12,7 @@ | |||
<PackageReference Include="DotNetty.Handlers" Version="0.6.0" /> | |||
<PackageReference Include="DotNetty.Transport.Libuv" Version="0.6.0" /> | |||
<PackageReference Include="DotNetty.Codecs" Version="0.6.0" /> | |||
<PackageReference Include="JT1078" Version="1.0.0-preview1" /> | |||
<PackageReference Include="JT1078" Version="1.0.0-preview2" /> | |||
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="2.2.0" /> | |||
<PackageReference Include="Microsoft.Extensions.Hosting.Abstractions" Version="2.2.0" /> | |||
<PackageReference Include="Microsoft.Extensions.Options.ConfigurationExtensions" Version="2.2.0" /> | |||
@@ -15,8 +15,12 @@ namespace JT1078.DotNetty.TestHosting.Handlers | |||
{ | |||
private readonly ILogger logger; | |||
private readonly ILogger hexLogger; | |||
public JT1078TcpMessageHandlers(ILoggerFactory loggerFactory) | |||
private readonly JT1078DataService jT1078DataService; | |||
public JT1078TcpMessageHandlers( | |||
JT1078DataService jT1078DataService, | |||
ILoggerFactory loggerFactory) | |||
{ | |||
this.jT1078DataService = jT1078DataService; | |||
logger = loggerFactory.CreateLogger("JT1078TcpMessageHandlers"); | |||
hexLogger = loggerFactory.CreateLogger("JT1078TcpMessageHandlersHex"); | |||
} | |||
@@ -25,6 +29,7 @@ namespace JT1078.DotNetty.TestHosting.Handlers | |||
{ | |||
logger.LogInformation(JsonConvert.SerializeObject(request.Package)); | |||
hexLogger.LogInformation($"{request.Package.SIM},{request.Package.SN},{request.Package.LogicChannelNumber},{request.Package.Label3.DataType.ToString()},{request.Package.Label3.SubpackageType.ToString()},{ByteBufferUtil.HexDump(request.Src)}"); | |||
jT1078DataService.DataBlockingCollection.TryAdd(request.Package); | |||
return Task.FromResult<JT1078Response>(default); | |||
} | |||
} | |||
@@ -0,0 +1,18 @@ | |||
using JT1078.Protocol; | |||
using System; | |||
using System.Collections.Concurrent; | |||
using System.Collections.Generic; | |||
using System.Text; | |||
namespace JT1078.DotNetty.TestHosting | |||
{ | |||
public class JT1078DataService | |||
{ | |||
public BlockingCollection<JT1078Package> DataBlockingCollection { get; } | |||
public JT1078DataService() | |||
{ | |||
DataBlockingCollection = new BlockingCollection<JT1078Package>(); | |||
} | |||
} | |||
} |
@@ -10,33 +10,66 @@ using System.Text; | |||
using System.Threading; | |||
using System.Threading.Tasks; | |||
using JT1078.Protocol; | |||
using System.Collections.Concurrent; | |||
using JT1078.Protocol.Enums; | |||
namespace JT1078.DotNetty.TestHosting | |||
{ | |||
class JT1078WebSocketPushHostedService : IHostedService | |||
{ | |||
JT1078WebSocketSessionManager jT1078WebSocketSessionManager; | |||
public JT1078WebSocketPushHostedService(JT1078WebSocketSessionManager jT1078WebSocketSessionManager) | |||
private readonly JT1078DataService jT1078DataService; | |||
private readonly ConcurrentDictionary<string, byte[]> SubcontractKey; | |||
public JT1078WebSocketPushHostedService( | |||
JT1078DataService jT1078DataService, | |||
JT1078WebSocketSessionManager jT1078WebSocketSessionManager) | |||
{ | |||
SubcontractKey = new ConcurrentDictionary<string, byte[]>(StringComparer.OrdinalIgnoreCase); | |||
this.jT1078DataService = jT1078DataService; | |||
this.jT1078WebSocketSessionManager = jT1078WebSocketSessionManager; | |||
} | |||
public Task StartAsync(CancellationToken cancellationToken) | |||
{ | |||
var lines = File.ReadAllLines(Path.Combine(AppDomain.CurrentDomain.BaseDirectory, "2019-07-12.log")); | |||
Task.Run(() => | |||
{ | |||
while (true) | |||
{ | |||
var session = jT1078WebSocketSessionManager.GetAll().FirstOrDefault(); | |||
if (session != null) | |||
try | |||
{ | |||
for (int i = 0; i < lines.Length; i++) | |||
foreach (var item in jT1078DataService.DataBlockingCollection.GetConsumingEnumerable()) | |||
{ | |||
var package = JT1078Serializer.Deserialize(lines[i].Split(',')[6].ToHexBytes()); | |||
session.Channel.WriteAndFlushAsync(new BinaryWebSocketFrame(Unpooled.WrappedBuffer(package.Bodies))); | |||
Parallel.ForEach(jT1078WebSocketSessionManager.GetAll(), new ParallelOptions { MaxDegreeOfParallelism=5 },session=> | |||
{ | |||
//if (item.Label3.SubpackageType == JT1078SubPackageType.分包处理时的第一个包) | |||
//{ | |||
// SubcontractKey.TryRemove(item.SIM, out _); | |||
// SubcontractKey.TryAdd(item.SIM, item.Bodies); | |||
//} | |||
//else if (item.Label3.SubpackageType == JT1078SubPackageType.分包处理时的中间包) | |||
//{ | |||
// if (SubcontractKey.TryGetValue(item.SIM, out var buffer)) | |||
// { | |||
// SubcontractKey[item.SIM] = buffer.Concat(item.Bodies).ToArray(); | |||
// } | |||
//} | |||
//else if (item.Label3.SubpackageType == JT1078SubPackageType.分包处理时的最后一个包) | |||
//{ | |||
// if (SubcontractKey.TryGetValue(item.SIM, out var buffer)) | |||
// { | |||
// session.Channel.WriteAndFlushAsync(new BinaryWebSocketFrame(Unpooled.WrappedBuffer(buffer.Concat(item.Bodies).ToArray()))); | |||
// } | |||
//} | |||
//else | |||
//{ | |||
session.Channel.WriteAndFlushAsync(new BinaryWebSocketFrame(Unpooled.WrappedBuffer(item.Bodies))); | |||
// } | |||
}); | |||
} | |||
} | |||
catch | |||
{ | |||
} | |||
Thread.Sleep(10000); | |||
} | |||
@@ -57,10 +57,11 @@ namespace JT1078.DotNetty.TestHosting | |||
{ | |||
services.AddSingleton<ILoggerFactory, LoggerFactory>(); | |||
services.AddSingleton(typeof(ILogger<>), typeof(Logger<>)); | |||
services.AddSingleton<JT1078DataService>(); | |||
services.AddJT1078Core(hostContext.Configuration) | |||
//.AddJT1078TcpHost() | |||
//.Replace<JT1078TcpMessageHandlers>() | |||
//.Builder() | |||
.AddJT1078TcpHost() | |||
.Replace<JT1078TcpMessageHandlers>() | |||
.Builder() | |||
//.AddJT1078UdpHost() | |||
//.Replace<JT1078UdpMessageHandlers>() | |||
//.Builder() | |||
@@ -17,7 +17,7 @@ | |||
"UdpPort": 1808, | |||
"WebSocketPort": 1818, | |||
"RemoteServerOptions": { | |||
"RemoteServers": [ "172.16.19.209:16868" ] | |||
//"RemoteServers": [ "172.16.19.209:16868" ] | |||
} | |||
} | |||
} |