浏览代码

1.完善http-flv的chunk传输及测试

2.增加http的自定义中间件及发送扩展封装
old
smallchi 5 年前
父节点
当前提交
dee1ed202b
共有 22 个文件被更改,包括 131 次插入334 次删除
  1. +36
    -0
      src/JT1078.DotNetty.Core/Extensions/JT1078HttpSessionExtensions.cs
  2. +0
    -6
      src/JT1078.DotNetty.Core/Impl/JT1078BuilderDefault.cs
  3. +0
    -213
      src/JT1078.DotNetty.Core/Impl/JT1078SourcePackageDispatcherDefault.cs
  4. +14
    -0
      src/JT1078.DotNetty.Core/Interfaces/IHttpMiddleware.cs
  5. +0
    -2
      src/JT1078.DotNetty.Core/Interfaces/IJT1078Builder.cs
  6. +2
    -0
      src/JT1078.DotNetty.Core/Interfaces/IJT1078HttpBuilder.cs
  7. +0
    -12
      src/JT1078.DotNetty.Core/Interfaces/IJT1078SourcePackageDispatcher.cs
  8. +0
    -14
      src/JT1078.DotNetty.Core/Interfaces/IJT1078WebSocketBuilder.cs
  9. +0
    -2
      src/JT1078.DotNetty.Core/JT1078CoreDotnettyExtensions.cs
  10. +7
    -1
      src/JT1078.DotNetty.Http/Handlers/JT1078HttpServerHandler.cs
  11. +12
    -0
      src/JT1078.DotNetty.Http/JT1078HttpBuilderDefault.cs
  12. +10
    -1
      src/JT1078.DotNetty.Http/JT1078HttpServerHost.cs
  13. +0
    -5
      src/JT1078.DotNetty.Tcp/Handlers/JT1078TcpServerHandler.cs
  14. +18
    -0
      src/JT1078.DotNetty.TestHosting/CustomHttpMiddleware.cs
  15. +2
    -2
      src/JT1078.DotNetty.TestHosting/HLS/FFMPEGHLSHostedService.cs
  16. +15
    -56
      src/JT1078.DotNetty.TestHosting/HTTPFLV/FFMPEGHTTPFLVHostedService.cs
  17. +4
    -4
      src/JT1078.DotNetty.TestHosting/HTTPFLV/flv.html
  18. +0
    -3
      src/JT1078.DotNetty.TestHosting/JT1078.DotNetty.TestHosting.csproj
  19. +6
    -3
      src/JT1078.DotNetty.TestHosting/Program.cs
  20. +4
    -3
      src/JT1078.DotNetty.TestHosting/WSFLV/FFMPEGWSFLVPHostedService.cs
  21. +1
    -2
      src/JT1078.DotNetty.TestHosting/appsettings.json
  22. +0
    -5
      src/JT1078.DotNetty.Udp/Handlers/JT1078UdpServerHandler.cs

+ 36
- 0
src/JT1078.DotNetty.Core/Extensions/JT1078HttpSessionExtensions.cs 查看文件

@@ -0,0 +1,36 @@
using DotNetty.Buffers;
using DotNetty.Codecs.Http;
using DotNetty.Codecs.Http.WebSockets;
using DotNetty.Common.Utilities;
using JT1078.DotNetty.Core.Metadata;
using System;
using System.Collections.Generic;
using System.Text;

namespace JT1078.DotNetty.Core.Extensions
{
public static class JT1078HttpSessionExtensions
{
private static readonly AsciiString ServerName = AsciiString.Cached("JT1078Netty");
private static readonly AsciiString DateEntity = HttpHeaderNames.Date;
private static readonly AsciiString ServerEntity = HttpHeaderNames.Server;
public static void SendBinaryWebSocketAsync(this JT1078HttpSession session,byte[] data)
{
session.Channel.WriteAndFlushAsync(new BinaryWebSocketFrame(Unpooled.WrappedBuffer(data)));
}
public static void SendHttpFirstChunkAsync(this JT1078HttpSession session, byte[] data)
{
DefaultHttpResponse firstRes = new DefaultHttpResponse(HttpVersion.Http11, HttpResponseStatus.OK);
firstRes.Headers.Set(ServerEntity, ServerName);
firstRes.Headers.Set(DateEntity, DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff"));
firstRes.Headers.Set(HttpHeaderNames.ContentType, (AsciiString)"video/x-flv");
HttpUtil.SetTransferEncodingChunked(firstRes, true);
session.Channel.WriteAsync(firstRes);
session.Channel.WriteAndFlushAsync(Unpooled.CopiedBuffer(data));
}
public static void SendHttpOtherChunkAsync(this JT1078HttpSession session, byte[] data)
{
session.Channel.WriteAndFlushAsync(Unpooled.CopiedBuffer(data));
}
}
}

+ 0
- 6
src/JT1078.DotNetty.Core/Impl/JT1078BuilderDefault.cs 查看文件

@@ -15,11 +15,5 @@ namespace JT1078.DotNetty.Core.Impl
{
Services = services;
}

public IJT1078Builder Replace<T>() where T : IJT1078SourcePackageDispatcher
{
Services.Replace(new ServiceDescriptor(typeof(IJT1078SourcePackageDispatcher), typeof(T), ServiceLifetime.Singleton));
return this;
}
}
}

+ 0
- 213
src/JT1078.DotNetty.Core/Impl/JT1078SourcePackageDispatcherDefault.cs 查看文件

@@ -1,213 +0,0 @@
using JT1078.DotNetty.Core.Configurations;
using JT1078.DotNetty.Core.Interfaces;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Timers;

namespace JT1078.DotNetty.Core.Impl
{
class JT1078SourcePackageDispatcherDefault : IJT1078SourcePackageDispatcher,IDisposable
{
private readonly ILogger<JT1078SourcePackageDispatcherDefault> logger;
private IOptionsMonitor<JT1078Configuration> optionsMonitor;
private ConcurrentDictionary<string, TcpClient> channeldic = new ConcurrentDictionary<string, TcpClient>();
private Queue<string> reconnectionQueue = new Queue<string>();
public JT1078SourcePackageDispatcherDefault(ILoggerFactory loggerFactory,
IOptionsMonitor<JT1078Configuration> optionsMonitor)
{
logger = loggerFactory.CreateLogger<JT1078SourcePackageDispatcherDefault>();
this.optionsMonitor = optionsMonitor;
timer = new System.Timers.Timer(10000);
timer.Elapsed += new ElapsedEventHandler(timer_Elapsed);
timer.AutoReset = false;
timer.Start();
InitialDispatcherClient();
}

private System.Timers.Timer timer;

public Task SendAsync(byte[] data)
{
foreach (var item in channeldic)
{
try
{
if (item.Value.Connected)
{
item.Value.Client.Send(data);
}
else
{
logger.LogError($"{item}链接已关闭");
item.Value.Close();
item.Value.Dispose();
channeldic.TryRemove(item.Key, out _);
reconnectionQueue.Enqueue(item.Key);
}
}
catch (Exception ex)
{
logger.LogError($"{item}发送数据出现异常:{ex}");
item.Value.Close();
item.Value.Dispose();
reconnectionQueue.Enqueue(item.Key);
channeldic.TryRemove(item.Key, out _);
}
}
return Task.CompletedTask;
}

public void InitialDispatcherClient()
{
Task.Run(async () =>
{
optionsMonitor.OnChange(options =>
{
List<string> lastRemoteServers = new List<string>();
if (options.RemoteServerOptions.RemoteServers != null)
{
lastRemoteServers = options.RemoteServerOptions.RemoteServers;
}
DelRemoteServsers(lastRemoteServers);
AddRemoteServsers(lastRemoteServers);
});
await InitRemoteServsers();
});
}

void timer_Elapsed(object sender, ElapsedEventArgs e)
{
timer.Stop();
Thread.CurrentThread.IsBackground = true;
try
{
if (reconnectionQueue.Count > 0)
{
var ip = reconnectionQueue.Dequeue();
if (!string.IsNullOrEmpty(ip))
{
AddRemoteServsers(new List<string>() { ip });
}
}
}
catch (Exception ex)
{
logger.LogError(ex, "");
}
finally
{
timer.Start();
}
}

/// <summary>
/// 初始化远程服务器
/// </summary>
/// <param name="bootstrap"></param>
/// <param name="remoteServers"></param>
/// <returns></returns>
private async Task InitRemoteServsers()
{
List<string> remoteServers = new List<string>();
if (optionsMonitor.CurrentValue.RemoteServerOptions.RemoteServers != null)
{
remoteServers = optionsMonitor.CurrentValue.RemoteServerOptions.RemoteServers;
}
foreach (var item in remoteServers)
{
try
{
TcpClient client = new TcpClient();
client.Connect(new IPEndPoint(IPAddress.Parse(item.Split(':')[0]), int.Parse(item.Split(':')[1])));
if (client.Connected)
{
channeldic.TryAdd(item, client);
}
}
catch (Exception ex)
{
logger.LogError($"初始化配置链接远程服务端{item},链接异常:{ex}");
}
}
await Task.CompletedTask;
}

/// <summary>
/// 动态删除远程服务器
/// </summary>
/// <param name="lastRemoteServers"></param>
private void DelRemoteServsers(List<string> lastRemoteServers)
{
var delChannels = channeldic.Keys.Except(lastRemoteServers).ToList();
foreach (var item in delChannels)
{
channeldic[item].Close();
channeldic[item].Dispose();
channeldic.TryRemove(item, out var client);
}
}
/// <summary>
/// 动态添加远程服务器
/// </summary>
/// <param name="lastRemoteServers"></param>
private void AddRemoteServsers(List<string> lastRemoteServers)
{
var addChannels = lastRemoteServers.Except(channeldic.Keys).ToList();
foreach (var item in addChannels)
{
try
{
TcpClient client = new TcpClient();
client.Connect(new IPEndPoint(IPAddress.Parse(item.Split(':')[0]), int.Parse(item.Split(':')[1])));
if (client.Connected)
{
channeldic.TryAdd(item, client);
}
else
{
client.Dispose();
reconnectionQueue.Enqueue(item);
}
}
catch (Exception ex)
{
logger.LogError($"变更配置后链接远程服务端{item},重连异常:{ex}");
reconnectionQueue.Enqueue(item);
}
}
}

public void Dispose()
{
timer.Stop();
if (channeldic != null)
{
foreach (var item in channeldic)
{
try
{
if (item.Value.Connected)
{
item.Value.Close();
item.Value.Dispose();
}
}
catch (Exception)
{

}
}
}
timer.Dispose();
}
}
}

+ 14
- 0
src/JT1078.DotNetty.Core/Interfaces/IHttpMiddleware.cs 查看文件

@@ -0,0 +1,14 @@
using DotNetty.Codecs.Http;
using DotNetty.Transport.Channels;
using System;
using System.Collections.Generic;
using System.Security.Principal;
using System.Text;

namespace JT1078.DotNetty.Core.Interfaces
{
public interface IHttpMiddleware
{
void Next(IChannelHandlerContext ctx, IFullHttpRequest req, IPrincipal principal);
}
}

+ 0
- 2
src/JT1078.DotNetty.Core/Interfaces/IJT1078Builder.cs 查看文件

@@ -8,7 +8,5 @@ namespace JT1078.DotNetty.Core.Interfaces
public interface IJT1078Builder
{
IServiceCollection Services { get; }

IJT1078Builder Replace<T>() where T: IJT1078SourcePackageDispatcher;
}
}

+ 2
- 0
src/JT1078.DotNetty.Core/Interfaces/IJT1078HttpBuilder.cs 查看文件

@@ -9,5 +9,7 @@ namespace JT1078.DotNetty.Core.Interfaces
{
IJT1078Builder Instance { get; }
IJT1078Builder Builder();
IJT1078HttpBuilder Replace<T>() where T : IJT1078Authorization;
IJT1078HttpBuilder UseHttpMiddleware<T>() where T : IHttpMiddleware;
}
}

+ 0
- 12
src/JT1078.DotNetty.Core/Interfaces/IJT1078SourcePackageDispatcher.cs 查看文件

@@ -1,12 +0,0 @@
using System.Threading.Tasks;

namespace JT1078.DotNetty.Core.Interfaces
{
/// <summary>
/// 源包分发器
/// </summary>
public interface IJT1078SourcePackageDispatcher
{
Task SendAsync(byte[] data);
}
}

+ 0
- 14
src/JT1078.DotNetty.Core/Interfaces/IJT1078WebSocketBuilder.cs 查看文件

@@ -1,14 +0,0 @@
using Microsoft.Extensions.DependencyInjection;
using System;
using System.Collections.Generic;
using System.Text;

namespace JT1078.DotNetty.Core.Interfaces
{
public interface IJT1078WebSocketBuilder
{
IJT1078Builder Instance { get; }
IJT1078Builder Builder();
IJT1078WebSocketBuilder Replace<T>() where T : IJT1078Authorization;
}
}

+ 0
- 2
src/JT1078.DotNetty.Core/JT1078CoreDotnettyExtensions.cs 查看文件

@@ -51,7 +51,6 @@ namespace JT1078.DotNetty.Core
}
IJT1078Builder builder = new JT1078BuilderDefault(serviceDescriptors);
builder.Services.Configure<JT1078Configuration>(configuration.GetSection("JT1078Configuration"));
builder.Services.TryAddSingleton<IJT1078SourcePackageDispatcher, JT1078SourcePackageDispatcherDefault>();
builder.Services.TryAddSingleton<JT1078AtomicCounterServiceFactory>();
return builder;
}
@@ -70,7 +69,6 @@ namespace JT1078.DotNetty.Core
}
IJT1078Builder builder = new JT1078BuilderDefault(serviceDescriptors);
builder.Services.Configure(jt1078Options);
builder.Services.TryAddSingleton<IJT1078SourcePackageDispatcher, JT1078SourcePackageDispatcherDefault>();
builder.Services.TryAddSingleton<JT1078AtomicCounterService>();
builder.Services.TryAddSingleton<JT1078AtomicCounterServiceFactory>();
return builder;


+ 7
- 1
src/JT1078.DotNetty.Http/Handlers/JT1078HttpServerHandler.cs 查看文件

@@ -30,13 +30,17 @@ namespace JT1078.DotNetty.Http.Handlers

private readonly IJT1078Authorization iJT1078Authorization;

private readonly IHttpMiddleware httpMiddleware;

public JT1078HttpServerHandler(
JT1078HttpSessionManager jT1078HttpSessionManager,
IJT1078Authorization iJT1078Authorization,
ILoggerFactory loggerFactory)
ILoggerFactory loggerFactory,
IHttpMiddleware httpMiddleware = null)
{
this.jT1078HttpSessionManager = jT1078HttpSessionManager;
this.iJT1078Authorization = iJT1078Authorization;
this.httpMiddleware = httpMiddleware;
logger = loggerFactory.CreateLogger<JT1078HttpServerHandler>();
}

@@ -93,11 +97,13 @@ namespace JT1078.DotNetty.Http.Handlers
{
this.handshaker.HandshakeAsync(ctx.Channel, req);
jT1078HttpSessionManager.TryAdd(principal.Identity.Name, ctx.Channel);
httpMiddleware?.Next(ctx, req, principal);
}
}
else
{
jT1078HttpSessionManager.TryAdd(principal.Identity.Name, ctx.Channel);
httpMiddleware?.Next(ctx, req, principal);
}
}
else {


+ 12
- 0
src/JT1078.DotNetty.Http/JT1078HttpBuilderDefault.cs 查看文件

@@ -20,5 +20,17 @@ namespace JT1078.DotNetty.Http
{
return Instance;
}

public IJT1078HttpBuilder Replace<T>() where T : IJT1078Authorization
{
Instance.Services.Replace(new ServiceDescriptor(typeof(IJT1078Authorization), typeof(T), ServiceLifetime.Singleton));
return this;
}

public IJT1078HttpBuilder UseHttpMiddleware<T>() where T : IHttpMiddleware
{
Instance.Services.TryAdd(new ServiceDescriptor(typeof(IHttpMiddleware), typeof(T), ServiceLifetime.Singleton));
return this;
}
}
}

+ 10
- 1
src/JT1078.DotNetty.Http/JT1078HttpServerHost.cs 查看文件

@@ -1,6 +1,8 @@
using DotNetty.Buffers;
using DotNetty.Codecs;
using DotNetty.Codecs.Http;
using DotNetty.Codecs.Http.Cors;
using DotNetty.Common.Utilities;
using DotNetty.Handlers.Streams;
using DotNetty.Handlers.Timeout;
using DotNetty.Transport.Bootstrapping;
@@ -66,8 +68,15 @@ namespace JT1078.DotNetty.Http
{
IChannelPipeline pipeline = channel.Pipeline;
pipeline.AddLast(new HttpServerCodec());
pipeline.AddLast(new CorsHandler(CorsConfigBuilder
.ForAnyOrigin()
.AllowNullOrigin()
.AllowedRequestMethods(HttpMethod.Get, HttpMethod.Post, HttpMethod.Options, HttpMethod.Delete)
.AllowedRequestHeaders((AsciiString)"origin", (AsciiString)"range", (AsciiString)"accept-encoding", (AsciiString)"referer", (AsciiString)"Cache-Control", (AsciiString)"X-Proxy-Authorization", (AsciiString)"X-Requested-With", (AsciiString)"Content-Type")
.ExposeHeaders((StringCharSequence)"Server", (StringCharSequence)"range", (StringCharSequence)"Content-Length", (StringCharSequence)"Content-Range")
.AllowCredentials()
.Build()));
pipeline.AddLast(new HttpObjectAggregator(int.MaxValue));
pipeline.AddLast("chunkedWriter", new ChunkedWriteHandler<IHttpContent>());
using (var scope = serviceProvider.CreateScope())
{
pipeline.AddLast("JT1078HttpServerHandler", scope.ServiceProvider.GetRequiredService<JT1078HttpServerHandler>());


+ 0
- 5
src/JT1078.DotNetty.Tcp/Handlers/JT1078TcpServerHandler.cs 查看文件

@@ -24,16 +24,12 @@ namespace JT1078.DotNetty.Tcp.Handlers

private readonly IJT1078TcpMessageHandlers handlers;

private readonly IJT1078SourcePackageDispatcher sourcePackageDispatcher;

public JT1078TcpServerHandler(
IJT1078SourcePackageDispatcher sourcePackageDispatcher,
IJT1078TcpMessageHandlers handlers,
ILoggerFactory loggerFactory,
JT1078AtomicCounterServiceFactory atomicCounterServiceFactory,
JT1078TcpSessionManager sessionManager)
{
this.sourcePackageDispatcher = sourcePackageDispatcher;
this.handlers = handlers;
this.SessionManager = sessionManager;
this.AtomicCounterService = atomicCounterServiceFactory.Create(JT1078TransportProtocolType.tcp);
@@ -45,7 +41,6 @@ namespace JT1078.DotNetty.Tcp.Handlers
{
try
{
sourcePackageDispatcher.SendAsync(msg);
if (logger.IsEnabled(LogLevel.Trace))
{
logger.LogTrace("accept package success count<<<" + AtomicCounterService.MsgSuccessCount.ToString());


+ 18
- 0
src/JT1078.DotNetty.TestHosting/CustomHttpMiddleware.cs 查看文件

@@ -0,0 +1,18 @@
using DotNetty.Codecs.Http;
using DotNetty.Transport.Channels;
using JT1078.DotNetty.Core.Interfaces;
using System;
using System.Collections.Generic;
using System.Security.Principal;
using System.Text;

namespace JT1078.DotNetty.TestHosting
{
public class CustomHttpMiddleware : IHttpMiddleware
{
public void Next(IChannelHandlerContext ctx, IFullHttpRequest req, IPrincipal principal)
{
Console.WriteLine("CustomHttpMiddleware");
}
}
}

+ 2
- 2
src/JT1078.DotNetty.TestHosting/HLS/FFMPEGHLSHostedService.cs 查看文件

@@ -59,7 +59,7 @@ namespace JT1078.DotNetty.TestHosting
StartInfo =
{
FileName = @"C:\ffmpeg\bin\ffmpeg.exe",
Arguments = $@"-f dshow -i video={HardwareCamera.CameraName} -vcodec h264 -start_number 0 -hls_list_size 0 -f hls {filePath}",
Arguments = $@"-f dshow -i video={HardwareCamera.CameraName} -vcodec h264 -start_number 0 -hls_list_size 10 -f hls {filePath}",
UseShellExecute = false,
CreateNoWindow = true
}
@@ -93,8 +93,8 @@ namespace JT1078.DotNetty.TestHosting

public Task StopAsync(CancellationToken cancellationToken)
{
webHost.WaitForShutdownAsync();
process.Kill();
webHost.WaitForShutdownAsync();
return Task.CompletedTask;
}
}


+ 15
- 56
src/JT1078.DotNetty.TestHosting/HTTPFLV/FFMPEGHTTPFLVHostedService.cs 查看文件

@@ -19,6 +19,11 @@ using DotNetty.Common.Utilities;
using DotNetty.Codecs.Http;
using DotNetty.Handlers.Streams;
using DotNetty.Transport.Channels;
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using JT1078.DotNetty.Core.Extensions;

namespace JT1078.DotNetty.TestHosting
{
@@ -27,14 +32,14 @@ namespace JT1078.DotNetty.TestHosting
private readonly Process process;
private readonly NamedPipeServerStream pipeServerOut;
private const string PipeNameOut = "demo1serverout";
private static readonly AsciiString ServerName = AsciiString.Cached("JT1078Netty");
private static readonly AsciiString DateEntity = HttpHeaderNames.Date;
private static readonly AsciiString ServerEntity = HttpHeaderNames.Server;

private readonly JT1078HttpSessionManager jT1078HttpSessionManager;

/// <summary>
/// 需要缓存flv的第一包数据,当新用户进来先推送第一包的数据
/// </summary>
private byte[] flvFirstPackage;
private ConcurrentDictionary<string, byte> exists = new ConcurrentDictionary<string, byte>();
public FFMPEGHTTPFLVHostedService(JT1078HttpSessionManager jT1078HttpSessionManager)
{
@@ -51,29 +56,6 @@ namespace JT1078.DotNetty.TestHosting
};
this.jT1078HttpSessionManager = jT1078HttpSessionManager;
}


public void Dispose()
{
pipeServerOut.Dispose();
}


public byte[] Chunk(byte[] data)
{
byte[] buffer =new byte[4+2+2+ data.Length];
buffer[0] = (byte)(data.Length >> 24);
buffer[1] = (byte)(data.Length >> 16);
buffer[2] = (byte)(data.Length >> 8);
buffer[3] = (byte)data.Length;
buffer[4]=(byte)'\r';
buffer[5] = (byte)'\n';
Array.Copy(data,0, buffer, 7,data.Length);
buffer[buffer.Length - 2] = (byte)'\r';
buffer[buffer.Length - 1] = (byte)'\n';
return buffer;
}

public Task StartAsync(CancellationToken cancellationToken)
{
process.Start();
@@ -102,36 +84,10 @@ namespace JT1078.DotNetty.TestHosting
{
if (!exists.ContainsKey(session.Channel.Id.AsShortText()))
{
IFullHttpResponse firstRes = new DefaultFullHttpResponse(HttpVersion.Http11, HttpResponseStatus.OK);
firstRes.Headers.Set(ServerEntity, ServerName);
firstRes.Headers.Set(DateEntity, DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff"));
firstRes.Headers.Set(HttpHeaderNames.AccessControlAllowOrigin, "*");
firstRes.Headers.Set(HttpHeaderNames.AccessControlAllowMethods, "GET,POST,HEAD,PUT,DELETE,OPTIONS");
firstRes.Headers.Set(HttpHeaderNames.AccessControlAllowCredentials, "*");
firstRes.Headers.Set(HttpHeaderNames.AccessControlAllowHeaders, "origin,range,accept-encoding,referer,Cache-Control,X-Proxy-Authorization,X-Requested-With,Content-Type");
firstRes.Headers.Set(HttpHeaderNames.AccessControlExposeHeaders, "Server,range,Content-Length,Content-Range");
firstRes.Headers.Set(HttpHeaderNames.AcceptRanges, "bytes");
firstRes.Headers.Set(HttpHeaderNames.ContentType, "video/x-flv");
firstRes.Headers.Set(HttpHeaderNames.Connection, "Keep-Alive");
//HttpUtil.SetContentLength(firstRes, long.MaxValue);
firstRes.Content.WriteBytes(flvFirstPackage);
session.Channel.WriteAndFlushAsync(firstRes);
session.SendHttpFirstChunkAsync(flvFirstPackage);
exists.TryAdd(session.Channel.Id.AsShortText(), 0);
}
IFullHttpResponse res2 = new DefaultFullHttpResponse(HttpVersion.Http11, HttpResponseStatus.OK);
res2.Headers.Set(ServerEntity, ServerName);
res2.Headers.Set(DateEntity, DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff"));
res2.Headers.Set(HttpHeaderNames.AccessControlAllowOrigin, "*");
res2.Headers.Set(HttpHeaderNames.AccessControlAllowMethods, "GET,POST,HEAD,PUT,DELETE,OPTIONS");
res2.Headers.Set(HttpHeaderNames.AccessControlAllowCredentials, "*");
res2.Headers.Set(HttpHeaderNames.AccessControlAllowHeaders, "origin,range,accept-encoding,referer,Cache-Control,X-Proxy-Authorization,X-Requested-With,Content-Type");
res2.Headers.Set(HttpHeaderNames.AccessControlExposeHeaders, "Server,range,Content-Length,Content-Range");
res2.Headers.Set(HttpHeaderNames.AcceptRanges, "bytes");
res2.Headers.Set(HttpHeaderNames.ContentType, "video/x-flv");
res2.Headers.Set(HttpHeaderNames.Connection, "Keep-Alive");
//HttpUtil.SetContentLength(res2, long.MaxValue);
res2.Content.WriteBytes(realValue);
session.Channel.WriteAndFlushAsync(res2);
session.SendHttpOtherChunkAsync(realValue);
}
}
//Console.WriteLine(JsonConvert.SerializeObject(realValue)+"-"+ length.ToString());
@@ -142,7 +98,7 @@ namespace JT1078.DotNetty.TestHosting
if (!pipeServerOut.IsConnected)
{
Console.WriteLine("WaitForConnection Star...");
pipeServerOut.WaitForConnectionAsync();
pipeServerOut.WaitForConnectionAsync().Wait(300);
Console.WriteLine("WaitForConnection End...");
}
}
@@ -155,7 +111,10 @@ namespace JT1078.DotNetty.TestHosting
});
return Task.CompletedTask;
}

public void Dispose()
{
pipeServerOut.Dispose();
}
public Task StopAsync(CancellationToken cancellationToken)
{
try


+ 4
- 4
src/JT1078.DotNetty.TestHosting/HTTPFLV/flv.html 查看文件

@@ -7,15 +7,15 @@
<script src="flv.min.js"></script>
</head>
<body>
<video muted="muted" webkit-playsinline="true" autoplay="true" id="player"></video>
<video muted="muted" webkit-playsinline="true" autoplay="true" id="player"></video>
<script>
if (flvjs.isSupported()) {
var player = document.getElementById('player');
var flvPlayer = flvjs.createPlayer({
type: 'flv',
isLive: true,
url: "http://127.0.0.1:1819/demo.flv?token=" + Math.floor((Math.random() * 1000000) + 1)
isLive: true,
url: "http://127.0.0.1:1818/demo.flv?token=" + Math.floor((Math.random() * 1000000) + 1)
});
flvPlayer.attachMediaElement(player);
flvPlayer.load();


+ 0
- 3
src/JT1078.DotNetty.TestHosting/JT1078.DotNetty.TestHosting.csproj 查看文件

@@ -25,9 +25,6 @@
</ItemGroup>

<ItemGroup>
<None Update="2019-07-12.log">
<CopyToOutputDirectory>Always</CopyToOutputDirectory>
</None>
<None Update="appsettings.json">
<CopyToOutputDirectory>Always</CopyToOutputDirectory>
</None>


+ 6
- 3
src/JT1078.DotNetty.TestHosting/Program.cs 查看文件

@@ -64,16 +64,19 @@ namespace JT1078.DotNetty.TestHosting
// .AddJT1078UdpHost()
// .Replace<JT1078UdpMessageHandlers>()
// .Builder()
//.AddJT1078HttpHost()
//.Builder();
.AddJT1078HttpHost()
//.UseHttpMiddleware<CustomHttpMiddleware>()
.Builder()
;
//使用ffmpeg工具
//1.success
//services.AddHostedService<FFMPEGRTMPHostedService>();
//2.test
//2.success
//services.AddHostedService<FFMPEGHTTPFLVHostedService>();
//3.success
//services.AddHostedService<FFMPEGWSFLVPHostedService>();
//4.success
//http://127.0.0.1:5001/HLS/hls.html
services.AddHostedService<FFMPEGHLSHostedService>();
});



+ 4
- 3
src/JT1078.DotNetty.TestHosting/WSFLV/FFMPEGWSFLVPHostedService.cs 查看文件

@@ -1,6 +1,7 @@
using DotNetty.Buffers;
using DotNetty.Codecs.Http.WebSockets;
using JT1078.DotNetty.Core.Session;
using JT1078.DotNetty.Core.Extensions;
using Microsoft.Extensions.Hosting;
using System;
using System.Collections.Generic;
@@ -82,10 +83,10 @@ namespace JT1078.DotNetty.TestHosting
{
if (!exists.ContainsKey(session.Channel.Id.AsShortText()))
{
session.Channel.WriteAndFlushAsync(new BinaryWebSocketFrame(Unpooled.WrappedBuffer(flvFirstPackage)));
session.SendBinaryWebSocketAsync(flvFirstPackage);
exists.TryAdd(session.Channel.Id.AsShortText(), 0);
}
session.Channel.WriteAndFlushAsync(new BinaryWebSocketFrame(Unpooled.WrappedBuffer(realValue)));
session.SendBinaryWebSocketAsync(realValue);
}
}
}
@@ -95,7 +96,7 @@ namespace JT1078.DotNetty.TestHosting
if (!pipeServerOut.IsConnected)
{
Console.WriteLine("WaitForConnection Star...");
pipeServerOut.WaitForConnectionAsync();
pipeServerOut.WaitForConnectionAsync().Wait(300);
Console.WriteLine("WaitForConnection End...");
}
}


+ 1
- 2
src/JT1078.DotNetty.TestHosting/appsettings.json 查看文件

@@ -15,8 +15,7 @@
"JT1078Configuration": {
"TcpPort": 1808,
"UdpPort": 1808,
"WebSocketPort": 1818,
"HttpPort": 1819,
"HttpPort": 1818,
"RemoteServerOptions": {

}


+ 0
- 5
src/JT1078.DotNetty.Udp/Handlers/JT1078UdpServerHandler.cs 查看文件

@@ -23,16 +23,12 @@ namespace JT1078.DotNetty.Udp.Handlers
private readonly JT1078AtomicCounterService AtomicCounterService;

private readonly IJT1078UdpMessageHandlers handlers;

private readonly IJT1078SourcePackageDispatcher sourcePackageDispatcher;
public JT1078UdpServerHandler(
IJT1078SourcePackageDispatcher sourcePackageDispatcher,
ILoggerFactory loggerFactory,
JT1078AtomicCounterServiceFactory atomicCounterServiceFactory,
IJT1078UdpMessageHandlers handlers,
JT1078UdpSessionManager sessionManager)
{
this.sourcePackageDispatcher = sourcePackageDispatcher;
this.AtomicCounterService = atomicCounterServiceFactory.Create(JT1078TransportProtocolType.udp);
this.SessionManager = sessionManager;
logger = loggerFactory.CreateLogger<JT1078UdpServerHandler>();
@@ -43,7 +39,6 @@ namespace JT1078.DotNetty.Udp.Handlers
{
try
{
sourcePackageDispatcher.SendAsync(msg.Buffer);
if (logger.IsEnabled(LogLevel.Trace))
{
logger.LogTrace("accept package success count<<<" + AtomicCounterService.MsgSuccessCount.ToString());


正在加载...
取消
保存