From b4f0faa2c1fe120c9c3ebf9aa68f1acc5d09a15b Mon Sep 17 00:00:00 2001 From: SmallChi <564952747@qq.com> Date: Sun, 14 Jul 2019 20:45:16 +0800 Subject: [PATCH] =?UTF-8?q?=E6=8E=A8=E9=80=81=E6=B5=8B=E8=AF=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../Configurations/JT1078Configuration.cs | 2 + .../Interfaces/IJT1078WebSocketBuilder.cs | 13 ++ .../Metadata/JT1078WebSocketSession.cs | 31 ++++ .../Session/JT1078WebSocketSessionManager.cs | 100 +++++++++++ .../Configs/nlog.win.config | 2 +- .../HexExtensions.cs | 80 +++++++++ .../JT1078.DotNetty.TestHosting.csproj | 4 + .../JT1078WebSocketPushHostedService.cs | 52 ++++++ src/JT1078.DotNetty.TestHosting/Program.cs | 14 +- .../appsettings.json | 3 +- .../Handlers/JT1078WebSocketServerHandler.cs | 156 ++++++++++++++++++ .../JT1078.DotNetty.WebSocket.csproj | 22 +++ .../JT1078WebSocketBuilderDefault.cs | 24 +++ .../JT1078WebSocketDotnettyExtensions.cs | 22 +++ .../JT1078WebSocketServerHost.cs | 89 ++++++++++ src/JT1078DotNetty.sln | 6 + 16 files changed, 613 insertions(+), 7 deletions(-) create mode 100644 src/JT1078.DotNetty.Core/Interfaces/IJT1078WebSocketBuilder.cs create mode 100644 src/JT1078.DotNetty.Core/Metadata/JT1078WebSocketSession.cs create mode 100644 src/JT1078.DotNetty.Core/Session/JT1078WebSocketSessionManager.cs create mode 100644 src/JT1078.DotNetty.TestHosting/HexExtensions.cs create mode 100644 src/JT1078.DotNetty.TestHosting/JT1078WebSocketPushHostedService.cs create mode 100644 src/JT1078.DotNetty.WebSocket/Handlers/JT1078WebSocketServerHandler.cs create mode 100644 src/JT1078.DotNetty.WebSocket/JT1078.DotNetty.WebSocket.csproj create mode 100644 src/JT1078.DotNetty.WebSocket/JT1078WebSocketBuilderDefault.cs create mode 100644 src/JT1078.DotNetty.WebSocket/JT1078WebSocketDotnettyExtensions.cs create mode 100644 src/JT1078.DotNetty.WebSocket/JT1078WebSocketServerHost.cs diff --git a/src/JT1078.DotNetty.Core/Configurations/JT1078Configuration.cs b/src/JT1078.DotNetty.Core/Configurations/JT1078Configuration.cs index a98f3e0..bd94d65 100644 --- a/src/JT1078.DotNetty.Core/Configurations/JT1078Configuration.cs +++ b/src/JT1078.DotNetty.Core/Configurations/JT1078Configuration.cs @@ -11,6 +11,8 @@ namespace JT1078.DotNetty.Core.Configurations public int UdpPort { get; set; } = 1808; + public int WebSocketPort { get; set; } = 1818; + public int QuietPeriodSeconds { get; set; } = 1; public TimeSpan QuietPeriodTimeSpan => TimeSpan.FromSeconds(QuietPeriodSeconds); diff --git a/src/JT1078.DotNetty.Core/Interfaces/IJT1078WebSocketBuilder.cs b/src/JT1078.DotNetty.Core/Interfaces/IJT1078WebSocketBuilder.cs new file mode 100644 index 0000000..2da46df --- /dev/null +++ b/src/JT1078.DotNetty.Core/Interfaces/IJT1078WebSocketBuilder.cs @@ -0,0 +1,13 @@ +using Microsoft.Extensions.DependencyInjection; +using System; +using System.Collections.Generic; +using System.Text; + +namespace JT1078.DotNetty.Core.Interfaces +{ + public interface IJT1078WebSocketBuilder + { + IJT1078Builder Instance { get; } + IJT1078Builder Builder(); + } +} diff --git a/src/JT1078.DotNetty.Core/Metadata/JT1078WebSocketSession.cs b/src/JT1078.DotNetty.Core/Metadata/JT1078WebSocketSession.cs new file mode 100644 index 0000000..e4fbb77 --- /dev/null +++ b/src/JT1078.DotNetty.Core/Metadata/JT1078WebSocketSession.cs @@ -0,0 +1,31 @@ +using DotNetty.Transport.Channels; +using System; +using System.Net; + +namespace JT1078.DotNetty.Core.Metadata +{ + public class JT1078WebSocketSession + { + public JT1078WebSocketSession( + IChannel channel, + string userId) + { + Channel = channel; + UserId = userId; + StartTime = DateTime.Now; + LastActiveTime = DateTime.Now; + } + + public JT1078WebSocketSession() { } + + public string UserId { get; set; } + + public string AttachInfo { get; set; } + + public IChannel Channel { get; set; } + + public DateTime LastActiveTime { get; set; } + + public DateTime StartTime { get; set; } + } +} diff --git a/src/JT1078.DotNetty.Core/Session/JT1078WebSocketSessionManager.cs b/src/JT1078.DotNetty.Core/Session/JT1078WebSocketSessionManager.cs new file mode 100644 index 0000000..1edbeba --- /dev/null +++ b/src/JT1078.DotNetty.Core/Session/JT1078WebSocketSessionManager.cs @@ -0,0 +1,100 @@ +using Microsoft.Extensions.Logging; +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Linq; +using DotNetty.Transport.Channels; +using JT1078.DotNetty.Core.Metadata; + +namespace JT1078.DotNetty.Core.Session +{ + /// + /// JT1078 WebSocket会话管理 + /// + public class JT1078WebSocketSessionManager + { + private readonly ILogger logger; + + public JT1078WebSocketSessionManager( + ILoggerFactory loggerFactory) + { + logger = loggerFactory.CreateLogger(); + } + + private ConcurrentDictionary SessionIdDict = new ConcurrentDictionary(StringComparer.OrdinalIgnoreCase); + + public int SessionCount + { + get + { + return SessionIdDict.Count; + } + } + + public JT1078WebSocketSession GetSession(string userId) + { + if (string.IsNullOrEmpty(userId)) + return default; + if (SessionIdDict.TryGetValue(userId, out JT1078WebSocketSession targetSession)) + { + return targetSession; + } + else + { + return default; + } + } + + public void TryAdd(string terminalPhoneNo,IChannel channel) + { + if (SessionIdDict.TryGetValue(terminalPhoneNo, out JT1078WebSocketSession oldSession)) + { + oldSession.LastActiveTime = DateTime.Now; + oldSession.Channel = channel; + SessionIdDict.TryUpdate(terminalPhoneNo, oldSession, oldSession); + } + else + { + JT1078WebSocketSession session = new JT1078WebSocketSession(channel, terminalPhoneNo); + if (SessionIdDict.TryAdd(terminalPhoneNo, session)) + { + + } + } + } + + public JT1078WebSocketSession RemoveSession(string terminalPhoneNo) + { + if (string.IsNullOrEmpty(terminalPhoneNo)) return default; + if (SessionIdDict.TryRemove(terminalPhoneNo, out JT1078WebSocketSession sessionRemove)) + { + logger.LogInformation($">>>{terminalPhoneNo} Session Remove."); + return sessionRemove; + } + else + { + return default; + } + } + + public void RemoveSessionByChannel(IChannel channel) + { + var terminalPhoneNos = SessionIdDict.Where(w => w.Value.Channel.Id == channel.Id).Select(s => s.Key).ToList(); + if (terminalPhoneNos.Count > 0) + { + foreach (var key in terminalPhoneNos) + { + SessionIdDict.TryRemove(key, out JT1078WebSocketSession sessionRemove); + } + string nos = string.Join(",", terminalPhoneNos); + logger.LogInformation($">>>{nos} Channel Remove."); + } + } + + public IEnumerable GetAll() + { + return SessionIdDict.Select(s => s.Value).ToList(); + } + } +} + diff --git a/src/JT1078.DotNetty.TestHosting/Configs/nlog.win.config b/src/JT1078.DotNetty.TestHosting/Configs/nlog.win.config index 4ba63a2..e803714 100644 --- a/src/JT1078.DotNetty.TestHosting/Configs/nlog.win.config +++ b/src/JT1078.DotNetty.TestHosting/Configs/nlog.win.config @@ -37,7 +37,7 @@ - + diff --git a/src/JT1078.DotNetty.TestHosting/HexExtensions.cs b/src/JT1078.DotNetty.TestHosting/HexExtensions.cs new file mode 100644 index 0000000..6e66f3d --- /dev/null +++ b/src/JT1078.DotNetty.TestHosting/HexExtensions.cs @@ -0,0 +1,80 @@ +using System; + +namespace JT1078.DotNetty.TestHosting +{ + public static partial class BinaryExtensions + { + public static string ToHexString(this byte[] source) + { + return HexUtil.DoHexDump(source, 0, source.Length).ToUpper(); + } + /// + /// 16进制字符串转16进制数组 + /// + /// + /// + /// + public static byte[] ToHexBytes(this string hexString) + { + hexString = hexString.Replace(" ", ""); + byte[] buf = new byte[hexString.Length / 2]; + ReadOnlySpan readOnlySpan = hexString.AsSpan(); + for (int i = 0; i < hexString.Length; i++) + { + if (i % 2 == 0) + { + buf[i / 2] = Convert.ToByte(readOnlySpan.Slice(i, 2).ToString(), 16); + } + } + return buf; + } + } + + public static class HexUtil + { + static readonly char[] HexdumpTable = new char[256 * 4]; + static HexUtil() + { + char[] digits = "0123456789ABCDEF".ToCharArray(); + for (int i = 0; i < 256; i++) + { + HexdumpTable[i << 1] = digits[(int)((uint)i >> 4 & 0x0F)]; + HexdumpTable[(i << 1) + 1] = digits[i & 0x0F]; + } + } + + public static string DoHexDump(ReadOnlySpan buffer, int fromIndex, int length) + { + if (length == 0) + { + return ""; + } + int endIndex = fromIndex + length; + var buf = new char[length << 1]; + int srcIdx = fromIndex; + int dstIdx = 0; + for (; srcIdx < endIndex; srcIdx++, dstIdx += 2) + { + Array.Copy(HexdumpTable, buffer[srcIdx] << 1, buf, dstIdx, 2); + } + return new string(buf); + } + + public static string DoHexDump(byte[] array, int fromIndex, int length) + { + if (length == 0) + { + return ""; + } + int endIndex = fromIndex + length; + var buf = new char[length << 1]; + int srcIdx = fromIndex; + int dstIdx = 0; + for (; srcIdx < endIndex; srcIdx++, dstIdx += 2) + { + Array.Copy(HexdumpTable, (array[srcIdx] & 0xFF) << 1, buf, dstIdx, 2); + } + return new string(buf); + } + } +} diff --git a/src/JT1078.DotNetty.TestHosting/JT1078.DotNetty.TestHosting.csproj b/src/JT1078.DotNetty.TestHosting/JT1078.DotNetty.TestHosting.csproj index 50c4151..ddad0c9 100644 --- a/src/JT1078.DotNetty.TestHosting/JT1078.DotNetty.TestHosting.csproj +++ b/src/JT1078.DotNetty.TestHosting/JT1078.DotNetty.TestHosting.csproj @@ -17,9 +17,13 @@ + + + Always + Always diff --git a/src/JT1078.DotNetty.TestHosting/JT1078WebSocketPushHostedService.cs b/src/JT1078.DotNetty.TestHosting/JT1078WebSocketPushHostedService.cs new file mode 100644 index 0000000..35d5460 --- /dev/null +++ b/src/JT1078.DotNetty.TestHosting/JT1078WebSocketPushHostedService.cs @@ -0,0 +1,52 @@ +using DotNetty.Buffers; +using DotNetty.Codecs.Http.WebSockets; +using JT1078.DotNetty.Core.Session; +using Microsoft.Extensions.Hosting; +using System; +using System.Collections.Generic; +using System.IO; +using System.Linq; +using System.Text; +using System.Threading; +using System.Threading.Tasks; +using JT1078.Protocol; + +namespace JT1078.DotNetty.TestHosting +{ + class JT1078WebSocketPushHostedService : IHostedService + { + JT1078WebSocketSessionManager jT1078WebSocketSessionManager; + + public JT1078WebSocketPushHostedService(JT1078WebSocketSessionManager jT1078WebSocketSessionManager) + { + this.jT1078WebSocketSessionManager = jT1078WebSocketSessionManager; + } + public Task StartAsync(CancellationToken cancellationToken) + { + var lines = File.ReadAllLines(Path.Combine(AppDomain.CurrentDomain.BaseDirectory, "2019-07-12.log")); + + Task.Run(() => + { + while (true) + { + var session = jT1078WebSocketSessionManager.GetAll().FirstOrDefault(); + if (session != null) + { + for (int i = 0; i < lines.Length; i++) + { + var package = JT1078Serializer.Deserialize(lines[i].Split(',')[6].ToHexBytes()); + session.Channel.WriteAndFlushAsync(new BinaryWebSocketFrame(Unpooled.WrappedBuffer(package.Bodies))); + } + } + Thread.Sleep(10000); + } + }); + return Task.CompletedTask; + } + + public Task StopAsync(CancellationToken cancellationToken) + { + return Task.CompletedTask; + } + } +} diff --git a/src/JT1078.DotNetty.TestHosting/Program.cs b/src/JT1078.DotNetty.TestHosting/Program.cs index 5cc470d..d0e402a 100644 --- a/src/JT1078.DotNetty.TestHosting/Program.cs +++ b/src/JT1078.DotNetty.TestHosting/Program.cs @@ -2,6 +2,7 @@ using JT1078.DotNetty.Tcp; using JT1078.DotNetty.TestHosting.Handlers; using JT1078.DotNetty.Udp; +using JT1078.DotNetty.WebSocket; using Microsoft.Extensions.Configuration; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection.Extensions; @@ -57,12 +58,15 @@ namespace JT1078.DotNetty.TestHosting services.AddSingleton(); services.AddSingleton(typeof(ILogger<>), typeof(Logger<>)); services.AddJT1078Core(hostContext.Configuration) - .AddJT1078TcpHost() - .Replace() - .Builder() - .AddJT1078UdpHost() - .Replace() + //.AddJT1078TcpHost() + //.Replace() + //.Builder() + //.AddJT1078UdpHost() + //.Replace() + //.Builder() + .AddJT1078WebSocketHost() .Builder(); + services.AddHostedService(); }); await serverHostBuilder.RunConsoleAsync(); diff --git a/src/JT1078.DotNetty.TestHosting/appsettings.json b/src/JT1078.DotNetty.TestHosting/appsettings.json index 8a684ec..128e346 100644 --- a/src/JT1078.DotNetty.TestHosting/appsettings.json +++ b/src/JT1078.DotNetty.TestHosting/appsettings.json @@ -15,8 +15,9 @@ "JT1078Configuration": { "TcpPort": 1808, "UdpPort": 1808, + "WebSocketPort": 1818, "RemoteServerOptions": { - "RemoteServers": ["172.16.19.209:16868"] + "RemoteServers": [ "172.16.19.209:16868" ] } } } diff --git a/src/JT1078.DotNetty.WebSocket/Handlers/JT1078WebSocketServerHandler.cs b/src/JT1078.DotNetty.WebSocket/Handlers/JT1078WebSocketServerHandler.cs new file mode 100644 index 0000000..6ef013d --- /dev/null +++ b/src/JT1078.DotNetty.WebSocket/Handlers/JT1078WebSocketServerHandler.cs @@ -0,0 +1,156 @@ +using System; +using System.Diagnostics; +using System.Text; +using System.Threading.Tasks; +using DotNetty.Buffers; +using DotNetty.Codecs.Http; +using DotNetty.Codecs.Http.WebSockets; +using DotNetty.Common.Utilities; +using DotNetty.Transport.Channels; +using static DotNetty.Codecs.Http.HttpVersion; +using static DotNetty.Codecs.Http.HttpResponseStatus; +using Microsoft.Extensions.Logging; +using JT1078.DotNetty.Core.Session; +using System.Text.RegularExpressions; + +namespace JT1078.DotNetty.WebSocket.Handlers +{ + public sealed class JT1078WebSocketServerHandler : SimpleChannelInboundHandler + { + const string WebsocketPath = "/jt1078live"; + + WebSocketServerHandshaker handshaker; + + private readonly ILogger logger; + + private readonly JT1078WebSocketSessionManager jT1078WebSocketSessionManager; + + public JT1078WebSocketServerHandler( + JT1078WebSocketSessionManager jT1078WebSocketSessionManager, + ILoggerFactory loggerFactory) + { + this.jT1078WebSocketSessionManager = jT1078WebSocketSessionManager; + logger = loggerFactory.CreateLogger(); + } + + public override void ChannelInactive(IChannelHandlerContext context) + { + if (logger.IsEnabled(LogLevel.Information)) + { + logger.LogInformation(context.Channel.Id.AsShortText()); + } + jT1078WebSocketSessionManager.RemoveSessionByChannel(context.Channel); + base.ChannelInactive(context); + } + + protected override void ChannelRead0(IChannelHandlerContext ctx, object msg) + { + if (msg is IFullHttpRequest request) + { + this.HandleHttpRequest(ctx, request); + } + else if (msg is WebSocketFrame frame) + { + this.HandleWebSocketFrame(ctx, frame); + } + } + + public override void ChannelReadComplete(IChannelHandlerContext context) => context.Flush(); + + void HandleHttpRequest(IChannelHandlerContext ctx, IFullHttpRequest req) + { + // Handle a bad request. + if (!req.Result.IsSuccess) + { + SendHttpResponse(ctx, req, new DefaultFullHttpResponse(Http11, BadRequest)); + return; + } + // Allow only GET methods. + if (!Equals(req.Method, HttpMethod.Get)) + { + SendHttpResponse(ctx, req, new DefaultFullHttpResponse(Http11, Forbidden)); + return; + } + if ("/favicon.ico".Equals(req.Uri)) + { + var res = new DefaultFullHttpResponse(Http11, NotFound); + SendHttpResponse(ctx, req, res); + return; + } + // Handshake + var wsFactory = new WebSocketServerHandshakerFactory(GetWebSocketLocation(req), null, true, 5 * 1024 * 1024); + this.handshaker = wsFactory.NewHandshaker(req); + if (this.handshaker == null) + { + WebSocketServerHandshakerFactory.SendUnsupportedVersionResponse(ctx.Channel); + } + else + { + this.handshaker.HandshakeAsync(ctx.Channel, req); + var uriSpan = req.Uri.AsSpan(); + var userId = uriSpan.Slice(uriSpan.IndexOf('?')).ToString().Split('=')[1]; + jT1078WebSocketSessionManager.TryAdd(userId, ctx.Channel); + } + } + + void HandleWebSocketFrame(IChannelHandlerContext ctx, WebSocketFrame frame) + { + // Check for closing frame + if (frame is CloseWebSocketFrame) + { + this.handshaker.CloseAsync(ctx.Channel, (CloseWebSocketFrame)frame.Retain()); + return; + } + if (frame is PingWebSocketFrame) + { + ctx.WriteAsync(new PongWebSocketFrame((IByteBuffer)frame.Content.Retain())); + return; + } + if (frame is TextWebSocketFrame) + { + // Echo the frame + ctx.WriteAsync(frame.Retain()); + return; + } + if (frame is BinaryWebSocketFrame) + { + // Echo the frame + ctx.WriteAsync(frame.Retain()); + } + } + + static void SendHttpResponse(IChannelHandlerContext ctx, IFullHttpRequest req, IFullHttpResponse res) + { + // Generate an error page if response getStatus code is not OK (200). + if (res.Status.Code != 200) + { + IByteBuffer buf = Unpooled.CopiedBuffer(Encoding.UTF8.GetBytes(res.Status.ToString())); + res.Content.WriteBytes(buf); + buf.Release(); + HttpUtil.SetContentLength(res, res.Content.ReadableBytes); + } + // Send the response and close the connection if necessary. + Task task = ctx.Channel.WriteAndFlushAsync(res); + if (!HttpUtil.IsKeepAlive(req) || res.Status.Code != 200) + { + task.ContinueWith((t, c) => ((IChannelHandlerContext)c).CloseAsync(), + ctx, TaskContinuationOptions.ExecuteSynchronously); + } + } + + public override void ExceptionCaught(IChannelHandlerContext ctx, Exception e) + { + logger.LogError(e, ctx.Channel.Id.AsShortText()); + jT1078WebSocketSessionManager.RemoveSessionByChannel(ctx.Channel); + ctx.CloseAsync(); + } + + static string GetWebSocketLocation(IFullHttpRequest req) + { + bool result = req.Headers.TryGet(HttpHeaderNames.Host, out ICharSequence value); + + string location= value.ToString() + WebsocketPath; + return "ws://" + location; + } + } +} diff --git a/src/JT1078.DotNetty.WebSocket/JT1078.DotNetty.WebSocket.csproj b/src/JT1078.DotNetty.WebSocket/JT1078.DotNetty.WebSocket.csproj new file mode 100644 index 0000000..df2c3ea --- /dev/null +++ b/src/JT1078.DotNetty.WebSocket/JT1078.DotNetty.WebSocket.csproj @@ -0,0 +1,22 @@ + + + + netstandard2.0 + + + + + + + + + + + + + + + + + + diff --git a/src/JT1078.DotNetty.WebSocket/JT1078WebSocketBuilderDefault.cs b/src/JT1078.DotNetty.WebSocket/JT1078WebSocketBuilderDefault.cs new file mode 100644 index 0000000..0c2c017 --- /dev/null +++ b/src/JT1078.DotNetty.WebSocket/JT1078WebSocketBuilderDefault.cs @@ -0,0 +1,24 @@ +using JT1078.DotNetty.Core.Interfaces; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.DependencyInjection.Extensions; +using System; +using System.Collections.Generic; +using System.Text; + +namespace JT1078.DotNetty.WebSocket +{ + class JT1078WebSocketBuilderDefault : IJT1078WebSocketBuilder + { + public IJT1078Builder Instance { get; } + + public JT1078WebSocketBuilderDefault(IJT1078Builder builder) + { + Instance = builder; + } + + public IJT1078Builder Builder() + { + return Instance; + } + } +} diff --git a/src/JT1078.DotNetty.WebSocket/JT1078WebSocketDotnettyExtensions.cs b/src/JT1078.DotNetty.WebSocket/JT1078WebSocketDotnettyExtensions.cs new file mode 100644 index 0000000..b4acc99 --- /dev/null +++ b/src/JT1078.DotNetty.WebSocket/JT1078WebSocketDotnettyExtensions.cs @@ -0,0 +1,22 @@ +using JT1078.DotNetty.Core.Codecs; +using JT1078.DotNetty.Core.Interfaces; +using JT1078.DotNetty.Core.Session; +using JT1078.DotNetty.WebSocket.Handlers; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.DependencyInjection.Extensions; +using System.Runtime.CompilerServices; + + +namespace JT1078.DotNetty.WebSocket +{ + public static class JT1078WebSocketDotnettyExtensions + { + public static IJT1078WebSocketBuilder AddJT1078WebSocketHost(this IJT1078Builder builder) + { + builder.Services.TryAddSingleton(); + builder.Services.AddScoped(); + builder.Services.AddHostedService(); + return new JT1078WebSocketBuilderDefault(builder); + } + } +} \ No newline at end of file diff --git a/src/JT1078.DotNetty.WebSocket/JT1078WebSocketServerHost.cs b/src/JT1078.DotNetty.WebSocket/JT1078WebSocketServerHost.cs new file mode 100644 index 0000000..70eecba --- /dev/null +++ b/src/JT1078.DotNetty.WebSocket/JT1078WebSocketServerHost.cs @@ -0,0 +1,89 @@ +using DotNetty.Buffers; +using DotNetty.Codecs; +using DotNetty.Codecs.Http; +using DotNetty.Handlers.Streams; +using DotNetty.Handlers.Timeout; +using DotNetty.Transport.Bootstrapping; +using DotNetty.Transport.Channels; +using DotNetty.Transport.Libuv; +using JT1078.DotNetty.Core.Codecs; +using JT1078.DotNetty.Core.Configurations; +using JT1078.DotNetty.WebSocket.Handlers; +using JT1078.Protocol; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using System; +using System.Net; +using System.Runtime.InteropServices; +using System.Threading; +using System.Threading.Tasks; + +namespace JT1078.DotNetty.WebSocket +{ + /// + /// JT1078 WebSocket服务 + /// + internal class JT1078WebSocketServerHost : IHostedService + { + private readonly JT1078Configuration configuration; + private readonly ILogger logger; + private DispatcherEventLoopGroup bossGroup; + private WorkerEventLoopGroup workerGroup; + private IChannel bootstrapChannel; + private IByteBufferAllocator serverBufferAllocator; + private readonly IServiceProvider serviceProvider; + public JT1078WebSocketServerHost( + IServiceProvider serviceProvider, + ILoggerFactory loggerFactory, + IOptions configurationAccessor) + { + this.serviceProvider = serviceProvider; + configuration = configurationAccessor.Value; + logger=loggerFactory.CreateLogger(); + } + + public Task StartAsync(CancellationToken cancellationToken) + { + bossGroup = new DispatcherEventLoopGroup(); + workerGroup = new WorkerEventLoopGroup(bossGroup, configuration.EventLoopCount); + serverBufferAllocator = new PooledByteBufferAllocator(); + ServerBootstrap bootstrap = new ServerBootstrap(); + bootstrap.Group(bossGroup, workerGroup); + bootstrap.Channel(); + if (RuntimeInformation.IsOSPlatform(OSPlatform.Linux) + || RuntimeInformation.IsOSPlatform(OSPlatform.OSX)) + { + bootstrap + .Option(ChannelOption.SoReuseport, true) + .ChildOption(ChannelOption.SoReuseaddr, true); + } + bootstrap + .Option(ChannelOption.SoBacklog, 8192) + .ChildOption(ChannelOption.Allocator, serverBufferAllocator) + .ChildHandler(new ActionChannelInitializer(channel => + { + IChannelPipeline pipeline = channel.Pipeline; + pipeline.AddLast(new HttpServerCodec()); + pipeline.AddLast(new HttpObjectAggregator(65536)); + using (var scope = serviceProvider.CreateScope()) + { + pipeline.AddLast("JT1078WebSocketServerHandler", scope.ServiceProvider.GetRequiredService()); + } + })); + logger.LogInformation($"JT1078 WebSocket Server start at {IPAddress.Any}:{configuration.WebSocketPort}."); + return bootstrap.BindAsync(configuration.WebSocketPort) + .ContinueWith(i => bootstrapChannel = i.Result); + } + + public async Task StopAsync(CancellationToken cancellationToken) + { + await bootstrapChannel.CloseAsync(); + var quietPeriod = configuration.QuietPeriodTimeSpan; + var shutdownTimeout = configuration.ShutdownTimeoutTimeSpan; + await workerGroup.ShutdownGracefullyAsync(quietPeriod, shutdownTimeout); + await bossGroup.ShutdownGracefullyAsync(quietPeriod, shutdownTimeout); + } + } +} diff --git a/src/JT1078DotNetty.sln b/src/JT1078DotNetty.sln index aa6c2f5..3942827 100644 --- a/src/JT1078DotNetty.sln +++ b/src/JT1078DotNetty.sln @@ -13,6 +13,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JT1078.DotNetty.Udp", "JT10 EndProject Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Tests", "Tests", "{1C26DF6A-2978-46B7-B921-BB7776CC6EE8}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "JT1078.DotNetty.WebSocket", "JT1078.DotNetty.WebSocket\JT1078.DotNetty.WebSocket.csproj", "{55181194-5AED-4C4B-8501-C8A17A3587B1}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -35,6 +37,10 @@ Global {6405D7FA-3B6E-4545-827E-BA13EB5BB268}.Debug|Any CPU.Build.0 = Debug|Any CPU {6405D7FA-3B6E-4545-827E-BA13EB5BB268}.Release|Any CPU.ActiveCfg = Release|Any CPU {6405D7FA-3B6E-4545-827E-BA13EB5BB268}.Release|Any CPU.Build.0 = Release|Any CPU + {55181194-5AED-4C4B-8501-C8A17A3587B1}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {55181194-5AED-4C4B-8501-C8A17A3587B1}.Debug|Any CPU.Build.0 = Debug|Any CPU + {55181194-5AED-4C4B-8501-C8A17A3587B1}.Release|Any CPU.ActiveCfg = Release|Any CPU + {55181194-5AED-4C4B-8501-C8A17A3587B1}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE