From bcd4bd8a6242118a85d19153c9104f85a90350ce Mon Sep 17 00:00:00 2001
From: "SmallChi(Koike)" <564952747@qq.com>
Date: Sun, 2 Aug 2020 22:07:08 +0800
Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0http=E4=BC=9A=E8=AF=9D?=
=?UTF-8?q?=E7=9A=84=E5=86=99=E8=B6=85=E6=97=B6?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
.../Configurations/JT1078Configuration.cs | 8 +-
src/JT1078.Gateway/JT1078GatewayExtensions.cs | 2 +
src/JT1078.Gateway/JT1078HttpServer.cs | 46 ++++++-----
.../Jobs/JT1078HttpWriterTimeoutJob.cs | 62 ++++++++++++++
.../Metadata/JT1078HttpContext.cs | 9 +++
.../Sessions/JT1078HttpSessionManager.cs | 81 +++++++++++++++++++
6 files changed, 188 insertions(+), 20 deletions(-)
create mode 100644 src/JT1078.Gateway/Jobs/JT1078HttpWriterTimeoutJob.cs
create mode 100644 src/JT1078.Gateway/Sessions/JT1078HttpSessionManager.cs
diff --git a/src/JT1078.Gateway/Configurations/JT1078Configuration.cs b/src/JT1078.Gateway/Configurations/JT1078Configuration.cs
index e88b9f3..01b95ec 100644
--- a/src/JT1078.Gateway/Configurations/JT1078Configuration.cs
+++ b/src/JT1078.Gateway/Configurations/JT1078Configuration.cs
@@ -14,9 +14,13 @@ namespace JT1078.Gateway.Configurations
public int MiniNumBufferSize { get; set; } = 8096;
///
/// http写超时
- /// 默认5s检查一次
///
- public int HttpWriterIdleTimeSeconds { get; set; } = 5;
+ public int HttpWriterIdleTimeSeconds { get; set; } = 60;
+ ///
+ /// http写超时
+ /// 默认60s检查一次
+ ///
+ public int HttpWriterTimeoutCheckTimeSeconds { get; set; } = 60;
///
/// Tcp读超时
/// 默认10分钟检查一次
diff --git a/src/JT1078.Gateway/JT1078GatewayExtensions.cs b/src/JT1078.Gateway/JT1078GatewayExtensions.cs
index e769055..f28905a 100644
--- a/src/JT1078.Gateway/JT1078GatewayExtensions.cs
+++ b/src/JT1078.Gateway/JT1078GatewayExtensions.cs
@@ -53,6 +53,8 @@ namespace JT1078.Gateway
public static IJT1078GatewayBuilder AddHttp(this IJT1078GatewayBuilder builder)
{
builder.JT1078Builder.Services.AddSingleton();
+ builder.JT1078Builder.Services.AddSingleton();
+ builder.JT1078Builder.Services.AddHostedService();
builder.JT1078Builder.Services.AddHostedService();
return builder;
}
diff --git a/src/JT1078.Gateway/JT1078HttpServer.cs b/src/JT1078.Gateway/JT1078HttpServer.cs
index 69c99f9..bc14363 100644
--- a/src/JT1078.Gateway/JT1078HttpServer.cs
+++ b/src/JT1078.Gateway/JT1078HttpServer.cs
@@ -1,6 +1,7 @@
using JT1078.Gateway.Abstractions;
using JT1078.Gateway.Configurations;
using JT1078.Gateway.Metadata;
+using JT1078.Gateway.Sessions;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
@@ -27,14 +28,18 @@ namespace JT1078.Gateway
private HttpListener listener;
+ private JT1078HttpSessionManager SessionManager;
+
public JT1078HttpServer(
IOptions jT1078ConfigurationAccessor,
IJT1078Authorization authorization,
+ JT1078HttpSessionManager sessionManager,
ILoggerFactory loggerFactory)
{
Logger = loggerFactory.CreateLogger();
Configuration = jT1078ConfigurationAccessor.Value;
this.authorization = authorization;
+ this.SessionManager = sessionManager;
}
public Task StartAsync(CancellationToken cancellationToken)
@@ -46,8 +51,15 @@ namespace JT1078.Gateway
}
listener = new HttpListener();
listener.AuthenticationSchemes = AuthenticationSchemes.Anonymous;
- listener.Prefixes.Add($"http://*:{Configuration.HttpPort}/");
- listener.Start();
+ try
+ {
+ listener.Prefixes.Add($"http://*:{Configuration.HttpPort}/");
+ listener.Start();
+ }
+ catch (System.Net.HttpListenerException ex)
+ {
+ Logger.LogWarning(ex, $"{ex.Message}:使用cmd命令[netsh http add urlacl url=http://*:{Configuration.HttpPort}/ user=Everyone]");
+ }
Logger.LogInformation($"JT1078 Http Server start at {IPAddress.Any}:{Configuration.HttpPort}.");
Task.Factory.StartNew(async() =>
{
@@ -81,6 +93,7 @@ namespace JT1078.Gateway
{
Http404(context);
}
+ //todo:.m3u8 .ts
if (Logger.IsEnabled(LogLevel.Trace))
{
Logger.LogTrace($"[http RequestTraceIdentifier]:{context.Request.RequestTraceIdentifier.ToString()}-{context.Request.RemoteEndPoint.ToString()}");
@@ -99,7 +112,7 @@ namespace JT1078.Gateway
var jT1078HttpContext = new JT1078HttpContext(context, wsContext,principal);
jT1078HttpContext.Sim = sim;
jT1078HttpContext.ChannelNo = channelNo;
- //todo: add session manager
+ SessionManager.TryAdd(jT1078HttpContext);
await wsContext.WebSocket.SendAsync(Encoding.UTF8.GetBytes("hello,jt1078"), WebSocketMessageType.Text, true, CancellationToken.None);
await Task.Factory.StartNew(async(state) =>
{
@@ -135,8 +148,12 @@ namespace JT1078.Gateway
{
Logger.LogTrace($"[ws close]:{websocketContext}");
}
- //todo:session close notice
- await websocketContext.WebSocketContext.WebSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, "normal", CancellationToken.None);
+ try
+ {
+ await websocketContext.WebSocketContext.WebSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, "normal", CancellationToken.None);
+ }
+ catch {}
+ SessionManager.TryRemove(websocketContext.SessionId);
}, jT1078HttpContext);
}
else
@@ -144,19 +161,7 @@ namespace JT1078.Gateway
var jT1078HttpContext = new JT1078HttpContext(context,principal);
jT1078HttpContext.Sim = sim;
jT1078HttpContext.ChannelNo = channelNo;
-
- //todo:add session manager
-
- //todo:set http chunk
-
- //todo:session close notice
-
- byte[] b = Encoding.UTF8.GetBytes("ack");
- context.Response.StatusCode = 200;
- context.Response.KeepAlive = true;
- context.Response.ContentLength64 = b.Length;
- await context.Response.OutputStream.WriteAsync(b, 0, b.Length);
- context.Response.Close();
+ SessionManager.TryAdd(jT1078HttpContext);
}
}
@@ -204,11 +209,16 @@ namespace JT1078.Gateway
{
try
{
+ SessionManager.TryRemoveAll();
listener.Stop();
}
catch (System.ObjectDisposedException ex)
{
+ }
+ catch (Exception ex)
+ {
+
}
return Task.CompletedTask;
}
diff --git a/src/JT1078.Gateway/Jobs/JT1078HttpWriterTimeoutJob.cs b/src/JT1078.Gateway/Jobs/JT1078HttpWriterTimeoutJob.cs
new file mode 100644
index 0000000..0252e4e
--- /dev/null
+++ b/src/JT1078.Gateway/Jobs/JT1078HttpWriterTimeoutJob.cs
@@ -0,0 +1,62 @@
+using JT1078.Gateway.Configurations;
+using JT1078.Gateway.Sessions;
+using Microsoft.Extensions.Hosting;
+using Microsoft.Extensions.Logging;
+using Microsoft.Extensions.Options;
+using System;
+using System.Collections.Generic;
+using System.Text;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace JT1078.Gateway.Jobs
+{
+ internal class JT1078HttpWriterTimeoutJob : BackgroundService
+ {
+ private readonly ILogger Logger;
+
+ private readonly JT1078HttpSessionManager SessionManager;
+
+ private readonly IOptionsMonitor Configuration;
+ public JT1078HttpWriterTimeoutJob(
+ IOptionsMonitor jT1078ConfigurationAccessor,
+ ILoggerFactory loggerFactory,
+ JT1078HttpSessionManager jT1078SessionManager
+ )
+ {
+ SessionManager = jT1078SessionManager;
+ Logger = loggerFactory.CreateLogger();
+ Configuration = jT1078ConfigurationAccessor;
+ }
+
+ protected override async Task ExecuteAsync(CancellationToken stoppingToken)
+ {
+ while (!stoppingToken.IsCancellationRequested)
+ {
+ try
+ {
+ foreach (var item in SessionManager.GetAll())
+ {
+ if (item.ActiveTime.AddSeconds(Configuration.CurrentValue.HttpWriterIdleTimeSeconds) < DateTime.Now)
+ {
+ SessionManager.TryRemove(item.SessionId);
+ }
+ }
+ if (Logger.IsEnabled(LogLevel.Information))
+ {
+ Logger.LogInformation($"[Http Check Writer Timeout]");
+ Logger.LogInformation($"[Http Session Online Count]:{SessionManager.SessionCount}");
+ }
+ }
+ catch (Exception ex)
+ {
+ Logger.LogError(ex, $"[Http Writer Timeout]");
+ }
+ finally
+ {
+ await Task.Delay(TimeSpan.FromSeconds(Configuration.CurrentValue.HttpWriterTimeoutCheckTimeSeconds), stoppingToken);
+ }
+ }
+ }
+ }
+}
diff --git a/src/JT1078.Gateway/Metadata/JT1078HttpContext.cs b/src/JT1078.Gateway/Metadata/JT1078HttpContext.cs
index 2d09b54..13fa461 100644
--- a/src/JT1078.Gateway/Metadata/JT1078HttpContext.cs
+++ b/src/JT1078.Gateway/Metadata/JT1078HttpContext.cs
@@ -9,6 +9,7 @@ namespace JT1078.Gateway.Metadata
{
public class JT1078HttpContext
{
+ public string SessionId { get; }
public HttpListenerContext Context { get; }
public HttpListenerWebSocketContext WebSocketContext { get; }
public IPrincipal User { get; }
@@ -21,16 +22,24 @@ namespace JT1078.Gateway.Metadata
return Context.Request.IsWebSocketRequest;
}
}
+ public DateTime ActiveTime { get; set; }
+ public DateTime StartTime { get; set; }
public JT1078HttpContext(HttpListenerContext context, IPrincipal user)
{
Context = context;
User = user;
+ ActiveTime = DateTime.Now;
+ StartTime = DateTime.Now;
+ SessionId = Guid.NewGuid().ToString("N");
}
public JT1078HttpContext(HttpListenerContext context, HttpListenerWebSocketContext webSocketContext, IPrincipal user)
{
Context = context;
WebSocketContext = webSocketContext;
User = user;
+ ActiveTime = DateTime.Now;
+ StartTime = DateTime.Now;
+ SessionId = Guid.NewGuid().ToString("N");
}
}
}
diff --git a/src/JT1078.Gateway/Sessions/JT1078HttpSessionManager.cs b/src/JT1078.Gateway/Sessions/JT1078HttpSessionManager.cs
new file mode 100644
index 0000000..babad66
--- /dev/null
+++ b/src/JT1078.Gateway/Sessions/JT1078HttpSessionManager.cs
@@ -0,0 +1,81 @@
+using JT1078.Gateway.Metadata;
+using System;
+using System.Collections.Concurrent;
+using System.Collections.Generic;
+using System.Linq;
+using System.Net.WebSockets;
+using System.Text;
+using System.Threading;
+
+namespace JT1078.Gateway.Sessions
+{
+ public class JT1078HttpSessionManager
+ {
+ public ConcurrentDictionary Sessions { get; }
+
+ public JT1078HttpSessionManager()
+ {
+ Sessions = new ConcurrentDictionary();
+ }
+
+ public bool TryAdd(JT1078HttpContext httpContext)
+ {
+ return Sessions.TryAdd(httpContext.SessionId, httpContext);
+ }
+
+ public bool TryRemove(string sessionId)
+ {
+ //todo:session close notice
+ return Sessions.TryRemove(sessionId,out JT1078HttpContext session);
+ }
+
+ public void SendHttpChunk(byte[] data)
+ {
+ //todo:set http chunk
+ //todo:session close notice
+ //byte[] b = Encoding.UTF8.GetBytes("ack");
+ //context.Response.StatusCode = 200;
+ //context.Response.KeepAlive = true;
+ //context.Response.ContentLength64 = b.Length;
+ //await context.Response.OutputStream.WriteAsync(b, 0, b.Length);
+ //context.Response.Close();
+ }
+
+
+
+ public int SessionCount
+ {
+ get
+ {
+ return Sessions.Count;
+ }
+ }
+
+ public List GetAll()
+ {
+ return Sessions.Select(s => s.Value).ToList();
+ }
+
+ internal void TryRemoveAll()
+ {
+ foreach(var item in Sessions)
+ {
+ try
+ {
+ if (item.Value.IsWebSocket)
+ {
+ item.Value.WebSocketContext.WebSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, "server close", CancellationToken.None);
+ }
+ else
+ {
+ item.Value.Context.Response.Close();
+ }
+ }
+ catch (Exception)
+ {
+
+ }
+ }
+ }
+ }
+}