From 4469d4864674e0e7a2183972e63ea580c9ec1370 Mon Sep 17 00:00:00 2001
From: "SmallChi(Koike)" <564952747@qq.com>
Date: Thu, 6 Aug 2020 18:09:43 +0800
Subject: [PATCH] =?UTF-8?q?=E5=AE=8C=E5=96=84http=E5=92=8Cws=E7=9A=84?=
=?UTF-8?q?=E6=8E=A8=E9=80=81=E5=8F=8A=E5=AF=B9=E5=BA=94=E7=9A=84=E4=BC=9A?=
=?UTF-8?q?=E8=AF=9D=E7=BB=B4=E6=8A=A4?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
.../Services/JT1078NormalMsgHostedService.cs | 2 +-
src/JT1078.Gateway.sln | 4 +-
.../Configurations/JT1078Configuration.cs | 9 -
.../Extensions/JT1078HttpContextExtensions.cs | 20 ++
src/JT1078.Gateway/JT1078GatewayExtensions.cs | 1 -
src/JT1078.Gateway/JT1078HttpServer.cs | 15 +-
.../Jobs/JT1078HttpWriterTimeoutJob.cs | 66 -------
.../Metadata/JT1078HttpContext.cs | 6 +-
.../Sessions/JT1078HttpSessionManager.cs | 171 ++++++++++++++++--
9 files changed, 194 insertions(+), 100 deletions(-)
delete mode 100644 src/JT1078.Gateway/Jobs/JT1078HttpWriterTimeoutJob.cs
diff --git a/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Services/JT1078NormalMsgHostedService.cs b/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Services/JT1078NormalMsgHostedService.cs
index c63ea63..6e9b742 100644
--- a/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Services/JT1078NormalMsgHostedService.cs
+++ b/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Services/JT1078NormalMsgHostedService.cs
@@ -19,7 +19,7 @@ namespace JT1078.Gateway.TestNormalHosting.Services
{
PackageConsumer.OnMessage((Message) =>
{
-
+
});
return Task.CompletedTask;
}
diff --git a/src/JT1078.Gateway.sln b/src/JT1078.Gateway.sln
index de89a84..df3bbe5 100644
--- a/src/JT1078.Gateway.sln
+++ b/src/JT1078.Gateway.sln
@@ -11,9 +11,9 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Tests", "Tests", "{04C0F72A
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JT1078.Gateway.Abstractions", "JT1078.Gateway.Abstractions\JT1078.Gateway.Abstractions.csproj", "{EE50F2A6-5F28-4640-BC20-44A8BED8F311}"
EndProject
-Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "JT1078.Gateway.InMemoryMQ", "JT1078.Gateway.InMemoryMQ\JT1078.Gateway.InMemoryMQ.csproj", "{C7554790-0B3B-490F-B2F1-436C1FD0B4DD}"
+Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JT1078.Gateway.InMemoryMQ", "JT1078.Gateway.InMemoryMQ\JT1078.Gateway.InMemoryMQ.csproj", "{C7554790-0B3B-490F-B2F1-436C1FD0B4DD}"
EndProject
-Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "JT1078.Gateway.TestNormalHosting", "JT1078.Gateway.Tests\JT1078.Gateway.TestNormalHosting\JT1078.Gateway.TestNormalHosting.csproj", "{6E2DAA64-E2A1-4459-BE61-E807B4EF2CCE}"
+Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JT1078.Gateway.TestNormalHosting", "JT1078.Gateway.Tests\JT1078.Gateway.TestNormalHosting\JT1078.Gateway.TestNormalHosting.csproj", "{6E2DAA64-E2A1-4459-BE61-E807B4EF2CCE}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
diff --git a/src/JT1078.Gateway/Configurations/JT1078Configuration.cs b/src/JT1078.Gateway/Configurations/JT1078Configuration.cs
index 41ddc03..2793fcd 100644
--- a/src/JT1078.Gateway/Configurations/JT1078Configuration.cs
+++ b/src/JT1078.Gateway/Configurations/JT1078Configuration.cs
@@ -13,15 +13,6 @@ namespace JT1078.Gateway.Configurations
public int SoBacklog { get; set; } = 8192;
public int MiniNumBufferSize { get; set; } = 8096;
///
- /// http写超时
- ///
- public int HttpWriterIdleTimeSeconds { get; set; } = 60;
- ///
- /// http写超时
- /// 默认30s检查一次
- ///
- public int HttpWriterTimeoutCheckTimeSeconds { get; set; } = 30;
- ///
/// Tcp读超时
/// 默认10分钟检查一次
///
diff --git a/src/JT1078.Gateway/Extensions/JT1078HttpContextExtensions.cs b/src/JT1078.Gateway/Extensions/JT1078HttpContextExtensions.cs
index 5d7cc38..2253eb2 100644
--- a/src/JT1078.Gateway/Extensions/JT1078HttpContextExtensions.cs
+++ b/src/JT1078.Gateway/Extensions/JT1078HttpContextExtensions.cs
@@ -51,6 +51,19 @@ namespace JT1078.Gateway.Extensions
context.Response.Close();
}
+ public static async ValueTask HttpSendFirstChunked(this JT1078HttpContext context, ReadOnlyMemory buffer)
+ {
+ context.Context.Response.StatusCode = (int)HttpStatusCode.OK;
+ context.Context.Response.SendChunked = true;
+ await context.Context.Response.OutputStream.WriteAsync(buffer);
+ }
+
+ public static async ValueTask HttpSendChunked(this JT1078HttpContext context, ReadOnlyMemory buffer)
+ {
+ context.Context.Response.StatusCode = (int)HttpStatusCode.OK;
+ await context.Context.Response.OutputStream.WriteAsync(buffer);
+ }
+
public static async ValueTask HttpClose(this JT1078HttpContext context)
{
byte[] b = Encoding.UTF8.GetBytes("close");
@@ -76,5 +89,12 @@ namespace JT1078.Gateway.Extensions
{
await context.WebSocketContext.WebSocket.SendAsync(buffer, WebSocketMessageType.Binary, true, CancellationToken.None);
}
+
+ private static ReadOnlyMemory Hello = Encoding.UTF8.GetBytes("hello,jt1078");
+
+ public static async ValueTask WebSocketSendHelloAsync(this JT1078HttpContext context)
+ {
+ await context.WebSocketContext.WebSocket.SendAsync(Hello, WebSocketMessageType.Text, true, CancellationToken.None);
+ }
}
}
diff --git a/src/JT1078.Gateway/JT1078GatewayExtensions.cs b/src/JT1078.Gateway/JT1078GatewayExtensions.cs
index f28905a..ef9f8b5 100644
--- a/src/JT1078.Gateway/JT1078GatewayExtensions.cs
+++ b/src/JT1078.Gateway/JT1078GatewayExtensions.cs
@@ -54,7 +54,6 @@ namespace JT1078.Gateway
{
builder.JT1078Builder.Services.AddSingleton();
builder.JT1078Builder.Services.AddSingleton();
- builder.JT1078Builder.Services.AddHostedService();
builder.JT1078Builder.Services.AddHostedService();
return builder;
}
diff --git a/src/JT1078.Gateway/JT1078HttpServer.cs b/src/JT1078.Gateway/JT1078HttpServer.cs
index 4c393f8..3b827f5 100644
--- a/src/JT1078.Gateway/JT1078HttpServer.cs
+++ b/src/JT1078.Gateway/JT1078HttpServer.cs
@@ -109,23 +109,24 @@ namespace JT1078.Gateway
int.TryParse(channel, out int channelNo);
if (context.Request.IsWebSocketRequest)
{
- HttpListenerWebSocketContext wsContext = await context.AcceptWebSocketAsync(null);
+ HttpListenerWebSocketContext wsContext = await context.AcceptWebSocketAsync(null, keepAliveInterval:TimeSpan.FromSeconds(5));
var jT1078HttpContext = new JT1078HttpContext(context, wsContext,principal);
jT1078HttpContext.Sim = sim;
jT1078HttpContext.ChannelNo = channelNo;
SessionManager.TryAdd(jT1078HttpContext);
- await wsContext.WebSocket.SendAsync(Encoding.UTF8.GetBytes("hello,jt1078"), WebSocketMessageType.Text, true, CancellationToken.None);
- await Task.Factory.StartNew(async(state) =>
+ await jT1078HttpContext.WebSocketSendHelloAsync();
+ await Task.Factory.StartNew(async(state) =>
{
//https://www.bejson.com/httputil/websocket/
//ws://localhost:15555?token=22&sim=1221&channel=1
var websocketContext = state as JT1078HttpContext;
- while(websocketContext.WebSocketContext.WebSocket.State == WebSocketState.Open ||
- websocketContext.WebSocketContext.WebSocket.State == WebSocketState.Connecting)
+ while (websocketContext.WebSocketContext.WebSocket.State == WebSocketState.Open ||
+ websocketContext.WebSocketContext.WebSocket.State == WebSocketState.Connecting)
{
var buffer = ArrayPool.Shared.Rent(256);
try
{
+ //客户端主动断开需要有个线程去接收通知,不然会客户端会卡死直到超时
WebSocketReceiveResult receiveResult = await websocketContext.WebSocketContext.WebSocket.ReceiveAsync(buffer, CancellationToken.None);
if (receiveResult.EndOfMessage)
{
@@ -145,9 +146,9 @@ namespace JT1078.Gateway
ArrayPool.Shared.Return(buffer);
}
}
- if (Logger.IsEnabled(LogLevel.Trace))
+ if (Logger.IsEnabled(LogLevel.Information))
{
- Logger.LogTrace($"[ws close]:{websocketContext}");
+ Logger.LogInformation($"[ws close]:{websocketContext.SessionId}-{websocketContext.Sim}-{websocketContext.ChannelNo}-{websocketContext.StartTime:yyyyMMddhhmmss}");
}
SessionManager.TryRemove(websocketContext.SessionId);
}, jT1078HttpContext);
diff --git a/src/JT1078.Gateway/Jobs/JT1078HttpWriterTimeoutJob.cs b/src/JT1078.Gateway/Jobs/JT1078HttpWriterTimeoutJob.cs
deleted file mode 100644
index 134ac09..0000000
--- a/src/JT1078.Gateway/Jobs/JT1078HttpWriterTimeoutJob.cs
+++ /dev/null
@@ -1,66 +0,0 @@
-using JT1078.Gateway.Configurations;
-using JT1078.Gateway.Sessions;
-using Microsoft.Extensions.Hosting;
-using Microsoft.Extensions.Logging;
-using Microsoft.Extensions.Options;
-using System;
-using System.Collections.Generic;
-using System.Text;
-using System.Threading;
-using System.Threading.Tasks;
-
-namespace JT1078.Gateway.Jobs
-{
- internal class JT1078HttpWriterTimeoutJob : BackgroundService
- {
- private readonly ILogger Logger;
-
- private readonly JT1078HttpSessionManager SessionManager;
-
- private readonly IOptionsMonitor Configuration;
- public JT1078HttpWriterTimeoutJob(
- IOptionsMonitor jT1078ConfigurationAccessor,
- ILoggerFactory loggerFactory,
- JT1078HttpSessionManager jT1078SessionManager
- )
- {
- SessionManager = jT1078SessionManager;
- Logger = loggerFactory.CreateLogger();
- Configuration = jT1078ConfigurationAccessor;
- }
-
- protected override async Task ExecuteAsync(CancellationToken stoppingToken)
- {
- while (!stoppingToken.IsCancellationRequested)
- {
- try
- {
- foreach (var item in SessionManager.GetAll())
- {
- //过滤掉websocket的方式,无论是客户端主动断开还是关闭浏览器都会有断开通知的情况,所以就只需要判断http的写超时
- if (!item.IsWebSocket)
- {
- if (item.ActiveTime.AddSeconds(Configuration.CurrentValue.HttpWriterIdleTimeSeconds) < DateTime.Now)
- {
- SessionManager.TryRemove(item.SessionId);
- }
- }
- }
- if (Logger.IsEnabled(LogLevel.Information))
- {
- Logger.LogInformation($"[Http Check Writer Timeout]");
- Logger.LogInformation($"[Http Session Online Count]:{SessionManager.SessionCount}");
- }
- }
- catch (Exception ex)
- {
- Logger.LogError(ex, $"[Http Writer Timeout]");
- }
- finally
- {
- await Task.Delay(TimeSpan.FromSeconds(Configuration.CurrentValue.HttpWriterTimeoutCheckTimeSeconds), stoppingToken);
- }
- }
- }
- }
-}
diff --git a/src/JT1078.Gateway/Metadata/JT1078HttpContext.cs b/src/JT1078.Gateway/Metadata/JT1078HttpContext.cs
index 13fa461..095117a 100644
--- a/src/JT1078.Gateway/Metadata/JT1078HttpContext.cs
+++ b/src/JT1078.Gateway/Metadata/JT1078HttpContext.cs
@@ -22,24 +22,24 @@ namespace JT1078.Gateway.Metadata
return Context.Request.IsWebSocketRequest;
}
}
- public DateTime ActiveTime { get; set; }
public DateTime StartTime { get; set; }
+ public bool SendChunked { get; set; }
public JT1078HttpContext(HttpListenerContext context, IPrincipal user)
{
Context = context;
User = user;
- ActiveTime = DateTime.Now;
StartTime = DateTime.Now;
SessionId = Guid.NewGuid().ToString("N");
+ SendChunked = false;
}
public JT1078HttpContext(HttpListenerContext context, HttpListenerWebSocketContext webSocketContext, IPrincipal user)
{
Context = context;
WebSocketContext = webSocketContext;
User = user;
- ActiveTime = DateTime.Now;
StartTime = DateTime.Now;
SessionId = Guid.NewGuid().ToString("N");
+ SendChunked = false;
}
}
}
diff --git a/src/JT1078.Gateway/Sessions/JT1078HttpSessionManager.cs b/src/JT1078.Gateway/Sessions/JT1078HttpSessionManager.cs
index fe8bbb1..276799a 100644
--- a/src/JT1078.Gateway/Sessions/JT1078HttpSessionManager.cs
+++ b/src/JT1078.Gateway/Sessions/JT1078HttpSessionManager.cs
@@ -1,5 +1,6 @@
using JT1078.Gateway.Extensions;
using JT1078.Gateway.Metadata;
+using Microsoft.Extensions.Logging;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
@@ -15,10 +16,11 @@ namespace JT1078.Gateway.Sessions
public class JT1078HttpSessionManager
{
public ConcurrentDictionary Sessions { get; }
-
- public JT1078HttpSessionManager()
+ private ILogger Logger;
+ public JT1078HttpSessionManager(ILoggerFactory loggerFactory)
{
Sessions = new ConcurrentDictionary();
+ Logger = loggerFactory.CreateLogger();
}
public bool TryAdd(JT1078HttpContext httpContext)
@@ -53,16 +55,163 @@ namespace JT1078.Gateway.Sessions
}
}
- public void SendHttpChunk(byte[] data)
+ private void remove(string sessionId)
+ {
+ if (Sessions.TryRemove(sessionId, out JT1078HttpContext session))
+ {
+ //todo:session close notice
+ }
+ }
+
+ ///
+ /// 发送音视频数据
+ ///
+ ///
+ ///
+ ///
+ public void SendAVData(string sim,int channelNo,byte[] data)
+ {
+ var contexts = Sessions.Select(s => s.Value).Where(w => w.Sim == sim && w.ChannelNo == channelNo).ToList();
+ ParallelLoopResult parallelLoopResult= Parallel.ForEach(contexts, async(context) =>
+ {
+ if (context.IsWebSocket)
+ {
+ try
+ {
+ await context.WebSocketSendBinaryAsync(data);
+ }
+ catch (Exception ex)
+ {
+ if (Logger.IsEnabled(LogLevel.Information))
+ {
+ Logger.LogInformation($"[ws close]:{context.SessionId}-{context.Sim}-{context.ChannelNo}-{context.StartTime:yyyyMMddhhmmss}");
+ }
+ remove(context.SessionId);
+ }
+ }
+ else
+ {
+ if (!context.SendChunked)
+ {
+ context.SendChunked = true;
+ Sessions.TryUpdate(context.SessionId, context, context);
+ try
+ {
+ await context.HttpSendFirstChunked(data);
+ }
+ catch (Exception ex)
+ {
+ if (Logger.IsEnabled(LogLevel.Information))
+ {
+ Logger.LogInformation($"[http close]:{context.SessionId}-{context.Sim}-{context.ChannelNo}-{context.StartTime:yyyyMMddhhmmss}");
+ }
+ remove(context.SessionId);
+ }
+ }
+ else
+ {
+ try
+ {
+ await context.HttpSendChunked(data);
+ }
+ catch (Exception ex)
+ {
+ if (Logger.IsEnabled(LogLevel.Information))
+ {
+ Logger.LogInformation($"[http close]:{context.SessionId}-{context.Sim}-{context.ChannelNo}-{context.StartTime:yyyyMMddhhmmss}");
+ }
+ remove(context.SessionId);
+ }
+ }
+ }
+ });
+ if (parallelLoopResult.IsCompleted)
+ {
+
+ }
+ }
+
+ ///
+ /// 发送音视频数据到websocket
+ ///
+ ///
+ ///
+ ///
+ public void SendAVData2WebSocket(string sim, int channelNo, byte[] data)
+ {
+ var contexts = Sessions.Select(s => s.Value).Where(w => w.Sim == sim && w.ChannelNo == channelNo && w.IsWebSocket).ToList();
+ ParallelLoopResult parallelLoopResult = Parallel.ForEach(contexts, async (context) =>
+ {
+ if (context.IsWebSocket)
+ {
+ try
+ {
+ await context.WebSocketSendBinaryAsync(data);
+ }
+ catch (Exception ex)
+ {
+ if (Logger.IsEnabled(LogLevel.Information))
+ {
+ Logger.LogInformation($"[ws close]:{context.SessionId}-{context.Sim}-{context.ChannelNo}-{context.StartTime:yyyyMMddhhmmss}");
+ }
+ remove(context.SessionId);
+ }
+ }
+ });
+ if (parallelLoopResult.IsCompleted)
+ {
+
+ }
+ }
+
+ ///
+ /// 发送音视频数据到Http Chunked中
+ ///
+ ///
+ ///
+ ///
+ public void SendAVData2HttpChunked(string sim, int channelNo, byte[] data)
{
- //todo:set http chunk
- //todo:session close notice
- //byte[] b = Encoding.UTF8.GetBytes("ack");
- //context.Response.StatusCode = 200;
- //context.Response.KeepAlive = true;
- //context.Response.ContentLength64 = b.Length;
- //await context.Response.OutputStream.WriteAsync(b, 0, b.Length);
- //context.Response.Close();
+ var contexts = Sessions.Select(s => s.Value).Where(w => w.Sim == sim && w.ChannelNo == channelNo && !w.IsWebSocket).ToList();
+ ParallelLoopResult parallelLoopResult = Parallel.ForEach(contexts, async (context) =>
+ {
+ if (!context.SendChunked)
+ {
+ context.SendChunked = true;
+ Sessions.TryUpdate(context.SessionId, context, context);
+ try
+ {
+ await context.HttpSendFirstChunked(data);
+ }
+ catch (Exception ex)
+ {
+ if (Logger.IsEnabled(LogLevel.Information))
+ {
+ Logger.LogInformation($"[http close]:{context.SessionId}-{context.Sim}-{context.ChannelNo}-{context.StartTime:yyyyMMddhhmmss}");
+ }
+ remove(context.SessionId);
+ }
+ }
+ else
+ {
+ try
+ {
+ await context.HttpSendChunked(data);
+ }
+ catch (Exception ex)
+ {
+ if (Logger.IsEnabled(LogLevel.Information))
+ {
+ Logger.LogInformation($"[http close]:{context.SessionId}-{context.Sim}-{context.ChannelNo}-{context.StartTime:yyyyMMddhhmmss}");
+ }
+ remove(context.SessionId);
+ }
+ }
+ });
+ if (parallelLoopResult.IsCompleted)
+ {
+
+ }
}
public int SessionCount