diff --git a/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/JT1078.Gateway.TestNormalHosting.csproj b/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/JT1078.Gateway.TestNormalHosting.csproj index 33ecf75..b598c1c 100644 --- a/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/JT1078.Gateway.TestNormalHosting.csproj +++ b/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/JT1078.Gateway.TestNormalHosting.csproj @@ -7,7 +7,6 @@ <ItemGroup> <PackageReference Include="JT1078.Flv" Version="1.1.0" /> - <PackageReference Include="JT1078.Hls" Version="1.1.0-preview1" /> <PackageReference Include="Microsoft.Extensions.Caching.Memory" Version="5.0.0" /> <PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="5.0.0" /> <PackageReference Include="Microsoft.Extensions.Hosting" Version="5.0.0" /> @@ -16,6 +15,7 @@ </ItemGroup> <ItemGroup> + <ProjectReference Include="..\..\..\..\JT1078\src\JT1078.Hls\JT1078.Hls.csproj" /> <ProjectReference Include="..\..\JT1078.Gateway.InMemoryMQ\JT1078.Gateway.InMemoryMQ.csproj" /> <ProjectReference Include="..\..\JT1078.Gateway\JT1078.Gateway.csproj" /> </ItemGroup> diff --git a/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Program.cs b/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Program.cs index 66f0378..58172b1 100644 --- a/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Program.cs +++ b/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Program.cs @@ -7,6 +7,7 @@ using Microsoft.Extensions.Configuration; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; using NLog.Extensions.Logging; using System; using System.IO; @@ -33,18 +34,21 @@ namespace JT1078.Gateway.TestNormalHosting }) .ConfigureServices((hostContext, services) => { - services.AddMemoryCache(); - services.AddScoped<FileSystemWatcher>(); - services.AddSingleton<HLSRequestManager>(); + services.AddMemoryCache(); services.AddSingleton<ILoggerFactory, LoggerFactory>(); services.AddSingleton(typeof(ILogger<>), typeof(Logger<>)); + //flv视频解码器 services.AddSingleton<FlvEncoder>(); + //hls视频解码器 services.AddSingleton<TSEncoder>(); - services.AddSingleton(new M3U8Option - { - - }); services.AddSingleton<M3U8FileManage>(); + //添加hls依赖项 + services.AddHlsGateway(hostContext.Configuration); + services.Configure<M3U8Option>(hostContext.Configuration.GetSection("M3U8Option")); + var m3u8Option = services.BuildServiceProvider().GetRequiredService<IOptions<M3U8Option>>().Value; + services.AddSingleton(m3u8Option); + + //使用内存队列实现会话通知 services.AddJT1078Gateway(hostContext.Configuration) .AddTcp() @@ -55,7 +59,11 @@ namespace JT1078.Gateway.TestNormalHosting .AddMsgConsumer(); //内存队列没有做分发,可以自己实现。 services.AddHostedService<JT1078FlvNormalMsgHostedService>(); - //services.AddHostedService<JT1078HlsNormalMsgHostedService>(); + services.AddHostedService<JT1078HlsNormalMsgHostedService>(); + + services.AddSingleton<MessageDispatchDataService>(); + services.AddHostedService<MessageDispatchHostedService>(); + }); //测试1: //发送完整包 diff --git a/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Services/JT1078FlvNormalMsgHostedService.cs b/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Services/JT1078FlvNormalMsgHostedService.cs index 27a9ce7..b5c3df6 100644 --- a/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Services/JT1078FlvNormalMsgHostedService.cs +++ b/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Services/JT1078FlvNormalMsgHostedService.cs @@ -25,8 +25,10 @@ namespace JT1078.Gateway.TestNormalHosting.Services private ILogger Logger; private IMemoryCache memoryCache; private const string ikey = "IKEY"; + private MessageDispatchDataService messageDispatchDataService; public JT1078FlvNormalMsgHostedService( + MessageDispatchDataService messageDispatchDataService, IMemoryCache memoryCache, ILoggerFactory loggerFactory, FlvEncoder flvEncoder, @@ -38,27 +40,26 @@ namespace JT1078.Gateway.TestNormalHosting.Services HttpSessionManager = httpSessionManager; FlvEncoder = flvEncoder; this.memoryCache = memoryCache; + this.messageDispatchDataService = messageDispatchDataService; } - protected override Task ExecuteAsync(CancellationToken stoppingToken) + protected async override Task ExecuteAsync(CancellationToken stoppingToken) { - JT1078MsgConsumer.OnMessage((Message) => + while (!stoppingToken.IsCancellationRequested) { - JT1078Package package = JT1078Serializer.Deserialize(Message.Data); - if (Logger.IsEnabled(LogLevel.Debug)) - { - Logger.LogDebug(JsonSerializer.Serialize(HttpSessionManager.GetAll())); - Logger.LogDebug($"{package.SIM},{package.SN},{package.LogicChannelNumber},{package.Label3.DataType.ToString()},{package.Label3.SubpackageType.ToString()},{package.Bodies.ToHexString()}"); - } + var data = await messageDispatchDataService.FlvChannel.Reader.ReadAsync(); try - { - var merge = JT1078Serializer.Merge(package); - if (merge == null) return; - string key = $"{package.GetKey()}_{ikey}"; - if (merge.Label3.DataType == Protocol.Enums.JT1078DataType.视频I帧) + { + if (Logger.IsEnabled(LogLevel.Debug)) + { + Logger.LogDebug(JsonSerializer.Serialize(HttpSessionManager.GetAll())); + Logger.LogDebug($"{data.SIM},{data.SN},{data.LogicChannelNumber},{data.Label3.DataType.ToString()},{data.Label3.SubpackageType.ToString()},{data.Bodies.ToHexString()}"); + } + string key = $"{data.GetKey()}_{ikey}"; + if (data.Label3.DataType == Protocol.Enums.JT1078DataType.视频I帧) { - memoryCache.Set(key, merge); + memoryCache.Set(key, data); } - var httpSessions = HttpSessionManager.GetAllBySimAndChannelNo(package.SIM.TrimStart('0'), package.LogicChannelNumber); + var httpSessions = HttpSessionManager.GetAllBySimAndChannelNo(data.SIM.TrimStart('0'), data.LogicChannelNumber); var firstHttpSessions = httpSessions.Where(w => !w.FirstSend).ToList(); if (firstHttpSessions.Count > 0) { @@ -74,7 +75,7 @@ namespace JT1078.Gateway.TestNormalHosting.Services } catch (Exception ex) { - Logger.LogError(ex, $"{package.SIM},{true},{package.SN},{package.LogicChannelNumber},{package.Label3.DataType.ToString()},{package.Label3.SubpackageType.ToString()},{package.Bodies.ToHexString()}"); + Logger.LogError(ex, $"{data.SIM},{true},{data.SN},{data.LogicChannelNumber},{data.Label3.DataType.ToString()},{data.Label3.SubpackageType.ToString()},{data.Bodies.ToHexString()}"); } } } @@ -83,7 +84,7 @@ namespace JT1078.Gateway.TestNormalHosting.Services { try { - var flvVideoBuffer = FlvEncoder.EncoderVideoTag(merge, false); + var flvVideoBuffer = FlvEncoder.EncoderVideoTag(data, false); foreach (var session in otherHttpSessions) { HttpSessionManager.SendAVData(session, flvVideoBuffer, false); @@ -91,16 +92,16 @@ namespace JT1078.Gateway.TestNormalHosting.Services } catch (Exception ex) { - Logger.LogError(ex, $"{package.SIM},{false},{package.SN},{package.LogicChannelNumber},{package.Label3.DataType.ToString()},{package.Label3.SubpackageType.ToString()},{package.Bodies.ToHexString()}"); + Logger.LogError(ex, $"{data.SIM},{false},{data.SN},{data.LogicChannelNumber},{data.Label3.DataType.ToString()},{data.Label3.SubpackageType.ToString()},{data.Bodies.ToHexString()}"); } } } catch (Exception ex) { - Logger.LogError(ex, $"{package.SIM},{package.SN},{package.LogicChannelNumber},{package.Label3.DataType.ToString()},{package.Label3.SubpackageType.ToString()},{package.Bodies.ToHexString()}"); + Logger.LogError(ex, $"{data.SIM},{data.SN},{data.LogicChannelNumber},{data.Label3.DataType.ToString()},{data.Label3.SubpackageType.ToString()},{data.Bodies.ToHexString()}"); } - }); - return Task.CompletedTask; + } + await Task.CompletedTask; } } } diff --git a/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Services/JT1078HlsNormalMsgHostedService.cs b/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Services/JT1078HlsNormalMsgHostedService.cs index 4767edd..88a8026 100644 --- a/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Services/JT1078HlsNormalMsgHostedService.cs +++ b/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Services/JT1078HlsNormalMsgHostedService.cs @@ -3,6 +3,7 @@ using JT1078.Gateway.Sessions; using JT1078.Hls; using JT1078.Protocol; using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; using System; using System.Collections.Generic; using System.Linq; @@ -17,34 +18,39 @@ namespace JT1078.Gateway.TestNormalHosting.Services private IJT1078MsgConsumer MsgConsumer; private JT1078HttpSessionManager HttpSessionManager; private M3U8FileManage M3U8FileManage; + private MessageDispatchDataService messageDispatchDataService; + private readonly ILogger logger; public JT1078HlsNormalMsgHostedService( + ILoggerFactory loggerFactory, M3U8FileManage M3U8FileManage, JT1078HttpSessionManager httpSessionManager, + MessageDispatchDataService messageDispatchDataService, IJT1078MsgConsumer msgConsumer) { + logger = loggerFactory.CreateLogger<JT1078HlsNormalMsgHostedService>(); MsgConsumer = msgConsumer; HttpSessionManager = httpSessionManager; this.M3U8FileManage = M3U8FileManage; + this.messageDispatchDataService = messageDispatchDataService; } - protected override Task ExecuteAsync(CancellationToken stoppingToken) + protected async override Task ExecuteAsync(CancellationToken stoppingToken) { - MsgConsumer.OnMessage((Message) => + while (!stoppingToken.IsCancellationRequested) { - JT1078Package package = JT1078Serializer.Deserialize(Message.Data); - var merge = JT1078.Protocol.JT1078Serializer.Merge(package); - if (merge != null) + var data = await messageDispatchDataService.HlsChannel.Reader.ReadAsync(); + logger.LogDebug($"设备{data.SIM},{data.LogicChannelNumber},session:{System.Text.Json.JsonSerializer.Serialize(HttpSessionManager)}"); + var hasHttpSessionn = HttpSessionManager.GetAllHttpContextBySimAndChannelNo(data.SIM, data.LogicChannelNumber).Where(m => m.RTPVideoType == Metadata.RTPVideoType.Http_Hls).ToList(); + if (hasHttpSessionn.Count > 0) { - var hasHttpSessionn = HttpSessionManager.GetAllHttpContextBySimAndChannelNo(merge.SIM, merge.LogicChannelNumber); - if (hasHttpSessionn.Count>0) - { - M3U8FileManage.CreateTsData(merge); - } - else { - M3U8FileManage.Clear(merge.SIM, merge.LogicChannelNumber); - } + logger.LogDebug($"设备{data.SIM},{data.LogicChannelNumber}连上了"); + M3U8FileManage.CreateTsData(data); } - }); - return Task.CompletedTask; + else + { + logger.LogDebug($"没有设备链接"); + } + } + await Task.CompletedTask; } } } \ No newline at end of file diff --git a/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Services/MessageDispatchDataService.cs b/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Services/MessageDispatchDataService.cs new file mode 100644 index 0000000..07c81d4 --- /dev/null +++ b/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Services/MessageDispatchDataService.cs @@ -0,0 +1,16 @@ +using JT1078.Protocol; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading.Channels; +using System.Threading.Tasks; + +namespace JT1078.Gateway.TestNormalHosting.Services +{ + public class MessageDispatchDataService + { + public Channel<JT1078Package> HlsChannel = Channel.CreateUnbounded<JT1078Package>(); + public Channel<JT1078Package> FlvChannel = Channel.CreateUnbounded<JT1078Package>(); + } +} diff --git a/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Services/MessageDispatchHostedService.cs b/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Services/MessageDispatchHostedService.cs new file mode 100644 index 0000000..06624ef --- /dev/null +++ b/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Services/MessageDispatchHostedService.cs @@ -0,0 +1,43 @@ +using JT1078.Gateway.Abstractions; +using JT1078.Protocol; +using Microsoft.Extensions.Hosting; +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using System.Threading; +using System.Threading.Channels; +using System.Threading.Tasks; + +namespace JT1078.Gateway.TestNormalHosting.Services +{ + /// <summary> + /// 消费分发服务。同时分发给hls和flv + /// </summary> + public class MessageDispatchHostedService : BackgroundService + { + private IJT1078MsgConsumer JT1078MsgConsumer; + private readonly MessageDispatchDataService messageDispatchDataService; + + public MessageDispatchHostedService(IJT1078MsgConsumer JT1078MsgConsumer, + MessageDispatchDataService messageDispatchDataService) { + this.JT1078MsgConsumer = JT1078MsgConsumer; + this.messageDispatchDataService = messageDispatchDataService; + } + + protected override Task ExecuteAsync(CancellationToken stoppingToken) + { + JT1078MsgConsumer.OnMessage(async (Message) => + { + JT1078Package package = JT1078Serializer.Deserialize(Message.Data); + var merge = JT1078.Protocol.JT1078Serializer.Merge(package); + if (merge != null) + { + await messageDispatchDataService.HlsChannel.Writer.WriteAsync(merge); + await messageDispatchDataService.FlvChannel.Writer.WriteAsync(merge); + } + }); + return Task.CompletedTask; + } + } +} diff --git a/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/appsettings.json b/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/appsettings.json index 65f5cbd..88b2301 100644 --- a/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/appsettings.json +++ b/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/appsettings.json @@ -16,9 +16,9 @@ "HttpPort": 15555 }, "M3U8Option": { - "TsFileCapacity": 10, + "TsFileCapacity": 5, "TsFileMaxSecond": 10, "M3U8FileName": "live.m3u8", - "HlsFileDirectory":"wwwroot/demo" + "HlsFileDirectory":"wwwroot" } } diff --git a/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/wwwroot/.vscode/settings.json b/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/wwwroot/.vscode/settings.json new file mode 100644 index 0000000..6f3a291 --- /dev/null +++ b/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/wwwroot/.vscode/settings.json @@ -0,0 +1,3 @@ +{ + "liveServer.settings.port": 5501 +} \ No newline at end of file diff --git a/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/wwwroot/demo/index.html b/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/wwwroot/demo/index.html index 554e735..3a2c188 100644 --- a/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/wwwroot/demo/index.html +++ b/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/wwwroot/demo/index.html @@ -8,7 +8,8 @@ <video autoplay muted id="video"></video> <script> var video = document.getElementById('video'); - var videoSrc = 'demo.m3u8'; + //var videoSrc = 'demo.m3u8'; + var videoSrc = 'http://127.0.0.1:15555/live.m3u8?token=123456&sim=001901305037&channelNo=1'; // // First check for native browser HLS support // diff --git a/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/wwwroot/index.html b/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/wwwroot/index.html index 4b9a5ae..d59b50b 100644 --- a/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/wwwroot/index.html +++ b/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/wwwroot/index.html @@ -15,7 +15,7 @@ var flvPlayer = flvjs.createPlayer({ type: 'flv', isLive: true, - //url: "http://127.0.0.1:15555/?sim=1901305037&channel=3&token=123456" + url: "http://127.0.0.1:15555/?sim=1901305037&channel=1&token=123456" //url: "ws://127.0.0.1:15555/?sim=11234567810&channel=1&token=123456" }); flvPlayer.attachMediaElement(player); diff --git a/src/JT1078.Gateway.sln b/src/JT1078.Gateway.sln index 4cf5778..72880a0 100644 --- a/src/JT1078.Gateway.sln +++ b/src/JT1078.Gateway.sln @@ -17,6 +17,10 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JT1078.Gateway", "JT1078.Ga EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JT1078.Gateway.InMemoryMQ", "JT1078.Gateway.InMemoryMQ\JT1078.Gateway.InMemoryMQ.csproj", "{011934D6-BEE7-4CDA-9F67-4D6D7D672D6C}" EndProject +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JT1078.Hls", "..\..\JT1078\src\JT1078.Hls\JT1078.Hls.csproj", "{EBA7BD86-7C58-470D-BC1A-04729C140C34}" +EndProject +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JT1078.Protocol", "..\..\JT1078\src\JT1078.Protocol\JT1078.Protocol.csproj", "{759FC4E4-4924-4BFC-8811-01FC6DF52FF7}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -47,6 +51,14 @@ Global {011934D6-BEE7-4CDA-9F67-4D6D7D672D6C}.Debug|Any CPU.Build.0 = Debug|Any CPU {011934D6-BEE7-4CDA-9F67-4D6D7D672D6C}.Release|Any CPU.ActiveCfg = Release|Any CPU {011934D6-BEE7-4CDA-9F67-4D6D7D672D6C}.Release|Any CPU.Build.0 = Release|Any CPU + {EBA7BD86-7C58-470D-BC1A-04729C140C34}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {EBA7BD86-7C58-470D-BC1A-04729C140C34}.Debug|Any CPU.Build.0 = Debug|Any CPU + {EBA7BD86-7C58-470D-BC1A-04729C140C34}.Release|Any CPU.ActiveCfg = Release|Any CPU + {EBA7BD86-7C58-470D-BC1A-04729C140C34}.Release|Any CPU.Build.0 = Release|Any CPU + {759FC4E4-4924-4BFC-8811-01FC6DF52FF7}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {759FC4E4-4924-4BFC-8811-01FC6DF52FF7}.Debug|Any CPU.Build.0 = Debug|Any CPU + {759FC4E4-4924-4BFC-8811-01FC6DF52FF7}.Release|Any CPU.ActiveCfg = Release|Any CPU + {759FC4E4-4924-4BFC-8811-01FC6DF52FF7}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE diff --git a/src/JT1078.Gateway/Extensions/JT1078HttpContextExtensions.cs b/src/JT1078.Gateway/Extensions/JT1078HttpContextExtensions.cs index 3a82aee..63bb046 100644 --- a/src/JT1078.Gateway/Extensions/JT1078HttpContextExtensions.cs +++ b/src/JT1078.Gateway/Extensions/JT1078HttpContextExtensions.cs @@ -62,6 +62,42 @@ namespace JT1078.Gateway.Extensions context.Response.OutputStream.Close(); context.Response.Close(); } + /// <summary> + /// 返回m3u8响应 + /// </summary> + /// <param name="context"></param> + /// <param name="buffer"></param> + /// <returns></returns> + public static async ValueTask HttpM3U8Async(this HttpListenerContext context, ReadOnlyMemory<byte> buffer) + { + context.Response.AddHeader("Access-Control-Allow-Headers", "*"); + context.Response.AppendHeader("Access-Control-Allow-Origin", "*"); + context.Response.ContentType = "application/x-mpegURL"; + context.Response.StatusCode = (int)HttpStatusCode.OK; + context.Response.ContentLength64 = buffer.Length; + context.Response.KeepAlive = false; + await context.Response.OutputStream.WriteAsync(buffer); + context.Response.OutputStream.Close(); + context.Response.Close(); + } + /// <summary> + /// 返回ts响应数 + /// </summary> + /// <param name="context"></param> + /// <param name="buffer"></param> + /// <returns></returns> + public static async ValueTask HttpTsAsync(this HttpListenerContext context, ReadOnlyMemory<byte> buffer) + { + context.Response.AddHeader("Access-Control-Allow-Headers", "*"); + context.Response.AppendHeader("Access-Control-Allow-Origin", "*"); + context.Response.ContentType = "video/MP2T"; + context.Response.StatusCode = (int)HttpStatusCode.OK; + context.Response.ContentLength64 = buffer.Length; + context.Response.KeepAlive = false; + await context.Response.OutputStream.WriteAsync(buffer); + context.Response.OutputStream.Close(); + context.Response.Close(); + } public static async ValueTask HttpSendFirstChunked(this JT1078HttpContext context, ReadOnlyMemory<byte> buffer) { diff --git a/src/JT1078.Gateway/HLSPathStorage.cs b/src/JT1078.Gateway/HLSPathStorage.cs new file mode 100644 index 0000000..e734d81 --- /dev/null +++ b/src/JT1078.Gateway/HLSPathStorage.cs @@ -0,0 +1,71 @@ +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.IO; +using System.Linq; +using System.Text; + +namespace JT1078.Gateway +{ + /// <summary> + /// hls路径是否存在处理,及文件监控处理 + /// </summary> + public class HLSPathStorage + { + private readonly ConcurrentDictionary<string, string> path_sim_channelDic = new ConcurrentDictionary<string, string>(); + private readonly ConcurrentDictionary<string, FileSystemWatcher> pathFileSystemWaterDic = new ConcurrentDictionary<string, FileSystemWatcher>(); + /// <summary> + /// 添加路径 + /// </summary> + /// <param name="path"></param> + public void AddPath(string path,string key) { + path_sim_channelDic.TryAdd(path, key); + } + /// <summary> + /// 判断路径是否存在 + /// </summary> + /// <param name="path"></param> + /// <returns></returns> + public bool ExsitPath(string path) { + return path_sim_channelDic.TryGetValue(path, out var _); + } + /// <summary> + /// 移除所有路径 + /// </summary> + /// <returns></returns> + public bool RemoveAllPath(string key) { + var flag = false; + var paths = path_sim_channelDic.Where(m => m.Value == key).ToList(); + foreach (var item in paths) + { + flag = path_sim_channelDic.TryRemove(item.Key,out var _); + } + return flag; + } + + /// <summary> + /// 是否存在文件监控 + /// </summary> + /// <param name="path"></param> + /// <returns></returns> + public bool ExistFileSystemWatcher(string path) { + return pathFileSystemWaterDic.TryGetValue(path, out var _); + } + /// <summary> + /// 添加文件监控 + /// </summary> + /// <param name="path"></param> + /// <param name="fileSystemWatcher"></param> + public void AddFileSystemWatcher(string path, FileSystemWatcher fileSystemWatcher) { + pathFileSystemWaterDic.TryAdd(path, fileSystemWatcher); + } + /// <summary> + /// 删除文件监控 + /// </summary> + /// <param name="path"></param> + public bool DeleteFileSystemWatcher(string path) + { + return pathFileSystemWaterDic.TryRemove(path, out var _); + } + } +} diff --git a/src/JT1078.Gateway/HLSRequestManager.cs b/src/JT1078.Gateway/HLSRequestManager.cs index 0fa673a..64531f6 100644 --- a/src/JT1078.Gateway/HLSRequestManager.cs +++ b/src/JT1078.Gateway/HLSRequestManager.cs @@ -3,9 +3,11 @@ using JT1078.Gateway.Extensions; using JT1078.Gateway.Metadata; using JT1078.Gateway.Sessions; using Microsoft.Extensions.Caching.Memory; +using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; using System; +using System.Collections.Concurrent; using System.Collections.Generic; using System.IO; using System.Linq; @@ -22,51 +24,34 @@ namespace JT1078.Gateway /// </summary> public class HLSRequestManager { - private const string m3u8Mime = "application/x-mpegURL"; - private const string tsMime = "video/MP2T"; private readonly JT1078Configuration Configuration; private readonly JT1078HttpSessionManager HttpSessionManager; - private readonly JT1078SessionManager SessionManager; + private readonly HLSPathStorage hLSPathStorage; private readonly ILogger Logger; - private IMemoryCache memoryCache; - private FileSystemWatcher fileSystemWatcher; + + private readonly IServiceProvider serviceProvider; + //private FileSystemWatcher fileSystemWatcher; public HLSRequestManager( - IMemoryCache memoryCache, IOptions<JT1078Configuration> jT1078ConfigurationAccessor, JT1078HttpSessionManager httpSessionManager, - JT1078SessionManager sessionManager, - FileSystemWatcher fileSystemWatcher, + HLSPathStorage hLSPathStorage, + IServiceProvider serviceProvider, ILoggerFactory loggerFactory) { - this.memoryCache = memoryCache; - this.fileSystemWatcher = fileSystemWatcher; + this.serviceProvider = serviceProvider; HttpSessionManager = httpSessionManager; - SessionManager = sessionManager; + this.hLSPathStorage = hLSPathStorage; Configuration = jT1078ConfigurationAccessor.Value; Logger = loggerFactory.CreateLogger<HLSRequestManager>(); - - Task.Run(()=> { - while (true) - { - var expireds= HttpSessionManager.GetAll().Where(m => DateTime.Now.Subtract(m.StartTime).TotalSeconds > 20).ToList(); - foreach (var item in expireds) - { - //移除httpsession - HttpSessionManager.TryRemoveBySim(item.Sim); - //移除tcpsession - SessionManager.RemoveByTerminalPhoneNo(item.Sim); - } - Thread.Sleep(TimeSpan.FromSeconds(10)); - } - }); } /// <summary> /// 处理hls实时视频请求 /// </summary> /// <param name="context"></param> /// <param name="principal"></param> - public async void HandleHlsRequest(HttpListenerContext context, IPrincipal principal) { + public async void HandleHlsRequest(HttpListenerContext context, IPrincipal principal) + { if (context.Request.QueryString.Count < 2) { context.Http404(); @@ -77,85 +62,116 @@ namespace JT1078.Gateway string key = $"{sim}_{channelNo}"; string filename = Path.GetFileName(context.Request.Url.AbsolutePath.ToString()); string filepath = Path.Combine(Configuration.HlsRootDirectory, key, filename); - if (!File.Exists(filepath)) + + if (hLSPathStorage.ExsitPath(filepath)) { - if (filename.ToLower().Contains("m3u8")) + try { - fileSystemWatcher = new FileSystemWatcher(); - fileSystemWatcher.Path = Path.Combine(Configuration.HlsRootDirectory, key); - fileSystemWatcher.NotifyFilter = NotifyFilters.LastWrite; //NotifyFilters.CreateTime - fileSystemWatcher.Filter = "*.m3u8"; // Only watch text files. - fileSystemWatcher.Changed += (sender, arg) => + using (FileStream sr = new FileStream(filepath, FileMode.Open, FileAccess.Read, FileShare.ReadWrite)) { - if (context.Response.ContentLength64 != 0) return; - //wwwroot\1234_2\live.m3u8 - //var key = arg.FullPath.Replace(arg.Name, "").Substring(arg.FullPath.Replace(arg.Name, "").IndexOf("\\")).Replace("\\", ""); - var key = arg.FullPath.Substring(arg.FullPath.IndexOf("\\")+1,arg.FullPath.LastIndexOf("\\")); - var sim = key.Split("_")[0]; - var channel = int.Parse(key.Split("_")[1]); - try + MemoryStream ms = new MemoryStream(); + sr.CopyTo(ms); + if (filename.Contains("m3u8")) { - using (FileStream sr = new FileStream(arg.FullPath, FileMode.Open)) - { - context.Response.ContentType = m3u8Mime; - context.Response.StatusCode = (int)HttpStatusCode.OK; - context.Response.ContentLength64 = sr.Length; - sr.CopyTo(context.Response.OutputStream); - } + await context.HttpM3U8Async(ms.ToArray()); } - catch (Exception ex) + else if (filename.Contains("ts")) { - Logger.LogError(ex, $"{context.Request.Url}"); + await context.HttpTsAsync(ms.ToArray()); } - finally + else { - context.Response.OutputStream.Close(); - context.Response.Close(); + context.Http404(); } - }; - fileSystemWatcher.EnableRaisingEvents = true; // Begin watching. + } } - else + catch (Exception ex) { + Logger.LogError(ex, ex.Message); context.Http404(); - return; - } + } } else { - try + if (!File.Exists(filepath)) { - using (FileStream sr = new FileStream(filepath, FileMode.Open)) + if (filename.ToLower().Contains("m3u8")) { - if (filename.ToLower().Contains("m3u8")) + var directory = Path.Combine(Configuration.HlsRootDirectory, key); + if (!Directory.Exists(directory)) { - context.Response.ContentType = m3u8Mime; + Directory.CreateDirectory(directory); } - else - { - context.Response.ContentType = tsMime; + if (!hLSPathStorage.ExistFileSystemWatcher(directory)) { + var fileSystemWatcher = new FileSystemWatcher(); + fileSystemWatcher.Path = directory; + fileSystemWatcher.NotifyFilter = NotifyFilters.LastWrite; //NotifyFilters.CreateTime + fileSystemWatcher.Filter = "*.m3u8"; // Only watch text files. + fileSystemWatcher.Changed += async (sender, arg) => + { + if (context.Response.ContentLength64 != 0) return; + //wwwroot\1234_2\live.m3u8 + //var key = arg.FullPath.Replace(arg.Name, "").Substring(arg.FullPath.Replace(arg.Name, "").IndexOf("\\")).Replace("\\", ""); + var key = arg.FullPath.Substring(arg.FullPath.IndexOf("\\") + 1, (arg.FullPath.LastIndexOf("\\") - arg.FullPath.IndexOf("\\")) - 1); + var sim = key.Split("_")[0]; + var channel = int.Parse(key.Split("_")[1]); + try + { + using (FileStream sr = new FileStream(arg.FullPath, FileMode.Open, FileAccess.Read, FileShare.ReadWrite)) + { + hLSPathStorage.AddPath(arg.FullPath, key); + MemoryStream ms = new MemoryStream(); + sr.CopyTo(ms); + await context.HttpM3U8Async(ms.ToArray()); + } + } + catch (Exception ex) + { + Logger.LogError(ex, $"{context.Request.Url}"); + context.Http404(); + } + finally + { + hLSPathStorage.DeleteFileSystemWatcher(directory); + } + }; + fileSystemWatcher.EnableRaisingEvents = true; // Begin watching. + hLSPathStorage.AddFileSystemWatcher(directory, fileSystemWatcher); } - context.Response.StatusCode = (int)HttpStatusCode.OK; - context.Response.ContentLength64 = sr.Length; - await sr.CopyToAsync(context.Response.OutputStream); + } + else + { + context.Http404(); + return; } } - catch (Exception ex) - { - Logger.LogError(ex, $"{context.Request.Url}"); - context.Response.StatusCode = (int)HttpStatusCode.InternalServerError; - } - finally + else { - context.Response.OutputStream.Close(); - context.Response.Close(); + hLSPathStorage.AddPath(filepath, key); + using (FileStream sr = new FileStream(filepath, FileMode.Open, FileAccess.Read, FileShare.ReadWrite)) + { + MemoryStream ms = new MemoryStream(); + sr.CopyTo(ms); + if (filename.Contains("m3u8")) + { + await context.HttpM3U8Async(ms.ToArray()); + } + else if (filename.Contains("ts")) + { + await context.HttpTsAsync(ms.ToArray()); + } + else + { + context.Http404(); + } + } } + var jT1078HttpContext = new JT1078HttpContext(context, principal); + jT1078HttpContext.Sim = sim; + jT1078HttpContext.ChannelNo = int.Parse(channelNo); + jT1078HttpContext.RTPVideoType = RTPVideoType.Http_Hls; + HttpSessionManager.AddOrUpdateHlsSession(jT1078HttpContext); } - var jT1078HttpContext = new JT1078HttpContext(context, principal); - jT1078HttpContext.Sim = sim; - jT1078HttpContext.ChannelNo = int.Parse(channelNo); - jT1078HttpContext.RTPVideoType = RTPVideoType.Http_Hls; - HttpSessionManager.AddOrUpdate(jT1078HttpContext); } } } diff --git a/src/JT1078.Gateway/JT1078.Gateway.csproj b/src/JT1078.Gateway/JT1078.Gateway.csproj index 7b149c0..452b4ee 100644 --- a/src/JT1078.Gateway/JT1078.Gateway.csproj +++ b/src/JT1078.Gateway/JT1078.Gateway.csproj @@ -48,6 +48,7 @@ </ItemGroup> <ItemGroup> + <ProjectReference Include="..\..\..\JT1078\src\JT1078.Hls\JT1078.Hls.csproj" /> <ProjectReference Include="..\JT1078.Gateway.Abstractions\JT1078.Gateway.Abstractions.csproj" /> </ItemGroup> </Project> diff --git a/src/JT1078.Gateway/JT1078.Gateway.xml b/src/JT1078.Gateway/JT1078.Gateway.xml index 48f60c9..6bb4355 100644 --- a/src/JT1078.Gateway/JT1078.Gateway.xml +++ b/src/JT1078.Gateway/JT1078.Gateway.xml @@ -53,6 +53,66 @@ 协调器Coordinator主机登录密码 </summary> </member> + <member name="M:JT1078.Gateway.Extensions.JT1078HttpContextExtensions.HttpM3U8Async(System.Net.HttpListenerContext,System.ReadOnlyMemory{System.Byte})"> + <summary> + 返回m3u8响应 + </summary> + <param name="context"></param> + <param name="buffer"></param> + <returns></returns> + </member> + <member name="M:JT1078.Gateway.Extensions.JT1078HttpContextExtensions.HttpTsAsync(System.Net.HttpListenerContext,System.ReadOnlyMemory{System.Byte})"> + <summary> + 返回ts响应数 + </summary> + <param name="context"></param> + <param name="buffer"></param> + <returns></returns> + </member> + <member name="T:JT1078.Gateway.HLSPathStorage"> + <summary> + hls路径是否存在处理,及文件监控处理 + </summary> + </member> + <member name="M:JT1078.Gateway.HLSPathStorage.AddPath(System.String,System.String)"> + <summary> + 添加路径 + </summary> + <param name="path"></param> + </member> + <member name="M:JT1078.Gateway.HLSPathStorage.ExsitPath(System.String)"> + <summary> + 判断路径是否存在 + </summary> + <param name="path"></param> + <returns></returns> + </member> + <member name="M:JT1078.Gateway.HLSPathStorage.RemoveAllPath(System.String)"> + <summary> + 移除所有路径 + </summary> + <returns></returns> + </member> + <member name="M:JT1078.Gateway.HLSPathStorage.ExistFileSystemWatcher(System.String)"> + <summary> + 是否存在文件监控 + </summary> + <param name="path"></param> + <returns></returns> + </member> + <member name="M:JT1078.Gateway.HLSPathStorage.AddFileSystemWatcher(System.String,System.IO.FileSystemWatcher)"> + <summary> + 添加文件监控 + </summary> + <param name="path"></param> + <param name="fileSystemWatcher"></param> + </member> + <member name="M:JT1078.Gateway.HLSPathStorage.DeleteFileSystemWatcher(System.String)"> + <summary> + 删除文件监控 + </summary> + <param name="path"></param> + </member> <member name="T:JT1078.Gateway.HLSRequestManager"> <summary> Hls请求管理 @@ -65,6 +125,11 @@ <param name="context"></param> <param name="principal"></param> </member> + <member name="T:JT1078.Gateway.Jobs.JT1078SessionClearJob"> + <summary> + 清理hls session + </summary> + </member> <member name="T:JT1078.Gateway.JT1078CoordinatorHttpClient"> <summary> 协调器客户端 diff --git a/src/JT1078.Gateway/JT1078GatewayExtensions.cs b/src/JT1078.Gateway/JT1078GatewayExtensions.cs index f9551c7..0a00831 100644 --- a/src/JT1078.Gateway/JT1078GatewayExtensions.cs +++ b/src/JT1078.Gateway/JT1078GatewayExtensions.cs @@ -4,10 +4,13 @@ using JT1078.Gateway.Impl; using JT1078.Gateway.Jobs; using JT1078.Gateway.Services; using JT1078.Gateway.Sessions; +using JT1078.Hls.Options; using Microsoft.Extensions.Configuration; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection.Extensions; +using Microsoft.Extensions.Options; using System; +using System.IO; using System.Net.Http; using System.Runtime.CompilerServices; @@ -79,7 +82,15 @@ namespace JT1078.Gateway builder.JT1078Builder.Services.AddSingleton<JT1078SessionNoticeService>(); builder.JT1078Builder.Services.AddSingleton<JT1078SessionManager>(); builder.JT1078Builder.Services.AddHostedService<JT1078SessionNoticeJob>(); + return builder; } + public static IServiceCollection AddHlsGateway(this IServiceCollection serviceDescriptors, IConfiguration configuration) + { + serviceDescriptors.AddSingleton<HLSRequestManager>(); + serviceDescriptors.AddSingleton<HLSPathStorage>(); + serviceDescriptors.AddHostedService<JT1078SessionClearJob>(); + return serviceDescriptors; + } } } \ No newline at end of file diff --git a/src/JT1078.Gateway/Jobs/JT1078SessionClearJob.cs b/src/JT1078.Gateway/Jobs/JT1078SessionClearJob.cs new file mode 100644 index 0000000..16ece6d --- /dev/null +++ b/src/JT1078.Gateway/Jobs/JT1078SessionClearJob.cs @@ -0,0 +1,86 @@ +using JT1078.Gateway.Abstractions; +using JT1078.Gateway.Configurations; +using JT1078.Gateway.Services; +using JT1078.Gateway.Sessions; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using System; +using System.Collections.Generic; +using System.Diagnostics.CodeAnalysis; +using System.IO; +using System.Linq; +using System.Text; +using System.Text.Json; +using System.Threading; +using System.Threading.Tasks; + + +namespace JT1078.Gateway.Jobs +{ + /// <summary> + /// 清理hls session + /// </summary> + public class JT1078SessionClearJob : BackgroundService + { + private readonly ILogger logger; + private readonly JT1078HttpSessionManager HttpSessionManager;//用户链接session + private readonly JT1078SessionManager SessionManager;//设备链接session + private readonly HLSPathStorage hLSPathStorage; + private readonly JT1078Configuration Configuration; + public JT1078SessionClearJob( + ILoggerFactory loggerFactory, + JT1078SessionManager SessionManager, + HLSPathStorage hLSPathStorage, + IOptions<JT1078Configuration> jT1078ConfigurationAccessor, + [AllowNull]JT1078HttpSessionManager jT1078HttpSessionManager=null) + { + logger = loggerFactory.CreateLogger<JT1078SessionClearJob>(); + HttpSessionManager = jT1078HttpSessionManager; + this.hLSPathStorage = hLSPathStorage; + this.SessionManager = SessionManager; + this.Configuration = jT1078ConfigurationAccessor.Value; + } + + protected override async Task ExecuteAsync(CancellationToken stoppingToken) + { + await Task.Run(() => { + while (true) + { + try + { + var hasSessions = HttpSessionManager.GetAll().Where(m => DateTime.Now.Subtract(m.StartTime).TotalSeconds > 60 && m.RTPVideoType == Metadata.RTPVideoType.Http_Hls).ToList();//所有http 的 hls短链接 + foreach (var item in hasSessions) + { + var key = $"{item.Sim}_{item.ChannelNo}"; + HttpSessionManager.TryRemove(item.SessionId);//超过120s未访问。 + //清楚所有hls文件 + string filepath = Path.Combine(Configuration.HlsRootDirectory, key); + if (Directory.Exists(filepath)) { + Directory.Delete(filepath,true); + } + hLSPathStorage.RemoveAllPath(key);//移除所有缓存 + + if (logger.IsEnabled(LogLevel.Debug)) { + logger.LogDebug($"{System.Text.Json.JsonSerializer.Serialize(item)},清楚session"); + } + var hasTcpSession = HttpSessionManager.GetAllBySimAndChannelNo(item.Sim.TrimStart('0'), item.ChannelNo).Any(m => m.IsWebSocket);//是否存在tcp的 socket链接 + var httpFlvSession = HttpSessionManager.GetAllBySimAndChannelNo(item.Sim.TrimStart('0'), item.ChannelNo).Any(m => m.RTPVideoType == Metadata.RTPVideoType.Http_Flv);//是否存在http的 flv长链接 + if (!hasTcpSession && !httpFlvSession) + { + //不存在websocket链接和http-flv链接时,主动断开设备链接以节省流量 + //移除tcpsession,断开设备链接 + if(SessionManager!=null) SessionManager.RemoveByTerminalPhoneNo(item.Sim.TrimStart('0')); + } + } + } + catch(Exception ex) + { + logger.LogError(ex, ex.Message); + } + Thread.Sleep(TimeSpan.FromSeconds(30));//30s 执行一次 + } + }, stoppingToken); + } + } +} diff --git a/src/JT1078.Gateway/Sessions/JT1078HttpSessionManager.cs b/src/JT1078.Gateway/Sessions/JT1078HttpSessionManager.cs index 6575ef5..2f881d5 100644 --- a/src/JT1078.Gateway/Sessions/JT1078HttpSessionManager.cs +++ b/src/JT1078.Gateway/Sessions/JT1078HttpSessionManager.cs @@ -29,15 +29,15 @@ namespace JT1078.Gateway.Sessions return Sessions.TryAdd(httpContext.SessionId, httpContext); } - public void AddOrUpdate(JT1078HttpContext httpContext) { - var session = Sessions.FirstOrDefault(m => m.Value.Sim == httpContext.Sim && m.Value.ChannelNo == httpContext.ChannelNo); - if (string.IsNullOrEmpty(session.Key)) + public void AddOrUpdateHlsSession(JT1078HttpContext httpContext) + { + //如果不存在就添加,如果存在则删除后添加(保持key和value中的sessionid一致) + var session = Sessions.FirstOrDefault(m => m.Value.Sim == httpContext.Sim && m.Value.ChannelNo == httpContext.ChannelNo && m.Value.RTPVideoType == RTPVideoType.Http_Hls); + if (!string.IsNullOrEmpty(session.Key)) { - Sessions.TryAdd(httpContext.SessionId, httpContext); - } - else { - Sessions.TryUpdate(session.Key, httpContext, session.Value); + Sessions.TryRemove(session.Key, out var _); } + Sessions.TryAdd(httpContext.SessionId, httpContext); } public async void TryRemove(string sessionId)