diff --git a/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Services/JT1078NormalMsgHostedService.cs b/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Services/JT1078NormalMsgHostedService.cs index c63ea63..6e9b742 100644 --- a/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Services/JT1078NormalMsgHostedService.cs +++ b/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Services/JT1078NormalMsgHostedService.cs @@ -19,7 +19,7 @@ namespace JT1078.Gateway.TestNormalHosting.Services { PackageConsumer.OnMessage((Message) => { - + }); return Task.CompletedTask; } diff --git a/src/JT1078.Gateway.sln b/src/JT1078.Gateway.sln index de89a84..df3bbe5 100644 --- a/src/JT1078.Gateway.sln +++ b/src/JT1078.Gateway.sln @@ -11,9 +11,9 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Tests", "Tests", "{04C0F72A EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JT1078.Gateway.Abstractions", "JT1078.Gateway.Abstractions\JT1078.Gateway.Abstractions.csproj", "{EE50F2A6-5F28-4640-BC20-44A8BED8F311}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "JT1078.Gateway.InMemoryMQ", "JT1078.Gateway.InMemoryMQ\JT1078.Gateway.InMemoryMQ.csproj", "{C7554790-0B3B-490F-B2F1-436C1FD0B4DD}" +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JT1078.Gateway.InMemoryMQ", "JT1078.Gateway.InMemoryMQ\JT1078.Gateway.InMemoryMQ.csproj", "{C7554790-0B3B-490F-B2F1-436C1FD0B4DD}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "JT1078.Gateway.TestNormalHosting", "JT1078.Gateway.Tests\JT1078.Gateway.TestNormalHosting\JT1078.Gateway.TestNormalHosting.csproj", "{6E2DAA64-E2A1-4459-BE61-E807B4EF2CCE}" +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JT1078.Gateway.TestNormalHosting", "JT1078.Gateway.Tests\JT1078.Gateway.TestNormalHosting\JT1078.Gateway.TestNormalHosting.csproj", "{6E2DAA64-E2A1-4459-BE61-E807B4EF2CCE}" EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution diff --git a/src/JT1078.Gateway/Configurations/JT1078Configuration.cs b/src/JT1078.Gateway/Configurations/JT1078Configuration.cs index 41ddc03..2793fcd 100644 --- a/src/JT1078.Gateway/Configurations/JT1078Configuration.cs +++ b/src/JT1078.Gateway/Configurations/JT1078Configuration.cs @@ -13,15 +13,6 @@ namespace JT1078.Gateway.Configurations public int SoBacklog { get; set; } = 8192; public int MiniNumBufferSize { get; set; } = 8096; /// - /// http写超时 - /// - public int HttpWriterIdleTimeSeconds { get; set; } = 60; - /// - /// http写超时 - /// 默认30s检查一次 - /// - public int HttpWriterTimeoutCheckTimeSeconds { get; set; } = 30; - /// /// Tcp读超时 /// 默认10分钟检查一次 /// diff --git a/src/JT1078.Gateway/Extensions/JT1078HttpContextExtensions.cs b/src/JT1078.Gateway/Extensions/JT1078HttpContextExtensions.cs index 5d7cc38..2253eb2 100644 --- a/src/JT1078.Gateway/Extensions/JT1078HttpContextExtensions.cs +++ b/src/JT1078.Gateway/Extensions/JT1078HttpContextExtensions.cs @@ -51,6 +51,19 @@ namespace JT1078.Gateway.Extensions context.Response.Close(); } + public static async ValueTask HttpSendFirstChunked(this JT1078HttpContext context, ReadOnlyMemory buffer) + { + context.Context.Response.StatusCode = (int)HttpStatusCode.OK; + context.Context.Response.SendChunked = true; + await context.Context.Response.OutputStream.WriteAsync(buffer); + } + + public static async ValueTask HttpSendChunked(this JT1078HttpContext context, ReadOnlyMemory buffer) + { + context.Context.Response.StatusCode = (int)HttpStatusCode.OK; + await context.Context.Response.OutputStream.WriteAsync(buffer); + } + public static async ValueTask HttpClose(this JT1078HttpContext context) { byte[] b = Encoding.UTF8.GetBytes("close"); @@ -76,5 +89,12 @@ namespace JT1078.Gateway.Extensions { await context.WebSocketContext.WebSocket.SendAsync(buffer, WebSocketMessageType.Binary, true, CancellationToken.None); } + + private static ReadOnlyMemory Hello = Encoding.UTF8.GetBytes("hello,jt1078"); + + public static async ValueTask WebSocketSendHelloAsync(this JT1078HttpContext context) + { + await context.WebSocketContext.WebSocket.SendAsync(Hello, WebSocketMessageType.Text, true, CancellationToken.None); + } } } diff --git a/src/JT1078.Gateway/JT1078GatewayExtensions.cs b/src/JT1078.Gateway/JT1078GatewayExtensions.cs index f28905a..ef9f8b5 100644 --- a/src/JT1078.Gateway/JT1078GatewayExtensions.cs +++ b/src/JT1078.Gateway/JT1078GatewayExtensions.cs @@ -54,7 +54,6 @@ namespace JT1078.Gateway { builder.JT1078Builder.Services.AddSingleton(); builder.JT1078Builder.Services.AddSingleton(); - builder.JT1078Builder.Services.AddHostedService(); builder.JT1078Builder.Services.AddHostedService(); return builder; } diff --git a/src/JT1078.Gateway/JT1078HttpServer.cs b/src/JT1078.Gateway/JT1078HttpServer.cs index 4c393f8..3b827f5 100644 --- a/src/JT1078.Gateway/JT1078HttpServer.cs +++ b/src/JT1078.Gateway/JT1078HttpServer.cs @@ -109,23 +109,24 @@ namespace JT1078.Gateway int.TryParse(channel, out int channelNo); if (context.Request.IsWebSocketRequest) { - HttpListenerWebSocketContext wsContext = await context.AcceptWebSocketAsync(null); + HttpListenerWebSocketContext wsContext = await context.AcceptWebSocketAsync(null, keepAliveInterval:TimeSpan.FromSeconds(5)); var jT1078HttpContext = new JT1078HttpContext(context, wsContext,principal); jT1078HttpContext.Sim = sim; jT1078HttpContext.ChannelNo = channelNo; SessionManager.TryAdd(jT1078HttpContext); - await wsContext.WebSocket.SendAsync(Encoding.UTF8.GetBytes("hello,jt1078"), WebSocketMessageType.Text, true, CancellationToken.None); - await Task.Factory.StartNew(async(state) => + await jT1078HttpContext.WebSocketSendHelloAsync(); + await Task.Factory.StartNew(async(state) => { //https://www.bejson.com/httputil/websocket/ //ws://localhost:15555?token=22&sim=1221&channel=1 var websocketContext = state as JT1078HttpContext; - while(websocketContext.WebSocketContext.WebSocket.State == WebSocketState.Open || - websocketContext.WebSocketContext.WebSocket.State == WebSocketState.Connecting) + while (websocketContext.WebSocketContext.WebSocket.State == WebSocketState.Open || + websocketContext.WebSocketContext.WebSocket.State == WebSocketState.Connecting) { var buffer = ArrayPool.Shared.Rent(256); try { + //客户端主动断开需要有个线程去接收通知,不然会客户端会卡死直到超时 WebSocketReceiveResult receiveResult = await websocketContext.WebSocketContext.WebSocket.ReceiveAsync(buffer, CancellationToken.None); if (receiveResult.EndOfMessage) { @@ -145,9 +146,9 @@ namespace JT1078.Gateway ArrayPool.Shared.Return(buffer); } } - if (Logger.IsEnabled(LogLevel.Trace)) + if (Logger.IsEnabled(LogLevel.Information)) { - Logger.LogTrace($"[ws close]:{websocketContext}"); + Logger.LogInformation($"[ws close]:{websocketContext.SessionId}-{websocketContext.Sim}-{websocketContext.ChannelNo}-{websocketContext.StartTime:yyyyMMddhhmmss}"); } SessionManager.TryRemove(websocketContext.SessionId); }, jT1078HttpContext); diff --git a/src/JT1078.Gateway/Jobs/JT1078HttpWriterTimeoutJob.cs b/src/JT1078.Gateway/Jobs/JT1078HttpWriterTimeoutJob.cs deleted file mode 100644 index 134ac09..0000000 --- a/src/JT1078.Gateway/Jobs/JT1078HttpWriterTimeoutJob.cs +++ /dev/null @@ -1,66 +0,0 @@ -using JT1078.Gateway.Configurations; -using JT1078.Gateway.Sessions; -using Microsoft.Extensions.Hosting; -using Microsoft.Extensions.Logging; -using Microsoft.Extensions.Options; -using System; -using System.Collections.Generic; -using System.Text; -using System.Threading; -using System.Threading.Tasks; - -namespace JT1078.Gateway.Jobs -{ - internal class JT1078HttpWriterTimeoutJob : BackgroundService - { - private readonly ILogger Logger; - - private readonly JT1078HttpSessionManager SessionManager; - - private readonly IOptionsMonitor Configuration; - public JT1078HttpWriterTimeoutJob( - IOptionsMonitor jT1078ConfigurationAccessor, - ILoggerFactory loggerFactory, - JT1078HttpSessionManager jT1078SessionManager - ) - { - SessionManager = jT1078SessionManager; - Logger = loggerFactory.CreateLogger(); - Configuration = jT1078ConfigurationAccessor; - } - - protected override async Task ExecuteAsync(CancellationToken stoppingToken) - { - while (!stoppingToken.IsCancellationRequested) - { - try - { - foreach (var item in SessionManager.GetAll()) - { - //过滤掉websocket的方式,无论是客户端主动断开还是关闭浏览器都会有断开通知的情况,所以就只需要判断http的写超时 - if (!item.IsWebSocket) - { - if (item.ActiveTime.AddSeconds(Configuration.CurrentValue.HttpWriterIdleTimeSeconds) < DateTime.Now) - { - SessionManager.TryRemove(item.SessionId); - } - } - } - if (Logger.IsEnabled(LogLevel.Information)) - { - Logger.LogInformation($"[Http Check Writer Timeout]"); - Logger.LogInformation($"[Http Session Online Count]:{SessionManager.SessionCount}"); - } - } - catch (Exception ex) - { - Logger.LogError(ex, $"[Http Writer Timeout]"); - } - finally - { - await Task.Delay(TimeSpan.FromSeconds(Configuration.CurrentValue.HttpWriterTimeoutCheckTimeSeconds), stoppingToken); - } - } - } - } -} diff --git a/src/JT1078.Gateway/Metadata/JT1078HttpContext.cs b/src/JT1078.Gateway/Metadata/JT1078HttpContext.cs index 13fa461..095117a 100644 --- a/src/JT1078.Gateway/Metadata/JT1078HttpContext.cs +++ b/src/JT1078.Gateway/Metadata/JT1078HttpContext.cs @@ -22,24 +22,24 @@ namespace JT1078.Gateway.Metadata return Context.Request.IsWebSocketRequest; } } - public DateTime ActiveTime { get; set; } public DateTime StartTime { get; set; } + public bool SendChunked { get; set; } public JT1078HttpContext(HttpListenerContext context, IPrincipal user) { Context = context; User = user; - ActiveTime = DateTime.Now; StartTime = DateTime.Now; SessionId = Guid.NewGuid().ToString("N"); + SendChunked = false; } public JT1078HttpContext(HttpListenerContext context, HttpListenerWebSocketContext webSocketContext, IPrincipal user) { Context = context; WebSocketContext = webSocketContext; User = user; - ActiveTime = DateTime.Now; StartTime = DateTime.Now; SessionId = Guid.NewGuid().ToString("N"); + SendChunked = false; } } } diff --git a/src/JT1078.Gateway/Sessions/JT1078HttpSessionManager.cs b/src/JT1078.Gateway/Sessions/JT1078HttpSessionManager.cs index fe8bbb1..276799a 100644 --- a/src/JT1078.Gateway/Sessions/JT1078HttpSessionManager.cs +++ b/src/JT1078.Gateway/Sessions/JT1078HttpSessionManager.cs @@ -1,5 +1,6 @@ using JT1078.Gateway.Extensions; using JT1078.Gateway.Metadata; +using Microsoft.Extensions.Logging; using System; using System.Collections.Concurrent; using System.Collections.Generic; @@ -15,10 +16,11 @@ namespace JT1078.Gateway.Sessions public class JT1078HttpSessionManager { public ConcurrentDictionary Sessions { get; } - - public JT1078HttpSessionManager() + private ILogger Logger; + public JT1078HttpSessionManager(ILoggerFactory loggerFactory) { Sessions = new ConcurrentDictionary(); + Logger = loggerFactory.CreateLogger(); } public bool TryAdd(JT1078HttpContext httpContext) @@ -53,16 +55,163 @@ namespace JT1078.Gateway.Sessions } } - public void SendHttpChunk(byte[] data) + private void remove(string sessionId) + { + if (Sessions.TryRemove(sessionId, out JT1078HttpContext session)) + { + //todo:session close notice + } + } + + /// + /// 发送音视频数据 + /// + /// + /// + /// + public void SendAVData(string sim,int channelNo,byte[] data) + { + var contexts = Sessions.Select(s => s.Value).Where(w => w.Sim == sim && w.ChannelNo == channelNo).ToList(); + ParallelLoopResult parallelLoopResult= Parallel.ForEach(contexts, async(context) => + { + if (context.IsWebSocket) + { + try + { + await context.WebSocketSendBinaryAsync(data); + } + catch (Exception ex) + { + if (Logger.IsEnabled(LogLevel.Information)) + { + Logger.LogInformation($"[ws close]:{context.SessionId}-{context.Sim}-{context.ChannelNo}-{context.StartTime:yyyyMMddhhmmss}"); + } + remove(context.SessionId); + } + } + else + { + if (!context.SendChunked) + { + context.SendChunked = true; + Sessions.TryUpdate(context.SessionId, context, context); + try + { + await context.HttpSendFirstChunked(data); + } + catch (Exception ex) + { + if (Logger.IsEnabled(LogLevel.Information)) + { + Logger.LogInformation($"[http close]:{context.SessionId}-{context.Sim}-{context.ChannelNo}-{context.StartTime:yyyyMMddhhmmss}"); + } + remove(context.SessionId); + } + } + else + { + try + { + await context.HttpSendChunked(data); + } + catch (Exception ex) + { + if (Logger.IsEnabled(LogLevel.Information)) + { + Logger.LogInformation($"[http close]:{context.SessionId}-{context.Sim}-{context.ChannelNo}-{context.StartTime:yyyyMMddhhmmss}"); + } + remove(context.SessionId); + } + } + } + }); + if (parallelLoopResult.IsCompleted) + { + + } + } + + /// + /// 发送音视频数据到websocket + /// + /// + /// + /// + public void SendAVData2WebSocket(string sim, int channelNo, byte[] data) + { + var contexts = Sessions.Select(s => s.Value).Where(w => w.Sim == sim && w.ChannelNo == channelNo && w.IsWebSocket).ToList(); + ParallelLoopResult parallelLoopResult = Parallel.ForEach(contexts, async (context) => + { + if (context.IsWebSocket) + { + try + { + await context.WebSocketSendBinaryAsync(data); + } + catch (Exception ex) + { + if (Logger.IsEnabled(LogLevel.Information)) + { + Logger.LogInformation($"[ws close]:{context.SessionId}-{context.Sim}-{context.ChannelNo}-{context.StartTime:yyyyMMddhhmmss}"); + } + remove(context.SessionId); + } + } + }); + if (parallelLoopResult.IsCompleted) + { + + } + } + + /// + /// 发送音视频数据到Http Chunked中 + /// + /// + /// + /// + public void SendAVData2HttpChunked(string sim, int channelNo, byte[] data) { - //todo:set http chunk - //todo:session close notice - //byte[] b = Encoding.UTF8.GetBytes("ack"); - //context.Response.StatusCode = 200; - //context.Response.KeepAlive = true; - //context.Response.ContentLength64 = b.Length; - //await context.Response.OutputStream.WriteAsync(b, 0, b.Length); - //context.Response.Close(); + var contexts = Sessions.Select(s => s.Value).Where(w => w.Sim == sim && w.ChannelNo == channelNo && !w.IsWebSocket).ToList(); + ParallelLoopResult parallelLoopResult = Parallel.ForEach(contexts, async (context) => + { + if (!context.SendChunked) + { + context.SendChunked = true; + Sessions.TryUpdate(context.SessionId, context, context); + try + { + await context.HttpSendFirstChunked(data); + } + catch (Exception ex) + { + if (Logger.IsEnabled(LogLevel.Information)) + { + Logger.LogInformation($"[http close]:{context.SessionId}-{context.Sim}-{context.ChannelNo}-{context.StartTime:yyyyMMddhhmmss}"); + } + remove(context.SessionId); + } + } + else + { + try + { + await context.HttpSendChunked(data); + } + catch (Exception ex) + { + if (Logger.IsEnabled(LogLevel.Information)) + { + Logger.LogInformation($"[http close]:{context.SessionId}-{context.Sim}-{context.ChannelNo}-{context.StartTime:yyyyMMddhhmmss}"); + } + remove(context.SessionId); + } + } + }); + if (parallelLoopResult.IsCompleted) + { + + } } public int SessionCount