@@ -12,41 +12,87 @@ using System.Threading.Tasks; | |||||
using JT1078.Protocol; | using JT1078.Protocol; | ||||
using System.Collections.Concurrent; | using System.Collections.Concurrent; | ||||
using JT1078.Protocol.Enums; | using JT1078.Protocol.Enums; | ||||
using System.Diagnostics; | |||||
using System.IO.Pipes; | |||||
using Newtonsoft.Json; | |||||
namespace JT1078.DotNetty.TestHosting | namespace JT1078.DotNetty.TestHosting | ||||
{ | { | ||||
class FFMPEGHTTPFLVPHostedService : IHostedService | |||||
class FFMPEGHTTPFLVPHostedService : BackgroundService | |||||
{ | { | ||||
private readonly JT1078DataService jT1078DataService; | |||||
private readonly FFMPEGHTTPFLVPipingService fFMPEGHTTPFLVPipingService; | |||||
public FFMPEGHTTPFLVPHostedService( | |||||
JT1078DataService jT1078DataService) | |||||
private readonly Process process; | |||||
private readonly NamedPipeServerStream pipeServerOut; | |||||
private const string PipeNameOut = "demo1serverout"; | |||||
public FFMPEGHTTPFLVPHostedService() | |||||
{ | { | ||||
this.jT1078DataService = jT1078DataService; | |||||
fFMPEGHTTPFLVPipingService = new FFMPEGHTTPFLVPipingService("demo2"); | |||||
pipeServerOut = new NamedPipeServerStream(PipeNameOut, PipeDirection.In, 1, PipeTransmissionMode.Byte, PipeOptions.Asynchronous,10240,10240); | |||||
process = new Process | |||||
{ | |||||
StartInfo = | |||||
{ | |||||
FileName = @"C:\ffmpeg\bin\ffmpeg.exe", | |||||
Arguments = $@"-f dshow -i video={HardwareCamera.CameraName} -c copy -f flv -vcodec h264 -y \\.\pipe\{PipeNameOut}", | |||||
UseShellExecute = false, | |||||
CreateNoWindow = true, | |||||
RedirectStandardError = true | |||||
} | |||||
}; | |||||
} | } | ||||
public Task StartAsync(CancellationToken cancellationToken) | |||||
public override void Dispose() | |||||
{ | { | ||||
try | |||||
{ | |||||
process.Close(); | |||||
pipeServerOut.Flush(); | |||||
} | |||||
catch | |||||
{ | |||||
} | |||||
process.Dispose(); | |||||
pipeServerOut.Dispose(); | |||||
base.Dispose(); | |||||
} | |||||
protected override Task ExecuteAsync(CancellationToken stoppingToken) | |||||
{ | |||||
process.Start(); | |||||
Task.Run(() => | Task.Run(() => | ||||
{ | { | ||||
try | |||||
while (true) | |||||
{ | { | ||||
foreach (var item in jT1078DataService.DataBlockingCollection.GetConsumingEnumerable(cancellationToken)) | |||||
try | |||||
{ | { | ||||
fFMPEGHTTPFLVPipingService.Wirte(item); | |||||
Console.WriteLine("IsConnected>>>" + pipeServerOut.IsConnected); | |||||
if (pipeServerOut.IsConnected) | |||||
{ | |||||
if (pipeServerOut.CanRead) | |||||
{ | |||||
Span<byte> v1 = new byte[2048]; | |||||
var length = pipeServerOut.Read(v1); | |||||
var realValue = v1.Slice(0, length).ToArray(); | |||||
if (realValue.Length <= 0) continue; | |||||
Console.WriteLine(JsonConvert.SerializeObject(realValue)+"-"+ length.ToString()); | |||||
} | |||||
} | |||||
else | |||||
{ | |||||
if (!pipeServerOut.IsConnected) | |||||
{ | |||||
Console.WriteLine("WaitForConnection Star..."); | |||||
pipeServerOut.WaitForConnectionAsync(); | |||||
Console.WriteLine("WaitForConnection End..."); | |||||
} | |||||
} | |||||
} | |||||
catch (Exception ex) | |||||
{ | |||||
Console.WriteLine(ex); | |||||
} | } | ||||
} | } | ||||
catch(Exception ex) | |||||
{ | |||||
Console.WriteLine(ex); | |||||
} | |||||
}, cancellationToken); | |||||
return Task.CompletedTask; | |||||
} | |||||
public Task StopAsync(CancellationToken cancellationToken) | |||||
{ | |||||
}); | |||||
return Task.CompletedTask; | return Task.CompletedTask; | ||||
} | } | ||||
} | } | ||||
@@ -1,94 +0,0 @@ | |||||
using System; | |||||
using System.Collections.Generic; | |||||
using System.Diagnostics; | |||||
using System.IO; | |||||
using System.IO.Pipes; | |||||
using System.Text; | |||||
using System.Threading.Tasks; | |||||
namespace JT1078.DotNetty.TestHosting | |||||
{ | |||||
public class FFMPEGHTTPFLVPipingService : IDisposable | |||||
{ | |||||
private readonly Process process; | |||||
private readonly NamedPipeServerStream pipeServer; | |||||
private readonly NamedPipeServerStream pipeServerOut; | |||||
public FFMPEGHTTPFLVPipingService(string pipeName) | |||||
{ | |||||
pipeServer = new NamedPipeServerStream(pipeName, PipeDirection.Out, 1, PipeTransmissionMode.Byte, PipeOptions.Asynchronous); | |||||
pipeServerOut = new NamedPipeServerStream(pipeName+ "out", PipeDirection.In, 1, PipeTransmissionMode.Byte, PipeOptions.Asynchronous); | |||||
process = new Process | |||||
{ | |||||
StartInfo = | |||||
{ | |||||
FileName = @"C:\ffmpeg\bin\ffmpeg.exe", | |||||
Arguments = $@"-i \\.\pipe\{pipeName} -c copy -f flv -y \\.\pipe\{pipeName}out", | |||||
UseShellExecute = false, | |||||
CreateNoWindow = true, | |||||
RedirectStandardError = true | |||||
} | |||||
}; | |||||
process.Start(); | |||||
process.ErrorDataReceived += ErrorDataReceived; | |||||
pipeServer.WaitForConnection(); | |||||
Task.Run(() => | |||||
{ | |||||
while (true) | |||||
{ | |||||
try | |||||
{ | |||||
if (pipeServerOut.IsConnected) | |||||
{ | |||||
if (pipeServerOut.CanRead) | |||||
{ | |||||
var value = pipeServerOut.ReadByte(); | |||||
Console.WriteLine(value); | |||||
} | |||||
} | |||||
else | |||||
{ | |||||
pipeServerOut.WaitForConnectionAsync(); | |||||
} | |||||
} | |||||
catch (Exception ex) | |||||
{ | |||||
Console.WriteLine(ex); | |||||
} | |||||
} | |||||
}); | |||||
} | |||||
public void ErrorDataReceived(object sender, DataReceivedEventArgs e) | |||||
{ | |||||
Console.WriteLine(e.Data); | |||||
} | |||||
public void OutputDataReceived(object sender, DataReceivedEventArgs e) | |||||
{ | |||||
Console.WriteLine(e.Data); | |||||
} | |||||
public void Wirte(byte[] buffer) | |||||
{ | |||||
if (pipeServer.IsConnected) | |||||
pipeServer.WriteAsync(buffer); | |||||
} | |||||
public void Dispose() | |||||
{ | |||||
try | |||||
{ | |||||
process.WaitForExit(); | |||||
pipeServer.Flush(); | |||||
} | |||||
catch | |||||
{ | |||||
} | |||||
process.Dispose(); | |||||
pipeServer.Dispose(); | |||||
} | |||||
} | |||||
} |
@@ -15,12 +15,9 @@ namespace JT1078.DotNetty.TestHosting.Handlers | |||||
{ | { | ||||
private readonly ILogger logger; | private readonly ILogger logger; | ||||
private readonly ILogger hexLogger; | private readonly ILogger hexLogger; | ||||
private readonly JT1078DataService jT1078DataService; | |||||
public JT1078TcpMessageHandlers( | public JT1078TcpMessageHandlers( | ||||
JT1078DataService jT1078DataService, | |||||
ILoggerFactory loggerFactory) | ILoggerFactory loggerFactory) | ||||
{ | { | ||||
this.jT1078DataService = jT1078DataService; | |||||
logger = loggerFactory.CreateLogger("JT1078TcpMessageHandlers"); | logger = loggerFactory.CreateLogger("JT1078TcpMessageHandlers"); | ||||
hexLogger = loggerFactory.CreateLogger("JT1078TcpMessageHandlersHex"); | hexLogger = loggerFactory.CreateLogger("JT1078TcpMessageHandlersHex"); | ||||
} | } | ||||
@@ -29,7 +26,6 @@ namespace JT1078.DotNetty.TestHosting.Handlers | |||||
{ | { | ||||
logger.LogInformation(JsonConvert.SerializeObject(request.Package)); | logger.LogInformation(JsonConvert.SerializeObject(request.Package)); | ||||
hexLogger.LogInformation($"{request.Package.SIM},{request.Package.SN},{request.Package.LogicChannelNumber},{request.Package.Label3.DataType.ToString()},{request.Package.Label3.SubpackageType.ToString()},{ByteBufferUtil.HexDump(request.Src)}"); | hexLogger.LogInformation($"{request.Package.SIM},{request.Package.SN},{request.Package.LogicChannelNumber},{request.Package.Label3.DataType.ToString()},{request.Package.Label3.SubpackageType.ToString()},{ByteBufferUtil.HexDump(request.Src)}"); | ||||
jT1078DataService.DataBlockingCollection.TryAdd(request.Src); | |||||
return Task.FromResult<JT1078Response>(default); | return Task.FromResult<JT1078Response>(default); | ||||
} | } | ||||
} | } | ||||
@@ -0,0 +1,24 @@ | |||||
using System; | |||||
using System.Collections.Generic; | |||||
using System.Text; | |||||
namespace JT1078.DotNetty.TestHosting | |||||
{ | |||||
/// <summary> | |||||
/// | |||||
/// .\ffmpeg -list_options true -f dshow -i video = "USB2.0 PC CAMERA" | |||||
/// .\ffmpeg -f dshow -i video = "USB2.0 PC CAMERA" -vcodec libx264 "D:\mycamera.flv" | |||||
/// | |||||
/// .\ffmpeg -f dshow -i video = "USB2.0 PC CAMERA" - c copy -f flv -vcodec h264 "rtmp://127.0.0.1/living/streamName" | |||||
/// .\ffplay rtmp://127.0.0.1/living/streamName | |||||
/// | |||||
/// .\ffmpeg -f dshow -i video = "USB2.0 PC CAMERA" - c copy -f -y flv -vcodec h264 "pipe://demoserverout" | |||||
/// | |||||
/// ref:https://www.cnblogs.com/lidabo/p/8662955.html | |||||
/// </summary> | |||||
public static class HardwareCamera | |||||
{ | |||||
public const string CameraName = "\"USB2.0 PC CAMERA\""; | |||||
public const string RTMPURL = "rtmp://127.0.0.1/living/streamName"; | |||||
} | |||||
} |
@@ -24,9 +24,6 @@ | |||||
<None Update="2019-07-12.log"> | <None Update="2019-07-12.log"> | ||||
<CopyToOutputDirectory>Always</CopyToOutputDirectory> | <CopyToOutputDirectory>Always</CopyToOutputDirectory> | ||||
</None> | </None> | ||||
<None Update="2019-08-27-1.log"> | |||||
<CopyToOutputDirectory>Always</CopyToOutputDirectory> | |||||
</None> | |||||
<None Update="appsettings.json"> | <None Update="appsettings.json"> | ||||
<CopyToOutputDirectory>Always</CopyToOutputDirectory> | <CopyToOutputDirectory>Always</CopyToOutputDirectory> | ||||
</None> | </None> | ||||
@@ -1,67 +0,0 @@ | |||||
using JT1078.Protocol; | |||||
using JT1078.Protocol.Enums; | |||||
using System; | |||||
using System.Collections.Concurrent; | |||||
using System.Collections.Generic; | |||||
using System.IO; | |||||
using System.Linq; | |||||
using System.Text; | |||||
using System.Threading; | |||||
using System.Threading.Tasks; | |||||
namespace JT1078.DotNetty.TestHosting | |||||
{ | |||||
public class JT1078DataService | |||||
{ | |||||
public BlockingCollection<byte[]> DataBlockingCollection { get; } | |||||
private readonly ConcurrentDictionary<string, byte[]> SubcontractKey; | |||||
public JT1078DataService() | |||||
{ | |||||
DataBlockingCollection = new BlockingCollection<byte[]>(60000); | |||||
SubcontractKey = new ConcurrentDictionary<string, byte[]>(StringComparer.OrdinalIgnoreCase); | |||||
var lines = File.ReadAllLines(Path.Combine(AppDomain.CurrentDomain.BaseDirectory, "2019-08-27-1.log")); | |||||
Task.Run(() => | |||||
{ | |||||
while (true) | |||||
{ | |||||
for (int i = 0; i < lines.Length; i++) | |||||
{ | |||||
try | |||||
{ | |||||
var item = JT1078Serializer.Deserialize(lines[i].Split(',')[6].ToHexBytes()); | |||||
if (item.Label3.SubpackageType == JT1078SubPackageType.分包处理时的第一个包) | |||||
{ | |||||
SubcontractKey.TryRemove(item.SIM, out _); | |||||
SubcontractKey.TryAdd(item.SIM, item.Bodies); | |||||
} | |||||
else if (item.Label3.SubpackageType == JT1078SubPackageType.分包处理时的中间包) | |||||
{ | |||||
if (SubcontractKey.TryGetValue(item.SIM, out var buffer)) | |||||
{ | |||||
SubcontractKey[item.SIM] = buffer.Concat(item.Bodies).ToArray(); | |||||
} | |||||
} | |||||
else if (item.Label3.SubpackageType == JT1078SubPackageType.分包处理时的最后一个包) | |||||
{ | |||||
if (SubcontractKey.TryGetValue(item.SIM, out var buffer)) | |||||
{ | |||||
DataBlockingCollection.Add(buffer.Concat(item.Bodies).ToArray()); | |||||
} | |||||
} | |||||
else | |||||
{ | |||||
DataBlockingCollection.Add(item.Bodies); | |||||
} | |||||
} | |||||
catch | |||||
{ | |||||
} | |||||
} | |||||
Thread.Sleep(60000); | |||||
} | |||||
}); | |||||
} | |||||
} | |||||
} |
@@ -1,62 +0,0 @@ | |||||
using DotNetty.Buffers; | |||||
using DotNetty.Codecs.Http.WebSockets; | |||||
using JT1078.DotNetty.Core.Session; | |||||
using Microsoft.Extensions.Hosting; | |||||
using System; | |||||
using System.Collections.Generic; | |||||
using System.IO; | |||||
using System.Linq; | |||||
using System.Text; | |||||
using System.Threading; | |||||
using System.Threading.Tasks; | |||||
using JT1078.Protocol; | |||||
using System.Collections.Concurrent; | |||||
using JT1078.Protocol.Enums; | |||||
namespace JT1078.DotNetty.TestHosting | |||||
{ | |||||
class JT1078WebSocketPushHostedService : IHostedService | |||||
{ | |||||
JT1078WebSocketSessionManager jT1078WebSocketSessionManager; | |||||
private readonly JT1078DataService jT1078DataService; | |||||
private readonly ConcurrentDictionary<string, byte[]> SubcontractKey; | |||||
public JT1078WebSocketPushHostedService( | |||||
JT1078DataService jT1078DataService, | |||||
JT1078WebSocketSessionManager jT1078WebSocketSessionManager) | |||||
{ | |||||
SubcontractKey = new ConcurrentDictionary<string, byte[]>(StringComparer.OrdinalIgnoreCase); | |||||
this.jT1078DataService = jT1078DataService; | |||||
this.jT1078WebSocketSessionManager = jT1078WebSocketSessionManager; | |||||
} | |||||
public Task StartAsync(CancellationToken cancellationToken) | |||||
{ | |||||
Task.Run(() => | |||||
{ | |||||
while (!cancellationToken.IsCancellationRequested) | |||||
{ | |||||
try | |||||
{ | |||||
foreach (var item in jT1078DataService.DataBlockingCollection.GetConsumingEnumerable(cancellationToken)) | |||||
{ | |||||
Parallel.ForEach(jT1078WebSocketSessionManager.GetAll(), new ParallelOptions { MaxDegreeOfParallelism = 5 }, session => | |||||
{ | |||||
session.Channel.WriteAndFlushAsync(new BinaryWebSocketFrame(Unpooled.WrappedBuffer(item))); | |||||
}); | |||||
} | |||||
} | |||||
catch | |||||
{ | |||||
} | |||||
} | |||||
}, cancellationToken); | |||||
return Task.CompletedTask; | |||||
} | |||||
public Task StopAsync(CancellationToken cancellationToken) | |||||
{ | |||||
return Task.CompletedTask; | |||||
} | |||||
} | |||||
} |
@@ -57,20 +57,21 @@ namespace JT1078.DotNetty.TestHosting | |||||
{ | { | ||||
services.AddSingleton<ILoggerFactory, LoggerFactory>(); | services.AddSingleton<ILoggerFactory, LoggerFactory>(); | ||||
services.AddSingleton(typeof(ILogger<>), typeof(Logger<>)); | services.AddSingleton(typeof(ILogger<>), typeof(Logger<>)); | ||||
services.AddSingleton<JT1078DataService>(); | |||||
services.AddJT1078Core(hostContext.Configuration); | |||||
services.AddJT1078Core(hostContext.Configuration) | |||||
// .AddJT1078TcpHost() | // .AddJT1078TcpHost() | ||||
// .Replace<JT1078TcpMessageHandlers>() | // .Replace<JT1078TcpMessageHandlers>() | ||||
// .Builder() | // .Builder() | ||||
// .AddJT1078UdpHost() | // .AddJT1078UdpHost() | ||||
// .Replace<JT1078UdpMessageHandlers>() | // .Replace<JT1078UdpMessageHandlers>() | ||||
// .Builder() | // .Builder() | ||||
// .AddJT1078WebSocketHost() | |||||
// .Replace() | |||||
// .Builder(); | |||||
//services.AddHostedService<JT1078WebSocketPushHostedService>(); | |||||
.AddJT1078WebSocketHost() | |||||
.Builder(); | |||||
//1.success 7-8s | |||||
//services.AddHostedService<FFMPEGRTMPHostedService>(); | //services.AddHostedService<FFMPEGRTMPHostedService>(); | ||||
services.AddHostedService<FFMPEGHTTPFLVPHostedService>(); | |||||
//2.test | |||||
//services.AddHostedService<FFMPEGHTTPFLVPHostedService>(); | |||||
//3.success 6-7s | |||||
//services.AddHostedService<FFMPEGWSFLVPHostedService>(); | |||||
}); | }); | ||||
await serverHostBuilder.RunConsoleAsync(); | await serverHostBuilder.RunConsoleAsync(); | ||||
@@ -12,42 +12,47 @@ using System.Threading.Tasks; | |||||
using JT1078.Protocol; | using JT1078.Protocol; | ||||
using System.Collections.Concurrent; | using System.Collections.Concurrent; | ||||
using JT1078.Protocol.Enums; | using JT1078.Protocol.Enums; | ||||
using System.Diagnostics; | |||||
namespace JT1078.DotNetty.TestHosting | namespace JT1078.DotNetty.TestHosting | ||||
{ | { | ||||
class FFMPEGRTMPHostedService : IHostedService | |||||
/// <summary> | |||||
/// 1.部署 RTMP 服务器 https://github.com/a1q123456/Harmonic | |||||
/// 2.使用ffplay播放器查看 ./ffplay rtmp://127.0.0.1/living/streamName | |||||
/// ref: | |||||
/// https://stackoverflow.com/questions/32157774/ffmpeg-output-pipeing-to-named-windows-pipe | |||||
/// https://mathewsachin.github.io/blog/2017/07/28/ffmpeg-pipe-csharp.html | |||||
/// https://csharp.hotexamples.com/examples/-/NamedPipeServerStream/-/php-namedpipeserverstream-class-examples.html | |||||
/// | |||||
/// ffmpeg pipe作为客户端 | |||||
/// NamedPipeServerStream作为服务端 | |||||
/// </summary> | |||||
class FFMPEGRTMPHostedService : BackgroundService | |||||
{ | { | ||||
private readonly JT1078DataService jT1078DataService; | |||||
private readonly FFMPEGRTMPPipingService fFMPEGPipingService; | |||||
public FFMPEGRTMPHostedService( | |||||
JT1078DataService jT1078DataService) | |||||
private readonly Process process; | |||||
public FFMPEGRTMPHostedService() | |||||
{ | { | ||||
this.jT1078DataService = jT1078DataService; | |||||
fFMPEGPipingService = new FFMPEGRTMPPipingService("demo1"); | |||||
} | |||||
public Task StartAsync(CancellationToken cancellationToken) | |||||
{ | |||||
Task.Run(() => | |||||
process = new Process | |||||
{ | { | ||||
try | |||||
{ | |||||
foreach (var item in jT1078DataService.DataBlockingCollection.GetConsumingEnumerable(cancellationToken)) | |||||
{ | |||||
fFMPEGPipingService.Wirte(item); | |||||
} | |||||
} | |||||
catch(Exception ex) | |||||
StartInfo = | |||||
{ | { | ||||
Console.WriteLine(ex); | |||||
FileName = @"C:\ffmpeg\bin\ffmpeg.exe", | |||||
Arguments = $@"-f dshow -i video={HardwareCamera.CameraName} -c copy -f flv -vcodec h264 {HardwareCamera.RTMPURL}", | |||||
UseShellExecute = false, | |||||
CreateNoWindow = true | |||||
} | } | ||||
}, cancellationToken); | |||||
return Task.CompletedTask; | |||||
}; | |||||
} | |||||
public override void Dispose() | |||||
{ | |||||
process.Dispose(); | |||||
base.Dispose(); | |||||
} | } | ||||
public Task StopAsync(CancellationToken cancellationToken) | |||||
protected override Task ExecuteAsync(CancellationToken stoppingToken) | |||||
{ | { | ||||
process.Start(); | |||||
return Task.CompletedTask; | return Task.CompletedTask; | ||||
} | } | ||||
} | } | ||||
@@ -1,71 +0,0 @@ | |||||
using System; | |||||
using System.Collections.Generic; | |||||
using System.Diagnostics; | |||||
using System.IO; | |||||
using System.IO.Pipes; | |||||
using System.Text; | |||||
namespace JT1078.DotNetty.TestHosting | |||||
{ | |||||
/// <summary> | |||||
/// 1.部署 RTMP 服务器 https://github.com/a1q123456/Harmonic | |||||
/// 2.使用ffplay播放器查看 ./ffplay rtmp://127.0.0.1/living/streamName | |||||
/// ref: | |||||
/// https://stackoverflow.com/questions/32157774/ffmpeg-output-pipeing-to-named-windows-pipe | |||||
/// https://mathewsachin.github.io/blog/2017/07/28/ffmpeg-pipe-csharp.html | |||||
/// https://csharp.hotexamples.com/examples/-/NamedPipeServerStream/-/php-namedpipeserverstream-class-examples.html | |||||
/// | |||||
/// ffmpeg pipe作为客户端 | |||||
/// NamedPipeServerStream作为服务端 | |||||
/// </summary> | |||||
public class FFMPEGRTMPPipingService : IDisposable | |||||
{ | |||||
private readonly Process process; | |||||
private readonly NamedPipeServerStream pipeServer; | |||||
public FFMPEGRTMPPipingService(string pipeName) | |||||
{ | |||||
pipeServer = new NamedPipeServerStream(pipeName, PipeDirection.Out, 1, PipeTransmissionMode.Byte, PipeOptions.WriteThrough, 10000, 10000); | |||||
string rtmp = "rtmp://127.0.0.1/living/streamName"; | |||||
process = new Process | |||||
{ | |||||
StartInfo = | |||||
{ | |||||
FileName = @"C:\ffmpeg\bin\ffmpeg.exe", | |||||
Arguments = $@"-i \\.\pipe\{pipeName} -c copy -f flv {rtmp}", | |||||
UseShellExecute = false, | |||||
CreateNoWindow = true, | |||||
RedirectStandardError = true | |||||
} | |||||
}; | |||||
process.Start(); | |||||
process.ErrorDataReceived += ErrorDataReceived; | |||||
pipeServer.WaitForConnection(); | |||||
} | |||||
public void ErrorDataReceived(object sender, DataReceivedEventArgs e) | |||||
{ | |||||
Console.WriteLine(e.Data); | |||||
} | |||||
public void Wirte(byte[] buffer) | |||||
{ | |||||
if (pipeServer.IsConnected) | |||||
pipeServer.WriteAsync(buffer); | |||||
} | |||||
public void Dispose() | |||||
{ | |||||
try | |||||
{ | |||||
process.Kill(); | |||||
pipeServer.Flush(); | |||||
} | |||||
catch | |||||
{ | |||||
} | |||||
process.Dispose(); | |||||
pipeServer.Dispose(); | |||||
} | |||||
} | |||||
} |
@@ -0,0 +1,123 @@ | |||||
using DotNetty.Buffers; | |||||
using DotNetty.Codecs.Http.WebSockets; | |||||
using JT1078.DotNetty.Core.Session; | |||||
using Microsoft.Extensions.Hosting; | |||||
using System; | |||||
using System.Collections.Generic; | |||||
using System.IO; | |||||
using System.Linq; | |||||
using System.Text; | |||||
using System.Threading; | |||||
using System.Threading.Tasks; | |||||
using JT1078.Protocol; | |||||
using System.Collections.Concurrent; | |||||
using JT1078.Protocol.Enums; | |||||
using System.Diagnostics; | |||||
using System.IO.Pipes; | |||||
using Newtonsoft.Json; | |||||
namespace JT1078.DotNetty.TestHosting | |||||
{ | |||||
/// <summary> | |||||
/// | |||||
/// </summary> | |||||
class FFMPEGWSFLVPHostedService :BackgroundService | |||||
{ | |||||
private readonly Process process; | |||||
private readonly NamedPipeServerStream pipeServerOut; | |||||
private const string PipeNameOut = "demo2serverout"; | |||||
private readonly JT1078WebSocketSessionManager jT1078WebSocketSessionManager; | |||||
/// <summary> | |||||
/// 需要缓存flv的第一包数据,当新用户进来先推送第一包的数据 | |||||
/// </summary> | |||||
private byte[] flvFirstPackage; | |||||
private ConcurrentDictionary<string,byte> exists = new ConcurrentDictionary<string, byte>(); | |||||
public FFMPEGWSFLVPHostedService( | |||||
JT1078WebSocketSessionManager jT1078WebSocketSessionManager) | |||||
{ | |||||
pipeServerOut = new NamedPipeServerStream(PipeNameOut, PipeDirection.In, 1, PipeTransmissionMode.Byte, PipeOptions.Asynchronous,102400,102400); | |||||
process = new Process | |||||
{ | |||||
StartInfo = | |||||
{ | |||||
FileName = @"C:\ffmpeg\bin\ffmpeg.exe", | |||||
Arguments = $@"-f dshow -i video={HardwareCamera.CameraName} -c copy -f flv -vcodec h264 -y \\.\pipe\{PipeNameOut}", | |||||
UseShellExecute = false, | |||||
CreateNoWindow = true, | |||||
} | |||||
}; | |||||
this.jT1078WebSocketSessionManager = jT1078WebSocketSessionManager; | |||||
} | |||||
public override void Dispose() | |||||
{ | |||||
try | |||||
{ | |||||
process.Close(); | |||||
pipeServerOut.Flush(); | |||||
} | |||||
catch | |||||
{ | |||||
} | |||||
process.Dispose(); | |||||
pipeServerOut.Dispose(); | |||||
base.Dispose(); | |||||
} | |||||
protected override Task ExecuteAsync(CancellationToken stoppingToken) | |||||
{ | |||||
process.Start(); | |||||
Task.Run(() => | |||||
{ | |||||
while (true) | |||||
{ | |||||
try | |||||
{ | |||||
Console.WriteLine("IsConnected>>>" + pipeServerOut.IsConnected); | |||||
if (pipeServerOut.IsConnected) | |||||
{ | |||||
if (pipeServerOut.CanRead) | |||||
{ | |||||
Span<byte> v1 = new byte[2048]; | |||||
var length = pipeServerOut.Read(v1); | |||||
var realValue = v1.Slice(0, length).ToArray(); | |||||
if (realValue.Length <= 0) continue; | |||||
if (flvFirstPackage == null) | |||||
{ | |||||
flvFirstPackage = realValue; | |||||
} | |||||
if (jT1078WebSocketSessionManager.GetAll().Count() > 0) | |||||
{ | |||||
foreach (var session in jT1078WebSocketSessionManager.GetAll()) | |||||
{ | |||||
if (!exists.ContainsKey(session.Channel.Id.AsShortText())) | |||||
{ | |||||
session.Channel.WriteAndFlushAsync(new BinaryWebSocketFrame(Unpooled.WrappedBuffer(flvFirstPackage))); | |||||
exists.TryAdd(session.Channel.Id.AsShortText(), 0); | |||||
} | |||||
session.Channel.WriteAndFlushAsync(new BinaryWebSocketFrame(Unpooled.WrappedBuffer(realValue))); | |||||
} | |||||
} | |||||
} | |||||
} | |||||
else | |||||
{ | |||||
if (!pipeServerOut.IsConnected) | |||||
{ | |||||
Console.WriteLine("WaitForConnection Star..."); | |||||
pipeServerOut.WaitForConnectionAsync(); | |||||
Console.WriteLine("WaitForConnection End..."); | |||||
} | |||||
} | |||||
} | |||||
catch (Exception ex) | |||||
{ | |||||
Console.WriteLine(ex); | |||||
} | |||||
} | |||||
}); | |||||
return Task.CompletedTask; | |||||
} | |||||
} | |||||
} |
@@ -0,0 +1,26 @@ | |||||
<!DOCTYPE html> | |||||
<html lang="en" xmlns="http://www.w3.org/1999/xhtml"> | |||||
<head> | |||||
<meta charset="utf-8" /> | |||||
<title></title> | |||||
<script src="flv.min.js"></script> | |||||
</head> | |||||
<body> | |||||
<video muted="muted" webkit-playsinline="true" autoplay="true" id="player"></video> | |||||
<script> | |||||
if (flvjs.isSupported()) { | |||||
var player = document.getElementById('player'); | |||||
var flvPlayer = flvjs.createPlayer({ | |||||
type: 'flv', | |||||
isLive: true, | |||||
url: "ws://127.0.0.1:1818?token=" + Math.floor((Math.random() * 1000000) + 1) | |||||
}); | |||||
flvPlayer.attachMediaElement(player); | |||||
flvPlayer.load(); | |||||
flvPlayer.play(); | |||||
} | |||||
</script> | |||||
</body> | |||||
</html> |
@@ -17,7 +17,7 @@ | |||||
"UdpPort": 1808, | "UdpPort": 1808, | ||||
"WebSocketPort": 1818, | "WebSocketPort": 1818, | ||||
"RemoteServerOptions": { | "RemoteServerOptions": { | ||||
//"RemoteServers": [ "172.16.19.209:16868" ] | |||||
} | } | ||||
} | } | ||||
} | } |