ソースを参照

完善flv首包和其他包组包待测试

master
SmallChi(Koike) 4年前
コミット
7954bd8e05
7個のファイルの変更63行の追加108行の削除
  1. +4
    -4
      src/JT1078.Gateway.Abstractions/JT1078.Gateway.Abstractions.csproj
  2. +2
    -2
      src/JT1078.Gateway.Tests/JT1078.Gateway.Test/JT1078.Gateway.Test.csproj
  3. +4
    -3
      src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/JT1078.Gateway.TestNormalHosting.csproj
  4. +4
    -2
      src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Program.cs
  5. +29
    -3
      src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Services/JT1078NormalMsgHostedService.cs
  6. +3
    -3
      src/JT1078.Gateway/Metadata/JT1078HttpContext.cs
  7. +17
    -91
      src/JT1078.Gateway/Sessions/JT1078HttpSessionManager.cs

+ 4
- 4
src/JT1078.Gateway.Abstractions/JT1078.Gateway.Abstractions.csproj ファイルの表示

@@ -26,10 +26,10 @@


<ItemGroup> <ItemGroup>
<PackageReference Include="JT1078" Version="1.0.3" /> <PackageReference Include="JT1078" Version="1.0.3" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="3.1.5" />
<PackageReference Include="Microsoft.Extensions.Hosting.Abstractions" Version="3.1.5" />
<PackageReference Include="Microsoft.Extensions.Options.ConfigurationExtensions" Version="3.1.5" />
<PackageReference Include="Microsoft.Extensions.Options" Version="3.1.5" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="3.1.7" />
<PackageReference Include="Microsoft.Extensions.Hosting.Abstractions" Version="3.1.7" />
<PackageReference Include="Microsoft.Extensions.Options.ConfigurationExtensions" Version="3.1.7" />
<PackageReference Include="Microsoft.Extensions.Options" Version="3.1.7" />
<PackageReference Include="System.Text.Json" Version="4.7.2" /> <PackageReference Include="System.Text.Json" Version="4.7.2" />
</ItemGroup> </ItemGroup>
</Project> </Project>

+ 2
- 2
src/JT1078.Gateway.Tests/JT1078.Gateway.Test/JT1078.Gateway.Test.csproj ファイルの表示

@@ -7,10 +7,10 @@
</PropertyGroup> </PropertyGroup>


<ItemGroup> <ItemGroup>
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="16.6.1" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="16.7.0" />
<PackageReference Include="System.IO.Pipelines" Version="4.7.2" /> <PackageReference Include="System.IO.Pipelines" Version="4.7.2" />
<PackageReference Include="xunit" Version="2.4.1" /> <PackageReference Include="xunit" Version="2.4.1" />
<PackageReference Include="xunit.runner.visualstudio" Version="2.4.2">
<PackageReference Include="xunit.runner.visualstudio" Version="2.4.3">
<PrivateAssets>all</PrivateAssets> <PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets> <IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageReference> </PackageReference>


+ 4
- 3
src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/JT1078.Gateway.TestNormalHosting.csproj ファイルの表示

@@ -6,9 +6,10 @@
</PropertyGroup> </PropertyGroup>
<ItemGroup> <ItemGroup>
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="3.1.5" />
<PackageReference Include="Microsoft.Extensions.Hosting" Version="3.1.5" />
<PackageReference Include="Microsoft.Extensions.Logging.Console" Version="3.1.5" />
<PackageReference Include="JT1078.Flv" Version="1.0.0-preview6" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="3.1.7" />
<PackageReference Include="Microsoft.Extensions.Hosting" Version="3.1.7" />
<PackageReference Include="Microsoft.Extensions.Logging.Console" Version="3.1.7" />
</ItemGroup> </ItemGroup>


<ItemGroup> <ItemGroup>


+ 4
- 2
src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Program.cs ファイルの表示

@@ -1,4 +1,5 @@
using JT1078.Gateway.InMemoryMQ;
using JT1078.Flv;
using JT1078.Gateway.InMemoryMQ;
using JT1078.Gateway.TestNormalHosting.Services; using JT1078.Gateway.TestNormalHosting.Services;
using Microsoft.Extensions.Configuration; using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection;
@@ -28,12 +29,13 @@ namespace JT1078.Gateway.TestNormalHosting
{ {
services.AddSingleton<ILoggerFactory, LoggerFactory>(); services.AddSingleton<ILoggerFactory, LoggerFactory>();
services.AddSingleton(typeof(ILogger<>), typeof(Logger<>)); services.AddSingleton(typeof(ILogger<>), typeof(Logger<>));
services.AddSingleton<FlvEncoder>();
//使用内存队列实现会话通知 //使用内存队列实现会话通知
services.AddJT1078Gateway(hostContext.Configuration) services.AddJT1078Gateway(hostContext.Configuration)
.AddTcp() .AddTcp()
.AddUdp() .AddUdp()
.AddHttp() .AddHttp()
.AddCoordinatorHttpClient()
//.AddCoordinatorHttpClient()
.AddNormal() .AddNormal()
.AddMsgProducer() .AddMsgProducer()
.AddMsgConsumer(); .AddMsgConsumer();


+ 29
- 3
src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Services/JT1078NormalMsgHostedService.cs ファイルの表示

@@ -1,25 +1,51 @@
using JT1078.Gateway.Abstractions; using JT1078.Gateway.Abstractions;
using JT1078.Gateway.Sessions;
using JT1078.Flv;
using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Hosting;
using System; using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Text; using System.Text;
using System.Threading; using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
using System.Linq;


namespace JT1078.Gateway.TestNormalHosting.Services namespace JT1078.Gateway.TestNormalHosting.Services
{ {
public class JT1078NormalMsgHostedService : BackgroundService public class JT1078NormalMsgHostedService : BackgroundService
{ {
private IJT1078PackageConsumer PackageConsumer; private IJT1078PackageConsumer PackageConsumer;
public JT1078NormalMsgHostedService(IJT1078PackageConsumer packageConsumer)
private JT1078HttpSessionManager HttpSessionManager;
private FlvEncoder FlvEncoder;
public JT1078NormalMsgHostedService(
FlvEncoder flvEncoder,
JT1078HttpSessionManager httpSessionManager,
IJT1078PackageConsumer packageConsumer)
{ {
PackageConsumer = packageConsumer; PackageConsumer = packageConsumer;
HttpSessionManager = httpSessionManager;
FlvEncoder = flvEncoder;
} }
protected override Task ExecuteAsync(CancellationToken stoppingToken) protected override Task ExecuteAsync(CancellationToken stoppingToken)
{ {
PackageConsumer.OnMessage((Message) =>
PackageConsumer.OnMessage((Message) =>
{ {

var merge = JT1078.Protocol.JT1078Serializer.Merge(Message.Data);
if (merge != null)
{
var httpSessions = HttpSessionManager.GetAllBySimAndChannelNo(Message.Data.SIM, Message.Data.LogicChannelNumber);
var firstHttpSessions = httpSessions.Where(w => !w.FirstSend).ToList();
if (firstHttpSessions.Count > 0)
{
var flvVideoBuffer = FlvEncoder.EncoderVideoTag(merge, true);
HttpSessionManager.SendAVData(firstHttpSessions, flvVideoBuffer, true);
}
var otherHttpSessions = httpSessions.Where(w => w.FirstSend).ToList();
if (otherHttpSessions.Count > 0)
{
var flvVideoBuffer = FlvEncoder.EncoderVideoTag(merge, false);
HttpSessionManager.SendAVData(firstHttpSessions, flvVideoBuffer, false);
}
}
}); });
return Task.CompletedTask; return Task.CompletedTask;
} }


+ 3
- 3
src/JT1078.Gateway/Metadata/JT1078HttpContext.cs ファイルの表示

@@ -23,14 +23,14 @@ namespace JT1078.Gateway.Metadata
} }
} }
public DateTime StartTime { get; set; } public DateTime StartTime { get; set; }
public bool SendChunked { get; set; }
public bool FirstSend { get; set; }
public JT1078HttpContext(HttpListenerContext context, IPrincipal user) public JT1078HttpContext(HttpListenerContext context, IPrincipal user)
{ {
Context = context; Context = context;
User = user; User = user;
StartTime = DateTime.Now; StartTime = DateTime.Now;
SessionId = Guid.NewGuid().ToString("N"); SessionId = Guid.NewGuid().ToString("N");
SendChunked = false;
FirstSend = false;
} }
public JT1078HttpContext(HttpListenerContext context, HttpListenerWebSocketContext webSocketContext, IPrincipal user) public JT1078HttpContext(HttpListenerContext context, HttpListenerWebSocketContext webSocketContext, IPrincipal user)
{ {
@@ -39,7 +39,7 @@ namespace JT1078.Gateway.Metadata
User = user; User = user;
StartTime = DateTime.Now; StartTime = DateTime.Now;
SessionId = Guid.NewGuid().ToString("N"); SessionId = Guid.NewGuid().ToString("N");
SendChunked = false;
FirstSend = false;
} }
} }
} }

+ 17
- 91
src/JT1078.Gateway/Sessions/JT1078HttpSessionManager.cs ファイルの表示

@@ -88,16 +88,20 @@ namespace JT1078.Gateway.Sessions
/// <summary> /// <summary>
/// 发送音视频数据 /// 发送音视频数据
/// </summary> /// </summary>
/// <param name="sim"></param>
/// <param name="channelNo"></param>
/// <param name="httpContexts"></param>
/// <param name="data"></param> /// <param name="data"></param>
public void SendAVData(string sim,int channelNo,byte[] data)
/// <param name="firstSend"></param>
public void SendAVData(List<JT1078HttpContext> httpContexts, byte[] data, bool firstSend)
{ {
var contexts = Sessions.Select(s => s.Value).Where(w => w.Sim == sim && w.ChannelNo == channelNo).ToList();
ParallelLoopResult parallelLoopResult= Parallel.ForEach(contexts, async(context) =>
ParallelLoopResult parallelLoopResult = Parallel.ForEach(httpContexts, async (context) =>
{ {
if (context.IsWebSocket) if (context.IsWebSocket)
{ {
if (firstSend)
{
context.FirstSend = firstSend;
Sessions.TryUpdate(context.SessionId, context, context);
}
try try
{ {
await context.WebSocketSendBinaryAsync(data); await context.WebSocketSendBinaryAsync(data);
@@ -109,13 +113,13 @@ namespace JT1078.Gateway.Sessions
Logger.LogInformation($"[ws close]:{context.SessionId}-{context.Sim}-{context.ChannelNo}-{context.StartTime:yyyyMMddhhmmss}"); Logger.LogInformation($"[ws close]:{context.SessionId}-{context.Sim}-{context.ChannelNo}-{context.StartTime:yyyyMMddhhmmss}");
} }
remove(context.SessionId); remove(context.SessionId);
}
}
} }
else else
{ {
if (!context.SendChunked)
if (firstSend)
{ {
context.SendChunked = true;
context.FirstSend = firstSend;
Sessions.TryUpdate(context.SessionId, context, context); Sessions.TryUpdate(context.SessionId, context, context);
try try
{ {
@@ -153,89 +157,6 @@ namespace JT1078.Gateway.Sessions
} }
} }


/// <summary>
/// 发送音视频数据到websocket
/// </summary>
/// <param name="sim"></param>
/// <param name="channelNo"></param>
/// <param name="data"></param>
public void SendAVData2WebSocket(string sim, int channelNo, byte[] data)
{
var contexts = Sessions.Select(s => s.Value).Where(w => w.Sim == sim && w.ChannelNo == channelNo && w.IsWebSocket).ToList();
ParallelLoopResult parallelLoopResult = Parallel.ForEach(contexts, async (context) =>
{
if (context.IsWebSocket)
{
try
{
await context.WebSocketSendBinaryAsync(data);
}
catch (Exception ex)
{
if (Logger.IsEnabled(LogLevel.Information))
{
Logger.LogInformation($"[ws close]:{context.SessionId}-{context.Sim}-{context.ChannelNo}-{context.StartTime:yyyyMMddhhmmss}");
}
remove(context.SessionId);
}
}
});
if (parallelLoopResult.IsCompleted)
{

}
}

/// <summary>
/// 发送音视频数据到Http Chunked中
/// </summary>
/// <param name="sim"></param>
/// <param name="channelNo"></param>
/// <param name="data"></param>
public void SendAVData2HttpChunked(string sim, int channelNo, byte[] data)
{
var contexts = Sessions.Select(s => s.Value).Where(w => w.Sim == sim && w.ChannelNo == channelNo && !w.IsWebSocket).ToList();
ParallelLoopResult parallelLoopResult = Parallel.ForEach(contexts, async (context) =>
{
if (!context.SendChunked)
{
context.SendChunked = true;
Sessions.TryUpdate(context.SessionId, context, context);
try
{
await context.HttpSendFirstChunked(data);
}
catch (Exception ex)
{
if (Logger.IsEnabled(LogLevel.Information))
{
Logger.LogInformation($"[http close]:{context.SessionId}-{context.Sim}-{context.ChannelNo}-{context.StartTime:yyyyMMddhhmmss}");
}
remove(context.SessionId);
}
}
else
{
try
{
await context.HttpSendChunked(data);
}
catch (Exception ex)
{
if (Logger.IsEnabled(LogLevel.Information))
{
Logger.LogInformation($"[http close]:{context.SessionId}-{context.Sim}-{context.ChannelNo}-{context.StartTime:yyyyMMddhhmmss}");
}
remove(context.SessionId);
}
}
});
if (parallelLoopResult.IsCompleted)
{

}
}

public int SessionCount public int SessionCount
{ {
get get
@@ -260,6 +181,11 @@ namespace JT1078.Gateway.Sessions
} }
} }


public List<JT1078HttpContext> GetAllBySimAndChannelNo(string sim, int channelNo)
{
return Sessions.Select(s => s.Value).Where(w => w.Sim == sim && w.ChannelNo == channelNo).ToList();
}

public List<JT1078HttpContext> GetAll() public List<JT1078HttpContext> GetAll()
{ {
return Sessions.Select(s => s.Value).ToList(); return Sessions.Select(s => s.Value).ToList();


読み込み中…
キャンセル
保存