From 5cee1a8d9984342fd6669d2bac54790b1f0a4dc1 Mon Sep 17 00:00:00 2001 From: waterliu99 Date: Wed, 31 Mar 2021 15:30:36 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=E4=B8=80=E4=B8=8Bhls?= =?UTF-8?q?=EF=BC=8C=E5=8F=AF=E5=90=8C=E6=97=B6=E6=92=AD=E6=94=BEhls?= =?UTF-8?q?=E5=92=8Cflv=E8=A7=86=E9=A2=91?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../JT1078.Gateway.TestNormalHosting.csproj | 2 +- .../Program.cs | 24 ++- .../JT1078FlvNormalMsgHostedService.cs | 43 ++--- .../JT1078HlsNormalMsgHostedService.cs | 36 ++-- .../Services/MessageDispatchDataService.cs | 16 ++ .../Services/MessageDispatchHostedService.cs | 43 +++++ .../appsettings.json | 4 +- .../wwwroot/.vscode/settings.json | 3 + .../wwwroot/demo/index.html | 3 +- .../wwwroot/index.html | 2 +- src/JT1078.Gateway.sln | 12 ++ .../Extensions/JT1078HttpContextExtensions.cs | 36 ++++ src/JT1078.Gateway/HLSPathStorage.cs | 71 +++++++ src/JT1078.Gateway/HLSRequestManager.cs | 178 ++++++++++-------- src/JT1078.Gateway/JT1078.Gateway.csproj | 1 + src/JT1078.Gateway/JT1078.Gateway.xml | 65 +++++++ src/JT1078.Gateway/JT1078GatewayExtensions.cs | 11 ++ .../Jobs/JT1078SessionClearJob.cs | 86 +++++++++ .../Sessions/JT1078HttpSessionManager.cs | 14 +- 19 files changed, 513 insertions(+), 137 deletions(-) create mode 100644 src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Services/MessageDispatchDataService.cs create mode 100644 src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Services/MessageDispatchHostedService.cs create mode 100644 src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/wwwroot/.vscode/settings.json create mode 100644 src/JT1078.Gateway/HLSPathStorage.cs create mode 100644 src/JT1078.Gateway/Jobs/JT1078SessionClearJob.cs diff --git a/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/JT1078.Gateway.TestNormalHosting.csproj b/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/JT1078.Gateway.TestNormalHosting.csproj index 33ecf75..b598c1c 100644 --- a/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/JT1078.Gateway.TestNormalHosting.csproj +++ b/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/JT1078.Gateway.TestNormalHosting.csproj @@ -7,7 +7,6 @@ - @@ -16,6 +15,7 @@ + diff --git a/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Program.cs b/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Program.cs index 66f0378..58172b1 100644 --- a/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Program.cs +++ b/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Program.cs @@ -7,6 +7,7 @@ using Microsoft.Extensions.Configuration; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; using NLog.Extensions.Logging; using System; using System.IO; @@ -33,18 +34,21 @@ namespace JT1078.Gateway.TestNormalHosting }) .ConfigureServices((hostContext, services) => { - services.AddMemoryCache(); - services.AddScoped(); - services.AddSingleton(); + services.AddMemoryCache(); services.AddSingleton(); services.AddSingleton(typeof(ILogger<>), typeof(Logger<>)); + //flv视频解码器 services.AddSingleton(); + //hls视频解码器 services.AddSingleton(); - services.AddSingleton(new M3U8Option - { - - }); services.AddSingleton(); + //添加hls依赖项 + services.AddHlsGateway(hostContext.Configuration); + services.Configure(hostContext.Configuration.GetSection("M3U8Option")); + var m3u8Option = services.BuildServiceProvider().GetRequiredService>().Value; + services.AddSingleton(m3u8Option); + + //使用内存队列实现会话通知 services.AddJT1078Gateway(hostContext.Configuration) .AddTcp() @@ -55,7 +59,11 @@ namespace JT1078.Gateway.TestNormalHosting .AddMsgConsumer(); //内存队列没有做分发,可以自己实现。 services.AddHostedService(); - //services.AddHostedService(); + services.AddHostedService(); + + services.AddSingleton(); + services.AddHostedService(); + }); //测试1: //发送完整包 diff --git a/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Services/JT1078FlvNormalMsgHostedService.cs b/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Services/JT1078FlvNormalMsgHostedService.cs index 27a9ce7..b5c3df6 100644 --- a/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Services/JT1078FlvNormalMsgHostedService.cs +++ b/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Services/JT1078FlvNormalMsgHostedService.cs @@ -25,8 +25,10 @@ namespace JT1078.Gateway.TestNormalHosting.Services private ILogger Logger; private IMemoryCache memoryCache; private const string ikey = "IKEY"; + private MessageDispatchDataService messageDispatchDataService; public JT1078FlvNormalMsgHostedService( + MessageDispatchDataService messageDispatchDataService, IMemoryCache memoryCache, ILoggerFactory loggerFactory, FlvEncoder flvEncoder, @@ -38,27 +40,26 @@ namespace JT1078.Gateway.TestNormalHosting.Services HttpSessionManager = httpSessionManager; FlvEncoder = flvEncoder; this.memoryCache = memoryCache; + this.messageDispatchDataService = messageDispatchDataService; } - protected override Task ExecuteAsync(CancellationToken stoppingToken) + protected async override Task ExecuteAsync(CancellationToken stoppingToken) { - JT1078MsgConsumer.OnMessage((Message) => + while (!stoppingToken.IsCancellationRequested) { - JT1078Package package = JT1078Serializer.Deserialize(Message.Data); - if (Logger.IsEnabled(LogLevel.Debug)) - { - Logger.LogDebug(JsonSerializer.Serialize(HttpSessionManager.GetAll())); - Logger.LogDebug($"{package.SIM},{package.SN},{package.LogicChannelNumber},{package.Label3.DataType.ToString()},{package.Label3.SubpackageType.ToString()},{package.Bodies.ToHexString()}"); - } + var data = await messageDispatchDataService.FlvChannel.Reader.ReadAsync(); try - { - var merge = JT1078Serializer.Merge(package); - if (merge == null) return; - string key = $"{package.GetKey()}_{ikey}"; - if (merge.Label3.DataType == Protocol.Enums.JT1078DataType.视频I帧) + { + if (Logger.IsEnabled(LogLevel.Debug)) + { + Logger.LogDebug(JsonSerializer.Serialize(HttpSessionManager.GetAll())); + Logger.LogDebug($"{data.SIM},{data.SN},{data.LogicChannelNumber},{data.Label3.DataType.ToString()},{data.Label3.SubpackageType.ToString()},{data.Bodies.ToHexString()}"); + } + string key = $"{data.GetKey()}_{ikey}"; + if (data.Label3.DataType == Protocol.Enums.JT1078DataType.视频I帧) { - memoryCache.Set(key, merge); + memoryCache.Set(key, data); } - var httpSessions = HttpSessionManager.GetAllBySimAndChannelNo(package.SIM.TrimStart('0'), package.LogicChannelNumber); + var httpSessions = HttpSessionManager.GetAllBySimAndChannelNo(data.SIM.TrimStart('0'), data.LogicChannelNumber); var firstHttpSessions = httpSessions.Where(w => !w.FirstSend).ToList(); if (firstHttpSessions.Count > 0) { @@ -74,7 +75,7 @@ namespace JT1078.Gateway.TestNormalHosting.Services } catch (Exception ex) { - Logger.LogError(ex, $"{package.SIM},{true},{package.SN},{package.LogicChannelNumber},{package.Label3.DataType.ToString()},{package.Label3.SubpackageType.ToString()},{package.Bodies.ToHexString()}"); + Logger.LogError(ex, $"{data.SIM},{true},{data.SN},{data.LogicChannelNumber},{data.Label3.DataType.ToString()},{data.Label3.SubpackageType.ToString()},{data.Bodies.ToHexString()}"); } } } @@ -83,7 +84,7 @@ namespace JT1078.Gateway.TestNormalHosting.Services { try { - var flvVideoBuffer = FlvEncoder.EncoderVideoTag(merge, false); + var flvVideoBuffer = FlvEncoder.EncoderVideoTag(data, false); foreach (var session in otherHttpSessions) { HttpSessionManager.SendAVData(session, flvVideoBuffer, false); @@ -91,16 +92,16 @@ namespace JT1078.Gateway.TestNormalHosting.Services } catch (Exception ex) { - Logger.LogError(ex, $"{package.SIM},{false},{package.SN},{package.LogicChannelNumber},{package.Label3.DataType.ToString()},{package.Label3.SubpackageType.ToString()},{package.Bodies.ToHexString()}"); + Logger.LogError(ex, $"{data.SIM},{false},{data.SN},{data.LogicChannelNumber},{data.Label3.DataType.ToString()},{data.Label3.SubpackageType.ToString()},{data.Bodies.ToHexString()}"); } } } catch (Exception ex) { - Logger.LogError(ex, $"{package.SIM},{package.SN},{package.LogicChannelNumber},{package.Label3.DataType.ToString()},{package.Label3.SubpackageType.ToString()},{package.Bodies.ToHexString()}"); + Logger.LogError(ex, $"{data.SIM},{data.SN},{data.LogicChannelNumber},{data.Label3.DataType.ToString()},{data.Label3.SubpackageType.ToString()},{data.Bodies.ToHexString()}"); } - }); - return Task.CompletedTask; + } + await Task.CompletedTask; } } } diff --git a/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Services/JT1078HlsNormalMsgHostedService.cs b/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Services/JT1078HlsNormalMsgHostedService.cs index 4767edd..88a8026 100644 --- a/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Services/JT1078HlsNormalMsgHostedService.cs +++ b/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Services/JT1078HlsNormalMsgHostedService.cs @@ -3,6 +3,7 @@ using JT1078.Gateway.Sessions; using JT1078.Hls; using JT1078.Protocol; using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; using System; using System.Collections.Generic; using System.Linq; @@ -17,34 +18,39 @@ namespace JT1078.Gateway.TestNormalHosting.Services private IJT1078MsgConsumer MsgConsumer; private JT1078HttpSessionManager HttpSessionManager; private M3U8FileManage M3U8FileManage; + private MessageDispatchDataService messageDispatchDataService; + private readonly ILogger logger; public JT1078HlsNormalMsgHostedService( + ILoggerFactory loggerFactory, M3U8FileManage M3U8FileManage, JT1078HttpSessionManager httpSessionManager, + MessageDispatchDataService messageDispatchDataService, IJT1078MsgConsumer msgConsumer) { + logger = loggerFactory.CreateLogger(); MsgConsumer = msgConsumer; HttpSessionManager = httpSessionManager; this.M3U8FileManage = M3U8FileManage; + this.messageDispatchDataService = messageDispatchDataService; } - protected override Task ExecuteAsync(CancellationToken stoppingToken) + protected async override Task ExecuteAsync(CancellationToken stoppingToken) { - MsgConsumer.OnMessage((Message) => + while (!stoppingToken.IsCancellationRequested) { - JT1078Package package = JT1078Serializer.Deserialize(Message.Data); - var merge = JT1078.Protocol.JT1078Serializer.Merge(package); - if (merge != null) + var data = await messageDispatchDataService.HlsChannel.Reader.ReadAsync(); + logger.LogDebug($"设备{data.SIM},{data.LogicChannelNumber},session:{System.Text.Json.JsonSerializer.Serialize(HttpSessionManager)}"); + var hasHttpSessionn = HttpSessionManager.GetAllHttpContextBySimAndChannelNo(data.SIM, data.LogicChannelNumber).Where(m => m.RTPVideoType == Metadata.RTPVideoType.Http_Hls).ToList(); + if (hasHttpSessionn.Count > 0) { - var hasHttpSessionn = HttpSessionManager.GetAllHttpContextBySimAndChannelNo(merge.SIM, merge.LogicChannelNumber); - if (hasHttpSessionn.Count>0) - { - M3U8FileManage.CreateTsData(merge); - } - else { - M3U8FileManage.Clear(merge.SIM, merge.LogicChannelNumber); - } + logger.LogDebug($"设备{data.SIM},{data.LogicChannelNumber}连上了"); + M3U8FileManage.CreateTsData(data); } - }); - return Task.CompletedTask; + else + { + logger.LogDebug($"没有设备链接"); + } + } + await Task.CompletedTask; } } } \ No newline at end of file diff --git a/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Services/MessageDispatchDataService.cs b/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Services/MessageDispatchDataService.cs new file mode 100644 index 0000000..07c81d4 --- /dev/null +++ b/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Services/MessageDispatchDataService.cs @@ -0,0 +1,16 @@ +using JT1078.Protocol; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Channels; +using System.Threading.Tasks; + +namespace JT1078.Gateway.TestNormalHosting.Services +{ + public class MessageDispatchDataService + { + public Channel HlsChannel = Channel.CreateUnbounded(); + public Channel FlvChannel = Channel.CreateUnbounded(); + } +} diff --git a/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Services/MessageDispatchHostedService.cs b/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Services/MessageDispatchHostedService.cs new file mode 100644 index 0000000..06624ef --- /dev/null +++ b/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Services/MessageDispatchHostedService.cs @@ -0,0 +1,43 @@ +using JT1078.Gateway.Abstractions; +using JT1078.Protocol; +using Microsoft.Extensions.Hosting; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading; +using System.Threading.Channels; +using System.Threading.Tasks; + +namespace JT1078.Gateway.TestNormalHosting.Services +{ + /// + /// 消费分发服务。同时分发给hls和flv + /// + public class MessageDispatchHostedService : BackgroundService + { + private IJT1078MsgConsumer JT1078MsgConsumer; + private readonly MessageDispatchDataService messageDispatchDataService; + + public MessageDispatchHostedService(IJT1078MsgConsumer JT1078MsgConsumer, + MessageDispatchDataService messageDispatchDataService) { + this.JT1078MsgConsumer = JT1078MsgConsumer; + this.messageDispatchDataService = messageDispatchDataService; + } + + protected override Task ExecuteAsync(CancellationToken stoppingToken) + { + JT1078MsgConsumer.OnMessage(async (Message) => + { + JT1078Package package = JT1078Serializer.Deserialize(Message.Data); + var merge = JT1078.Protocol.JT1078Serializer.Merge(package); + if (merge != null) + { + await messageDispatchDataService.HlsChannel.Writer.WriteAsync(merge); + await messageDispatchDataService.FlvChannel.Writer.WriteAsync(merge); + } + }); + return Task.CompletedTask; + } + } +} diff --git a/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/appsettings.json b/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/appsettings.json index 65f5cbd..88b2301 100644 --- a/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/appsettings.json +++ b/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/appsettings.json @@ -16,9 +16,9 @@ "HttpPort": 15555 }, "M3U8Option": { - "TsFileCapacity": 10, + "TsFileCapacity": 5, "TsFileMaxSecond": 10, "M3U8FileName": "live.m3u8", - "HlsFileDirectory":"wwwroot/demo" + "HlsFileDirectory":"wwwroot" } } diff --git a/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/wwwroot/.vscode/settings.json b/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/wwwroot/.vscode/settings.json new file mode 100644 index 0000000..6f3a291 --- /dev/null +++ b/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/wwwroot/.vscode/settings.json @@ -0,0 +1,3 @@ +{ + "liveServer.settings.port": 5501 +} \ No newline at end of file diff --git a/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/wwwroot/demo/index.html b/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/wwwroot/demo/index.html index 554e735..3a2c188 100644 --- a/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/wwwroot/demo/index.html +++ b/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/wwwroot/demo/index.html @@ -8,7 +8,8 @@