diff --git a/src/Info.props b/src/Info.props
new file mode 100644
index 0000000..a8c4cf6
--- /dev/null
+++ b/src/Info.props
@@ -0,0 +1,33 @@
+
+
+ net6.0
+ 10.0
+ Copyright 2019.
+ SmallChi(Koike)
+ https://github.com/SmallChi/JT1078Gateway
+ https://github.com/SmallChi/JT1078Gateway
+ https://github.com/SmallChi/JT1078Gateway/blob/master/LICENSE
+ https://github.com/SmallChi/JT1078Gateway/blob/master/LICENSE
+ 1.0.0-preview3
+ LICENSE
+ true
+ latest
+ true
+ true
+ false
+ README.md
+ true
+ true
+ embedded
+
+
+
+ true
+ true
+ true
+
+
+
+
+
+
\ No newline at end of file
diff --git a/src/JT1078.Gateway.Abstractions/JT1078.Gateway.Abstractions.csproj b/src/JT1078.Gateway.Abstractions/JT1078.Gateway.Abstractions.csproj
index de064f4..65c60d1 100644
--- a/src/JT1078.Gateway.Abstractions/JT1078.Gateway.Abstractions.csproj
+++ b/src/JT1078.Gateway.Abstractions/JT1078.Gateway.Abstractions.csproj
@@ -1,24 +1,12 @@
-
+
- netstandard2.1;net5.0;
- 9.0
- Copyright 2019.
- SmallChi(Koike)
JT1078.Gateway.Abstractions
JT1078.Gateway.Abstractions
基于JT1078Gateway的抽象库
基于JT1078Gateway的抽象库
- https://github.com/SmallChi/JT1078Gateway
- https://github.com/SmallChi/JT1078Gateway
- https://github.com/SmallChi/JT1078Gateway/blob/master/LICENSE
- https://github.com/SmallChi/JT1078Gateway/blob/master/LICENSE
- false
- false
- LICENSE
- true
- $(JT1078PackageVersion)
JT1078.Gateway.Abstractions.xml
+ LICENSE
@@ -27,10 +15,11 @@
-
-
-
-
+
+
+
+
+
diff --git a/src/JT1078.Gateway.Coordinator/JT1078.Gateway.Coordinator.csproj b/src/JT1078.Gateway.Coordinator/JT1078.Gateway.Coordinator.csproj
index db44ac1..0e1c528 100644
--- a/src/JT1078.Gateway.Coordinator/JT1078.Gateway.Coordinator.csproj
+++ b/src/JT1078.Gateway.Coordinator/JT1078.Gateway.Coordinator.csproj
@@ -1,7 +1,7 @@
- net5.0
+ net6.0
true
diff --git a/src/JT1078.Gateway.InMemoryMQ/JT1078.Gateway.InMemoryMQ.csproj b/src/JT1078.Gateway.InMemoryMQ/JT1078.Gateway.InMemoryMQ.csproj
index 87b241f..77de09e 100644
--- a/src/JT1078.Gateway.InMemoryMQ/JT1078.Gateway.InMemoryMQ.csproj
+++ b/src/JT1078.Gateway.InMemoryMQ/JT1078.Gateway.InMemoryMQ.csproj
@@ -1,32 +1,21 @@
-
+
- netstandard2.1;net5.0;
- 9.0
- Copyright 2019.
- SmallChi(Koike)
JT1078.Gateway.InMemoryMQ
JT1078.Gateway.InMemoryMQ
基于JT1078Gateway实现的内存队列
基于JT1078Gateway实现的内存队列
- https://github.com/SmallChi/JT1078Gateway
- https://github.com/SmallChi/JT1078Gateway
- https://github.com/SmallChi/JT1078Gateway/blob/master/LICENSE
- https://github.com/SmallChi/JT1078Gateway/blob/master/LICENSE
- false
- false
- LICENSE
- true
- $(JT1078PackageVersion)
JT1078.Gateway.InMemoryMQ.xml
+ LICENSE
-
-
-
-
+
+
+
+
+
diff --git a/src/JT1078.Gateway.Tests/JT1078.Gateway.Test/JT1078.Gateway.Test.csproj b/src/JT1078.Gateway.Tests/JT1078.Gateway.Test/JT1078.Gateway.Test.csproj
index 3cba514..1bf1616 100644
--- a/src/JT1078.Gateway.Tests/JT1078.Gateway.Test/JT1078.Gateway.Test.csproj
+++ b/src/JT1078.Gateway.Tests/JT1078.Gateway.Test/JT1078.Gateway.Test.csproj
@@ -1,19 +1,19 @@
- net5.0
+ net6.0
false
-
+
all
runtime; build; native; contentfiles; analyzers; buildtransitive
-
+
all
runtime; build; native; contentfiles; analyzers; buildtransitive
diff --git a/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/JT1078.Gateway.TestNormalHosting.csproj b/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/JT1078.Gateway.TestNormalHosting.csproj
index e6bb06e..88abf38 100644
--- a/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/JT1078.Gateway.TestNormalHosting.csproj
+++ b/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/JT1078.Gateway.TestNormalHosting.csproj
@@ -6,11 +6,11 @@
-
-
-
-
-
+
+
+
+
+
diff --git a/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Services/JT1078FMp4NormalMsgHostedService.cs b/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Services/JT1078FMp4NormalMsgHostedService.cs
index 053b0e0..7a1f245 100644
--- a/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Services/JT1078FMp4NormalMsgHostedService.cs
+++ b/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Services/JT1078FMp4NormalMsgHostedService.cs
@@ -17,20 +17,19 @@ using System.IO;
namespace JT1078.Gateway.TestNormalHosting.Services
{
- public class JT1078FMp4NormalMsgHostedService : BackgroundService
+ public class JT1078FMp4NormalMsgHostedService : IHostedService
{
private JT1078HttpSessionManager HttpSessionManager;
private FMp4Encoder FM4Encoder;
private ILogger Logger;
- private IMemoryCache memoryCache;
private const string ikey = "IFMp4KEY";
private MessageDispatchDataService messageDispatchDataService;
- private ConcurrentDictionary> avFrameDict;
+ private ConcurrentDictionary avFrameDict;
private H264Decoder H264Decoder;
- List SegmentPackages = new List();// 一段包 以I帧为界 IPPPP , IPPPP 一组
+ List NaluFilter;
+ BlockingCollection<(string SIM, byte ChannelNo,byte[]FirstBuffer, byte[] OtherBuffer)> FMp4Blocking;
public JT1078FMp4NormalMsgHostedService(
MessageDispatchDataService messageDispatchDataService,
- IMemoryCache memoryCache,
ILoggerFactory loggerFactory,
FMp4Encoder fM4Encoder,
H264Decoder h264Decoder,
@@ -40,81 +39,149 @@ namespace JT1078.Gateway.TestNormalHosting.Services
HttpSessionManager = httpSessionManager;
FM4Encoder = fM4Encoder;
H264Decoder= h264Decoder;
- this.memoryCache = memoryCache;
this.messageDispatchDataService = messageDispatchDataService;
- //todo:定时清理
- avFrameDict = new ConcurrentDictionary>();
+ avFrameDict = new ConcurrentDictionary();
+ FMp4Blocking=new BlockingCollection<(string SIM, byte ChannelNo, byte[] FirstBuffer, byte[] OtherBuffer)>();
+ NaluFilter = new List();
+ NaluFilter.Add(NalUnitType.SEI);
+ NaluFilter.Add(NalUnitType.PPS);
+ NaluFilter.Add(NalUnitType.SPS);
+ NaluFilter.Add(NalUnitType.AUD);
}
- protected async override Task ExecuteAsync(CancellationToken stoppingToken)
+
+ public Task StartAsync(CancellationToken cancellationToken)
{
- while (!stoppingToken.IsCancellationRequested)
- {
- var data = await messageDispatchDataService.FMp4Channel.Reader.ReadAsync();
- try
+ Task.Run(async () => {
+ while (!cancellationToken.IsCancellationRequested)
{
- if (Logger.IsEnabled(LogLevel.Debug))
- {
- Logger.LogDebug(JsonSerializer.Serialize(HttpSessionManager.GetAll()));
- Logger.LogDebug($"{data.SIM},{data.SN},{data.LogicChannelNumber},{data.Label3.DataType.ToString()},{data.Label3.SubpackageType.ToString()},{data.Bodies.ToHexString()}");
- }
-
- if (data.Label3.DataType == Protocol.Enums.JT1078DataType.视频I帧)
+ var data = await messageDispatchDataService.FMp4Channel.Reader.ReadAsync();
+ try
{
- if (SegmentPackages.Count>0)
+ if (Logger.IsEnabled(LogLevel.Debug))
{
- //判断是否首帧
- //Logger.LogDebug($"时间1:{DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss ffff")}");
- var httpSessions = HttpSessionManager.GetAllBySimAndChannelNo(data.SIM.TrimStart('0'), data.LogicChannelNumber);
- var firstHttpSessions = httpSessions.Where(w => !w.FirstSend && (w.RTPVideoType == Metadata.RTPVideoType.Http_FMp4 || w.RTPVideoType == Metadata.RTPVideoType.Ws_FMp4)).ToList();
- var otherHttpSessions = httpSessions.Where(w => w.FirstSend && (w.RTPVideoType == Metadata.RTPVideoType.Http_FMp4 || w.RTPVideoType == Metadata.RTPVideoType.Ws_FMp4)).ToList();
- if (firstHttpSessions.Count > 0)
+ Logger.LogDebug(JsonSerializer.Serialize(HttpSessionManager.GetAll()));
+ Logger.LogDebug($"{data.SIM},{data.SN},{data.LogicChannelNumber},{data.Label3.DataType.ToString()},{data.Label3.SubpackageType.ToString()},{data.Bodies.ToHexString()}");
+ }
+ List h264NALUs = H264Decoder.ParseNALU(data);
+ if (h264NALUs!=null && h264NALUs.Count>0)
+ {
+ if(!avFrameDict.TryGetValue(data.GetKey(),out FMp4AVContext cacheFrame))
+ {
+ cacheFrame=new FMp4AVContext();
+ avFrameDict.TryAdd(data.GetKey(), cacheFrame);
+ }
+ foreach(var nalu in h264NALUs)
{
- //唯一
- var ftyp = FM4Encoder.FtypBox();
- var package1 = SegmentPackages[0];
- var nalus1 = H264Decoder.ParseNALU(package1);
- var moov = FM4Encoder.MoovBox(
- nalus1.FirstOrDefault(f => f.NALUHeader.NalUnitType == NalUnitType.SPS),
- nalus1.FirstOrDefault(f => f.NALUHeader.NalUnitType == NalUnitType.PPS));
- //首帧
- var styp = FM4Encoder.StypBox();
- var firstVideo = FM4Encoder.OtherVideoBox(SegmentPackages);
- foreach (var session in firstHttpSessions)
+ if (NaluFilter.Contains(nalu.NALUHeader.NalUnitType))
{
- HttpSessionManager.SendAVData(session, ftyp.Concat(moov).Concat(styp).Concat(firstVideo).ToArray(), true);
- SegmentPackages.Clear();//发送完成后清理
+ if (nalu.NALUHeader.NalUnitType== NalUnitType.SPS)
+ {
+ cacheFrame.SPSNalu=nalu;
+ }
+ else if (nalu.NALUHeader.NalUnitType== NalUnitType.PPS)
+ {
+ cacheFrame.PPSNalu=nalu;
+ }
+ }
+ else
+ {
+ cacheFrame.NALUs.Add(nalu);
}
}
- if (otherHttpSessions.Count > 0)
+ if (cacheFrame.NALUs.Count>1)
{
- //非首帧
- var styp = FM4Encoder.StypBox();
- var otherVideo = FM4Encoder.OtherVideoBox(SegmentPackages);
- foreach (var session in otherHttpSessions)
+ if (cacheFrame.FirstCacheBuffer==null)
{
- HttpSessionManager.SendAVData(session, styp.Concat(otherVideo).ToArray(), false);
- SegmentPackages.Clear();//发送完成后清理
+ cacheFrame.FirstCacheBuffer=FM4Encoder.FirstVideoBox(cacheFrame.SPSNalu, cacheFrame.PPSNalu);
}
-
+ List tmp = new List();
+ int i = 0;
+ foreach (var item in cacheFrame.NALUs)
+ {
+ if (item.NALUHeader.KeyFrame)
+ {
+ if (tmp.Count>0)
+ {
+ FMp4Blocking.Add((data.SIM, data.LogicChannelNumber, cacheFrame.FirstCacheBuffer, FM4Encoder.OtherVideoBox(tmp)));
+ i+=tmp.Count;
+ tmp.Clear();
+ }
+ tmp.Add(item);
+ i+=tmp.Count;
+ FMp4Blocking.Add((data.SIM, data.LogicChannelNumber, cacheFrame.FirstCacheBuffer, FM4Encoder.OtherVideoBox(tmp)));
+ tmp.Clear();
+ cacheFrame.PrevPrimaryNalu = item;
+ continue;
+ }
+ if (cacheFrame.PrevPrimaryNalu!=null) //第一帧I帧
+ {
+ if (tmp.Count>1)
+ {
+ FMp4Blocking.Add((data.SIM, data.LogicChannelNumber, cacheFrame.FirstCacheBuffer, FM4Encoder.OtherVideoBox(tmp)));
+ i+=tmp.Count;
+ tmp.Clear();
+ }
+ tmp.Add(item);
+ }
+ }
+ cacheFrame.NALUs.RemoveRange(0, i);
}
}
+ }
+ catch (Exception ex)
+ {
+ Logger.LogError(ex, $"{data.SIM},{data.SN},{data.LogicChannelNumber},{data.Label3.DataType.ToString()},{data.Label3.SubpackageType.ToString()},{data.Bodies.ToHexString()}");
+ }
- if (SegmentPackages.Count==0)
- SegmentPackages.Add(data);
+ }
+ });
+ Task.Run(() => {
+ try
+ {
+ foreach(var item in FMp4Blocking.GetConsumingEnumerable(cancellationToken))
+ {
+ var httpSessions = HttpSessionManager.GetAllBySimAndChannelNo(item.SIM.TrimStart('0'), item.ChannelNo);
+ var firstHttpSessions = httpSessions.Where(w => !w.FirstSend && (w.RTPVideoType == Metadata.RTPVideoType.Http_FMp4 || w.RTPVideoType == Metadata.RTPVideoType.Ws_FMp4)).ToList();
+ var otherHttpSessions = httpSessions.Where(w => w.FirstSend && (w.RTPVideoType == Metadata.RTPVideoType.Http_FMp4 || w.RTPVideoType == Metadata.RTPVideoType.Ws_FMp4)).ToList();
+ if (firstHttpSessions.Count > 0)
+ {
+ //首帧
+ foreach (var session in firstHttpSessions)
+ {
+ HttpSessionManager.SendAVData(session, item.FirstBuffer, true);
+ HttpSessionManager.SendAVData(session, item.OtherBuffer, false);
+ }
+ }
+ if (otherHttpSessions.Count > 0)
+ {
+ //非首帧
+ foreach (var session in otherHttpSessions)
+ {
+ HttpSessionManager.SendAVData(session, item.OtherBuffer, false);
+ }
+ }
}
- else {
- if (SegmentPackages.Count!=0) {
- SegmentPackages.Add(data);
- }
- }
}
catch (Exception ex)
{
- Logger.LogError(ex, $"{data.SIM},{data.SN},{data.LogicChannelNumber},{data.Label3.DataType.ToString()},{data.Label3.SubpackageType.ToString()},{data.Bodies.ToHexString()}");
+
}
-
- }
- await Task.CompletedTask;
+ });
+ return Task.CompletedTask;
+ }
+
+ public Task StopAsync(CancellationToken cancellationToken)
+ {
+ return Task.CompletedTask;
+ }
+
+ public class FMp4AVContext
+ {
+ public byte[] FirstCacheBuffer { get; set; }
+ public H264NALU PrevPrimaryNalu { get; set; }
+ public H264NALU SPSNalu { get; set; }
+ public H264NALU PPSNalu { get; set; }
+ public List NALUs { get; set; } = new List();
}
}
}
diff --git a/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Services/MessageDispatchHostedService.cs b/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Services/MessageDispatchHostedService.cs
index c251f33..9dfa361 100644
--- a/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Services/MessageDispatchHostedService.cs
+++ b/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Services/MessageDispatchHostedService.cs
@@ -33,16 +33,17 @@ namespace JT1078.Gateway.TestNormalHosting.Services
var merge = JT1078.Protocol.JT1078Serializer.Merge(package);
if (merge != null)
{
- Parallel.Invoke(
- async() => {
- await messageDispatchDataService.FMp4Channel.Writer.WriteAsync(merge, stoppingToken);
- },
- async () => {
- await messageDispatchDataService.FlvChannel.Writer.WriteAsync(merge, stoppingToken);
- },
- async () => {
- await messageDispatchDataService.HlsChannel.Writer.WriteAsync(merge, stoppingToken);
- });
+ await messageDispatchDataService.FMp4Channel.Writer.WriteAsync(merge, stoppingToken);
+ //Parallel.Invoke(
+ // async() => {
+ // await messageDispatchDataService.FMp4Channel.Writer.WriteAsync(merge, stoppingToken);
+ // },
+ // async () => {
+ // await messageDispatchDataService.FlvChannel.Writer.WriteAsync(merge, stoppingToken);
+ // },
+ // async () => {
+ // await messageDispatchDataService.HlsChannel.Writer.WriteAsync(merge, stoppingToken);
+ // });
}
});
return Task.CompletedTask;
diff --git a/src/JT1078.Gateway.sln b/src/JT1078.Gateway.sln
index 4cf5778..74d7b8d 100644
--- a/src/JT1078.Gateway.sln
+++ b/src/JT1078.Gateway.sln
@@ -1,7 +1,7 @@
Microsoft Visual Studio Solution File, Format Version 12.00
-# Visual Studio Version 16
-VisualStudioVersion = 16.0.29418.71
+# Visual Studio Version 17
+VisualStudioVersion = 17.0.32002.185
MinimumVisualStudioVersion = 10.0.40219.1
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JT1078.Gateway.Test", "JT1078.Gateway.Tests\JT1078.Gateway.Test\JT1078.Gateway.Test.csproj", "{DAC80A8E-4172-451F-910D-9032BF8640F9}"
EndProject
@@ -17,6 +17,11 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JT1078.Gateway", "JT1078.Ga
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JT1078.Gateway.InMemoryMQ", "JT1078.Gateway.InMemoryMQ\JT1078.Gateway.InMemoryMQ.csproj", "{011934D6-BEE7-4CDA-9F67-4D6D7D672D6C}"
EndProject
+Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution Items", "{6631FE91-3525-4A84-937B-781C73B343C9}"
+ ProjectSection(SolutionItems) = preProject
+ Info.props = Info.props
+ EndProjectSection
+EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
diff --git a/src/JT1078.Gateway/JT1078.Gateway.csproj b/src/JT1078.Gateway/JT1078.Gateway.csproj
index a84fb15..9182d15 100644
--- a/src/JT1078.Gateway/JT1078.Gateway.csproj
+++ b/src/JT1078.Gateway/JT1078.Gateway.csproj
@@ -1,24 +1,12 @@
-
+
- net6.0;
- 9.0
- Copyright 2019.
- SmallChi(Koike)
JT1078.Gateway
JT1078.Gateway
基于Pipeline实现的JT1078Gateway库
基于Pipeline实现的JT1078Gateway库
- https://github.com/SmallChi/JT1078Gateway
- https://github.com/SmallChi/JT1078Gateway
- https://github.com/SmallChi/JT1078Gateway/blob/master/LICENSE
- https://github.com/SmallChi/JT1078Gateway/blob/master/LICENSE
- false
- false
- LICENSE
- true
- $(JT1078PackageVersion)
JT1078.Gateway.xml
+ LICENSE
@@ -39,16 +27,17 @@
-
-
-
+
+
+
-
-
-
+
+
+
+