浏览代码

1.增加协调器客户端的通道主动关闭接口

2.增加视频设备断开通知,将所有连接该设备的客户端全部切断
master
SmallChi(Koike) 4 年前
父节点
当前提交
0aae0e872a
共有 4 个文件被更改,包括 57 次插入9 次删除
  1. +3
    -3
      src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Program.cs
  2. +12
    -0
      src/JT1078.Gateway/JT1078CoordinatorHttpClient.cs
  3. +16
    -2
      src/JT1078.Gateway/Jobs/JT1078SessionNoticeJob.cs
  4. +26
    -4
      src/JT1078.Gateway/Sessions/JT1078HttpSessionManager.cs

+ 3
- 3
src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Program.cs 查看文件

@@ -30,10 +30,10 @@ namespace JT1078.Gateway.TestNormalHosting
services.AddSingleton(typeof(ILogger<>), typeof(Logger<>)); services.AddSingleton(typeof(ILogger<>), typeof(Logger<>));
//使用内存队列实现会话通知 //使用内存队列实现会话通知
services.AddJT1078Gateway(hostContext.Configuration) services.AddJT1078Gateway(hostContext.Configuration)
.AddHttp()
.AddUdp()
.AddTcp() .AddTcp()
//.AddCoordinatorHttpClient()
.AddUdp()
.AddHttp()
.AddCoordinatorHttpClient()
.AddNormal() .AddNormal()
.AddMsgProducer() .AddMsgProducer()
.AddMsgConsumer(); .AddMsgConsumer();


+ 12
- 0
src/JT1078.Gateway/JT1078CoordinatorHttpClient.cs 查看文件

@@ -43,5 +43,17 @@ namespace JT1078.Gateway
{ {
await httpClient.PostAsync($"{endpoint}/heartbeat", new StringContent(content)); await httpClient.PostAsync($"{endpoint}/heartbeat", new StringContent(content));
} }

/// <summary>
/// 发送设备号和通道给协调器中
/// </summary>
/// <param name="terminalPhoneNo"></param>
/// <param name="channelNo"></param>
public async ValueTask ChannelClose(string terminalPhoneNo,int channelNo)
{
//todo:通过自维护,当协调重启导致集群内网关未关闭的情况下,通过轮询的方式再去调用
string json = $"{{\"TerminalPhoneNo\":\"{terminalPhoneNo}\",\"ChannelNo\":\"{channelNo}\"}}";
await httpClient.PostAsync($"{endpoint}/ChannelClose", new StringContent(json));
}
} }
} }

+ 16
- 2
src/JT1078.Gateway/Jobs/JT1078SessionNoticeJob.cs 查看文件

@@ -1,8 +1,11 @@
using JT1078.Gateway.Services;
using JT1078.Gateway.Abstractions;
using JT1078.Gateway.Services;
using JT1078.Gateway.Sessions;
using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
using System; using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Diagnostics.CodeAnalysis;
using System.Text; using System.Text;
using System.Text.Json; using System.Text.Json;
using System.Threading; using System.Threading;
@@ -15,12 +18,15 @@ namespace JT1078.Gateway.Jobs
{ {
private readonly ILogger logger; private readonly ILogger logger;
private JT1078SessionNoticeService SessionNoticeService; private JT1078SessionNoticeService SessionNoticeService;
private JT1078HttpSessionManager HttpSessionManager;
public JT1078SessionNoticeJob( public JT1078SessionNoticeJob(
JT1078SessionNoticeService sessionNoticeService, JT1078SessionNoticeService sessionNoticeService,
ILoggerFactory loggerFactory)
ILoggerFactory loggerFactory,
[AllowNull]JT1078HttpSessionManager jT1078HttpSessionManager=null)
{ {
logger = loggerFactory.CreateLogger<JT1078SessionNoticeJob>(); logger = loggerFactory.CreateLogger<JT1078SessionNoticeJob>();
SessionNoticeService = sessionNoticeService; SessionNoticeService = sessionNoticeService;
HttpSessionManager = jT1078HttpSessionManager;
} }


protected override async Task ExecuteAsync(CancellationToken stoppingToken) protected override async Task ExecuteAsync(CancellationToken stoppingToken)
@@ -34,6 +40,14 @@ namespace JT1078.Gateway.Jobs
{ {
logger.LogInformation($"[Notice]:{notice.TerminalPhoneNo}-{notice.ProtocolType}-{notice.SessionType}"); logger.LogInformation($"[Notice]:{notice.TerminalPhoneNo}-{notice.ProtocolType}-{notice.SessionType}");
} }
if(JT1078GatewayConstants.SessionOffline== notice.SessionType)
{
if (HttpSessionManager != null)
{
//当1078设备主动断开的情况下,需要关闭所有再观看的连接
HttpSessionManager.TryRemoveBySim(notice.TerminalPhoneNo);
}
}
} }
} }
catch catch


+ 26
- 4
src/JT1078.Gateway/Sessions/JT1078HttpSessionManager.cs 查看文件

@@ -48,18 +48,40 @@ namespace JT1078.Gateway.Sessions
{ {


} }
finally
}
}

public async void TryRemoveBySim(string sim)
{
var keys=Sessions.Where(f => f.Value.Sim == sim).Select(s => s.Key);
foreach(var key in keys)
{
if (Sessions.TryRemove(key, out JT1078HttpContext session))
{ {
//todo:session close notice
try
{
if (session.IsWebSocket)
{
await session.WebSocketClose("close");
}
else
{

await session.HttpClose();
}
}
catch (Exception)
{

}
} }
}
}
} }


private void remove(string sessionId) private void remove(string sessionId)
{ {
if (Sessions.TryRemove(sessionId, out JT1078HttpContext session)) if (Sessions.TryRemove(sessionId, out JT1078HttpContext session))
{ {
//todo:session close notice
} }
} }




正在加载...
取消
保存