Kaynağa Gözat

完善http和ws的推送及对应的会话维护

master
SmallChi(Koike) 4 yıl önce
ebeveyn
işleme
4469d48646
9 değiştirilmiş dosya ile 194 ekleme ve 100 silme
  1. +1
    -1
      src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Services/JT1078NormalMsgHostedService.cs
  2. +2
    -2
      src/JT1078.Gateway.sln
  3. +0
    -9
      src/JT1078.Gateway/Configurations/JT1078Configuration.cs
  4. +20
    -0
      src/JT1078.Gateway/Extensions/JT1078HttpContextExtensions.cs
  5. +0
    -1
      src/JT1078.Gateway/JT1078GatewayExtensions.cs
  6. +8
    -7
      src/JT1078.Gateway/JT1078HttpServer.cs
  7. +0
    -66
      src/JT1078.Gateway/Jobs/JT1078HttpWriterTimeoutJob.cs
  8. +3
    -3
      src/JT1078.Gateway/Metadata/JT1078HttpContext.cs
  9. +160
    -11
      src/JT1078.Gateway/Sessions/JT1078HttpSessionManager.cs

+ 1
- 1
src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Services/JT1078NormalMsgHostedService.cs Dosyayı Görüntüle

@@ -19,7 +19,7 @@ namespace JT1078.Gateway.TestNormalHosting.Services
{
PackageConsumer.OnMessage((Message) =>
{
});
return Task.CompletedTask;
}


+ 2
- 2
src/JT1078.Gateway.sln Dosyayı Görüntüle

@@ -11,9 +11,9 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Tests", "Tests", "{04C0F72A
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JT1078.Gateway.Abstractions", "JT1078.Gateway.Abstractions\JT1078.Gateway.Abstractions.csproj", "{EE50F2A6-5F28-4640-BC20-44A8BED8F311}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "JT1078.Gateway.InMemoryMQ", "JT1078.Gateway.InMemoryMQ\JT1078.Gateway.InMemoryMQ.csproj", "{C7554790-0B3B-490F-B2F1-436C1FD0B4DD}"
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JT1078.Gateway.InMemoryMQ", "JT1078.Gateway.InMemoryMQ\JT1078.Gateway.InMemoryMQ.csproj", "{C7554790-0B3B-490F-B2F1-436C1FD0B4DD}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "JT1078.Gateway.TestNormalHosting", "JT1078.Gateway.Tests\JT1078.Gateway.TestNormalHosting\JT1078.Gateway.TestNormalHosting.csproj", "{6E2DAA64-E2A1-4459-BE61-E807B4EF2CCE}"
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JT1078.Gateway.TestNormalHosting", "JT1078.Gateway.Tests\JT1078.Gateway.TestNormalHosting\JT1078.Gateway.TestNormalHosting.csproj", "{6E2DAA64-E2A1-4459-BE61-E807B4EF2CCE}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution


+ 0
- 9
src/JT1078.Gateway/Configurations/JT1078Configuration.cs Dosyayı Görüntüle

@@ -13,15 +13,6 @@ namespace JT1078.Gateway.Configurations
public int SoBacklog { get; set; } = 8192;
public int MiniNumBufferSize { get; set; } = 8096;
/// <summary>
/// http写超时
/// </summary>
public int HttpWriterIdleTimeSeconds { get; set; } = 60;
/// <summary>
/// http写超时
/// 默认30s检查一次
/// </summary>
public int HttpWriterTimeoutCheckTimeSeconds { get; set; } = 30;
/// <summary>
/// Tcp读超时
/// 默认10分钟检查一次
/// </summary>


+ 20
- 0
src/JT1078.Gateway/Extensions/JT1078HttpContextExtensions.cs Dosyayı Görüntüle

@@ -51,6 +51,19 @@ namespace JT1078.Gateway.Extensions
context.Response.Close();
}

public static async ValueTask HttpSendFirstChunked(this JT1078HttpContext context, ReadOnlyMemory<byte> buffer)
{
context.Context.Response.StatusCode = (int)HttpStatusCode.OK;
context.Context.Response.SendChunked = true;
await context.Context.Response.OutputStream.WriteAsync(buffer);
}

public static async ValueTask HttpSendChunked(this JT1078HttpContext context, ReadOnlyMemory<byte> buffer)
{
context.Context.Response.StatusCode = (int)HttpStatusCode.OK;
await context.Context.Response.OutputStream.WriteAsync(buffer);
}

public static async ValueTask HttpClose(this JT1078HttpContext context)
{
byte[] b = Encoding.UTF8.GetBytes("close");
@@ -76,5 +89,12 @@ namespace JT1078.Gateway.Extensions
{
await context.WebSocketContext.WebSocket.SendAsync(buffer, WebSocketMessageType.Binary, true, CancellationToken.None);
}

private static ReadOnlyMemory<byte> Hello = Encoding.UTF8.GetBytes("hello,jt1078");

public static async ValueTask WebSocketSendHelloAsync(this JT1078HttpContext context)
{
await context.WebSocketContext.WebSocket.SendAsync(Hello, WebSocketMessageType.Text, true, CancellationToken.None);
}
}
}

+ 0
- 1
src/JT1078.Gateway/JT1078GatewayExtensions.cs Dosyayı Görüntüle

@@ -54,7 +54,6 @@ namespace JT1078.Gateway
{
builder.JT1078Builder.Services.AddSingleton<IJT1078Authorization, JT1078AuthorizationDefault>();
builder.JT1078Builder.Services.AddSingleton<JT1078HttpSessionManager>();
builder.JT1078Builder.Services.AddHostedService<JT1078HttpWriterTimeoutJob>();
builder.JT1078Builder.Services.AddHostedService<JT1078HttpServer>();
return builder;
}


+ 8
- 7
src/JT1078.Gateway/JT1078HttpServer.cs Dosyayı Görüntüle

@@ -109,23 +109,24 @@ namespace JT1078.Gateway
int.TryParse(channel, out int channelNo);
if (context.Request.IsWebSocketRequest)
{
HttpListenerWebSocketContext wsContext = await context.AcceptWebSocketAsync(null);
HttpListenerWebSocketContext wsContext = await context.AcceptWebSocketAsync(null, keepAliveInterval:TimeSpan.FromSeconds(5));
var jT1078HttpContext = new JT1078HttpContext(context, wsContext,principal);
jT1078HttpContext.Sim = sim;
jT1078HttpContext.ChannelNo = channelNo;
SessionManager.TryAdd(jT1078HttpContext);
await wsContext.WebSocket.SendAsync(Encoding.UTF8.GetBytes("hello,jt1078"), WebSocketMessageType.Text, true, CancellationToken.None);
await Task.Factory.StartNew(async(state) =>
await jT1078HttpContext.WebSocketSendHelloAsync();
await Task.Factory.StartNew(async(state) =>
{
//https://www.bejson.com/httputil/websocket/
//ws://localhost:15555?token=22&sim=1221&channel=1
var websocketContext = state as JT1078HttpContext;
while(websocketContext.WebSocketContext.WebSocket.State == WebSocketState.Open ||
websocketContext.WebSocketContext.WebSocket.State == WebSocketState.Connecting)
while (websocketContext.WebSocketContext.WebSocket.State == WebSocketState.Open ||
websocketContext.WebSocketContext.WebSocket.State == WebSocketState.Connecting)
{
var buffer = ArrayPool<byte>.Shared.Rent(256);
try
{
//客户端主动断开需要有个线程去接收通知,不然会客户端会卡死直到超时
WebSocketReceiveResult receiveResult = await websocketContext.WebSocketContext.WebSocket.ReceiveAsync(buffer, CancellationToken.None);
if (receiveResult.EndOfMessage)
{
@@ -145,9 +146,9 @@ namespace JT1078.Gateway
ArrayPool<byte>.Shared.Return(buffer);
}
}
if (Logger.IsEnabled(LogLevel.Trace))
if (Logger.IsEnabled(LogLevel.Information))
{
Logger.LogTrace($"[ws close]:{websocketContext}");
Logger.LogInformation($"[ws close]:{websocketContext.SessionId}-{websocketContext.Sim}-{websocketContext.ChannelNo}-{websocketContext.StartTime:yyyyMMddhhmmss}");
}
SessionManager.TryRemove(websocketContext.SessionId);
}, jT1078HttpContext);


+ 0
- 66
src/JT1078.Gateway/Jobs/JT1078HttpWriterTimeoutJob.cs Dosyayı Görüntüle

@@ -1,66 +0,0 @@
using JT1078.Gateway.Configurations;
using JT1078.Gateway.Sessions;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace JT1078.Gateway.Jobs
{
internal class JT1078HttpWriterTimeoutJob : BackgroundService
{
private readonly ILogger Logger;

private readonly JT1078HttpSessionManager SessionManager;

private readonly IOptionsMonitor<JT1078Configuration> Configuration;
public JT1078HttpWriterTimeoutJob(
IOptionsMonitor<JT1078Configuration> jT1078ConfigurationAccessor,
ILoggerFactory loggerFactory,
JT1078HttpSessionManager jT1078SessionManager
)
{
SessionManager = jT1078SessionManager;
Logger = loggerFactory.CreateLogger<JT1078TcpReceiveTimeoutJob>();
Configuration = jT1078ConfigurationAccessor;
}

protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
while (!stoppingToken.IsCancellationRequested)
{
try
{
foreach (var item in SessionManager.GetAll())
{
//过滤掉websocket的方式,无论是客户端主动断开还是关闭浏览器都会有断开通知的情况,所以就只需要判断http的写超时
if (!item.IsWebSocket)
{
if (item.ActiveTime.AddSeconds(Configuration.CurrentValue.HttpWriterIdleTimeSeconds) < DateTime.Now)
{
SessionManager.TryRemove(item.SessionId);
}
}
}
if (Logger.IsEnabled(LogLevel.Information))
{
Logger.LogInformation($"[Http Check Writer Timeout]");
Logger.LogInformation($"[Http Session Online Count]:{SessionManager.SessionCount}");
}
}
catch (Exception ex)
{
Logger.LogError(ex, $"[Http Writer Timeout]");
}
finally
{
await Task.Delay(TimeSpan.FromSeconds(Configuration.CurrentValue.HttpWriterTimeoutCheckTimeSeconds), stoppingToken);
}
}
}
}
}

+ 3
- 3
src/JT1078.Gateway/Metadata/JT1078HttpContext.cs Dosyayı Görüntüle

@@ -22,24 +22,24 @@ namespace JT1078.Gateway.Metadata
return Context.Request.IsWebSocketRequest;
}
}
public DateTime ActiveTime { get; set; }
public DateTime StartTime { get; set; }
public bool SendChunked { get; set; }
public JT1078HttpContext(HttpListenerContext context, IPrincipal user)
{
Context = context;
User = user;
ActiveTime = DateTime.Now;
StartTime = DateTime.Now;
SessionId = Guid.NewGuid().ToString("N");
SendChunked = false;
}
public JT1078HttpContext(HttpListenerContext context, HttpListenerWebSocketContext webSocketContext, IPrincipal user)
{
Context = context;
WebSocketContext = webSocketContext;
User = user;
ActiveTime = DateTime.Now;
StartTime = DateTime.Now;
SessionId = Guid.NewGuid().ToString("N");
SendChunked = false;
}
}
}

+ 160
- 11
src/JT1078.Gateway/Sessions/JT1078HttpSessionManager.cs Dosyayı Görüntüle

@@ -1,5 +1,6 @@
using JT1078.Gateway.Extensions;
using JT1078.Gateway.Metadata;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
@@ -15,10 +16,11 @@ namespace JT1078.Gateway.Sessions
public class JT1078HttpSessionManager
{
public ConcurrentDictionary<string, JT1078HttpContext> Sessions { get; }
public JT1078HttpSessionManager()
private ILogger Logger;
public JT1078HttpSessionManager(ILoggerFactory loggerFactory)
{
Sessions = new ConcurrentDictionary<string, JT1078HttpContext>();
Logger = loggerFactory.CreateLogger<JT1078HttpSessionManager>();
}

public bool TryAdd(JT1078HttpContext httpContext)
@@ -53,16 +55,163 @@ namespace JT1078.Gateway.Sessions
}
}

public void SendHttpChunk(byte[] data)
private void remove(string sessionId)
{
if (Sessions.TryRemove(sessionId, out JT1078HttpContext session))
{
//todo:session close notice
}
}

/// <summary>
/// 发送音视频数据
/// </summary>
/// <param name="sim"></param>
/// <param name="channelNo"></param>
/// <param name="data"></param>
public void SendAVData(string sim,int channelNo,byte[] data)
{
var contexts = Sessions.Select(s => s.Value).Where(w => w.Sim == sim && w.ChannelNo == channelNo).ToList();
ParallelLoopResult parallelLoopResult= Parallel.ForEach(contexts, async(context) =>
{
if (context.IsWebSocket)
{
try
{
await context.WebSocketSendBinaryAsync(data);
}
catch (Exception ex)
{
if (Logger.IsEnabled(LogLevel.Information))
{
Logger.LogInformation($"[ws close]:{context.SessionId}-{context.Sim}-{context.ChannelNo}-{context.StartTime:yyyyMMddhhmmss}");
}
remove(context.SessionId);
}
}
else
{
if (!context.SendChunked)
{
context.SendChunked = true;
Sessions.TryUpdate(context.SessionId, context, context);
try
{
await context.HttpSendFirstChunked(data);
}
catch (Exception ex)
{
if (Logger.IsEnabled(LogLevel.Information))
{
Logger.LogInformation($"[http close]:{context.SessionId}-{context.Sim}-{context.ChannelNo}-{context.StartTime:yyyyMMddhhmmss}");
}
remove(context.SessionId);
}
}
else
{
try
{
await context.HttpSendChunked(data);
}
catch (Exception ex)
{
if (Logger.IsEnabled(LogLevel.Information))
{
Logger.LogInformation($"[http close]:{context.SessionId}-{context.Sim}-{context.ChannelNo}-{context.StartTime:yyyyMMddhhmmss}");
}
remove(context.SessionId);
}
}
}
});
if (parallelLoopResult.IsCompleted)
{

}
}

/// <summary>
/// 发送音视频数据到websocket
/// </summary>
/// <param name="sim"></param>
/// <param name="channelNo"></param>
/// <param name="data"></param>
public void SendAVData2WebSocket(string sim, int channelNo, byte[] data)
{
var contexts = Sessions.Select(s => s.Value).Where(w => w.Sim == sim && w.ChannelNo == channelNo && w.IsWebSocket).ToList();
ParallelLoopResult parallelLoopResult = Parallel.ForEach(contexts, async (context) =>
{
if (context.IsWebSocket)
{
try
{
await context.WebSocketSendBinaryAsync(data);
}
catch (Exception ex)
{
if (Logger.IsEnabled(LogLevel.Information))
{
Logger.LogInformation($"[ws close]:{context.SessionId}-{context.Sim}-{context.ChannelNo}-{context.StartTime:yyyyMMddhhmmss}");
}
remove(context.SessionId);
}
}
});
if (parallelLoopResult.IsCompleted)
{

}
}

/// <summary>
/// 发送音视频数据到Http Chunked中
/// </summary>
/// <param name="sim"></param>
/// <param name="channelNo"></param>
/// <param name="data"></param>
public void SendAVData2HttpChunked(string sim, int channelNo, byte[] data)
{
//todo:set http chunk
//todo:session close notice
//byte[] b = Encoding.UTF8.GetBytes("ack");
//context.Response.StatusCode = 200;
//context.Response.KeepAlive = true;
//context.Response.ContentLength64 = b.Length;
//await context.Response.OutputStream.WriteAsync(b, 0, b.Length);
//context.Response.Close();
var contexts = Sessions.Select(s => s.Value).Where(w => w.Sim == sim && w.ChannelNo == channelNo && !w.IsWebSocket).ToList();
ParallelLoopResult parallelLoopResult = Parallel.ForEach(contexts, async (context) =>
{
if (!context.SendChunked)
{
context.SendChunked = true;
Sessions.TryUpdate(context.SessionId, context, context);
try
{
await context.HttpSendFirstChunked(data);
}
catch (Exception ex)
{
if (Logger.IsEnabled(LogLevel.Information))
{
Logger.LogInformation($"[http close]:{context.SessionId}-{context.Sim}-{context.ChannelNo}-{context.StartTime:yyyyMMddhhmmss}");
}
remove(context.SessionId);
}
}
else
{
try
{
await context.HttpSendChunked(data);
}
catch (Exception ex)
{
if (Logger.IsEnabled(LogLevel.Information))
{
Logger.LogInformation($"[http close]:{context.SessionId}-{context.Sim}-{context.ChannelNo}-{context.StartTime:yyyyMMddhhmmss}");
}
remove(context.SessionId);
}
}
});
if (parallelLoopResult.IsCompleted)
{

}
}

public int SessionCount


Yükleniyor…
İptal
Kaydet