Sfoglia il codice sorgente

1.了解基于dotnetty的http服务

2.去掉会话服务job任务
3.增加webapi服务(待测试)
tags/v1.0.0
SmallChi 6 anni fa
parent
commit
c2f7c5dfde
11 ha cambiato i file con 312 aggiunte e 97 eliminazioni
  1. +15
    -0
      src/JT808.DotNetty/Dtos/JT808UnificationSendRequestDto.cs
  2. +1
    -4
      src/JT808.DotNetty/Handlers/JT808ConnectionHandler.cs
  3. +25
    -63
      src/JT808.DotNetty/Handlers/JT808WebAPIServerHandler.cs
  4. +1
    -1
      src/JT808.DotNetty/Interfaces/IJT808SessionService.cs
  5. +1
    -1
      src/JT808.DotNetty/Interfaces/IJT808UnificationSendService.cs
  6. +2
    -1
      src/JT808.DotNetty/JT808DotnettyExtensions.cs
  7. +4
    -22
      src/JT808.DotNetty/JT808SessionManager.cs
  8. +5
    -5
      src/JT808.DotNetty/JT808WebAPIServerHost.cs
  9. +214
    -0
      src/JT808.DotNetty/JT808WebAPIService.cs
  10. +22
    -0
      src/JT808.DotNetty/Metadata/JT808HttpRequest.cs
  11. +22
    -0
      src/JT808.DotNetty/Metadata/JT808HttpResponse.cs

+ 15
- 0
src/JT808.DotNetty/Dtos/JT808UnificationSendRequestDto.cs Vedi File

@@ -0,0 +1,15 @@
using System;
using System.Collections.Generic;
using System.Text;

namespace JT808.DotNetty.Dtos
{
/// <summary>
/// 统一下发请求参数
/// </summary>
public class JT808UnificationSendRequestDto
{
public string TerminalPhoneNo { get; set; }
public byte[] Data { get; set; }
}
}

+ 1
- 4
src/JT808.DotNetty/Handlers/JT808ConnectionHandler.cs Vedi File

@@ -64,10 +64,7 @@ namespace JT808.DotNetty.Handlers
return base.CloseAsync(context);
}

public override void ChannelReadComplete(IChannelHandlerContext context)
{
context.Flush();
}
public override void ChannelReadComplete(IChannelHandlerContext context)=> context.Flush();

/// <summary>
/// 超时策略


+ 25
- 63
src/JT808.DotNetty/Handlers/JT808WebAPIServerHandler.cs Vedi File

@@ -1,97 +1,58 @@
using DotNetty.Buffers;
using DotNetty.Codecs.Http;
using DotNetty.Common;
using DotNetty.Common.Utilities;
using DotNetty.Transport.Channels;
using JT808.DotNetty.Dtos;
using JT808.DotNetty.Interfaces;
using JT808.DotNetty.Metadata;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using System;
using System.Collections.Generic;
using System.Text;

namespace JT808.DotNetty.Handlers
{
/// <summary>
/// jt808 webapi服务
/// 请求量不大,只支持JSON格式
/// 请求量不大,只支持JSON格式并且只支持post发数据
/// ref: dotnetty HttpServer
/// </summary>
internal class JT808WebAPIServerHandler : ChannelHandlerAdapter
internal class JT808WebAPIServerHandler : SimpleChannelInboundHandler<IFullHttpRequest>
{
private static readonly ThreadLocalCache Cache = new ThreadLocalCache();

sealed class ThreadLocalCache : FastThreadLocal<AsciiString>
{
protected override AsciiString GetInitialValue()
{
DateTime dateTime = DateTime.UtcNow;
return AsciiString.Cached($"{dateTime.DayOfWeek}, {dateTime:dd MMM yyyy HH:mm:ss z}");
}
}

private static readonly AsciiString TypeJson = AsciiString.Cached("application/json");
private static readonly AsciiString ServerName = AsciiString.Cached("JT808WebAPINetty");
private static readonly AsciiString ContentTypeEntity = HttpHeaderNames.ContentType;
private static readonly AsciiString DateEntity = HttpHeaderNames.Date;
private static readonly AsciiString ContentLengthEntity = HttpHeaderNames.ContentLength;
private static readonly AsciiString ServerEntity = HttpHeaderNames.Server;

volatile ICharSequence date = Cache.Value;

private readonly JT808WebAPIService jT808WebAPIService;
private readonly ILogger<JT808WebAPIServerHandler> logger;

private readonly IJT808SessionService jT808SessionService;

private readonly IJT808UnificationSendService jT808UnificationSendService;

public JT808WebAPIServerHandler(
IJT808SessionService jT808SessionService,
IJT808UnificationSendService jT808UnificationSendService,
JT808WebAPIService jT808WebAPIService,
ILoggerFactory loggerFactory)
{
this.jT808SessionService = jT808SessionService;
this.jT808UnificationSendService = jT808UnificationSendService;
this.jT808WebAPIService = jT808WebAPIService;
logger = loggerFactory.CreateLogger<JT808WebAPIServerHandler>();
}

public override void ChannelRead(IChannelHandlerContext ctx, object message)
protected override void ChannelRead0(IChannelHandlerContext ctx, IFullHttpRequest msg)
{
if (message is IHttpRequest request)
if (logger.IsEnabled(LogLevel.Debug))
{
logger.LogDebug($"Uri:{msg.Uri}");
logger.LogDebug($"Content:{msg.Content.ToString(Encoding.UTF8)}");
}
JT808HttpResponse jT808HttpResponse = null;
if (jT808WebAPIService.HandlerDict.TryGetValue(msg.Uri,out var funcHandler))
{
try
{
Process(ctx, request);
}
finally
{
ReferenceCountUtil.Release(message);
}
jT808HttpResponse = funcHandler( new JT808HttpRequest() { Json = msg.Content.ToString(Encoding.UTF8)});
}
else
{
ctx.FireChannelRead(message);
jT808HttpResponse = jT808WebAPIService.NotFoundHttpResponse();
}
}

private void Process(IChannelHandlerContext ctx, IHttpRequest request)
{
string uri = request.Uri;
//switch (uri)
//{
// //case "/json":
// // byte[] json = Encoding.UTF8.GetBytes(NewMessage().ToJsonFormat());
// // this.WriteResponse(ctx, Unpooled.WrappedBuffer(json), TypeJson, JsonClheaderValue);
// // break;
// default:
// var response = new DefaultFullHttpResponse(HttpVersion.Http11, HttpResponseStatus.NotFound, Unpooled.Empty, false);
// ctx.WriteAndFlushAsync(response);
// ctx.CloseAsync();
// break;
//}
byte[] json = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(new JT808DefaultResultDto()));
this.WriteResponse(ctx, Unpooled.WrappedBuffer(json), TypeJson, json.Length);
if (jT808HttpResponse != null)
{
WriteResponse(ctx, Unpooled.WrappedBuffer(jT808HttpResponse.Data), TypeJson, jT808HttpResponse.Data.Length);
}
}

private void WriteResponse(IChannelHandlerContext ctx, IByteBuffer buf, ICharSequence contentType, int contentLength)
@@ -101,16 +62,17 @@ namespace JT808.DotNetty.Handlers
HttpHeaders headers = response.Headers;
headers.Set(ContentTypeEntity, contentType);
headers.Set(ServerEntity, ServerName);
headers.Set(DateEntity, this.date);
headers.Set(DateEntity, DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff"));
headers.Set(ContentLengthEntity, contentLength);
// Close the non-keep-alive connection after the write operation is done.
ctx.WriteAsync(response);
ctx.WriteAndFlushAsync(response);
ctx.CloseAsync();
}

public override void ExceptionCaught(IChannelHandlerContext context, Exception exception)
{
string channelId = context.Channel.Id.AsShortText();
logger.LogError(exception, $"{channelId} {exception.Message}");
WriteResponse(context, Unpooled.WrappedBuffer(jT808WebAPIService.ErrorHttpResponse(exception).Data), TypeJson, jT808WebAPIService.ErrorHttpResponse(exception).Data.Length);
logger.LogError(exception, exception.Message);
context.CloseAsync();
}



+ 1
- 1
src/JT808.DotNetty/Interfaces/IJT808SessionService.cs Vedi File

@@ -8,7 +8,7 @@ namespace JT808.DotNetty.Interfaces
/// <summary>
/// JT808会话服务
/// </summary>
public interface IJT808SessionService
internal interface IJT808SessionService
{
/// <summary>
/// 获取真实连接数


+ 1
- 1
src/JT808.DotNetty/Interfaces/IJT808UnificationSendService.cs Vedi File

@@ -8,7 +8,7 @@ namespace JT808.DotNetty.Interfaces
/// <summary>
/// JT808统一下发命令
/// </summary>
public interface IJT808UnificationSendService
internal interface IJT808UnificationSendService
{
JT808ResultDto<bool> Send(string terminalPhoneNo, byte[] data);
}


+ 2
- 1
src/JT808.DotNetty/JT808DotnettyExtensions.cs Vedi File

@@ -24,9 +24,10 @@ namespace JT808.DotNetty
services.TryAddScoped<JT808ConnectionHandler>();
services.TryAddScoped<JT808Decoder>();
services.TryAddScoped<JT808ServerHandler>();
services.TryAddScoped<JT808WebAPIServerHandler>();
services.TryAddSingleton<IJT808SessionService, JT808SessionServiceDefaultImpl>();
services.TryAddSingleton<IJT808UnificationSendService, JT808UnificationSendServiceDefaultImpl>();
services.TryAddScoped<JT808WebAPIService>();
services.TryAddScoped<JT808WebAPIServerHandler>();
services.AddHostedService<JT808ServerHost>();
services.AddHostedService<JT808WebAPIServerHost>();
});


+ 4
- 22
src/JT808.DotNetty/JT808SessionManager.cs Vedi File

@@ -13,31 +13,18 @@ using JT808.DotNetty.Metadata;

namespace JT808.DotNetty
{
public class JT808SessionManager: IDisposable
public class JT808SessionManager
{
private readonly ILogger<JT808SessionManager> logger;

private readonly JT808Configuration configuration;
private readonly CancellationTokenSource cancellationTokenSource;
public JT808SessionManager(
IOptions<JT808Configuration> jT808ConfigurationAccessor,
ILoggerFactory loggerFactory)
{
logger = loggerFactory.CreateLogger<JT808SessionManager>();
configuration = jT808ConfigurationAccessor.Value;
cancellationTokenSource = new CancellationTokenSource();
Task.Run(() =>
{
while (!cancellationTokenSource.IsCancellationRequested)
{
logger.LogInformation($"Online Count>>>{RealSessionCount}");
if (RealSessionCount > 0)
{
logger.LogInformation($"SessionIds>>>{string.Join(",", SessionIdDict.Select(s => s.Key))}");
logger.LogInformation($"TerminalPhoneNos>>>{string.Join(",", TerminalPhoneNo_SessionId_Dict.Select(s => $"{s.Key}-{s.Value}"))}");
}
Thread.Sleep(configuration.SessionReportTime);
}
}, cancellationTokenSource.Token);
}

/// <summary>
@@ -212,12 +199,7 @@ namespace JT808.DotNetty
{
return SessionIdDict.Join(TerminalPhoneNo_SessionId_Dict, m => m.Key, s => s.Value, (m, s) => m.Value).ToList();
}
public void Dispose()
{
cancellationTokenSource.Cancel();
cancellationTokenSource.Dispose();
}
}
}


+ 5
- 5
src/JT808.DotNetty/JT808WebAPIServerHost.cs Vedi File

@@ -9,10 +9,8 @@ using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using System;
using System.Collections.Generic;
using System.Net;
using System.Runtime.InteropServices;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

@@ -61,9 +59,11 @@ namespace JT808.DotNetty
IChannelPipeline pipeline = channel.Pipeline;
using (var scope = serviceProvider.CreateScope())
{
pipeline.AddLast("encoder", new HttpResponseEncoder());
pipeline.AddLast("decoder", new HttpRequestDecoder(4096, 8192, 8192, false));
pipeline.AddLast("jt808webapihandler", scope.ServiceProvider.GetRequiredService<JT808WebAPIServerHandler>());
pipeline.AddLast("http_encoder", new HttpResponseEncoder());
pipeline.AddLast("http_decoder", new HttpRequestDecoder(4096, 8192, 8192, false));
//将多个消息转换为单一的request或者response对象 =>IFullHttpRequest
pipeline.AddLast("http_aggregator", new HttpObjectAggregator(65536));
pipeline.AddLast("http_jt808webapihandler", scope.ServiceProvider.GetRequiredService<JT808WebAPIServerHandler>());
}
}));
logger.LogInformation($"WebAPI Server start at {IPAddress.Any}:{configuration.WebAPIPort}.");


+ 214
- 0
src/JT808.DotNetty/JT808WebAPIService.cs Vedi File

@@ -0,0 +1,214 @@
using JT808.DotNetty.Dtos;
using JT808.DotNetty.Interfaces;
using JT808.DotNetty.Metadata;
using Newtonsoft.Json;
using System;
using System.Collections.Generic;
using System.Text;

namespace JT808.DotNetty
{
internal class JT808WebAPIService
{
public Dictionary<string, Func<JT808HttpRequest, JT808HttpResponse>> HandlerDict { get; protected set; }

private const string RouteTablePrefix = "/jt808api";

private const string sessionRoutePrefix = "Session";

private readonly IJT808SessionService jT808SessionService;

private readonly IJT808UnificationSendService jT808UnificationSendService;

/// <summary>
/// 初始化消息处理业务
/// </summary>
protected JT808WebAPIService(
IJT808SessionService jT808SessionService,
IJT808UnificationSendService jT808UnificationSendService)
{
this.jT808SessionService = jT808SessionService;
this.jT808UnificationSendService = jT808UnificationSendService;
HandlerDict = new Dictionary<string, Func<JT808HttpRequest, JT808HttpResponse>>
{
{$"{RouteTablePrefix}/UnificationSend", UnificationSend},
{$"{RouteTablePrefix}/{sessionRoutePrefix}/GetRealLinkCount", GetRealLinkCount},
{$"{RouteTablePrefix}/{sessionRoutePrefix}/GetRelevanceLinkCount", GetRelevanceLinkCount},
{$"{RouteTablePrefix}/{sessionRoutePrefix}/GetRealAll", GetRealAll},
{$"{RouteTablePrefix}/{sessionRoutePrefix}/GetRelevanceAll", GetRelevanceAll},
{$"{RouteTablePrefix}/{sessionRoutePrefix}/RemoveByChannelId", RemoveByChannelId},
{$"{RouteTablePrefix}/{sessionRoutePrefix}/RemoveByTerminalPhoneNo", RemoveByTerminalPhoneNo},
{$"{RouteTablePrefix}/{sessionRoutePrefix}/GetByChannelId", GetByChannelId},
{$"{RouteTablePrefix}/{sessionRoutePrefix}/GetByTerminalPhoneNo", GetByTerminalPhoneNo},
};
}

/// <summary>
/// 统一下发信息
/// </summary>
/// <param name="request"></param>
/// <returns></returns>
public JT808HttpResponse UnificationSend(JT808HttpRequest request)
{
if (string.IsNullOrEmpty(request.Json))
{
return EmptyHttpResponse();
}
JT808UnificationSendRequestDto jT808UnificationSendRequestDto = JsonConvert.DeserializeObject<JT808UnificationSendRequestDto>(request.Json);
var result = jT808UnificationSendService.Send(jT808UnificationSendRequestDto.TerminalPhoneNo, jT808UnificationSendRequestDto.Data);
return CreateJT808HttpResponse(result);
}

/// <summary>
/// 会话服务-获取真实连接数
/// </summary>
/// <param name="request"></param>
/// <returns></returns>
public JT808HttpResponse GetRealLinkCount(JT808HttpRequest request)
{
var result = jT808SessionService.GetRealAll();
return CreateJT808HttpResponse(result);
}

/// <summary>
/// 会话服务-获取设备相关连的连接数
/// </summary>
/// <param name="request"></param>
/// <returns></returns>
public JT808HttpResponse GetRelevanceLinkCount(JT808HttpRequest request)
{
var result = jT808SessionService.GetRelevanceLinkCount();
return CreateJT808HttpResponse(result);
}

/// <summary>
/// 会话服务-获取实际会话集合
/// </summary>
/// <param name="request"></param>
/// <returns></returns>
public JT808HttpResponse GetRealAll(JT808HttpRequest request)
{
var result = jT808SessionService.GetRealAll();
return CreateJT808HttpResponse(result);
}

/// <summary>
/// 会话服务-获取设备相关连会话集合
/// </summary>
/// <param name="request"></param>
/// <returns></returns>
public JT808HttpResponse GetRelevanceAll(JT808HttpRequest request)
{
var result = jT808SessionService.GetRelevanceAll();
return CreateJT808HttpResponse(result);
}

/// <summary>
/// 会话服务-通过通道Id移除对应会话
/// </summary>
/// <param name="request"></param>
/// <returns></returns>
public JT808HttpResponse RemoveByChannelId(JT808HttpRequest request)
{
if (string.IsNullOrEmpty(request.Json))
{
return EmptyHttpResponse();
}
var result = jT808SessionService.RemoveByChannelId(request.Json);
return CreateJT808HttpResponse(result);
}

/// <summary>
/// 会话服务-通过设备终端号移除对应会话
/// </summary>
/// <param name="request"></param>
/// <returns></returns>
public JT808HttpResponse RemoveByTerminalPhoneNo(JT808HttpRequest request)
{
if (string.IsNullOrEmpty(request.Json))
{
return EmptyHttpResponse();
}
var result = jT808SessionService.RemoveByTerminalPhoneNo(request.Json);
return CreateJT808HttpResponse(result);
}

/// <summary>
/// 会话服务-通过通道Id获取会话信息
/// </summary>
/// <param name="request"></param>
/// <returns></returns>
public JT808HttpResponse GetByChannelId(JT808HttpRequest request)
{
if (string.IsNullOrEmpty(request.Json))
{
return EmptyHttpResponse();
}
var result = jT808SessionService.GetByChannelId(request.Json);
return CreateJT808HttpResponse(result);
}

/// <summary>
/// 会话服务-通过设备终端号获取会话信息
/// </summary>
/// <param name="request"></param>
/// <returns></returns>
public JT808HttpResponse GetByTerminalPhoneNo(JT808HttpRequest request)
{
if (string.IsNullOrEmpty(request.Json))
{
return EmptyHttpResponse();
}
var result = jT808SessionService.GetByTerminalPhoneNo(request.Json);
return CreateJT808HttpResponse(result);
}

private JT808HttpResponse CreateJT808HttpResponse(dynamic dynamicObject)
{
byte[] data = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(dynamicObject));
return new JT808HttpResponse()
{
Data = data
};
}

public JT808HttpResponse DefaultHttpResponse()
{
byte[] json = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(new JT808DefaultResultDto()));
return new JT808HttpResponse(json);
}

public JT808HttpResponse EmptyHttpResponse()
{
byte[] json = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(new JT808ResultDto<string>()
{
Code=201,
Message="内容为空",
Data="Content Empty"
}));
return new JT808HttpResponse(json);
}

public JT808HttpResponse NotFoundHttpResponse()
{
byte[] json = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(new JT808ResultDto<string>()
{
Code=404,
Message="没有该服务",
Data= "没有该服务"
}));
return new JT808HttpResponse(json);
}

public JT808HttpResponse ErrorHttpResponse(Exception ex)
{
byte[] json = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(new JT808ResultDto<string>()
{
Code = 500,
Message = JsonConvert.SerializeObject(ex),
Data= ex.Message
}));
return new JT808HttpResponse(json);
}
}
}

+ 22
- 0
src/JT808.DotNetty/Metadata/JT808HttpRequest.cs Vedi File

@@ -0,0 +1,22 @@
using JT808.Protocol;
using System;
using System.Collections.Generic;
using System.Reflection;

namespace JT808.DotNetty.Metadata
{
public class JT808HttpRequest
{
public string Json { get; set; }

public JT808HttpRequest()
{

}

public JT808HttpRequest(string json)
{
Json = json;
}
}
}

+ 22
- 0
src/JT808.DotNetty/Metadata/JT808HttpResponse.cs Vedi File

@@ -0,0 +1,22 @@
using JT808.Protocol;
using System;
using System.Collections.Generic;
using System.Reflection;

namespace JT808.DotNetty.Metadata
{
public class JT808HttpResponse
{
public byte[] Data { get; set; }

public JT808HttpResponse()
{

}

public JT808HttpResponse(byte[] data)
{
this.Data = data;
}
}
}

Caricamento…
Annulla
Salva