From 53d8ea21640324ae247cc96253bd328143939345 Mon Sep 17 00:00:00 2001 From: "SmallChi(Koike)" <564952747@qq.com> Date: Wed, 16 Sep 2020 22:19:03 +0800 Subject: [PATCH] =?UTF-8?q?=E6=95=B4=E7=90=86=E9=A1=B9=E7=9B=AE=E5=8E=BB?= =?UTF-8?q?=E6=8E=89=E4=B8=80=E4=BA=9B=E6=97=A0=E7=94=A8=E7=9A=84=E4=B8=9C?= =?UTF-8?q?=E8=A5=BF?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../Enums/JT1078UseType.cs | 18 ----- .../IJT1078MsgConsumer.cs | 2 +- .../IJT1078MsgProducer.cs | 4 +- .../IJT1078NormalGatewayBuilder.cs | 12 --- .../IJT1078PackageConsumer.cs | 16 ---- .../IJT1078PackageProducer.cs | 18 ----- .../IJT1078QueueGatewayBuilder.cs | 12 --- .../JT1078.Gateway.Abstractions.csproj | 11 ++- .../Controller/UserController.cs | 2 - .../Dtos/ChannelCloseRequest.cs | 2 +- .../JT1078.Gateway.InMemoryMQ.csproj | 6 +- .../JT1078InMemoryMQExtensions.cs | 8 +- .../JT1078MsgChannel.cs | 4 +- ...ackageConsumer.cs => JT1078MsgConsumer.cs} | 17 +--- ...ackageProducer.cs => JT1078MsgProducer.cs} | 8 +- .../Configs/nlog.Unix.config | 6 +- .../Configs/nlog.Win32NT.config | 8 +- .../JT1078.Gateway.TestNormalHosting.csproj | 2 +- .../Program.cs | 4 +- .../JT1078FlvNormalMsgHostedService.cs | 81 +++++++++---------- .../JT1078HlsNormalMsgHostedService.cs | 12 +-- src/JT1078.Gateway.sln | 24 +++--- .../Impl/JT1078NormalGatewayBuilderDefault.cs | 19 ----- .../Impl/JT1078QueueGatewayBuilderDefault.cs | 19 ----- src/JT1078.Gateway/JT1078.Gateway.csproj | 3 +- src/JT1078.Gateway/JT1078GatewayExtensions.cs | 12 +-- src/JT1078.Gateway/JT1078TcpServer.cs | 51 +----------- src/JT1078.Gateway/JT1078UdpServer.cs | 42 +--------- .../Jobs/JT1078SessionNoticeJob.cs | 4 +- .../Services/JT1078SessionNoticeService.cs | 4 +- src/Version.props | 5 ++ 31 files changed, 119 insertions(+), 317 deletions(-) delete mode 100644 src/JT1078.Gateway.Abstractions/Enums/JT1078UseType.cs delete mode 100644 src/JT1078.Gateway.Abstractions/IJT1078NormalGatewayBuilder.cs delete mode 100644 src/JT1078.Gateway.Abstractions/IJT1078PackageConsumer.cs delete mode 100644 src/JT1078.Gateway.Abstractions/IJT1078PackageProducer.cs delete mode 100644 src/JT1078.Gateway.Abstractions/IJT1078QueueGatewayBuilder.cs rename src/JT1078.Gateway.InMemoryMQ/{JT1078PackageConsumer.cs => JT1078MsgConsumer.cs} (56%) rename src/JT1078.Gateway.InMemoryMQ/{JT1078PackageProducer.cs => JT1078MsgProducer.cs} (63%) delete mode 100644 src/JT1078.Gateway/Impl/JT1078NormalGatewayBuilderDefault.cs delete mode 100644 src/JT1078.Gateway/Impl/JT1078QueueGatewayBuilderDefault.cs create mode 100644 src/Version.props diff --git a/src/JT1078.Gateway.Abstractions/Enums/JT1078UseType.cs b/src/JT1078.Gateway.Abstractions/Enums/JT1078UseType.cs deleted file mode 100644 index 8f6c35f..0000000 --- a/src/JT1078.Gateway.Abstractions/Enums/JT1078UseType.cs +++ /dev/null @@ -1,18 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Text; - -namespace JT1078.Gateway.Abstractions.Enums -{ - public enum JT1078UseType : byte - { - /// - /// 使用正常方式 - /// - Normal = 1, - /// - /// 使用队列方式 - /// - Queue = 2 - } -} diff --git a/src/JT1078.Gateway.Abstractions/IJT1078MsgConsumer.cs b/src/JT1078.Gateway.Abstractions/IJT1078MsgConsumer.cs index 53a391f..b6266ef 100644 --- a/src/JT1078.Gateway.Abstractions/IJT1078MsgConsumer.cs +++ b/src/JT1078.Gateway.Abstractions/IJT1078MsgConsumer.cs @@ -7,7 +7,7 @@ namespace JT1078.Gateway.Abstractions { public interface IJT1078MsgConsumer : IJT1078PubSub, IDisposable { - void OnMessage(Action<(string TerminalNo, byte[] Data)> callback); + void OnMessage(Action<(string SIM, byte[] Data)> callback); CancellationTokenSource Cts { get; } void Subscribe(); void Unsubscribe(); diff --git a/src/JT1078.Gateway.Abstractions/IJT1078MsgProducer.cs b/src/JT1078.Gateway.Abstractions/IJT1078MsgProducer.cs index 30a557e..4abab68 100644 --- a/src/JT1078.Gateway.Abstractions/IJT1078MsgProducer.cs +++ b/src/JT1078.Gateway.Abstractions/IJT1078MsgProducer.cs @@ -10,8 +10,8 @@ namespace JT1078.Gateway.Abstractions /// /// /// - /// 设备终端号 + /// 设备sim终端号 /// jt1078 hex data - ValueTask ProduceAsync(string terminalNo, byte[] data); + ValueTask ProduceAsync(string sim, byte[] data); } } diff --git a/src/JT1078.Gateway.Abstractions/IJT1078NormalGatewayBuilder.cs b/src/JT1078.Gateway.Abstractions/IJT1078NormalGatewayBuilder.cs deleted file mode 100644 index fd574d1..0000000 --- a/src/JT1078.Gateway.Abstractions/IJT1078NormalGatewayBuilder.cs +++ /dev/null @@ -1,12 +0,0 @@ -using Microsoft.Extensions.DependencyInjection; -using System; -using System.Collections.Generic; -using System.Text; - -namespace JT1078.Gateway.Abstractions -{ - public interface IJT1078NormalGatewayBuilder: IJT1078GatewayBuilder - { - - } -} diff --git a/src/JT1078.Gateway.Abstractions/IJT1078PackageConsumer.cs b/src/JT1078.Gateway.Abstractions/IJT1078PackageConsumer.cs deleted file mode 100644 index 7b50be2..0000000 --- a/src/JT1078.Gateway.Abstractions/IJT1078PackageConsumer.cs +++ /dev/null @@ -1,16 +0,0 @@ -using JT1078.Protocol; -using System; -using System.Collections.Generic; -using System.Text; -using System.Threading; - -namespace JT1078.Gateway.Abstractions -{ - public interface IJT1078PackageConsumer : IJT1078PubSub, IDisposable - { - void OnMessage(Action<(string TerminalNo, JT1078Package Data)> callback); - CancellationTokenSource Cts { get; } - void Subscribe(); - void Unsubscribe(); - } -} diff --git a/src/JT1078.Gateway.Abstractions/IJT1078PackageProducer.cs b/src/JT1078.Gateway.Abstractions/IJT1078PackageProducer.cs deleted file mode 100644 index 8f6e1d6..0000000 --- a/src/JT1078.Gateway.Abstractions/IJT1078PackageProducer.cs +++ /dev/null @@ -1,18 +0,0 @@ -using JT1078.Protocol; -using System; -using System.Collections.Generic; -using System.Text; -using System.Threading.Tasks; - -namespace JT1078.Gateway.Abstractions -{ - public interface IJT1078PackageProducer : IJT1078PubSub, IDisposable - { - /// - /// - /// - /// 设备终端号 - /// jt1078 package data - ValueTask ProduceAsync(string terminalNo, JT1078Package data); - } -} diff --git a/src/JT1078.Gateway.Abstractions/IJT1078QueueGatewayBuilder.cs b/src/JT1078.Gateway.Abstractions/IJT1078QueueGatewayBuilder.cs deleted file mode 100644 index 84dc38e..0000000 --- a/src/JT1078.Gateway.Abstractions/IJT1078QueueGatewayBuilder.cs +++ /dev/null @@ -1,12 +0,0 @@ -using Microsoft.Extensions.DependencyInjection; -using System; -using System.Collections.Generic; -using System.Text; - -namespace JT1078.Gateway.Abstractions -{ - public interface IJT1078QueueGatewayBuilder: IJT1078GatewayBuilder - { - - } -} diff --git a/src/JT1078.Gateway.Abstractions/JT1078.Gateway.Abstractions.csproj b/src/JT1078.Gateway.Abstractions/JT1078.Gateway.Abstractions.csproj index 4f3d6b4..c5ad943 100644 --- a/src/JT1078.Gateway.Abstractions/JT1078.Gateway.Abstractions.csproj +++ b/src/JT1078.Gateway.Abstractions/JT1078.Gateway.Abstractions.csproj @@ -1,5 +1,5 @@ - + netstandard2.1 8.0 @@ -17,8 +17,15 @@ false LICENSE true - 1.0.0-preview2 + $(JT1078PackageVersion) + + + + + + + diff --git a/src/JT1078.Gateway.Coordinator/Controller/UserController.cs b/src/JT1078.Gateway.Coordinator/Controller/UserController.cs index 7267601..2655b0e 100644 --- a/src/JT1078.Gateway.Coordinator/Controller/UserController.cs +++ b/src/JT1078.Gateway.Coordinator/Controller/UserController.cs @@ -16,8 +16,6 @@ namespace JT1078.Gateway.Coordinator.Controller [EnableCors("any")] public class UserController : ControllerBase { - - /// /// 登录 /// diff --git a/src/JT1078.Gateway.Coordinator/Dtos/ChannelCloseRequest.cs b/src/JT1078.Gateway.Coordinator/Dtos/ChannelCloseRequest.cs index bbd0470..9364aae 100644 --- a/src/JT1078.Gateway.Coordinator/Dtos/ChannelCloseRequest.cs +++ b/src/JT1078.Gateway.Coordinator/Dtos/ChannelCloseRequest.cs @@ -10,7 +10,7 @@ namespace JT1078.Gateway.Coordinator.Dtos /// /// 设备sim卡号 /// - public string TerminalPhoneNo { get; set; } + public string Sim { get; set; } /// /// 通道号 /// diff --git a/src/JT1078.Gateway.InMemoryMQ/JT1078.Gateway.InMemoryMQ.csproj b/src/JT1078.Gateway.InMemoryMQ/JT1078.Gateway.InMemoryMQ.csproj index 0577923..6c6386f 100644 --- a/src/JT1078.Gateway.InMemoryMQ/JT1078.Gateway.InMemoryMQ.csproj +++ b/src/JT1078.Gateway.InMemoryMQ/JT1078.Gateway.InMemoryMQ.csproj @@ -1,5 +1,5 @@ - + netstandard2.1 8.0 @@ -17,7 +17,7 @@ false LICENSE true - 1.0.0-preview2 + $(JT1078PackageVersion) @@ -25,7 +25,7 @@ - + diff --git a/src/JT1078.Gateway.InMemoryMQ/JT1078InMemoryMQExtensions.cs b/src/JT1078.Gateway.InMemoryMQ/JT1078InMemoryMQExtensions.cs index 4291b15..4828a6d 100644 --- a/src/JT1078.Gateway.InMemoryMQ/JT1078InMemoryMQExtensions.cs +++ b/src/JT1078.Gateway.InMemoryMQ/JT1078InMemoryMQExtensions.cs @@ -9,17 +9,17 @@ namespace JT1078.Gateway.InMemoryMQ { public static class JT1078InMemoryMQExtensions { - public static IJT1078NormalGatewayBuilder AddMsgProducer(this IJT1078NormalGatewayBuilder builder) + public static IJT1078GatewayBuilder AddMsgProducer(this IJT1078GatewayBuilder builder) { builder.JT1078Builder.Services.TryAddSingleton(); - builder.JT1078Builder.Services.AddSingleton(); + builder.JT1078Builder.Services.AddSingleton(); return builder; } - public static IJT1078NormalGatewayBuilder AddMsgConsumer(this IJT1078NormalGatewayBuilder builder) + public static IJT1078GatewayBuilder AddMsgConsumer(this IJT1078GatewayBuilder builder) { builder.JT1078Builder.Services.TryAddSingleton(); - builder.JT1078Builder.Services.AddSingleton(); + builder.JT1078Builder.Services.AddSingleton(); return builder; } } diff --git a/src/JT1078.Gateway.InMemoryMQ/JT1078MsgChannel.cs b/src/JT1078.Gateway.InMemoryMQ/JT1078MsgChannel.cs index ae3ff54..943e96e 100644 --- a/src/JT1078.Gateway.InMemoryMQ/JT1078MsgChannel.cs +++ b/src/JT1078.Gateway.InMemoryMQ/JT1078MsgChannel.cs @@ -7,11 +7,11 @@ namespace JT1078.Gateway.InMemoryMQ { public class JT1078MsgChannel { - public Channel<(string, JT1078.Protocol.JT1078Package)> Channel { get;} + public Channel<(string, byte[])> Channel { get;} public JT1078MsgChannel() { - Channel = System.Threading.Channels.Channel.CreateUnbounded<(string, JT1078.Protocol.JT1078Package)>(); + Channel = System.Threading.Channels.Channel.CreateUnbounded<(string, byte[])>(); } } } diff --git a/src/JT1078.Gateway.InMemoryMQ/JT1078PackageConsumer.cs b/src/JT1078.Gateway.InMemoryMQ/JT1078MsgConsumer.cs similarity index 56% rename from src/JT1078.Gateway.InMemoryMQ/JT1078PackageConsumer.cs rename to src/JT1078.Gateway.InMemoryMQ/JT1078MsgConsumer.cs index 31ccaa0..febad0f 100644 --- a/src/JT1078.Gateway.InMemoryMQ/JT1078PackageConsumer.cs +++ b/src/JT1078.Gateway.InMemoryMQ/JT1078MsgConsumer.cs @@ -1,25 +1,18 @@ using JT1078.Gateway.Abstractions; -using JT1078.Protocol; -using Microsoft.Extensions.Logging; using System; -using System.Collections.Generic; -using System.Text; using System.Text.Json; -using System.Text.Json.Serialization; using System.Threading; using System.Threading.Tasks; namespace JT1078.Gateway.InMemoryMQ { - public class JT1078PackageConsumer: IJT1078PackageConsumer + public class JT1078MsgConsumer : IJT1078MsgConsumer { private JT1078MsgChannel Channel; - private readonly ILogger logger; - public JT1078PackageConsumer(ILoggerFactory loggerFactory,JT1078MsgChannel channel) + public JT1078MsgConsumer(JT1078MsgChannel channel) { Channel = channel; - logger = loggerFactory.CreateLogger(); } public CancellationTokenSource Cts { get; } = new CancellationTokenSource(); @@ -31,17 +24,13 @@ namespace JT1078.Gateway.InMemoryMQ } - public void OnMessage(Action<(string TerminalNo, JT1078Package Data)> callback) + public void OnMessage(Action<(string SIM, byte[] Data)> callback) { Task.Run(async() => { while (!Cts.IsCancellationRequested) { var reader = await Channel.Channel.Reader.ReadAsync(Cts.Token); - if (logger.IsEnabled(LogLevel.Trace)) - { - logger.LogTrace(JsonSerializer.Serialize(reader.Item2)); - } callback(reader); } }, Cts.Token); diff --git a/src/JT1078.Gateway.InMemoryMQ/JT1078PackageProducer.cs b/src/JT1078.Gateway.InMemoryMQ/JT1078MsgProducer.cs similarity index 63% rename from src/JT1078.Gateway.InMemoryMQ/JT1078PackageProducer.cs rename to src/JT1078.Gateway.InMemoryMQ/JT1078MsgProducer.cs index f33134b..1b93f42 100644 --- a/src/JT1078.Gateway.InMemoryMQ/JT1078PackageProducer.cs +++ b/src/JT1078.Gateway.InMemoryMQ/JT1078MsgProducer.cs @@ -8,13 +8,13 @@ using System.Threading.Tasks; namespace JT1078.Gateway.InMemoryMQ { - public class JT1078PackageProducer : IJT1078PackageProducer + public class JT1078MsgProducer : IJT1078MsgProducer { public string TopicName { get; }= "JT1078Package"; private JT1078MsgChannel Channel; - public JT1078PackageProducer(JT1078MsgChannel channel) + public JT1078MsgProducer(JT1078MsgChannel channel) { Channel = channel; } @@ -24,9 +24,9 @@ namespace JT1078.Gateway.InMemoryMQ } - public async ValueTask ProduceAsync(string terminalNo, JT1078Package data) + public async ValueTask ProduceAsync(string sim, byte[] data) { - await Channel.Channel.Writer.WriteAsync((terminalNo, data)); + await Channel.Channel.Writer.WriteAsync((sim, data)); } } } diff --git a/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Configs/nlog.Unix.config b/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Configs/nlog.Unix.config index 333e593..8c8388d 100644 --- a/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Configs/nlog.Unix.config +++ b/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Configs/nlog.Unix.config @@ -15,6 +15,9 @@ + @@ -32,6 +35,7 @@ - + + \ No newline at end of file diff --git a/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Configs/nlog.Win32NT.config b/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Configs/nlog.Win32NT.config index 07098c1..78f7b08 100644 --- a/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Configs/nlog.Win32NT.config +++ b/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Configs/nlog.Win32NT.config @@ -23,6 +23,9 @@ + @@ -37,8 +40,9 @@ - + - + + \ No newline at end of file diff --git a/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/JT1078.Gateway.TestNormalHosting.csproj b/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/JT1078.Gateway.TestNormalHosting.csproj index 4793456..5e000ab 100644 --- a/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/JT1078.Gateway.TestNormalHosting.csproj +++ b/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/JT1078.Gateway.TestNormalHosting.csproj @@ -14,7 +14,7 @@ - + diff --git a/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Program.cs b/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Program.cs index 6f96a52..ff8fd8b 100644 --- a/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Program.cs +++ b/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Program.cs @@ -39,7 +39,7 @@ namespace JT1078.Gateway.TestNormalHosting services.AddSingleton(); services.AddSingleton(new M3U8Option { - + }); services.AddSingleton(); //使用内存队列实现会话通知 @@ -48,9 +48,9 @@ namespace JT1078.Gateway.TestNormalHosting .AddUdp() .AddHttp() //.AddCoordinatorHttpClient() - .AddNormal() .AddMsgProducer() .AddMsgConsumer(); + //内存队列没有做分发,可以自己实现。 services.AddHostedService(); //services.AddHostedService(); }); diff --git a/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Services/JT1078FlvNormalMsgHostedService.cs b/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Services/JT1078FlvNormalMsgHostedService.cs index 129a8ff..27a9ce7 100644 --- a/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Services/JT1078FlvNormalMsgHostedService.cs +++ b/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Services/JT1078FlvNormalMsgHostedService.cs @@ -19,7 +19,7 @@ namespace JT1078.Gateway.TestNormalHosting.Services { public class JT1078FlvNormalMsgHostedService : BackgroundService { - private IJT1078PackageConsumer PackageConsumer; + private IJT1078MsgConsumer JT1078MsgConsumer; private JT1078HttpSessionManager HttpSessionManager; private FlvEncoder FlvEncoder; private ILogger Logger; @@ -31,74 +31,73 @@ namespace JT1078.Gateway.TestNormalHosting.Services ILoggerFactory loggerFactory, FlvEncoder flvEncoder, JT1078HttpSessionManager httpSessionManager, - IJT1078PackageConsumer packageConsumer) + IJT1078MsgConsumer msgConsumer) { Logger = loggerFactory.CreateLogger(); - PackageConsumer = packageConsumer; + JT1078MsgConsumer = msgConsumer; HttpSessionManager = httpSessionManager; FlvEncoder = flvEncoder; this.memoryCache = memoryCache; } protected override Task ExecuteAsync(CancellationToken stoppingToken) { - PackageConsumer.OnMessage((Message) => + JT1078MsgConsumer.OnMessage((Message) => { + JT1078Package package = JT1078Serializer.Deserialize(Message.Data); + if (Logger.IsEnabled(LogLevel.Debug)) + { + Logger.LogDebug(JsonSerializer.Serialize(HttpSessionManager.GetAll())); + Logger.LogDebug($"{package.SIM},{package.SN},{package.LogicChannelNumber},{package.Label3.DataType.ToString()},{package.Label3.SubpackageType.ToString()},{package.Bodies.ToHexString()}"); + } try { - if (Logger.IsEnabled(LogLevel.Debug)) + var merge = JT1078Serializer.Merge(package); + if (merge == null) return; + string key = $"{package.GetKey()}_{ikey}"; + if (merge.Label3.DataType == Protocol.Enums.JT1078DataType.视频I帧) { - Logger.LogDebug(JsonSerializer.Serialize(HttpSessionManager.GetAll())); - Logger.LogDebug($"{Message.Data.SIM},{Message.Data.SN},{Message.Data.LogicChannelNumber},{Message.Data.Label3.DataType.ToString()},{Message.Data.Label3.SubpackageType.ToString()},{Message.Data.Bodies.ToHexString()}"); + memoryCache.Set(key, merge); } - var merge = JT1078.Protocol.JT1078Serializer.Merge(Message.Data); - string key = $"{Message.Data.GetKey()}_{ikey}"; - if (merge != null) + var httpSessions = HttpSessionManager.GetAllBySimAndChannelNo(package.SIM.TrimStart('0'), package.LogicChannelNumber); + var firstHttpSessions = httpSessions.Where(w => !w.FirstSend).ToList(); + if (firstHttpSessions.Count > 0) { - if (merge.Label3.DataType == Protocol.Enums.JT1078DataType.视频I帧) - { - memoryCache.Set(key, merge); - } - var httpSessions = HttpSessionManager.GetAllBySimAndChannelNo(Message.Data.SIM.TrimStart('0'), Message.Data.LogicChannelNumber); - var firstHttpSessions = httpSessions.Where(w => !w.FirstSend).ToList(); - if (firstHttpSessions.Count > 0) - { - if (memoryCache.TryGetValue(key, out JT1078Package idata)) - { - try - { - var flvVideoBuffer = FlvEncoder.EncoderVideoTag(idata, true); - foreach (var session in firstHttpSessions) - { - HttpSessionManager.SendAVData(session, flvVideoBuffer, true); - } - } - catch (Exception ex) - { - Logger.LogError(ex, $"{Message.Data.SIM},{true},{Message.Data.SN},{Message.Data.LogicChannelNumber},{Message.Data.Label3.DataType.ToString()},{Message.Data.Label3.SubpackageType.ToString()},{Message.Data.Bodies.ToHexString()}"); - } - } - } - var otherHttpSessions = httpSessions.Where(w => w.FirstSend).ToList(); - if (otherHttpSessions.Count > 0) + if (memoryCache.TryGetValue(key, out JT1078Package idata)) { try { - var flvVideoBuffer = FlvEncoder.EncoderVideoTag(merge, false); - foreach (var session in otherHttpSessions) + var flvVideoBuffer = FlvEncoder.EncoderVideoTag(idata, true); + foreach (var session in firstHttpSessions) { - HttpSessionManager.SendAVData(session, flvVideoBuffer, false); + HttpSessionManager.SendAVData(session, flvVideoBuffer, true); } } catch (Exception ex) { - Logger.LogError(ex, $"{Message.Data.SIM},{false},{Message.Data.SN},{Message.Data.LogicChannelNumber},{Message.Data.Label3.DataType.ToString()},{Message.Data.Label3.SubpackageType.ToString()},{Message.Data.Bodies.ToHexString()}"); + Logger.LogError(ex, $"{package.SIM},{true},{package.SN},{package.LogicChannelNumber},{package.Label3.DataType.ToString()},{package.Label3.SubpackageType.ToString()},{package.Bodies.ToHexString()}"); + } + } + } + var otherHttpSessions = httpSessions.Where(w => w.FirstSend).ToList(); + if (otherHttpSessions.Count > 0) + { + try + { + var flvVideoBuffer = FlvEncoder.EncoderVideoTag(merge, false); + foreach (var session in otherHttpSessions) + { + HttpSessionManager.SendAVData(session, flvVideoBuffer, false); } } + catch (Exception ex) + { + Logger.LogError(ex, $"{package.SIM},{false},{package.SN},{package.LogicChannelNumber},{package.Label3.DataType.ToString()},{package.Label3.SubpackageType.ToString()},{package.Bodies.ToHexString()}"); + } } } catch (Exception ex) { - Logger.LogError(ex, $"{Message.Data.SIM},{Message.Data.SN},{Message.Data.LogicChannelNumber},{Message.Data.Label3.DataType.ToString()},{Message.Data.Label3.SubpackageType.ToString()},{Message.Data.Bodies.ToHexString()}"); + Logger.LogError(ex, $"{package.SIM},{package.SN},{package.LogicChannelNumber},{package.Label3.DataType.ToString()},{package.Label3.SubpackageType.ToString()},{package.Bodies.ToHexString()}"); } }); return Task.CompletedTask; diff --git a/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Services/JT1078HlsNormalMsgHostedService.cs b/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Services/JT1078HlsNormalMsgHostedService.cs index bae5a20..4767edd 100644 --- a/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Services/JT1078HlsNormalMsgHostedService.cs +++ b/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Services/JT1078HlsNormalMsgHostedService.cs @@ -1,6 +1,7 @@ using JT1078.Gateway.Abstractions; using JT1078.Gateway.Sessions; using JT1078.Hls; +using JT1078.Protocol; using Microsoft.Extensions.Hosting; using System; using System.Collections.Generic; @@ -13,23 +14,24 @@ namespace JT1078.Gateway.TestNormalHosting.Services { public class JT1078HlsNormalMsgHostedService : BackgroundService { - private IJT1078PackageConsumer PackageConsumer; + private IJT1078MsgConsumer MsgConsumer; private JT1078HttpSessionManager HttpSessionManager; private M3U8FileManage M3U8FileManage; public JT1078HlsNormalMsgHostedService( M3U8FileManage M3U8FileManage, JT1078HttpSessionManager httpSessionManager, - IJT1078PackageConsumer packageConsumer) + IJT1078MsgConsumer msgConsumer) { - PackageConsumer = packageConsumer; + MsgConsumer = msgConsumer; HttpSessionManager = httpSessionManager; this.M3U8FileManage = M3U8FileManage; } protected override Task ExecuteAsync(CancellationToken stoppingToken) { - PackageConsumer.OnMessage((Message) => + MsgConsumer.OnMessage((Message) => { - var merge = JT1078.Protocol.JT1078Serializer.Merge(Message.Data); + JT1078Package package = JT1078Serializer.Deserialize(Message.Data); + var merge = JT1078.Protocol.JT1078Serializer.Merge(package); if (merge != null) { var hasHttpSessionn = HttpSessionManager.GetAllHttpContextBySimAndChannelNo(merge.SIM, merge.LogicChannelNumber); diff --git a/src/JT1078.Gateway.sln b/src/JT1078.Gateway.sln index ac29fd6..4cf5778 100644 --- a/src/JT1078.Gateway.sln +++ b/src/JT1078.Gateway.sln @@ -3,30 +3,26 @@ Microsoft Visual Studio Solution File, Format Version 12.00 # Visual Studio Version 16 VisualStudioVersion = 16.0.29418.71 MinimumVisualStudioVersion = 10.0.40219.1 -Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JT1078.Gateway", "JT1078.Gateway\JT1078.Gateway.csproj", "{39A90F1C-FC46-4905-81F1-3AEE44D6D86D}" -EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JT1078.Gateway.Test", "JT1078.Gateway.Tests\JT1078.Gateway.Test\JT1078.Gateway.Test.csproj", "{DAC80A8E-4172-451F-910D-9032BF8640F9}" EndProject Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Tests", "Tests", "{04C0F72A-5BC4-4CEE-B1E9-CCFAA72E373E}" EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JT1078.Gateway.Abstractions", "JT1078.Gateway.Abstractions\JT1078.Gateway.Abstractions.csproj", "{EE50F2A6-5F28-4640-BC20-44A8BED8F311}" EndProject -Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JT1078.Gateway.InMemoryMQ", "JT1078.Gateway.InMemoryMQ\JT1078.Gateway.InMemoryMQ.csproj", "{C7554790-0B3B-490F-B2F1-436C1FD0B4DD}" -EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JT1078.Gateway.TestNormalHosting", "JT1078.Gateway.Tests\JT1078.Gateway.TestNormalHosting\JT1078.Gateway.TestNormalHosting.csproj", "{6E2DAA64-E2A1-4459-BE61-E807B4EF2CCE}" EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JT1078.Gateway.Coordinator", "JT1078.Gateway.Coordinator\JT1078.Gateway.Coordinator.csproj", "{B0A24D3A-FDA3-4DFE-9C31-032C7C22F303}" EndProject +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JT1078.Gateway", "JT1078.Gateway\JT1078.Gateway.csproj", "{9042CA92-E01A-46CF-9C82-D954325A69B8}" +EndProject +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JT1078.Gateway.InMemoryMQ", "JT1078.Gateway.InMemoryMQ\JT1078.Gateway.InMemoryMQ.csproj", "{011934D6-BEE7-4CDA-9F67-4D6D7D672D6C}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU Release|Any CPU = Release|Any CPU EndGlobalSection GlobalSection(ProjectConfigurationPlatforms) = postSolution - {39A90F1C-FC46-4905-81F1-3AEE44D6D86D}.Debug|Any CPU.ActiveCfg = Debug|Any CPU - {39A90F1C-FC46-4905-81F1-3AEE44D6D86D}.Debug|Any CPU.Build.0 = Debug|Any CPU - {39A90F1C-FC46-4905-81F1-3AEE44D6D86D}.Release|Any CPU.ActiveCfg = Release|Any CPU - {39A90F1C-FC46-4905-81F1-3AEE44D6D86D}.Release|Any CPU.Build.0 = Release|Any CPU {DAC80A8E-4172-451F-910D-9032BF8640F9}.Debug|Any CPU.ActiveCfg = Debug|Any CPU {DAC80A8E-4172-451F-910D-9032BF8640F9}.Debug|Any CPU.Build.0 = Debug|Any CPU {DAC80A8E-4172-451F-910D-9032BF8640F9}.Release|Any CPU.ActiveCfg = Release|Any CPU @@ -35,10 +31,6 @@ Global {EE50F2A6-5F28-4640-BC20-44A8BED8F311}.Debug|Any CPU.Build.0 = Debug|Any CPU {EE50F2A6-5F28-4640-BC20-44A8BED8F311}.Release|Any CPU.ActiveCfg = Release|Any CPU {EE50F2A6-5F28-4640-BC20-44A8BED8F311}.Release|Any CPU.Build.0 = Release|Any CPU - {C7554790-0B3B-490F-B2F1-436C1FD0B4DD}.Debug|Any CPU.ActiveCfg = Debug|Any CPU - {C7554790-0B3B-490F-B2F1-436C1FD0B4DD}.Debug|Any CPU.Build.0 = Debug|Any CPU - {C7554790-0B3B-490F-B2F1-436C1FD0B4DD}.Release|Any CPU.ActiveCfg = Release|Any CPU - {C7554790-0B3B-490F-B2F1-436C1FD0B4DD}.Release|Any CPU.Build.0 = Release|Any CPU {6E2DAA64-E2A1-4459-BE61-E807B4EF2CCE}.Debug|Any CPU.ActiveCfg = Debug|Any CPU {6E2DAA64-E2A1-4459-BE61-E807B4EF2CCE}.Debug|Any CPU.Build.0 = Debug|Any CPU {6E2DAA64-E2A1-4459-BE61-E807B4EF2CCE}.Release|Any CPU.ActiveCfg = Release|Any CPU @@ -47,6 +39,14 @@ Global {B0A24D3A-FDA3-4DFE-9C31-032C7C22F303}.Debug|Any CPU.Build.0 = Debug|Any CPU {B0A24D3A-FDA3-4DFE-9C31-032C7C22F303}.Release|Any CPU.ActiveCfg = Release|Any CPU {B0A24D3A-FDA3-4DFE-9C31-032C7C22F303}.Release|Any CPU.Build.0 = Release|Any CPU + {9042CA92-E01A-46CF-9C82-D954325A69B8}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {9042CA92-E01A-46CF-9C82-D954325A69B8}.Debug|Any CPU.Build.0 = Debug|Any CPU + {9042CA92-E01A-46CF-9C82-D954325A69B8}.Release|Any CPU.ActiveCfg = Release|Any CPU + {9042CA92-E01A-46CF-9C82-D954325A69B8}.Release|Any CPU.Build.0 = Release|Any CPU + {011934D6-BEE7-4CDA-9F67-4D6D7D672D6C}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {011934D6-BEE7-4CDA-9F67-4D6D7D672D6C}.Debug|Any CPU.Build.0 = Debug|Any CPU + {011934D6-BEE7-4CDA-9F67-4D6D7D672D6C}.Release|Any CPU.ActiveCfg = Release|Any CPU + {011934D6-BEE7-4CDA-9F67-4D6D7D672D6C}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE diff --git a/src/JT1078.Gateway/Impl/JT1078NormalGatewayBuilderDefault.cs b/src/JT1078.Gateway/Impl/JT1078NormalGatewayBuilderDefault.cs deleted file mode 100644 index e99363a..0000000 --- a/src/JT1078.Gateway/Impl/JT1078NormalGatewayBuilderDefault.cs +++ /dev/null @@ -1,19 +0,0 @@ -using JT1078.Gateway.Abstractions; - -namespace JT1078.Gateway.Impl -{ - public class JT1078NormalGatewayBuilderDefault : IJT1078NormalGatewayBuilder - { - public IJT1078Builder JT1078Builder { get; } - - public JT1078NormalGatewayBuilderDefault(IJT1078Builder builder) - { - JT1078Builder = builder; - } - - public IJT1078Builder Builder() - { - return JT1078Builder; - } - } -} \ No newline at end of file diff --git a/src/JT1078.Gateway/Impl/JT1078QueueGatewayBuilderDefault.cs b/src/JT1078.Gateway/Impl/JT1078QueueGatewayBuilderDefault.cs deleted file mode 100644 index faf51c0..0000000 --- a/src/JT1078.Gateway/Impl/JT1078QueueGatewayBuilderDefault.cs +++ /dev/null @@ -1,19 +0,0 @@ -using JT1078.Gateway.Abstractions; - -namespace JT1078.Gateway.Impl -{ - public class JT1078QueueGatewayBuilderDefault : IJT1078QueueGatewayBuilder - { - public IJT1078Builder JT1078Builder { get; } - - public JT1078QueueGatewayBuilderDefault(IJT1078Builder builder) - { - JT1078Builder = builder; - } - - public IJT1078Builder Builder() - { - return JT1078Builder; - } - } -} \ No newline at end of file diff --git a/src/JT1078.Gateway/JT1078.Gateway.csproj b/src/JT1078.Gateway/JT1078.Gateway.csproj index 5c94106..39b649e 100644 --- a/src/JT1078.Gateway/JT1078.Gateway.csproj +++ b/src/JT1078.Gateway/JT1078.Gateway.csproj @@ -1,4 +1,5 @@  + netstandard2.1 8.0 @@ -16,7 +17,7 @@ false LICENSE true - 1.0.0-preview2 + $(JT1078PackageVersion) diff --git a/src/JT1078.Gateway/JT1078GatewayExtensions.cs b/src/JT1078.Gateway/JT1078GatewayExtensions.cs index c66dc4f..f9551c7 100644 --- a/src/JT1078.Gateway/JT1078GatewayExtensions.cs +++ b/src/JT1078.Gateway/JT1078GatewayExtensions.cs @@ -66,7 +66,7 @@ namespace JT1078.Gateway builder.JT1078Builder.Services.AddHostedService(); return builder; } - + public static IJT1078GatewayBuilder AddCoordinatorHttpClient(this IJT1078GatewayBuilder builder) { builder.JT1078Builder.Services.AddSingleton(); @@ -74,16 +74,6 @@ namespace JT1078.Gateway return builder; } - public static IJT1078NormalGatewayBuilder AddNormal(this IJT1078GatewayBuilder builder) - { - return new JT1078NormalGatewayBuilderDefault(builder.JT1078Builder); - } - - public static IJT1078QueueGatewayBuilder AddQueue(this IJT1078GatewayBuilder builder) - { - return new JT1078QueueGatewayBuilderDefault(builder.JT1078Builder); - } - internal static IJT1078GatewayBuilder AddJT1078Core(this IJT1078GatewayBuilder builder) { builder.JT1078Builder.Services.AddSingleton(); diff --git a/src/JT1078.Gateway/JT1078TcpServer.cs b/src/JT1078.Gateway/JT1078TcpServer.cs index 2997c87..f73f80a 100644 --- a/src/JT1078.Gateway/JT1078TcpServer.cs +++ b/src/JT1078.Gateway/JT1078TcpServer.cs @@ -1,7 +1,6 @@ using System; using System.Buffers; using System.Buffers.Binary; -using System.Collections.Generic; using System.IO.Pipelines; using System.Linq; using System.Net; @@ -10,7 +9,6 @@ using System.Text; using System.Threading; using System.Threading.Tasks; using JT1078.Gateway.Abstractions; -using JT1078.Gateway.Abstractions.Enums; using JT1078.Gateway.Configurations; using JT1078.Gateway.Sessions; using JT1078.Protocol; @@ -34,34 +32,8 @@ namespace JT1078.Gateway private readonly JT1078SessionManager SessionManager; - private readonly IJT1078PackageProducer jT1078PackageProducer; - private readonly IJT1078MsgProducer jT1078MsgProducer; - private readonly JT1078UseType jT1078UseType; - - /// - /// 使用正常方式 - /// - /// - /// - /// - /// - public JT1078TcpServer( - IJT1078PackageProducer jT1078PackageProducer, - IOptions jT1078ConfigurationAccessor, - ILoggerFactory loggerFactory, - JT1078SessionManager jT1078SessionManager) - { - SessionManager = jT1078SessionManager; - jT1078UseType = JT1078UseType.Normal; - Logger = loggerFactory.CreateLogger(); - LogLogger = loggerFactory.CreateLogger("JT1078Logging"); - Configuration = jT1078ConfigurationAccessor.Value; - this.jT1078PackageProducer = jT1078PackageProducer; - InitServer(); - } - /// /// 使用队列方式 /// @@ -70,15 +42,14 @@ namespace JT1078.Gateway /// /// public JT1078TcpServer( - IJT1078MsgProducer jT1078MsgProducer, + IJT1078MsgProducer jT1078MsgProducer, IOptions jT1078ConfigurationAccessor, ILoggerFactory loggerFactory, JT1078SessionManager jT1078SessionManager) { SessionManager = jT1078SessionManager; - jT1078UseType = JT1078UseType.Queue; Logger = loggerFactory.CreateLogger(); - LogLogger = loggerFactory.CreateLogger("JT1078Logging"); + LogLogger = loggerFactory.CreateLogger("JT1078.Gateway.JT1078Logging"); Configuration = jT1078ConfigurationAccessor.Value; this.jT1078MsgProducer = jT1078MsgProducer; InitServer(); @@ -266,14 +237,7 @@ namespace JT1078.Gateway try { SessionManager.TryLink(fixedHeaderInfo.SIM, session); - if (jT1078UseType == JT1078UseType.Queue) - { - jT1078MsgProducer.ProduceAsync(fixedHeaderInfo.SIM, package1.ToArray()); - } - else - { - jT1078PackageProducer.ProduceAsync(fixedHeaderInfo.SIM, JT1078Serializer.Deserialize(package1)); - } + jT1078MsgProducer.ProduceAsync(fixedHeaderInfo.SIM, package1.ToArray()); } catch (Exception ex) { @@ -307,14 +271,7 @@ namespace JT1078.Gateway try { SessionManager.TryLink(fixedHeaderInfo.SIM, session); - if (jT1078UseType == JT1078UseType.Queue) - { - jT1078MsgProducer.ProduceAsync(fixedHeaderInfo.SIM, package); - } - else - { - jT1078PackageProducer.ProduceAsync(fixedHeaderInfo.SIM, JT1078Serializer.Deserialize(package)); - } + jT1078MsgProducer.ProduceAsync(fixedHeaderInfo.SIM, package); } catch (Exception ex) { diff --git a/src/JT1078.Gateway/JT1078UdpServer.cs b/src/JT1078.Gateway/JT1078UdpServer.cs index b4c13f6..de2bd71 100644 --- a/src/JT1078.Gateway/JT1078UdpServer.cs +++ b/src/JT1078.Gateway/JT1078UdpServer.cs @@ -1,20 +1,13 @@ using System; using System.Buffers; -using System.Buffers.Binary; -using System.Collections.Generic; -using System.IO.Pipelines; -using System.Linq; using System.Net; using System.Net.Sockets; -using System.Text; using System.Threading; using System.Threading.Tasks; using JT1078.Gateway.Abstractions; -using JT1078.Gateway.Abstractions.Enums; using JT1078.Gateway.Configurations; using JT1078.Gateway.Sessions; using JT1078.Protocol; -using JT1078.Protocol.Enums; using JT1078.Protocol.Extensions; using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; @@ -32,33 +25,8 @@ namespace JT1078.Gateway private readonly JT1078SessionManager SessionManager; - private readonly IJT1078PackageProducer jT1078PackageProducer; - private readonly IJT1078MsgProducer jT1078MsgProducer; - private readonly JT1078UseType jT1078UseType; - - /// - /// 使用正常方式 - /// - /// - /// - /// - /// - public JT1078UdpServer( - IJT1078PackageProducer jT1078PackageProducer, - IOptions jT1078ConfigurationAccessor, - ILoggerFactory loggerFactory, - JT1078SessionManager jT1078SessionManager) - { - SessionManager = jT1078SessionManager; - jT1078UseType = JT1078UseType.Normal; - Logger = loggerFactory.CreateLogger(); - Configuration = jT1078ConfigurationAccessor.Value; - this.jT1078PackageProducer = jT1078PackageProducer; - InitServer(); - } - /// /// 使用队列方式 /// @@ -73,7 +41,6 @@ namespace JT1078.Gateway JT1078SessionManager jT1078SessionManager) { SessionManager = jT1078SessionManager; - jT1078UseType = JT1078UseType.Queue; Logger = loggerFactory.CreateLogger(); Configuration = jT1078ConfigurationAccessor.Value; this.jT1078MsgProducer = jT1078MsgProducer; @@ -134,14 +101,7 @@ namespace JT1078.Gateway { Logger.LogInformation($"[Connected]:{receiveMessageFromResult.RemoteEndPoint}"); } - if (jT1078UseType == JT1078UseType.Queue) - { - jT1078MsgProducer.ProduceAsync(package.SIM, buffer.ToArray()); - } - else - { - jT1078PackageProducer.ProduceAsync(package.SIM, package); - } + jT1078MsgProducer.ProduceAsync(package.SIM, buffer.ToArray()); } catch (NotImplementedException ex) { diff --git a/src/JT1078.Gateway/Jobs/JT1078SessionNoticeJob.cs b/src/JT1078.Gateway/Jobs/JT1078SessionNoticeJob.cs index 3de8190..b8ae525 100644 --- a/src/JT1078.Gateway/Jobs/JT1078SessionNoticeJob.cs +++ b/src/JT1078.Gateway/Jobs/JT1078SessionNoticeJob.cs @@ -38,14 +38,14 @@ namespace JT1078.Gateway.Jobs { if (logger.IsEnabled(LogLevel.Information)) { - logger.LogInformation($"[Notice]:{notice.TerminalPhoneNo}-{notice.ProtocolType}-{notice.SessionType}"); + logger.LogInformation($"[Notice]:{notice.SIM}-{notice.ProtocolType}-{notice.SessionType}"); } if(JT1078GatewayConstants.SessionOffline== notice.SessionType) { if (HttpSessionManager != null) { //当1078设备主动断开的情况下,需要关闭所有再观看的连接 - HttpSessionManager.TryRemoveBySim(notice.TerminalPhoneNo); + HttpSessionManager.TryRemoveBySim(notice.SIM); } } } diff --git a/src/JT1078.Gateway/Services/JT1078SessionNoticeService.cs b/src/JT1078.Gateway/Services/JT1078SessionNoticeService.cs index 3d1b70d..ecdb843 100644 --- a/src/JT1078.Gateway/Services/JT1078SessionNoticeService.cs +++ b/src/JT1078.Gateway/Services/JT1078SessionNoticeService.cs @@ -7,10 +7,10 @@ namespace JT1078.Gateway.Services { public class JT1078SessionNoticeService { - public BlockingCollection<(string SessionType, string TerminalPhoneNo,string ProtocolType)> SessionNoticeBlockingCollection { get;internal set; } + public BlockingCollection<(string SessionType, string SIM,string ProtocolType)> SessionNoticeBlockingCollection { get;internal set; } public JT1078SessionNoticeService() { - SessionNoticeBlockingCollection = new BlockingCollection<(string SessionType, string TerminalPhoneNo, string ProtocolType)>(); + SessionNoticeBlockingCollection = new BlockingCollection<(string SessionType, string SIM, string ProtocolType)>(); } } } diff --git a/src/Version.props b/src/Version.props new file mode 100644 index 0000000..19ae7ec --- /dev/null +++ b/src/Version.props @@ -0,0 +1,5 @@ + + + 1.0.0-preview3 + + \ No newline at end of file