diff --git a/src/JT1078.Gateway.Abstractions/IJT1078Authorization.cs b/src/JT1078.Gateway.Abstractions/IJT1078Authorization.cs new file mode 100644 index 0000000..3264d5d --- /dev/null +++ b/src/JT1078.Gateway.Abstractions/IJT1078Authorization.cs @@ -0,0 +1,13 @@ +using System; +using System.Collections.Generic; +using System.Net; +using System.Security.Principal; +using System.Text; + +namespace JT1078.Gateway.Abstractions +{ + public interface IJT1078Authorization + { + bool Authorization(HttpListenerContext context, out IPrincipal principal); + } +} diff --git a/src/JT1078.Gateway.Abstractions/JT1078.Gateway.Abstractions.csproj b/src/JT1078.Gateway.Abstractions/JT1078.Gateway.Abstractions.csproj index fedc579..9cc33a1 100644 --- a/src/JT1078.Gateway.Abstractions/JT1078.Gateway.Abstractions.csproj +++ b/src/JT1078.Gateway.Abstractions/JT1078.Gateway.Abstractions.csproj @@ -25,7 +25,7 @@ - + diff --git a/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Program.cs b/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Program.cs index 7788f62..a6cfc23 100644 --- a/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Program.cs +++ b/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Program.cs @@ -30,6 +30,7 @@ namespace JT1078.Gateway.TestNormalHosting services.AddSingleton(typeof(ILogger<>), typeof(Logger<>)); //使用内存队列实现会话通知 services.AddJT1078Gateway(hostContext.Configuration) + .AddHttp() .AddUdp() .AddTcp() .AddNormal() diff --git a/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/appsettings.json b/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/appsettings.json index 08cd388..92898f8 100644 --- a/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/appsettings.json +++ b/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/appsettings.json @@ -11,5 +11,8 @@ "Default": "Trace" } } + }, + "JT1078Configuration": { + "HttpPort": 15555 } } diff --git a/src/JT1078.Gateway/Codecs/JT1078TcpDecoder.cs b/src/JT1078.Gateway/Codecs/JT1078TcpDecoder.cs deleted file mode 100644 index e28cfd7..0000000 --- a/src/JT1078.Gateway/Codecs/JT1078TcpDecoder.cs +++ /dev/null @@ -1,20 +0,0 @@ -using DotNetty.Buffers; -using DotNetty.Codecs; -using System.Collections.Generic; -using DotNetty.Transport.Channels; -using JT1078.Protocol; -using System; - -namespace JT1078.Gateway.Codecs -{ - public class JT1078TcpDecoder : ByteToMessageDecoder - { - protected override void Decode(IChannelHandlerContext context, IByteBuffer input, List output) - { - byte[] buffer = new byte[input.Capacity+4]; - input.ReadBytes(buffer, 4, input.Capacity); - Array.Copy(JT1078Package.FH_Bytes, 0,buffer, 0, 4); - output.Add(buffer); - } - } -} diff --git a/src/JT1078.Gateway/Codecs/JT1078UdpDecoder.cs b/src/JT1078.Gateway/Codecs/JT1078UdpDecoder.cs deleted file mode 100644 index e07ac56..0000000 --- a/src/JT1078.Gateway/Codecs/JT1078UdpDecoder.cs +++ /dev/null @@ -1,21 +0,0 @@ -using DotNetty.Buffers; -using DotNetty.Codecs; -using DotNetty.Transport.Channels; -using System.Collections.Generic; -using DotNetty.Transport.Channels.Sockets; -using JT1078.Gateway.Metadata; - -namespace JT1078.Gateway.Codecs -{ - public class JT1078UdpDecoder : MessageToMessageDecoder - { - protected override void Decode(IChannelHandlerContext context, DatagramPacket message, List output) - { - if (!message.Content.IsReadable()) return; - IByteBuffer byteBuffer = message.Content; - byte[] buffer = new byte[byteBuffer.ReadableBytes]; - byteBuffer.ReadBytes(buffer); - output.Add(new JT1078UdpPackage(buffer, message.Sender)); - } - } -} diff --git a/src/JT1078.Gateway/Configurations/JT1078RemoteServerOptions.cs b/src/JT1078.Gateway/Configurations/JT1078RemoteServerOptions.cs deleted file mode 100644 index 77aec37..0000000 --- a/src/JT1078.Gateway/Configurations/JT1078RemoteServerOptions.cs +++ /dev/null @@ -1,12 +0,0 @@ -using Microsoft.Extensions.Options; -using System.Collections.Generic; - -namespace JT1078.Gateway.Configurations -{ - public class JT1078RemoteServerOptions:IOptions - { - public List RemoteServers { get; set; } - - public JT1078RemoteServerOptions Value => this; - } -} diff --git a/src/JT1078.Gateway/Http/Authorization/JT1078AuthorizationDefault.cs b/src/JT1078.Gateway/Http/Authorization/JT1078AuthorizationDefault.cs deleted file mode 100644 index a947a04..0000000 --- a/src/JT1078.Gateway/Http/Authorization/JT1078AuthorizationDefault.cs +++ /dev/null @@ -1,32 +0,0 @@ -using DotNetty.Codecs.Http; -using JT1078.Gateway.Interfaces; -using System; -using System.Collections.Generic; -using System.Linq; -using System.Security.Claims; -using System.Security.Principal; -using System.Text; - -namespace JT1078.Gateway.Http.Authorization -{ - class JT1078AuthorizationDefault : IJT1078Authorization - { - public bool Authorization(IFullHttpRequest request, out IPrincipal principal) - { - var uriSpan = request.Uri.AsSpan(); - var uriParamStr = uriSpan.Slice(uriSpan.IndexOf('?')+1).ToString().ToLower(); - var uriParams = uriParamStr.Split('&'); - var tokenParam = uriParams.FirstOrDefault(m => m.Contains("token")); - if (!string.IsNullOrEmpty(tokenParam)) - { - principal = new ClaimsPrincipal(new GenericIdentity(tokenParam.Split('=')[1])); - return true; - } - else - { - principal = null; - return false; - } - } - } -} diff --git a/src/JT1078.Gateway/Http/Handlers/JT1078HttpServerHandler.cs b/src/JT1078.Gateway/Http/Handlers/JT1078HttpServerHandler.cs deleted file mode 100644 index 5d19ea2..0000000 --- a/src/JT1078.Gateway/Http/Handlers/JT1078HttpServerHandler.cs +++ /dev/null @@ -1,183 +0,0 @@ -using System; -using System.Diagnostics; -using System.Text; -using System.Threading.Tasks; -using DotNetty.Buffers; -using DotNetty.Codecs.Http; -using DotNetty.Codecs.Http.WebSockets; -using DotNetty.Common.Utilities; -using DotNetty.Transport.Channels; -using static DotNetty.Codecs.Http.HttpVersion; -using static DotNetty.Codecs.Http.HttpResponseStatus; -using Microsoft.Extensions.Logging; -using System.Text.RegularExpressions; -using JT1078.Gateway.Session; -using JT1078.Gateway.Interfaces; - -namespace JT1078.Gateway.Http.Handlers -{ - public sealed class JT1078HttpServerHandler : SimpleChannelInboundHandler - { - const string WebsocketPath = "/jt1078live"; - WebSocketServerHandshaker handshaker; - private static readonly AsciiString ServerName = AsciiString.Cached("JT1078Netty"); - private static readonly AsciiString DateEntity = HttpHeaderNames.Date; - private static readonly AsciiString ServerEntity = HttpHeaderNames.Server; - - private readonly ILogger logger; - - private readonly JT1078HttpSessionManager jT1078HttpSessionManager; - - private readonly IJT1078Authorization iJT1078Authorization; - - private readonly IHttpMiddleware httpMiddleware; - - public JT1078HttpServerHandler( - JT1078HttpSessionManager jT1078HttpSessionManager, - IJT1078Authorization iJT1078Authorization, - ILoggerFactory loggerFactory, - IHttpMiddleware httpMiddleware = null) - { - this.jT1078HttpSessionManager = jT1078HttpSessionManager; - this.iJT1078Authorization = iJT1078Authorization; - this.httpMiddleware = httpMiddleware; - logger = loggerFactory.CreateLogger(); - } - - public override void ChannelInactive(IChannelHandlerContext context) - { - if (logger.IsEnabled(LogLevel.Information)) - { - logger.LogInformation(context.Channel.Id.AsShortText()); - } - jT1078HttpSessionManager.RemoveSessionByChannel(context.Channel); - base.ChannelInactive(context); - } - - protected override void ChannelRead0(IChannelHandlerContext ctx, object msg) - { - if (msg is IFullHttpRequest request) - { - this.HandleHttpRequest(ctx, request); - } - else if (msg is WebSocketFrame frame) - { - this.HandleWebSocketFrame(ctx, frame); - } - } - - public override void ChannelReadComplete(IChannelHandlerContext context) => context.Flush(); - - void HandleHttpRequest(IChannelHandlerContext ctx, IFullHttpRequest req) - { - // Handle a bad request. - if (!req.Result.IsSuccess) - { - SendHttpResponse(ctx, req, new DefaultFullHttpResponse(Http11, BadRequest)); - return; - } - if ("/favicon.ico".Equals(req.Uri)) - { - var res = new DefaultFullHttpResponse(Http11, NotFound); - SendHttpResponse(ctx, req, res); - return; - } - if (iJT1078Authorization.Authorization(req, out var principal)) - { - if (req.Uri.StartsWith(WebsocketPath)) - { - // Handshake - var wsFactory = new WebSocketServerHandshakerFactory(GetWebSocketLocation(req), null, true, 5 * 1024 * 1024); - this.handshaker = wsFactory.NewHandshaker(req); - if (this.handshaker == null) - { - WebSocketServerHandshakerFactory.SendUnsupportedVersionResponse(ctx.Channel); - } - else - { - this.handshaker.HandshakeAsync(ctx.Channel, req); - jT1078HttpSessionManager.TryAdd(principal.Identity.Name, ctx.Channel); - httpMiddleware?.Next(ctx, req, principal); - } - } - else - { - jT1078HttpSessionManager.TryAdd(principal.Identity.Name, ctx.Channel); - httpMiddleware?.Next(ctx, req, principal); - } - } - else { - SendHttpResponse(ctx, req, new DefaultFullHttpResponse(Http11, Unauthorized)); - return; - } - } - - void HandleWebSocketFrame(IChannelHandlerContext ctx, WebSocketFrame frame) - { - // Check for closing frame - if (frame is CloseWebSocketFrame) - { - this.handshaker.CloseAsync(ctx.Channel, (CloseWebSocketFrame)frame.Retain()); - return; - } - if (frame is PingWebSocketFrame) - { - ctx.WriteAsync(new PongWebSocketFrame((IByteBuffer)frame.Content.Retain())); - return; - } - if (frame is TextWebSocketFrame) - { - // Echo the frame - ctx.WriteAsync(frame.Retain()); - return; - } - if (frame is BinaryWebSocketFrame) - { - // Echo the frame - ctx.WriteAsync(frame.Retain()); - } - } - - static void SendHttpResponse(IChannelHandlerContext ctx, IFullHttpRequest req, IFullHttpResponse res) - { - // Generate an error page if response getStatus code is not OK (200). - if (res.Status.Code != 200) - { - res.Headers.Set(ServerEntity, ServerName); - res.Headers.Set(DateEntity, DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff")); - IByteBuffer buf = Unpooled.CopiedBuffer(Encoding.UTF8.GetBytes(res.Status.ToString())); - res.Content.WriteBytes(buf); - buf.Release(); - HttpUtil.SetContentLength(res, res.Content.ReadableBytes); - } - // Send the response and close the connection if necessary. - Task task = ctx.Channel.WriteAndFlushAsync(res); - if (!HttpUtil.IsKeepAlive(req) || res.Status.Code != 200) - { - task.ContinueWith((t, c) => ((IChannelHandlerContext)c).CloseAsync(), ctx, TaskContinuationOptions.ExecuteSynchronously); - } - } - - public override void ExceptionCaught(IChannelHandlerContext context, Exception exception) - { - logger.LogError(exception, context.Channel.Id.AsShortText()); - context.Channel.WriteAndFlushAsync(new DefaultFullHttpResponse(Http11, InternalServerError)); - jT1078HttpSessionManager.RemoveSessionByChannel(context.Channel); - CloseAsync(context); - base.ExceptionCaught(context, exception); - } - - public override Task CloseAsync(IChannelHandlerContext context) - { - jT1078HttpSessionManager.RemoveSessionByChannel(context.Channel); - return base.CloseAsync(context); - } - - static string GetWebSocketLocation(IFullHttpRequest req) - { - bool result = req.Headers.TryGet(HttpHeaderNames.Host, out ICharSequence value); - string location= value.ToString() + WebsocketPath; - return "ws://" + location; - } - } -} diff --git a/src/JT1078.Gateway/Http/JT1078HttpBuilderDefault.cs b/src/JT1078.Gateway/Http/JT1078HttpBuilderDefault.cs deleted file mode 100644 index b1f9094..0000000 --- a/src/JT1078.Gateway/Http/JT1078HttpBuilderDefault.cs +++ /dev/null @@ -1,36 +0,0 @@ -using JT1078.Gateway.Interfaces; -using Microsoft.Extensions.DependencyInjection; -using Microsoft.Extensions.DependencyInjection.Extensions; -using System; -using System.Collections.Generic; -using System.Text; - -namespace JT1078.Gateway.Http -{ - class JT1078HttpBuilderDefault : IJT1078HttpBuilder - { - public IJT1078Builder Instance { get; } - - public JT1078HttpBuilderDefault(IJT1078Builder builder) - { - Instance = builder; - } - - public IJT1078Builder Builder() - { - return Instance; - } - - public IJT1078HttpBuilder Replace() where T : IJT1078Authorization - { - Instance.Services.Replace(new ServiceDescriptor(typeof(IJT1078Authorization), typeof(T), ServiceLifetime.Singleton)); - return this; - } - - public IJT1078HttpBuilder UseHttpMiddleware() where T : IHttpMiddleware - { - Instance.Services.TryAdd(new ServiceDescriptor(typeof(IHttpMiddleware), typeof(T), ServiceLifetime.Singleton)); - return this; - } - } -} diff --git a/src/JT1078.Gateway/Http/JT1078HttpDotnettyExtensions.cs b/src/JT1078.Gateway/Http/JT1078HttpDotnettyExtensions.cs deleted file mode 100644 index dc9907e..0000000 --- a/src/JT1078.Gateway/Http/JT1078HttpDotnettyExtensions.cs +++ /dev/null @@ -1,24 +0,0 @@ -using JT1078.Gateway.Http; -using JT1078.Gateway.Http.Authorization; -using JT1078.Gateway.Http.Handlers; -using JT1078.Gateway.Interfaces; -using JT1078.Gateway.Session; -using Microsoft.Extensions.DependencyInjection; -using Microsoft.Extensions.DependencyInjection.Extensions; -using System.Runtime.CompilerServices; - - -namespace JT1078.Gateway -{ - public static class JT1078HttpDotnettyExtensions - { - public static IJT1078HttpBuilder AddHttpHost(this IJT1078Builder builder) - { - builder.Services.TryAddSingleton(); - builder.Services.TryAddSingleton(); - builder.Services.AddScoped(); - builder.Services.AddHostedService(); - return new JT1078HttpBuilderDefault(builder); - } - } -} \ No newline at end of file diff --git a/src/JT1078.Gateway/Http/JT1078HttpServerHost.cs b/src/JT1078.Gateway/Http/JT1078HttpServerHost.cs deleted file mode 100644 index f26e44f..0000000 --- a/src/JT1078.Gateway/Http/JT1078HttpServerHost.cs +++ /dev/null @@ -1,98 +0,0 @@ -using DotNetty.Buffers; -using DotNetty.Codecs; -using DotNetty.Codecs.Http; -using DotNetty.Codecs.Http.Cors; -using DotNetty.Common.Utilities; -using DotNetty.Handlers.Streams; -using DotNetty.Handlers.Timeout; -using DotNetty.Transport.Bootstrapping; -using DotNetty.Transport.Channels; -using DotNetty.Transport.Libuv; -using JT1078.Gateway.Configurations; -using JT1078.Gateway.Http.Handlers; -using JT1078.Protocol; -using Microsoft.Extensions.DependencyInjection; -using Microsoft.Extensions.Hosting; -using Microsoft.Extensions.Logging; -using Microsoft.Extensions.Options; -using System; -using System.Net; -using System.Runtime.InteropServices; -using System.Threading; -using System.Threading.Tasks; - -namespace JT1078.Gateway.Http -{ - /// - /// JT1078 http服务 - /// - internal class JT1078HttpServerHost : IHostedService - { - private readonly JT1078Configuration configuration; - private readonly ILogger logger; - private DispatcherEventLoopGroup bossGroup; - private WorkerEventLoopGroup workerGroup; - private IChannel bootstrapChannel; - private IByteBufferAllocator serverBufferAllocator; - private readonly IServiceProvider serviceProvider; - public JT1078HttpServerHost( - IServiceProvider serviceProvider, - ILoggerFactory loggerFactory, - IOptions configurationAccessor) - { - this.serviceProvider = serviceProvider; - configuration = configurationAccessor.Value; - logger=loggerFactory.CreateLogger(); - } - - public Task StartAsync(CancellationToken cancellationToken) - { - bossGroup = new DispatcherEventLoopGroup(); - workerGroup = new WorkerEventLoopGroup(bossGroup, configuration.EventLoopCount); - serverBufferAllocator = new PooledByteBufferAllocator(); - ServerBootstrap bootstrap = new ServerBootstrap(); - bootstrap.Group(bossGroup, workerGroup); - bootstrap.Channel(); - if (RuntimeInformation.IsOSPlatform(OSPlatform.Linux) - || RuntimeInformation.IsOSPlatform(OSPlatform.OSX)) - { - bootstrap - .Option(ChannelOption.SoReuseport, true) - .ChildOption(ChannelOption.SoReuseaddr, true); - } - bootstrap - .Option(ChannelOption.SoBacklog, 8192) - .ChildOption(ChannelOption.Allocator, serverBufferAllocator) - .ChildHandler(new ActionChannelInitializer(channel => - { - IChannelPipeline pipeline = channel.Pipeline; - pipeline.AddLast(new HttpServerCodec()); - pipeline.AddLast(new CorsHandler(CorsConfigBuilder - .ForAnyOrigin() - .AllowNullOrigin() - .AllowedRequestMethods(HttpMethod.Get, HttpMethod.Post, HttpMethod.Options, HttpMethod.Delete) - .AllowedRequestHeaders((AsciiString)"origin", (AsciiString)"range", (AsciiString)"accept-encoding", (AsciiString)"referer", (AsciiString)"Cache-Control", (AsciiString)"X-Proxy-Authorization", (AsciiString)"X-Requested-With", (AsciiString)"Content-Type") - .ExposeHeaders((StringCharSequence)"Server", (StringCharSequence)"range", (StringCharSequence)"Content-Length", (StringCharSequence)"Content-Range") - .AllowCredentials() - .Build())); - pipeline.AddLast(new HttpObjectAggregator(int.MaxValue)); - using (var scope = serviceProvider.CreateScope()) - { - pipeline.AddLast("JT1078HttpServerHandler", scope.ServiceProvider.GetRequiredService()); - } - })); - logger.LogInformation($"JT1078 Http Server start at {IPAddress.Any}:{configuration.HttpPort}."); - return bootstrap.BindAsync(configuration.HttpPort) - .ContinueWith(i => bootstrapChannel = i.Result); - } - - public async Task StopAsync(CancellationToken cancellationToken) - { - await bootstrapChannel.CloseAsync(); - var quietPeriod = configuration.QuietPeriodTimeSpan; - var shutdownTimeout = configuration.ShutdownTimeoutTimeSpan; - await workerGroup.ShutdownGracefullyAsync(quietPeriod, shutdownTimeout); - await bossGroup.ShutdownGracefullyAsync(quietPeriod, shutdownTimeout); - } - } -} diff --git a/src/JT1078.Gateway/Impl/JT1078AuthorizationDefault.cs b/src/JT1078.Gateway/Impl/JT1078AuthorizationDefault.cs new file mode 100644 index 0000000..2009c59 --- /dev/null +++ b/src/JT1078.Gateway/Impl/JT1078AuthorizationDefault.cs @@ -0,0 +1,28 @@ +using JT1078.Gateway.Abstractions; +using System; +using System.Collections.Generic; +using System.Net; +using System.Security.Claims; +using System.Security.Principal; +using System.Text; + +namespace JT1078.Gateway.Impl +{ + class JT1078AuthorizationDefault : IJT1078Authorization + { + public bool Authorization(HttpListenerContext context, out IPrincipal principal) + { + var token = context.Request.QueryString.Get("token"); + if (!string.IsNullOrEmpty(token)) + { + principal = new ClaimsPrincipal(new GenericIdentity(token)); + return true; + } + else + { + principal = null; + return false; + } + } + } +} diff --git a/src/JT1078.Gateway/JT1078.Gateway.csproj b/src/JT1078.Gateway/JT1078.Gateway.csproj index 651c32a..2c63c6d 100644 --- a/src/JT1078.Gateway/JT1078.Gateway.csproj +++ b/src/JT1078.Gateway/JT1078.Gateway.csproj @@ -38,11 +38,6 @@ - - - - - diff --git a/src/JT1078.Gateway/JT1078GatewayExtensions.cs b/src/JT1078.Gateway/JT1078GatewayExtensions.cs index 9c5e755..1f599cc 100644 --- a/src/JT1078.Gateway/JT1078GatewayExtensions.cs +++ b/src/JT1078.Gateway/JT1078GatewayExtensions.cs @@ -50,6 +50,13 @@ namespace JT1078.Gateway return builder; } + public static IJT1078GatewayBuilder AddHttp(this IJT1078GatewayBuilder builder) + { + builder.JT1078Builder.Services.AddSingleton(); + builder.JT1078Builder.Services.AddHostedService(); + return builder; + } + public static IJT1078NormalGatewayBuilder AddNormal(this IJT1078GatewayBuilder builder) { return new JT1078NormalGatewayBuilderDefault(builder.JT1078Builder); diff --git a/src/JT1078.Gateway/JT1078HttpServer.cs b/src/JT1078.Gateway/JT1078HttpServer.cs new file mode 100644 index 0000000..bad2553 --- /dev/null +++ b/src/JT1078.Gateway/JT1078HttpServer.cs @@ -0,0 +1,191 @@ +using JT1078.Gateway.Abstractions; +using JT1078.Gateway.Configurations; +using JT1078.Gateway.Metadata; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using System; +using System.Buffers; +using System.Collections.Generic; +using System.IO; +using System.Net; +using System.Net.WebSockets; +using System.Text; +using System.Threading; +using System.Threading.Tasks; + +namespace JT1078.Gateway +{ + public class JT1078HttpServer : IHostedService + { + private readonly ILogger Logger; + + private readonly JT1078Configuration Configuration; + + private readonly IJT1078Authorization authorization; + + private HttpListener listener; + + public JT1078HttpServer( + IOptions jT1078ConfigurationAccessor, + IJT1078Authorization authorization, + ILoggerFactory loggerFactory) + { + Logger = loggerFactory.CreateLogger(); + Configuration = jT1078ConfigurationAccessor.Value; + this.authorization = authorization; + } + + public Task StartAsync(CancellationToken cancellationToken) + { + if (!HttpListener.IsSupported) + { + Logger.LogWarning("Windows XP SP2 or Server 2003 is required to use the HttpListener class."); + return Task.CompletedTask; + } + listener = new HttpListener(); + listener.AuthenticationSchemes = AuthenticationSchemes.Anonymous; + listener.Prefixes.Add($"http://*:{Configuration.HttpPort}/"); + listener.Start(); + Logger.LogInformation($"JT1078 Http Server start at {IPAddress.Any}:{Configuration.HttpPort}."); + Task.Factory.StartNew(async() => + { + while (listener.IsListening) + { + var context = await listener.GetContextAsync(); + try + { + if (authorization.Authorization(context,out var principal)) + { + //new JT1078HttpContext(context, principal); + //todo:session manager + await ProcessRequestAsync(context); + } + else + { + await Http401(context); + } + } + catch (Exception ex) + { + await Http500(context); + Logger.LogError(ex, ex.StackTrace); + } + } + }, cancellationToken); + return Task.CompletedTask; + } + + private async ValueTask ProcessRequestAsync(HttpListenerContext context) + { + if(context.Request.RawUrl.StartsWith("/favicon.ico")) + { + Http404(context); + } + if (Logger.IsEnabled(LogLevel.Trace)) + { + Logger.LogTrace($"[http RequestTraceIdentifier]:{context.Request.RequestTraceIdentifier.ToString()}"); + } + if (context.Request.IsWebSocketRequest) + { + HttpListenerWebSocketContext wsContext = await context.AcceptWebSocketAsync(null); + //todo:websocket context 管理 + await wsContext.WebSocket.SendAsync(Encoding.UTF8.GetBytes("hello,jt1078"), WebSocketMessageType.Text, true, CancellationToken.None); + await Task.Factory.StartNew(async(state) => + { + + //https://www.bejson.com/httputil/websocket/ + //ws://127.0.0.1:15555?token=22 + var websocketContext = state as HttpListenerWebSocketContext; + while(websocketContext.WebSocket.State == WebSocketState.Open || + websocketContext.WebSocket.State == WebSocketState.Connecting) + { + var buffer = ArrayPool.Shared.Rent(256); + try + { + WebSocketReceiveResult receiveResult = await websocketContext.WebSocket.ReceiveAsync(buffer, CancellationToken.None); + if (receiveResult.EndOfMessage) + { + if (receiveResult.Count > 0) + { + var data = buffer.AsSpan().Slice(0, receiveResult.Count).ToArray(); + if (Logger.IsEnabled(LogLevel.Trace)) + { + Logger.LogTrace($"[ws receive]:{Encoding.UTF8.GetString(data)}"); + } + await websocketContext.WebSocket.SendAsync(data, WebSocketMessageType.Text, true, CancellationToken.None); + } + } + } + finally + { + ArrayPool.Shared.Return(buffer); + } + } + if (Logger.IsEnabled(LogLevel.Trace)) + { + Logger.LogTrace($"[ws close]:{wsContext}"); + } + //todo:session close notice + await wsContext.WebSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, "normal", CancellationToken.None); + }, wsContext); + } + else + { + //todo:set http chunk + //todo:session close notice + //var body = await new StreamReader(context.Request.InputStream).ReadToEndAsync(); + var options = context.Request.QueryString; + //var keys = options.AllKeys; + byte[] b = Encoding.UTF8.GetBytes("ack"); + context.Response.StatusCode = 200; + context.Response.KeepAlive = true; + context.Response.ContentLength64 = b.Length; + await context.Response.OutputStream.WriteAsync(b, 0, b.Length); + context.Response.Close(); + } + } + + private async ValueTask Http401(HttpListenerContext context) + { + byte[] b = Encoding.UTF8.GetBytes("auth error"); + context.Response.StatusCode = (int)HttpStatusCode.Unauthorized; + context.Response.KeepAlive = false; + context.Response.ContentLength64 = b.Length; + var output = context.Response.OutputStream; + await output.WriteAsync(b, 0, b.Length); + context.Response.Close(); + } + + private void Http404(HttpListenerContext context) + { + context.Response.StatusCode = (int)HttpStatusCode.NotFound; + context.Response.KeepAlive = false; + context.Response.Close(); + } + + private async ValueTask Http500(HttpListenerContext context) + { + byte[] b = Encoding.UTF8.GetBytes("inner error"); + context.Response.StatusCode = (int)HttpStatusCode.InternalServerError; + context.Response.KeepAlive = false; + context.Response.ContentLength64 = b.Length; + var output = context.Response.OutputStream; + await output.WriteAsync(b, 0, b.Length); + context.Response.Close(); + } + + public Task StopAsync(CancellationToken cancellationToken) + { + try + { + listener.Stop(); + } + catch (System.ObjectDisposedException ex) + { + + } + return Task.CompletedTask; + } + } +} diff --git a/src/JT1078.Gateway/JT1078TcpServer.cs b/src/JT1078.Gateway/JT1078TcpServer.cs index eb40eb5..dc1950c 100644 --- a/src/JT1078.Gateway/JT1078TcpServer.cs +++ b/src/JT1078.Gateway/JT1078TcpServer.cs @@ -132,6 +132,10 @@ namespace JT1078.Gateway break; } writer.Advance(bytesRead); + } + catch (System.ObjectDisposedException ex) + { + } catch (OperationCanceledException ex) { diff --git a/src/JT1078.Gateway/JT1078UdpServer.cs b/src/JT1078.Gateway/JT1078UdpServer.cs index f021c06..d9a4715 100644 --- a/src/JT1078.Gateway/JT1078UdpServer.cs +++ b/src/JT1078.Gateway/JT1078UdpServer.cs @@ -99,6 +99,10 @@ namespace JT1078.Gateway var segment = new ArraySegment(buffer); var result = await server.ReceiveMessageFromAsync(segment, SocketFlags.None, server.LocalEndPoint); ReaderBuffer(buffer.AsSpan(0, result.ReceivedBytes), server, result); + } + catch (System.ObjectDisposedException ex) + { + } catch (AggregateException ex) { @@ -152,9 +156,16 @@ namespace JT1078.Gateway public Task StopAsync(CancellationToken cancellationToken) { Logger.LogInformation("JT1078 Udp Server Stop"); - if (server?.Connected ?? false) - server.Shutdown(SocketShutdown.Both); - server?.Close(); + try + { + if (server?.Connected ?? false) + server.Shutdown(SocketShutdown.Both); + server?.Close(); + } + catch (System.ObjectDisposedException ex) + { + + } return Task.CompletedTask; } } diff --git a/src/JT1078.Gateway/Jobs/JT1078UdpReceiveTimeoutJob.cs b/src/JT1078.Gateway/Jobs/JT1078UdpReceiveTimeoutJob.cs index f2ee348..38c6480 100644 --- a/src/JT1078.Gateway/Jobs/JT1078UdpReceiveTimeoutJob.cs +++ b/src/JT1078.Gateway/Jobs/JT1078UdpReceiveTimeoutJob.cs @@ -47,8 +47,11 @@ namespace JT1078.Gateway.Jobs { SessionManager.RemoveBySessionId(item); } - Logger.LogInformation($"[Check Receive Timeout]"); - Logger.LogInformation($"[Session Online Count]:{SessionManager.UdpSessionCount}"); + if (Logger.IsEnabled(LogLevel.Information)) + { + Logger.LogInformation($"[Check Receive Timeout]"); + Logger.LogInformation($"[Session Online Count]:{SessionManager.UdpSessionCount}"); + } } catch (Exception ex) { diff --git a/src/JT1078.Gateway/Metadata/JT1078HttpContext.cs b/src/JT1078.Gateway/Metadata/JT1078HttpContext.cs new file mode 100644 index 0000000..a4cdc2a --- /dev/null +++ b/src/JT1078.Gateway/Metadata/JT1078HttpContext.cs @@ -0,0 +1,19 @@ +using System; +using System.Collections.Generic; +using System.Net; +using System.Security.Principal; +using System.Text; + +namespace JT1078.Gateway.Metadata +{ + public class JT1078HttpContext + { + public HttpListenerContext Context { get; } + public IPrincipal User { get; } + public JT1078HttpContext(HttpListenerContext context, IPrincipal user) + { + Context = context; + User = user; + } + } +} diff --git a/src/JT1078.Gateway/Metadata/JT1078HttpSession.cs b/src/JT1078.Gateway/Metadata/JT1078HttpSession.cs deleted file mode 100644 index c310ed0..0000000 --- a/src/JT1078.Gateway/Metadata/JT1078HttpSession.cs +++ /dev/null @@ -1,31 +0,0 @@ -using DotNetty.Transport.Channels; -using System; -using System.Net; - -namespace JT1078.Gateway.Metadata -{ - public class JT1078HttpSession - { - public JT1078HttpSession( - IChannel channel, - string userId) - { - Channel = channel; - UserId = userId; - StartTime = DateTime.Now; - LastActiveTime = DateTime.Now; - } - - public JT1078HttpSession() { } - - public string UserId { get; set; } - - public string AttachInfo { get; set; } - - public IChannel Channel { get; set; } - - public DateTime LastActiveTime { get; set; } - - public DateTime StartTime { get; set; } - } -} diff --git a/src/JT1078.Gateway/Metadata/JT1078Request.cs b/src/JT1078.Gateway/Metadata/JT1078Request.cs deleted file mode 100644 index 85985ab..0000000 --- a/src/JT1078.Gateway/Metadata/JT1078Request.cs +++ /dev/null @@ -1,20 +0,0 @@ -using JT1078.Protocol; -using System; -using System.Collections.Generic; -using System.Text; - -namespace JT1078.Gateway.Metadata -{ - public class JT1078Request - { - public JT1078Request(JT1078Package package,byte[] src) - { - Package = package; - Src = src; - } - - public JT1078Package Package { get; } - - public byte[] Src { get; } - } -} diff --git a/src/JT1078.Gateway/Metadata/JT1078Response.cs b/src/JT1078.Gateway/Metadata/JT1078Response.cs deleted file mode 100644 index 1397616..0000000 --- a/src/JT1078.Gateway/Metadata/JT1078Response.cs +++ /dev/null @@ -1,10 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Text; - -namespace JT1078.Gateway.Metadata -{ - public class JT1078Response - { - } -} diff --git a/src/JT1078.Gateway/Metadata/JT1078UdpPackage.cs b/src/JT1078.Gateway/Metadata/JT1078UdpPackage.cs deleted file mode 100644 index cbc5d36..0000000 --- a/src/JT1078.Gateway/Metadata/JT1078UdpPackage.cs +++ /dev/null @@ -1,20 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Net; -using System.Text; - -namespace JT1078.Gateway.Metadata -{ - public class JT1078UdpPackage - { - public JT1078UdpPackage(byte[] buffer, EndPoint sender) - { - Buffer = buffer; - Sender = sender; - } - - public byte[] Buffer { get; } - - public EndPoint Sender { get; } - } -} diff --git a/src/JT1078.Gateway/Session/JT1078HttpSessionManager.cs b/src/JT1078.Gateway/Session/JT1078HttpSessionManager.cs deleted file mode 100644 index c6633dd..0000000 --- a/src/JT1078.Gateway/Session/JT1078HttpSessionManager.cs +++ /dev/null @@ -1,64 +0,0 @@ -using Microsoft.Extensions.Logging; -using System; -using System.Collections.Concurrent; -using System.Collections.Generic; -using System.Linq; -using DotNetty.Transport.Channels; -using JT1078.Gateway.Metadata; - -namespace JT1078.Gateway.Session -{ - /// - /// JT1078 http会话管理 - /// - public class JT1078HttpSessionManager - { - private readonly ILogger logger; - - public JT1078HttpSessionManager( - ILoggerFactory loggerFactory) - { - logger = loggerFactory.CreateLogger(); - } - - private ConcurrentDictionary SessionDict = new ConcurrentDictionary(); - - public int SessionCount - { - get - { - return SessionDict.Count; - } - } - - public List GetSessions(string userId) - { - return SessionDict.Where(m => m.Value.UserId == userId).Select(m=>m.Value).ToList(); - } - - public void TryAdd(string userId,IChannel channel) - { - SessionDict.TryAdd(channel.Id.AsShortText(), new JT1078HttpSession(channel, userId)); - if (logger.IsEnabled(LogLevel.Information)) - { - logger.LogInformation($">>>{userId},{channel.Id.AsShortText()} Channel Connection."); - } - } - - public void RemoveSessionByChannel(IChannel channel) - { - if (channel.Open&& SessionDict.TryRemove(channel.Id.AsShortText(), out var session)) - { - if (logger.IsEnabled(LogLevel.Information)) - { - logger.LogInformation($">>>{session.UserId},{session.Channel.Id.AsShortText()} Channel Remove."); - } - } - } - public List GetAll() - { - return SessionDict.Select(s => s.Value).ToList(); - } - } -} - diff --git a/src/JT1078.Gateway/Session/JT1078TcpSessionManager.cs b/src/JT1078.Gateway/Session/JT1078TcpSessionManager.cs deleted file mode 100644 index 9475cd9..0000000 --- a/src/JT1078.Gateway/Session/JT1078TcpSessionManager.cs +++ /dev/null @@ -1,100 +0,0 @@ -using Microsoft.Extensions.Logging; -using System; -using System.Collections.Concurrent; -using System.Collections.Generic; -using System.Linq; -using DotNetty.Transport.Channels; -using JT1078.Gateway.Metadata; - -namespace JT1078.Gateway.Session -{ - /// - /// JT1078 Tcp会话管理 - /// - public class JT1078TcpSessionManager - { - private readonly ILogger logger; - - public JT1078TcpSessionManager( - ILoggerFactory loggerFactory) - { - logger = loggerFactory.CreateLogger(); - } - - private ConcurrentDictionary SessionIdDict = new ConcurrentDictionary(StringComparer.OrdinalIgnoreCase); - - public int SessionCount - { - get - { - return SessionIdDict.Count; - } - } - - public JT1078TcpSession GetSession(string terminalPhoneNo) - { - if (string.IsNullOrEmpty(terminalPhoneNo)) - return default; - if (SessionIdDict.TryGetValue(terminalPhoneNo, out JT1078TcpSession targetSession)) - { - return targetSession; - } - else - { - return default; - } - } - - public void TryAdd(string terminalPhoneNo,IChannel channel) - { - if (SessionIdDict.TryGetValue(terminalPhoneNo, out JT1078TcpSession oldSession)) - { - oldSession.LastActiveTime = DateTime.Now; - oldSession.Channel = channel; - SessionIdDict.TryUpdate(terminalPhoneNo, oldSession, oldSession); - } - else - { - JT1078TcpSession session = new JT1078TcpSession(channel, terminalPhoneNo); - if (SessionIdDict.TryAdd(terminalPhoneNo, session)) - { - - } - } - } - - public JT1078TcpSession RemoveSession(string terminalPhoneNo) - { - if (string.IsNullOrEmpty(terminalPhoneNo)) return default; - if (SessionIdDict.TryRemove(terminalPhoneNo, out JT1078TcpSession sessionRemove)) - { - logger.LogInformation($">>>{terminalPhoneNo} Session Remove."); - return sessionRemove; - } - else - { - return default; - } - } - - public void RemoveSessionByChannel(IChannel channel) - { - var terminalPhoneNos = SessionIdDict.Where(w => w.Value.Channel.Id == channel.Id).Select(s => s.Key).ToList(); - if (terminalPhoneNos.Count > 0) - { - foreach (var key in terminalPhoneNos) - { - SessionIdDict.TryRemove(key, out JT1078TcpSession sessionRemove); - } - string nos = string.Join(",", terminalPhoneNos); - logger.LogInformation($">>>{nos} Channel Remove."); - } - } - - public IEnumerable GetAll() - { - return SessionIdDict.Select(s => s.Value).ToList(); - } - } -} - diff --git a/src/JT1078.Gateway/Session/JT1078UdpSessionManager.cs b/src/JT1078.Gateway/Session/JT1078UdpSessionManager.cs deleted file mode 100644 index 77f749e..0000000 --- a/src/JT1078.Gateway/Session/JT1078UdpSessionManager.cs +++ /dev/null @@ -1,115 +0,0 @@ -using Microsoft.Extensions.Logging; -using System; -using System.Collections.Concurrent; -using System.Collections.Generic; -using System.Linq; -using DotNetty.Transport.Channels; -using Microsoft.Extensions.Options; -using System.Net; -using JT1078.Gateway.Metadata; - -namespace JT1078.Gateway.Session -{ - /// - /// JT1078 udp会话管理 - /// 估计要轮询下 - /// - public class JT1078UdpSessionManager - { - private readonly ILogger logger; - - public JT1078UdpSessionManager( - ILoggerFactory loggerFactory) - { - logger = loggerFactory.CreateLogger(); - } - - private ConcurrentDictionary SessionIdDict = new ConcurrentDictionary(StringComparer.OrdinalIgnoreCase); - - public int SessionCount - { - get - { - return SessionIdDict.Count; - } - } - - public JT1078UdpSession GetSession(string terminalPhoneNo) - { - if (string.IsNullOrEmpty(terminalPhoneNo)) - return default; - if (SessionIdDict.TryGetValue(terminalPhoneNo, out JT1078UdpSession targetSession)) - { - return targetSession; - } - else - { - return default; - } - } - - public void TryAdd(IChannel channel,EndPoint sender,string terminalPhoneNo) - { - //1.先判断是否在缓存里面 - if (SessionIdDict.TryGetValue(terminalPhoneNo, out JT1078UdpSession UdpSession)) - { - UdpSession.LastActiveTime=DateTime.Now; - UdpSession.Sender = sender; - UdpSession.Channel = channel; - SessionIdDict.TryUpdate(terminalPhoneNo, UdpSession, UdpSession); - } - else - { - SessionIdDict.TryAdd(terminalPhoneNo, new JT1078UdpSession(channel, sender, terminalPhoneNo)); - } - } - - public void Heartbeat(string terminalPhoneNo) - { - if (string.IsNullOrEmpty(terminalPhoneNo)) return; - if (SessionIdDict.TryGetValue(terminalPhoneNo, out JT1078UdpSession oldSession)) - { - oldSession.LastActiveTime = DateTime.Now; - SessionIdDict.TryUpdate(terminalPhoneNo, oldSession, oldSession); - } - } - - public JT1078UdpSession RemoveSession(string terminalPhoneNo) - { - //设备离线可以进行通知 - //使用Redis 发布订阅 - if (string.IsNullOrEmpty(terminalPhoneNo)) return default; - if (SessionIdDict.TryRemove(terminalPhoneNo, out JT1078UdpSession SessionRemove)) - { - logger.LogInformation($">>>{terminalPhoneNo} Session Remove."); - return SessionRemove; - } - else - { - return default; - } - } - - public void RemoveSessionByChannel(IChannel channel) - { - //设备离线可以进行通知 - //使用Redis 发布订阅 - var terminalPhoneNos = SessionIdDict.Where(w => w.Value.Channel.Id == channel.Id).Select(s => s.Key).ToList(); - if (terminalPhoneNos.Count > 0) - { - foreach (var key in terminalPhoneNos) - { - SessionIdDict.TryRemove(key, out JT1078UdpSession SessionRemove); - } - string nos = string.Join(",", terminalPhoneNos); - logger.LogInformation($">>>{nos} Channel Remove."); - } - } - - public IEnumerable GetAll() - { - return SessionIdDict.Select(s => s.Value).ToList(); - } - } -} - diff --git a/src/JT1078.Gateway/Tcp/Handlers/JT1078TcpConnectionHandler.cs b/src/JT1078.Gateway/Tcp/Handlers/JT1078TcpConnectionHandler.cs deleted file mode 100644 index eb35ca5..0000000 --- a/src/JT1078.Gateway/Tcp/Handlers/JT1078TcpConnectionHandler.cs +++ /dev/null @@ -1,99 +0,0 @@ -using DotNetty.Handlers.Timeout; -using DotNetty.Transport.Channels; -using JT1078.Gateway.Session; -using Microsoft.Extensions.Logging; -using System; -using System.Threading.Tasks; - -namespace JT1078.Gateway.Tcp.Handlers -{ - /// - /// JT1078服务通道处理程序 - /// - internal class JT1078TcpConnectionHandler : ChannelHandlerAdapter - { - private readonly ILogger logger; - - private readonly JT1078TcpSessionManager SessionManager; - - public JT1078TcpConnectionHandler( - JT1078TcpSessionManager sessionManager, - ILoggerFactory loggerFactory) - { - this.SessionManager = sessionManager; - logger = loggerFactory.CreateLogger(); - } - - /// - /// 通道激活 - /// - /// - public override void ChannelActive(IChannelHandlerContext context) - { - string channelId = context.Channel.Id.AsShortText(); - if (logger.IsEnabled(LogLevel.Debug)) - logger.LogDebug($"<<<{ channelId } Successful client connection to server."); - base.ChannelActive(context); - } - - /// - /// 设备主动断开 - /// - /// - public override void ChannelInactive(IChannelHandlerContext context) - { - string channelId = context.Channel.Id.AsShortText(); - if (logger.IsEnabled(LogLevel.Debug)) - logger.LogDebug($">>>{ channelId } The client disconnects from the server."); - SessionManager.RemoveSessionByChannel(context.Channel); - base.ChannelInactive(context); - } - - /// - /// 服务器主动断开 - /// - /// - /// - public override Task CloseAsync(IChannelHandlerContext context) - { - string channelId = context.Channel.Id.AsShortText(); - if (logger.IsEnabled(LogLevel.Debug)) - logger.LogDebug($"<<<{ channelId } The server disconnects from the client."); - SessionManager.RemoveSessionByChannel(context.Channel); - return base.CloseAsync(context); - } - - public override void ChannelReadComplete(IChannelHandlerContext context)=> context.Flush(); - - /// - /// 超时策略 - /// - /// - /// - public override void UserEventTriggered(IChannelHandlerContext context, object evt) - { - IdleStateEvent idleStateEvent = evt as IdleStateEvent; - if (idleStateEvent != null) - { - if(idleStateEvent.State== IdleState.ReaderIdle) - { - string channelId = context.Channel.Id.AsShortText(); - logger.LogInformation($"{idleStateEvent.State.ToString()}>>>{channelId}"); - // 由于808是设备发心跳,如果很久没有上报数据,那么就由服务器主动关闭连接。 - SessionManager.RemoveSessionByChannel(context.Channel); - context.CloseAsync(); - } - } - base.UserEventTriggered(context, evt); - } - - public override void ExceptionCaught(IChannelHandlerContext context, Exception exception) - { - string channelId = context.Channel.Id.AsShortText(); - logger.LogError(exception,$"{channelId} {exception.Message}" ); - SessionManager.RemoveSessionByChannel(context.Channel); - context.CloseAsync(); - } - } -} - diff --git a/src/JT1078.Gateway/Tcp/Handlers/JT1078TcpMessageProcessorEmptyImpl.cs b/src/JT1078.Gateway/Tcp/Handlers/JT1078TcpMessageProcessorEmptyImpl.cs deleted file mode 100644 index 0e31e1c..0000000 --- a/src/JT1078.Gateway/Tcp/Handlers/JT1078TcpMessageProcessorEmptyImpl.cs +++ /dev/null @@ -1,18 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Text; -using System.Threading.Tasks; -using JT1078.Gateway.Interfaces; -using JT1078.Gateway.Metadata; -using JT1078.Protocol; - -namespace JT1078.Gateway.Tcp.Handlers -{ - class JT1078TcpMessageProcessorEmptyImpl : IJT1078TcpMessageHandlers - { - public Task Processor(JT1078Request request) - { - return Task.FromResult(default); - } - } -} diff --git a/src/JT1078.Gateway/Tcp/Handlers/JT1078TcpServerHandler.cs b/src/JT1078.Gateway/Tcp/Handlers/JT1078TcpServerHandler.cs deleted file mode 100644 index 3110a25..0000000 --- a/src/JT1078.Gateway/Tcp/Handlers/JT1078TcpServerHandler.cs +++ /dev/null @@ -1,69 +0,0 @@ -using DotNetty.Buffers; -using DotNetty.Transport.Channels; -using System; -using Microsoft.Extensions.Logging; -using JT1078.Protocol; -using JT1078.Gateway.Session; -using JT1078.Gateway.Session.Services; -using JT1078.Gateway.Interfaces; -using JT1078.Gateway.Metadata; -using JT1078.Gateway.Enums; - -namespace JT1078.Gateway.Tcp.Handlers -{ - /// - /// JT1078 服务端处理程序 - /// - internal class JT1078TcpServerHandler : SimpleChannelInboundHandler - { - private readonly JT1078TcpSessionManager SessionManager; - - private readonly JT1078AtomicCounterService AtomicCounterService; - - private readonly ILogger logger; - - private readonly IJT1078TcpMessageHandlers handlers; - - public JT1078TcpServerHandler( - IJT1078TcpMessageHandlers handlers, - ILoggerFactory loggerFactory, - JT1078AtomicCounterServiceFactory atomicCounterServiceFactory, - JT1078TcpSessionManager sessionManager) - { - this.handlers = handlers; - this.SessionManager = sessionManager; - this.AtomicCounterService = atomicCounterServiceFactory.Create(JT1078TransportProtocolType.tcp); - logger = loggerFactory.CreateLogger(); - } - - - protected override void ChannelRead0(IChannelHandlerContext ctx, byte[] msg) - { - try - { - if (logger.IsEnabled(LogLevel.Trace)) - { - logger.LogTrace("accept package success count<<<" + AtomicCounterService.MsgSuccessCount.ToString()); - logger.LogTrace("accept msg <<< " + ByteBufferUtil.HexDump(msg)); - } - JT1078Package package = JT1078Serializer.Deserialize(msg); - AtomicCounterService.MsgSuccessIncrement(); - SessionManager.TryAdd(package.SIM, ctx.Channel); - handlers.Processor(new JT1078Request(package, msg)); - if (logger.IsEnabled(LogLevel.Debug)) - { - logger.LogDebug("accept package success count<<<" + AtomicCounterService.MsgSuccessCount.ToString()); - } - } - catch (Exception ex) - { - AtomicCounterService.MsgFailIncrement(); - if (logger.IsEnabled(LogLevel.Error)) - { - logger.LogError("accept package fail count<<<" + AtomicCounterService.MsgFailCount.ToString()); - logger.LogError(ex, "accept msg<<<" + ByteBufferUtil.HexDump(msg)); - } - } - } - } -} diff --git a/src/JT1078.Gateway/Tcp/JT1078TcpBuilderDefault.cs b/src/JT1078.Gateway/Tcp/JT1078TcpBuilderDefault.cs deleted file mode 100644 index 4cdf481..0000000 --- a/src/JT1078.Gateway/Tcp/JT1078TcpBuilderDefault.cs +++ /dev/null @@ -1,30 +0,0 @@ -using JT1078.Gateway.Interfaces; -using Microsoft.Extensions.DependencyInjection; -using Microsoft.Extensions.DependencyInjection.Extensions; -using System; -using System.Collections.Generic; -using System.Text; - -namespace JT1078.Gateway.Tcp -{ - class JT1078TcpBuilderDefault : IJT1078TcpBuilder - { - public IJT1078Builder Instance { get; } - - public JT1078TcpBuilderDefault(IJT1078Builder builder) - { - Instance = builder; - } - - public IJT1078Builder Builder() - { - return Instance; - } - - public IJT1078TcpBuilder Replace() where T : IJT1078TcpMessageHandlers - { - Instance.Services.Replace(new ServiceDescriptor(typeof(IJT1078TcpMessageHandlers), typeof(T), ServiceLifetime.Singleton)); - return this; - } - } -} diff --git a/src/JT1078.Gateway/Tcp/JT1078TcpExtensions.cs b/src/JT1078.Gateway/Tcp/JT1078TcpExtensions.cs deleted file mode 100644 index 8cf17dc..0000000 --- a/src/JT1078.Gateway/Tcp/JT1078TcpExtensions.cs +++ /dev/null @@ -1,26 +0,0 @@ -using JT1078.Gateway.Codecs; -using JT1078.Gateway.Interfaces; -using JT1078.Gateway.Session; -using JT1078.Gateway.Tcp; -using JT1078.Gateway.Tcp.Handlers; -using Microsoft.Extensions.DependencyInjection; -using Microsoft.Extensions.DependencyInjection.Extensions; -using System.Runtime.CompilerServices; - - -namespace JT1078.Gateway -{ - public static class JT1078TcpExtensions - { - public static IJT1078TcpBuilder AddTcpHost(this IJT1078Builder builder) - { - builder.Services.TryAddSingleton(); - builder.Services.TryAddScoped(); - builder.Services.TryAddScoped(); - builder.Services.TryAddSingleton(); - builder.Services.TryAddScoped(); - builder.Services.AddHostedService(); - return new JT1078TcpBuilderDefault(builder); - } - } -} \ No newline at end of file diff --git a/src/JT1078.Gateway/Tcp/JT1078TcpServerHost.cs b/src/JT1078.Gateway/Tcp/JT1078TcpServerHost.cs deleted file mode 100644 index 7982b29..0000000 --- a/src/JT1078.Gateway/Tcp/JT1078TcpServerHost.cs +++ /dev/null @@ -1,94 +0,0 @@ -using DotNetty.Buffers; -using DotNetty.Codecs; -using DotNetty.Handlers.Timeout; -using DotNetty.Transport.Bootstrapping; -using DotNetty.Transport.Channels; -using DotNetty.Transport.Libuv; -using JT1078.Gateway.Codecs; -using JT1078.Gateway.Configurations; -using JT1078.Gateway.Tcp.Handlers; -using JT1078.Protocol; -using Microsoft.Extensions.DependencyInjection; -using Microsoft.Extensions.Hosting; -using Microsoft.Extensions.Logging; -using Microsoft.Extensions.Options; -using System; -using System.Net; -using System.Runtime.InteropServices; -using System.Threading; -using System.Threading.Tasks; - -namespace JT1078.Gateway.Tcp -{ - /// - /// JT1078 Tcp网关服务 - /// - internal class JT1078TcpServerHost : IHostedService - { - private readonly IServiceProvider serviceProvider; - private readonly JT1078Configuration configuration; - private readonly ILogger logger; - private DispatcherEventLoopGroup bossGroup; - private WorkerEventLoopGroup workerGroup; - private IChannel bootstrapChannel; - private IByteBufferAllocator serverBufferAllocator; - - public JT1078TcpServerHost( - IServiceProvider provider, - ILoggerFactory loggerFactory, - IOptions configurationAccessor) - { - serviceProvider = provider; - configuration = configurationAccessor.Value; - logger=loggerFactory.CreateLogger(); - } - - public Task StartAsync(CancellationToken cancellationToken) - { - bossGroup = new DispatcherEventLoopGroup(); - workerGroup = new WorkerEventLoopGroup(bossGroup, configuration.EventLoopCount); - serverBufferAllocator = new PooledByteBufferAllocator(); - ServerBootstrap bootstrap = new ServerBootstrap(); - bootstrap.Group(bossGroup, workerGroup); - bootstrap.Channel(); - if (RuntimeInformation.IsOSPlatform(OSPlatform.Linux) - || RuntimeInformation.IsOSPlatform(OSPlatform.OSX)) - { - bootstrap - .Option(ChannelOption.SoReuseport, true) - .ChildOption(ChannelOption.SoReuseaddr, true); - } - bootstrap - .Option(ChannelOption.SoBacklog, configuration.SoBacklog) - .ChildOption(ChannelOption.Allocator, serverBufferAllocator) - .ChildHandler(new ActionChannelInitializer(channel => - { - IChannelPipeline pipeline = channel.Pipeline; - using (var scope = serviceProvider.CreateScope()) - { - channel.Pipeline.AddLast("JT1078TcpBuffer", new DelimiterBasedFrameDecoder(int.MaxValue,true, - Unpooled.CopiedBuffer(JT1078Package.FH_Bytes))); - channel.Pipeline.AddLast("JT1078TcpDecode", scope.ServiceProvider.GetRequiredService()); - channel.Pipeline.AddLast("JT1078SystemIdleState", new IdleStateHandler( - configuration.ReaderIdleTimeSeconds, - configuration.WriterIdleTimeSeconds, - configuration.AllIdleTimeSeconds)); - channel.Pipeline.AddLast("JT1078TcpConnection", scope.ServiceProvider.GetRequiredService()); - channel.Pipeline.AddLast("JT1078TcpService", scope.ServiceProvider.GetRequiredService()); - } - })); - logger.LogInformation($"JT1078 TCP Server start at {IPAddress.Any}:{configuration.TcpPort}."); - return bootstrap.BindAsync(configuration.TcpPort) - .ContinueWith(i => bootstrapChannel = i.Result); - } - - public async Task StopAsync(CancellationToken cancellationToken) - { - await bootstrapChannel.CloseAsync(); - var quietPeriod = configuration.QuietPeriodTimeSpan; - var shutdownTimeout = configuration.ShutdownTimeoutTimeSpan; - await workerGroup.ShutdownGracefullyAsync(quietPeriod, shutdownTimeout); - await bossGroup.ShutdownGracefullyAsync(quietPeriod, shutdownTimeout); - } - } -} diff --git a/src/JT1078.Gateway/Udp/Handlers/JT1078UdpMessageProcessorEmptyImpl.cs b/src/JT1078.Gateway/Udp/Handlers/JT1078UdpMessageProcessorEmptyImpl.cs deleted file mode 100644 index b00b8f6..0000000 --- a/src/JT1078.Gateway/Udp/Handlers/JT1078UdpMessageProcessorEmptyImpl.cs +++ /dev/null @@ -1,18 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Text; -using System.Threading.Tasks; -using JT1078.Gateway.Interfaces; -using JT1078.Gateway.Metadata; -using JT1078.Protocol; - -namespace JT1078.Gateway.Udp.Handlers -{ - class JT1078UdpMessageProcessorEmptyImpl : IJT1078UdpMessageHandlers - { - public Task Processor(JT1078Request request) - { - return Task.FromResult(default); - } - } -} diff --git a/src/JT1078.Gateway/Udp/Handlers/JT1078UdpServerHandler.cs b/src/JT1078.Gateway/Udp/Handlers/JT1078UdpServerHandler.cs deleted file mode 100644 index f931cf2..0000000 --- a/src/JT1078.Gateway/Udp/Handlers/JT1078UdpServerHandler.cs +++ /dev/null @@ -1,70 +0,0 @@ -using DotNetty.Buffers; -using DotNetty.Transport.Channels; -using System; -using Microsoft.Extensions.Logging; -using JT1078.Protocol; -using JT1078.Gateway.Metadata; -using JT1078.Gateway.Session; -using JT1078.Gateway.Session.Services; -using JT1078.Gateway.Interfaces; -using JT1078.Gateway.Enums; - -namespace JT1078.Gateway.Udp.Handlers -{ - /// - /// JT1078 Udp服务端处理程序 - /// - internal class JT1078UdpServerHandler : SimpleChannelInboundHandler - { - private readonly ILogger logger; - - private readonly JT1078UdpSessionManager SessionManager; - - private readonly JT1078AtomicCounterService AtomicCounterService; - - private readonly IJT1078UdpMessageHandlers handlers; - public JT1078UdpServerHandler( - ILoggerFactory loggerFactory, - JT1078AtomicCounterServiceFactory atomicCounterServiceFactory, - IJT1078UdpMessageHandlers handlers, - JT1078UdpSessionManager sessionManager) - { - this.AtomicCounterService = atomicCounterServiceFactory.Create(JT1078TransportProtocolType.udp); - this.SessionManager = sessionManager; - logger = loggerFactory.CreateLogger(); - this.handlers = handlers; - } - - protected override void ChannelRead0(IChannelHandlerContext ctx, JT1078UdpPackage msg) - { - try - { - if (logger.IsEnabled(LogLevel.Trace)) - { - logger.LogTrace("accept package success count<<<" + AtomicCounterService.MsgSuccessCount.ToString()); - logger.LogTrace("accept msg <<< " + ByteBufferUtil.HexDump(msg.Buffer)); - } - JT1078Package package = JT1078Serializer.Deserialize(msg.Buffer); - AtomicCounterService.MsgSuccessIncrement(); - SessionManager.TryAdd(ctx.Channel, msg.Sender, package.SIM); - handlers.Processor(new JT1078Request(package, msg.Buffer)); - if (logger.IsEnabled(LogLevel.Debug)) - { - logger.LogDebug("accept package success count<<<" + AtomicCounterService.MsgSuccessCount.ToString()); - } - } - catch (Exception ex) - { - AtomicCounterService.MsgFailIncrement(); - if (logger.IsEnabled(LogLevel.Error)) - { - logger.LogError("accept package fail count<<<" + AtomicCounterService.MsgFailCount.ToString()); - logger.LogError(ex, "accept msg<<<" + ByteBufferUtil.HexDump(msg.Buffer)); - } - } - } - - public override void ChannelReadComplete(IChannelHandlerContext context) => context.Flush(); - - } -} diff --git a/src/JT1078.Gateway/Udp/JT1078UdpBuilderDefault.cs b/src/JT1078.Gateway/Udp/JT1078UdpBuilderDefault.cs deleted file mode 100644 index 2685544..0000000 --- a/src/JT1078.Gateway/Udp/JT1078UdpBuilderDefault.cs +++ /dev/null @@ -1,30 +0,0 @@ -using JT1078.Gateway.Interfaces; -using Microsoft.Extensions.DependencyInjection; -using Microsoft.Extensions.DependencyInjection.Extensions; -using System; -using System.Collections.Generic; -using System.Text; - -namespace JT1078.Gateway.Udp -{ - class JT1078UdpBuilderDefault : IJT1078UdpBuilder - { - public IJT1078Builder Instance { get; } - - public JT1078UdpBuilderDefault(IJT1078Builder builder) - { - Instance = builder; - } - - public IJT1078Builder Builder() - { - return Instance; - } - - public IJT1078UdpBuilder Replace() where T : IJT1078UdpMessageHandlers - { - Instance.Services.Replace(new ServiceDescriptor(typeof(IJT1078UdpMessageHandlers), typeof(T), ServiceLifetime.Singleton)); - return this; - } - } -} diff --git a/src/JT1078.Gateway/Udp/JT1078UdpExtensions.cs b/src/JT1078.Gateway/Udp/JT1078UdpExtensions.cs deleted file mode 100644 index 5dedb66..0000000 --- a/src/JT1078.Gateway/Udp/JT1078UdpExtensions.cs +++ /dev/null @@ -1,24 +0,0 @@ -using JT1078.Gateway.Codecs; -using JT1078.Gateway.Interfaces; -using JT1078.Gateway.Session; -using JT1078.Gateway.Udp; -using JT1078.Gateway.Udp.Handlers; -using Microsoft.Extensions.DependencyInjection; -using Microsoft.Extensions.DependencyInjection.Extensions; -using System.Runtime.CompilerServices; - -namespace JT1078.Gateway -{ - public static class JT1078UdpExtensions - { - public static IJT1078UdpBuilder AddUdpHost(this IJT1078Builder builder) - { - builder.Services.TryAddSingleton(); - builder.Services.TryAddSingleton(); - builder.Services.TryAddScoped(); - builder.Services.TryAddScoped(); - builder.Services.AddHostedService(); - return new JT1078UdpBuilderDefault(builder); - } - } -} \ No newline at end of file diff --git a/src/JT1078.Gateway/Udp/JT1078UdpServerHost.cs b/src/JT1078.Gateway/Udp/JT1078UdpServerHost.cs deleted file mode 100644 index f07e8fb..0000000 --- a/src/JT1078.Gateway/Udp/JT1078UdpServerHost.cs +++ /dev/null @@ -1,76 +0,0 @@ -using DotNetty.Transport.Bootstrapping; -using DotNetty.Transport.Channels; -using DotNetty.Transport.Channels.Sockets; -using JT1078.Gateway.Codecs; -using JT1078.Gateway.Configurations; -using JT1078.Gateway.Udp.Handlers; -using Microsoft.Extensions.DependencyInjection; -using Microsoft.Extensions.Hosting; -using Microsoft.Extensions.Logging; -using Microsoft.Extensions.Options; -using System; -using System.Net; -using System.Runtime.InteropServices; -using System.Threading; -using System.Threading.Tasks; - -namespace JT1078.Gateway.Udp -{ - /// - /// JT1078 Udp网关服务 - /// - internal class JT1078UdpServerHost : IHostedService - { - private readonly IServiceProvider serviceProvider; - private readonly JT1078Configuration configuration; - private readonly ILogger logger; - private MultithreadEventLoopGroup group; - private IChannel bootstrapChannel; - - public JT1078UdpServerHost( - IServiceProvider provider, - ILoggerFactory loggerFactory, - IOptions jT808ConfigurationAccessor) - { - serviceProvider = provider; - configuration = jT808ConfigurationAccessor.Value; - logger=loggerFactory.CreateLogger(); - } - - public Task StartAsync(CancellationToken cancellationToken) - { - group = new MultithreadEventLoopGroup(); - Bootstrap bootstrap = new Bootstrap(); - bootstrap.Group(group); - bootstrap.Channel(); - if (RuntimeInformation.IsOSPlatform(OSPlatform.Linux) - || RuntimeInformation.IsOSPlatform(OSPlatform.OSX)) - { - bootstrap - .Option(ChannelOption.SoReuseport, true); - } - bootstrap - .Option(ChannelOption.SoBroadcast, true) - .Handler(new ActionChannelInitializer(channel => - { - IChannelPipeline pipeline = channel.Pipeline; - using (var scope = serviceProvider.CreateScope()) - { - pipeline.AddLast("JT1078UdpDecoder", scope.ServiceProvider.GetRequiredService()); - pipeline.AddLast("JT1078UdpService", scope.ServiceProvider.GetRequiredService()); - } - })); - logger.LogInformation($"JT1078 Udp Server start at {IPAddress.Any}:{configuration.UdpPort}."); - return bootstrap.BindAsync(configuration.UdpPort) - .ContinueWith(i => bootstrapChannel = i.Result); - } - - public async Task StopAsync(CancellationToken cancellationToken) - { - await bootstrapChannel.CloseAsync(); - var quietPeriod = configuration.QuietPeriodTimeSpan; - var shutdownTimeout = configuration.ShutdownTimeoutTimeSpan; - await group.ShutdownGracefullyAsync(quietPeriod, shutdownTimeout); - } - } -}