diff --git a/src/JT1078.DotNetty.Core/Extensions/JT1078HttpSessionExtensions.cs b/src/JT1078.DotNetty.Core/Extensions/JT1078HttpSessionExtensions.cs new file mode 100644 index 0000000..38e32c8 --- /dev/null +++ b/src/JT1078.DotNetty.Core/Extensions/JT1078HttpSessionExtensions.cs @@ -0,0 +1,36 @@ +using DotNetty.Buffers; +using DotNetty.Codecs.Http; +using DotNetty.Codecs.Http.WebSockets; +using DotNetty.Common.Utilities; +using JT1078.DotNetty.Core.Metadata; +using System; +using System.Collections.Generic; +using System.Text; + +namespace JT1078.DotNetty.Core.Extensions +{ + public static class JT1078HttpSessionExtensions + { + private static readonly AsciiString ServerName = AsciiString.Cached("JT1078Netty"); + private static readonly AsciiString DateEntity = HttpHeaderNames.Date; + private static readonly AsciiString ServerEntity = HttpHeaderNames.Server; + public static void SendBinaryWebSocketAsync(this JT1078HttpSession session,byte[] data) + { + session.Channel.WriteAndFlushAsync(new BinaryWebSocketFrame(Unpooled.WrappedBuffer(data))); + } + public static void SendHttpFirstChunkAsync(this JT1078HttpSession session, byte[] data) + { + DefaultHttpResponse firstRes = new DefaultHttpResponse(HttpVersion.Http11, HttpResponseStatus.OK); + firstRes.Headers.Set(ServerEntity, ServerName); + firstRes.Headers.Set(DateEntity, DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff")); + firstRes.Headers.Set(HttpHeaderNames.ContentType, (AsciiString)"video/x-flv"); + HttpUtil.SetTransferEncodingChunked(firstRes, true); + session.Channel.WriteAsync(firstRes); + session.Channel.WriteAndFlushAsync(Unpooled.CopiedBuffer(data)); + } + public static void SendHttpOtherChunkAsync(this JT1078HttpSession session, byte[] data) + { + session.Channel.WriteAndFlushAsync(Unpooled.CopiedBuffer(data)); + } + } +} diff --git a/src/JT1078.DotNetty.Core/Impl/JT1078BuilderDefault.cs b/src/JT1078.DotNetty.Core/Impl/JT1078BuilderDefault.cs index a34b4f7..35d66c7 100644 --- a/src/JT1078.DotNetty.Core/Impl/JT1078BuilderDefault.cs +++ b/src/JT1078.DotNetty.Core/Impl/JT1078BuilderDefault.cs @@ -15,11 +15,5 @@ namespace JT1078.DotNetty.Core.Impl { Services = services; } - - public IJT1078Builder Replace<T>() where T : IJT1078SourcePackageDispatcher - { - Services.Replace(new ServiceDescriptor(typeof(IJT1078SourcePackageDispatcher), typeof(T), ServiceLifetime.Singleton)); - return this; - } } } diff --git a/src/JT1078.DotNetty.Core/Impl/JT1078SourcePackageDispatcherDefault.cs b/src/JT1078.DotNetty.Core/Impl/JT1078SourcePackageDispatcherDefault.cs deleted file mode 100644 index 63103c8..0000000 --- a/src/JT1078.DotNetty.Core/Impl/JT1078SourcePackageDispatcherDefault.cs +++ /dev/null @@ -1,213 +0,0 @@ -using JT1078.DotNetty.Core.Configurations; -using JT1078.DotNetty.Core.Interfaces; -using Microsoft.Extensions.Logging; -using Microsoft.Extensions.Options; -using System; -using System.Collections.Concurrent; -using System.Collections.Generic; -using System.Linq; -using System.Net; -using System.Net.Sockets; -using System.Text; -using System.Threading; -using System.Threading.Tasks; -using System.Timers; - -namespace JT1078.DotNetty.Core.Impl -{ - class JT1078SourcePackageDispatcherDefault : IJT1078SourcePackageDispatcher,IDisposable - { - private readonly ILogger<JT1078SourcePackageDispatcherDefault> logger; - private IOptionsMonitor<JT1078Configuration> optionsMonitor; - private ConcurrentDictionary<string, TcpClient> channeldic = new ConcurrentDictionary<string, TcpClient>(); - private Queue<string> reconnectionQueue = new Queue<string>(); - public JT1078SourcePackageDispatcherDefault(ILoggerFactory loggerFactory, - IOptionsMonitor<JT1078Configuration> optionsMonitor) - { - logger = loggerFactory.CreateLogger<JT1078SourcePackageDispatcherDefault>(); - this.optionsMonitor = optionsMonitor; - timer = new System.Timers.Timer(10000); - timer.Elapsed += new ElapsedEventHandler(timer_Elapsed); - timer.AutoReset = false; - timer.Start(); - InitialDispatcherClient(); - } - - private System.Timers.Timer timer; - - public Task SendAsync(byte[] data) - { - foreach (var item in channeldic) - { - try - { - if (item.Value.Connected) - { - item.Value.Client.Send(data); - } - else - { - logger.LogError($"{item}链接已关闭"); - item.Value.Close(); - item.Value.Dispose(); - channeldic.TryRemove(item.Key, out _); - reconnectionQueue.Enqueue(item.Key); - } - } - catch (Exception ex) - { - logger.LogError($"{item}发送数据出现异常:{ex}"); - item.Value.Close(); - item.Value.Dispose(); - reconnectionQueue.Enqueue(item.Key); - channeldic.TryRemove(item.Key, out _); - } - } - return Task.CompletedTask; - } - - public void InitialDispatcherClient() - { - Task.Run(async () => - { - optionsMonitor.OnChange(options => - { - List<string> lastRemoteServers = new List<string>(); - if (options.RemoteServerOptions.RemoteServers != null) - { - lastRemoteServers = options.RemoteServerOptions.RemoteServers; - } - DelRemoteServsers(lastRemoteServers); - AddRemoteServsers(lastRemoteServers); - }); - await InitRemoteServsers(); - }); - } - - void timer_Elapsed(object sender, ElapsedEventArgs e) - { - timer.Stop(); - Thread.CurrentThread.IsBackground = true; - try - { - if (reconnectionQueue.Count > 0) - { - var ip = reconnectionQueue.Dequeue(); - if (!string.IsNullOrEmpty(ip)) - { - AddRemoteServsers(new List<string>() { ip }); - } - } - } - catch (Exception ex) - { - logger.LogError(ex, ""); - } - finally - { - timer.Start(); - } - } - - /// <summary> - /// 初始化远程服务器 - /// </summary> - /// <param name="bootstrap"></param> - /// <param name="remoteServers"></param> - /// <returns></returns> - private async Task InitRemoteServsers() - { - List<string> remoteServers = new List<string>(); - if (optionsMonitor.CurrentValue.RemoteServerOptions.RemoteServers != null) - { - remoteServers = optionsMonitor.CurrentValue.RemoteServerOptions.RemoteServers; - } - foreach (var item in remoteServers) - { - try - { - TcpClient client = new TcpClient(); - client.Connect(new IPEndPoint(IPAddress.Parse(item.Split(':')[0]), int.Parse(item.Split(':')[1]))); - if (client.Connected) - { - channeldic.TryAdd(item, client); - } - } - catch (Exception ex) - { - logger.LogError($"初始化配置链接远程服务端{item},链接异常:{ex}"); - } - } - await Task.CompletedTask; - } - - /// <summary> - /// 动态删除远程服务器 - /// </summary> - /// <param name="lastRemoteServers"></param> - private void DelRemoteServsers(List<string> lastRemoteServers) - { - var delChannels = channeldic.Keys.Except(lastRemoteServers).ToList(); - foreach (var item in delChannels) - { - channeldic[item].Close(); - channeldic[item].Dispose(); - channeldic.TryRemove(item, out var client); - } - } - /// <summary> - /// 动态添加远程服务器 - /// </summary> - /// <param name="lastRemoteServers"></param> - private void AddRemoteServsers(List<string> lastRemoteServers) - { - var addChannels = lastRemoteServers.Except(channeldic.Keys).ToList(); - foreach (var item in addChannels) - { - try - { - TcpClient client = new TcpClient(); - client.Connect(new IPEndPoint(IPAddress.Parse(item.Split(':')[0]), int.Parse(item.Split(':')[1]))); - if (client.Connected) - { - channeldic.TryAdd(item, client); - } - else - { - client.Dispose(); - reconnectionQueue.Enqueue(item); - } - } - catch (Exception ex) - { - logger.LogError($"变更配置后链接远程服务端{item},重连异常:{ex}"); - reconnectionQueue.Enqueue(item); - } - } - } - - public void Dispose() - { - timer.Stop(); - if (channeldic != null) - { - foreach (var item in channeldic) - { - try - { - if (item.Value.Connected) - { - item.Value.Close(); - item.Value.Dispose(); - } - } - catch (Exception) - { - - } - } - } - timer.Dispose(); - } - } -} diff --git a/src/JT1078.DotNetty.Core/Interfaces/IHttpMiddleware.cs b/src/JT1078.DotNetty.Core/Interfaces/IHttpMiddleware.cs new file mode 100644 index 0000000..ce060d7 --- /dev/null +++ b/src/JT1078.DotNetty.Core/Interfaces/IHttpMiddleware.cs @@ -0,0 +1,14 @@ +using DotNetty.Codecs.Http; +using DotNetty.Transport.Channels; +using System; +using System.Collections.Generic; +using System.Security.Principal; +using System.Text; + +namespace JT1078.DotNetty.Core.Interfaces +{ + public interface IHttpMiddleware + { + void Next(IChannelHandlerContext ctx, IFullHttpRequest req, IPrincipal principal); + } +} diff --git a/src/JT1078.DotNetty.Core/Interfaces/IJT1078Builder.cs b/src/JT1078.DotNetty.Core/Interfaces/IJT1078Builder.cs index 633d30a..cd47e54 100644 --- a/src/JT1078.DotNetty.Core/Interfaces/IJT1078Builder.cs +++ b/src/JT1078.DotNetty.Core/Interfaces/IJT1078Builder.cs @@ -8,7 +8,5 @@ namespace JT1078.DotNetty.Core.Interfaces public interface IJT1078Builder { IServiceCollection Services { get; } - - IJT1078Builder Replace<T>() where T: IJT1078SourcePackageDispatcher; } } diff --git a/src/JT1078.DotNetty.Core/Interfaces/IJT1078HttpBuilder.cs b/src/JT1078.DotNetty.Core/Interfaces/IJT1078HttpBuilder.cs index 4280619..8427c9e 100644 --- a/src/JT1078.DotNetty.Core/Interfaces/IJT1078HttpBuilder.cs +++ b/src/JT1078.DotNetty.Core/Interfaces/IJT1078HttpBuilder.cs @@ -9,5 +9,7 @@ namespace JT1078.DotNetty.Core.Interfaces { IJT1078Builder Instance { get; } IJT1078Builder Builder(); + IJT1078HttpBuilder Replace<T>() where T : IJT1078Authorization; + IJT1078HttpBuilder UseHttpMiddleware<T>() where T : IHttpMiddleware; } } diff --git a/src/JT1078.DotNetty.Core/Interfaces/IJT1078SourcePackageDispatcher.cs b/src/JT1078.DotNetty.Core/Interfaces/IJT1078SourcePackageDispatcher.cs deleted file mode 100644 index 3df9f76..0000000 --- a/src/JT1078.DotNetty.Core/Interfaces/IJT1078SourcePackageDispatcher.cs +++ /dev/null @@ -1,12 +0,0 @@ -using System.Threading.Tasks; - -namespace JT1078.DotNetty.Core.Interfaces -{ - /// <summary> - /// 源包分发器 - /// </summary> - public interface IJT1078SourcePackageDispatcher - { - Task SendAsync(byte[] data); - } -} diff --git a/src/JT1078.DotNetty.Core/Interfaces/IJT1078WebSocketBuilder.cs b/src/JT1078.DotNetty.Core/Interfaces/IJT1078WebSocketBuilder.cs deleted file mode 100644 index d96181b..0000000 --- a/src/JT1078.DotNetty.Core/Interfaces/IJT1078WebSocketBuilder.cs +++ /dev/null @@ -1,14 +0,0 @@ -using Microsoft.Extensions.DependencyInjection; -using System; -using System.Collections.Generic; -using System.Text; - -namespace JT1078.DotNetty.Core.Interfaces -{ - public interface IJT1078WebSocketBuilder - { - IJT1078Builder Instance { get; } - IJT1078Builder Builder(); - IJT1078WebSocketBuilder Replace<T>() where T : IJT1078Authorization; - } -} diff --git a/src/JT1078.DotNetty.Core/JT1078CoreDotnettyExtensions.cs b/src/JT1078.DotNetty.Core/JT1078CoreDotnettyExtensions.cs index d416ec0..cf7946c 100644 --- a/src/JT1078.DotNetty.Core/JT1078CoreDotnettyExtensions.cs +++ b/src/JT1078.DotNetty.Core/JT1078CoreDotnettyExtensions.cs @@ -51,7 +51,6 @@ namespace JT1078.DotNetty.Core } IJT1078Builder builder = new JT1078BuilderDefault(serviceDescriptors); builder.Services.Configure<JT1078Configuration>(configuration.GetSection("JT1078Configuration")); - builder.Services.TryAddSingleton<IJT1078SourcePackageDispatcher, JT1078SourcePackageDispatcherDefault>(); builder.Services.TryAddSingleton<JT1078AtomicCounterServiceFactory>(); return builder; } @@ -70,7 +69,6 @@ namespace JT1078.DotNetty.Core } IJT1078Builder builder = new JT1078BuilderDefault(serviceDescriptors); builder.Services.Configure(jt1078Options); - builder.Services.TryAddSingleton<IJT1078SourcePackageDispatcher, JT1078SourcePackageDispatcherDefault>(); builder.Services.TryAddSingleton<JT1078AtomicCounterService>(); builder.Services.TryAddSingleton<JT1078AtomicCounterServiceFactory>(); return builder; diff --git a/src/JT1078.DotNetty.Http/Handlers/JT1078HttpServerHandler.cs b/src/JT1078.DotNetty.Http/Handlers/JT1078HttpServerHandler.cs index 379d5ee..3336d42 100644 --- a/src/JT1078.DotNetty.Http/Handlers/JT1078HttpServerHandler.cs +++ b/src/JT1078.DotNetty.Http/Handlers/JT1078HttpServerHandler.cs @@ -30,13 +30,17 @@ namespace JT1078.DotNetty.Http.Handlers private readonly IJT1078Authorization iJT1078Authorization; + private readonly IHttpMiddleware httpMiddleware; + public JT1078HttpServerHandler( JT1078HttpSessionManager jT1078HttpSessionManager, IJT1078Authorization iJT1078Authorization, - ILoggerFactory loggerFactory) + ILoggerFactory loggerFactory, + IHttpMiddleware httpMiddleware = null) { this.jT1078HttpSessionManager = jT1078HttpSessionManager; this.iJT1078Authorization = iJT1078Authorization; + this.httpMiddleware = httpMiddleware; logger = loggerFactory.CreateLogger<JT1078HttpServerHandler>(); } @@ -93,11 +97,13 @@ namespace JT1078.DotNetty.Http.Handlers { this.handshaker.HandshakeAsync(ctx.Channel, req); jT1078HttpSessionManager.TryAdd(principal.Identity.Name, ctx.Channel); + httpMiddleware?.Next(ctx, req, principal); } } else { jT1078HttpSessionManager.TryAdd(principal.Identity.Name, ctx.Channel); + httpMiddleware?.Next(ctx, req, principal); } } else { diff --git a/src/JT1078.DotNetty.Http/JT1078HttpBuilderDefault.cs b/src/JT1078.DotNetty.Http/JT1078HttpBuilderDefault.cs index 5d21653..52236bf 100644 --- a/src/JT1078.DotNetty.Http/JT1078HttpBuilderDefault.cs +++ b/src/JT1078.DotNetty.Http/JT1078HttpBuilderDefault.cs @@ -20,5 +20,17 @@ namespace JT1078.DotNetty.Http { return Instance; } + + public IJT1078HttpBuilder Replace<T>() where T : IJT1078Authorization + { + Instance.Services.Replace(new ServiceDescriptor(typeof(IJT1078Authorization), typeof(T), ServiceLifetime.Singleton)); + return this; + } + + public IJT1078HttpBuilder UseHttpMiddleware<T>() where T : IHttpMiddleware + { + Instance.Services.TryAdd(new ServiceDescriptor(typeof(IHttpMiddleware), typeof(T), ServiceLifetime.Singleton)); + return this; + } } } diff --git a/src/JT1078.DotNetty.Http/JT1078HttpServerHost.cs b/src/JT1078.DotNetty.Http/JT1078HttpServerHost.cs index 8a55b86..05b2c86 100644 --- a/src/JT1078.DotNetty.Http/JT1078HttpServerHost.cs +++ b/src/JT1078.DotNetty.Http/JT1078HttpServerHost.cs @@ -1,6 +1,8 @@ using DotNetty.Buffers; using DotNetty.Codecs; using DotNetty.Codecs.Http; +using DotNetty.Codecs.Http.Cors; +using DotNetty.Common.Utilities; using DotNetty.Handlers.Streams; using DotNetty.Handlers.Timeout; using DotNetty.Transport.Bootstrapping; @@ -66,8 +68,15 @@ namespace JT1078.DotNetty.Http { IChannelPipeline pipeline = channel.Pipeline; pipeline.AddLast(new HttpServerCodec()); + pipeline.AddLast(new CorsHandler(CorsConfigBuilder + .ForAnyOrigin() + .AllowNullOrigin() + .AllowedRequestMethods(HttpMethod.Get, HttpMethod.Post, HttpMethod.Options, HttpMethod.Delete) + .AllowedRequestHeaders((AsciiString)"origin", (AsciiString)"range", (AsciiString)"accept-encoding", (AsciiString)"referer", (AsciiString)"Cache-Control", (AsciiString)"X-Proxy-Authorization", (AsciiString)"X-Requested-With", (AsciiString)"Content-Type") + .ExposeHeaders((StringCharSequence)"Server", (StringCharSequence)"range", (StringCharSequence)"Content-Length", (StringCharSequence)"Content-Range") + .AllowCredentials() + .Build())); pipeline.AddLast(new HttpObjectAggregator(int.MaxValue)); - pipeline.AddLast("chunkedWriter", new ChunkedWriteHandler<IHttpContent>()); using (var scope = serviceProvider.CreateScope()) { pipeline.AddLast("JT1078HttpServerHandler", scope.ServiceProvider.GetRequiredService<JT1078HttpServerHandler>()); diff --git a/src/JT1078.DotNetty.Tcp/Handlers/JT1078TcpServerHandler.cs b/src/JT1078.DotNetty.Tcp/Handlers/JT1078TcpServerHandler.cs index c149409..d6f2b5b 100644 --- a/src/JT1078.DotNetty.Tcp/Handlers/JT1078TcpServerHandler.cs +++ b/src/JT1078.DotNetty.Tcp/Handlers/JT1078TcpServerHandler.cs @@ -24,16 +24,12 @@ namespace JT1078.DotNetty.Tcp.Handlers private readonly IJT1078TcpMessageHandlers handlers; - private readonly IJT1078SourcePackageDispatcher sourcePackageDispatcher; - public JT1078TcpServerHandler( - IJT1078SourcePackageDispatcher sourcePackageDispatcher, IJT1078TcpMessageHandlers handlers, ILoggerFactory loggerFactory, JT1078AtomicCounterServiceFactory atomicCounterServiceFactory, JT1078TcpSessionManager sessionManager) { - this.sourcePackageDispatcher = sourcePackageDispatcher; this.handlers = handlers; this.SessionManager = sessionManager; this.AtomicCounterService = atomicCounterServiceFactory.Create(JT1078TransportProtocolType.tcp); @@ -45,7 +41,6 @@ namespace JT1078.DotNetty.Tcp.Handlers { try { - sourcePackageDispatcher.SendAsync(msg); if (logger.IsEnabled(LogLevel.Trace)) { logger.LogTrace("accept package success count<<<" + AtomicCounterService.MsgSuccessCount.ToString()); diff --git a/src/JT1078.DotNetty.TestHosting/CustomHttpMiddleware.cs b/src/JT1078.DotNetty.TestHosting/CustomHttpMiddleware.cs new file mode 100644 index 0000000..aa31628 --- /dev/null +++ b/src/JT1078.DotNetty.TestHosting/CustomHttpMiddleware.cs @@ -0,0 +1,18 @@ +using DotNetty.Codecs.Http; +using DotNetty.Transport.Channels; +using JT1078.DotNetty.Core.Interfaces; +using System; +using System.Collections.Generic; +using System.Security.Principal; +using System.Text; + +namespace JT1078.DotNetty.TestHosting +{ + public class CustomHttpMiddleware : IHttpMiddleware + { + public void Next(IChannelHandlerContext ctx, IFullHttpRequest req, IPrincipal principal) + { + Console.WriteLine("CustomHttpMiddleware"); + } + } +} diff --git a/src/JT1078.DotNetty.TestHosting/HLS/FFMPEGHLSHostedService.cs b/src/JT1078.DotNetty.TestHosting/HLS/FFMPEGHLSHostedService.cs index 4b1ef1b..a76384e 100644 --- a/src/JT1078.DotNetty.TestHosting/HLS/FFMPEGHLSHostedService.cs +++ b/src/JT1078.DotNetty.TestHosting/HLS/FFMPEGHLSHostedService.cs @@ -59,7 +59,7 @@ namespace JT1078.DotNetty.TestHosting StartInfo = { FileName = @"C:\ffmpeg\bin\ffmpeg.exe", - Arguments = $@"-f dshow -i video={HardwareCamera.CameraName} -vcodec h264 -start_number 0 -hls_list_size 0 -f hls {filePath}", + Arguments = $@"-f dshow -i video={HardwareCamera.CameraName} -vcodec h264 -start_number 0 -hls_list_size 10 -f hls {filePath}", UseShellExecute = false, CreateNoWindow = true } @@ -93,8 +93,8 @@ namespace JT1078.DotNetty.TestHosting public Task StopAsync(CancellationToken cancellationToken) { - webHost.WaitForShutdownAsync(); process.Kill(); + webHost.WaitForShutdownAsync(); return Task.CompletedTask; } } diff --git a/src/JT1078.DotNetty.TestHosting/HTTPFLV/FFMPEGHTTPFLVHostedService.cs b/src/JT1078.DotNetty.TestHosting/HTTPFLV/FFMPEGHTTPFLVHostedService.cs index 7246cc9..749fd16 100644 --- a/src/JT1078.DotNetty.TestHosting/HTTPFLV/FFMPEGHTTPFLVHostedService.cs +++ b/src/JT1078.DotNetty.TestHosting/HTTPFLV/FFMPEGHTTPFLVHostedService.cs @@ -19,6 +19,11 @@ using DotNetty.Common.Utilities; using DotNetty.Codecs.Http; using DotNetty.Handlers.Streams; using DotNetty.Transport.Channels; +using Microsoft.AspNetCore.Hosting; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.DependencyInjection; +using JT1078.DotNetty.Core.Extensions; namespace JT1078.DotNetty.TestHosting { @@ -27,14 +32,14 @@ namespace JT1078.DotNetty.TestHosting private readonly Process process; private readonly NamedPipeServerStream pipeServerOut; private const string PipeNameOut = "demo1serverout"; - private static readonly AsciiString ServerName = AsciiString.Cached("JT1078Netty"); - private static readonly AsciiString DateEntity = HttpHeaderNames.Date; - private static readonly AsciiString ServerEntity = HttpHeaderNames.Server; + private readonly JT1078HttpSessionManager jT1078HttpSessionManager; + /// <summary> /// 需要缓存flv的第一包数据,当新用户进来先推送第一包的数据 /// </summary> private byte[] flvFirstPackage; + private ConcurrentDictionary<string, byte> exists = new ConcurrentDictionary<string, byte>(); public FFMPEGHTTPFLVHostedService(JT1078HttpSessionManager jT1078HttpSessionManager) { @@ -51,29 +56,6 @@ namespace JT1078.DotNetty.TestHosting }; this.jT1078HttpSessionManager = jT1078HttpSessionManager; } - - - public void Dispose() - { - pipeServerOut.Dispose(); - } - - - public byte[] Chunk(byte[] data) - { - byte[] buffer =new byte[4+2+2+ data.Length]; - buffer[0] = (byte)(data.Length >> 24); - buffer[1] = (byte)(data.Length >> 16); - buffer[2] = (byte)(data.Length >> 8); - buffer[3] = (byte)data.Length; - buffer[4]=(byte)'\r'; - buffer[5] = (byte)'\n'; - Array.Copy(data,0, buffer, 7,data.Length); - buffer[buffer.Length - 2] = (byte)'\r'; - buffer[buffer.Length - 1] = (byte)'\n'; - return buffer; - } - public Task StartAsync(CancellationToken cancellationToken) { process.Start(); @@ -102,36 +84,10 @@ namespace JT1078.DotNetty.TestHosting { if (!exists.ContainsKey(session.Channel.Id.AsShortText())) { - IFullHttpResponse firstRes = new DefaultFullHttpResponse(HttpVersion.Http11, HttpResponseStatus.OK); - firstRes.Headers.Set(ServerEntity, ServerName); - firstRes.Headers.Set(DateEntity, DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff")); - firstRes.Headers.Set(HttpHeaderNames.AccessControlAllowOrigin, "*"); - firstRes.Headers.Set(HttpHeaderNames.AccessControlAllowMethods, "GET,POST,HEAD,PUT,DELETE,OPTIONS"); - firstRes.Headers.Set(HttpHeaderNames.AccessControlAllowCredentials, "*"); - firstRes.Headers.Set(HttpHeaderNames.AccessControlAllowHeaders, "origin,range,accept-encoding,referer,Cache-Control,X-Proxy-Authorization,X-Requested-With,Content-Type"); - firstRes.Headers.Set(HttpHeaderNames.AccessControlExposeHeaders, "Server,range,Content-Length,Content-Range"); - firstRes.Headers.Set(HttpHeaderNames.AcceptRanges, "bytes"); - firstRes.Headers.Set(HttpHeaderNames.ContentType, "video/x-flv"); - firstRes.Headers.Set(HttpHeaderNames.Connection, "Keep-Alive"); - //HttpUtil.SetContentLength(firstRes, long.MaxValue); - firstRes.Content.WriteBytes(flvFirstPackage); - session.Channel.WriteAndFlushAsync(firstRes); + session.SendHttpFirstChunkAsync(flvFirstPackage); exists.TryAdd(session.Channel.Id.AsShortText(), 0); } - IFullHttpResponse res2 = new DefaultFullHttpResponse(HttpVersion.Http11, HttpResponseStatus.OK); - res2.Headers.Set(ServerEntity, ServerName); - res2.Headers.Set(DateEntity, DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff")); - res2.Headers.Set(HttpHeaderNames.AccessControlAllowOrigin, "*"); - res2.Headers.Set(HttpHeaderNames.AccessControlAllowMethods, "GET,POST,HEAD,PUT,DELETE,OPTIONS"); - res2.Headers.Set(HttpHeaderNames.AccessControlAllowCredentials, "*"); - res2.Headers.Set(HttpHeaderNames.AccessControlAllowHeaders, "origin,range,accept-encoding,referer,Cache-Control,X-Proxy-Authorization,X-Requested-With,Content-Type"); - res2.Headers.Set(HttpHeaderNames.AccessControlExposeHeaders, "Server,range,Content-Length,Content-Range"); - res2.Headers.Set(HttpHeaderNames.AcceptRanges, "bytes"); - res2.Headers.Set(HttpHeaderNames.ContentType, "video/x-flv"); - res2.Headers.Set(HttpHeaderNames.Connection, "Keep-Alive"); - //HttpUtil.SetContentLength(res2, long.MaxValue); - res2.Content.WriteBytes(realValue); - session.Channel.WriteAndFlushAsync(res2); + session.SendHttpOtherChunkAsync(realValue); } } //Console.WriteLine(JsonConvert.SerializeObject(realValue)+"-"+ length.ToString()); @@ -142,7 +98,7 @@ namespace JT1078.DotNetty.TestHosting if (!pipeServerOut.IsConnected) { Console.WriteLine("WaitForConnection Star..."); - pipeServerOut.WaitForConnectionAsync(); + pipeServerOut.WaitForConnectionAsync().Wait(300); Console.WriteLine("WaitForConnection End..."); } } @@ -155,7 +111,10 @@ namespace JT1078.DotNetty.TestHosting }); return Task.CompletedTask; } - + public void Dispose() + { + pipeServerOut.Dispose(); + } public Task StopAsync(CancellationToken cancellationToken) { try diff --git a/src/JT1078.DotNetty.TestHosting/HTTPFLV/flv.html b/src/JT1078.DotNetty.TestHosting/HTTPFLV/flv.html index 6cf03b4..675386a 100644 --- a/src/JT1078.DotNetty.TestHosting/HTTPFLV/flv.html +++ b/src/JT1078.DotNetty.TestHosting/HTTPFLV/flv.html @@ -7,15 +7,15 @@ <script src="flv.min.js"></script> </head> <body> - <video muted="muted" webkit-playsinline="true" autoplay="true" id="player"></video> + <video muted="muted" webkit-playsinline="true" autoplay="true" id="player"></video> <script> - + if (flvjs.isSupported()) { var player = document.getElementById('player'); var flvPlayer = flvjs.createPlayer({ type: 'flv', - isLive: true, - url: "http://127.0.0.1:1819/demo.flv?token=" + Math.floor((Math.random() * 1000000) + 1) + isLive: true, + url: "http://127.0.0.1:1818/demo.flv?token=" + Math.floor((Math.random() * 1000000) + 1) }); flvPlayer.attachMediaElement(player); flvPlayer.load(); diff --git a/src/JT1078.DotNetty.TestHosting/JT1078.DotNetty.TestHosting.csproj b/src/JT1078.DotNetty.TestHosting/JT1078.DotNetty.TestHosting.csproj index 226b016..a53ef3b 100644 --- a/src/JT1078.DotNetty.TestHosting/JT1078.DotNetty.TestHosting.csproj +++ b/src/JT1078.DotNetty.TestHosting/JT1078.DotNetty.TestHosting.csproj @@ -25,9 +25,6 @@ </ItemGroup> <ItemGroup> - <None Update="2019-07-12.log"> - <CopyToOutputDirectory>Always</CopyToOutputDirectory> - </None> <None Update="appsettings.json"> <CopyToOutputDirectory>Always</CopyToOutputDirectory> </None> diff --git a/src/JT1078.DotNetty.TestHosting/Program.cs b/src/JT1078.DotNetty.TestHosting/Program.cs index 4bed0ea..4faa2b5 100644 --- a/src/JT1078.DotNetty.TestHosting/Program.cs +++ b/src/JT1078.DotNetty.TestHosting/Program.cs @@ -64,16 +64,19 @@ namespace JT1078.DotNetty.TestHosting // .AddJT1078UdpHost() // .Replace<JT1078UdpMessageHandlers>() // .Builder() - //.AddJT1078HttpHost() - //.Builder(); + .AddJT1078HttpHost() + //.UseHttpMiddleware<CustomHttpMiddleware>() + .Builder() ; + //使用ffmpeg工具 //1.success //services.AddHostedService<FFMPEGRTMPHostedService>(); - //2.test + //2.success //services.AddHostedService<FFMPEGHTTPFLVHostedService>(); //3.success //services.AddHostedService<FFMPEGWSFLVPHostedService>(); //4.success + //http://127.0.0.1:5001/HLS/hls.html services.AddHostedService<FFMPEGHLSHostedService>(); }); diff --git a/src/JT1078.DotNetty.TestHosting/WSFLV/FFMPEGWSFLVPHostedService.cs b/src/JT1078.DotNetty.TestHosting/WSFLV/FFMPEGWSFLVPHostedService.cs index 7b70afe..7eb74ab 100644 --- a/src/JT1078.DotNetty.TestHosting/WSFLV/FFMPEGWSFLVPHostedService.cs +++ b/src/JT1078.DotNetty.TestHosting/WSFLV/FFMPEGWSFLVPHostedService.cs @@ -1,6 +1,7 @@ using DotNetty.Buffers; using DotNetty.Codecs.Http.WebSockets; using JT1078.DotNetty.Core.Session; +using JT1078.DotNetty.Core.Extensions; using Microsoft.Extensions.Hosting; using System; using System.Collections.Generic; @@ -82,10 +83,10 @@ namespace JT1078.DotNetty.TestHosting { if (!exists.ContainsKey(session.Channel.Id.AsShortText())) { - session.Channel.WriteAndFlushAsync(new BinaryWebSocketFrame(Unpooled.WrappedBuffer(flvFirstPackage))); + session.SendBinaryWebSocketAsync(flvFirstPackage); exists.TryAdd(session.Channel.Id.AsShortText(), 0); } - session.Channel.WriteAndFlushAsync(new BinaryWebSocketFrame(Unpooled.WrappedBuffer(realValue))); + session.SendBinaryWebSocketAsync(realValue); } } } @@ -95,7 +96,7 @@ namespace JT1078.DotNetty.TestHosting if (!pipeServerOut.IsConnected) { Console.WriteLine("WaitForConnection Star..."); - pipeServerOut.WaitForConnectionAsync(); + pipeServerOut.WaitForConnectionAsync().Wait(300); Console.WriteLine("WaitForConnection End..."); } } diff --git a/src/JT1078.DotNetty.TestHosting/appsettings.json b/src/JT1078.DotNetty.TestHosting/appsettings.json index 9dbbc47..6e4cb07 100644 --- a/src/JT1078.DotNetty.TestHosting/appsettings.json +++ b/src/JT1078.DotNetty.TestHosting/appsettings.json @@ -15,8 +15,7 @@ "JT1078Configuration": { "TcpPort": 1808, "UdpPort": 1808, - "WebSocketPort": 1818, - "HttpPort": 1819, + "HttpPort": 1818, "RemoteServerOptions": { } diff --git a/src/JT1078.DotNetty.Udp/Handlers/JT1078UdpServerHandler.cs b/src/JT1078.DotNetty.Udp/Handlers/JT1078UdpServerHandler.cs index 3b289e5..33b0ccd 100644 --- a/src/JT1078.DotNetty.Udp/Handlers/JT1078UdpServerHandler.cs +++ b/src/JT1078.DotNetty.Udp/Handlers/JT1078UdpServerHandler.cs @@ -23,16 +23,12 @@ namespace JT1078.DotNetty.Udp.Handlers private readonly JT1078AtomicCounterService AtomicCounterService; private readonly IJT1078UdpMessageHandlers handlers; - - private readonly IJT1078SourcePackageDispatcher sourcePackageDispatcher; public JT1078UdpServerHandler( - IJT1078SourcePackageDispatcher sourcePackageDispatcher, ILoggerFactory loggerFactory, JT1078AtomicCounterServiceFactory atomicCounterServiceFactory, IJT1078UdpMessageHandlers handlers, JT1078UdpSessionManager sessionManager) { - this.sourcePackageDispatcher = sourcePackageDispatcher; this.AtomicCounterService = atomicCounterServiceFactory.Create(JT1078TransportProtocolType.udp); this.SessionManager = sessionManager; logger = loggerFactory.CreateLogger<JT1078UdpServerHandler>(); @@ -43,7 +39,6 @@ namespace JT1078.DotNetty.Udp.Handlers { try { - sourcePackageDispatcher.SendAsync(msg.Buffer); if (logger.IsEnabled(LogLevel.Trace)) { logger.LogTrace("accept package success count<<<" + AtomicCounterService.MsgSuccessCount.ToString());