Pārlūkot izejas kodu

添加http服务和websocket服务

master
SmallChi(Koike) pirms 4 gadiem
vecāks
revīzija
f2d0a8cd15
38 mainītis faili ar 286 papildinājumiem un 1351 dzēšanām
  1. +13
    -0
      src/JT1078.Gateway.Abstractions/IJT1078Authorization.cs
  2. +1
    -1
      src/JT1078.Gateway.Abstractions/JT1078.Gateway.Abstractions.csproj
  3. +1
    -0
      src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Program.cs
  4. +3
    -0
      src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/appsettings.json
  5. +0
    -20
      src/JT1078.Gateway/Codecs/JT1078TcpDecoder.cs
  6. +0
    -21
      src/JT1078.Gateway/Codecs/JT1078UdpDecoder.cs
  7. +0
    -12
      src/JT1078.Gateway/Configurations/JT1078RemoteServerOptions.cs
  8. +0
    -32
      src/JT1078.Gateway/Http/Authorization/JT1078AuthorizationDefault.cs
  9. +0
    -183
      src/JT1078.Gateway/Http/Handlers/JT1078HttpServerHandler.cs
  10. +0
    -36
      src/JT1078.Gateway/Http/JT1078HttpBuilderDefault.cs
  11. +0
    -24
      src/JT1078.Gateway/Http/JT1078HttpDotnettyExtensions.cs
  12. +0
    -98
      src/JT1078.Gateway/Http/JT1078HttpServerHost.cs
  13. +28
    -0
      src/JT1078.Gateway/Impl/JT1078AuthorizationDefault.cs
  14. +0
    -5
      src/JT1078.Gateway/JT1078.Gateway.csproj
  15. +7
    -0
      src/JT1078.Gateway/JT1078GatewayExtensions.cs
  16. +191
    -0
      src/JT1078.Gateway/JT1078HttpServer.cs
  17. +4
    -0
      src/JT1078.Gateway/JT1078TcpServer.cs
  18. +14
    -3
      src/JT1078.Gateway/JT1078UdpServer.cs
  19. +5
    -2
      src/JT1078.Gateway/Jobs/JT1078UdpReceiveTimeoutJob.cs
  20. +19
    -0
      src/JT1078.Gateway/Metadata/JT1078HttpContext.cs
  21. +0
    -31
      src/JT1078.Gateway/Metadata/JT1078HttpSession.cs
  22. +0
    -20
      src/JT1078.Gateway/Metadata/JT1078Request.cs
  23. +0
    -10
      src/JT1078.Gateway/Metadata/JT1078Response.cs
  24. +0
    -20
      src/JT1078.Gateway/Metadata/JT1078UdpPackage.cs
  25. +0
    -64
      src/JT1078.Gateway/Session/JT1078HttpSessionManager.cs
  26. +0
    -100
      src/JT1078.Gateway/Session/JT1078TcpSessionManager.cs
  27. +0
    -115
      src/JT1078.Gateway/Session/JT1078UdpSessionManager.cs
  28. +0
    -99
      src/JT1078.Gateway/Tcp/Handlers/JT1078TcpConnectionHandler.cs
  29. +0
    -18
      src/JT1078.Gateway/Tcp/Handlers/JT1078TcpMessageProcessorEmptyImpl.cs
  30. +0
    -69
      src/JT1078.Gateway/Tcp/Handlers/JT1078TcpServerHandler.cs
  31. +0
    -30
      src/JT1078.Gateway/Tcp/JT1078TcpBuilderDefault.cs
  32. +0
    -26
      src/JT1078.Gateway/Tcp/JT1078TcpExtensions.cs
  33. +0
    -94
      src/JT1078.Gateway/Tcp/JT1078TcpServerHost.cs
  34. +0
    -18
      src/JT1078.Gateway/Udp/Handlers/JT1078UdpMessageProcessorEmptyImpl.cs
  35. +0
    -70
      src/JT1078.Gateway/Udp/Handlers/JT1078UdpServerHandler.cs
  36. +0
    -30
      src/JT1078.Gateway/Udp/JT1078UdpBuilderDefault.cs
  37. +0
    -24
      src/JT1078.Gateway/Udp/JT1078UdpExtensions.cs
  38. +0
    -76
      src/JT1078.Gateway/Udp/JT1078UdpServerHost.cs

+ 13
- 0
src/JT1078.Gateway.Abstractions/IJT1078Authorization.cs Parādīt failu

@@ -0,0 +1,13 @@
using System;
using System.Collections.Generic;
using System.Net;
using System.Security.Principal;
using System.Text;

namespace JT1078.Gateway.Abstractions
{
public interface IJT1078Authorization
{
bool Authorization(HttpListenerContext context, out IPrincipal principal);
}
}

+ 1
- 1
src/JT1078.Gateway.Abstractions/JT1078.Gateway.Abstractions.csproj Parādīt failu

@@ -25,7 +25,7 @@
</ItemGroup>

<ItemGroup>
<PackageReference Include="JT1078" Version="1.0.2" />
<PackageReference Include="JT1078" Version="1.0.3" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="3.1.5" />
<PackageReference Include="Microsoft.Extensions.Hosting.Abstractions" Version="3.1.5" />
<PackageReference Include="Microsoft.Extensions.Options.ConfigurationExtensions" Version="3.1.5" />


+ 1
- 0
src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Program.cs Parādīt failu

@@ -30,6 +30,7 @@ namespace JT1078.Gateway.TestNormalHosting
services.AddSingleton(typeof(ILogger<>), typeof(Logger<>));
//使用内存队列实现会话通知
services.AddJT1078Gateway(hostContext.Configuration)
.AddHttp()
.AddUdp()
.AddTcp()
.AddNormal()


+ 3
- 0
src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/appsettings.json Parādīt failu

@@ -11,5 +11,8 @@
"Default": "Trace"
}
}
},
"JT1078Configuration": {
"HttpPort": 15555
}
}

+ 0
- 20
src/JT1078.Gateway/Codecs/JT1078TcpDecoder.cs Parādīt failu

@@ -1,20 +0,0 @@
using DotNetty.Buffers;
using DotNetty.Codecs;
using System.Collections.Generic;
using DotNetty.Transport.Channels;
using JT1078.Protocol;
using System;

namespace JT1078.Gateway.Codecs
{
public class JT1078TcpDecoder : ByteToMessageDecoder
{
protected override void Decode(IChannelHandlerContext context, IByteBuffer input, List<object> output)
{
byte[] buffer = new byte[input.Capacity+4];
input.ReadBytes(buffer, 4, input.Capacity);
Array.Copy(JT1078Package.FH_Bytes, 0,buffer, 0, 4);
output.Add(buffer);
}
}
}

+ 0
- 21
src/JT1078.Gateway/Codecs/JT1078UdpDecoder.cs Parādīt failu

@@ -1,21 +0,0 @@
using DotNetty.Buffers;
using DotNetty.Codecs;
using DotNetty.Transport.Channels;
using System.Collections.Generic;
using DotNetty.Transport.Channels.Sockets;
using JT1078.Gateway.Metadata;

namespace JT1078.Gateway.Codecs
{
public class JT1078UdpDecoder : MessageToMessageDecoder<DatagramPacket>
{
protected override void Decode(IChannelHandlerContext context, DatagramPacket message, List<object> output)
{
if (!message.Content.IsReadable()) return;
IByteBuffer byteBuffer = message.Content;
byte[] buffer = new byte[byteBuffer.ReadableBytes];
byteBuffer.ReadBytes(buffer);
output.Add(new JT1078UdpPackage(buffer, message.Sender));
}
}
}

+ 0
- 12
src/JT1078.Gateway/Configurations/JT1078RemoteServerOptions.cs Parādīt failu

@@ -1,12 +0,0 @@
using Microsoft.Extensions.Options;
using System.Collections.Generic;

namespace JT1078.Gateway.Configurations
{
public class JT1078RemoteServerOptions:IOptions<JT1078RemoteServerOptions>
{
public List<string> RemoteServers { get; set; }

public JT1078RemoteServerOptions Value => this;
}
}

+ 0
- 32
src/JT1078.Gateway/Http/Authorization/JT1078AuthorizationDefault.cs Parādīt failu

@@ -1,32 +0,0 @@
using DotNetty.Codecs.Http;
using JT1078.Gateway.Interfaces;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Security.Claims;
using System.Security.Principal;
using System.Text;

namespace JT1078.Gateway.Http.Authorization
{
class JT1078AuthorizationDefault : IJT1078Authorization
{
public bool Authorization(IFullHttpRequest request, out IPrincipal principal)
{
var uriSpan = request.Uri.AsSpan();
var uriParamStr = uriSpan.Slice(uriSpan.IndexOf('?')+1).ToString().ToLower();
var uriParams = uriParamStr.Split('&');
var tokenParam = uriParams.FirstOrDefault(m => m.Contains("token"));
if (!string.IsNullOrEmpty(tokenParam))
{
principal = new ClaimsPrincipal(new GenericIdentity(tokenParam.Split('=')[1]));
return true;
}
else
{
principal = null;
return false;
}
}
}
}

+ 0
- 183
src/JT1078.Gateway/Http/Handlers/JT1078HttpServerHandler.cs Parādīt failu

@@ -1,183 +0,0 @@
using System;
using System.Diagnostics;
using System.Text;
using System.Threading.Tasks;
using DotNetty.Buffers;
using DotNetty.Codecs.Http;
using DotNetty.Codecs.Http.WebSockets;
using DotNetty.Common.Utilities;
using DotNetty.Transport.Channels;
using static DotNetty.Codecs.Http.HttpVersion;
using static DotNetty.Codecs.Http.HttpResponseStatus;
using Microsoft.Extensions.Logging;
using System.Text.RegularExpressions;
using JT1078.Gateway.Session;
using JT1078.Gateway.Interfaces;

namespace JT1078.Gateway.Http.Handlers
{
public sealed class JT1078HttpServerHandler : SimpleChannelInboundHandler<object>
{
const string WebsocketPath = "/jt1078live";
WebSocketServerHandshaker handshaker;
private static readonly AsciiString ServerName = AsciiString.Cached("JT1078Netty");
private static readonly AsciiString DateEntity = HttpHeaderNames.Date;
private static readonly AsciiString ServerEntity = HttpHeaderNames.Server;

private readonly ILogger<JT1078HttpServerHandler> logger;

private readonly JT1078HttpSessionManager jT1078HttpSessionManager;

private readonly IJT1078Authorization iJT1078Authorization;

private readonly IHttpMiddleware httpMiddleware;

public JT1078HttpServerHandler(
JT1078HttpSessionManager jT1078HttpSessionManager,
IJT1078Authorization iJT1078Authorization,
ILoggerFactory loggerFactory,
IHttpMiddleware httpMiddleware = null)
{
this.jT1078HttpSessionManager = jT1078HttpSessionManager;
this.iJT1078Authorization = iJT1078Authorization;
this.httpMiddleware = httpMiddleware;
logger = loggerFactory.CreateLogger<JT1078HttpServerHandler>();
}

public override void ChannelInactive(IChannelHandlerContext context)
{
if (logger.IsEnabled(LogLevel.Information))
{
logger.LogInformation(context.Channel.Id.AsShortText());
}
jT1078HttpSessionManager.RemoveSessionByChannel(context.Channel);
base.ChannelInactive(context);
}

protected override void ChannelRead0(IChannelHandlerContext ctx, object msg)
{
if (msg is IFullHttpRequest request)
{
this.HandleHttpRequest(ctx, request);
}
else if (msg is WebSocketFrame frame)
{
this.HandleWebSocketFrame(ctx, frame);
}
}

public override void ChannelReadComplete(IChannelHandlerContext context) => context.Flush();

void HandleHttpRequest(IChannelHandlerContext ctx, IFullHttpRequest req)
{
// Handle a bad request.
if (!req.Result.IsSuccess)
{
SendHttpResponse(ctx, req, new DefaultFullHttpResponse(Http11, BadRequest));
return;
}
if ("/favicon.ico".Equals(req.Uri))
{
var res = new DefaultFullHttpResponse(Http11, NotFound);
SendHttpResponse(ctx, req, res);
return;
}
if (iJT1078Authorization.Authorization(req, out var principal))
{
if (req.Uri.StartsWith(WebsocketPath))
{
// Handshake
var wsFactory = new WebSocketServerHandshakerFactory(GetWebSocketLocation(req), null, true, 5 * 1024 * 1024);
this.handshaker = wsFactory.NewHandshaker(req);
if (this.handshaker == null)
{
WebSocketServerHandshakerFactory.SendUnsupportedVersionResponse(ctx.Channel);
}
else
{
this.handshaker.HandshakeAsync(ctx.Channel, req);
jT1078HttpSessionManager.TryAdd(principal.Identity.Name, ctx.Channel);
httpMiddleware?.Next(ctx, req, principal);
}
}
else
{
jT1078HttpSessionManager.TryAdd(principal.Identity.Name, ctx.Channel);
httpMiddleware?.Next(ctx, req, principal);
}
}
else {
SendHttpResponse(ctx, req, new DefaultFullHttpResponse(Http11, Unauthorized));
return;
}
}

void HandleWebSocketFrame(IChannelHandlerContext ctx, WebSocketFrame frame)
{
// Check for closing frame
if (frame is CloseWebSocketFrame)
{
this.handshaker.CloseAsync(ctx.Channel, (CloseWebSocketFrame)frame.Retain());
return;
}
if (frame is PingWebSocketFrame)
{
ctx.WriteAsync(new PongWebSocketFrame((IByteBuffer)frame.Content.Retain()));
return;
}
if (frame is TextWebSocketFrame)
{
// Echo the frame
ctx.WriteAsync(frame.Retain());
return;
}
if (frame is BinaryWebSocketFrame)
{
// Echo the frame
ctx.WriteAsync(frame.Retain());
}
}

static void SendHttpResponse(IChannelHandlerContext ctx, IFullHttpRequest req, IFullHttpResponse res)
{
// Generate an error page if response getStatus code is not OK (200).
if (res.Status.Code != 200)
{
res.Headers.Set(ServerEntity, ServerName);
res.Headers.Set(DateEntity, DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff"));
IByteBuffer buf = Unpooled.CopiedBuffer(Encoding.UTF8.GetBytes(res.Status.ToString()));
res.Content.WriteBytes(buf);
buf.Release();
HttpUtil.SetContentLength(res, res.Content.ReadableBytes);
}
// Send the response and close the connection if necessary.
Task task = ctx.Channel.WriteAndFlushAsync(res);
if (!HttpUtil.IsKeepAlive(req) || res.Status.Code != 200)
{
task.ContinueWith((t, c) => ((IChannelHandlerContext)c).CloseAsync(), ctx, TaskContinuationOptions.ExecuteSynchronously);
}
}

public override void ExceptionCaught(IChannelHandlerContext context, Exception exception)
{
logger.LogError(exception, context.Channel.Id.AsShortText());
context.Channel.WriteAndFlushAsync(new DefaultFullHttpResponse(Http11, InternalServerError));
jT1078HttpSessionManager.RemoveSessionByChannel(context.Channel);
CloseAsync(context);
base.ExceptionCaught(context, exception);
}

public override Task CloseAsync(IChannelHandlerContext context)
{
jT1078HttpSessionManager.RemoveSessionByChannel(context.Channel);
return base.CloseAsync(context);
}

static string GetWebSocketLocation(IFullHttpRequest req)
{
bool result = req.Headers.TryGet(HttpHeaderNames.Host, out ICharSequence value);
string location= value.ToString() + WebsocketPath;
return "ws://" + location;
}
}
}

+ 0
- 36
src/JT1078.Gateway/Http/JT1078HttpBuilderDefault.cs Parādīt failu

@@ -1,36 +0,0 @@
using JT1078.Gateway.Interfaces;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection.Extensions;
using System;
using System.Collections.Generic;
using System.Text;

namespace JT1078.Gateway.Http
{
class JT1078HttpBuilderDefault : IJT1078HttpBuilder
{
public IJT1078Builder Instance { get; }

public JT1078HttpBuilderDefault(IJT1078Builder builder)
{
Instance = builder;
}

public IJT1078Builder Builder()
{
return Instance;
}

public IJT1078HttpBuilder Replace<T>() where T : IJT1078Authorization
{
Instance.Services.Replace(new ServiceDescriptor(typeof(IJT1078Authorization), typeof(T), ServiceLifetime.Singleton));
return this;
}

public IJT1078HttpBuilder UseHttpMiddleware<T>() where T : IHttpMiddleware
{
Instance.Services.TryAdd(new ServiceDescriptor(typeof(IHttpMiddleware), typeof(T), ServiceLifetime.Singleton));
return this;
}
}
}

+ 0
- 24
src/JT1078.Gateway/Http/JT1078HttpDotnettyExtensions.cs Parādīt failu

@@ -1,24 +0,0 @@
using JT1078.Gateway.Http;
using JT1078.Gateway.Http.Authorization;
using JT1078.Gateway.Http.Handlers;
using JT1078.Gateway.Interfaces;
using JT1078.Gateway.Session;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection.Extensions;
using System.Runtime.CompilerServices;


namespace JT1078.Gateway
{
public static class JT1078HttpDotnettyExtensions
{
public static IJT1078HttpBuilder AddHttpHost(this IJT1078Builder builder)
{
builder.Services.TryAddSingleton<JT1078HttpSessionManager>();
builder.Services.TryAddSingleton<IJT1078Authorization, JT1078AuthorizationDefault>();
builder.Services.AddScoped<JT1078HttpServerHandler>();
builder.Services.AddHostedService<JT1078HttpServerHost>();
return new JT1078HttpBuilderDefault(builder);
}
}
}

+ 0
- 98
src/JT1078.Gateway/Http/JT1078HttpServerHost.cs Parādīt failu

@@ -1,98 +0,0 @@
using DotNetty.Buffers;
using DotNetty.Codecs;
using DotNetty.Codecs.Http;
using DotNetty.Codecs.Http.Cors;
using DotNetty.Common.Utilities;
using DotNetty.Handlers.Streams;
using DotNetty.Handlers.Timeout;
using DotNetty.Transport.Bootstrapping;
using DotNetty.Transport.Channels;
using DotNetty.Transport.Libuv;
using JT1078.Gateway.Configurations;
using JT1078.Gateway.Http.Handlers;
using JT1078.Protocol;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using System;
using System.Net;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks;

namespace JT1078.Gateway.Http
{
/// <summary>
/// JT1078 http服务
/// </summary>
internal class JT1078HttpServerHost : IHostedService
{
private readonly JT1078Configuration configuration;
private readonly ILogger<JT1078HttpServerHost> logger;
private DispatcherEventLoopGroup bossGroup;
private WorkerEventLoopGroup workerGroup;
private IChannel bootstrapChannel;
private IByteBufferAllocator serverBufferAllocator;
private readonly IServiceProvider serviceProvider;
public JT1078HttpServerHost(
IServiceProvider serviceProvider,
ILoggerFactory loggerFactory,
IOptions<JT1078Configuration> configurationAccessor)
{
this.serviceProvider = serviceProvider;
configuration = configurationAccessor.Value;
logger=loggerFactory.CreateLogger<JT1078HttpServerHost>();
}

public Task StartAsync(CancellationToken cancellationToken)
{
bossGroup = new DispatcherEventLoopGroup();
workerGroup = new WorkerEventLoopGroup(bossGroup, configuration.EventLoopCount);
serverBufferAllocator = new PooledByteBufferAllocator();
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.Group(bossGroup, workerGroup);
bootstrap.Channel<TcpServerChannel>();
if (RuntimeInformation.IsOSPlatform(OSPlatform.Linux)
|| RuntimeInformation.IsOSPlatform(OSPlatform.OSX))
{
bootstrap
.Option(ChannelOption.SoReuseport, true)
.ChildOption(ChannelOption.SoReuseaddr, true);
}
bootstrap
.Option(ChannelOption.SoBacklog, 8192)
.ChildOption(ChannelOption.Allocator, serverBufferAllocator)
.ChildHandler(new ActionChannelInitializer<IChannel>(channel =>
{
IChannelPipeline pipeline = channel.Pipeline;
pipeline.AddLast(new HttpServerCodec());
pipeline.AddLast(new CorsHandler(CorsConfigBuilder
.ForAnyOrigin()
.AllowNullOrigin()
.AllowedRequestMethods(HttpMethod.Get, HttpMethod.Post, HttpMethod.Options, HttpMethod.Delete)
.AllowedRequestHeaders((AsciiString)"origin", (AsciiString)"range", (AsciiString)"accept-encoding", (AsciiString)"referer", (AsciiString)"Cache-Control", (AsciiString)"X-Proxy-Authorization", (AsciiString)"X-Requested-With", (AsciiString)"Content-Type")
.ExposeHeaders((StringCharSequence)"Server", (StringCharSequence)"range", (StringCharSequence)"Content-Length", (StringCharSequence)"Content-Range")
.AllowCredentials()
.Build()));
pipeline.AddLast(new HttpObjectAggregator(int.MaxValue));
using (var scope = serviceProvider.CreateScope())
{
pipeline.AddLast("JT1078HttpServerHandler", scope.ServiceProvider.GetRequiredService<JT1078HttpServerHandler>());
}
}));
logger.LogInformation($"JT1078 Http Server start at {IPAddress.Any}:{configuration.HttpPort}.");
return bootstrap.BindAsync(configuration.HttpPort)
.ContinueWith(i => bootstrapChannel = i.Result);
}

public async Task StopAsync(CancellationToken cancellationToken)
{
await bootstrapChannel.CloseAsync();
var quietPeriod = configuration.QuietPeriodTimeSpan;
var shutdownTimeout = configuration.ShutdownTimeoutTimeSpan;
await workerGroup.ShutdownGracefullyAsync(quietPeriod, shutdownTimeout);
await bossGroup.ShutdownGracefullyAsync(quietPeriod, shutdownTimeout);
}
}
}

+ 28
- 0
src/JT1078.Gateway/Impl/JT1078AuthorizationDefault.cs Parādīt failu

@@ -0,0 +1,28 @@
using JT1078.Gateway.Abstractions;
using System;
using System.Collections.Generic;
using System.Net;
using System.Security.Claims;
using System.Security.Principal;
using System.Text;

namespace JT1078.Gateway.Impl
{
class JT1078AuthorizationDefault : IJT1078Authorization
{
public bool Authorization(HttpListenerContext context, out IPrincipal principal)
{
var token = context.Request.QueryString.Get("token");
if (!string.IsNullOrEmpty(token))
{
principal = new ClaimsPrincipal(new GenericIdentity(token));
return true;
}
else
{
principal = null;
return false;
}
}
}
}

+ 0
- 5
src/JT1078.Gateway/JT1078.Gateway.csproj Parādīt failu

@@ -38,11 +38,6 @@
<None Remove="Tcp\**" />
<None Remove="Udp\**" />
</ItemGroup>
<ItemGroup>
<Compile Remove="Metadata\JT1078HttpSession.cs" />
<Compile Remove="Metadata\JT1078Request.cs" />
<Compile Remove="Metadata\JT1078Response.cs" />
</ItemGroup>

<ItemGroup>
<PackageReference Include="System.IO.Pipelines" Version="4.7.2" />


+ 7
- 0
src/JT1078.Gateway/JT1078GatewayExtensions.cs Parādīt failu

@@ -50,6 +50,13 @@ namespace JT1078.Gateway
return builder;
}

public static IJT1078GatewayBuilder AddHttp(this IJT1078GatewayBuilder builder)
{
builder.JT1078Builder.Services.AddSingleton<IJT1078Authorization, JT1078AuthorizationDefault>();
builder.JT1078Builder.Services.AddHostedService<JT1078HttpServer>();
return builder;
}

public static IJT1078NormalGatewayBuilder AddNormal(this IJT1078GatewayBuilder builder)
{
return new JT1078NormalGatewayBuilderDefault(builder.JT1078Builder);


+ 191
- 0
src/JT1078.Gateway/JT1078HttpServer.cs Parādīt failu

@@ -0,0 +1,191 @@
using JT1078.Gateway.Abstractions;
using JT1078.Gateway.Configurations;
using JT1078.Gateway.Metadata;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using System;
using System.Buffers;
using System.Collections.Generic;
using System.IO;
using System.Net;
using System.Net.WebSockets;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace JT1078.Gateway
{
public class JT1078HttpServer : IHostedService
{
private readonly ILogger Logger;

private readonly JT1078Configuration Configuration;

private readonly IJT1078Authorization authorization;

private HttpListener listener;

public JT1078HttpServer(
IOptions<JT1078Configuration> jT1078ConfigurationAccessor,
IJT1078Authorization authorization,
ILoggerFactory loggerFactory)
{
Logger = loggerFactory.CreateLogger<JT1078TcpServer>();
Configuration = jT1078ConfigurationAccessor.Value;
this.authorization = authorization;
}

public Task StartAsync(CancellationToken cancellationToken)
{
if (!HttpListener.IsSupported)
{
Logger.LogWarning("Windows XP SP2 or Server 2003 is required to use the HttpListener class.");
return Task.CompletedTask;
}
listener = new HttpListener();
listener.AuthenticationSchemes = AuthenticationSchemes.Anonymous;
listener.Prefixes.Add($"http://*:{Configuration.HttpPort}/");
listener.Start();
Logger.LogInformation($"JT1078 Http Server start at {IPAddress.Any}:{Configuration.HttpPort}.");
Task.Factory.StartNew(async() =>
{
while (listener.IsListening)
{
var context = await listener.GetContextAsync();
try
{
if (authorization.Authorization(context,out var principal))
{
//new JT1078HttpContext(context, principal);
//todo:session manager
await ProcessRequestAsync(context);
}
else
{
await Http401(context);
}
}
catch (Exception ex)
{
await Http500(context);
Logger.LogError(ex, ex.StackTrace);
}
}
}, cancellationToken);
return Task.CompletedTask;
}

private async ValueTask ProcessRequestAsync(HttpListenerContext context)
{
if(context.Request.RawUrl.StartsWith("/favicon.ico"))
{
Http404(context);
}
if (Logger.IsEnabled(LogLevel.Trace))
{
Logger.LogTrace($"[http RequestTraceIdentifier]:{context.Request.RequestTraceIdentifier.ToString()}");
}
if (context.Request.IsWebSocketRequest)
{
HttpListenerWebSocketContext wsContext = await context.AcceptWebSocketAsync(null);
//todo:websocket context 管理
await wsContext.WebSocket.SendAsync(Encoding.UTF8.GetBytes("hello,jt1078"), WebSocketMessageType.Text, true, CancellationToken.None);
await Task.Factory.StartNew(async(state) =>
{
//https://www.bejson.com/httputil/websocket/
//ws://127.0.0.1:15555?token=22
var websocketContext = state as HttpListenerWebSocketContext;
while(websocketContext.WebSocket.State == WebSocketState.Open ||
websocketContext.WebSocket.State == WebSocketState.Connecting)
{
var buffer = ArrayPool<byte>.Shared.Rent(256);
try
{
WebSocketReceiveResult receiveResult = await websocketContext.WebSocket.ReceiveAsync(buffer, CancellationToken.None);
if (receiveResult.EndOfMessage)
{
if (receiveResult.Count > 0)
{
var data = buffer.AsSpan().Slice(0, receiveResult.Count).ToArray();
if (Logger.IsEnabled(LogLevel.Trace))
{
Logger.LogTrace($"[ws receive]:{Encoding.UTF8.GetString(data)}");
}
await websocketContext.WebSocket.SendAsync(data, WebSocketMessageType.Text, true, CancellationToken.None);
}
}
}
finally
{
ArrayPool<byte>.Shared.Return(buffer);
}
}
if (Logger.IsEnabled(LogLevel.Trace))
{
Logger.LogTrace($"[ws close]:{wsContext}");
}
//todo:session close notice
await wsContext.WebSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, "normal", CancellationToken.None);
}, wsContext);
}
else
{
//todo:set http chunk
//todo:session close notice
//var body = await new StreamReader(context.Request.InputStream).ReadToEndAsync();
var options = context.Request.QueryString;
//var keys = options.AllKeys;
byte[] b = Encoding.UTF8.GetBytes("ack");
context.Response.StatusCode = 200;
context.Response.KeepAlive = true;
context.Response.ContentLength64 = b.Length;
await context.Response.OutputStream.WriteAsync(b, 0, b.Length);
context.Response.Close();
}
}

private async ValueTask Http401(HttpListenerContext context)
{
byte[] b = Encoding.UTF8.GetBytes("auth error");
context.Response.StatusCode = (int)HttpStatusCode.Unauthorized;
context.Response.KeepAlive = false;
context.Response.ContentLength64 = b.Length;
var output = context.Response.OutputStream;
await output.WriteAsync(b, 0, b.Length);
context.Response.Close();
}

private void Http404(HttpListenerContext context)
{
context.Response.StatusCode = (int)HttpStatusCode.NotFound;
context.Response.KeepAlive = false;
context.Response.Close();
}

private async ValueTask Http500(HttpListenerContext context)
{
byte[] b = Encoding.UTF8.GetBytes("inner error");
context.Response.StatusCode = (int)HttpStatusCode.InternalServerError;
context.Response.KeepAlive = false;
context.Response.ContentLength64 = b.Length;
var output = context.Response.OutputStream;
await output.WriteAsync(b, 0, b.Length);
context.Response.Close();
}

public Task StopAsync(CancellationToken cancellationToken)
{
try
{
listener.Stop();
}
catch (System.ObjectDisposedException ex)
{

}
return Task.CompletedTask;
}
}
}

+ 4
- 0
src/JT1078.Gateway/JT1078TcpServer.cs Parādīt failu

@@ -132,6 +132,10 @@ namespace JT1078.Gateway
break;
}
writer.Advance(bytesRead);
}
catch (System.ObjectDisposedException ex)
{

}
catch (OperationCanceledException ex)
{


+ 14
- 3
src/JT1078.Gateway/JT1078UdpServer.cs Parādīt failu

@@ -99,6 +99,10 @@ namespace JT1078.Gateway
var segment = new ArraySegment<byte>(buffer);
var result = await server.ReceiveMessageFromAsync(segment, SocketFlags.None, server.LocalEndPoint);
ReaderBuffer(buffer.AsSpan(0, result.ReceivedBytes), server, result);
}
catch (System.ObjectDisposedException ex)
{

}
catch (AggregateException ex)
{
@@ -152,9 +156,16 @@ namespace JT1078.Gateway
public Task StopAsync(CancellationToken cancellationToken)
{
Logger.LogInformation("JT1078 Udp Server Stop");
if (server?.Connected ?? false)
server.Shutdown(SocketShutdown.Both);
server?.Close();
try
{
if (server?.Connected ?? false)
server.Shutdown(SocketShutdown.Both);
server?.Close();
}
catch (System.ObjectDisposedException ex)
{

}
return Task.CompletedTask;
}
}

+ 5
- 2
src/JT1078.Gateway/Jobs/JT1078UdpReceiveTimeoutJob.cs Parādīt failu

@@ -47,8 +47,11 @@ namespace JT1078.Gateway.Jobs
{
SessionManager.RemoveBySessionId(item);
}
Logger.LogInformation($"[Check Receive Timeout]");
Logger.LogInformation($"[Session Online Count]:{SessionManager.UdpSessionCount}");
if (Logger.IsEnabled(LogLevel.Information))
{
Logger.LogInformation($"[Check Receive Timeout]");
Logger.LogInformation($"[Session Online Count]:{SessionManager.UdpSessionCount}");
}
}
catch (Exception ex)
{


+ 19
- 0
src/JT1078.Gateway/Metadata/JT1078HttpContext.cs Parādīt failu

@@ -0,0 +1,19 @@
using System;
using System.Collections.Generic;
using System.Net;
using System.Security.Principal;
using System.Text;

namespace JT1078.Gateway.Metadata
{
public class JT1078HttpContext
{
public HttpListenerContext Context { get; }
public IPrincipal User { get; }
public JT1078HttpContext(HttpListenerContext context, IPrincipal user)
{
Context = context;
User = user;
}
}
}

+ 0
- 31
src/JT1078.Gateway/Metadata/JT1078HttpSession.cs Parādīt failu

@@ -1,31 +0,0 @@
using DotNetty.Transport.Channels;
using System;
using System.Net;

namespace JT1078.Gateway.Metadata
{
public class JT1078HttpSession
{
public JT1078HttpSession(
IChannel channel,
string userId)
{
Channel = channel;
UserId = userId;
StartTime = DateTime.Now;
LastActiveTime = DateTime.Now;
}

public JT1078HttpSession() { }

public string UserId { get; set; }

public string AttachInfo { get; set; }

public IChannel Channel { get; set; }

public DateTime LastActiveTime { get; set; }

public DateTime StartTime { get; set; }
}
}

+ 0
- 20
src/JT1078.Gateway/Metadata/JT1078Request.cs Parādīt failu

@@ -1,20 +0,0 @@
using JT1078.Protocol;
using System;
using System.Collections.Generic;
using System.Text;

namespace JT1078.Gateway.Metadata
{
public class JT1078Request
{
public JT1078Request(JT1078Package package,byte[] src)
{
Package = package;
Src = src;
}

public JT1078Package Package { get; }

public byte[] Src { get; }
}
}

+ 0
- 10
src/JT1078.Gateway/Metadata/JT1078Response.cs Parādīt failu

@@ -1,10 +0,0 @@
using System;
using System.Collections.Generic;
using System.Text;

namespace JT1078.Gateway.Metadata
{
public class JT1078Response
{
}
}

+ 0
- 20
src/JT1078.Gateway/Metadata/JT1078UdpPackage.cs Parādīt failu

@@ -1,20 +0,0 @@
using System;
using System.Collections.Generic;
using System.Net;
using System.Text;

namespace JT1078.Gateway.Metadata
{
public class JT1078UdpPackage
{
public JT1078UdpPackage(byte[] buffer, EndPoint sender)
{
Buffer = buffer;
Sender = sender;
}

public byte[] Buffer { get; }

public EndPoint Sender { get; }
}
}

+ 0
- 64
src/JT1078.Gateway/Session/JT1078HttpSessionManager.cs Parādīt failu

@@ -1,64 +0,0 @@
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using DotNetty.Transport.Channels;
using JT1078.Gateway.Metadata;

namespace JT1078.Gateway.Session
{
/// <summary>
/// JT1078 http会话管理
/// </summary>
public class JT1078HttpSessionManager
{
private readonly ILogger<JT1078HttpSessionManager> logger;

public JT1078HttpSessionManager(
ILoggerFactory loggerFactory)
{
logger = loggerFactory.CreateLogger<JT1078HttpSessionManager>();
}

private ConcurrentDictionary<string, JT1078HttpSession> SessionDict = new ConcurrentDictionary<string, JT1078HttpSession>();

public int SessionCount
{
get
{
return SessionDict.Count;
}
}

public List<JT1078HttpSession> GetSessions(string userId)
{
return SessionDict.Where(m => m.Value.UserId == userId).Select(m=>m.Value).ToList();
}

public void TryAdd(string userId,IChannel channel)
{
SessionDict.TryAdd(channel.Id.AsShortText(), new JT1078HttpSession(channel, userId));
if (logger.IsEnabled(LogLevel.Information))
{
logger.LogInformation($">>>{userId},{channel.Id.AsShortText()} Channel Connection.");
}
}

public void RemoveSessionByChannel(IChannel channel)
{
if (channel.Open&& SessionDict.TryRemove(channel.Id.AsShortText(), out var session))
{
if (logger.IsEnabled(LogLevel.Information))
{
logger.LogInformation($">>>{session.UserId},{session.Channel.Id.AsShortText()} Channel Remove.");
}
}
}
public List<JT1078HttpSession> GetAll()
{
return SessionDict.Select(s => s.Value).ToList();
}
}
}


+ 0
- 100
src/JT1078.Gateway/Session/JT1078TcpSessionManager.cs Parādīt failu

@@ -1,100 +0,0 @@
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using DotNetty.Transport.Channels;
using JT1078.Gateway.Metadata;

namespace JT1078.Gateway.Session
{
/// <summary>
/// JT1078 Tcp会话管理
/// </summary>
public class JT1078TcpSessionManager
{
private readonly ILogger<JT1078TcpSessionManager> logger;

public JT1078TcpSessionManager(
ILoggerFactory loggerFactory)
{
logger = loggerFactory.CreateLogger<JT1078TcpSessionManager>();
}

private ConcurrentDictionary<string, JT1078TcpSession> SessionIdDict = new ConcurrentDictionary<string, JT1078TcpSession>(StringComparer.OrdinalIgnoreCase);

public int SessionCount
{
get
{
return SessionIdDict.Count;
}
}

public JT1078TcpSession GetSession(string terminalPhoneNo)
{
if (string.IsNullOrEmpty(terminalPhoneNo))
return default;
if (SessionIdDict.TryGetValue(terminalPhoneNo, out JT1078TcpSession targetSession))
{
return targetSession;
}
else
{
return default;
}
}

public void TryAdd(string terminalPhoneNo,IChannel channel)
{
if (SessionIdDict.TryGetValue(terminalPhoneNo, out JT1078TcpSession oldSession))
{
oldSession.LastActiveTime = DateTime.Now;
oldSession.Channel = channel;
SessionIdDict.TryUpdate(terminalPhoneNo, oldSession, oldSession);
}
else
{
JT1078TcpSession session = new JT1078TcpSession(channel, terminalPhoneNo);
if (SessionIdDict.TryAdd(terminalPhoneNo, session))
{

}
}
}

public JT1078TcpSession RemoveSession(string terminalPhoneNo)
{
if (string.IsNullOrEmpty(terminalPhoneNo)) return default;
if (SessionIdDict.TryRemove(terminalPhoneNo, out JT1078TcpSession sessionRemove))
{
logger.LogInformation($">>>{terminalPhoneNo} Session Remove.");
return sessionRemove;
}
else
{
return default;
}
}

public void RemoveSessionByChannel(IChannel channel)
{
var terminalPhoneNos = SessionIdDict.Where(w => w.Value.Channel.Id == channel.Id).Select(s => s.Key).ToList();
if (terminalPhoneNos.Count > 0)
{
foreach (var key in terminalPhoneNos)
{
SessionIdDict.TryRemove(key, out JT1078TcpSession sessionRemove);
}
string nos = string.Join(",", terminalPhoneNos);
logger.LogInformation($">>>{nos} Channel Remove.");
}
}

public IEnumerable<JT1078TcpSession> GetAll()
{
return SessionIdDict.Select(s => s.Value).ToList();
}
}
}


+ 0
- 115
src/JT1078.Gateway/Session/JT1078UdpSessionManager.cs Parādīt failu

@@ -1,115 +0,0 @@
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using DotNetty.Transport.Channels;
using Microsoft.Extensions.Options;
using System.Net;
using JT1078.Gateway.Metadata;

namespace JT1078.Gateway.Session
{
/// <summary>
/// JT1078 udp会话管理
/// 估计要轮询下
/// </summary>
public class JT1078UdpSessionManager
{
private readonly ILogger<JT1078UdpSessionManager> logger;

public JT1078UdpSessionManager(
ILoggerFactory loggerFactory)
{
logger = loggerFactory.CreateLogger<JT1078UdpSessionManager>();
}

private ConcurrentDictionary<string, JT1078UdpSession> SessionIdDict = new ConcurrentDictionary<string, JT1078UdpSession>(StringComparer.OrdinalIgnoreCase);

public int SessionCount
{
get
{
return SessionIdDict.Count;
}
}

public JT1078UdpSession GetSession(string terminalPhoneNo)
{
if (string.IsNullOrEmpty(terminalPhoneNo))
return default;
if (SessionIdDict.TryGetValue(terminalPhoneNo, out JT1078UdpSession targetSession))
{
return targetSession;
}
else
{
return default;
}
}

public void TryAdd(IChannel channel,EndPoint sender,string terminalPhoneNo)
{
//1.先判断是否在缓存里面
if (SessionIdDict.TryGetValue(terminalPhoneNo, out JT1078UdpSession UdpSession))
{
UdpSession.LastActiveTime=DateTime.Now;
UdpSession.Sender = sender;
UdpSession.Channel = channel;
SessionIdDict.TryUpdate(terminalPhoneNo, UdpSession, UdpSession);
}
else
{
SessionIdDict.TryAdd(terminalPhoneNo, new JT1078UdpSession(channel, sender, terminalPhoneNo));
}
}

public void Heartbeat(string terminalPhoneNo)
{
if (string.IsNullOrEmpty(terminalPhoneNo)) return;
if (SessionIdDict.TryGetValue(terminalPhoneNo, out JT1078UdpSession oldSession))
{
oldSession.LastActiveTime = DateTime.Now;
SessionIdDict.TryUpdate(terminalPhoneNo, oldSession, oldSession);
}
}

public JT1078UdpSession RemoveSession(string terminalPhoneNo)
{
//设备离线可以进行通知
//使用Redis 发布订阅
if (string.IsNullOrEmpty(terminalPhoneNo)) return default;
if (SessionIdDict.TryRemove(terminalPhoneNo, out JT1078UdpSession SessionRemove))
{
logger.LogInformation($">>>{terminalPhoneNo} Session Remove.");
return SessionRemove;
}
else
{
return default;
}
}

public void RemoveSessionByChannel(IChannel channel)
{
//设备离线可以进行通知
//使用Redis 发布订阅
var terminalPhoneNos = SessionIdDict.Where(w => w.Value.Channel.Id == channel.Id).Select(s => s.Key).ToList();
if (terminalPhoneNos.Count > 0)
{
foreach (var key in terminalPhoneNos)
{
SessionIdDict.TryRemove(key, out JT1078UdpSession SessionRemove);
}
string nos = string.Join(",", terminalPhoneNos);
logger.LogInformation($">>>{nos} Channel Remove.");
}
}

public IEnumerable<JT1078UdpSession> GetAll()
{
return SessionIdDict.Select(s => s.Value).ToList();
}
}
}


+ 0
- 99
src/JT1078.Gateway/Tcp/Handlers/JT1078TcpConnectionHandler.cs Parādīt failu

@@ -1,99 +0,0 @@
using DotNetty.Handlers.Timeout;
using DotNetty.Transport.Channels;
using JT1078.Gateway.Session;
using Microsoft.Extensions.Logging;
using System;
using System.Threading.Tasks;

namespace JT1078.Gateway.Tcp.Handlers
{
/// <summary>
/// JT1078服务通道处理程序
/// </summary>
internal class JT1078TcpConnectionHandler : ChannelHandlerAdapter
{
private readonly ILogger<JT1078TcpConnectionHandler> logger;

private readonly JT1078TcpSessionManager SessionManager;

public JT1078TcpConnectionHandler(
JT1078TcpSessionManager sessionManager,
ILoggerFactory loggerFactory)
{
this.SessionManager = sessionManager;
logger = loggerFactory.CreateLogger<JT1078TcpConnectionHandler>();
}

/// <summary>
/// 通道激活
/// </summary>
/// <param name="context"></param>
public override void ChannelActive(IChannelHandlerContext context)
{
string channelId = context.Channel.Id.AsShortText();
if (logger.IsEnabled(LogLevel.Debug))
logger.LogDebug($"<<<{ channelId } Successful client connection to server.");
base.ChannelActive(context);
}

/// <summary>
/// 设备主动断开
/// </summary>
/// <param name="context"></param>
public override void ChannelInactive(IChannelHandlerContext context)
{
string channelId = context.Channel.Id.AsShortText();
if (logger.IsEnabled(LogLevel.Debug))
logger.LogDebug($">>>{ channelId } The client disconnects from the server.");
SessionManager.RemoveSessionByChannel(context.Channel);
base.ChannelInactive(context);
}

/// <summary>
/// 服务器主动断开
/// </summary>
/// <param name="context"></param>
/// <returns></returns>
public override Task CloseAsync(IChannelHandlerContext context)
{
string channelId = context.Channel.Id.AsShortText();
if (logger.IsEnabled(LogLevel.Debug))
logger.LogDebug($"<<<{ channelId } The server disconnects from the client.");
SessionManager.RemoveSessionByChannel(context.Channel);
return base.CloseAsync(context);
}

public override void ChannelReadComplete(IChannelHandlerContext context)=> context.Flush();

/// <summary>
/// 超时策略
/// </summary>
/// <param name="context"></param>
/// <param name="evt"></param>
public override void UserEventTriggered(IChannelHandlerContext context, object evt)
{
IdleStateEvent idleStateEvent = evt as IdleStateEvent;
if (idleStateEvent != null)
{
if(idleStateEvent.State== IdleState.ReaderIdle)
{
string channelId = context.Channel.Id.AsShortText();
logger.LogInformation($"{idleStateEvent.State.ToString()}>>>{channelId}");
// 由于808是设备发心跳,如果很久没有上报数据,那么就由服务器主动关闭连接。
SessionManager.RemoveSessionByChannel(context.Channel);
context.CloseAsync();
}
}
base.UserEventTriggered(context, evt);
}

public override void ExceptionCaught(IChannelHandlerContext context, Exception exception)
{
string channelId = context.Channel.Id.AsShortText();
logger.LogError(exception,$"{channelId} {exception.Message}" );
SessionManager.RemoveSessionByChannel(context.Channel);
context.CloseAsync();
}
}
}


+ 0
- 18
src/JT1078.Gateway/Tcp/Handlers/JT1078TcpMessageProcessorEmptyImpl.cs Parādīt failu

@@ -1,18 +0,0 @@
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
using JT1078.Gateway.Interfaces;
using JT1078.Gateway.Metadata;
using JT1078.Protocol;

namespace JT1078.Gateway.Tcp.Handlers
{
class JT1078TcpMessageProcessorEmptyImpl : IJT1078TcpMessageHandlers
{
public Task<JT1078Response> Processor(JT1078Request request)
{
return Task.FromResult<JT1078Response>(default);
}
}
}

+ 0
- 69
src/JT1078.Gateway/Tcp/Handlers/JT1078TcpServerHandler.cs Parādīt failu

@@ -1,69 +0,0 @@
using DotNetty.Buffers;
using DotNetty.Transport.Channels;
using System;
using Microsoft.Extensions.Logging;
using JT1078.Protocol;
using JT1078.Gateway.Session;
using JT1078.Gateway.Session.Services;
using JT1078.Gateway.Interfaces;
using JT1078.Gateway.Metadata;
using JT1078.Gateway.Enums;

namespace JT1078.Gateway.Tcp.Handlers
{
/// <summary>
/// JT1078 服务端处理程序
/// </summary>
internal class JT1078TcpServerHandler : SimpleChannelInboundHandler<byte[]>
{
private readonly JT1078TcpSessionManager SessionManager;

private readonly JT1078AtomicCounterService AtomicCounterService;

private readonly ILogger<JT1078TcpServerHandler> logger;

private readonly IJT1078TcpMessageHandlers handlers;

public JT1078TcpServerHandler(
IJT1078TcpMessageHandlers handlers,
ILoggerFactory loggerFactory,
JT1078AtomicCounterServiceFactory atomicCounterServiceFactory,
JT1078TcpSessionManager sessionManager)
{
this.handlers = handlers;
this.SessionManager = sessionManager;
this.AtomicCounterService = atomicCounterServiceFactory.Create(JT1078TransportProtocolType.tcp);
logger = loggerFactory.CreateLogger<JT1078TcpServerHandler>();
}


protected override void ChannelRead0(IChannelHandlerContext ctx, byte[] msg)
{
try
{
if (logger.IsEnabled(LogLevel.Trace))
{
logger.LogTrace("accept package success count<<<" + AtomicCounterService.MsgSuccessCount.ToString());
logger.LogTrace("accept msg <<< " + ByteBufferUtil.HexDump(msg));
}
JT1078Package package = JT1078Serializer.Deserialize(msg);
AtomicCounterService.MsgSuccessIncrement();
SessionManager.TryAdd(package.SIM, ctx.Channel);
handlers.Processor(new JT1078Request(package, msg));
if (logger.IsEnabled(LogLevel.Debug))
{
logger.LogDebug("accept package success count<<<" + AtomicCounterService.MsgSuccessCount.ToString());
}
}
catch (Exception ex)
{
AtomicCounterService.MsgFailIncrement();
if (logger.IsEnabled(LogLevel.Error))
{
logger.LogError("accept package fail count<<<" + AtomicCounterService.MsgFailCount.ToString());
logger.LogError(ex, "accept msg<<<" + ByteBufferUtil.HexDump(msg));
}
}
}
}
}

+ 0
- 30
src/JT1078.Gateway/Tcp/JT1078TcpBuilderDefault.cs Parādīt failu

@@ -1,30 +0,0 @@
using JT1078.Gateway.Interfaces;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection.Extensions;
using System;
using System.Collections.Generic;
using System.Text;

namespace JT1078.Gateway.Tcp
{
class JT1078TcpBuilderDefault : IJT1078TcpBuilder
{
public IJT1078Builder Instance { get; }

public JT1078TcpBuilderDefault(IJT1078Builder builder)
{
Instance = builder;
}

public IJT1078Builder Builder()
{
return Instance;
}

public IJT1078TcpBuilder Replace<T>() where T : IJT1078TcpMessageHandlers
{
Instance.Services.Replace(new ServiceDescriptor(typeof(IJT1078TcpMessageHandlers), typeof(T), ServiceLifetime.Singleton));
return this;
}
}
}

+ 0
- 26
src/JT1078.Gateway/Tcp/JT1078TcpExtensions.cs Parādīt failu

@@ -1,26 +0,0 @@
using JT1078.Gateway.Codecs;
using JT1078.Gateway.Interfaces;
using JT1078.Gateway.Session;
using JT1078.Gateway.Tcp;
using JT1078.Gateway.Tcp.Handlers;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection.Extensions;
using System.Runtime.CompilerServices;


namespace JT1078.Gateway
{
public static class JT1078TcpExtensions
{
public static IJT1078TcpBuilder AddTcpHost(this IJT1078Builder builder)
{
builder.Services.TryAddSingleton<JT1078TcpSessionManager>();
builder.Services.TryAddScoped<JT1078TcpConnectionHandler>();
builder.Services.TryAddScoped<JT1078TcpDecoder>();
builder.Services.TryAddSingleton<IJT1078TcpMessageHandlers, JT1078TcpMessageProcessorEmptyImpl>();
builder.Services.TryAddScoped<JT1078TcpServerHandler>();
builder.Services.AddHostedService<JT1078TcpServerHost>();
return new JT1078TcpBuilderDefault(builder);
}
}
}

+ 0
- 94
src/JT1078.Gateway/Tcp/JT1078TcpServerHost.cs Parādīt failu

@@ -1,94 +0,0 @@
using DotNetty.Buffers;
using DotNetty.Codecs;
using DotNetty.Handlers.Timeout;
using DotNetty.Transport.Bootstrapping;
using DotNetty.Transport.Channels;
using DotNetty.Transport.Libuv;
using JT1078.Gateway.Codecs;
using JT1078.Gateway.Configurations;
using JT1078.Gateway.Tcp.Handlers;
using JT1078.Protocol;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using System;
using System.Net;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks;

namespace JT1078.Gateway.Tcp
{
/// <summary>
/// JT1078 Tcp网关服务
/// </summary>
internal class JT1078TcpServerHost : IHostedService
{
private readonly IServiceProvider serviceProvider;
private readonly JT1078Configuration configuration;
private readonly ILogger<JT1078TcpServerHost> logger;
private DispatcherEventLoopGroup bossGroup;
private WorkerEventLoopGroup workerGroup;
private IChannel bootstrapChannel;
private IByteBufferAllocator serverBufferAllocator;

public JT1078TcpServerHost(
IServiceProvider provider,
ILoggerFactory loggerFactory,
IOptions<JT1078Configuration> configurationAccessor)
{
serviceProvider = provider;
configuration = configurationAccessor.Value;
logger=loggerFactory.CreateLogger<JT1078TcpServerHost>();
}

public Task StartAsync(CancellationToken cancellationToken)
{
bossGroup = new DispatcherEventLoopGroup();
workerGroup = new WorkerEventLoopGroup(bossGroup, configuration.EventLoopCount);
serverBufferAllocator = new PooledByteBufferAllocator();
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.Group(bossGroup, workerGroup);
bootstrap.Channel<TcpServerChannel>();
if (RuntimeInformation.IsOSPlatform(OSPlatform.Linux)
|| RuntimeInformation.IsOSPlatform(OSPlatform.OSX))
{
bootstrap
.Option(ChannelOption.SoReuseport, true)
.ChildOption(ChannelOption.SoReuseaddr, true);
}
bootstrap
.Option(ChannelOption.SoBacklog, configuration.SoBacklog)
.ChildOption(ChannelOption.Allocator, serverBufferAllocator)
.ChildHandler(new ActionChannelInitializer<IChannel>(channel =>
{
IChannelPipeline pipeline = channel.Pipeline;
using (var scope = serviceProvider.CreateScope())
{
channel.Pipeline.AddLast("JT1078TcpBuffer", new DelimiterBasedFrameDecoder(int.MaxValue,true,
Unpooled.CopiedBuffer(JT1078Package.FH_Bytes)));
channel.Pipeline.AddLast("JT1078TcpDecode", scope.ServiceProvider.GetRequiredService<JT1078TcpDecoder>());
channel.Pipeline.AddLast("JT1078SystemIdleState", new IdleStateHandler(
configuration.ReaderIdleTimeSeconds,
configuration.WriterIdleTimeSeconds,
configuration.AllIdleTimeSeconds));
channel.Pipeline.AddLast("JT1078TcpConnection", scope.ServiceProvider.GetRequiredService<JT1078TcpConnectionHandler>());
channel.Pipeline.AddLast("JT1078TcpService", scope.ServiceProvider.GetRequiredService<JT1078TcpServerHandler>());
}
}));
logger.LogInformation($"JT1078 TCP Server start at {IPAddress.Any}:{configuration.TcpPort}.");
return bootstrap.BindAsync(configuration.TcpPort)
.ContinueWith(i => bootstrapChannel = i.Result);
}

public async Task StopAsync(CancellationToken cancellationToken)
{
await bootstrapChannel.CloseAsync();
var quietPeriod = configuration.QuietPeriodTimeSpan;
var shutdownTimeout = configuration.ShutdownTimeoutTimeSpan;
await workerGroup.ShutdownGracefullyAsync(quietPeriod, shutdownTimeout);
await bossGroup.ShutdownGracefullyAsync(quietPeriod, shutdownTimeout);
}
}
}

+ 0
- 18
src/JT1078.Gateway/Udp/Handlers/JT1078UdpMessageProcessorEmptyImpl.cs Parādīt failu

@@ -1,18 +0,0 @@
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
using JT1078.Gateway.Interfaces;
using JT1078.Gateway.Metadata;
using JT1078.Protocol;

namespace JT1078.Gateway.Udp.Handlers
{
class JT1078UdpMessageProcessorEmptyImpl : IJT1078UdpMessageHandlers
{
public Task<JT1078Response> Processor(JT1078Request request)
{
return Task.FromResult<JT1078Response>(default);
}
}
}

+ 0
- 70
src/JT1078.Gateway/Udp/Handlers/JT1078UdpServerHandler.cs Parādīt failu

@@ -1,70 +0,0 @@
using DotNetty.Buffers;
using DotNetty.Transport.Channels;
using System;
using Microsoft.Extensions.Logging;
using JT1078.Protocol;
using JT1078.Gateway.Metadata;
using JT1078.Gateway.Session;
using JT1078.Gateway.Session.Services;
using JT1078.Gateway.Interfaces;
using JT1078.Gateway.Enums;

namespace JT1078.Gateway.Udp.Handlers
{
/// <summary>
/// JT1078 Udp服务端处理程序
/// </summary>
internal class JT1078UdpServerHandler : SimpleChannelInboundHandler<JT1078UdpPackage>
{
private readonly ILogger<JT1078UdpServerHandler> logger;

private readonly JT1078UdpSessionManager SessionManager;

private readonly JT1078AtomicCounterService AtomicCounterService;

private readonly IJT1078UdpMessageHandlers handlers;
public JT1078UdpServerHandler(
ILoggerFactory loggerFactory,
JT1078AtomicCounterServiceFactory atomicCounterServiceFactory,
IJT1078UdpMessageHandlers handlers,
JT1078UdpSessionManager sessionManager)
{
this.AtomicCounterService = atomicCounterServiceFactory.Create(JT1078TransportProtocolType.udp);
this.SessionManager = sessionManager;
logger = loggerFactory.CreateLogger<JT1078UdpServerHandler>();
this.handlers = handlers;
}

protected override void ChannelRead0(IChannelHandlerContext ctx, JT1078UdpPackage msg)
{
try
{
if (logger.IsEnabled(LogLevel.Trace))
{
logger.LogTrace("accept package success count<<<" + AtomicCounterService.MsgSuccessCount.ToString());
logger.LogTrace("accept msg <<< " + ByteBufferUtil.HexDump(msg.Buffer));
}
JT1078Package package = JT1078Serializer.Deserialize(msg.Buffer);
AtomicCounterService.MsgSuccessIncrement();
SessionManager.TryAdd(ctx.Channel, msg.Sender, package.SIM);
handlers.Processor(new JT1078Request(package, msg.Buffer));
if (logger.IsEnabled(LogLevel.Debug))
{
logger.LogDebug("accept package success count<<<" + AtomicCounterService.MsgSuccessCount.ToString());
}
}
catch (Exception ex)
{
AtomicCounterService.MsgFailIncrement();
if (logger.IsEnabled(LogLevel.Error))
{
logger.LogError("accept package fail count<<<" + AtomicCounterService.MsgFailCount.ToString());
logger.LogError(ex, "accept msg<<<" + ByteBufferUtil.HexDump(msg.Buffer));
}
}
}

public override void ChannelReadComplete(IChannelHandlerContext context) => context.Flush();

}
}

+ 0
- 30
src/JT1078.Gateway/Udp/JT1078UdpBuilderDefault.cs Parādīt failu

@@ -1,30 +0,0 @@
using JT1078.Gateway.Interfaces;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection.Extensions;
using System;
using System.Collections.Generic;
using System.Text;

namespace JT1078.Gateway.Udp
{
class JT1078UdpBuilderDefault : IJT1078UdpBuilder
{
public IJT1078Builder Instance { get; }

public JT1078UdpBuilderDefault(IJT1078Builder builder)
{
Instance = builder;
}

public IJT1078Builder Builder()
{
return Instance;
}

public IJT1078UdpBuilder Replace<T>() where T : IJT1078UdpMessageHandlers
{
Instance.Services.Replace(new ServiceDescriptor(typeof(IJT1078UdpMessageHandlers), typeof(T), ServiceLifetime.Singleton));
return this;
}
}
}

+ 0
- 24
src/JT1078.Gateway/Udp/JT1078UdpExtensions.cs Parādīt failu

@@ -1,24 +0,0 @@
using JT1078.Gateway.Codecs;
using JT1078.Gateway.Interfaces;
using JT1078.Gateway.Session;
using JT1078.Gateway.Udp;
using JT1078.Gateway.Udp.Handlers;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection.Extensions;
using System.Runtime.CompilerServices;

namespace JT1078.Gateway
{
public static class JT1078UdpExtensions
{
public static IJT1078UdpBuilder AddUdpHost(this IJT1078Builder builder)
{
builder.Services.TryAddSingleton<JT1078UdpSessionManager>();
builder.Services.TryAddSingleton<IJT1078UdpMessageHandlers, JT1078UdpMessageProcessorEmptyImpl>();
builder.Services.TryAddScoped<JT1078UdpDecoder>();
builder.Services.TryAddScoped<JT1078UdpServerHandler>();
builder.Services.AddHostedService<JT1078UdpServerHost>();
return new JT1078UdpBuilderDefault(builder);
}
}
}

+ 0
- 76
src/JT1078.Gateway/Udp/JT1078UdpServerHost.cs Parādīt failu

@@ -1,76 +0,0 @@
using DotNetty.Transport.Bootstrapping;
using DotNetty.Transport.Channels;
using DotNetty.Transport.Channels.Sockets;
using JT1078.Gateway.Codecs;
using JT1078.Gateway.Configurations;
using JT1078.Gateway.Udp.Handlers;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using System;
using System.Net;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks;

namespace JT1078.Gateway.Udp
{
/// <summary>
/// JT1078 Udp网关服务
/// </summary>
internal class JT1078UdpServerHost : IHostedService
{
private readonly IServiceProvider serviceProvider;
private readonly JT1078Configuration configuration;
private readonly ILogger<JT1078UdpServerHost> logger;
private MultithreadEventLoopGroup group;
private IChannel bootstrapChannel;

public JT1078UdpServerHost(
IServiceProvider provider,
ILoggerFactory loggerFactory,
IOptions<JT1078Configuration> jT808ConfigurationAccessor)
{
serviceProvider = provider;
configuration = jT808ConfigurationAccessor.Value;
logger=loggerFactory.CreateLogger<JT1078UdpServerHost>();
}

public Task StartAsync(CancellationToken cancellationToken)
{
group = new MultithreadEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
bootstrap.Group(group);
bootstrap.Channel<SocketDatagramChannel>();
if (RuntimeInformation.IsOSPlatform(OSPlatform.Linux)
|| RuntimeInformation.IsOSPlatform(OSPlatform.OSX))
{
bootstrap
.Option(ChannelOption.SoReuseport, true);
}
bootstrap
.Option(ChannelOption.SoBroadcast, true)
.Handler(new ActionChannelInitializer<IChannel>(channel =>
{
IChannelPipeline pipeline = channel.Pipeline;
using (var scope = serviceProvider.CreateScope())
{
pipeline.AddLast("JT1078UdpDecoder", scope.ServiceProvider.GetRequiredService<JT1078UdpDecoder>());
pipeline.AddLast("JT1078UdpService", scope.ServiceProvider.GetRequiredService<JT1078UdpServerHandler>());
}
}));
logger.LogInformation($"JT1078 Udp Server start at {IPAddress.Any}:{configuration.UdpPort}.");
return bootstrap.BindAsync(configuration.UdpPort)
.ContinueWith(i => bootstrapChannel = i.Result);
}

public async Task StopAsync(CancellationToken cancellationToken)
{
await bootstrapChannel.CloseAsync();
var quietPeriod = configuration.QuietPeriodTimeSpan;
var shutdownTimeout = configuration.ShutdownTimeoutTimeSpan;
await group.ShutdownGracefullyAsync(quietPeriod, shutdownTimeout);
}
}
}

Notiek ielāde…
Atcelt
Saglabāt