diff --git a/src/JT1078.Gateway.Abstractions/JT1078.Gateway.Abstractions.csproj b/src/JT1078.Gateway.Abstractions/JT1078.Gateway.Abstractions.csproj index 9cc33a1..2d5012b 100644 --- a/src/JT1078.Gateway.Abstractions/JT1078.Gateway.Abstractions.csproj +++ b/src/JT1078.Gateway.Abstractions/JT1078.Gateway.Abstractions.csproj @@ -26,10 +26,10 @@ - - - - + + + + diff --git a/src/JT1078.Gateway.Tests/JT1078.Gateway.Test/JT1078.Gateway.Test.csproj b/src/JT1078.Gateway.Tests/JT1078.Gateway.Test/JT1078.Gateway.Test.csproj index 5b454b9..fbdca8d 100644 --- a/src/JT1078.Gateway.Tests/JT1078.Gateway.Test/JT1078.Gateway.Test.csproj +++ b/src/JT1078.Gateway.Tests/JT1078.Gateway.Test/JT1078.Gateway.Test.csproj @@ -7,10 +7,10 @@ - + - + all runtime; build; native; contentfiles; analyzers; buildtransitive diff --git a/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/JT1078.Gateway.TestNormalHosting.csproj b/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/JT1078.Gateway.TestNormalHosting.csproj index a88eb40..ad156de 100644 --- a/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/JT1078.Gateway.TestNormalHosting.csproj +++ b/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/JT1078.Gateway.TestNormalHosting.csproj @@ -6,9 +6,10 @@ - - - + + + + diff --git a/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Program.cs b/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Program.cs index 22acca0..9616f49 100644 --- a/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Program.cs +++ b/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Program.cs @@ -1,4 +1,5 @@ -using JT1078.Gateway.InMemoryMQ; +using JT1078.Flv; +using JT1078.Gateway.InMemoryMQ; using JT1078.Gateway.TestNormalHosting.Services; using Microsoft.Extensions.Configuration; using Microsoft.Extensions.DependencyInjection; @@ -28,12 +29,13 @@ namespace JT1078.Gateway.TestNormalHosting { services.AddSingleton(); services.AddSingleton(typeof(ILogger<>), typeof(Logger<>)); + services.AddSingleton(); //使用内存队列实现会话通知 services.AddJT1078Gateway(hostContext.Configuration) .AddTcp() .AddUdp() .AddHttp() - .AddCoordinatorHttpClient() + //.AddCoordinatorHttpClient() .AddNormal() .AddMsgProducer() .AddMsgConsumer(); diff --git a/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Services/JT1078NormalMsgHostedService.cs b/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Services/JT1078NormalMsgHostedService.cs index 6e9b742..abc7c1f 100644 --- a/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Services/JT1078NormalMsgHostedService.cs +++ b/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Services/JT1078NormalMsgHostedService.cs @@ -1,25 +1,51 @@ using JT1078.Gateway.Abstractions; +using JT1078.Gateway.Sessions; +using JT1078.Flv; using Microsoft.Extensions.Hosting; using System; using System.Collections.Generic; using System.Text; using System.Threading; using System.Threading.Tasks; +using System.Linq; namespace JT1078.Gateway.TestNormalHosting.Services { public class JT1078NormalMsgHostedService : BackgroundService { private IJT1078PackageConsumer PackageConsumer; - public JT1078NormalMsgHostedService(IJT1078PackageConsumer packageConsumer) + private JT1078HttpSessionManager HttpSessionManager; + private FlvEncoder FlvEncoder; + public JT1078NormalMsgHostedService( + FlvEncoder flvEncoder, + JT1078HttpSessionManager httpSessionManager, + IJT1078PackageConsumer packageConsumer) { PackageConsumer = packageConsumer; + HttpSessionManager = httpSessionManager; + FlvEncoder = flvEncoder; } protected override Task ExecuteAsync(CancellationToken stoppingToken) { - PackageConsumer.OnMessage((Message) => + PackageConsumer.OnMessage((Message) => { - + var merge = JT1078.Protocol.JT1078Serializer.Merge(Message.Data); + if (merge != null) + { + var httpSessions = HttpSessionManager.GetAllBySimAndChannelNo(Message.Data.SIM, Message.Data.LogicChannelNumber); + var firstHttpSessions = httpSessions.Where(w => !w.FirstSend).ToList(); + if (firstHttpSessions.Count > 0) + { + var flvVideoBuffer = FlvEncoder.EncoderVideoTag(merge, true); + HttpSessionManager.SendAVData(firstHttpSessions, flvVideoBuffer, true); + } + var otherHttpSessions = httpSessions.Where(w => w.FirstSend).ToList(); + if (otherHttpSessions.Count > 0) + { + var flvVideoBuffer = FlvEncoder.EncoderVideoTag(merge, false); + HttpSessionManager.SendAVData(firstHttpSessions, flvVideoBuffer, false); + } + } }); return Task.CompletedTask; } diff --git a/src/JT1078.Gateway/Metadata/JT1078HttpContext.cs b/src/JT1078.Gateway/Metadata/JT1078HttpContext.cs index 095117a..8079d9f 100644 --- a/src/JT1078.Gateway/Metadata/JT1078HttpContext.cs +++ b/src/JT1078.Gateway/Metadata/JT1078HttpContext.cs @@ -23,14 +23,14 @@ namespace JT1078.Gateway.Metadata } } public DateTime StartTime { get; set; } - public bool SendChunked { get; set; } + public bool FirstSend { get; set; } public JT1078HttpContext(HttpListenerContext context, IPrincipal user) { Context = context; User = user; StartTime = DateTime.Now; SessionId = Guid.NewGuid().ToString("N"); - SendChunked = false; + FirstSend = false; } public JT1078HttpContext(HttpListenerContext context, HttpListenerWebSocketContext webSocketContext, IPrincipal user) { @@ -39,7 +39,7 @@ namespace JT1078.Gateway.Metadata User = user; StartTime = DateTime.Now; SessionId = Guid.NewGuid().ToString("N"); - SendChunked = false; + FirstSend = false; } } } diff --git a/src/JT1078.Gateway/Sessions/JT1078HttpSessionManager.cs b/src/JT1078.Gateway/Sessions/JT1078HttpSessionManager.cs index fef4040..e31d0c3 100644 --- a/src/JT1078.Gateway/Sessions/JT1078HttpSessionManager.cs +++ b/src/JT1078.Gateway/Sessions/JT1078HttpSessionManager.cs @@ -88,16 +88,20 @@ namespace JT1078.Gateway.Sessions /// /// 发送音视频数据 /// - /// - /// + /// /// - public void SendAVData(string sim,int channelNo,byte[] data) + /// + public void SendAVData(List httpContexts, byte[] data, bool firstSend) { - var contexts = Sessions.Select(s => s.Value).Where(w => w.Sim == sim && w.ChannelNo == channelNo).ToList(); - ParallelLoopResult parallelLoopResult= Parallel.ForEach(contexts, async(context) => + ParallelLoopResult parallelLoopResult = Parallel.ForEach(httpContexts, async (context) => { if (context.IsWebSocket) { + if (firstSend) + { + context.FirstSend = firstSend; + Sessions.TryUpdate(context.SessionId, context, context); + } try { await context.WebSocketSendBinaryAsync(data); @@ -109,13 +113,13 @@ namespace JT1078.Gateway.Sessions Logger.LogInformation($"[ws close]:{context.SessionId}-{context.Sim}-{context.ChannelNo}-{context.StartTime:yyyyMMddhhmmss}"); } remove(context.SessionId); - } + } } else { - if (!context.SendChunked) + if (firstSend) { - context.SendChunked = true; + context.FirstSend = firstSend; Sessions.TryUpdate(context.SessionId, context, context); try { @@ -153,89 +157,6 @@ namespace JT1078.Gateway.Sessions } } - /// - /// 发送音视频数据到websocket - /// - /// - /// - /// - public void SendAVData2WebSocket(string sim, int channelNo, byte[] data) - { - var contexts = Sessions.Select(s => s.Value).Where(w => w.Sim == sim && w.ChannelNo == channelNo && w.IsWebSocket).ToList(); - ParallelLoopResult parallelLoopResult = Parallel.ForEach(contexts, async (context) => - { - if (context.IsWebSocket) - { - try - { - await context.WebSocketSendBinaryAsync(data); - } - catch (Exception ex) - { - if (Logger.IsEnabled(LogLevel.Information)) - { - Logger.LogInformation($"[ws close]:{context.SessionId}-{context.Sim}-{context.ChannelNo}-{context.StartTime:yyyyMMddhhmmss}"); - } - remove(context.SessionId); - } - } - }); - if (parallelLoopResult.IsCompleted) - { - - } - } - - /// - /// 发送音视频数据到Http Chunked中 - /// - /// - /// - /// - public void SendAVData2HttpChunked(string sim, int channelNo, byte[] data) - { - var contexts = Sessions.Select(s => s.Value).Where(w => w.Sim == sim && w.ChannelNo == channelNo && !w.IsWebSocket).ToList(); - ParallelLoopResult parallelLoopResult = Parallel.ForEach(contexts, async (context) => - { - if (!context.SendChunked) - { - context.SendChunked = true; - Sessions.TryUpdate(context.SessionId, context, context); - try - { - await context.HttpSendFirstChunked(data); - } - catch (Exception ex) - { - if (Logger.IsEnabled(LogLevel.Information)) - { - Logger.LogInformation($"[http close]:{context.SessionId}-{context.Sim}-{context.ChannelNo}-{context.StartTime:yyyyMMddhhmmss}"); - } - remove(context.SessionId); - } - } - else - { - try - { - await context.HttpSendChunked(data); - } - catch (Exception ex) - { - if (Logger.IsEnabled(LogLevel.Information)) - { - Logger.LogInformation($"[http close]:{context.SessionId}-{context.Sim}-{context.ChannelNo}-{context.StartTime:yyyyMMddhhmmss}"); - } - remove(context.SessionId); - } - } - }); - if (parallelLoopResult.IsCompleted) - { - - } - } - public int SessionCount { get @@ -260,6 +181,11 @@ namespace JT1078.Gateway.Sessions } } + public List GetAllBySimAndChannelNo(string sim, int channelNo) + { + return Sessions.Select(s => s.Value).Where(w => w.Sim == sim && w.ChannelNo == channelNo).ToList(); + } + public List GetAll() { return Sessions.Select(s => s.Value).ToList();