@@ -0,0 +1,13 @@ | |||||
using DotNetty.Codecs.Http; | |||||
using System; | |||||
using System.Collections.Generic; | |||||
using System.Security.Principal; | |||||
using System.Text; | |||||
namespace JT1078.DotNetty.Core.Interfaces | |||||
{ | |||||
public interface IJT1078Authorization | |||||
{ | |||||
bool Authorization(IFullHttpRequest request, out IPrincipal principal); | |||||
} | |||||
} |
@@ -9,5 +9,6 @@ namespace JT1078.DotNetty.Core.Interfaces | |||||
{ | { | ||||
IJT1078Builder Instance { get; } | IJT1078Builder Instance { get; } | ||||
IJT1078Builder Builder(); | IJT1078Builder Builder(); | ||||
IJT1078WebSocketBuilder Replace<T>() where T : IJT1078Authorization; | |||||
} | } | ||||
} | } |
@@ -9,6 +9,7 @@ | |||||
</PropertyGroup> | </PropertyGroup> | ||||
<ItemGroup> | <ItemGroup> | ||||
<PackageReference Include="DotNetty.Codecs.Http" Version="0.6.0" /> | |||||
<PackageReference Include="DotNetty.Handlers" Version="0.6.0" /> | <PackageReference Include="DotNetty.Handlers" Version="0.6.0" /> | ||||
<PackageReference Include="DotNetty.Transport.Libuv" Version="0.6.0" /> | <PackageReference Include="DotNetty.Transport.Libuv" Version="0.6.0" /> | ||||
<PackageReference Include="DotNetty.Codecs" Version="0.6.0" /> | <PackageReference Include="DotNetty.Codecs" Version="0.6.0" /> | ||||
@@ -21,79 +21,43 @@ namespace JT1078.DotNetty.Core.Session | |||||
logger = loggerFactory.CreateLogger<JT1078WebSocketSessionManager>(); | logger = loggerFactory.CreateLogger<JT1078WebSocketSessionManager>(); | ||||
} | } | ||||
private ConcurrentDictionary<string, JT1078WebSocketSession> SessionIdDict = new ConcurrentDictionary<string, JT1078WebSocketSession>(StringComparer.OrdinalIgnoreCase); | |||||
private ConcurrentDictionary<string, JT1078WebSocketSession> SessionDict = new ConcurrentDictionary<string,JT1078WebSocketSession>(); | |||||
public int SessionCount | public int SessionCount | ||||
{ | { | ||||
get | get | ||||
{ | { | ||||
return SessionIdDict.Count; | |||||
return SessionDict.Count; | |||||
} | } | ||||
} | } | ||||
public JT1078WebSocketSession GetSession(string userId) | |||||
public List<JT1078WebSocketSession> GetSessions(string userId) | |||||
{ | { | ||||
if (string.IsNullOrEmpty(userId)) | |||||
return default; | |||||
if (SessionIdDict.TryGetValue(userId, out JT1078WebSocketSession targetSession)) | |||||
{ | |||||
return targetSession; | |||||
} | |||||
else | |||||
{ | |||||
return default; | |||||
} | |||||
} | |||||
public void TryAdd(string terminalPhoneNo,IChannel channel) | |||||
{ | |||||
if (SessionIdDict.TryGetValue(terminalPhoneNo, out JT1078WebSocketSession oldSession)) | |||||
{ | |||||
oldSession.LastActiveTime = DateTime.Now; | |||||
oldSession.Channel = channel; | |||||
SessionIdDict.TryUpdate(terminalPhoneNo, oldSession, oldSession); | |||||
} | |||||
else | |||||
{ | |||||
JT1078WebSocketSession session = new JT1078WebSocketSession(channel, terminalPhoneNo); | |||||
if (SessionIdDict.TryAdd(terminalPhoneNo, session)) | |||||
{ | |||||
} | |||||
} | |||||
return SessionDict.Where(m => m.Value.UserId == userId).Select(m=>m.Value).ToList(); | |||||
} | } | ||||
public JT1078WebSocketSession RemoveSession(string terminalPhoneNo) | |||||
public void TryAdd(string userId,IChannel channel) | |||||
{ | { | ||||
if (string.IsNullOrEmpty(terminalPhoneNo)) return default; | |||||
if (SessionIdDict.TryRemove(terminalPhoneNo, out JT1078WebSocketSession sessionRemove)) | |||||
SessionDict.TryAdd(channel.Id.AsShortText(), new JT1078WebSocketSession(channel, userId)); | |||||
if (logger.IsEnabled(LogLevel.Information)) | |||||
{ | { | ||||
logger.LogInformation($">>>{terminalPhoneNo} Session Remove."); | |||||
return sessionRemove; | |||||
logger.LogInformation($">>>{userId},{channel.Id.AsShortText()} Channel Connection."); | |||||
} | } | ||||
else | |||||
{ | |||||
return default; | |||||
} | |||||
} | } | ||||
public void RemoveSessionByChannel(IChannel channel) | public void RemoveSessionByChannel(IChannel channel) | ||||
{ | { | ||||
var terminalPhoneNos = SessionIdDict.Where(w => w.Value.Channel.Id == channel.Id).Select(s => s.Key).ToList(); | |||||
if (terminalPhoneNos.Count > 0) | |||||
if (channel.Open&& SessionDict.TryRemove(channel.Id.AsShortText(), out var session)) | |||||
{ | { | ||||
foreach (var key in terminalPhoneNos) | |||||
if (logger.IsEnabled(LogLevel.Information)) | |||||
{ | { | ||||
SessionIdDict.TryRemove(key, out JT1078WebSocketSession sessionRemove); | |||||
logger.LogInformation($">>>{session.UserId},{session.Channel.Id.AsShortText()} Channel Remove."); | |||||
} | } | ||||
string nos = string.Join(",", terminalPhoneNos); | |||||
logger.LogInformation($">>>{nos} Channel Remove."); | |||||
} | } | ||||
} | } | ||||
public IEnumerable<JT1078WebSocketSession> GetAll() | public IEnumerable<JT1078WebSocketSession> GetAll() | ||||
{ | { | ||||
return SessionIdDict.Select(s => s.Value).ToList(); | |||||
return SessionDict.Select(s => s.Value).ToList(); | |||||
} | } | ||||
} | } | ||||
} | } | ||||
@@ -38,37 +38,44 @@ namespace JT1078.DotNetty.TestHosting | |||||
{ | { | ||||
foreach (var item in jT1078DataService.DataBlockingCollection.GetConsumingEnumerable()) | foreach (var item in jT1078DataService.DataBlockingCollection.GetConsumingEnumerable()) | ||||
{ | { | ||||
//if (jT1078WebSocketSessionManager.GetAll().Count() > 0) | |||||
//{ | |||||
// Parallel.ForEach(jT1078WebSocketSessionManager.GetAll(), new ParallelOptions { MaxDegreeOfParallelism = 5 }, session => | |||||
// { | |||||
// //if (item.Label3.SubpackageType == JT1078SubPackageType.分包处理时的第一个包) | |||||
// //{ | |||||
// // SubcontractKey.TryRemove(item.SIM, out _); | |||||
// // SubcontractKey.TryAdd(item.SIM, item.Bodies); | |||||
// //} | |||||
// //else if (item.Label3.SubpackageType == JT1078SubPackageType.分包处理时的中间包) | |||||
// //{ | |||||
// // if (SubcontractKey.TryGetValue(item.SIM, out var buffer)) | |||||
// // { | |||||
// // SubcontractKey[item.SIM] = buffer.Concat(item.Bodies).ToArray(); | |||||
// // } | |||||
// //} | |||||
// //else if (item.Label3.SubpackageType == JT1078SubPackageType.分包处理时的最后一个包) | |||||
// //{ | |||||
// // if (SubcontractKey.TryGetValue(item.SIM, out var buffer)) | |||||
// // { | |||||
// // session.Channel.WriteAndFlushAsync(new BinaryWebSocketFrame(Unpooled.WrappedBuffer(buffer.Concat(item.Bodies).ToArray()))); | |||||
// // } | |||||
// //} | |||||
// //else | |||||
// //{ | |||||
// session.Channel.WriteAndFlushAsync(new BinaryWebSocketFrame(Unpooled.WrappedBuffer(item.Bodies))); | |||||
// // } | |||||
// }); | |||||
//} | |||||
if (jT1078WebSocketSessionManager.GetAll().Count() > 0) | if (jT1078WebSocketSessionManager.GetAll().Count() > 0) | ||||
{ | { | ||||
Parallel.ForEach(jT1078WebSocketSessionManager.GetAll(), new ParallelOptions { MaxDegreeOfParallelism = 5 }, session => | Parallel.ForEach(jT1078WebSocketSessionManager.GetAll(), new ParallelOptions { MaxDegreeOfParallelism = 5 }, session => | ||||
{ | |||||
//if (item.Label3.SubpackageType == JT1078SubPackageType.分包处理时的第一个包) | |||||
//{ | |||||
// SubcontractKey.TryRemove(item.SIM, out _); | |||||
// SubcontractKey.TryAdd(item.SIM, item.Bodies); | |||||
//} | |||||
//else if (item.Label3.SubpackageType == JT1078SubPackageType.分包处理时的中间包) | |||||
//{ | |||||
// if (SubcontractKey.TryGetValue(item.SIM, out var buffer)) | |||||
// { | |||||
// SubcontractKey[item.SIM] = buffer.Concat(item.Bodies).ToArray(); | |||||
// } | |||||
//} | |||||
//else if (item.Label3.SubpackageType == JT1078SubPackageType.分包处理时的最后一个包) | |||||
//{ | |||||
// if (SubcontractKey.TryGetValue(item.SIM, out var buffer)) | |||||
// { | |||||
// session.Channel.WriteAndFlushAsync(new BinaryWebSocketFrame(Unpooled.WrappedBuffer(buffer.Concat(item.Bodies).ToArray()))); | |||||
// } | |||||
//} | |||||
//else | |||||
//{ | |||||
session.Channel.WriteAndFlushAsync(new BinaryWebSocketFrame(Unpooled.WrappedBuffer(item.Bodies))); | |||||
// } | |||||
{ | |||||
session.Channel.WriteAndFlushAsync(new BinaryWebSocketFrame(Unpooled.WrappedBuffer(item.Bodies))); | |||||
}); | }); | ||||
} | } | ||||
} | } | ||||
} | } | ||||
catch | catch | ||||
@@ -66,6 +66,7 @@ namespace JT1078.DotNetty.TestHosting | |||||
//.Replace<JT1078UdpMessageHandlers>() | //.Replace<JT1078UdpMessageHandlers>() | ||||
//.Builder() | //.Builder() | ||||
.AddJT1078WebSocketHost() | .AddJT1078WebSocketHost() | ||||
// .Replace() | |||||
.Builder(); | .Builder(); | ||||
services.AddHostedService<JT1078WebSocketPushHostedService>(); | services.AddHostedService<JT1078WebSocketPushHostedService>(); | ||||
}); | }); | ||||
@@ -0,0 +1,32 @@ | |||||
using DotNetty.Codecs.Http; | |||||
using JT1078.DotNetty.Core.Interfaces; | |||||
using System; | |||||
using System.Collections.Generic; | |||||
using System.Linq; | |||||
using System.Security.Claims; | |||||
using System.Security.Principal; | |||||
using System.Text; | |||||
namespace JT1078.DotNetty.WebSocket.Authorization | |||||
{ | |||||
class JT1078AuthorizationDefault : IJT1078Authorization | |||||
{ | |||||
public bool Authorization(IFullHttpRequest request, out IPrincipal principal) | |||||
{ | |||||
var uriSpan = request.Uri.AsSpan(); | |||||
var uriParamStr = uriSpan.Slice(uriSpan.IndexOf('?')+1).ToString().ToLower(); | |||||
var uriParams = uriParamStr.Split('&'); | |||||
var tokenParam = uriParams.FirstOrDefault(m => m.Contains("token")); | |||||
if (!string.IsNullOrEmpty(tokenParam)) | |||||
{ | |||||
principal = new ClaimsPrincipal(new GenericIdentity(tokenParam.Split('=')[1])); | |||||
return true; | |||||
} | |||||
else | |||||
{ | |||||
principal = null; | |||||
return false; | |||||
} | |||||
} | |||||
} | |||||
} |
@@ -12,6 +12,7 @@ using static DotNetty.Codecs.Http.HttpResponseStatus; | |||||
using Microsoft.Extensions.Logging; | using Microsoft.Extensions.Logging; | ||||
using JT1078.DotNetty.Core.Session; | using JT1078.DotNetty.Core.Session; | ||||
using System.Text.RegularExpressions; | using System.Text.RegularExpressions; | ||||
using JT1078.DotNetty.Core.Interfaces; | |||||
namespace JT1078.DotNetty.WebSocket.Handlers | namespace JT1078.DotNetty.WebSocket.Handlers | ||||
{ | { | ||||
@@ -24,12 +25,15 @@ namespace JT1078.DotNetty.WebSocket.Handlers | |||||
private readonly ILogger<JT1078WebSocketServerHandler> logger; | private readonly ILogger<JT1078WebSocketServerHandler> logger; | ||||
private readonly JT1078WebSocketSessionManager jT1078WebSocketSessionManager; | private readonly JT1078WebSocketSessionManager jT1078WebSocketSessionManager; | ||||
private readonly IJT1078Authorization iJT1078Authorization; | |||||
public JT1078WebSocketServerHandler( | public JT1078WebSocketServerHandler( | ||||
JT1078WebSocketSessionManager jT1078WebSocketSessionManager, | JT1078WebSocketSessionManager jT1078WebSocketSessionManager, | ||||
IJT1078Authorization iJT1078Authorization, | |||||
ILoggerFactory loggerFactory) | ILoggerFactory loggerFactory) | ||||
{ | { | ||||
this.jT1078WebSocketSessionManager = jT1078WebSocketSessionManager; | this.jT1078WebSocketSessionManager = jT1078WebSocketSessionManager; | ||||
this.iJT1078Authorization = iJT1078Authorization; | |||||
logger = loggerFactory.CreateLogger<JT1078WebSocketServerHandler>(); | logger = loggerFactory.CreateLogger<JT1078WebSocketServerHandler>(); | ||||
} | } | ||||
@@ -46,7 +50,7 @@ namespace JT1078.DotNetty.WebSocket.Handlers | |||||
protected override void ChannelRead0(IChannelHandlerContext ctx, object msg) | protected override void ChannelRead0(IChannelHandlerContext ctx, object msg) | ||||
{ | { | ||||
if (msg is IFullHttpRequest request) | if (msg is IFullHttpRequest request) | ||||
{ | |||||
{ | |||||
this.HandleHttpRequest(ctx, request); | this.HandleHttpRequest(ctx, request); | ||||
} | } | ||||
else if (msg is WebSocketFrame frame) | else if (msg is WebSocketFrame frame) | ||||
@@ -77,19 +81,24 @@ namespace JT1078.DotNetty.WebSocket.Handlers | |||||
SendHttpResponse(ctx, req, res); | SendHttpResponse(ctx, req, res); | ||||
return; | return; | ||||
} | } | ||||
// Handshake | |||||
var wsFactory = new WebSocketServerHandshakerFactory(GetWebSocketLocation(req), null, true, 5 * 1024 * 1024); | |||||
this.handshaker = wsFactory.NewHandshaker(req); | |||||
if (this.handshaker == null) | |||||
if (iJT1078Authorization.Authorization(req, out var principal)) | |||||
{ | { | ||||
WebSocketServerHandshakerFactory.SendUnsupportedVersionResponse(ctx.Channel); | |||||
// Handshake | |||||
var wsFactory = new WebSocketServerHandshakerFactory(GetWebSocketLocation(req), null, true, 5 * 1024 * 1024); | |||||
this.handshaker = wsFactory.NewHandshaker(req); | |||||
if (this.handshaker == null) | |||||
{ | |||||
WebSocketServerHandshakerFactory.SendUnsupportedVersionResponse(ctx.Channel); | |||||
} | |||||
else | |||||
{ | |||||
this.handshaker.HandshakeAsync(ctx.Channel, req); | |||||
jT1078WebSocketSessionManager.TryAdd(principal.Identity.Name, ctx.Channel); | |||||
} | |||||
} | } | ||||
else | |||||
{ | |||||
this.handshaker.HandshakeAsync(ctx.Channel, req); | |||||
var uriSpan = req.Uri.AsSpan(); | |||||
var userId = uriSpan.Slice(uriSpan.IndexOf('?')).ToString().Split('=')[1]; | |||||
jT1078WebSocketSessionManager.TryAdd(userId, ctx.Channel); | |||||
else { | |||||
SendHttpResponse(ctx, req, new DefaultFullHttpResponse(Http11, Unauthorized)); | |||||
return; | |||||
} | } | ||||
} | } | ||||
@@ -141,6 +150,7 @@ namespace JT1078.DotNetty.WebSocket.Handlers | |||||
public override void ExceptionCaught(IChannelHandlerContext ctx, Exception e) | public override void ExceptionCaught(IChannelHandlerContext ctx, Exception e) | ||||
{ | { | ||||
logger.LogError(e, ctx.Channel.Id.AsShortText()); | logger.LogError(e, ctx.Channel.Id.AsShortText()); | ||||
ctx.Channel.WriteAndFlushAsync(new DefaultFullHttpResponse(Http11, InternalServerError)); | |||||
jT1078WebSocketSessionManager.RemoveSessionByChannel(ctx.Channel); | jT1078WebSocketSessionManager.RemoveSessionByChannel(ctx.Channel); | ||||
ctx.CloseAsync(); | ctx.CloseAsync(); | ||||
} | } | ||||
@@ -20,5 +20,11 @@ namespace JT1078.DotNetty.WebSocket | |||||
{ | { | ||||
return Instance; | return Instance; | ||||
} | } | ||||
public IJT1078WebSocketBuilder Replace<T>() where T : IJT1078Authorization | |||||
{ | |||||
Instance.Services.Replace(new ServiceDescriptor(typeof(IJT1078Authorization), typeof(T), ServiceLifetime.Singleton)); | |||||
return this; | |||||
} | |||||
} | } | ||||
} | } |
@@ -1,6 +1,8 @@ | |||||
using JT1078.DotNetty.Core.Codecs; | using JT1078.DotNetty.Core.Codecs; | ||||
using JT1078.DotNetty.Core.Impl; | |||||
using JT1078.DotNetty.Core.Interfaces; | using JT1078.DotNetty.Core.Interfaces; | ||||
using JT1078.DotNetty.Core.Session; | using JT1078.DotNetty.Core.Session; | ||||
using JT1078.DotNetty.WebSocket.Authorization; | |||||
using JT1078.DotNetty.WebSocket.Handlers; | using JT1078.DotNetty.WebSocket.Handlers; | ||||
using Microsoft.Extensions.DependencyInjection; | using Microsoft.Extensions.DependencyInjection; | ||||
using Microsoft.Extensions.DependencyInjection.Extensions; | using Microsoft.Extensions.DependencyInjection.Extensions; | ||||
@@ -14,6 +16,7 @@ namespace JT1078.DotNetty.WebSocket | |||||
public static IJT1078WebSocketBuilder AddJT1078WebSocketHost(this IJT1078Builder builder) | public static IJT1078WebSocketBuilder AddJT1078WebSocketHost(this IJT1078Builder builder) | ||||
{ | { | ||||
builder.Services.TryAddSingleton<JT1078WebSocketSessionManager>(); | builder.Services.TryAddSingleton<JT1078WebSocketSessionManager>(); | ||||
builder.Services.TryAddSingleton<IJT1078Authorization,JT1078AuthorizationDefault>(); | |||||
builder.Services.AddScoped<JT1078WebSocketServerHandler>(); | builder.Services.AddScoped<JT1078WebSocketServerHandler>(); | ||||
builder.Services.AddHostedService<JT1078WebSocketServerHost>(); | builder.Services.AddHostedService<JT1078WebSocketServerHost>(); | ||||
return new JT1078WebSocketBuilderDefault(builder); | return new JT1078WebSocketBuilderDefault(builder); | ||||