diff --git a/src/JT1078.Gateway.Coordinator/Dtos/HeartbeatRequest.cs b/src/JT1078.Gateway.Coordinator/Dtos/HeartbeatRequest.cs index 61e21ee..df1ebca 100644 --- a/src/JT1078.Gateway.Coordinator/Dtos/HeartbeatRequest.cs +++ b/src/JT1078.Gateway.Coordinator/Dtos/HeartbeatRequest.cs @@ -14,5 +14,6 @@ namespace JT1078.Gateway.Coordinator.Dtos public int UdpSessionCount { get; set; } public int HttpSessionCount { get; set; } public int WebSocketSessionCount { get; set; } + public List Sims { get; set; } } } diff --git a/src/JT1078.Gateway.InMemoryMQ/JT1078MsgChannel.cs b/src/JT1078.Gateway.InMemoryMQ/JT1078MsgChannel.cs index 943e96e..00504c7 100644 --- a/src/JT1078.Gateway.InMemoryMQ/JT1078MsgChannel.cs +++ b/src/JT1078.Gateway.InMemoryMQ/JT1078MsgChannel.cs @@ -11,7 +11,9 @@ namespace JT1078.Gateway.InMemoryMQ public JT1078MsgChannel() { - Channel = System.Threading.Channels.Channel.CreateUnbounded<(string, byte[])>(); + Channel = System.Threading.Channels.Channel.CreateUnbounded<(string, byte[])>(new UnboundedChannelOptions { + SingleWriter=true, + }); } } } diff --git a/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Configs/nlog.Unix.config b/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Configs/nlog.Unix.config index ed45e3b..a24af6a 100644 --- a/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Configs/nlog.Unix.config +++ b/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Configs/nlog.Unix.config @@ -12,6 +12,12 @@ + + @@ -34,6 +40,8 @@ + + diff --git a/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Services/JT1078FMp4NormalMsgHostedService.cs b/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Services/JT1078FMp4NormalMsgHostedService.cs index 93921b9..248e0ac 100644 --- a/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Services/JT1078FMp4NormalMsgHostedService.cs +++ b/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Services/JT1078FMp4NormalMsgHostedService.cs @@ -69,7 +69,7 @@ namespace JT1078.Gateway.TestNormalHosting.Services if (memoryCache.TryGetValue(key, out byte[] moov)) { var httpSessions = HttpSessionManager.GetAllBySimAndChannelNo(data.SIM.TrimStart('0'), data.LogicChannelNumber); - var firstHttpSessions = httpSessions.Where(w => !w.FirstSend).ToList(); + var firstHttpSessions = httpSessions.Where(w => !w.FirstSend && (w.RTPVideoType == Metadata.RTPVideoType.Http_FMp4 || w.RTPVideoType == Metadata.RTPVideoType.Ws_FMp4)).ToList(); if (firstHttpSessions.Count > 0) { try @@ -93,17 +93,22 @@ namespace JT1078.Gateway.TestNormalHosting.Services Logger.LogError(ex, $"{data.SIM},{true},{data.SN},{data.LogicChannelNumber},{data.Label3.DataType.ToString()},{data.Label3.SubpackageType.ToString()},{data.Bodies.ToHexString()}"); } } - var otherHttpSessions = httpSessions.Where(w => w.FirstSend).ToList(); + var otherHttpSessions = httpSessions.Where(w => w.FirstSend && (w.RTPVideoType == Metadata.RTPVideoType.Http_FMp4 || w.RTPVideoType == Metadata.RTPVideoType.Ws_FMp4)).ToList(); if (otherHttpSessions.Count > 0) { try { + //foreach (var session in otherHttpSessions) + //{ + // var fmp4VideoBuffer = FM4Encoder.EncoderOtherVideoBox(nalus); + // HttpSessionManager.SendAVData(session, fmp4VideoBuffer, false); + //} var firstNALU = nalus.FirstOrDefault(); if (firstNALU == null) { continue; } - if(!avFrameDict.TryGetValue(firstNALU.GetKey(),out List cacheNALU)) + if (!avFrameDict.TryGetValue(firstNALU.GetKey(), out List cacheNALU)) { cacheNALU = new List(); avFrameDict.TryAdd(firstNALU.GetKey(), cacheNALU); diff --git a/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Services/JT1078FlvNormalMsgHostedService.cs b/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Services/JT1078FlvNormalMsgHostedService.cs index b5c3df6..06189ca 100644 --- a/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Services/JT1078FlvNormalMsgHostedService.cs +++ b/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Services/JT1078FlvNormalMsgHostedService.cs @@ -60,7 +60,7 @@ namespace JT1078.Gateway.TestNormalHosting.Services memoryCache.Set(key, data); } var httpSessions = HttpSessionManager.GetAllBySimAndChannelNo(data.SIM.TrimStart('0'), data.LogicChannelNumber); - var firstHttpSessions = httpSessions.Where(w => !w.FirstSend).ToList(); + var firstHttpSessions = httpSessions.Where(w => !w.FirstSend && (w.RTPVideoType== Metadata.RTPVideoType.Http_Flv || w.RTPVideoType == Metadata.RTPVideoType.Ws_Flv)).ToList(); if (firstHttpSessions.Count > 0) { if (memoryCache.TryGetValue(key, out JT1078Package idata)) @@ -79,7 +79,7 @@ namespace JT1078.Gateway.TestNormalHosting.Services } } } - var otherHttpSessions = httpSessions.Where(w => w.FirstSend).ToList(); + var otherHttpSessions = httpSessions.Where(w => w.FirstSend && (w.RTPVideoType == Metadata.RTPVideoType.Http_Flv || w.RTPVideoType == Metadata.RTPVideoType.Ws_Flv)).ToList(); if (otherHttpSessions.Count > 0) { try diff --git a/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Services/MessageDispatchHostedService.cs b/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Services/MessageDispatchHostedService.cs index 01cd2fd..eedb362 100644 --- a/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Services/MessageDispatchHostedService.cs +++ b/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Services/MessageDispatchHostedService.cs @@ -14,7 +14,7 @@ namespace JT1078.Gateway.TestNormalHosting.Services /// /// 消费分发服务。同时分发给hls和flv /// - public class MessageDispatchHostedService : BackgroundService + public class MessageDispatchHostedService : BackgroundService { private IJT1078MsgConsumer JT1078MsgConsumer; private readonly MessageDispatchDataService messageDispatchDataService; @@ -33,8 +33,8 @@ namespace JT1078.Gateway.TestNormalHosting.Services var merge = JT1078.Protocol.JT1078Serializer.Merge(package); if (merge != null) { - //await messageDispatchDataService.HlsChannel.Writer.WriteAsync(merge, stoppingToken); - //await messageDispatchDataService.FlvChannel.Writer.WriteAsync(merge, stoppingToken); + await messageDispatchDataService.HlsChannel.Writer.WriteAsync(merge, stoppingToken); + await messageDispatchDataService.FlvChannel.Writer.WriteAsync(merge, stoppingToken); await messageDispatchDataService.FMp4Channel.Writer.WriteAsync(merge, stoppingToken); } }); diff --git a/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/wwwroot/flv_demo/index.html b/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/wwwroot/flv_demo/index.html index d3b40c0..2bebff9 100644 --- a/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/wwwroot/flv_demo/index.html +++ b/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/wwwroot/flv_demo/index.html @@ -15,8 +15,7 @@ var flvPlayer = flvjs.createPlayer({ type: 'flv', isLive: true, - //url: "http://127.0.0.1:15555/live.flv?sim=1901305037&channel=3&token=123456" - url: "ws://127.0.0.1:15555/live.flv?sim=1901305037&channel=3&token=123456" + url: "http://127.0.0.1:15555/live.flv?sim=19019000001&channel=3&token=123456" }); flvPlayer.attachMediaElement(player); flvPlayer.load(); diff --git a/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/wwwroot/fmp4_demo/index.html b/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/wwwroot/fmp4_demo/index.html index 5ee0609..5a06948 100644 --- a/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/wwwroot/fmp4_demo/index.html +++ b/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/wwwroot/fmp4_demo/index.html @@ -18,7 +18,7 @@ // *** USER PARAMETERS *** var verbose = true; // var verbose = true; // enable for saturating the console .. - var buffering_sec = 1; // use some reasonable value + var buffering_sec = 3; // use some reasonable value var buffering_sec_seek = buffering_sec * 0.9; // ..seek the stream if it's this much away or // from the last available timestamp @@ -118,8 +118,7 @@ // } // keep the latency to minimum let latest = stream_live.duration; - if ((stream_live.duration >= buffering_sec) && - ((latest - stream_live.currentTime) > buffering_sec_seek)) { + if ((stream_live.duration >= buffering_sec) && ((latest - stream_live.currentTime) > buffering_sec_seek)) { console.log("seek from ", stream_live.currentTime, " to ", latest); df = (stream_live.duration - stream_live.currentTime); // this much away from the last available frame if ((df > buffering_sec_seek)) { @@ -127,35 +126,35 @@ stream_live.currentTime = seek_to; } } - data = arr; - if (!stream_started) { + if (!source_buffer.updating) { if (verbose) { console.log("Streaming started: ", memview[0], memview[1], memview[2], memview[3], memview[4]); } stream_started = true; - source_buffer.appendBuffer(data); - + source_buffer.appendBuffer(arr); cc = cc + 1; - return; + }else{ + queue.push(arr); // add to the end } - queue.push(data); // add to the end if (verbose) { console.log("queue push:", queue.length); } } function loadPacket() { // called when source_buffer is ready for more if (!source_buffer.updating) { // really, really ready if (queue.length > 0) { - inp = queue.shift(); // pop from the beginning + var inp = queue.shift(); // pop from the beginning if (verbose) { console.log("queue pop:", queue.length); } var memview = new Uint8Array(inp); if (verbose) { console.log(" ==> writing buffer with", memview[0], memview[1], memview[2], memview[3]); } source_buffer.appendBuffer(inp); cc = cc + 1; } - else { // the queue runs empty, so the next packet is fed directly - stream_started = false; - } - } - else { // so it was not? + // else { // the queue runs empty, so the next packet is fed directly + // stream_started = false; + // } } + // else { + // // so it was not? + + // } } function opened() { // MediaSource object is ready to go @@ -167,14 +166,13 @@ source_buffer.mode = 'sequence'; // source_buffer.mode = 'segments'; source_buffer.addEventListener("updateend", loadPacket); - - ws = new WebSocket("ws://127.0.0.1:15555/live.mp4?sim=1901305037&channel=1&token=123456"); //创建WebSocket连接 + ws = new WebSocket("ws://127.0.0.1:81/live/JT1078_7.live.mp4"); //创建WebSocket连接 ws.binaryType = 'arraybuffer'; ws.onmessage = function (e) { //当客户端收到服务端发来的消息时,触发onmessage事件,参数e.data包含server传递过来的数据 //console.log(e.data); - //putPacket(e.data); - source_buffer.appendBuffer(e.data); + putPacket(e.data); + //source_buffer.appendBuffer(e.data); } } diff --git a/src/JT1078.Gateway/JT1078.Gateway.csproj b/src/JT1078.Gateway/JT1078.Gateway.csproj index 0f16094..39741a9 100644 --- a/src/JT1078.Gateway/JT1078.Gateway.csproj +++ b/src/JT1078.Gateway/JT1078.Gateway.csproj @@ -39,9 +39,9 @@ - + - + diff --git a/src/JT1078.Gateway/JT1078TcpServer.cs b/src/JT1078.Gateway/JT1078TcpServer.cs index b728ad1..32649b2 100644 --- a/src/JT1078.Gateway/JT1078TcpServer.cs +++ b/src/JT1078.Gateway/JT1078TcpServer.cs @@ -231,8 +231,8 @@ namespace JT1078.Gateway } catch (Exception ex) { - LogLogger.LogError($"[Error Parse 1]:{package1.ToHexString()}"); - Logger.LogError(ex, $"[Error Parse 1]:{package1.ToHexString()}"); + LogLogger.LogError($"[Error Parse 1]:{fixedHeaderInfo.SIM}-{package1.ToHexString()}"); + Logger.LogError(ex, $"[Error Parse 1]:{fixedHeaderInfo.SIM}-{package1.ToHexString()}"); } finally { @@ -260,7 +260,7 @@ namespace JT1078.Gateway seqReader.Advance(fixedHeaderInfo.TotalSize); if (LogLogger.IsEnabled(LogLevel.Trace)) { - LogLogger.LogTrace($"===>{package.ToHexString()}"); + LogLogger.LogTrace($"===>{fixedHeaderInfo.SIM}-{package.ToHexString()}"); } try { @@ -269,8 +269,8 @@ namespace JT1078.Gateway } catch (Exception ex) { - LogLogger.LogError($"[Error Parse 2]:{package.ToHexString()}"); - Logger.LogError(ex, $"[Error Parse 2]:{package.ToHexString()}"); + LogLogger.LogError($"[Error Parse 2]:{fixedHeaderInfo.SIM}-{package.ToHexString()}"); + Logger.LogError(ex, $"[Error Parse 2]:{fixedHeaderInfo.SIM}-{package.ToHexString()}"); } finally { diff --git a/src/JT1078.Gateway/Jobs/JT1078HeartbeatJob.cs b/src/JT1078.Gateway/Jobs/JT1078HeartbeatJob.cs index 6ace38c..dcbdf12 100644 --- a/src/JT1078.Gateway/Jobs/JT1078HeartbeatJob.cs +++ b/src/JT1078.Gateway/Jobs/JT1078HeartbeatJob.cs @@ -65,6 +65,16 @@ namespace JT1078.Gateway.Jobs writer.WriteNumber(nameof(SessionManager.UdpSessionCount), SessionManager.UdpSessionCount); writer.WriteNumber(nameof(HttpSessionManager.HttpSessionCount), HttpSessionManager.HttpSessionCount); writer.WriteNumber(nameof(HttpSessionManager.WebSocketSessionCount), HttpSessionManager.WebSocketSessionCount); + writer.WriteStartArray("Sims"); + var sessions = HttpSessionManager.GetAll(); + if (sessions != null) + { + foreach(var session in sessions) + { + writer.WriteStringValue($"{session.Sim}_{session.ChannelNo}_{session.SessionId}"); + } + } + writer.WriteEndArray(); writer.WriteEndObject(); } json = Encoding.UTF8.GetString(stream.ToArray()); diff --git a/src/JT1078.Gateway/Jobs/JT1078SessionClearJob.cs b/src/JT1078.Gateway/Jobs/JT1078SessionClearJob.cs index 16ece6d..13e7fdb 100644 --- a/src/JT1078.Gateway/Jobs/JT1078SessionClearJob.cs +++ b/src/JT1078.Gateway/Jobs/JT1078SessionClearJob.cs @@ -50,27 +50,29 @@ namespace JT1078.Gateway.Jobs try { var hasSessions = HttpSessionManager.GetAll().Where(m => DateTime.Now.Subtract(m.StartTime).TotalSeconds > 60 && m.RTPVideoType == Metadata.RTPVideoType.Http_Hls).ToList();//所有http 的 hls短链接 - foreach (var item in hasSessions) + foreach (var item in hasSessions) { var key = $"{item.Sim}_{item.ChannelNo}"; HttpSessionManager.TryRemove(item.SessionId);//超过120s未访问。 //清楚所有hls文件 string filepath = Path.Combine(Configuration.HlsRootDirectory, key); - if (Directory.Exists(filepath)) { + if (Directory.Exists(filepath)) + { Directory.Delete(filepath,true); } hLSPathStorage.RemoveAllPath(key);//移除所有缓存 - - if (logger.IsEnabled(LogLevel.Debug)) { - logger.LogDebug($"{System.Text.Json.JsonSerializer.Serialize(item)},清楚session"); + if (logger.IsEnabled(LogLevel.Debug)) + { + logger.LogDebug($"{System.Text.Json.JsonSerializer.Serialize(item)},清除session"); } - var hasTcpSession = HttpSessionManager.GetAllBySimAndChannelNo(item.Sim.TrimStart('0'), item.ChannelNo).Any(m => m.IsWebSocket);//是否存在tcp的 socket链接 - var httpFlvSession = HttpSessionManager.GetAllBySimAndChannelNo(item.Sim.TrimStart('0'), item.ChannelNo).Any(m => m.RTPVideoType == Metadata.RTPVideoType.Http_Flv);//是否存在http的 flv长链接 + string sim = item.Sim.TrimStart('0'); + var hasTcpSession = HttpSessionManager.GetAllBySimAndChannelNo(sim, item.ChannelNo).Any(m => m.IsWebSocket);//是否存在tcp的 socket链接 + var httpFlvSession = HttpSessionManager.GetAllBySimAndChannelNo(sim, item.ChannelNo).Any(m => m.RTPVideoType == Metadata.RTPVideoType.Http_Flv);//是否存在http的 flv长链接 if (!hasTcpSession && !httpFlvSession) { //不存在websocket链接和http-flv链接时,主动断开设备链接以节省流量 //移除tcpsession,断开设备链接 - if(SessionManager!=null) SessionManager.RemoveByTerminalPhoneNo(item.Sim.TrimStart('0')); + if(SessionManager!=null) SessionManager.RemoveByTerminalPhoneNo(sim); } } } diff --git a/src/JT1078.Gateway/Jobs/JT1078SessionNoticeJob.cs b/src/JT1078.Gateway/Jobs/JT1078SessionNoticeJob.cs index b8ae525..26f207e 100644 --- a/src/JT1078.Gateway/Jobs/JT1078SessionNoticeJob.cs +++ b/src/JT1078.Gateway/Jobs/JT1078SessionNoticeJob.cs @@ -50,9 +50,9 @@ namespace JT1078.Gateway.Jobs } } } - catch + catch(Exception ex) { - + logger.LogError(ex, ""); } }, stoppingToken); }