From 0aae0e872afb0be5445cdb8eea89d05a700114a8 Mon Sep 17 00:00:00 2001 From: "SmallChi(Koike)" <564952747@qq.com> Date: Mon, 10 Aug 2020 23:34:08 +0800 Subject: [PATCH] =?UTF-8?q?1.=E5=A2=9E=E5=8A=A0=E5=8D=8F=E8=B0=83=E5=99=A8?= =?UTF-8?q?=E5=AE=A2=E6=88=B7=E7=AB=AF=E7=9A=84=E9=80=9A=E9=81=93=E4=B8=BB?= =?UTF-8?q?=E5=8A=A8=E5=85=B3=E9=97=AD=E6=8E=A5=E5=8F=A3=202.=E5=A2=9E?= =?UTF-8?q?=E5=8A=A0=E8=A7=86=E9=A2=91=E8=AE=BE=E5=A4=87=E6=96=AD=E5=BC=80?= =?UTF-8?q?=E9=80=9A=E7=9F=A5=EF=BC=8C=E5=B0=86=E6=89=80=E6=9C=89=E8=BF=9E?= =?UTF-8?q?=E6=8E=A5=E8=AF=A5=E8=AE=BE=E5=A4=87=E7=9A=84=E5=AE=A2=E6=88=B7?= =?UTF-8?q?=E7=AB=AF=E5=85=A8=E9=83=A8=E5=88=87=E6=96=AD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../Program.cs | 6 ++-- .../JT1078CoordinatorHttpClient.cs | 12 ++++++++ .../Jobs/JT1078SessionNoticeJob.cs | 18 +++++++++-- .../Sessions/JT1078HttpSessionManager.cs | 30 ++++++++++++++++--- 4 files changed, 57 insertions(+), 9 deletions(-) diff --git a/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Program.cs b/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Program.cs index 56e4035..22acca0 100644 --- a/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Program.cs +++ b/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Program.cs @@ -30,10 +30,10 @@ namespace JT1078.Gateway.TestNormalHosting services.AddSingleton(typeof(ILogger<>), typeof(Logger<>)); //使用内存队列实现会话通知 services.AddJT1078Gateway(hostContext.Configuration) - .AddHttp() - .AddUdp() .AddTcp() - //.AddCoordinatorHttpClient() + .AddUdp() + .AddHttp() + .AddCoordinatorHttpClient() .AddNormal() .AddMsgProducer() .AddMsgConsumer(); diff --git a/src/JT1078.Gateway/JT1078CoordinatorHttpClient.cs b/src/JT1078.Gateway/JT1078CoordinatorHttpClient.cs index f983a62..f87e878 100644 --- a/src/JT1078.Gateway/JT1078CoordinatorHttpClient.cs +++ b/src/JT1078.Gateway/JT1078CoordinatorHttpClient.cs @@ -43,5 +43,17 @@ namespace JT1078.Gateway { await httpClient.PostAsync($"{endpoint}/heartbeat", new StringContent(content)); } + + /// + /// 发送设备号和通道给协调器中 + /// + /// + /// + public async ValueTask ChannelClose(string terminalPhoneNo,int channelNo) + { + //todo:通过自维护,当协调重启导致集群内网关未关闭的情况下,通过轮询的方式再去调用 + string json = $"{{\"TerminalPhoneNo\":\"{terminalPhoneNo}\",\"ChannelNo\":\"{channelNo}\"}}"; + await httpClient.PostAsync($"{endpoint}/ChannelClose", new StringContent(json)); + } } } diff --git a/src/JT1078.Gateway/Jobs/JT1078SessionNoticeJob.cs b/src/JT1078.Gateway/Jobs/JT1078SessionNoticeJob.cs index 5a9c2d4..3de8190 100644 --- a/src/JT1078.Gateway/Jobs/JT1078SessionNoticeJob.cs +++ b/src/JT1078.Gateway/Jobs/JT1078SessionNoticeJob.cs @@ -1,8 +1,11 @@ -using JT1078.Gateway.Services; +using JT1078.Gateway.Abstractions; +using JT1078.Gateway.Services; +using JT1078.Gateway.Sessions; using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Logging; using System; using System.Collections.Generic; +using System.Diagnostics.CodeAnalysis; using System.Text; using System.Text.Json; using System.Threading; @@ -15,12 +18,15 @@ namespace JT1078.Gateway.Jobs { private readonly ILogger logger; private JT1078SessionNoticeService SessionNoticeService; + private JT1078HttpSessionManager HttpSessionManager; public JT1078SessionNoticeJob( JT1078SessionNoticeService sessionNoticeService, - ILoggerFactory loggerFactory) + ILoggerFactory loggerFactory, + [AllowNull]JT1078HttpSessionManager jT1078HttpSessionManager=null) { logger = loggerFactory.CreateLogger(); SessionNoticeService = sessionNoticeService; + HttpSessionManager = jT1078HttpSessionManager; } protected override async Task ExecuteAsync(CancellationToken stoppingToken) @@ -34,6 +40,14 @@ namespace JT1078.Gateway.Jobs { logger.LogInformation($"[Notice]:{notice.TerminalPhoneNo}-{notice.ProtocolType}-{notice.SessionType}"); } + if(JT1078GatewayConstants.SessionOffline== notice.SessionType) + { + if (HttpSessionManager != null) + { + //当1078设备主动断开的情况下,需要关闭所有再观看的连接 + HttpSessionManager.TryRemoveBySim(notice.TerminalPhoneNo); + } + } } } catch diff --git a/src/JT1078.Gateway/Sessions/JT1078HttpSessionManager.cs b/src/JT1078.Gateway/Sessions/JT1078HttpSessionManager.cs index 27ab7a4..fef4040 100644 --- a/src/JT1078.Gateway/Sessions/JT1078HttpSessionManager.cs +++ b/src/JT1078.Gateway/Sessions/JT1078HttpSessionManager.cs @@ -48,18 +48,40 @@ namespace JT1078.Gateway.Sessions { } - finally + } + } + + public async void TryRemoveBySim(string sim) + { + var keys=Sessions.Where(f => f.Value.Sim == sim).Select(s => s.Key); + foreach(var key in keys) + { + if (Sessions.TryRemove(key, out JT1078HttpContext session)) { - //todo:session close notice + try + { + if (session.IsWebSocket) + { + await session.WebSocketClose("close"); + } + else + { + + await session.HttpClose(); + } + } + catch (Exception) + { + + } } - } + } } private void remove(string sessionId) { if (Sessions.TryRemove(sessionId, out JT1078HttpContext session)) { - //todo:session close notice } }