diff --git a/src/JT1078.DotNetty.TestHosting/JT1078DataService.cs b/src/JT1078.DotNetty.TestHosting/JT1078DataService.cs index 79b8c7b..7a5859f 100644 --- a/src/JT1078.DotNetty.TestHosting/JT1078DataService.cs +++ b/src/JT1078.DotNetty.TestHosting/JT1078DataService.cs @@ -2,7 +2,10 @@ using System; using System.Collections.Concurrent; using System.Collections.Generic; +using System.IO; using System.Text; +using System.Threading; +using System.Threading.Tasks; namespace JT1078.DotNetty.TestHosting { @@ -12,7 +15,28 @@ namespace JT1078.DotNetty.TestHosting public JT1078DataService() { - DataBlockingCollection = new BlockingCollection(); + DataBlockingCollection = new BlockingCollection(60000); + var lines = File.ReadAllLines(Path.Combine(AppDomain.CurrentDomain.BaseDirectory, "2019-07-15.log")); + Task.Run(() => + { + while (true) + { + for (int i = 0; i < lines.Length; i++) + { + try + { + var package = JT1078Serializer.Deserialize(lines[i].Split(',')[6].ToHexBytes()); + DataBlockingCollection.Add(package); + Thread.Sleep(300); + } + catch + { + + } + } + Thread.Sleep(60000); + } + }); } } } diff --git a/src/JT1078.DotNetty.TestHosting/JT1078WebSocketPushHostedService.cs b/src/JT1078.DotNetty.TestHosting/JT1078WebSocketPushHostedService.cs index 2abc7e6..17a88cd 100644 --- a/src/JT1078.DotNetty.TestHosting/JT1078WebSocketPushHostedService.cs +++ b/src/JT1078.DotNetty.TestHosting/JT1078WebSocketPushHostedService.cs @@ -38,33 +38,37 @@ namespace JT1078.DotNetty.TestHosting { foreach (var item in jT1078DataService.DataBlockingCollection.GetConsumingEnumerable()) { - Parallel.ForEach(jT1078WebSocketSessionManager.GetAll(), new ParallelOptions { MaxDegreeOfParallelism=5 },session=> + if (jT1078WebSocketSessionManager.GetAll().Count() > 0) { - //if (item.Label3.SubpackageType == JT1078SubPackageType.分包处理时的第一个包) - //{ - // SubcontractKey.TryRemove(item.SIM, out _); + Parallel.ForEach(jT1078WebSocketSessionManager.GetAll(), new ParallelOptions { MaxDegreeOfParallelism = 5 }, session => + { + //if (item.Label3.SubpackageType == JT1078SubPackageType.分包处理时的第一个包) + //{ + // SubcontractKey.TryRemove(item.SIM, out _); - // SubcontractKey.TryAdd(item.SIM, item.Bodies); - //} - //else if (item.Label3.SubpackageType == JT1078SubPackageType.分包处理时的中间包) - //{ - // if (SubcontractKey.TryGetValue(item.SIM, out var buffer)) - // { - // SubcontractKey[item.SIM] = buffer.Concat(item.Bodies).ToArray(); - // } - //} - //else if (item.Label3.SubpackageType == JT1078SubPackageType.分包处理时的最后一个包) - //{ - // if (SubcontractKey.TryGetValue(item.SIM, out var buffer)) - // { - // session.Channel.WriteAndFlushAsync(new BinaryWebSocketFrame(Unpooled.WrappedBuffer(buffer.Concat(item.Bodies).ToArray()))); - // } - //} - //else - //{ - session.Channel.WriteAndFlushAsync(new BinaryWebSocketFrame(Unpooled.WrappedBuffer(item.Bodies))); - // } - }); + // SubcontractKey.TryAdd(item.SIM, item.Bodies); + //} + //else if (item.Label3.SubpackageType == JT1078SubPackageType.分包处理时的中间包) + //{ + // if (SubcontractKey.TryGetValue(item.SIM, out var buffer)) + // { + // SubcontractKey[item.SIM] = buffer.Concat(item.Bodies).ToArray(); + // } + //} + //else if (item.Label3.SubpackageType == JT1078SubPackageType.分包处理时的最后一个包) + //{ + // if (SubcontractKey.TryGetValue(item.SIM, out var buffer)) + // { + // session.Channel.WriteAndFlushAsync(new BinaryWebSocketFrame(Unpooled.WrappedBuffer(buffer.Concat(item.Bodies).ToArray()))); + // } + //} + //else + //{ + session.Channel.WriteAndFlushAsync(new BinaryWebSocketFrame(Unpooled.WrappedBuffer(item.Bodies))); + // } + }); + } + } } catch