diff --git a/src/JT1078.Gateway.Abstractions/JT1078.Gateway.Abstractions.csproj b/src/JT1078.Gateway.Abstractions/JT1078.Gateway.Abstractions.csproj index cd05f4d..2da8e4e 100644 --- a/src/JT1078.Gateway.Abstractions/JT1078.Gateway.Abstractions.csproj +++ b/src/JT1078.Gateway.Abstractions/JT1078.Gateway.Abstractions.csproj @@ -39,6 +39,6 @@ - + diff --git a/src/JT1078.Gateway.Tests/JT1078.Gateway.Test/JT1078.Gateway.Test.csproj b/src/JT1078.Gateway.Tests/JT1078.Gateway.Test/JT1078.Gateway.Test.csproj index 1cce6a0..3cba514 100644 --- a/src/JT1078.Gateway.Tests/JT1078.Gateway.Test/JT1078.Gateway.Test.csproj +++ b/src/JT1078.Gateway.Tests/JT1078.Gateway.Test/JT1078.Gateway.Test.csproj @@ -8,7 +8,6 @@ - all diff --git a/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/JT1078.Gateway.TestNormalHosting.csproj b/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/JT1078.Gateway.TestNormalHosting.csproj index 8dff48a..f799921 100644 --- a/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/JT1078.Gateway.TestNormalHosting.csproj +++ b/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/JT1078.Gateway.TestNormalHosting.csproj @@ -6,12 +6,11 @@ - - + - + diff --git a/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Services/JT1078FMp4NormalMsgHostedService.cs b/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Services/JT1078FMp4NormalMsgHostedService.cs new file mode 100644 index 0000000..b0461b1 --- /dev/null +++ b/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Services/JT1078FMp4NormalMsgHostedService.cs @@ -0,0 +1,100 @@ +using JT1078.Gateway.Abstractions; +using JT1078.Gateway.Sessions; +using JT1078.Flv; +using Microsoft.Extensions.Hosting; +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading; +using System.Threading.Tasks; +using System.Linq; +using Microsoft.Extensions.Logging; +using JT1078.Flv.Extensions; +using Microsoft.Extensions.Caching.Memory; +using JT1078.Protocol; +using System.Text.Json; +using System.Text.Json.Serialization; +using JT1078.FMp4; + +namespace JT1078.Gateway.TestNormalHosting.Services +{ + public class JT1078FMp4NormalMsgHostedService : BackgroundService + { + private IJT1078MsgConsumer JT1078MsgConsumer; + private JT1078HttpSessionManager HttpSessionManager; + private FMp4Encoder FM4Encoder; + private ILogger Logger; + private IMemoryCache memoryCache; + private const string ikey = "IKEY"; + private MessageDispatchDataService messageDispatchDataService; + + public JT1078FMp4NormalMsgHostedService( + MessageDispatchDataService messageDispatchDataService, + IMemoryCache memoryCache, + ILoggerFactory loggerFactory, + FMp4Encoder fM4Encoder, + JT1078HttpSessionManager httpSessionManager, + IJT1078MsgConsumer msgConsumer) + { + Logger = loggerFactory.CreateLogger(); + JT1078MsgConsumer = msgConsumer; + HttpSessionManager = httpSessionManager; + FM4Encoder = fM4Encoder; + this.memoryCache = memoryCache; + this.messageDispatchDataService = messageDispatchDataService; + } + protected async override Task ExecuteAsync(CancellationToken stoppingToken) + { + while (!stoppingToken.IsCancellationRequested) + { + var data = await messageDispatchDataService.FlvChannel.Reader.ReadAsync(); + try + { + if (Logger.IsEnabled(LogLevel.Debug)) + { + Logger.LogDebug(JsonSerializer.Serialize(HttpSessionManager.GetAll())); + Logger.LogDebug($"{data.SIM},{data.SN},{data.LogicChannelNumber},{data.Label3.DataType.ToString()},{data.Label3.SubpackageType.ToString()},{data.Bodies.ToHexString()}"); + } + string key = $"{data.GetKey()}_{ikey}"; + if (data.Label3.DataType == Protocol.Enums.JT1078DataType.视频I帧) + { + memoryCache.Set(key, data); + } + var httpSessions = HttpSessionManager.GetAllBySimAndChannelNo(data.SIM.TrimStart('0'), data.LogicChannelNumber); + var firstHttpSessions = httpSessions.Where(w => !w.FirstSend).ToList(); + if (firstHttpSessions.Count > 0) + { + if (memoryCache.TryGetValue(key, out JT1078Package idata)) + { + try + { + + } + catch (Exception ex) + { + Logger.LogError(ex, $"{data.SIM},{true},{data.SN},{data.LogicChannelNumber},{data.Label3.DataType.ToString()},{data.Label3.SubpackageType.ToString()},{data.Bodies.ToHexString()}"); + } + } + } + var otherHttpSessions = httpSessions.Where(w => w.FirstSend).ToList(); + if (otherHttpSessions.Count > 0) + { + try + { + + } + catch (Exception ex) + { + Logger.LogError(ex, $"{data.SIM},{false},{data.SN},{data.LogicChannelNumber},{data.Label3.DataType.ToString()},{data.Label3.SubpackageType.ToString()},{data.Bodies.ToHexString()}"); + } + } + } + catch (Exception ex) + { + Logger.LogError(ex, $"{data.SIM},{data.SN},{data.LogicChannelNumber},{data.Label3.DataType.ToString()},{data.Label3.SubpackageType.ToString()},{data.Bodies.ToHexString()}"); + } + } + await Task.CompletedTask; + } + } +} diff --git a/src/JT1078.Gateway/Extensions/JT1078HttpContextExtensions.cs b/src/JT1078.Gateway/Extensions/JT1078HttpContextExtensions.cs index 63bb046..5ca5564 100644 --- a/src/JT1078.Gateway/Extensions/JT1078HttpContextExtensions.cs +++ b/src/JT1078.Gateway/Extensions/JT1078HttpContextExtensions.cs @@ -149,5 +149,30 @@ namespace JT1078.Gateway.Extensions { await context.WebSocketContext.WebSocket.SendAsync(Hello, WebSocketMessageType.Text, true, CancellationToken.None); } + + /// + /// + /// + /// + /// + /// + public static bool TryGetAVInfo(this HttpListenerContext context,out JT1078AVInfo jT1078AVInfo) + { + if (context.Request.QueryString.Count < 2) + { + jT1078AVInfo = default; + return false; + } + string sim = context.Request.QueryString.Get("sim"); + string channel = context.Request.QueryString.Get("channel"); + if (string.IsNullOrEmpty(sim) || string.IsNullOrEmpty(channel)) + { + jT1078AVInfo = default; + return false; + } + int.TryParse(channel, out int channelNo); + jT1078AVInfo = new JT1078AVInfo(sim, channelNo); + return true; + } } } diff --git a/src/JT1078.Gateway/JT1078.Gateway.csproj b/src/JT1078.Gateway/JT1078.Gateway.csproj index 31d2d3a..6930dbc 100644 --- a/src/JT1078.Gateway/JT1078.Gateway.csproj +++ b/src/JT1078.Gateway/JT1078.Gateway.csproj @@ -40,8 +40,10 @@ + + - + diff --git a/src/JT1078.Gateway/JT1078HttpServer.cs b/src/JT1078.Gateway/JT1078HttpServer.cs index 30e7b9d..167fe25 100644 --- a/src/JT1078.Gateway/JT1078HttpServer.cs +++ b/src/JT1078.Gateway/JT1078HttpServer.cs @@ -16,10 +16,13 @@ using System.Text; using System.Threading; using System.Threading.Tasks; using JT1078.Gateway.Extensions; -using Microsoft.Extensions.Caching.Memory; +using JT1078.Gateway.Services; namespace JT1078.Gateway { + /// + /// http服务器 + /// public class JT1078HttpServer : IHostedService { private readonly ILogger Logger; @@ -31,9 +34,17 @@ namespace JT1078.Gateway private HttpListener listener; private JT1078HttpSessionManager SessionManager; - private readonly HLSRequestManager hLSRequestManager; - private FileSystemWatcher watcher; + private HLSRequestManager hLSRequestManager; + + /// + /// + /// + /// + /// + /// + /// + /// public JT1078HttpServer( IOptions jT1078ConfigurationAccessor, IJT1078Authorization authorization, @@ -48,6 +59,11 @@ namespace JT1078.Gateway this.hLSRequestManager = hLSRequestManager; } + /// + /// + /// + /// + /// public Task StartAsync(CancellationToken cancellationToken) { if (!HttpListener.IsSupported) @@ -65,21 +81,47 @@ namespace JT1078.Gateway catch (System.Net.HttpListenerException ex) { Logger.LogWarning(ex, $"{ex.Message}:使用cmd命令[netsh http add urlacl url=http://*:{Configuration.HttpPort}/ user=Everyone]"); + return Task.CompletedTask; } Logger.LogInformation($"JT1078 Http Server start at {IPAddress.Any}:{Configuration.HttpPort}."); - Task.Factory.StartNew(async() => + Task.Factory.StartNew(async () => { while (listener.IsListening) { var context = await listener.GetContextAsync(); try { - await Task.Run(async () => + await Task.Run(async () => { - IPrincipal principal=null; - if (context.Request.RawUrl.Contains(".ts")||authorization.Authorization(context, out principal)) + if (Logger.IsEnabled(LogLevel.Information)) + { + Logger.LogInformation($"[Http RequestTraceIdentifier]:{context.Request.RequestTraceIdentifier.ToString()}-{context.Request.RemoteEndPoint.ToString()}-{context.Request.RawUrl}"); + } + if (context.Request.RawUrl.StartsWith("/favicon.ico")) + { + context.Http404(); + return; + } + if (context.TryGetAVInfo(out JT1078AVInfo jT1078AVInfo)) + { + await context.Http400(); + return; + } + if (context.Request.RawUrl.Contains(".m3u8")) + { + ProcessM3u8(context, jT1078AVInfo); + } + else if (context.Request.RawUrl.Contains(".ts")) + { + ProcessTs(context, jT1078AVInfo); + } + else if (context.Request.RawUrl.Contains(".flv")) + { + ProcessFlv(context, jT1078AVInfo); + } + else if (context.Request.RawUrl.Contains(".mp4")) { - await ProcessRequestAsync(context, principal); + ProcessFMp4(context, jT1078AVInfo); } else { @@ -90,103 +132,125 @@ namespace JT1078.Gateway catch (Exception ex) { await context.Http500(); - Logger.LogError(ex, ex.StackTrace); + Logger.LogError(ex, $"[Http RequestTraceIdentifier]:{context.Request.RequestTraceIdentifier.ToString()}-{context.Request.RemoteEndPoint.ToString()}-{context.Request.RawUrl}-{ex.StackTrace}"); } } }, cancellationToken); return Task.CompletedTask; } - private async ValueTask ProcessRequestAsync(HttpListenerContext context, IPrincipal principal) + private void ProcessM3u8(HttpListenerContext context, JT1078AVInfo jT1078AVInfo) { - if(context.Request.RawUrl.StartsWith("/favicon.ico")) - { - context.Http404(); - return; - } - if (context.Request.RawUrl.Contains(".m3u8") || context.Request.RawUrl.Contains(".ts")) + if (authorization.Authorization(context, out IPrincipal principal)) { hLSRequestManager.HandleHlsRequest(context, principal); - return; } - if (Logger.IsEnabled(LogLevel.Trace)) - { - Logger.LogTrace($"[http RequestTraceIdentifier]:{context.Request.RequestTraceIdentifier.ToString()}-{context.Request.RemoteEndPoint.ToString()}"); - } - if (Logger.IsEnabled(LogLevel.Trace)) + } + + private void ProcessTs(HttpListenerContext context, JT1078AVInfo jT1078AVInfo) + { + //ts 无需验证 + hLSRequestManager.HandleHlsRequest(context, default); + } + + private async void ProcessFlv(HttpListenerContext context, JT1078AVInfo jT1078AVInfo) + { + if (authorization.Authorization(context, out IPrincipal principal)) { - Logger.LogTrace($"[http RequestTraceIdentifier]:{context.Request.RequestTraceIdentifier.ToString()}-{context.Request.RemoteEndPoint.ToString()}"); + if (context.Request.IsWebSocketRequest) + { + await ProccessWebSocket(context, principal, jT1078AVInfo, RTPVideoType.Ws_Flv); + } + else + { + ProccessHttpKeepLive(context, principal, jT1078AVInfo, RTPVideoType.Http_Flv); + } } - string sim = context.Request.QueryString.Get("sim"); - string channel = context.Request.QueryString.Get("channel"); - if(string.IsNullOrEmpty(sim) || string.IsNullOrEmpty(channel)) + } + + private async void ProcessFMp4(HttpListenerContext context, JT1078AVInfo jT1078AVInfo) + { + if (authorization.Authorization(context, out IPrincipal principal)) { - await context.Http400(); - return; + if (context.Request.IsWebSocketRequest) + { + await ProccessWebSocket(context, principal, jT1078AVInfo, RTPVideoType.Ws_FMp4); + } + else + { + ProccessHttpKeepLive(context, principal, jT1078AVInfo, RTPVideoType.Http_FMp4); + } } - int.TryParse(channel, out int channelNo); - if (context.Request.IsWebSocketRequest) + } + + private async ValueTask ProccessWebSocket(HttpListenerContext context, IPrincipal principal, JT1078AVInfo jT1078AVInfo, RTPVideoType videoType) + { + HttpListenerWebSocketContext wsContext = await context.AcceptWebSocketAsync(null, keepAliveInterval: TimeSpan.FromSeconds(5)); + var jT1078HttpContext = new JT1078HttpContext(context, wsContext, principal); + jT1078HttpContext.Sim = jT1078AVInfo.Sim; + jT1078HttpContext.ChannelNo = jT1078AVInfo.ChannelNo; + jT1078HttpContext.RTPVideoType = videoType; + SessionManager.TryAdd(jT1078HttpContext); + //这个发送出去,flv.js就报错了 + //await jT1078HttpContext.WebSocketSendHelloAsync(); + await Task.Factory.StartNew(async (state) => { - HttpListenerWebSocketContext wsContext = await context.AcceptWebSocketAsync(null, keepAliveInterval:TimeSpan.FromSeconds(5)); - var jT1078HttpContext = new JT1078HttpContext(context, wsContext,principal); - jT1078HttpContext.Sim = sim; - jT1078HttpContext.ChannelNo = channelNo; - jT1078HttpContext.RTPVideoType = RTPVideoType.Ws_Flv; - SessionManager.TryAdd(jT1078HttpContext); - //这个发送出去,flv.js就报错了 - //await jT1078HttpContext.WebSocketSendHelloAsync(); - await Task.Factory.StartNew(async(state) => + //https://www.bejson.com/httputil/websocket/ + //ws://localhost:15555?token=22&sim=1221&channel=1 + var websocketContext = state as JT1078HttpContext; + while (websocketContext.WebSocketContext.WebSocket.State == WebSocketState.Open || + websocketContext.WebSocketContext.WebSocket.State == WebSocketState.Connecting) { - //https://www.bejson.com/httputil/websocket/ - //ws://localhost:15555?token=22&sim=1221&channel=1 - var websocketContext = state as JT1078HttpContext; - while (websocketContext.WebSocketContext.WebSocket.State == WebSocketState.Open || - websocketContext.WebSocketContext.WebSocket.State == WebSocketState.Connecting) + var buffer = ArrayPool.Shared.Rent(256); + try { - var buffer = ArrayPool.Shared.Rent(256); - try + //客户端主动断开需要有个线程去接收通知,不然会客户端会卡死直到超时 + WebSocketReceiveResult receiveResult = await websocketContext.WebSocketContext.WebSocket.ReceiveAsync(buffer, CancellationToken.None); + if (receiveResult.EndOfMessage) { - //客户端主动断开需要有个线程去接收通知,不然会客户端会卡死直到超时 - WebSocketReceiveResult receiveResult = await websocketContext.WebSocketContext.WebSocket.ReceiveAsync(buffer, CancellationToken.None); - if (receiveResult.EndOfMessage) + if (receiveResult.Count > 0) { - if (receiveResult.Count > 0) + var data = buffer.AsSpan().Slice(0, receiveResult.Count).ToArray(); + if (Logger.IsEnabled(LogLevel.Trace)) { - var data = buffer.AsSpan().Slice(0, receiveResult.Count).ToArray(); - if (Logger.IsEnabled(LogLevel.Trace)) - { - Logger.LogTrace($"[ws receive]:{Encoding.UTF8.GetString(data)}"); - } - await websocketContext.WebSocketSendTextAsync(data); + Logger.LogTrace($"[ws receive]:{Encoding.UTF8.GetString(data)}"); } + await websocketContext.WebSocketSendTextAsync(data); } } - finally - { - ArrayPool.Shared.Return(buffer); - } } - if (Logger.IsEnabled(LogLevel.Information)) + finally { - Logger.LogInformation($"[ws close]:{websocketContext.SessionId}-{websocketContext.Sim}-{websocketContext.ChannelNo}-{websocketContext.StartTime:yyyyMMddhhmmss}"); + ArrayPool.Shared.Return(buffer); } - SessionManager.TryRemove(websocketContext.SessionId); - }, jT1078HttpContext); - } - else - { - var jT1078HttpContext = new JT1078HttpContext(context,principal); - jT1078HttpContext.Sim = sim; - jT1078HttpContext.RTPVideoType = RTPVideoType.Http_Flv; - jT1078HttpContext.ChannelNo = channelNo; - SessionManager.TryAdd(jT1078HttpContext); - } + } + if (Logger.IsEnabled(LogLevel.Information)) + { + Logger.LogInformation($"[ws close]:{websocketContext.SessionId}-{websocketContext.Sim}-{websocketContext.ChannelNo}-{websocketContext.StartTime:yyyyMMddhhmmss}-{websocketContext.Context.Request.RawUrl}"); + } + SessionManager.TryRemove(websocketContext.SessionId); + }, jT1078HttpContext); + } + + private void ProccessHttpKeepLive(HttpListenerContext context, IPrincipal principal, JT1078AVInfo jT1078AVInfo, RTPVideoType videoType) + { + var jT1078HttpContext = new JT1078HttpContext(context, principal); + jT1078HttpContext.RTPVideoType = videoType; + jT1078HttpContext.Sim = jT1078AVInfo.Sim; + jT1078HttpContext.ChannelNo = jT1078AVInfo.ChannelNo; + SessionManager.TryAdd(jT1078HttpContext); } + /// + /// + /// + /// + /// public Task StopAsync(CancellationToken cancellationToken) { try { + Logger.LogInformation($"JT1078 Http Server stop at {IPAddress.Any}:{Configuration.HttpPort}."); SessionManager.TryRemoveAll(); listener.Stop(); } @@ -196,7 +260,7 @@ namespace JT1078.Gateway } catch (Exception ex) { - + Logger.LogError(ex, $"JT1078 Http Server error at {IPAddress.Any}:{Configuration.HttpPort}."); } return Task.CompletedTask; } diff --git a/src/JT1078.Gateway/Metadata/JT1078AVInfo.cs b/src/JT1078.Gateway/Metadata/JT1078AVInfo.cs new file mode 100644 index 0000000..c507bec --- /dev/null +++ b/src/JT1078.Gateway/Metadata/JT1078AVInfo.cs @@ -0,0 +1,39 @@ +using System; +using System.Collections.Generic; +using System.Text; + +namespace JT1078.Gateway.Metadata +{ + /// + /// 音视频信息 + /// + public struct JT1078AVInfo + { + /// + /// + /// + /// + /// + public JT1078AVInfo(string sim, int channelNo) + { + Sim = sim; + ChannelNo = channelNo; + } + /// + /// sim + /// + public string Sim { get; set; } + /// + /// 通道号 + /// + public int ChannelNo { get; set; } + /// + /// key + /// + /// + public override string ToString() + { + return $"{Sim}_{ChannelNo}"; + } + } +} diff --git a/src/JT1078.Gateway/Metadata/JT1078HttpContext.cs b/src/JT1078.Gateway/Metadata/JT1078HttpContext.cs index 31eeeaf..7e190fb 100644 --- a/src/JT1078.Gateway/Metadata/JT1078HttpContext.cs +++ b/src/JT1078.Gateway/Metadata/JT1078HttpContext.cs @@ -8,17 +8,38 @@ using System.Text.Json.Serialization; namespace JT1078.Gateway.Metadata { + /// + /// http上下文 + /// public class JT1078HttpContext { + /// + /// 会话Id + /// public string SessionId { get; } + /// + /// http上下文 + /// [JsonIgnore] public HttpListenerContext Context { get; } + /// + /// ws上下文 + /// [JsonIgnore] public HttpListenerWebSocketContext WebSocketContext { get; } + /// + /// 用户信息 + /// public IPrincipal User { get; } + /// + /// 观看视频类型 + /// public RTPVideoType RTPVideoType { get; set; } public string Sim { get; set; } public int ChannelNo { get; set; } + /// + /// 是否是ws协议 + /// public bool IsWebSocket { get @@ -26,8 +47,19 @@ namespace JT1078.Gateway.Metadata return Context.Request.IsWebSocketRequest; } } + /// + /// 开始时间 + /// public DateTime StartTime { get; set; } + /// + /// 是否发送首包视频数据 + /// public bool FirstSend { get; set; } + /// + /// + /// + /// + /// public JT1078HttpContext(HttpListenerContext context, IPrincipal user) { Context = context; @@ -36,6 +68,12 @@ namespace JT1078.Gateway.Metadata SessionId = Guid.NewGuid().ToString("N"); FirstSend = false; } + /// + /// + /// + /// + /// + /// public JT1078HttpContext(HttpListenerContext context, HttpListenerWebSocketContext webSocketContext, IPrincipal user) { Context = context; @@ -46,10 +84,30 @@ namespace JT1078.Gateway.Metadata FirstSend = false; } } + /// + /// 观看视频类型 + /// public enum RTPVideoType { + /// + /// Http_Flv + /// Http_Flv, + /// + /// Ws_Flv + /// Ws_Flv, + /// + /// Http_Hls + /// Http_Hls, + /// + /// Http_FMp4 + /// + Http_FMp4, + /// + /// Ws_FMp4 + /// + Ws_FMp4, } } diff --git a/src/JT1078.Gateway/HLSPathStorage.cs b/src/JT1078.Gateway/Services/HLSPathStorage.cs similarity index 98% rename from src/JT1078.Gateway/HLSPathStorage.cs rename to src/JT1078.Gateway/Services/HLSPathStorage.cs index e734d81..55a4a4e 100644 --- a/src/JT1078.Gateway/HLSPathStorage.cs +++ b/src/JT1078.Gateway/Services/HLSPathStorage.cs @@ -5,7 +5,7 @@ using System.IO; using System.Linq; using System.Text; -namespace JT1078.Gateway +namespace JT1078.Gateway.Services { /// /// hls路径是否存在处理,及文件监控处理 diff --git a/src/JT1078.Gateway/HLSRequestManager.cs b/src/JT1078.Gateway/Services/HLSRequestManager.cs similarity index 99% rename from src/JT1078.Gateway/HLSRequestManager.cs rename to src/JT1078.Gateway/Services/HLSRequestManager.cs index 64531f6..3b6ea34 100644 --- a/src/JT1078.Gateway/HLSRequestManager.cs +++ b/src/JT1078.Gateway/Services/HLSRequestManager.cs @@ -17,7 +17,7 @@ using System.Text; using System.Threading; using System.Threading.Tasks; -namespace JT1078.Gateway +namespace JT1078.Gateway.Services { /// /// Hls请求管理