@@ -11,6 +11,8 @@ namespace JT1078.DotNetty.Core.Configurations | |||||
public int UdpPort { get; set; } = 1808; | public int UdpPort { get; set; } = 1808; | ||||
public int WebSocketPort { get; set; } = 1818; | |||||
public int QuietPeriodSeconds { get; set; } = 1; | public int QuietPeriodSeconds { get; set; } = 1; | ||||
public TimeSpan QuietPeriodTimeSpan => TimeSpan.FromSeconds(QuietPeriodSeconds); | public TimeSpan QuietPeriodTimeSpan => TimeSpan.FromSeconds(QuietPeriodSeconds); | ||||
@@ -0,0 +1,13 @@ | |||||
using Microsoft.Extensions.DependencyInjection; | |||||
using System; | |||||
using System.Collections.Generic; | |||||
using System.Text; | |||||
namespace JT1078.DotNetty.Core.Interfaces | |||||
{ | |||||
public interface IJT1078WebSocketBuilder | |||||
{ | |||||
IJT1078Builder Instance { get; } | |||||
IJT1078Builder Builder(); | |||||
} | |||||
} |
@@ -0,0 +1,31 @@ | |||||
using DotNetty.Transport.Channels; | |||||
using System; | |||||
using System.Net; | |||||
namespace JT1078.DotNetty.Core.Metadata | |||||
{ | |||||
public class JT1078WebSocketSession | |||||
{ | |||||
public JT1078WebSocketSession( | |||||
IChannel channel, | |||||
string userId) | |||||
{ | |||||
Channel = channel; | |||||
UserId = userId; | |||||
StartTime = DateTime.Now; | |||||
LastActiveTime = DateTime.Now; | |||||
} | |||||
public JT1078WebSocketSession() { } | |||||
public string UserId { get; set; } | |||||
public string AttachInfo { get; set; } | |||||
public IChannel Channel { get; set; } | |||||
public DateTime LastActiveTime { get; set; } | |||||
public DateTime StartTime { get; set; } | |||||
} | |||||
} |
@@ -0,0 +1,100 @@ | |||||
using Microsoft.Extensions.Logging; | |||||
using System; | |||||
using System.Collections.Concurrent; | |||||
using System.Collections.Generic; | |||||
using System.Linq; | |||||
using DotNetty.Transport.Channels; | |||||
using JT1078.DotNetty.Core.Metadata; | |||||
namespace JT1078.DotNetty.Core.Session | |||||
{ | |||||
/// <summary> | |||||
/// JT1078 WebSocket会话管理 | |||||
/// </summary> | |||||
public class JT1078WebSocketSessionManager | |||||
{ | |||||
private readonly ILogger<JT1078WebSocketSessionManager> logger; | |||||
public JT1078WebSocketSessionManager( | |||||
ILoggerFactory loggerFactory) | |||||
{ | |||||
logger = loggerFactory.CreateLogger<JT1078WebSocketSessionManager>(); | |||||
} | |||||
private ConcurrentDictionary<string, JT1078WebSocketSession> SessionIdDict = new ConcurrentDictionary<string, JT1078WebSocketSession>(StringComparer.OrdinalIgnoreCase); | |||||
public int SessionCount | |||||
{ | |||||
get | |||||
{ | |||||
return SessionIdDict.Count; | |||||
} | |||||
} | |||||
public JT1078WebSocketSession GetSession(string userId) | |||||
{ | |||||
if (string.IsNullOrEmpty(userId)) | |||||
return default; | |||||
if (SessionIdDict.TryGetValue(userId, out JT1078WebSocketSession targetSession)) | |||||
{ | |||||
return targetSession; | |||||
} | |||||
else | |||||
{ | |||||
return default; | |||||
} | |||||
} | |||||
public void TryAdd(string terminalPhoneNo,IChannel channel) | |||||
{ | |||||
if (SessionIdDict.TryGetValue(terminalPhoneNo, out JT1078WebSocketSession oldSession)) | |||||
{ | |||||
oldSession.LastActiveTime = DateTime.Now; | |||||
oldSession.Channel = channel; | |||||
SessionIdDict.TryUpdate(terminalPhoneNo, oldSession, oldSession); | |||||
} | |||||
else | |||||
{ | |||||
JT1078WebSocketSession session = new JT1078WebSocketSession(channel, terminalPhoneNo); | |||||
if (SessionIdDict.TryAdd(terminalPhoneNo, session)) | |||||
{ | |||||
} | |||||
} | |||||
} | |||||
public JT1078WebSocketSession RemoveSession(string terminalPhoneNo) | |||||
{ | |||||
if (string.IsNullOrEmpty(terminalPhoneNo)) return default; | |||||
if (SessionIdDict.TryRemove(terminalPhoneNo, out JT1078WebSocketSession sessionRemove)) | |||||
{ | |||||
logger.LogInformation($">>>{terminalPhoneNo} Session Remove."); | |||||
return sessionRemove; | |||||
} | |||||
else | |||||
{ | |||||
return default; | |||||
} | |||||
} | |||||
public void RemoveSessionByChannel(IChannel channel) | |||||
{ | |||||
var terminalPhoneNos = SessionIdDict.Where(w => w.Value.Channel.Id == channel.Id).Select(s => s.Key).ToList(); | |||||
if (terminalPhoneNos.Count > 0) | |||||
{ | |||||
foreach (var key in terminalPhoneNos) | |||||
{ | |||||
SessionIdDict.TryRemove(key, out JT1078WebSocketSession sessionRemove); | |||||
} | |||||
string nos = string.Join(",", terminalPhoneNos); | |||||
logger.LogInformation($">>>{nos} Channel Remove."); | |||||
} | |||||
} | |||||
public IEnumerable<JT1078WebSocketSession> GetAll() | |||||
{ | |||||
return SessionIdDict.Select(s => s.Value).ToList(); | |||||
} | |||||
} | |||||
} | |||||
@@ -37,7 +37,7 @@ | |||||
</target> | </target> | ||||
</targets> | </targets> | ||||
<rules> | <rules> | ||||
<logger name="*" minlevel="Error" maxlevel="Fatal" writeTo="all,console"/> | |||||
<logger name="*" minlevel="Debug" maxlevel="Fatal" writeTo="all,console"/> | |||||
<logger name="JT1078TcpMessageHandlers" minlevel="Debug" maxlevel="Fatal" writeTo="JT1078TcpMessageHandlers,console"/> | <logger name="JT1078TcpMessageHandlers" minlevel="Debug" maxlevel="Fatal" writeTo="JT1078TcpMessageHandlers,console"/> | ||||
<logger name="JT1078TcpMessageHandlersHex" minlevel="Debug" maxlevel="Fatal" writeTo="JT1078TcpMessageHandlersHex,console"/> | <logger name="JT1078TcpMessageHandlersHex" minlevel="Debug" maxlevel="Fatal" writeTo="JT1078TcpMessageHandlersHex,console"/> | ||||
</rules> | </rules> |
@@ -0,0 +1,80 @@ | |||||
using System; | |||||
namespace JT1078.DotNetty.TestHosting | |||||
{ | |||||
public static partial class BinaryExtensions | |||||
{ | |||||
public static string ToHexString(this byte[] source) | |||||
{ | |||||
return HexUtil.DoHexDump(source, 0, source.Length).ToUpper(); | |||||
} | |||||
/// <summary> | |||||
/// 16进制字符串转16进制数组 | |||||
/// </summary> | |||||
/// <param name="hexString"></param> | |||||
/// <param name="separator"></param> | |||||
/// <returns></returns> | |||||
public static byte[] ToHexBytes(this string hexString) | |||||
{ | |||||
hexString = hexString.Replace(" ", ""); | |||||
byte[] buf = new byte[hexString.Length / 2]; | |||||
ReadOnlySpan<char> readOnlySpan = hexString.AsSpan(); | |||||
for (int i = 0; i < hexString.Length; i++) | |||||
{ | |||||
if (i % 2 == 0) | |||||
{ | |||||
buf[i / 2] = Convert.ToByte(readOnlySpan.Slice(i, 2).ToString(), 16); | |||||
} | |||||
} | |||||
return buf; | |||||
} | |||||
} | |||||
public static class HexUtil | |||||
{ | |||||
static readonly char[] HexdumpTable = new char[256 * 4]; | |||||
static HexUtil() | |||||
{ | |||||
char[] digits = "0123456789ABCDEF".ToCharArray(); | |||||
for (int i = 0; i < 256; i++) | |||||
{ | |||||
HexdumpTable[i << 1] = digits[(int)((uint)i >> 4 & 0x0F)]; | |||||
HexdumpTable[(i << 1) + 1] = digits[i & 0x0F]; | |||||
} | |||||
} | |||||
public static string DoHexDump(ReadOnlySpan<byte> buffer, int fromIndex, int length) | |||||
{ | |||||
if (length == 0) | |||||
{ | |||||
return ""; | |||||
} | |||||
int endIndex = fromIndex + length; | |||||
var buf = new char[length << 1]; | |||||
int srcIdx = fromIndex; | |||||
int dstIdx = 0; | |||||
for (; srcIdx < endIndex; srcIdx++, dstIdx += 2) | |||||
{ | |||||
Array.Copy(HexdumpTable, buffer[srcIdx] << 1, buf, dstIdx, 2); | |||||
} | |||||
return new string(buf); | |||||
} | |||||
public static string DoHexDump(byte[] array, int fromIndex, int length) | |||||
{ | |||||
if (length == 0) | |||||
{ | |||||
return ""; | |||||
} | |||||
int endIndex = fromIndex + length; | |||||
var buf = new char[length << 1]; | |||||
int srcIdx = fromIndex; | |||||
int dstIdx = 0; | |||||
for (; srcIdx < endIndex; srcIdx++, dstIdx += 2) | |||||
{ | |||||
Array.Copy(HexdumpTable, (array[srcIdx] & 0xFF) << 1, buf, dstIdx, 2); | |||||
} | |||||
return new string(buf); | |||||
} | |||||
} | |||||
} |
@@ -17,9 +17,13 @@ | |||||
<ItemGroup> | <ItemGroup> | ||||
<ProjectReference Include="..\JT1078.DotNetty.Tcp\JT1078.DotNetty.Tcp.csproj" /> | <ProjectReference Include="..\JT1078.DotNetty.Tcp\JT1078.DotNetty.Tcp.csproj" /> | ||||
<ProjectReference Include="..\JT1078.DotNetty.Udp\JT1078.DotNetty.Udp.csproj" /> | <ProjectReference Include="..\JT1078.DotNetty.Udp\JT1078.DotNetty.Udp.csproj" /> | ||||
<ProjectReference Include="..\JT1078.DotNetty.WebSocket\JT1078.DotNetty.WebSocket.csproj" /> | |||||
</ItemGroup> | </ItemGroup> | ||||
<ItemGroup> | <ItemGroup> | ||||
<None Update="2019-07-12.log"> | |||||
<CopyToOutputDirectory>Always</CopyToOutputDirectory> | |||||
</None> | |||||
<None Update="appsettings.json"> | <None Update="appsettings.json"> | ||||
<CopyToOutputDirectory>Always</CopyToOutputDirectory> | <CopyToOutputDirectory>Always</CopyToOutputDirectory> | ||||
</None> | </None> | ||||
@@ -0,0 +1,52 @@ | |||||
using DotNetty.Buffers; | |||||
using DotNetty.Codecs.Http.WebSockets; | |||||
using JT1078.DotNetty.Core.Session; | |||||
using Microsoft.Extensions.Hosting; | |||||
using System; | |||||
using System.Collections.Generic; | |||||
using System.IO; | |||||
using System.Linq; | |||||
using System.Text; | |||||
using System.Threading; | |||||
using System.Threading.Tasks; | |||||
using JT1078.Protocol; | |||||
namespace JT1078.DotNetty.TestHosting | |||||
{ | |||||
class JT1078WebSocketPushHostedService : IHostedService | |||||
{ | |||||
JT1078WebSocketSessionManager jT1078WebSocketSessionManager; | |||||
public JT1078WebSocketPushHostedService(JT1078WebSocketSessionManager jT1078WebSocketSessionManager) | |||||
{ | |||||
this.jT1078WebSocketSessionManager = jT1078WebSocketSessionManager; | |||||
} | |||||
public Task StartAsync(CancellationToken cancellationToken) | |||||
{ | |||||
var lines = File.ReadAllLines(Path.Combine(AppDomain.CurrentDomain.BaseDirectory, "2019-07-12.log")); | |||||
Task.Run(() => | |||||
{ | |||||
while (true) | |||||
{ | |||||
var session = jT1078WebSocketSessionManager.GetAll().FirstOrDefault(); | |||||
if (session != null) | |||||
{ | |||||
for (int i = 0; i < lines.Length; i++) | |||||
{ | |||||
var package = JT1078Serializer.Deserialize(lines[i].Split(',')[6].ToHexBytes()); | |||||
session.Channel.WriteAndFlushAsync(new BinaryWebSocketFrame(Unpooled.WrappedBuffer(package.Bodies))); | |||||
} | |||||
} | |||||
Thread.Sleep(10000); | |||||
} | |||||
}); | |||||
return Task.CompletedTask; | |||||
} | |||||
public Task StopAsync(CancellationToken cancellationToken) | |||||
{ | |||||
return Task.CompletedTask; | |||||
} | |||||
} | |||||
} |
@@ -2,6 +2,7 @@ | |||||
using JT1078.DotNetty.Tcp; | using JT1078.DotNetty.Tcp; | ||||
using JT1078.DotNetty.TestHosting.Handlers; | using JT1078.DotNetty.TestHosting.Handlers; | ||||
using JT1078.DotNetty.Udp; | using JT1078.DotNetty.Udp; | ||||
using JT1078.DotNetty.WebSocket; | |||||
using Microsoft.Extensions.Configuration; | using Microsoft.Extensions.Configuration; | ||||
using Microsoft.Extensions.DependencyInjection; | using Microsoft.Extensions.DependencyInjection; | ||||
using Microsoft.Extensions.DependencyInjection.Extensions; | using Microsoft.Extensions.DependencyInjection.Extensions; | ||||
@@ -57,12 +58,15 @@ namespace JT1078.DotNetty.TestHosting | |||||
services.AddSingleton<ILoggerFactory, LoggerFactory>(); | services.AddSingleton<ILoggerFactory, LoggerFactory>(); | ||||
services.AddSingleton(typeof(ILogger<>), typeof(Logger<>)); | services.AddSingleton(typeof(ILogger<>), typeof(Logger<>)); | ||||
services.AddJT1078Core(hostContext.Configuration) | services.AddJT1078Core(hostContext.Configuration) | ||||
.AddJT1078TcpHost() | |||||
.Replace<JT1078TcpMessageHandlers>() | |||||
.Builder() | |||||
.AddJT1078UdpHost() | |||||
.Replace<JT1078UdpMessageHandlers>() | |||||
//.AddJT1078TcpHost() | |||||
//.Replace<JT1078TcpMessageHandlers>() | |||||
//.Builder() | |||||
//.AddJT1078UdpHost() | |||||
//.Replace<JT1078UdpMessageHandlers>() | |||||
//.Builder() | |||||
.AddJT1078WebSocketHost() | |||||
.Builder(); | .Builder(); | ||||
services.AddHostedService<JT1078WebSocketPushHostedService>(); | |||||
}); | }); | ||||
await serverHostBuilder.RunConsoleAsync(); | await serverHostBuilder.RunConsoleAsync(); | ||||
@@ -15,8 +15,9 @@ | |||||
"JT1078Configuration": { | "JT1078Configuration": { | ||||
"TcpPort": 1808, | "TcpPort": 1808, | ||||
"UdpPort": 1808, | "UdpPort": 1808, | ||||
"WebSocketPort": 1818, | |||||
"RemoteServerOptions": { | "RemoteServerOptions": { | ||||
"RemoteServers": ["172.16.19.209:16868"] | |||||
"RemoteServers": [ "172.16.19.209:16868" ] | |||||
} | } | ||||
} | } | ||||
} | } |
@@ -0,0 +1,156 @@ | |||||
using System; | |||||
using System.Diagnostics; | |||||
using System.Text; | |||||
using System.Threading.Tasks; | |||||
using DotNetty.Buffers; | |||||
using DotNetty.Codecs.Http; | |||||
using DotNetty.Codecs.Http.WebSockets; | |||||
using DotNetty.Common.Utilities; | |||||
using DotNetty.Transport.Channels; | |||||
using static DotNetty.Codecs.Http.HttpVersion; | |||||
using static DotNetty.Codecs.Http.HttpResponseStatus; | |||||
using Microsoft.Extensions.Logging; | |||||
using JT1078.DotNetty.Core.Session; | |||||
using System.Text.RegularExpressions; | |||||
namespace JT1078.DotNetty.WebSocket.Handlers | |||||
{ | |||||
public sealed class JT1078WebSocketServerHandler : SimpleChannelInboundHandler<object> | |||||
{ | |||||
const string WebsocketPath = "/jt1078live"; | |||||
WebSocketServerHandshaker handshaker; | |||||
private readonly ILogger<JT1078WebSocketServerHandler> logger; | |||||
private readonly JT1078WebSocketSessionManager jT1078WebSocketSessionManager; | |||||
public JT1078WebSocketServerHandler( | |||||
JT1078WebSocketSessionManager jT1078WebSocketSessionManager, | |||||
ILoggerFactory loggerFactory) | |||||
{ | |||||
this.jT1078WebSocketSessionManager = jT1078WebSocketSessionManager; | |||||
logger = loggerFactory.CreateLogger<JT1078WebSocketServerHandler>(); | |||||
} | |||||
public override void ChannelInactive(IChannelHandlerContext context) | |||||
{ | |||||
if (logger.IsEnabled(LogLevel.Information)) | |||||
{ | |||||
logger.LogInformation(context.Channel.Id.AsShortText()); | |||||
} | |||||
jT1078WebSocketSessionManager.RemoveSessionByChannel(context.Channel); | |||||
base.ChannelInactive(context); | |||||
} | |||||
protected override void ChannelRead0(IChannelHandlerContext ctx, object msg) | |||||
{ | |||||
if (msg is IFullHttpRequest request) | |||||
{ | |||||
this.HandleHttpRequest(ctx, request); | |||||
} | |||||
else if (msg is WebSocketFrame frame) | |||||
{ | |||||
this.HandleWebSocketFrame(ctx, frame); | |||||
} | |||||
} | |||||
public override void ChannelReadComplete(IChannelHandlerContext context) => context.Flush(); | |||||
void HandleHttpRequest(IChannelHandlerContext ctx, IFullHttpRequest req) | |||||
{ | |||||
// Handle a bad request. | |||||
if (!req.Result.IsSuccess) | |||||
{ | |||||
SendHttpResponse(ctx, req, new DefaultFullHttpResponse(Http11, BadRequest)); | |||||
return; | |||||
} | |||||
// Allow only GET methods. | |||||
if (!Equals(req.Method, HttpMethod.Get)) | |||||
{ | |||||
SendHttpResponse(ctx, req, new DefaultFullHttpResponse(Http11, Forbidden)); | |||||
return; | |||||
} | |||||
if ("/favicon.ico".Equals(req.Uri)) | |||||
{ | |||||
var res = new DefaultFullHttpResponse(Http11, NotFound); | |||||
SendHttpResponse(ctx, req, res); | |||||
return; | |||||
} | |||||
// Handshake | |||||
var wsFactory = new WebSocketServerHandshakerFactory(GetWebSocketLocation(req), null, true, 5 * 1024 * 1024); | |||||
this.handshaker = wsFactory.NewHandshaker(req); | |||||
if (this.handshaker == null) | |||||
{ | |||||
WebSocketServerHandshakerFactory.SendUnsupportedVersionResponse(ctx.Channel); | |||||
} | |||||
else | |||||
{ | |||||
this.handshaker.HandshakeAsync(ctx.Channel, req); | |||||
var uriSpan = req.Uri.AsSpan(); | |||||
var userId = uriSpan.Slice(uriSpan.IndexOf('?')).ToString().Split('=')[1]; | |||||
jT1078WebSocketSessionManager.TryAdd(userId, ctx.Channel); | |||||
} | |||||
} | |||||
void HandleWebSocketFrame(IChannelHandlerContext ctx, WebSocketFrame frame) | |||||
{ | |||||
// Check for closing frame | |||||
if (frame is CloseWebSocketFrame) | |||||
{ | |||||
this.handshaker.CloseAsync(ctx.Channel, (CloseWebSocketFrame)frame.Retain()); | |||||
return; | |||||
} | |||||
if (frame is PingWebSocketFrame) | |||||
{ | |||||
ctx.WriteAsync(new PongWebSocketFrame((IByteBuffer)frame.Content.Retain())); | |||||
return; | |||||
} | |||||
if (frame is TextWebSocketFrame) | |||||
{ | |||||
// Echo the frame | |||||
ctx.WriteAsync(frame.Retain()); | |||||
return; | |||||
} | |||||
if (frame is BinaryWebSocketFrame) | |||||
{ | |||||
// Echo the frame | |||||
ctx.WriteAsync(frame.Retain()); | |||||
} | |||||
} | |||||
static void SendHttpResponse(IChannelHandlerContext ctx, IFullHttpRequest req, IFullHttpResponse res) | |||||
{ | |||||
// Generate an error page if response getStatus code is not OK (200). | |||||
if (res.Status.Code != 200) | |||||
{ | |||||
IByteBuffer buf = Unpooled.CopiedBuffer(Encoding.UTF8.GetBytes(res.Status.ToString())); | |||||
res.Content.WriteBytes(buf); | |||||
buf.Release(); | |||||
HttpUtil.SetContentLength(res, res.Content.ReadableBytes); | |||||
} | |||||
// Send the response and close the connection if necessary. | |||||
Task task = ctx.Channel.WriteAndFlushAsync(res); | |||||
if (!HttpUtil.IsKeepAlive(req) || res.Status.Code != 200) | |||||
{ | |||||
task.ContinueWith((t, c) => ((IChannelHandlerContext)c).CloseAsync(), | |||||
ctx, TaskContinuationOptions.ExecuteSynchronously); | |||||
} | |||||
} | |||||
public override void ExceptionCaught(IChannelHandlerContext ctx, Exception e) | |||||
{ | |||||
logger.LogError(e, ctx.Channel.Id.AsShortText()); | |||||
jT1078WebSocketSessionManager.RemoveSessionByChannel(ctx.Channel); | |||||
ctx.CloseAsync(); | |||||
} | |||||
static string GetWebSocketLocation(IFullHttpRequest req) | |||||
{ | |||||
bool result = req.Headers.TryGet(HttpHeaderNames.Host, out ICharSequence value); | |||||
string location= value.ToString() + WebsocketPath; | |||||
return "ws://" + location; | |||||
} | |||||
} | |||||
} |
@@ -0,0 +1,22 @@ | |||||
<Project Sdk="Microsoft.NET.Sdk"> | |||||
<PropertyGroup> | |||||
<TargetFramework>netstandard2.0</TargetFramework> | |||||
</PropertyGroup> | |||||
<ItemGroup> | |||||
<PackageReference Include="DotNetty.Codecs.Http" Version="0.6.0" /> | |||||
<PackageReference Include="DotNetty.Handlers" Version="0.6.0" /> | |||||
<PackageReference Include="DotNetty.Transport.Libuv" Version="0.6.0" /> | |||||
<PackageReference Include="DotNetty.Codecs" Version="0.6.0" /> | |||||
<PackageReference Include="Microsoft.Extensions.Hosting.Abstractions" Version="2.2.0" /> | |||||
<PackageReference Include="Microsoft.Extensions.Options.ConfigurationExtensions" Version="2.2.0" /> | |||||
<PackageReference Include="Newtonsoft.Json" Version="12.0.2" /> | |||||
<PackageReference Include="Microsoft.Extensions.Options" Version="2.2.0" /> | |||||
</ItemGroup> | |||||
<ItemGroup> | |||||
<ProjectReference Include="..\JT1078.DotNetty.Core\JT1078.DotNetty.Core.csproj" /> | |||||
</ItemGroup> | |||||
</Project> |
@@ -0,0 +1,24 @@ | |||||
using JT1078.DotNetty.Core.Interfaces; | |||||
using Microsoft.Extensions.DependencyInjection; | |||||
using Microsoft.Extensions.DependencyInjection.Extensions; | |||||
using System; | |||||
using System.Collections.Generic; | |||||
using System.Text; | |||||
namespace JT1078.DotNetty.WebSocket | |||||
{ | |||||
class JT1078WebSocketBuilderDefault : IJT1078WebSocketBuilder | |||||
{ | |||||
public IJT1078Builder Instance { get; } | |||||
public JT1078WebSocketBuilderDefault(IJT1078Builder builder) | |||||
{ | |||||
Instance = builder; | |||||
} | |||||
public IJT1078Builder Builder() | |||||
{ | |||||
return Instance; | |||||
} | |||||
} | |||||
} |
@@ -0,0 +1,22 @@ | |||||
using JT1078.DotNetty.Core.Codecs; | |||||
using JT1078.DotNetty.Core.Interfaces; | |||||
using JT1078.DotNetty.Core.Session; | |||||
using JT1078.DotNetty.WebSocket.Handlers; | |||||
using Microsoft.Extensions.DependencyInjection; | |||||
using Microsoft.Extensions.DependencyInjection.Extensions; | |||||
using System.Runtime.CompilerServices; | |||||
namespace JT1078.DotNetty.WebSocket | |||||
{ | |||||
public static class JT1078WebSocketDotnettyExtensions | |||||
{ | |||||
public static IJT1078WebSocketBuilder AddJT1078WebSocketHost(this IJT1078Builder builder) | |||||
{ | |||||
builder.Services.TryAddSingleton<JT1078WebSocketSessionManager>(); | |||||
builder.Services.AddScoped<JT1078WebSocketServerHandler>(); | |||||
builder.Services.AddHostedService<JT1078WebSocketServerHost>(); | |||||
return new JT1078WebSocketBuilderDefault(builder); | |||||
} | |||||
} | |||||
} |
@@ -0,0 +1,89 @@ | |||||
using DotNetty.Buffers; | |||||
using DotNetty.Codecs; | |||||
using DotNetty.Codecs.Http; | |||||
using DotNetty.Handlers.Streams; | |||||
using DotNetty.Handlers.Timeout; | |||||
using DotNetty.Transport.Bootstrapping; | |||||
using DotNetty.Transport.Channels; | |||||
using DotNetty.Transport.Libuv; | |||||
using JT1078.DotNetty.Core.Codecs; | |||||
using JT1078.DotNetty.Core.Configurations; | |||||
using JT1078.DotNetty.WebSocket.Handlers; | |||||
using JT1078.Protocol; | |||||
using Microsoft.Extensions.DependencyInjection; | |||||
using Microsoft.Extensions.Hosting; | |||||
using Microsoft.Extensions.Logging; | |||||
using Microsoft.Extensions.Options; | |||||
using System; | |||||
using System.Net; | |||||
using System.Runtime.InteropServices; | |||||
using System.Threading; | |||||
using System.Threading.Tasks; | |||||
namespace JT1078.DotNetty.WebSocket | |||||
{ | |||||
/// <summary> | |||||
/// JT1078 WebSocket服务 | |||||
/// </summary> | |||||
internal class JT1078WebSocketServerHost : IHostedService | |||||
{ | |||||
private readonly JT1078Configuration configuration; | |||||
private readonly ILogger<JT1078WebSocketServerHost> logger; | |||||
private DispatcherEventLoopGroup bossGroup; | |||||
private WorkerEventLoopGroup workerGroup; | |||||
private IChannel bootstrapChannel; | |||||
private IByteBufferAllocator serverBufferAllocator; | |||||
private readonly IServiceProvider serviceProvider; | |||||
public JT1078WebSocketServerHost( | |||||
IServiceProvider serviceProvider, | |||||
ILoggerFactory loggerFactory, | |||||
IOptions<JT1078Configuration> configurationAccessor) | |||||
{ | |||||
this.serviceProvider = serviceProvider; | |||||
configuration = configurationAccessor.Value; | |||||
logger=loggerFactory.CreateLogger<JT1078WebSocketServerHost>(); | |||||
} | |||||
public Task StartAsync(CancellationToken cancellationToken) | |||||
{ | |||||
bossGroup = new DispatcherEventLoopGroup(); | |||||
workerGroup = new WorkerEventLoopGroup(bossGroup, configuration.EventLoopCount); | |||||
serverBufferAllocator = new PooledByteBufferAllocator(); | |||||
ServerBootstrap bootstrap = new ServerBootstrap(); | |||||
bootstrap.Group(bossGroup, workerGroup); | |||||
bootstrap.Channel<TcpServerChannel>(); | |||||
if (RuntimeInformation.IsOSPlatform(OSPlatform.Linux) | |||||
|| RuntimeInformation.IsOSPlatform(OSPlatform.OSX)) | |||||
{ | |||||
bootstrap | |||||
.Option(ChannelOption.SoReuseport, true) | |||||
.ChildOption(ChannelOption.SoReuseaddr, true); | |||||
} | |||||
bootstrap | |||||
.Option(ChannelOption.SoBacklog, 8192) | |||||
.ChildOption(ChannelOption.Allocator, serverBufferAllocator) | |||||
.ChildHandler(new ActionChannelInitializer<IChannel>(channel => | |||||
{ | |||||
IChannelPipeline pipeline = channel.Pipeline; | |||||
pipeline.AddLast(new HttpServerCodec()); | |||||
pipeline.AddLast(new HttpObjectAggregator(65536)); | |||||
using (var scope = serviceProvider.CreateScope()) | |||||
{ | |||||
pipeline.AddLast("JT1078WebSocketServerHandler", scope.ServiceProvider.GetRequiredService<JT1078WebSocketServerHandler>()); | |||||
} | |||||
})); | |||||
logger.LogInformation($"JT1078 WebSocket Server start at {IPAddress.Any}:{configuration.WebSocketPort}."); | |||||
return bootstrap.BindAsync(configuration.WebSocketPort) | |||||
.ContinueWith(i => bootstrapChannel = i.Result); | |||||
} | |||||
public async Task StopAsync(CancellationToken cancellationToken) | |||||
{ | |||||
await bootstrapChannel.CloseAsync(); | |||||
var quietPeriod = configuration.QuietPeriodTimeSpan; | |||||
var shutdownTimeout = configuration.ShutdownTimeoutTimeSpan; | |||||
await workerGroup.ShutdownGracefullyAsync(quietPeriod, shutdownTimeout); | |||||
await bossGroup.ShutdownGracefullyAsync(quietPeriod, shutdownTimeout); | |||||
} | |||||
} | |||||
} |
@@ -13,6 +13,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JT1078.DotNetty.Udp", "JT10 | |||||
EndProject | EndProject | ||||
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Tests", "Tests", "{1C26DF6A-2978-46B7-B921-BB7776CC6EE8}" | Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Tests", "Tests", "{1C26DF6A-2978-46B7-B921-BB7776CC6EE8}" | ||||
EndProject | EndProject | ||||
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "JT1078.DotNetty.WebSocket", "JT1078.DotNetty.WebSocket\JT1078.DotNetty.WebSocket.csproj", "{55181194-5AED-4C4B-8501-C8A17A3587B1}" | |||||
EndProject | |||||
Global | Global | ||||
GlobalSection(SolutionConfigurationPlatforms) = preSolution | GlobalSection(SolutionConfigurationPlatforms) = preSolution | ||||
Debug|Any CPU = Debug|Any CPU | Debug|Any CPU = Debug|Any CPU | ||||
@@ -35,6 +37,10 @@ Global | |||||
{6405D7FA-3B6E-4545-827E-BA13EB5BB268}.Debug|Any CPU.Build.0 = Debug|Any CPU | {6405D7FA-3B6E-4545-827E-BA13EB5BB268}.Debug|Any CPU.Build.0 = Debug|Any CPU | ||||
{6405D7FA-3B6E-4545-827E-BA13EB5BB268}.Release|Any CPU.ActiveCfg = Release|Any CPU | {6405D7FA-3B6E-4545-827E-BA13EB5BB268}.Release|Any CPU.ActiveCfg = Release|Any CPU | ||||
{6405D7FA-3B6E-4545-827E-BA13EB5BB268}.Release|Any CPU.Build.0 = Release|Any CPU | {6405D7FA-3B6E-4545-827E-BA13EB5BB268}.Release|Any CPU.Build.0 = Release|Any CPU | ||||
{55181194-5AED-4C4B-8501-C8A17A3587B1}.Debug|Any CPU.ActiveCfg = Debug|Any CPU | |||||
{55181194-5AED-4C4B-8501-C8A17A3587B1}.Debug|Any CPU.Build.0 = Debug|Any CPU | |||||
{55181194-5AED-4C4B-8501-C8A17A3587B1}.Release|Any CPU.ActiveCfg = Release|Any CPU | |||||
{55181194-5AED-4C4B-8501-C8A17A3587B1}.Release|Any CPU.Build.0 = Release|Any CPU | |||||
EndGlobalSection | EndGlobalSection | ||||
GlobalSection(SolutionProperties) = preSolution | GlobalSection(SolutionProperties) = preSolution | ||||
HideSolutionNode = FALSE | HideSolutionNode = FALSE | ||||