From 5711503e07997e20c2b42275be188f0e5c1ebafb Mon Sep 17 00:00:00 2001 From: "SmallChi(Koike)" <564952747@qq.com> Date: Fri, 31 Dec 2021 18:31:48 +0800 Subject: [PATCH] =?UTF-8?q?=E8=B0=83=E6=95=B4fmp4=E7=BC=96=E7=A0=81?= =?UTF-8?q?=E5=8F=91=E9=80=81(=E5=BE=85=E6=B5=8B=E8=AF=95)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/Info.props | 33 ++++ .../JT1078.Gateway.Abstractions.csproj | 25 +-- .../JT1078.Gateway.Coordinator.csproj | 2 +- .../JT1078.Gateway.InMemoryMQ.csproj | 25 +-- .../JT1078.Gateway.Test.csproj | 6 +- .../JT1078.Gateway.TestNormalHosting.csproj | 10 +- .../JT1078FMp4NormalMsgHostedService.cs | 185 ++++++++++++------ .../Services/MessageDispatchHostedService.cs | 21 +- src/JT1078.Gateway.sln | 9 +- src/JT1078.Gateway/JT1078.Gateway.csproj | 29 +-- 10 files changed, 209 insertions(+), 136 deletions(-) create mode 100644 src/Info.props diff --git a/src/Info.props b/src/Info.props new file mode 100644 index 0000000..a8c4cf6 --- /dev/null +++ b/src/Info.props @@ -0,0 +1,33 @@ + + + net6.0 + 10.0 + Copyright 2019. + SmallChi(Koike) + https://github.com/SmallChi/JT1078Gateway + https://github.com/SmallChi/JT1078Gateway + https://github.com/SmallChi/JT1078Gateway/blob/master/LICENSE + https://github.com/SmallChi/JT1078Gateway/blob/master/LICENSE + 1.0.0-preview3 + LICENSE + true + latest + true + true + false + README.md + true + true + embedded + + + + true + true + true + + + + + + \ No newline at end of file diff --git a/src/JT1078.Gateway.Abstractions/JT1078.Gateway.Abstractions.csproj b/src/JT1078.Gateway.Abstractions/JT1078.Gateway.Abstractions.csproj index de064f4..65c60d1 100644 --- a/src/JT1078.Gateway.Abstractions/JT1078.Gateway.Abstractions.csproj +++ b/src/JT1078.Gateway.Abstractions/JT1078.Gateway.Abstractions.csproj @@ -1,24 +1,12 @@ - + - netstandard2.1;net5.0; - 9.0 - Copyright 2019. - SmallChi(Koike) JT1078.Gateway.Abstractions JT1078.Gateway.Abstractions 基于JT1078Gateway的抽象库 基于JT1078Gateway的抽象库 - https://github.com/SmallChi/JT1078Gateway - https://github.com/SmallChi/JT1078Gateway - https://github.com/SmallChi/JT1078Gateway/blob/master/LICENSE - https://github.com/SmallChi/JT1078Gateway/blob/master/LICENSE - false - false - LICENSE - true - $(JT1078PackageVersion) JT1078.Gateway.Abstractions.xml + LICENSE @@ -27,10 +15,11 @@ - - - - + + + + + diff --git a/src/JT1078.Gateway.Coordinator/JT1078.Gateway.Coordinator.csproj b/src/JT1078.Gateway.Coordinator/JT1078.Gateway.Coordinator.csproj index db44ac1..0e1c528 100644 --- a/src/JT1078.Gateway.Coordinator/JT1078.Gateway.Coordinator.csproj +++ b/src/JT1078.Gateway.Coordinator/JT1078.Gateway.Coordinator.csproj @@ -1,7 +1,7 @@  - net5.0 + net6.0 true diff --git a/src/JT1078.Gateway.InMemoryMQ/JT1078.Gateway.InMemoryMQ.csproj b/src/JT1078.Gateway.InMemoryMQ/JT1078.Gateway.InMemoryMQ.csproj index 87b241f..77de09e 100644 --- a/src/JT1078.Gateway.InMemoryMQ/JT1078.Gateway.InMemoryMQ.csproj +++ b/src/JT1078.Gateway.InMemoryMQ/JT1078.Gateway.InMemoryMQ.csproj @@ -1,32 +1,21 @@ - + - netstandard2.1;net5.0; - 9.0 - Copyright 2019. - SmallChi(Koike) JT1078.Gateway.InMemoryMQ JT1078.Gateway.InMemoryMQ 基于JT1078Gateway实现的内存队列 基于JT1078Gateway实现的内存队列 - https://github.com/SmallChi/JT1078Gateway - https://github.com/SmallChi/JT1078Gateway - https://github.com/SmallChi/JT1078Gateway/blob/master/LICENSE - https://github.com/SmallChi/JT1078Gateway/blob/master/LICENSE - false - false - LICENSE - true - $(JT1078PackageVersion) JT1078.Gateway.InMemoryMQ.xml + LICENSE - - - - + + + + + diff --git a/src/JT1078.Gateway.Tests/JT1078.Gateway.Test/JT1078.Gateway.Test.csproj b/src/JT1078.Gateway.Tests/JT1078.Gateway.Test/JT1078.Gateway.Test.csproj index 3cba514..1bf1616 100644 --- a/src/JT1078.Gateway.Tests/JT1078.Gateway.Test/JT1078.Gateway.Test.csproj +++ b/src/JT1078.Gateway.Tests/JT1078.Gateway.Test/JT1078.Gateway.Test.csproj @@ -1,19 +1,19 @@ - net5.0 + net6.0 false - + all runtime; build; native; contentfiles; analyzers; buildtransitive - + all runtime; build; native; contentfiles; analyzers; buildtransitive diff --git a/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/JT1078.Gateway.TestNormalHosting.csproj b/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/JT1078.Gateway.TestNormalHosting.csproj index e6bb06e..88abf38 100644 --- a/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/JT1078.Gateway.TestNormalHosting.csproj +++ b/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/JT1078.Gateway.TestNormalHosting.csproj @@ -6,11 +6,11 @@ - - - - - + + + + + diff --git a/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Services/JT1078FMp4NormalMsgHostedService.cs b/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Services/JT1078FMp4NormalMsgHostedService.cs index 053b0e0..7a1f245 100644 --- a/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Services/JT1078FMp4NormalMsgHostedService.cs +++ b/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Services/JT1078FMp4NormalMsgHostedService.cs @@ -17,20 +17,19 @@ using System.IO; namespace JT1078.Gateway.TestNormalHosting.Services { - public class JT1078FMp4NormalMsgHostedService : BackgroundService + public class JT1078FMp4NormalMsgHostedService : IHostedService { private JT1078HttpSessionManager HttpSessionManager; private FMp4Encoder FM4Encoder; private ILogger Logger; - private IMemoryCache memoryCache; private const string ikey = "IFMp4KEY"; private MessageDispatchDataService messageDispatchDataService; - private ConcurrentDictionary> avFrameDict; + private ConcurrentDictionary avFrameDict; private H264Decoder H264Decoder; - List SegmentPackages = new List();// 一段包 以I帧为界 IPPPP , IPPPP 一组 + List NaluFilter; + BlockingCollection<(string SIM, byte ChannelNo,byte[]FirstBuffer, byte[] OtherBuffer)> FMp4Blocking; public JT1078FMp4NormalMsgHostedService( MessageDispatchDataService messageDispatchDataService, - IMemoryCache memoryCache, ILoggerFactory loggerFactory, FMp4Encoder fM4Encoder, H264Decoder h264Decoder, @@ -40,81 +39,149 @@ namespace JT1078.Gateway.TestNormalHosting.Services HttpSessionManager = httpSessionManager; FM4Encoder = fM4Encoder; H264Decoder= h264Decoder; - this.memoryCache = memoryCache; this.messageDispatchDataService = messageDispatchDataService; - //todo:定时清理 - avFrameDict = new ConcurrentDictionary>(); + avFrameDict = new ConcurrentDictionary(); + FMp4Blocking=new BlockingCollection<(string SIM, byte ChannelNo, byte[] FirstBuffer, byte[] OtherBuffer)>(); + NaluFilter = new List(); + NaluFilter.Add(NalUnitType.SEI); + NaluFilter.Add(NalUnitType.PPS); + NaluFilter.Add(NalUnitType.SPS); + NaluFilter.Add(NalUnitType.AUD); } - protected async override Task ExecuteAsync(CancellationToken stoppingToken) + + public Task StartAsync(CancellationToken cancellationToken) { - while (!stoppingToken.IsCancellationRequested) - { - var data = await messageDispatchDataService.FMp4Channel.Reader.ReadAsync(); - try + Task.Run(async () => { + while (!cancellationToken.IsCancellationRequested) { - if (Logger.IsEnabled(LogLevel.Debug)) - { - Logger.LogDebug(JsonSerializer.Serialize(HttpSessionManager.GetAll())); - Logger.LogDebug($"{data.SIM},{data.SN},{data.LogicChannelNumber},{data.Label3.DataType.ToString()},{data.Label3.SubpackageType.ToString()},{data.Bodies.ToHexString()}"); - } - - if (data.Label3.DataType == Protocol.Enums.JT1078DataType.视频I帧) + var data = await messageDispatchDataService.FMp4Channel.Reader.ReadAsync(); + try { - if (SegmentPackages.Count>0) + if (Logger.IsEnabled(LogLevel.Debug)) { - //判断是否首帧 - //Logger.LogDebug($"时间1:{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss ffff")}"); - var httpSessions = HttpSessionManager.GetAllBySimAndChannelNo(data.SIM.TrimStart('0'), data.LogicChannelNumber); - var firstHttpSessions = httpSessions.Where(w => !w.FirstSend && (w.RTPVideoType == Metadata.RTPVideoType.Http_FMp4 || w.RTPVideoType == Metadata.RTPVideoType.Ws_FMp4)).ToList(); - var otherHttpSessions = httpSessions.Where(w => w.FirstSend && (w.RTPVideoType == Metadata.RTPVideoType.Http_FMp4 || w.RTPVideoType == Metadata.RTPVideoType.Ws_FMp4)).ToList(); - if (firstHttpSessions.Count > 0) + Logger.LogDebug(JsonSerializer.Serialize(HttpSessionManager.GetAll())); + Logger.LogDebug($"{data.SIM},{data.SN},{data.LogicChannelNumber},{data.Label3.DataType.ToString()},{data.Label3.SubpackageType.ToString()},{data.Bodies.ToHexString()}"); + } + List h264NALUs = H264Decoder.ParseNALU(data); + if (h264NALUs!=null && h264NALUs.Count>0) + { + if(!avFrameDict.TryGetValue(data.GetKey(),out FMp4AVContext cacheFrame)) + { + cacheFrame=new FMp4AVContext(); + avFrameDict.TryAdd(data.GetKey(), cacheFrame); + } + foreach(var nalu in h264NALUs) { - //唯一 - var ftyp = FM4Encoder.FtypBox(); - var package1 = SegmentPackages[0]; - var nalus1 = H264Decoder.ParseNALU(package1); - var moov = FM4Encoder.MoovBox( - nalus1.FirstOrDefault(f => f.NALUHeader.NalUnitType == NalUnitType.SPS), - nalus1.FirstOrDefault(f => f.NALUHeader.NalUnitType == NalUnitType.PPS)); - //首帧 - var styp = FM4Encoder.StypBox(); - var firstVideo = FM4Encoder.OtherVideoBox(SegmentPackages); - foreach (var session in firstHttpSessions) + if (NaluFilter.Contains(nalu.NALUHeader.NalUnitType)) { - HttpSessionManager.SendAVData(session, ftyp.Concat(moov).Concat(styp).Concat(firstVideo).ToArray(), true); - SegmentPackages.Clear();//发送完成后清理 + if (nalu.NALUHeader.NalUnitType== NalUnitType.SPS) + { + cacheFrame.SPSNalu=nalu; + } + else if (nalu.NALUHeader.NalUnitType== NalUnitType.PPS) + { + cacheFrame.PPSNalu=nalu; + } + } + else + { + cacheFrame.NALUs.Add(nalu); } } - if (otherHttpSessions.Count > 0) + if (cacheFrame.NALUs.Count>1) { - //非首帧 - var styp = FM4Encoder.StypBox(); - var otherVideo = FM4Encoder.OtherVideoBox(SegmentPackages); - foreach (var session in otherHttpSessions) + if (cacheFrame.FirstCacheBuffer==null) { - HttpSessionManager.SendAVData(session, styp.Concat(otherVideo).ToArray(), false); - SegmentPackages.Clear();//发送完成后清理 + cacheFrame.FirstCacheBuffer=FM4Encoder.FirstVideoBox(cacheFrame.SPSNalu, cacheFrame.PPSNalu); } - + List tmp = new List(); + int i = 0; + foreach (var item in cacheFrame.NALUs) + { + if (item.NALUHeader.KeyFrame) + { + if (tmp.Count>0) + { + FMp4Blocking.Add((data.SIM, data.LogicChannelNumber, cacheFrame.FirstCacheBuffer, FM4Encoder.OtherVideoBox(tmp))); + i+=tmp.Count; + tmp.Clear(); + } + tmp.Add(item); + i+=tmp.Count; + FMp4Blocking.Add((data.SIM, data.LogicChannelNumber, cacheFrame.FirstCacheBuffer, FM4Encoder.OtherVideoBox(tmp))); + tmp.Clear(); + cacheFrame.PrevPrimaryNalu = item; + continue; + } + if (cacheFrame.PrevPrimaryNalu!=null) //第一帧I帧 + { + if (tmp.Count>1) + { + FMp4Blocking.Add((data.SIM, data.LogicChannelNumber, cacheFrame.FirstCacheBuffer, FM4Encoder.OtherVideoBox(tmp))); + i+=tmp.Count; + tmp.Clear(); + } + tmp.Add(item); + } + } + cacheFrame.NALUs.RemoveRange(0, i); } } + } + catch (Exception ex) + { + Logger.LogError(ex, $"{data.SIM},{data.SN},{data.LogicChannelNumber},{data.Label3.DataType.ToString()},{data.Label3.SubpackageType.ToString()},{data.Bodies.ToHexString()}"); + } - if (SegmentPackages.Count==0) - SegmentPackages.Add(data); + } + }); + Task.Run(() => { + try + { + foreach(var item in FMp4Blocking.GetConsumingEnumerable(cancellationToken)) + { + var httpSessions = HttpSessionManager.GetAllBySimAndChannelNo(item.SIM.TrimStart('0'), item.ChannelNo); + var firstHttpSessions = httpSessions.Where(w => !w.FirstSend && (w.RTPVideoType == Metadata.RTPVideoType.Http_FMp4 || w.RTPVideoType == Metadata.RTPVideoType.Ws_FMp4)).ToList(); + var otherHttpSessions = httpSessions.Where(w => w.FirstSend && (w.RTPVideoType == Metadata.RTPVideoType.Http_FMp4 || w.RTPVideoType == Metadata.RTPVideoType.Ws_FMp4)).ToList(); + if (firstHttpSessions.Count > 0) + { + //首帧 + foreach (var session in firstHttpSessions) + { + HttpSessionManager.SendAVData(session, item.FirstBuffer, true); + HttpSessionManager.SendAVData(session, item.OtherBuffer, false); + } + } + if (otherHttpSessions.Count > 0) + { + //非首帧 + foreach (var session in otherHttpSessions) + { + HttpSessionManager.SendAVData(session, item.OtherBuffer, false); + } + } } - else { - if (SegmentPackages.Count!=0) { - SegmentPackages.Add(data); - } - } } catch (Exception ex) { - Logger.LogError(ex, $"{data.SIM},{data.SN},{data.LogicChannelNumber},{data.Label3.DataType.ToString()},{data.Label3.SubpackageType.ToString()},{data.Bodies.ToHexString()}"); + } - - } - await Task.CompletedTask; + }); + return Task.CompletedTask; + } + + public Task StopAsync(CancellationToken cancellationToken) + { + return Task.CompletedTask; + } + + public class FMp4AVContext + { + public byte[] FirstCacheBuffer { get; set; } + public H264NALU PrevPrimaryNalu { get; set; } + public H264NALU SPSNalu { get; set; } + public H264NALU PPSNalu { get; set; } + public List NALUs { get; set; } = new List(); } } } diff --git a/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Services/MessageDispatchHostedService.cs b/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Services/MessageDispatchHostedService.cs index c251f33..9dfa361 100644 --- a/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Services/MessageDispatchHostedService.cs +++ b/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Services/MessageDispatchHostedService.cs @@ -33,16 +33,17 @@ namespace JT1078.Gateway.TestNormalHosting.Services var merge = JT1078.Protocol.JT1078Serializer.Merge(package); if (merge != null) { - Parallel.Invoke( - async() => { - await messageDispatchDataService.FMp4Channel.Writer.WriteAsync(merge, stoppingToken); - }, - async () => { - await messageDispatchDataService.FlvChannel.Writer.WriteAsync(merge, stoppingToken); - }, - async () => { - await messageDispatchDataService.HlsChannel.Writer.WriteAsync(merge, stoppingToken); - }); + await messageDispatchDataService.FMp4Channel.Writer.WriteAsync(merge, stoppingToken); + //Parallel.Invoke( + // async() => { + // await messageDispatchDataService.FMp4Channel.Writer.WriteAsync(merge, stoppingToken); + // }, + // async () => { + // await messageDispatchDataService.FlvChannel.Writer.WriteAsync(merge, stoppingToken); + // }, + // async () => { + // await messageDispatchDataService.HlsChannel.Writer.WriteAsync(merge, stoppingToken); + // }); } }); return Task.CompletedTask; diff --git a/src/JT1078.Gateway.sln b/src/JT1078.Gateway.sln index 4cf5778..74d7b8d 100644 --- a/src/JT1078.Gateway.sln +++ b/src/JT1078.Gateway.sln @@ -1,7 +1,7 @@  Microsoft Visual Studio Solution File, Format Version 12.00 -# Visual Studio Version 16 -VisualStudioVersion = 16.0.29418.71 +# Visual Studio Version 17 +VisualStudioVersion = 17.0.32002.185 MinimumVisualStudioVersion = 10.0.40219.1 Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JT1078.Gateway.Test", "JT1078.Gateway.Tests\JT1078.Gateway.Test\JT1078.Gateway.Test.csproj", "{DAC80A8E-4172-451F-910D-9032BF8640F9}" EndProject @@ -17,6 +17,11 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JT1078.Gateway", "JT1078.Ga EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JT1078.Gateway.InMemoryMQ", "JT1078.Gateway.InMemoryMQ\JT1078.Gateway.InMemoryMQ.csproj", "{011934D6-BEE7-4CDA-9F67-4D6D7D672D6C}" EndProject +Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution Items", "{6631FE91-3525-4A84-937B-781C73B343C9}" + ProjectSection(SolutionItems) = preProject + Info.props = Info.props + EndProjectSection +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU diff --git a/src/JT1078.Gateway/JT1078.Gateway.csproj b/src/JT1078.Gateway/JT1078.Gateway.csproj index a84fb15..9182d15 100644 --- a/src/JT1078.Gateway/JT1078.Gateway.csproj +++ b/src/JT1078.Gateway/JT1078.Gateway.csproj @@ -1,24 +1,12 @@  - + - net6.0; - 9.0 - Copyright 2019. - SmallChi(Koike) JT1078.Gateway JT1078.Gateway 基于Pipeline实现的JT1078Gateway库 基于Pipeline实现的JT1078Gateway库 - https://github.com/SmallChi/JT1078Gateway - https://github.com/SmallChi/JT1078Gateway - https://github.com/SmallChi/JT1078Gateway/blob/master/LICENSE - https://github.com/SmallChi/JT1078Gateway/blob/master/LICENSE - false - false - LICENSE - true - $(JT1078PackageVersion) JT1078.Gateway.xml + LICENSE @@ -39,16 +27,17 @@ - - - + + + - - - + + + +