Bladeren bron

优化一下hls,可同时播放hls和flv视频

master
waterliu99 4 jaren geleden
bovenliggende
commit
5cee1a8d99
19 gewijzigde bestanden met toevoegingen van 513 en 137 verwijderingen
  1. +1
    -1
      src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/JT1078.Gateway.TestNormalHosting.csproj
  2. +16
    -8
      src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Program.cs
  3. +22
    -21
      src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Services/JT1078FlvNormalMsgHostedService.cs
  4. +21
    -15
      src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Services/JT1078HlsNormalMsgHostedService.cs
  5. +16
    -0
      src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Services/MessageDispatchDataService.cs
  6. +43
    -0
      src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Services/MessageDispatchHostedService.cs
  7. +2
    -2
      src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/appsettings.json
  8. +3
    -0
      src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/wwwroot/.vscode/settings.json
  9. +2
    -1
      src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/wwwroot/demo/index.html
  10. +1
    -1
      src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/wwwroot/index.html
  11. +12
    -0
      src/JT1078.Gateway.sln
  12. +36
    -0
      src/JT1078.Gateway/Extensions/JT1078HttpContextExtensions.cs
  13. +71
    -0
      src/JT1078.Gateway/HLSPathStorage.cs
  14. +97
    -81
      src/JT1078.Gateway/HLSRequestManager.cs
  15. +1
    -0
      src/JT1078.Gateway/JT1078.Gateway.csproj
  16. +65
    -0
      src/JT1078.Gateway/JT1078.Gateway.xml
  17. +11
    -0
      src/JT1078.Gateway/JT1078GatewayExtensions.cs
  18. +86
    -0
      src/JT1078.Gateway/Jobs/JT1078SessionClearJob.cs
  19. +7
    -7
      src/JT1078.Gateway/Sessions/JT1078HttpSessionManager.cs

+ 1
- 1
src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/JT1078.Gateway.TestNormalHosting.csproj Bestand weergeven

@@ -7,7 +7,6 @@
<ItemGroup>
<PackageReference Include="JT1078.Flv" Version="1.1.0" />
<PackageReference Include="JT1078.Hls" Version="1.1.0-preview1" />
<PackageReference Include="Microsoft.Extensions.Caching.Memory" Version="5.0.0" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="5.0.0" />
<PackageReference Include="Microsoft.Extensions.Hosting" Version="5.0.0" />
@@ -16,6 +15,7 @@
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\..\..\..\JT1078\src\JT1078.Hls\JT1078.Hls.csproj" />
<ProjectReference Include="..\..\JT1078.Gateway.InMemoryMQ\JT1078.Gateway.InMemoryMQ.csproj" />
<ProjectReference Include="..\..\JT1078.Gateway\JT1078.Gateway.csproj" />
</ItemGroup>


+ 16
- 8
src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Program.cs Bestand weergeven

@@ -7,6 +7,7 @@ using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using NLog.Extensions.Logging;
using System;
using System.IO;
@@ -33,18 +34,21 @@ namespace JT1078.Gateway.TestNormalHosting
})
.ConfigureServices((hostContext, services) =>
{
services.AddMemoryCache();
services.AddScoped<FileSystemWatcher>();
services.AddSingleton<HLSRequestManager>();
services.AddMemoryCache();
services.AddSingleton<ILoggerFactory, LoggerFactory>();
services.AddSingleton(typeof(ILogger<>), typeof(Logger<>));
//flv视频解码器
services.AddSingleton<FlvEncoder>();
//hls视频解码器
services.AddSingleton<TSEncoder>();
services.AddSingleton(new M3U8Option
{
});
services.AddSingleton<M3U8FileManage>();
//添加hls依赖项
services.AddHlsGateway(hostContext.Configuration);
services.Configure<M3U8Option>(hostContext.Configuration.GetSection("M3U8Option"));
var m3u8Option = services.BuildServiceProvider().GetRequiredService<IOptions<M3U8Option>>().Value;
services.AddSingleton(m3u8Option);

//使用内存队列实现会话通知
services.AddJT1078Gateway(hostContext.Configuration)
.AddTcp()
@@ -55,7 +59,11 @@ namespace JT1078.Gateway.TestNormalHosting
.AddMsgConsumer();
//内存队列没有做分发,可以自己实现。
services.AddHostedService<JT1078FlvNormalMsgHostedService>();
//services.AddHostedService<JT1078HlsNormalMsgHostedService>();
services.AddHostedService<JT1078HlsNormalMsgHostedService>();

services.AddSingleton<MessageDispatchDataService>();
services.AddHostedService<MessageDispatchHostedService>();
});
//测试1:
//发送完整包


+ 22
- 21
src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Services/JT1078FlvNormalMsgHostedService.cs Bestand weergeven

@@ -25,8 +25,10 @@ namespace JT1078.Gateway.TestNormalHosting.Services
private ILogger Logger;
private IMemoryCache memoryCache;
private const string ikey = "IKEY";
private MessageDispatchDataService messageDispatchDataService;

public JT1078FlvNormalMsgHostedService(
MessageDispatchDataService messageDispatchDataService,
IMemoryCache memoryCache,
ILoggerFactory loggerFactory,
FlvEncoder flvEncoder,
@@ -38,27 +40,26 @@ namespace JT1078.Gateway.TestNormalHosting.Services
HttpSessionManager = httpSessionManager;
FlvEncoder = flvEncoder;
this.memoryCache = memoryCache;
this.messageDispatchDataService = messageDispatchDataService;
}
protected override Task ExecuteAsync(CancellationToken stoppingToken)
protected async override Task ExecuteAsync(CancellationToken stoppingToken)
{
JT1078MsgConsumer.OnMessage((Message) =>
while (!stoppingToken.IsCancellationRequested)
{
JT1078Package package = JT1078Serializer.Deserialize(Message.Data);
if (Logger.IsEnabled(LogLevel.Debug))
{
Logger.LogDebug(JsonSerializer.Serialize(HttpSessionManager.GetAll()));
Logger.LogDebug($"{package.SIM},{package.SN},{package.LogicChannelNumber},{package.Label3.DataType.ToString()},{package.Label3.SubpackageType.ToString()},{package.Bodies.ToHexString()}");
}
var data = await messageDispatchDataService.FlvChannel.Reader.ReadAsync();
try
{
var merge = JT1078Serializer.Merge(package);
if (merge == null) return;
string key = $"{package.GetKey()}_{ikey}";
if (merge.Label3.DataType == Protocol.Enums.JT1078DataType.视频I帧)
{
if (Logger.IsEnabled(LogLevel.Debug))
{
Logger.LogDebug(JsonSerializer.Serialize(HttpSessionManager.GetAll()));
Logger.LogDebug($"{data.SIM},{data.SN},{data.LogicChannelNumber},{data.Label3.DataType.ToString()},{data.Label3.SubpackageType.ToString()},{data.Bodies.ToHexString()}");
}
string key = $"{data.GetKey()}_{ikey}";
if (data.Label3.DataType == Protocol.Enums.JT1078DataType.视频I帧)
{
memoryCache.Set(key, merge);
memoryCache.Set(key, data);
}
var httpSessions = HttpSessionManager.GetAllBySimAndChannelNo(package.SIM.TrimStart('0'), package.LogicChannelNumber);
var httpSessions = HttpSessionManager.GetAllBySimAndChannelNo(data.SIM.TrimStart('0'), data.LogicChannelNumber);
var firstHttpSessions = httpSessions.Where(w => !w.FirstSend).ToList();
if (firstHttpSessions.Count > 0)
{
@@ -74,7 +75,7 @@ namespace JT1078.Gateway.TestNormalHosting.Services
}
catch (Exception ex)
{
Logger.LogError(ex, $"{package.SIM},{true},{package.SN},{package.LogicChannelNumber},{package.Label3.DataType.ToString()},{package.Label3.SubpackageType.ToString()},{package.Bodies.ToHexString()}");
Logger.LogError(ex, $"{data.SIM},{true},{data.SN},{data.LogicChannelNumber},{data.Label3.DataType.ToString()},{data.Label3.SubpackageType.ToString()},{data.Bodies.ToHexString()}");
}
}
}
@@ -83,7 +84,7 @@ namespace JT1078.Gateway.TestNormalHosting.Services
{
try
{
var flvVideoBuffer = FlvEncoder.EncoderVideoTag(merge, false);
var flvVideoBuffer = FlvEncoder.EncoderVideoTag(data, false);
foreach (var session in otherHttpSessions)
{
HttpSessionManager.SendAVData(session, flvVideoBuffer, false);
@@ -91,16 +92,16 @@ namespace JT1078.Gateway.TestNormalHosting.Services
}
catch (Exception ex)
{
Logger.LogError(ex, $"{package.SIM},{false},{package.SN},{package.LogicChannelNumber},{package.Label3.DataType.ToString()},{package.Label3.SubpackageType.ToString()},{package.Bodies.ToHexString()}");
Logger.LogError(ex, $"{data.SIM},{false},{data.SN},{data.LogicChannelNumber},{data.Label3.DataType.ToString()},{data.Label3.SubpackageType.ToString()},{data.Bodies.ToHexString()}");
}
}
}
catch (Exception ex)
{
Logger.LogError(ex, $"{package.SIM},{package.SN},{package.LogicChannelNumber},{package.Label3.DataType.ToString()},{package.Label3.SubpackageType.ToString()},{package.Bodies.ToHexString()}");
Logger.LogError(ex, $"{data.SIM},{data.SN},{data.LogicChannelNumber},{data.Label3.DataType.ToString()},{data.Label3.SubpackageType.ToString()},{data.Bodies.ToHexString()}");
}
});
return Task.CompletedTask;
}
await Task.CompletedTask;
}
}
}

+ 21
- 15
src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Services/JT1078HlsNormalMsgHostedService.cs Bestand weergeven

@@ -3,6 +3,7 @@ using JT1078.Gateway.Sessions;
using JT1078.Hls;
using JT1078.Protocol;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.Linq;
@@ -17,34 +18,39 @@ namespace JT1078.Gateway.TestNormalHosting.Services
private IJT1078MsgConsumer MsgConsumer;
private JT1078HttpSessionManager HttpSessionManager;
private M3U8FileManage M3U8FileManage;
private MessageDispatchDataService messageDispatchDataService;
private readonly ILogger logger;
public JT1078HlsNormalMsgHostedService(
ILoggerFactory loggerFactory,
M3U8FileManage M3U8FileManage,
JT1078HttpSessionManager httpSessionManager,
MessageDispatchDataService messageDispatchDataService,
IJT1078MsgConsumer msgConsumer)
{
logger = loggerFactory.CreateLogger<JT1078HlsNormalMsgHostedService>();
MsgConsumer = msgConsumer;
HttpSessionManager = httpSessionManager;
this.M3U8FileManage = M3U8FileManage;
this.messageDispatchDataService = messageDispatchDataService;
}
protected override Task ExecuteAsync(CancellationToken stoppingToken)
protected async override Task ExecuteAsync(CancellationToken stoppingToken)
{
MsgConsumer.OnMessage((Message) =>
while (!stoppingToken.IsCancellationRequested)
{
JT1078Package package = JT1078Serializer.Deserialize(Message.Data);
var merge = JT1078.Protocol.JT1078Serializer.Merge(package);
if (merge != null)
var data = await messageDispatchDataService.HlsChannel.Reader.ReadAsync();
logger.LogDebug($"设备{data.SIM},{data.LogicChannelNumber},session:{System.Text.Json.JsonSerializer.Serialize(HttpSessionManager)}");
var hasHttpSessionn = HttpSessionManager.GetAllHttpContextBySimAndChannelNo(data.SIM, data.LogicChannelNumber).Where(m => m.RTPVideoType == Metadata.RTPVideoType.Http_Hls).ToList();
if (hasHttpSessionn.Count > 0)
{
var hasHttpSessionn = HttpSessionManager.GetAllHttpContextBySimAndChannelNo(merge.SIM, merge.LogicChannelNumber);
if (hasHttpSessionn.Count>0)
{
M3U8FileManage.CreateTsData(merge);
}
else {
M3U8FileManage.Clear(merge.SIM, merge.LogicChannelNumber);
}
logger.LogDebug($"设备{data.SIM},{data.LogicChannelNumber}连上了");
M3U8FileManage.CreateTsData(data);
}
});
return Task.CompletedTask;
else
{
logger.LogDebug($"没有设备链接");
}
}
await Task.CompletedTask;
}
}
}

+ 16
- 0
src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Services/MessageDispatchDataService.cs Bestand weergeven

@@ -0,0 +1,16 @@
using JT1078.Protocol;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Channels;
using System.Threading.Tasks;

namespace JT1078.Gateway.TestNormalHosting.Services
{
public class MessageDispatchDataService
{
public Channel<JT1078Package> HlsChannel = Channel.CreateUnbounded<JT1078Package>();
public Channel<JT1078Package> FlvChannel = Channel.CreateUnbounded<JT1078Package>();
}
}

+ 43
- 0
src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Services/MessageDispatchHostedService.cs Bestand weergeven

@@ -0,0 +1,43 @@
using JT1078.Gateway.Abstractions;
using JT1078.Protocol;
using Microsoft.Extensions.Hosting;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

namespace JT1078.Gateway.TestNormalHosting.Services
{
/// <summary>
/// 消费分发服务。同时分发给hls和flv
/// </summary>
public class MessageDispatchHostedService : BackgroundService
{
private IJT1078MsgConsumer JT1078MsgConsumer;
private readonly MessageDispatchDataService messageDispatchDataService;

public MessageDispatchHostedService(IJT1078MsgConsumer JT1078MsgConsumer,
MessageDispatchDataService messageDispatchDataService) {
this.JT1078MsgConsumer = JT1078MsgConsumer;
this.messageDispatchDataService = messageDispatchDataService;
}

protected override Task ExecuteAsync(CancellationToken stoppingToken)
{
JT1078MsgConsumer.OnMessage(async (Message) =>
{
JT1078Package package = JT1078Serializer.Deserialize(Message.Data);
var merge = JT1078.Protocol.JT1078Serializer.Merge(package);
if (merge != null)
{
await messageDispatchDataService.HlsChannel.Writer.WriteAsync(merge);
await messageDispatchDataService.FlvChannel.Writer.WriteAsync(merge);
}
});
return Task.CompletedTask;
}
}
}

+ 2
- 2
src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/appsettings.json Bestand weergeven

@@ -16,9 +16,9 @@
"HttpPort": 15555
},
"M3U8Option": {
"TsFileCapacity": 10,
"TsFileCapacity": 5,
"TsFileMaxSecond": 10,
"M3U8FileName": "live.m3u8",
"HlsFileDirectory":"wwwroot/demo"
"HlsFileDirectory":"wwwroot"
}
}

+ 3
- 0
src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/wwwroot/.vscode/settings.json Bestand weergeven

@@ -0,0 +1,3 @@
{
"liveServer.settings.port": 5501
}

+ 2
- 1
src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/wwwroot/demo/index.html Bestand weergeven

@@ -8,7 +8,8 @@
<video autoplay muted id="video"></video>
<script>
var video = document.getElementById('video');
var videoSrc = 'demo.m3u8';
//var videoSrc = 'demo.m3u8';
var videoSrc = 'http://127.0.0.1:15555/live.m3u8?token=123456&sim=001901305037&channelNo=1';
//
// First check for native browser HLS support
//


+ 1
- 1
src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/wwwroot/index.html Bestand weergeven

@@ -15,7 +15,7 @@
var flvPlayer = flvjs.createPlayer({
type: 'flv',
isLive: true,
//url: "http://127.0.0.1:15555/?sim=1901305037&channel=3&token=123456"
url: "http://127.0.0.1:15555/?sim=1901305037&channel=1&token=123456"
//url: "ws://127.0.0.1:15555/?sim=11234567810&channel=1&token=123456"
});
flvPlayer.attachMediaElement(player);


+ 12
- 0
src/JT1078.Gateway.sln Bestand weergeven

@@ -17,6 +17,10 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JT1078.Gateway", "JT1078.Ga
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JT1078.Gateway.InMemoryMQ", "JT1078.Gateway.InMemoryMQ\JT1078.Gateway.InMemoryMQ.csproj", "{011934D6-BEE7-4CDA-9F67-4D6D7D672D6C}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JT1078.Hls", "..\..\JT1078\src\JT1078.Hls\JT1078.Hls.csproj", "{EBA7BD86-7C58-470D-BC1A-04729C140C34}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JT1078.Protocol", "..\..\JT1078\src\JT1078.Protocol\JT1078.Protocol.csproj", "{759FC4E4-4924-4BFC-8811-01FC6DF52FF7}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
@@ -47,6 +51,14 @@ Global
{011934D6-BEE7-4CDA-9F67-4D6D7D672D6C}.Debug|Any CPU.Build.0 = Debug|Any CPU
{011934D6-BEE7-4CDA-9F67-4D6D7D672D6C}.Release|Any CPU.ActiveCfg = Release|Any CPU
{011934D6-BEE7-4CDA-9F67-4D6D7D672D6C}.Release|Any CPU.Build.0 = Release|Any CPU
{EBA7BD86-7C58-470D-BC1A-04729C140C34}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{EBA7BD86-7C58-470D-BC1A-04729C140C34}.Debug|Any CPU.Build.0 = Debug|Any CPU
{EBA7BD86-7C58-470D-BC1A-04729C140C34}.Release|Any CPU.ActiveCfg = Release|Any CPU
{EBA7BD86-7C58-470D-BC1A-04729C140C34}.Release|Any CPU.Build.0 = Release|Any CPU
{759FC4E4-4924-4BFC-8811-01FC6DF52FF7}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{759FC4E4-4924-4BFC-8811-01FC6DF52FF7}.Debug|Any CPU.Build.0 = Debug|Any CPU
{759FC4E4-4924-4BFC-8811-01FC6DF52FF7}.Release|Any CPU.ActiveCfg = Release|Any CPU
{759FC4E4-4924-4BFC-8811-01FC6DF52FF7}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE


+ 36
- 0
src/JT1078.Gateway/Extensions/JT1078HttpContextExtensions.cs Bestand weergeven

@@ -62,6 +62,42 @@ namespace JT1078.Gateway.Extensions
context.Response.OutputStream.Close();
context.Response.Close();
}
/// <summary>
/// 返回m3u8响应
/// </summary>
/// <param name="context"></param>
/// <param name="buffer"></param>
/// <returns></returns>
public static async ValueTask HttpM3U8Async(this HttpListenerContext context, ReadOnlyMemory<byte> buffer)
{
context.Response.AddHeader("Access-Control-Allow-Headers", "*");
context.Response.AppendHeader("Access-Control-Allow-Origin", "*");
context.Response.ContentType = "application/x-mpegURL";
context.Response.StatusCode = (int)HttpStatusCode.OK;
context.Response.ContentLength64 = buffer.Length;
context.Response.KeepAlive = false;
await context.Response.OutputStream.WriteAsync(buffer);
context.Response.OutputStream.Close();
context.Response.Close();
}
/// <summary>
/// 返回ts响应数
/// </summary>
/// <param name="context"></param>
/// <param name="buffer"></param>
/// <returns></returns>
public static async ValueTask HttpTsAsync(this HttpListenerContext context, ReadOnlyMemory<byte> buffer)
{
context.Response.AddHeader("Access-Control-Allow-Headers", "*");
context.Response.AppendHeader("Access-Control-Allow-Origin", "*");
context.Response.ContentType = "video/MP2T";
context.Response.StatusCode = (int)HttpStatusCode.OK;
context.Response.ContentLength64 = buffer.Length;
context.Response.KeepAlive = false;
await context.Response.OutputStream.WriteAsync(buffer);
context.Response.OutputStream.Close();
context.Response.Close();
}

public static async ValueTask HttpSendFirstChunked(this JT1078HttpContext context, ReadOnlyMemory<byte> buffer)
{


+ 71
- 0
src/JT1078.Gateway/HLSPathStorage.cs Bestand weergeven

@@ -0,0 +1,71 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;

namespace JT1078.Gateway
{
/// <summary>
/// hls路径是否存在处理,及文件监控处理
/// </summary>
public class HLSPathStorage
{
private readonly ConcurrentDictionary<string, string> path_sim_channelDic = new ConcurrentDictionary<string, string>();
private readonly ConcurrentDictionary<string, FileSystemWatcher> pathFileSystemWaterDic = new ConcurrentDictionary<string, FileSystemWatcher>();
/// <summary>
/// 添加路径
/// </summary>
/// <param name="path"></param>
public void AddPath(string path,string key) {
path_sim_channelDic.TryAdd(path, key);
}
/// <summary>
/// 判断路径是否存在
/// </summary>
/// <param name="path"></param>
/// <returns></returns>
public bool ExsitPath(string path) {
return path_sim_channelDic.TryGetValue(path, out var _);
}
/// <summary>
/// 移除所有路径
/// </summary>
/// <returns></returns>
public bool RemoveAllPath(string key) {
var flag = false;
var paths = path_sim_channelDic.Where(m => m.Value == key).ToList();
foreach (var item in paths)
{
flag = path_sim_channelDic.TryRemove(item.Key,out var _);
}
return flag;
}

/// <summary>
/// 是否存在文件监控
/// </summary>
/// <param name="path"></param>
/// <returns></returns>
public bool ExistFileSystemWatcher(string path) {
return pathFileSystemWaterDic.TryGetValue(path, out var _);
}
/// <summary>
/// 添加文件监控
/// </summary>
/// <param name="path"></param>
/// <param name="fileSystemWatcher"></param>
public void AddFileSystemWatcher(string path, FileSystemWatcher fileSystemWatcher) {
pathFileSystemWaterDic.TryAdd(path, fileSystemWatcher);
}
/// <summary>
/// 删除文件监控
/// </summary>
/// <param name="path"></param>
public bool DeleteFileSystemWatcher(string path)
{
return pathFileSystemWaterDic.TryRemove(path, out var _);
}
}
}

+ 97
- 81
src/JT1078.Gateway/HLSRequestManager.cs Bestand weergeven

@@ -3,9 +3,11 @@ using JT1078.Gateway.Extensions;
using JT1078.Gateway.Metadata;
using JT1078.Gateway.Sessions;
using Microsoft.Extensions.Caching.Memory;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.IO;
using System.Linq;
@@ -22,51 +24,34 @@ namespace JT1078.Gateway
/// </summary>
public class HLSRequestManager
{
private const string m3u8Mime = "application/x-mpegURL";
private const string tsMime = "video/MP2T";
private readonly JT1078Configuration Configuration;
private readonly JT1078HttpSessionManager HttpSessionManager;
private readonly JT1078SessionManager SessionManager;
private readonly HLSPathStorage hLSPathStorage;
private readonly ILogger Logger;
private IMemoryCache memoryCache;
private FileSystemWatcher fileSystemWatcher;

private readonly IServiceProvider serviceProvider;
//private FileSystemWatcher fileSystemWatcher;

public HLSRequestManager(
IMemoryCache memoryCache,
IOptions<JT1078Configuration> jT1078ConfigurationAccessor,
JT1078HttpSessionManager httpSessionManager,
JT1078SessionManager sessionManager,
FileSystemWatcher fileSystemWatcher,
HLSPathStorage hLSPathStorage,
IServiceProvider serviceProvider,
ILoggerFactory loggerFactory)
{
this.memoryCache = memoryCache;
this.fileSystemWatcher = fileSystemWatcher;
this.serviceProvider = serviceProvider;
HttpSessionManager = httpSessionManager;
SessionManager = sessionManager;
this.hLSPathStorage = hLSPathStorage;
Configuration = jT1078ConfigurationAccessor.Value;
Logger = loggerFactory.CreateLogger<HLSRequestManager>();

Task.Run(()=> {
while (true)
{
var expireds= HttpSessionManager.GetAll().Where(m => DateTime.Now.Subtract(m.StartTime).TotalSeconds > 20).ToList();
foreach (var item in expireds)
{
//移除httpsession
HttpSessionManager.TryRemoveBySim(item.Sim);
//移除tcpsession
SessionManager.RemoveByTerminalPhoneNo(item.Sim);
}
Thread.Sleep(TimeSpan.FromSeconds(10));
}
});
}
/// <summary>
/// 处理hls实时视频请求
/// </summary>
/// <param name="context"></param>
/// <param name="principal"></param>
public async void HandleHlsRequest(HttpListenerContext context, IPrincipal principal) {
public async void HandleHlsRequest(HttpListenerContext context, IPrincipal principal)
{
if (context.Request.QueryString.Count < 2)
{
context.Http404();
@@ -77,85 +62,116 @@ namespace JT1078.Gateway
string key = $"{sim}_{channelNo}";
string filename = Path.GetFileName(context.Request.Url.AbsolutePath.ToString());
string filepath = Path.Combine(Configuration.HlsRootDirectory, key, filename);
if (!File.Exists(filepath))

if (hLSPathStorage.ExsitPath(filepath))
{
if (filename.ToLower().Contains("m3u8"))
try
{
fileSystemWatcher = new FileSystemWatcher();
fileSystemWatcher.Path = Path.Combine(Configuration.HlsRootDirectory, key);
fileSystemWatcher.NotifyFilter = NotifyFilters.LastWrite; //NotifyFilters.CreateTime
fileSystemWatcher.Filter = "*.m3u8"; // Only watch text files.
fileSystemWatcher.Changed += (sender, arg) =>
using (FileStream sr = new FileStream(filepath, FileMode.Open, FileAccess.Read, FileShare.ReadWrite))
{
if (context.Response.ContentLength64 != 0) return;
//wwwroot\1234_2\live.m3u8
//var key = arg.FullPath.Replace(arg.Name, "").Substring(arg.FullPath.Replace(arg.Name, "").IndexOf("\\")).Replace("\\", "");
var key = arg.FullPath.Substring(arg.FullPath.IndexOf("\\")+1,arg.FullPath.LastIndexOf("\\"));
var sim = key.Split("_")[0];
var channel = int.Parse(key.Split("_")[1]);
try
MemoryStream ms = new MemoryStream();
sr.CopyTo(ms);
if (filename.Contains("m3u8"))
{
using (FileStream sr = new FileStream(arg.FullPath, FileMode.Open))
{
context.Response.ContentType = m3u8Mime;
context.Response.StatusCode = (int)HttpStatusCode.OK;
context.Response.ContentLength64 = sr.Length;
sr.CopyTo(context.Response.OutputStream);
}
await context.HttpM3U8Async(ms.ToArray());
}
catch (Exception ex)
else if (filename.Contains("ts"))
{
Logger.LogError(ex, $"{context.Request.Url}");
await context.HttpTsAsync(ms.ToArray());
}
finally
else
{
context.Response.OutputStream.Close();
context.Response.Close();
context.Http404();
}
};
fileSystemWatcher.EnableRaisingEvents = true; // Begin watching.
}
}
else
catch (Exception ex)
{
Logger.LogError(ex, ex.Message);
context.Http404();
return;
}
}
}
else
{
try
if (!File.Exists(filepath))
{
using (FileStream sr = new FileStream(filepath, FileMode.Open))
if (filename.ToLower().Contains("m3u8"))
{
if (filename.ToLower().Contains("m3u8"))
var directory = Path.Combine(Configuration.HlsRootDirectory, key);
if (!Directory.Exists(directory))
{
context.Response.ContentType = m3u8Mime;
Directory.CreateDirectory(directory);
}
else
{
context.Response.ContentType = tsMime;
if (!hLSPathStorage.ExistFileSystemWatcher(directory)) {
var fileSystemWatcher = new FileSystemWatcher();
fileSystemWatcher.Path = directory;
fileSystemWatcher.NotifyFilter = NotifyFilters.LastWrite; //NotifyFilters.CreateTime
fileSystemWatcher.Filter = "*.m3u8"; // Only watch text files.
fileSystemWatcher.Changed += async (sender, arg) =>
{
if (context.Response.ContentLength64 != 0) return;
//wwwroot\1234_2\live.m3u8
//var key = arg.FullPath.Replace(arg.Name, "").Substring(arg.FullPath.Replace(arg.Name, "").IndexOf("\\")).Replace("\\", "");
var key = arg.FullPath.Substring(arg.FullPath.IndexOf("\\") + 1, (arg.FullPath.LastIndexOf("\\") - arg.FullPath.IndexOf("\\")) - 1);
var sim = key.Split("_")[0];
var channel = int.Parse(key.Split("_")[1]);
try
{
using (FileStream sr = new FileStream(arg.FullPath, FileMode.Open, FileAccess.Read, FileShare.ReadWrite))
{
hLSPathStorage.AddPath(arg.FullPath, key);
MemoryStream ms = new MemoryStream();
sr.CopyTo(ms);
await context.HttpM3U8Async(ms.ToArray());
}
}
catch (Exception ex)
{
Logger.LogError(ex, $"{context.Request.Url}");
context.Http404();
}
finally
{
hLSPathStorage.DeleteFileSystemWatcher(directory);
}
};
fileSystemWatcher.EnableRaisingEvents = true; // Begin watching.
hLSPathStorage.AddFileSystemWatcher(directory, fileSystemWatcher);
}
context.Response.StatusCode = (int)HttpStatusCode.OK;
context.Response.ContentLength64 = sr.Length;
await sr.CopyToAsync(context.Response.OutputStream);
}
else
{
context.Http404();
return;
}
}
catch (Exception ex)
{
Logger.LogError(ex, $"{context.Request.Url}");
context.Response.StatusCode = (int)HttpStatusCode.InternalServerError;
}
finally
else
{
context.Response.OutputStream.Close();
context.Response.Close();
hLSPathStorage.AddPath(filepath, key);
using (FileStream sr = new FileStream(filepath, FileMode.Open, FileAccess.Read, FileShare.ReadWrite))
{
MemoryStream ms = new MemoryStream();
sr.CopyTo(ms);
if (filename.Contains("m3u8"))
{
await context.HttpM3U8Async(ms.ToArray());
}
else if (filename.Contains("ts"))
{
await context.HttpTsAsync(ms.ToArray());
}
else
{
context.Http404();
}
}
}
var jT1078HttpContext = new JT1078HttpContext(context, principal);
jT1078HttpContext.Sim = sim;
jT1078HttpContext.ChannelNo = int.Parse(channelNo);
jT1078HttpContext.RTPVideoType = RTPVideoType.Http_Hls;
HttpSessionManager.AddOrUpdateHlsSession(jT1078HttpContext);
}
var jT1078HttpContext = new JT1078HttpContext(context, principal);
jT1078HttpContext.Sim = sim;
jT1078HttpContext.ChannelNo = int.Parse(channelNo);
jT1078HttpContext.RTPVideoType = RTPVideoType.Http_Hls;
HttpSessionManager.AddOrUpdate(jT1078HttpContext);
}
}
}

+ 1
- 0
src/JT1078.Gateway/JT1078.Gateway.csproj Bestand weergeven

@@ -48,6 +48,7 @@
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\..\..\JT1078\src\JT1078.Hls\JT1078.Hls.csproj" />
<ProjectReference Include="..\JT1078.Gateway.Abstractions\JT1078.Gateway.Abstractions.csproj" />
</ItemGroup>
</Project>

+ 65
- 0
src/JT1078.Gateway/JT1078.Gateway.xml Bestand weergeven

@@ -53,6 +53,66 @@
协调器Coordinator主机登录密码
</summary>
</member>
<member name="M:JT1078.Gateway.Extensions.JT1078HttpContextExtensions.HttpM3U8Async(System.Net.HttpListenerContext,System.ReadOnlyMemory{System.Byte})">
<summary>
返回m3u8响应
</summary>
<param name="context"></param>
<param name="buffer"></param>
<returns></returns>
</member>
<member name="M:JT1078.Gateway.Extensions.JT1078HttpContextExtensions.HttpTsAsync(System.Net.HttpListenerContext,System.ReadOnlyMemory{System.Byte})">
<summary>
返回ts响应数
</summary>
<param name="context"></param>
<param name="buffer"></param>
<returns></returns>
</member>
<member name="T:JT1078.Gateway.HLSPathStorage">
<summary>
hls路径是否存在处理,及文件监控处理
</summary>
</member>
<member name="M:JT1078.Gateway.HLSPathStorage.AddPath(System.String,System.String)">
<summary>
添加路径
</summary>
<param name="path"></param>
</member>
<member name="M:JT1078.Gateway.HLSPathStorage.ExsitPath(System.String)">
<summary>
判断路径是否存在
</summary>
<param name="path"></param>
<returns></returns>
</member>
<member name="M:JT1078.Gateway.HLSPathStorage.RemoveAllPath(System.String)">
<summary>
移除所有路径
</summary>
<returns></returns>
</member>
<member name="M:JT1078.Gateway.HLSPathStorage.ExistFileSystemWatcher(System.String)">
<summary>
是否存在文件监控
</summary>
<param name="path"></param>
<returns></returns>
</member>
<member name="M:JT1078.Gateway.HLSPathStorage.AddFileSystemWatcher(System.String,System.IO.FileSystemWatcher)">
<summary>
添加文件监控
</summary>
<param name="path"></param>
<param name="fileSystemWatcher"></param>
</member>
<member name="M:JT1078.Gateway.HLSPathStorage.DeleteFileSystemWatcher(System.String)">
<summary>
删除文件监控
</summary>
<param name="path"></param>
</member>
<member name="T:JT1078.Gateway.HLSRequestManager">
<summary>
Hls请求管理
@@ -65,6 +125,11 @@
<param name="context"></param>
<param name="principal"></param>
</member>
<member name="T:JT1078.Gateway.Jobs.JT1078SessionClearJob">
<summary>
清理hls session
</summary>
</member>
<member name="T:JT1078.Gateway.JT1078CoordinatorHttpClient">
<summary>
协调器客户端


+ 11
- 0
src/JT1078.Gateway/JT1078GatewayExtensions.cs Bestand weergeven

@@ -4,10 +4,13 @@ using JT1078.Gateway.Impl;
using JT1078.Gateway.Jobs;
using JT1078.Gateway.Services;
using JT1078.Gateway.Sessions;
using JT1078.Hls.Options;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection.Extensions;
using Microsoft.Extensions.Options;
using System;
using System.IO;
using System.Net.Http;
using System.Runtime.CompilerServices;

@@ -79,7 +82,15 @@ namespace JT1078.Gateway
builder.JT1078Builder.Services.AddSingleton<JT1078SessionNoticeService>();
builder.JT1078Builder.Services.AddSingleton<JT1078SessionManager>();
builder.JT1078Builder.Services.AddHostedService<JT1078SessionNoticeJob>();

return builder;
}
public static IServiceCollection AddHlsGateway(this IServiceCollection serviceDescriptors, IConfiguration configuration)
{
serviceDescriptors.AddSingleton<HLSRequestManager>();
serviceDescriptors.AddSingleton<HLSPathStorage>();
serviceDescriptors.AddHostedService<JT1078SessionClearJob>();
return serviceDescriptors;
}
}
}

+ 86
- 0
src/JT1078.Gateway/Jobs/JT1078SessionClearJob.cs Bestand weergeven

@@ -0,0 +1,86 @@
using JT1078.Gateway.Abstractions;
using JT1078.Gateway.Configurations;
using JT1078.Gateway.Services;
using JT1078.Gateway.Sessions;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using System;
using System.Collections.Generic;
using System.Diagnostics.CodeAnalysis;
using System.IO;
using System.Linq;
using System.Text;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;


namespace JT1078.Gateway.Jobs
{
/// <summary>
/// 清理hls session
/// </summary>
public class JT1078SessionClearJob : BackgroundService
{
private readonly ILogger logger;
private readonly JT1078HttpSessionManager HttpSessionManager;//用户链接session
private readonly JT1078SessionManager SessionManager;//设备链接session
private readonly HLSPathStorage hLSPathStorage;
private readonly JT1078Configuration Configuration;
public JT1078SessionClearJob(
ILoggerFactory loggerFactory,
JT1078SessionManager SessionManager,
HLSPathStorage hLSPathStorage,
IOptions<JT1078Configuration> jT1078ConfigurationAccessor,
[AllowNull]JT1078HttpSessionManager jT1078HttpSessionManager=null)
{
logger = loggerFactory.CreateLogger<JT1078SessionClearJob>();
HttpSessionManager = jT1078HttpSessionManager;
this.hLSPathStorage = hLSPathStorage;
this.SessionManager = SessionManager;
this.Configuration = jT1078ConfigurationAccessor.Value;
}

protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
await Task.Run(() => {
while (true)
{
try
{
var hasSessions = HttpSessionManager.GetAll().Where(m => DateTime.Now.Subtract(m.StartTime).TotalSeconds > 60 && m.RTPVideoType == Metadata.RTPVideoType.Http_Hls).ToList();//所有http 的 hls短链接
foreach (var item in hasSessions)
{
var key = $"{item.Sim}_{item.ChannelNo}";
HttpSessionManager.TryRemove(item.SessionId);//超过120s未访问。
//清楚所有hls文件
string filepath = Path.Combine(Configuration.HlsRootDirectory, key);
if (Directory.Exists(filepath)) {
Directory.Delete(filepath,true);
}
hLSPathStorage.RemoveAllPath(key);//移除所有缓存

if (logger.IsEnabled(LogLevel.Debug)) {
logger.LogDebug($"{System.Text.Json.JsonSerializer.Serialize(item)},清楚session");
}
var hasTcpSession = HttpSessionManager.GetAllBySimAndChannelNo(item.Sim.TrimStart('0'), item.ChannelNo).Any(m => m.IsWebSocket);//是否存在tcp的 socket链接
var httpFlvSession = HttpSessionManager.GetAllBySimAndChannelNo(item.Sim.TrimStart('0'), item.ChannelNo).Any(m => m.RTPVideoType == Metadata.RTPVideoType.Http_Flv);//是否存在http的 flv长链接
if (!hasTcpSession && !httpFlvSession)
{
//不存在websocket链接和http-flv链接时,主动断开设备链接以节省流量
//移除tcpsession,断开设备链接
if(SessionManager!=null) SessionManager.RemoveByTerminalPhoneNo(item.Sim.TrimStart('0'));
}
}
}
catch(Exception ex)
{
logger.LogError(ex, ex.Message);
}
Thread.Sleep(TimeSpan.FromSeconds(30));//30s 执行一次
}
}, stoppingToken);
}
}
}

+ 7
- 7
src/JT1078.Gateway/Sessions/JT1078HttpSessionManager.cs Bestand weergeven

@@ -29,15 +29,15 @@ namespace JT1078.Gateway.Sessions
return Sessions.TryAdd(httpContext.SessionId, httpContext);
}

public void AddOrUpdate(JT1078HttpContext httpContext) {
var session = Sessions.FirstOrDefault(m => m.Value.Sim == httpContext.Sim && m.Value.ChannelNo == httpContext.ChannelNo);
if (string.IsNullOrEmpty(session.Key))
public void AddOrUpdateHlsSession(JT1078HttpContext httpContext)
{
//如果不存在就添加,如果存在则删除后添加(保持key和value中的sessionid一致)
var session = Sessions.FirstOrDefault(m => m.Value.Sim == httpContext.Sim && m.Value.ChannelNo == httpContext.ChannelNo && m.Value.RTPVideoType == RTPVideoType.Http_Hls);
if (!string.IsNullOrEmpty(session.Key))
{
Sessions.TryAdd(httpContext.SessionId, httpContext);
}
else {
Sessions.TryUpdate(session.Key, httpContext, session.Value);
Sessions.TryRemove(session.Key, out var _);
}
Sessions.TryAdd(httpContext.SessionId, httpContext);
}

public async void TryRemove(string sessionId)


Laden…
Annuleren
Opslaan