Bläddra i källkod

使用本地数据测试

old
smallchi 5 år sedan
förälder
incheckning
21d1de6b87
2 ändrade filer med 54 tillägg och 26 borttagningar
  1. +25
    -1
      src/JT1078.DotNetty.TestHosting/JT1078DataService.cs
  2. +29
    -25
      src/JT1078.DotNetty.TestHosting/JT1078WebSocketPushHostedService.cs

+ 25
- 1
src/JT1078.DotNetty.TestHosting/JT1078DataService.cs Visa fil

@@ -2,7 +2,10 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.IO;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace JT1078.DotNetty.TestHosting
{
@@ -12,7 +15,28 @@ namespace JT1078.DotNetty.TestHosting

public JT1078DataService()
{
DataBlockingCollection = new BlockingCollection<JT1078Package>();
DataBlockingCollection = new BlockingCollection<JT1078Package>(60000);
var lines = File.ReadAllLines(Path.Combine(AppDomain.CurrentDomain.BaseDirectory, "2019-07-15.log"));
Task.Run(() =>
{
while (true)
{
for (int i = 0; i < lines.Length; i++)
{
try
{
var package = JT1078Serializer.Deserialize(lines[i].Split(',')[6].ToHexBytes());
DataBlockingCollection.Add(package);
Thread.Sleep(300);
}
catch
{

}
}
Thread.Sleep(60000);
}
});
}
}
}

+ 29
- 25
src/JT1078.DotNetty.TestHosting/JT1078WebSocketPushHostedService.cs Visa fil

@@ -38,33 +38,37 @@ namespace JT1078.DotNetty.TestHosting
{
foreach (var item in jT1078DataService.DataBlockingCollection.GetConsumingEnumerable())
{
Parallel.ForEach(jT1078WebSocketSessionManager.GetAll(), new ParallelOptions { MaxDegreeOfParallelism=5 },session=>
if (jT1078WebSocketSessionManager.GetAll().Count() > 0)
{
//if (item.Label3.SubpackageType == JT1078SubPackageType.分包处理时的第一个包)
//{
// SubcontractKey.TryRemove(item.SIM, out _);
Parallel.ForEach(jT1078WebSocketSessionManager.GetAll(), new ParallelOptions { MaxDegreeOfParallelism = 5 }, session =>
{
//if (item.Label3.SubpackageType == JT1078SubPackageType.分包处理时的第一个包)
//{
// SubcontractKey.TryRemove(item.SIM, out _);

// SubcontractKey.TryAdd(item.SIM, item.Bodies);
//}
//else if (item.Label3.SubpackageType == JT1078SubPackageType.分包处理时的中间包)
//{
// if (SubcontractKey.TryGetValue(item.SIM, out var buffer))
// {
// SubcontractKey[item.SIM] = buffer.Concat(item.Bodies).ToArray();
// }
//}
//else if (item.Label3.SubpackageType == JT1078SubPackageType.分包处理时的最后一个包)
//{
// if (SubcontractKey.TryGetValue(item.SIM, out var buffer))
// {
// session.Channel.WriteAndFlushAsync(new BinaryWebSocketFrame(Unpooled.WrappedBuffer(buffer.Concat(item.Bodies).ToArray())));
// }
//}
//else
//{
session.Channel.WriteAndFlushAsync(new BinaryWebSocketFrame(Unpooled.WrappedBuffer(item.Bodies)));
// }
});
// SubcontractKey.TryAdd(item.SIM, item.Bodies);
//}
//else if (item.Label3.SubpackageType == JT1078SubPackageType.分包处理时的中间包)
//{
// if (SubcontractKey.TryGetValue(item.SIM, out var buffer))
// {
// SubcontractKey[item.SIM] = buffer.Concat(item.Bodies).ToArray();
// }
//}
//else if (item.Label3.SubpackageType == JT1078SubPackageType.分包处理时的最后一个包)
//{
// if (SubcontractKey.TryGetValue(item.SIM, out var buffer))
// {
// session.Channel.WriteAndFlushAsync(new BinaryWebSocketFrame(Unpooled.WrappedBuffer(buffer.Concat(item.Bodies).ToArray())));
// }
//}
//else
//{
session.Channel.WriteAndFlushAsync(new BinaryWebSocketFrame(Unpooled.WrappedBuffer(item.Bodies)));
// }
});
}
}
}
catch


Laddar…
Avbryt
Spara