Kaynağa Gözat

增加http会话的写超时

master
SmallChi(Koike) 4 yıl önce
ebeveyn
işleme
bcd4bd8a62
6 değiştirilmiş dosya ile 188 ekleme ve 20 silme
  1. +6
    -2
      src/JT1078.Gateway/Configurations/JT1078Configuration.cs
  2. +2
    -0
      src/JT1078.Gateway/JT1078GatewayExtensions.cs
  3. +28
    -18
      src/JT1078.Gateway/JT1078HttpServer.cs
  4. +62
    -0
      src/JT1078.Gateway/Jobs/JT1078HttpWriterTimeoutJob.cs
  5. +9
    -0
      src/JT1078.Gateway/Metadata/JT1078HttpContext.cs
  6. +81
    -0
      src/JT1078.Gateway/Sessions/JT1078HttpSessionManager.cs

+ 6
- 2
src/JT1078.Gateway/Configurations/JT1078Configuration.cs Dosyayı Görüntüle

@@ -14,9 +14,13 @@ namespace JT1078.Gateway.Configurations
public int MiniNumBufferSize { get; set; } = 8096;
/// <summary>
/// http写超时
/// 默认5s检查一次
/// </summary>
public int HttpWriterIdleTimeSeconds { get; set; } = 5;
public int HttpWriterIdleTimeSeconds { get; set; } = 60;
/// <summary>
/// http写超时
/// 默认60s检查一次
/// </summary>
public int HttpWriterTimeoutCheckTimeSeconds { get; set; } = 60;
/// <summary>
/// Tcp读超时
/// 默认10分钟检查一次


+ 2
- 0
src/JT1078.Gateway/JT1078GatewayExtensions.cs Dosyayı Görüntüle

@@ -53,6 +53,8 @@ namespace JT1078.Gateway
public static IJT1078GatewayBuilder AddHttp(this IJT1078GatewayBuilder builder)
{
builder.JT1078Builder.Services.AddSingleton<IJT1078Authorization, JT1078AuthorizationDefault>();
builder.JT1078Builder.Services.AddSingleton<JT1078HttpSessionManager>();
builder.JT1078Builder.Services.AddHostedService<JT1078HttpWriterTimeoutJob>();
builder.JT1078Builder.Services.AddHostedService<JT1078HttpServer>();
return builder;
}


+ 28
- 18
src/JT1078.Gateway/JT1078HttpServer.cs Dosyayı Görüntüle

@@ -1,6 +1,7 @@
using JT1078.Gateway.Abstractions;
using JT1078.Gateway.Configurations;
using JT1078.Gateway.Metadata;
using JT1078.Gateway.Sessions;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
@@ -27,14 +28,18 @@ namespace JT1078.Gateway

private HttpListener listener;

private JT1078HttpSessionManager SessionManager;

public JT1078HttpServer(
IOptions<JT1078Configuration> jT1078ConfigurationAccessor,
IJT1078Authorization authorization,
JT1078HttpSessionManager sessionManager,
ILoggerFactory loggerFactory)
{
Logger = loggerFactory.CreateLogger<JT1078TcpServer>();
Configuration = jT1078ConfigurationAccessor.Value;
this.authorization = authorization;
this.SessionManager = sessionManager;
}

public Task StartAsync(CancellationToken cancellationToken)
@@ -46,8 +51,15 @@ namespace JT1078.Gateway
}
listener = new HttpListener();
listener.AuthenticationSchemes = AuthenticationSchemes.Anonymous;
listener.Prefixes.Add($"http://*:{Configuration.HttpPort}/");
listener.Start();
try
{
listener.Prefixes.Add($"http://*:{Configuration.HttpPort}/");
listener.Start();
}
catch (System.Net.HttpListenerException ex)
{
Logger.LogWarning(ex, $"{ex.Message}:使用cmd命令[netsh http add urlacl url=http://*:{Configuration.HttpPort}/ user=Everyone]");
}
Logger.LogInformation($"JT1078 Http Server start at {IPAddress.Any}:{Configuration.HttpPort}.");
Task.Factory.StartNew(async() =>
{
@@ -81,6 +93,7 @@ namespace JT1078.Gateway
{
Http404(context);
}
//todo:.m3u8 .ts
if (Logger.IsEnabled(LogLevel.Trace))
{
Logger.LogTrace($"[http RequestTraceIdentifier]:{context.Request.RequestTraceIdentifier.ToString()}-{context.Request.RemoteEndPoint.ToString()}");
@@ -99,7 +112,7 @@ namespace JT1078.Gateway
var jT1078HttpContext = new JT1078HttpContext(context, wsContext,principal);
jT1078HttpContext.Sim = sim;
jT1078HttpContext.ChannelNo = channelNo;
//todo: add session manager
SessionManager.TryAdd(jT1078HttpContext);
await wsContext.WebSocket.SendAsync(Encoding.UTF8.GetBytes("hello,jt1078"), WebSocketMessageType.Text, true, CancellationToken.None);
await Task.Factory.StartNew(async(state) =>
{
@@ -135,8 +148,12 @@ namespace JT1078.Gateway
{
Logger.LogTrace($"[ws close]:{websocketContext}");
}
//todo:session close notice
await websocketContext.WebSocketContext.WebSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, "normal", CancellationToken.None);
try
{
await websocketContext.WebSocketContext.WebSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, "normal", CancellationToken.None);
}
catch {}
SessionManager.TryRemove(websocketContext.SessionId);
}, jT1078HttpContext);
}
else
@@ -144,19 +161,7 @@ namespace JT1078.Gateway
var jT1078HttpContext = new JT1078HttpContext(context,principal);
jT1078HttpContext.Sim = sim;
jT1078HttpContext.ChannelNo = channelNo;

//todo:add session manager

//todo:set http chunk

//todo:session close notice

byte[] b = Encoding.UTF8.GetBytes("ack");
context.Response.StatusCode = 200;
context.Response.KeepAlive = true;
context.Response.ContentLength64 = b.Length;
await context.Response.OutputStream.WriteAsync(b, 0, b.Length);
context.Response.Close();
SessionManager.TryAdd(jT1078HttpContext);
}
}

@@ -204,11 +209,16 @@ namespace JT1078.Gateway
{
try
{
SessionManager.TryRemoveAll();
listener.Stop();
}
catch (System.ObjectDisposedException ex)
{

}
catch (Exception ex)
{

}
return Task.CompletedTask;
}


+ 62
- 0
src/JT1078.Gateway/Jobs/JT1078HttpWriterTimeoutJob.cs Dosyayı Görüntüle

@@ -0,0 +1,62 @@
using JT1078.Gateway.Configurations;
using JT1078.Gateway.Sessions;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace JT1078.Gateway.Jobs
{
internal class JT1078HttpWriterTimeoutJob : BackgroundService
{
private readonly ILogger Logger;

private readonly JT1078HttpSessionManager SessionManager;

private readonly IOptionsMonitor<JT1078Configuration> Configuration;
public JT1078HttpWriterTimeoutJob(
IOptionsMonitor<JT1078Configuration> jT1078ConfigurationAccessor,
ILoggerFactory loggerFactory,
JT1078HttpSessionManager jT1078SessionManager
)
{
SessionManager = jT1078SessionManager;
Logger = loggerFactory.CreateLogger<JT1078TcpReceiveTimeoutJob>();
Configuration = jT1078ConfigurationAccessor;
}

protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
while (!stoppingToken.IsCancellationRequested)
{
try
{
foreach (var item in SessionManager.GetAll())
{
if (item.ActiveTime.AddSeconds(Configuration.CurrentValue.HttpWriterIdleTimeSeconds) < DateTime.Now)
{
SessionManager.TryRemove(item.SessionId);
}
}
if (Logger.IsEnabled(LogLevel.Information))
{
Logger.LogInformation($"[Http Check Writer Timeout]");
Logger.LogInformation($"[Http Session Online Count]:{SessionManager.SessionCount}");
}
}
catch (Exception ex)
{
Logger.LogError(ex, $"[Http Writer Timeout]");
}
finally
{
await Task.Delay(TimeSpan.FromSeconds(Configuration.CurrentValue.HttpWriterTimeoutCheckTimeSeconds), stoppingToken);
}
}
}
}
}

+ 9
- 0
src/JT1078.Gateway/Metadata/JT1078HttpContext.cs Dosyayı Görüntüle

@@ -9,6 +9,7 @@ namespace JT1078.Gateway.Metadata
{
public class JT1078HttpContext
{
public string SessionId { get; }
public HttpListenerContext Context { get; }
public HttpListenerWebSocketContext WebSocketContext { get; }
public IPrincipal User { get; }
@@ -21,16 +22,24 @@ namespace JT1078.Gateway.Metadata
return Context.Request.IsWebSocketRequest;
}
}
public DateTime ActiveTime { get; set; }
public DateTime StartTime { get; set; }
public JT1078HttpContext(HttpListenerContext context, IPrincipal user)
{
Context = context;
User = user;
ActiveTime = DateTime.Now;
StartTime = DateTime.Now;
SessionId = Guid.NewGuid().ToString("N");
}
public JT1078HttpContext(HttpListenerContext context, HttpListenerWebSocketContext webSocketContext, IPrincipal user)
{
Context = context;
WebSocketContext = webSocketContext;
User = user;
ActiveTime = DateTime.Now;
StartTime = DateTime.Now;
SessionId = Guid.NewGuid().ToString("N");
}
}
}

+ 81
- 0
src/JT1078.Gateway/Sessions/JT1078HttpSessionManager.cs Dosyayı Görüntüle

@@ -0,0 +1,81 @@
using JT1078.Gateway.Metadata;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Net.WebSockets;
using System.Text;
using System.Threading;

namespace JT1078.Gateway.Sessions
{
public class JT1078HttpSessionManager
{
public ConcurrentDictionary<string, JT1078HttpContext> Sessions { get; }

public JT1078HttpSessionManager()
{
Sessions = new ConcurrentDictionary<string, JT1078HttpContext>();
}

public bool TryAdd(JT1078HttpContext httpContext)
{
return Sessions.TryAdd(httpContext.SessionId, httpContext);
}

public bool TryRemove(string sessionId)
{
//todo:session close notice
return Sessions.TryRemove(sessionId,out JT1078HttpContext session);
}

public void SendHttpChunk(byte[] data)
{
//todo:set http chunk
//todo:session close notice
//byte[] b = Encoding.UTF8.GetBytes("ack");
//context.Response.StatusCode = 200;
//context.Response.KeepAlive = true;
//context.Response.ContentLength64 = b.Length;
//await context.Response.OutputStream.WriteAsync(b, 0, b.Length);
//context.Response.Close();
}



public int SessionCount
{
get
{
return Sessions.Count;
}
}

public List<JT1078HttpContext> GetAll()
{
return Sessions.Select(s => s.Value).ToList();
}

internal void TryRemoveAll()
{
foreach(var item in Sessions)
{
try
{
if (item.Value.IsWebSocket)
{
item.Value.WebSocketContext.WebSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, "server close", CancellationToken.None);
}
else
{
item.Value.Context.Response.Close();
}
}
catch (Exception)
{

}
}
}
}
}

Yükleniyor…
İptal
Kaydet