diff --git a/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Program.cs b/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Program.cs
index 56e4035..22acca0 100644
--- a/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Program.cs
+++ b/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Program.cs
@@ -30,10 +30,10 @@ namespace JT1078.Gateway.TestNormalHosting
services.AddSingleton(typeof(ILogger<>), typeof(Logger<>));
//使用内存队列实现会话通知
services.AddJT1078Gateway(hostContext.Configuration)
- .AddHttp()
- .AddUdp()
.AddTcp()
- //.AddCoordinatorHttpClient()
+ .AddUdp()
+ .AddHttp()
+ .AddCoordinatorHttpClient()
.AddNormal()
.AddMsgProducer()
.AddMsgConsumer();
diff --git a/src/JT1078.Gateway/JT1078CoordinatorHttpClient.cs b/src/JT1078.Gateway/JT1078CoordinatorHttpClient.cs
index f983a62..f87e878 100644
--- a/src/JT1078.Gateway/JT1078CoordinatorHttpClient.cs
+++ b/src/JT1078.Gateway/JT1078CoordinatorHttpClient.cs
@@ -43,5 +43,17 @@ namespace JT1078.Gateway
{
await httpClient.PostAsync($"{endpoint}/heartbeat", new StringContent(content));
}
+
+ ///
+ /// 发送设备号和通道给协调器中
+ ///
+ ///
+ ///
+ public async ValueTask ChannelClose(string terminalPhoneNo,int channelNo)
+ {
+ //todo:通过自维护,当协调重启导致集群内网关未关闭的情况下,通过轮询的方式再去调用
+ string json = $"{{\"TerminalPhoneNo\":\"{terminalPhoneNo}\",\"ChannelNo\":\"{channelNo}\"}}";
+ await httpClient.PostAsync($"{endpoint}/ChannelClose", new StringContent(json));
+ }
}
}
diff --git a/src/JT1078.Gateway/Jobs/JT1078SessionNoticeJob.cs b/src/JT1078.Gateway/Jobs/JT1078SessionNoticeJob.cs
index 5a9c2d4..3de8190 100644
--- a/src/JT1078.Gateway/Jobs/JT1078SessionNoticeJob.cs
+++ b/src/JT1078.Gateway/Jobs/JT1078SessionNoticeJob.cs
@@ -1,8 +1,11 @@
-using JT1078.Gateway.Services;
+using JT1078.Gateway.Abstractions;
+using JT1078.Gateway.Services;
+using JT1078.Gateway.Sessions;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
+using System.Diagnostics.CodeAnalysis;
using System.Text;
using System.Text.Json;
using System.Threading;
@@ -15,12 +18,15 @@ namespace JT1078.Gateway.Jobs
{
private readonly ILogger logger;
private JT1078SessionNoticeService SessionNoticeService;
+ private JT1078HttpSessionManager HttpSessionManager;
public JT1078SessionNoticeJob(
JT1078SessionNoticeService sessionNoticeService,
- ILoggerFactory loggerFactory)
+ ILoggerFactory loggerFactory,
+ [AllowNull]JT1078HttpSessionManager jT1078HttpSessionManager=null)
{
logger = loggerFactory.CreateLogger();
SessionNoticeService = sessionNoticeService;
+ HttpSessionManager = jT1078HttpSessionManager;
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
@@ -34,6 +40,14 @@ namespace JT1078.Gateway.Jobs
{
logger.LogInformation($"[Notice]:{notice.TerminalPhoneNo}-{notice.ProtocolType}-{notice.SessionType}");
}
+ if(JT1078GatewayConstants.SessionOffline== notice.SessionType)
+ {
+ if (HttpSessionManager != null)
+ {
+ //当1078设备主动断开的情况下,需要关闭所有再观看的连接
+ HttpSessionManager.TryRemoveBySim(notice.TerminalPhoneNo);
+ }
+ }
}
}
catch
diff --git a/src/JT1078.Gateway/Sessions/JT1078HttpSessionManager.cs b/src/JT1078.Gateway/Sessions/JT1078HttpSessionManager.cs
index 27ab7a4..fef4040 100644
--- a/src/JT1078.Gateway/Sessions/JT1078HttpSessionManager.cs
+++ b/src/JT1078.Gateway/Sessions/JT1078HttpSessionManager.cs
@@ -48,18 +48,40 @@ namespace JT1078.Gateway.Sessions
{
}
- finally
+ }
+ }
+
+ public async void TryRemoveBySim(string sim)
+ {
+ var keys=Sessions.Where(f => f.Value.Sim == sim).Select(s => s.Key);
+ foreach(var key in keys)
+ {
+ if (Sessions.TryRemove(key, out JT1078HttpContext session))
{
- //todo:session close notice
+ try
+ {
+ if (session.IsWebSocket)
+ {
+ await session.WebSocketClose("close");
+ }
+ else
+ {
+
+ await session.HttpClose();
+ }
+ }
+ catch (Exception)
+ {
+
+ }
}
- }
+ }
}
private void remove(string sessionId)
{
if (Sessions.TryRemove(sessionId, out JT1078HttpContext session))
{
- //todo:session close notice
}
}