@@ -14,5 +14,6 @@ namespace JT1078.Gateway.Coordinator.Dtos | |||
public int UdpSessionCount { get; set; } | |||
public int HttpSessionCount { get; set; } | |||
public int WebSocketSessionCount { get; set; } | |||
public List<string> Sims { get; set; } | |||
} | |||
} |
@@ -11,7 +11,9 @@ namespace JT1078.Gateway.InMemoryMQ | |||
public JT1078MsgChannel() | |||
{ | |||
Channel = System.Threading.Channels.Channel.CreateUnbounded<(string, byte[])>(); | |||
Channel = System.Threading.Channels.Channel.CreateUnbounded<(string, byte[])>(new UnboundedChannelOptions { | |||
SingleWriter=true, | |||
}); | |||
} | |||
} | |||
} |
@@ -12,6 +12,12 @@ | |||
<target name="JT1078FlvNormalMsgHostedService" xsi:type="File" | |||
fileName="${Directory}/JT1078FlvNormalMsgHostedService.${shortdate}.log" | |||
layout="${date:format=yyyyMMddHHmmss} ${callsite} ${level}:${message} ${onexception:${exception:format=tostring} ${newline} ${stacktrace} ${newline}"/> | |||
<target name="JT1078FMp4NormalMsgHostedService" xsi:type="File" | |||
fileName="${Directory}/JT1078FMp4NormalMsgHostedService.${shortdate}.log" | |||
layout="${date:format=yyyyMMddHHmmss} ${callsite} ${level}:${message} ${onexception:${exception:format=tostring} ${newline} ${stacktrace} ${newline}"/> | |||
<target name="JT1078HlsNormalMsgHostedService" xsi:type="File" | |||
fileName="${Directory}/JT1078HlsNormalMsgHostedService.${shortdate}.log" | |||
layout="${date:format=yyyyMMddHHmmss} ${callsite} ${level}:${message} ${onexception:${exception:format=tostring} ${newline} ${stacktrace} ${newline}"/> | |||
<target name="JT1078TcpServer" xsi:type="File" | |||
fileName="${Directory}/JT1078TcpServer.${shortdate}.log" | |||
layout="${date:format=yyyyMMddHHmmss} ${callsite} ${level}:${message} ${onexception:${exception:format=tostring} ${newline} ${stacktrace} ${newline}"/> | |||
@@ -34,6 +40,8 @@ | |||
<rules> | |||
<logger name="*" minlevel="Error" writeTo="all"/> | |||
<logger name="JT1078.Gateway.TestNormalHosting.Services.JT1078FlvNormalMsgHostedService" minlevel="Trace" writeTo="JT1078FlvNormalMsgHostedService"/> | |||
<logger name="JT1078.Gateway.TestNormalHosting.Services.JT1078FMp4NormalMsgHostedService" minlevel="Trace" writeTo="JT1078FMp4NormalMsgHostedService"/> | |||
<logger name="JT1078.Gateway.TestNormalHosting.Services.JT1078HlsNormalMsgHostedService" minlevel="Trace" writeTo="JT1078HlsNormalMsgHostedService"/> | |||
<logger name="JT1078.Gateway.JT1078TcpServer" minlevel="Trace" writeTo="JT1078TcpServer"/> | |||
<logger name="JT1078.Gateway.JT1078UdpServer" minlevel="Trace" writeTo="JT1078UdpServer"/> | |||
<logger name="JT1078.Gateway.JT1078Logging" minlevel="Trace" writeTo="JT1078Logging"/> | |||
@@ -69,7 +69,7 @@ namespace JT1078.Gateway.TestNormalHosting.Services | |||
if (memoryCache.TryGetValue(key, out byte[] moov)) | |||
{ | |||
var httpSessions = HttpSessionManager.GetAllBySimAndChannelNo(data.SIM.TrimStart('0'), data.LogicChannelNumber); | |||
var firstHttpSessions = httpSessions.Where(w => !w.FirstSend).ToList(); | |||
var firstHttpSessions = httpSessions.Where(w => !w.FirstSend && (w.RTPVideoType == Metadata.RTPVideoType.Http_FMp4 || w.RTPVideoType == Metadata.RTPVideoType.Ws_FMp4)).ToList(); | |||
if (firstHttpSessions.Count > 0) | |||
{ | |||
try | |||
@@ -93,17 +93,22 @@ namespace JT1078.Gateway.TestNormalHosting.Services | |||
Logger.LogError(ex, $"{data.SIM},{true},{data.SN},{data.LogicChannelNumber},{data.Label3.DataType.ToString()},{data.Label3.SubpackageType.ToString()},{data.Bodies.ToHexString()}"); | |||
} | |||
} | |||
var otherHttpSessions = httpSessions.Where(w => w.FirstSend).ToList(); | |||
var otherHttpSessions = httpSessions.Where(w => w.FirstSend && (w.RTPVideoType == Metadata.RTPVideoType.Http_FMp4 || w.RTPVideoType == Metadata.RTPVideoType.Ws_FMp4)).ToList(); | |||
if (otherHttpSessions.Count > 0) | |||
{ | |||
try | |||
{ | |||
//foreach (var session in otherHttpSessions) | |||
//{ | |||
// var fmp4VideoBuffer = FM4Encoder.EncoderOtherVideoBox(nalus); | |||
// HttpSessionManager.SendAVData(session, fmp4VideoBuffer, false); | |||
//} | |||
var firstNALU = nalus.FirstOrDefault(); | |||
if (firstNALU == null) | |||
{ | |||
continue; | |||
} | |||
if(!avFrameDict.TryGetValue(firstNALU.GetKey(),out List<H264NALU> cacheNALU)) | |||
if (!avFrameDict.TryGetValue(firstNALU.GetKey(), out List<H264NALU> cacheNALU)) | |||
{ | |||
cacheNALU = new List<H264NALU>(); | |||
avFrameDict.TryAdd(firstNALU.GetKey(), cacheNALU); | |||
@@ -60,7 +60,7 @@ namespace JT1078.Gateway.TestNormalHosting.Services | |||
memoryCache.Set(key, data); | |||
} | |||
var httpSessions = HttpSessionManager.GetAllBySimAndChannelNo(data.SIM.TrimStart('0'), data.LogicChannelNumber); | |||
var firstHttpSessions = httpSessions.Where(w => !w.FirstSend).ToList(); | |||
var firstHttpSessions = httpSessions.Where(w => !w.FirstSend && (w.RTPVideoType== Metadata.RTPVideoType.Http_Flv || w.RTPVideoType == Metadata.RTPVideoType.Ws_Flv)).ToList(); | |||
if (firstHttpSessions.Count > 0) | |||
{ | |||
if (memoryCache.TryGetValue(key, out JT1078Package idata)) | |||
@@ -79,7 +79,7 @@ namespace JT1078.Gateway.TestNormalHosting.Services | |||
} | |||
} | |||
} | |||
var otherHttpSessions = httpSessions.Where(w => w.FirstSend).ToList(); | |||
var otherHttpSessions = httpSessions.Where(w => w.FirstSend && (w.RTPVideoType == Metadata.RTPVideoType.Http_Flv || w.RTPVideoType == Metadata.RTPVideoType.Ws_Flv)).ToList(); | |||
if (otherHttpSessions.Count > 0) | |||
{ | |||
try | |||
@@ -14,7 +14,7 @@ namespace JT1078.Gateway.TestNormalHosting.Services | |||
/// <summary> | |||
/// 消费分发服务。同时分发给hls和flv | |||
/// </summary> | |||
public class MessageDispatchHostedService : BackgroundService | |||
public class MessageDispatchHostedService : BackgroundService | |||
{ | |||
private IJT1078MsgConsumer JT1078MsgConsumer; | |||
private readonly MessageDispatchDataService messageDispatchDataService; | |||
@@ -33,8 +33,8 @@ namespace JT1078.Gateway.TestNormalHosting.Services | |||
var merge = JT1078.Protocol.JT1078Serializer.Merge(package); | |||
if (merge != null) | |||
{ | |||
//await messageDispatchDataService.HlsChannel.Writer.WriteAsync(merge, stoppingToken); | |||
//await messageDispatchDataService.FlvChannel.Writer.WriteAsync(merge, stoppingToken); | |||
await messageDispatchDataService.HlsChannel.Writer.WriteAsync(merge, stoppingToken); | |||
await messageDispatchDataService.FlvChannel.Writer.WriteAsync(merge, stoppingToken); | |||
await messageDispatchDataService.FMp4Channel.Writer.WriteAsync(merge, stoppingToken); | |||
} | |||
}); | |||
@@ -15,8 +15,7 @@ | |||
var flvPlayer = flvjs.createPlayer({ | |||
type: 'flv', | |||
isLive: true, | |||
//url: "http://127.0.0.1:15555/live.flv?sim=1901305037&channel=3&token=123456" | |||
url: "ws://127.0.0.1:15555/live.flv?sim=1901305037&channel=3&token=123456" | |||
url: "http://127.0.0.1:15555/live.flv?sim=19019000001&channel=3&token=123456" | |||
}); | |||
flvPlayer.attachMediaElement(player); | |||
flvPlayer.load(); | |||
@@ -18,7 +18,7 @@ | |||
// *** USER PARAMETERS *** | |||
var verbose = true; | |||
// var verbose = true; // enable for saturating the console .. | |||
var buffering_sec = 1; // use some reasonable value | |||
var buffering_sec = 3; // use some reasonable value | |||
var buffering_sec_seek = buffering_sec * 0.9; | |||
// ..seek the stream if it's this much away or | |||
// from the last available timestamp | |||
@@ -118,8 +118,7 @@ | |||
// } | |||
// keep the latency to minimum | |||
let latest = stream_live.duration; | |||
if ((stream_live.duration >= buffering_sec) && | |||
((latest - stream_live.currentTime) > buffering_sec_seek)) { | |||
if ((stream_live.duration >= buffering_sec) && ((latest - stream_live.currentTime) > buffering_sec_seek)) { | |||
console.log("seek from ", stream_live.currentTime, " to ", latest); | |||
df = (stream_live.duration - stream_live.currentTime); // this much away from the last available frame | |||
if ((df > buffering_sec_seek)) { | |||
@@ -127,35 +126,35 @@ | |||
stream_live.currentTime = seek_to; | |||
} | |||
} | |||
data = arr; | |||
if (!stream_started) { | |||
if (!source_buffer.updating) { | |||
if (verbose) { console.log("Streaming started: ", memview[0], memview[1], memview[2], memview[3], memview[4]); } | |||
stream_started = true; | |||
source_buffer.appendBuffer(data); | |||
source_buffer.appendBuffer(arr); | |||
cc = cc + 1; | |||
return; | |||
}else{ | |||
queue.push(arr); // add to the end | |||
} | |||
queue.push(data); // add to the end | |||
if (verbose) { console.log("queue push:", queue.length); } | |||
} | |||
function loadPacket() { // called when source_buffer is ready for more | |||
if (!source_buffer.updating) { // really, really ready | |||
if (queue.length > 0) { | |||
inp = queue.shift(); // pop from the beginning | |||
var inp = queue.shift(); // pop from the beginning | |||
if (verbose) { console.log("queue pop:", queue.length); } | |||
var memview = new Uint8Array(inp); | |||
if (verbose) { console.log(" ==> writing buffer with", memview[0], memview[1], memview[2], memview[3]); } | |||
source_buffer.appendBuffer(inp); | |||
cc = cc + 1; | |||
} | |||
else { // the queue runs empty, so the next packet is fed directly | |||
stream_started = false; | |||
} | |||
} | |||
else { // so it was not? | |||
// else { // the queue runs empty, so the next packet is fed directly | |||
// stream_started = false; | |||
// } | |||
} | |||
// else { | |||
// // so it was not? | |||
// } | |||
} | |||
function opened() { // MediaSource object is ready to go | |||
@@ -167,14 +166,13 @@ | |||
source_buffer.mode = 'sequence'; | |||
// source_buffer.mode = 'segments'; | |||
source_buffer.addEventListener("updateend", loadPacket); | |||
ws = new WebSocket("ws://127.0.0.1:15555/live.mp4?sim=1901305037&channel=1&token=123456"); //创建WebSocket连接 | |||
ws = new WebSocket("ws://127.0.0.1:81/live/JT1078_7.live.mp4"); //创建WebSocket连接 | |||
ws.binaryType = 'arraybuffer'; | |||
ws.onmessage = function (e) { | |||
//当客户端收到服务端发来的消息时,触发onmessage事件,参数e.data包含server传递过来的数据 | |||
//console.log(e.data); | |||
//putPacket(e.data); | |||
source_buffer.appendBuffer(e.data); | |||
putPacket(e.data); | |||
//source_buffer.appendBuffer(e.data); | |||
} | |||
} | |||
@@ -39,9 +39,9 @@ | |||
</ItemGroup> | |||
<ItemGroup> | |||
<PackageReference Include="JT1078.Hls" Version="1.1.0-preview3" /> | |||
<PackageReference Include="JT1078.Hls" Version="1.1.0-preview4" /> | |||
<PackageReference Include="JT1078.FMp4" Version="1.0.0-preview4" /> | |||
<PackageReference Include="JT1078.Flv" Version="1.1.0" /> | |||
<PackageReference Include="JT1078.Flv" Version="1.1.1-preview1" /> | |||
<PackageReference Include="Microsoft.Extensions.Caching.Abstractions" Version="5.0.0" /> | |||
<PackageReference Include="System.IO.Pipelines" Version="5.0.1" /> | |||
</ItemGroup> | |||
@@ -231,8 +231,8 @@ namespace JT1078.Gateway | |||
} | |||
catch (Exception ex) | |||
{ | |||
LogLogger.LogError($"[Error Parse 1]:{package1.ToHexString()}"); | |||
Logger.LogError(ex, $"[Error Parse 1]:{package1.ToHexString()}"); | |||
LogLogger.LogError($"[Error Parse 1]:{fixedHeaderInfo.SIM}-{package1.ToHexString()}"); | |||
Logger.LogError(ex, $"[Error Parse 1]:{fixedHeaderInfo.SIM}-{package1.ToHexString()}"); | |||
} | |||
finally | |||
{ | |||
@@ -260,7 +260,7 @@ namespace JT1078.Gateway | |||
seqReader.Advance(fixedHeaderInfo.TotalSize); | |||
if (LogLogger.IsEnabled(LogLevel.Trace)) | |||
{ | |||
LogLogger.LogTrace($"===>{package.ToHexString()}"); | |||
LogLogger.LogTrace($"===>{fixedHeaderInfo.SIM}-{package.ToHexString()}"); | |||
} | |||
try | |||
{ | |||
@@ -269,8 +269,8 @@ namespace JT1078.Gateway | |||
} | |||
catch (Exception ex) | |||
{ | |||
LogLogger.LogError($"[Error Parse 2]:{package.ToHexString()}"); | |||
Logger.LogError(ex, $"[Error Parse 2]:{package.ToHexString()}"); | |||
LogLogger.LogError($"[Error Parse 2]:{fixedHeaderInfo.SIM}-{package.ToHexString()}"); | |||
Logger.LogError(ex, $"[Error Parse 2]:{fixedHeaderInfo.SIM}-{package.ToHexString()}"); | |||
} | |||
finally | |||
{ | |||
@@ -65,6 +65,16 @@ namespace JT1078.Gateway.Jobs | |||
writer.WriteNumber(nameof(SessionManager.UdpSessionCount), SessionManager.UdpSessionCount); | |||
writer.WriteNumber(nameof(HttpSessionManager.HttpSessionCount), HttpSessionManager.HttpSessionCount); | |||
writer.WriteNumber(nameof(HttpSessionManager.WebSocketSessionCount), HttpSessionManager.WebSocketSessionCount); | |||
writer.WriteStartArray("Sims"); | |||
var sessions = HttpSessionManager.GetAll(); | |||
if (sessions != null) | |||
{ | |||
foreach(var session in sessions) | |||
{ | |||
writer.WriteStringValue($"{session.Sim}_{session.ChannelNo}_{session.SessionId}"); | |||
} | |||
} | |||
writer.WriteEndArray(); | |||
writer.WriteEndObject(); | |||
} | |||
json = Encoding.UTF8.GetString(stream.ToArray()); | |||
@@ -50,27 +50,29 @@ namespace JT1078.Gateway.Jobs | |||
try | |||
{ | |||
var hasSessions = HttpSessionManager.GetAll().Where(m => DateTime.Now.Subtract(m.StartTime).TotalSeconds > 60 && m.RTPVideoType == Metadata.RTPVideoType.Http_Hls).ToList();//所有http 的 hls短链接 | |||
foreach (var item in hasSessions) | |||
foreach (var item in hasSessions) | |||
{ | |||
var key = $"{item.Sim}_{item.ChannelNo}"; | |||
HttpSessionManager.TryRemove(item.SessionId);//超过120s未访问。 | |||
//清楚所有hls文件 | |||
string filepath = Path.Combine(Configuration.HlsRootDirectory, key); | |||
if (Directory.Exists(filepath)) { | |||
if (Directory.Exists(filepath)) | |||
{ | |||
Directory.Delete(filepath,true); | |||
} | |||
hLSPathStorage.RemoveAllPath(key);//移除所有缓存 | |||
if (logger.IsEnabled(LogLevel.Debug)) { | |||
logger.LogDebug($"{System.Text.Json.JsonSerializer.Serialize(item)},清楚session"); | |||
if (logger.IsEnabled(LogLevel.Debug)) | |||
{ | |||
logger.LogDebug($"{System.Text.Json.JsonSerializer.Serialize(item)},清除session"); | |||
} | |||
var hasTcpSession = HttpSessionManager.GetAllBySimAndChannelNo(item.Sim.TrimStart('0'), item.ChannelNo).Any(m => m.IsWebSocket);//是否存在tcp的 socket链接 | |||
var httpFlvSession = HttpSessionManager.GetAllBySimAndChannelNo(item.Sim.TrimStart('0'), item.ChannelNo).Any(m => m.RTPVideoType == Metadata.RTPVideoType.Http_Flv);//是否存在http的 flv长链接 | |||
string sim = item.Sim.TrimStart('0'); | |||
var hasTcpSession = HttpSessionManager.GetAllBySimAndChannelNo(sim, item.ChannelNo).Any(m => m.IsWebSocket);//是否存在tcp的 socket链接 | |||
var httpFlvSession = HttpSessionManager.GetAllBySimAndChannelNo(sim, item.ChannelNo).Any(m => m.RTPVideoType == Metadata.RTPVideoType.Http_Flv);//是否存在http的 flv长链接 | |||
if (!hasTcpSession && !httpFlvSession) | |||
{ | |||
//不存在websocket链接和http-flv链接时,主动断开设备链接以节省流量 | |||
//移除tcpsession,断开设备链接 | |||
if(SessionManager!=null) SessionManager.RemoveByTerminalPhoneNo(item.Sim.TrimStart('0')); | |||
if(SessionManager!=null) SessionManager.RemoveByTerminalPhoneNo(sim); | |||
} | |||
} | |||
} | |||
@@ -50,9 +50,9 @@ namespace JT1078.Gateway.Jobs | |||
} | |||
} | |||
} | |||
catch | |||
catch(Exception ex) | |||
{ | |||
logger.LogError(ex, ""); | |||
} | |||
}, stoppingToken); | |||
} | |||