diff --git a/src/JT1078.Gateway/Configurations/JT1078Configuration.cs b/src/JT1078.Gateway/Configurations/JT1078Configuration.cs index e88b9f3..01b95ec 100644 --- a/src/JT1078.Gateway/Configurations/JT1078Configuration.cs +++ b/src/JT1078.Gateway/Configurations/JT1078Configuration.cs @@ -14,9 +14,13 @@ namespace JT1078.Gateway.Configurations public int MiniNumBufferSize { get; set; } = 8096; /// /// http写超时 - /// 默认5s检查一次 /// - public int HttpWriterIdleTimeSeconds { get; set; } = 5; + public int HttpWriterIdleTimeSeconds { get; set; } = 60; + /// + /// http写超时 + /// 默认60s检查一次 + /// + public int HttpWriterTimeoutCheckTimeSeconds { get; set; } = 60; /// /// Tcp读超时 /// 默认10分钟检查一次 diff --git a/src/JT1078.Gateway/JT1078GatewayExtensions.cs b/src/JT1078.Gateway/JT1078GatewayExtensions.cs index e769055..f28905a 100644 --- a/src/JT1078.Gateway/JT1078GatewayExtensions.cs +++ b/src/JT1078.Gateway/JT1078GatewayExtensions.cs @@ -53,6 +53,8 @@ namespace JT1078.Gateway public static IJT1078GatewayBuilder AddHttp(this IJT1078GatewayBuilder builder) { builder.JT1078Builder.Services.AddSingleton(); + builder.JT1078Builder.Services.AddSingleton(); + builder.JT1078Builder.Services.AddHostedService(); builder.JT1078Builder.Services.AddHostedService(); return builder; } diff --git a/src/JT1078.Gateway/JT1078HttpServer.cs b/src/JT1078.Gateway/JT1078HttpServer.cs index 69c99f9..bc14363 100644 --- a/src/JT1078.Gateway/JT1078HttpServer.cs +++ b/src/JT1078.Gateway/JT1078HttpServer.cs @@ -1,6 +1,7 @@ using JT1078.Gateway.Abstractions; using JT1078.Gateway.Configurations; using JT1078.Gateway.Metadata; +using JT1078.Gateway.Sessions; using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; @@ -27,14 +28,18 @@ namespace JT1078.Gateway private HttpListener listener; + private JT1078HttpSessionManager SessionManager; + public JT1078HttpServer( IOptions jT1078ConfigurationAccessor, IJT1078Authorization authorization, + JT1078HttpSessionManager sessionManager, ILoggerFactory loggerFactory) { Logger = loggerFactory.CreateLogger(); Configuration = jT1078ConfigurationAccessor.Value; this.authorization = authorization; + this.SessionManager = sessionManager; } public Task StartAsync(CancellationToken cancellationToken) @@ -46,8 +51,15 @@ namespace JT1078.Gateway } listener = new HttpListener(); listener.AuthenticationSchemes = AuthenticationSchemes.Anonymous; - listener.Prefixes.Add($"http://*:{Configuration.HttpPort}/"); - listener.Start(); + try + { + listener.Prefixes.Add($"http://*:{Configuration.HttpPort}/"); + listener.Start(); + } + catch (System.Net.HttpListenerException ex) + { + Logger.LogWarning(ex, $"{ex.Message}:使用cmd命令[netsh http add urlacl url=http://*:{Configuration.HttpPort}/ user=Everyone]"); + } Logger.LogInformation($"JT1078 Http Server start at {IPAddress.Any}:{Configuration.HttpPort}."); Task.Factory.StartNew(async() => { @@ -81,6 +93,7 @@ namespace JT1078.Gateway { Http404(context); } + //todo:.m3u8 .ts if (Logger.IsEnabled(LogLevel.Trace)) { Logger.LogTrace($"[http RequestTraceIdentifier]:{context.Request.RequestTraceIdentifier.ToString()}-{context.Request.RemoteEndPoint.ToString()}"); @@ -99,7 +112,7 @@ namespace JT1078.Gateway var jT1078HttpContext = new JT1078HttpContext(context, wsContext,principal); jT1078HttpContext.Sim = sim; jT1078HttpContext.ChannelNo = channelNo; - //todo: add session manager + SessionManager.TryAdd(jT1078HttpContext); await wsContext.WebSocket.SendAsync(Encoding.UTF8.GetBytes("hello,jt1078"), WebSocketMessageType.Text, true, CancellationToken.None); await Task.Factory.StartNew(async(state) => { @@ -135,8 +148,12 @@ namespace JT1078.Gateway { Logger.LogTrace($"[ws close]:{websocketContext}"); } - //todo:session close notice - await websocketContext.WebSocketContext.WebSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, "normal", CancellationToken.None); + try + { + await websocketContext.WebSocketContext.WebSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, "normal", CancellationToken.None); + } + catch {} + SessionManager.TryRemove(websocketContext.SessionId); }, jT1078HttpContext); } else @@ -144,19 +161,7 @@ namespace JT1078.Gateway var jT1078HttpContext = new JT1078HttpContext(context,principal); jT1078HttpContext.Sim = sim; jT1078HttpContext.ChannelNo = channelNo; - - //todo:add session manager - - //todo:set http chunk - - //todo:session close notice - - byte[] b = Encoding.UTF8.GetBytes("ack"); - context.Response.StatusCode = 200; - context.Response.KeepAlive = true; - context.Response.ContentLength64 = b.Length; - await context.Response.OutputStream.WriteAsync(b, 0, b.Length); - context.Response.Close(); + SessionManager.TryAdd(jT1078HttpContext); } } @@ -204,11 +209,16 @@ namespace JT1078.Gateway { try { + SessionManager.TryRemoveAll(); listener.Stop(); } catch (System.ObjectDisposedException ex) { + } + catch (Exception ex) + { + } return Task.CompletedTask; } diff --git a/src/JT1078.Gateway/Jobs/JT1078HttpWriterTimeoutJob.cs b/src/JT1078.Gateway/Jobs/JT1078HttpWriterTimeoutJob.cs new file mode 100644 index 0000000..0252e4e --- /dev/null +++ b/src/JT1078.Gateway/Jobs/JT1078HttpWriterTimeoutJob.cs @@ -0,0 +1,62 @@ +using JT1078.Gateway.Configurations; +using JT1078.Gateway.Sessions; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading; +using System.Threading.Tasks; + +namespace JT1078.Gateway.Jobs +{ + internal class JT1078HttpWriterTimeoutJob : BackgroundService + { + private readonly ILogger Logger; + + private readonly JT1078HttpSessionManager SessionManager; + + private readonly IOptionsMonitor Configuration; + public JT1078HttpWriterTimeoutJob( + IOptionsMonitor jT1078ConfigurationAccessor, + ILoggerFactory loggerFactory, + JT1078HttpSessionManager jT1078SessionManager + ) + { + SessionManager = jT1078SessionManager; + Logger = loggerFactory.CreateLogger(); + Configuration = jT1078ConfigurationAccessor; + } + + protected override async Task ExecuteAsync(CancellationToken stoppingToken) + { + while (!stoppingToken.IsCancellationRequested) + { + try + { + foreach (var item in SessionManager.GetAll()) + { + if (item.ActiveTime.AddSeconds(Configuration.CurrentValue.HttpWriterIdleTimeSeconds) < DateTime.Now) + { + SessionManager.TryRemove(item.SessionId); + } + } + if (Logger.IsEnabled(LogLevel.Information)) + { + Logger.LogInformation($"[Http Check Writer Timeout]"); + Logger.LogInformation($"[Http Session Online Count]:{SessionManager.SessionCount}"); + } + } + catch (Exception ex) + { + Logger.LogError(ex, $"[Http Writer Timeout]"); + } + finally + { + await Task.Delay(TimeSpan.FromSeconds(Configuration.CurrentValue.HttpWriterTimeoutCheckTimeSeconds), stoppingToken); + } + } + } + } +} diff --git a/src/JT1078.Gateway/Metadata/JT1078HttpContext.cs b/src/JT1078.Gateway/Metadata/JT1078HttpContext.cs index 2d09b54..13fa461 100644 --- a/src/JT1078.Gateway/Metadata/JT1078HttpContext.cs +++ b/src/JT1078.Gateway/Metadata/JT1078HttpContext.cs @@ -9,6 +9,7 @@ namespace JT1078.Gateway.Metadata { public class JT1078HttpContext { + public string SessionId { get; } public HttpListenerContext Context { get; } public HttpListenerWebSocketContext WebSocketContext { get; } public IPrincipal User { get; } @@ -21,16 +22,24 @@ namespace JT1078.Gateway.Metadata return Context.Request.IsWebSocketRequest; } } + public DateTime ActiveTime { get; set; } + public DateTime StartTime { get; set; } public JT1078HttpContext(HttpListenerContext context, IPrincipal user) { Context = context; User = user; + ActiveTime = DateTime.Now; + StartTime = DateTime.Now; + SessionId = Guid.NewGuid().ToString("N"); } public JT1078HttpContext(HttpListenerContext context, HttpListenerWebSocketContext webSocketContext, IPrincipal user) { Context = context; WebSocketContext = webSocketContext; User = user; + ActiveTime = DateTime.Now; + StartTime = DateTime.Now; + SessionId = Guid.NewGuid().ToString("N"); } } } diff --git a/src/JT1078.Gateway/Sessions/JT1078HttpSessionManager.cs b/src/JT1078.Gateway/Sessions/JT1078HttpSessionManager.cs new file mode 100644 index 0000000..babad66 --- /dev/null +++ b/src/JT1078.Gateway/Sessions/JT1078HttpSessionManager.cs @@ -0,0 +1,81 @@ +using JT1078.Gateway.Metadata; +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Linq; +using System.Net.WebSockets; +using System.Text; +using System.Threading; + +namespace JT1078.Gateway.Sessions +{ + public class JT1078HttpSessionManager + { + public ConcurrentDictionary Sessions { get; } + + public JT1078HttpSessionManager() + { + Sessions = new ConcurrentDictionary(); + } + + public bool TryAdd(JT1078HttpContext httpContext) + { + return Sessions.TryAdd(httpContext.SessionId, httpContext); + } + + public bool TryRemove(string sessionId) + { + //todo:session close notice + return Sessions.TryRemove(sessionId,out JT1078HttpContext session); + } + + public void SendHttpChunk(byte[] data) + { + //todo:set http chunk + //todo:session close notice + //byte[] b = Encoding.UTF8.GetBytes("ack"); + //context.Response.StatusCode = 200; + //context.Response.KeepAlive = true; + //context.Response.ContentLength64 = b.Length; + //await context.Response.OutputStream.WriteAsync(b, 0, b.Length); + //context.Response.Close(); + } + + + + public int SessionCount + { + get + { + return Sessions.Count; + } + } + + public List GetAll() + { + return Sessions.Select(s => s.Value).ToList(); + } + + internal void TryRemoveAll() + { + foreach(var item in Sessions) + { + try + { + if (item.Value.IsWebSocket) + { + item.Value.WebSocketContext.WebSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, "server close", CancellationToken.None); + } + else + { + item.Value.Context.Response.Close(); + } + } + catch (Exception) + { + + } + } + } + } +}