Przeglądaj źródła

1.增加包计数服务

2.修改原包转发服务
3.增加远程地址转发配置服务
tags/v1.0.0
SmallChi 6 lat temu
rodzic
commit
cf8d8354d0
19 zmienionych plików z 635 dodań i 185 usunięć
  1. +1
    -1
      README.md
  2. +12
    -2
      src/JT808.DotNetty/Configurations/JT808Configuration.cs
  3. +16
    -0
      src/JT808.DotNetty/Dtos/JT808AtomicCounterDto.cs
  4. +1
    -1
      src/JT808.DotNetty/Dtos/JT808DefaultResultDto.cs
  5. +35
    -0
      src/JT808.DotNetty/Dtos/JT808IPAddressDto.cs
  6. +8
    -0
      src/JT808.DotNetty/Dtos/JT808ResultDto.cs
  7. +33
    -0
      src/JT808.DotNetty/Dtos/JT808SourcePackageChannelInfoDto.cs
  8. +13
    -5
      src/JT808.DotNetty/Handlers/JT808ConnectionHandler.cs
  9. +13
    -2
      src/JT808.DotNetty/Handlers/JT808ServerHandler.cs
  10. +11
    -12
      src/JT808.DotNetty/Handlers/JT808SourcePackageDispatcherHandler.cs
  11. +1
    -0
      src/JT808.DotNetty/Handlers/JT808WebAPIServerHandler.cs
  12. +96
    -0
      src/JT808.DotNetty/Internal/JT808RemoteAddressTransmitConfigurationService.cs
  13. +8
    -8
      src/JT808.DotNetty/Internal/JT808SessionServiceDefaultImpl.cs
  14. +300
    -0
      src/JT808.DotNetty/Internal/JT808SourcePackageChannelService.cs
  15. +6
    -141
      src/JT808.DotNetty/Internal/JT808SourcePackageDispatcherDefaultImpl.cs
  16. +3
    -3
      src/JT808.DotNetty/Internal/JT808UnificationSendServiceDefaultImpl.cs
  17. +75
    -8
      src/JT808.DotNetty/Internal/JT808WebAPIService.cs
  18. +1
    -0
      src/JT808.DotNetty/JT808DotnettyExtensions.cs
  19. +2
    -2
      src/JT808.DotNetty/JT808WebAPIServerHost.cs

+ 1
- 1
README.md Wyświetl plik

@@ -1,6 +1,6 @@
# JT808DotNetty

基于DotNetty封装的JT808DotNetty专注消息业务处理
基于DotNetty封装的JT808DotNetty通用消息业务处理

[了解JT808协议进这边](https://github.com/SmallChi/JT808)



+ 12
- 2
src/JT808.DotNetty/Configurations/JT808Configuration.cs Wyświetl plik

@@ -27,14 +27,24 @@ namespace JT808.DotNetty.Configurations
public int AllIdleTimeSeconds { get; set; } = 3600;

/// <summary>
/// WebAPI服务
/// WebApi服务
/// 默认828端口
/// </summary>
public int WebAPIPort { get; set; } = 828;
public int WebApiPort { get; set; } = 828;

/// <summary>
/// 源包分发器配置
/// </summary>
public List<JT808ClientConfiguration> SourcePackageDispatcherClientConfigurations { get; set; }

/// <summary>
/// 转发远程地址 (可选项)知道转发的地址有利于提升性能
/// 按照808的消息,有些请求必须要应答,但是转发可以不需要有应答可以节省部分资源包括:
// 1.消息的序列化
// 2.消息的下发
// 都有一定的性能损耗,那么不需要判断写超时 IdleState.WriterIdle
// 就跟神兽貔貅一样。。。
/// </summary>
public List<JT808ClientConfiguration> ForwardingRemoteAddress { get; set; }
}
}

+ 16
- 0
src/JT808.DotNetty/Dtos/JT808AtomicCounterDto.cs Wyświetl plik

@@ -0,0 +1,16 @@
using System;
using System.Collections.Generic;
using System.Text;

namespace JT808.DotNetty.Dtos
{
/// <summary>
/// 包计数器服务
/// </summary>
public class JT808AtomicCounterDto
{
public long MsgSuccessCount { get; set; }

public long MsgFailCount { get; set; }
}
}

+ 1
- 1
src/JT808.DotNetty/Dtos/JT808DefaultResultDto.cs Wyświetl plik

@@ -9,7 +9,7 @@ namespace JT808.DotNetty.Dtos
public JT808DefaultResultDto()
{
Data = "Hello,JT808 WebAPI";
Code = 200;
Code = JT808ResultCode.Ok;
}
}
}

+ 35
- 0
src/JT808.DotNetty/Dtos/JT808IPAddressDto.cs Wyświetl plik

@@ -0,0 +1,35 @@
using System;
using System.Collections.Generic;
using System.Net;
using System.Text;

namespace JT808.DotNetty.Dtos
{
public class JT808IPAddressDto
{
public string Host { get; set; }

public int Port { get; set; }

private EndPoint endPoint;

public EndPoint EndPoint
{
get
{
if (endPoint == null)
{
if (IPAddress.TryParse(Host, out IPAddress ip))
{
endPoint = new IPEndPoint(ip, Port);
}
else
{
endPoint = new DnsEndPoint(Host, Port);
}
}
return endPoint;
}
}
}
}

+ 8
- 0
src/JT808.DotNetty/Dtos/JT808ResultDto.cs Wyświetl plik

@@ -12,4 +12,12 @@ namespace JT808.DotNetty.Dtos

public T Data { get; set; }
}

internal class JT808ResultCode
{
public const int Ok = 200;
public const int Empty = 201;
public const int NotFound = 404;
public const int Error = 500;
}
}

+ 33
- 0
src/JT808.DotNetty/Dtos/JT808SourcePackageChannelInfoDto.cs Wyświetl plik

@@ -0,0 +1,33 @@
using System;
using System.Collections.Generic;
using System.Text;

namespace JT808.DotNetty.Dtos
{
/// <summary>
/// 原包通道信息
/// </summary>
public class JT808SourcePackageChannelInfoDto
{
/// <summary>
/// 远程地址
/// </summary>
public string RemoteAddress { get; set; }
/// <summary>
/// 本地地址
/// </summary>
public string LocalAddress { get; set; }
/// <summary>
/// 是否注册
/// </summary>
public bool Registered { get; set; }
/// <summary>
/// 是否活动
/// </summary>
public bool Active { get; set; }
/// <summary>
/// 是否打开
/// </summary>
public bool Open { get; set; }
}
}

+ 13
- 5
src/JT808.DotNetty/Handlers/JT808ConnectionHandler.cs Wyświetl plik

@@ -75,11 +75,19 @@ namespace JT808.DotNetty.Handlers
IdleStateEvent idleStateEvent = evt as IdleStateEvent;
if (idleStateEvent != null)
{
string channelId = context.Channel.Id.AsShortText();
logger.LogInformation($"{idleStateEvent.State.ToString()}>>>{channelId}");
// 由于808是设备发心跳,如果很久没有上报数据,那么就由服务器主动关闭连接。
jT808SessionManager.RemoveSessionByID(channelId);
context.CloseAsync();
if(idleStateEvent.State== IdleState.ReaderIdle)
{
string channelId = context.Channel.Id.AsShortText();
logger.LogInformation($"{idleStateEvent.State.ToString()}>>>{channelId}");
// 由于808是设备发心跳,如果很久没有上报数据,那么就由服务器主动关闭连接。
jT808SessionManager.RemoveSessionByID(channelId);
context.CloseAsync();
}
// 按照808的消息,有些请求必须要应答,但是转发可以不需要有应答可以节省部分资源包括:
// 1.消息的序列化
// 2.消息的下发
// 都有一定的性能损耗,那么不需要判断写超时 IdleState.WriterIdle
// 就跟神兽貔貅一样。。。
}
base.UserEventTriggered(context, evt);
}


+ 13
- 2
src/JT808.DotNetty/Handlers/JT808ServerHandler.cs Wyświetl plik

@@ -5,6 +5,7 @@ using System;
using System.Collections.Generic;
using System.Text;
using JT808.DotNetty.Metadata;
using JT808.DotNetty.Internal;

namespace JT808.DotNetty.Handlers
{
@@ -14,12 +15,19 @@ namespace JT808.DotNetty.Handlers
private readonly JT808SessionManager jT808SessionManager;

public JT808ServerHandler(JT808MsgIdHandlerBase handler, JT808SessionManager jT808SessionManager)
private readonly JT808RemoteAddressTransmitConfigurationService jT808RemoteAddressTransmitConfigurationService;

public JT808ServerHandler(
JT808RemoteAddressTransmitConfigurationService jT808RemoteAddressTransmitConfigurationService,
JT808MsgIdHandlerBase handler,
JT808SessionManager jT808SessionManager)
{
this.jT808RemoteAddressTransmitConfigurationService = jT808RemoteAddressTransmitConfigurationService;
this.handler = handler;
this.jT808SessionManager = jT808SessionManager;
}


protected override void ChannelRead0(IChannelHandlerContext ctx, JT808Package msg)
{
try
@@ -31,7 +39,10 @@ namespace JT808.DotNetty.Handlers
JT808Response jT808Package = handlerFunc(new JT808Request(msg));
if (jT808Package != null)
{
ctx.WriteAndFlushAsync(Unpooled.WrappedBuffer(JT808Serializer.Serialize(jT808Package.Package, jT808Package.MinBufferSize)));
if (!jT808RemoteAddressTransmitConfigurationService.Contains(ctx.Channel.RemoteAddress))
{
ctx.WriteAndFlushAsync(Unpooled.WrappedBuffer(JT808Serializer.Serialize(jT808Package.Package, jT808Package.MinBufferSize)));
}
}
}
}


+ 11
- 12
src/JT808.DotNetty/Handlers/JT808SourcePackageDispatcherHandler.cs Wyświetl plik

@@ -14,12 +14,12 @@ namespace JT808.DotNetty.Handlers
{
private readonly ILogger<JT808SourcePackageDispatcherHandler> logger;

private readonly JT808SourcePackageDispatcherDefaultImpl jT808SourcePackageDispatcherDefaultImpl;
private readonly JT808SourcePackageChannelService jT808SourcePackageChannelService;

public JT808SourcePackageDispatcherHandler(JT808SourcePackageDispatcherDefaultImpl jT808SourcePackageDispatcherDefaultImpl)
public JT808SourcePackageDispatcherHandler(JT808SourcePackageChannelService jT808SourcePackageChannelService)
{
logger=jT808SourcePackageDispatcherDefaultImpl.loggerFactory.CreateLogger<JT808SourcePackageDispatcherHandler>();
this.jT808SourcePackageDispatcherDefaultImpl = jT808SourcePackageDispatcherDefaultImpl;
logger= jT808SourcePackageChannelService.LoggerFactory.CreateLogger<JT808SourcePackageDispatcherHandler>();
this.jT808SourcePackageChannelService = jT808SourcePackageChannelService;
}

public override void ChannelInactive(IChannelHandlerContext context)
@@ -30,26 +30,26 @@ namespace JT808.DotNetty.Handlers
return retryAttempt > 20 ? TimeSpan.FromSeconds(Math.Pow(2, 50)) : TimeSpan.FromSeconds(Math.Pow(2, retryAttempt));//超过重试20次,接近12个小时链接一次
},(exception, timespan, ctx) =>
{
logger.LogError($"服务端断开{context.Channel.RemoteAddress.ToString()},重试结果{exception.Result},重试次数{timespan},下次重试间隔(s){ctx.TotalSeconds}");
logger.LogError($"Server Disconnection {context.Channel.RemoteAddress.ToString()},Retry Results{exception.Result},Retry Number{timespan},Next Retry Interval(s){ctx.TotalSeconds}");
})
.ExecuteAsync(async () =>
{
try
{
var newChannel = jT808SourcePackageDispatcherDefaultImpl.channels.FirstOrDefault(m => m.Value == context.Channel);
var newChannel = jT808SourcePackageChannelService.channels.FirstOrDefault(m => m.Value == context.Channel);
if (default(KeyValuePair<EndPoint, IChannel>).Equals(newChannel))
{
if(logger.IsEnabled(LogLevel.Debug))
logger.LogDebug($"服务器已经删除了{context.Channel.RemoteAddress.ToString()}远程服务器配置");
logger.LogDebug($"Server already deleted {context.Channel.RemoteAddress.ToString()} remote server configuration");
return true;
}
var channel = await jT808SourcePackageDispatcherDefaultImpl.bootstrap.ConnectAsync(context.Channel.RemoteAddress);
jT808SourcePackageDispatcherDefaultImpl.channels.AddOrUpdate(newChannel.Key, channel, (x, y) => channel);
var channel = await jT808SourcePackageChannelService.bootstrap.ConnectAsync(context.Channel.RemoteAddress);
jT808SourcePackageChannelService.channels.AddOrUpdate(newChannel.Key, channel, (x, y) => channel);
return channel.Open;
}
catch (Exception ex)
{
logger.LogError($"服务端断开后{context.Channel.RemoteAddress.ToString()},重连异常:{ex}");
logger.LogError(ex,$"Reconnection abnormal:After the server is disconnected {context.Channel.RemoteAddress.ToString()}");
return false;
}
});
@@ -58,8 +58,7 @@ namespace JT808.DotNetty.Handlers
public override void ChannelRead(IChannelHandlerContext context, object message)
{
if(logger.IsEnabled(LogLevel.Debug))
logger.LogError($"服务端返回消息{message.ToString()}");
//throw new Exception("test");
logger.LogError($"The server returns a message {message.ToString()}");
}

public override void ExceptionCaught(IChannelHandlerContext context, Exception exception)


+ 1
- 0
src/JT808.DotNetty/Handlers/JT808WebAPIServerHandler.cs Wyświetl plik

@@ -2,6 +2,7 @@
using DotNetty.Codecs.Http;
using DotNetty.Common.Utilities;
using DotNetty.Transport.Channels;
using JT808.DotNetty.Internal;
using JT808.DotNetty.Metadata;
using Microsoft.Extensions.Logging;
using System;


+ 96
- 0
src/JT808.DotNetty/Internal/JT808RemoteAddressTransmitConfigurationService.cs Wyświetl plik

@@ -0,0 +1,96 @@
using JT808.DotNetty.Configurations;
using JT808.DotNetty.Dtos;
using Microsoft.Extensions.Options;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Text;

namespace JT808.DotNetty.Internal
{
/// <summary>
/// JT808远程地址转发配置服务
/// 按照808的消息,有些请求必须要应答,但是转发可以不需要有应答可以节省部分资源包括:
// 1.消息的序列化
// 2.消息的下发
// 都有一定的性能损耗,那么不需要判断写超时 IdleState.WriterIdle
// 就跟神兽貔貅一样。。。
/// </summary>
public class JT808RemoteAddressTransmitConfigurationService : IDisposable
{
private readonly IOptionsMonitor<JT808Configuration> jT808ConfigurationOptionsMonitor;

private ConcurrentBag<string> ForwardingRemoteAddresss;

private IDisposable jT808ConfigurationOptionsMonitorDisposable;

public JT808RemoteAddressTransmitConfigurationService(
IOptionsMonitor<JT808Configuration> jT808ConfigurationOptionsMonitor)
{
this.jT808ConfigurationOptionsMonitor = jT808ConfigurationOptionsMonitor;
ForwardingRemoteAddresss = new ConcurrentBag<string>();
InitForwardingRemoteAddress(jT808ConfigurationOptionsMonitor.CurrentValue.ForwardingRemoteAddress);
//OnChange 源码多播委托
jT808ConfigurationOptionsMonitorDisposable = this.jT808ConfigurationOptionsMonitor.OnChange(options =>
{
InitForwardingRemoteAddress(options.ForwardingRemoteAddress);
});
}

private void InitForwardingRemoteAddress(List<JT808ClientConfiguration> jT808ClientConfigurations)
{
if (jT808ClientConfigurations != null && jT808ClientConfigurations.Count > 0)
{
foreach (var item in jT808ClientConfigurations)
{
string host = item.EndPoint.ToString();
if (!ForwardingRemoteAddresss.Contains(host))
{
ForwardingRemoteAddresss.Add(host);
}
}
}
}

public bool Contains(EndPoint endPoint)
{
return ForwardingRemoteAddresss.Contains(endPoint.ToString());
}

public JT808ResultDto<bool> Add(JT808IPAddressDto jT808IPAddressDto)
{
string host = jT808IPAddressDto.EndPoint.ToString();
if (!ForwardingRemoteAddresss.Contains(host))
{
ForwardingRemoteAddresss.Add(host);
}
return new JT808ResultDto<bool>() { Code = JT808ResultCode.Ok, Data = true };
}

public JT808ResultDto<bool> Remove(JT808IPAddressDto jT808IPAddressDto)
{
string host = jT808IPAddressDto.EndPoint.ToString();
if(jT808ConfigurationOptionsMonitor.CurrentValue.ForwardingRemoteAddress!=null &&
jT808ConfigurationOptionsMonitor.CurrentValue.ForwardingRemoteAddress.Any(w=>w.EndPoint.ToString()== jT808IPAddressDto.ToString()))
{
return new JT808ResultDto<bool>() { Code = JT808ResultCode.Ok, Data = false,Message="不能删除服务器配置的地址" };
}
else
{
return new JT808ResultDto<bool>() { Code = JT808ResultCode.Ok, Data = ForwardingRemoteAddresss.TryTake(out var temp) };
}
}

public JT808ResultDto<List<string>> GetAll()
{
return new JT808ResultDto<List<string>>(){ Code = JT808ResultCode.Ok, Data = ForwardingRemoteAddresss.ToList() };
}

public void Dispose()
{
jT808ConfigurationOptionsMonitorDisposable.Dispose();
}
}
}

+ 8
- 8
src/JT808.DotNetty/Internal/JT808SessionServiceDefaultImpl.cs Wyświetl plik

@@ -35,16 +35,16 @@ namespace JT808.DotNetty.Internal
LastActiveTime = s.LastActiveTime,
StartTime = s.StartTime,
TerminalPhoneNo = s.TerminalPhoneNo,
WebApiPort = jT808Configuration.WebAPIPort,
WebApiPort = jT808Configuration.WebApiPort,
LoaclAddressIP = s.Channel.LocalAddress.ToString(),
RemoteAddressIP = s.Channel.RemoteAddress.ToString(),
}).ToList();
resultDto.Code = 200;
resultDto.Code = JT808ResultCode.Ok;
}
catch (Exception ex)
{
resultDto.Data = null;
resultDto.Code = 500;
resultDto.Code = JT808ResultCode.Error;
resultDto.Message = Newtonsoft.Json.JsonConvert.SerializeObject(ex);
}
return resultDto;
@@ -60,19 +60,19 @@ namespace JT808.DotNetty.Internal
{
session.Channel.CloseAsync();
}
resultDto.Code = 200;
resultDto.Code = JT808ResultCode.Ok;
resultDto.Data = true;
}
catch (AggregateException ex)
{
resultDto.Data = false;
resultDto.Code = 500;
resultDto.Code = JT808ResultCode.Error;
resultDto.Message = Newtonsoft.Json.JsonConvert.SerializeObject(ex);
}
catch (Exception ex)
{
resultDto.Data = false;
resultDto.Code = 500;
resultDto.Code = JT808ResultCode.Error;
resultDto.Message = Newtonsoft.Json.JsonConvert.SerializeObject(ex);
}
return resultDto;
@@ -88,7 +88,7 @@ namespace JT808.DotNetty.Internal
{
session.Channel.CloseAsync();
}
resultDto.Code = 200;
resultDto.Code = JT808ResultCode.Ok;
resultDto.Data = true;
}
catch (AggregateException ex)
@@ -100,7 +100,7 @@ namespace JT808.DotNetty.Internal
catch (Exception ex)
{
resultDto.Data = false;
resultDto.Code = 500;
resultDto.Code = JT808ResultCode.Error;
resultDto.Message = Newtonsoft.Json.JsonConvert.SerializeObject(ex);
}
return resultDto;


+ 300
- 0
src/JT808.DotNetty/Internal/JT808SourcePackageChannelService.cs Wyświetl plik

@@ -0,0 +1,300 @@
using DotNetty.Buffers;
using DotNetty.Transport.Bootstrapping;
using DotNetty.Transport.Channels;
using DotNetty.Transport.Channels.Sockets;
using JT808.DotNetty.Configurations;
using JT808.DotNetty.Dtos;
using JT808.DotNetty.Handlers;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using Newtonsoft.Json;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Text;
using System.Threading.Tasks;

namespace JT808.DotNetty.Internal
{
/// <summary>
/// 原包分发器通道服务
/// </summary>
internal class JT808SourcePackageChannelService:IDisposable
{
private readonly IOptionsMonitor<JT808Configuration> jT808ConfigurationOptionsMonitor;

internal readonly ConcurrentDictionary<EndPoint, IChannel> channels;

private readonly ILogger<JT808SourcePackageChannelService> logger;

private readonly MultithreadEventLoopGroup group;

internal readonly ILoggerFactory LoggerFactory;

internal readonly Bootstrap bootstrap;

private IDisposable jT808ConfigurationOptionsMonitorDisposable;

public JT808SourcePackageChannelService(
ILoggerFactory loggerFactory,
IOptionsMonitor<JT808Configuration> jT808ConfigurationOptionsMonitor)
{
this.LoggerFactory = loggerFactory;
this.logger = loggerFactory.CreateLogger<JT808SourcePackageChannelService>();
this.channels = new ConcurrentDictionary<EndPoint, IChannel>();
this.jT808ConfigurationOptionsMonitor = jT808ConfigurationOptionsMonitor;
this.group = new MultithreadEventLoopGroup();
this.bootstrap = new Bootstrap();
jT808ConfigurationOptionsMonitorDisposable = jT808ConfigurationOptionsMonitor.OnChange(options =>
{
List<JT808ClientConfiguration> chgRemoteServers = new List<JT808ClientConfiguration>();
if (jT808ConfigurationOptionsMonitor.CurrentValue.SourcePackageDispatcherClientConfigurations != null && jT808ConfigurationOptionsMonitor.CurrentValue.SourcePackageDispatcherClientConfigurations.Count > 0)
{
chgRemoteServers = options.SourcePackageDispatcherClientConfigurations;
}
DelRemoteServsers(chgRemoteServers);
AddRemoteServsers(chgRemoteServers);
});
StartAsync();
}

/// <summary>
/// 下发数据
/// </summary>
/// <param name="data"></param>
/// <returns></returns>
public Task SendAsync(byte[] data)
{
foreach (var item in channels)
{
try
{
if (item.Value.Open && item.Value.Active)
{
item.Value.WriteAndFlushAsync(Unpooled.WrappedBuffer(data));
}
else
{
logger.LogInformation($"{item} link closed.");
}
}
catch (AggregateException ex)
{
logger.LogError(ex, $"{item} Send Data Error.");
}
catch (Exception ex)
{
logger.LogError(ex, $"{item} Send Data Error.");
}
}
return Task.CompletedTask;
}

/// <summary>
/// 获取通道信息集合
/// </summary>
/// <returns></returns>
public JT808ResultDto<List<JT808SourcePackageChannelInfoDto>> GetAll()
{
JT808ResultDto<List<JT808SourcePackageChannelInfoDto>> jT808ResultDto = new JT808ResultDto<List<JT808SourcePackageChannelInfoDto>>();
jT808ResultDto.Data = new List<JT808SourcePackageChannelInfoDto>();
jT808ResultDto.Code = JT808ResultCode.Ok;
foreach (var item in channels)
{
JT808SourcePackageChannelInfoDto jT808SourcePackageChannelInfoDto = new JT808SourcePackageChannelInfoDto();
jT808SourcePackageChannelInfoDto.Active = item.Value.Active;
jT808SourcePackageChannelInfoDto.Open = item.Value.Open;
jT808SourcePackageChannelInfoDto.Registered = item.Value.Registered;
jT808SourcePackageChannelInfoDto.LocalAddress = item.Value.LocalAddress.ToString();
jT808SourcePackageChannelInfoDto.RemoteAddress = item.Value.RemoteAddress.ToString();
jT808ResultDto.Data.Add(jT808SourcePackageChannelInfoDto);
}
return jT808ResultDto;
}

/// <summary>
/// 添加地址
/// </summary>
/// <returns></returns>
public async Task<JT808ResultDto<bool>> Add(JT808IPAddressDto jT808IPAddressDto)
{
JT808ResultDto<bool> jT808ResultDto = new JT808ResultDto<bool>();
jT808ResultDto.Code= JT808ResultCode.Ok;
jT808ResultDto.Data = true;
if (!channels.ContainsKey(jT808IPAddressDto.EndPoint))
{
try
{
IChannel clientChannel = await bootstrap.ConnectAsync(jT808IPAddressDto.EndPoint);
channels.TryAdd(jT808IPAddressDto.EndPoint, clientChannel);
}
catch (AggregateException ex)
{
jT808ResultDto.Data = false;
jT808ResultDto.Code = JT808ResultCode.Error;
jT808ResultDto.Message = JsonConvert.SerializeObject(ex);
}
catch (Exception ex)
{
jT808ResultDto.Data = false;
jT808ResultDto.Code= JT808ResultCode.Error;
jT808ResultDto.Message = JsonConvert.SerializeObject(ex);
}
}
return jT808ResultDto;
}

/// <summary>
/// 删除地址
/// </summary>
/// <returns></returns>
public async Task<JT808ResultDto<bool>> Remove(JT808IPAddressDto jT808IPAddressDto)
{
JT808ResultDto<bool> jT808ResultDto = new JT808ResultDto<bool>();
jT808ResultDto.Code = JT808ResultCode.Ok;
jT808ResultDto.Data = true;

if(jT808ConfigurationOptionsMonitor.CurrentValue.SourcePackageDispatcherClientConfigurations!=null &&
jT808ConfigurationOptionsMonitor.CurrentValue.SourcePackageDispatcherClientConfigurations.Any(a=>a.EndPoint.ToString()== jT808IPAddressDto.EndPoint.ToString())
)
{
jT808ResultDto.Data = false;
jT808ResultDto.Message = "不能删除服务器配置的地址";
}
else
{
if (channels.TryRemove(jT808IPAddressDto.EndPoint, out var channel))
{
try
{
await channel.CloseAsync();
}
catch (AggregateException ex)
{
jT808ResultDto.Data = false;
jT808ResultDto.Code = JT808ResultCode.Error;
jT808ResultDto.Message = JsonConvert.SerializeObject(ex);
}
catch (Exception ex)
{
jT808ResultDto.Data = false;
jT808ResultDto.Code = JT808ResultCode.Error;
jT808ResultDto.Message = JsonConvert.SerializeObject(ex);
}
}
}
return jT808ResultDto;
}

private void StartAsync()
{
bootstrap
.Group(group)
.Channel<TcpSocketChannel>()
.Option(ChannelOption.TcpNodelay, true)
.Handler(new ActionChannelInitializer<IChannel>(channel =>
{
channel.Pipeline.AddLast(new JT808SourcePackageDispatcherHandler(this));
}));
jT808ConfigurationOptionsMonitor.OnChange(options =>
{
List<JT808ClientConfiguration> chgRemoteServers = new List<JT808ClientConfiguration>();
if (jT808ConfigurationOptionsMonitor.CurrentValue.SourcePackageDispatcherClientConfigurations != null && jT808ConfigurationOptionsMonitor.CurrentValue.SourcePackageDispatcherClientConfigurations.Count > 0)
{
chgRemoteServers = options.SourcePackageDispatcherClientConfigurations;
}
DelRemoteServsers(chgRemoteServers);
AddRemoteServsers(chgRemoteServers);
});
if (jT808ConfigurationOptionsMonitor.CurrentValue.SourcePackageDispatcherClientConfigurations != null &&
jT808ConfigurationOptionsMonitor.CurrentValue.SourcePackageDispatcherClientConfigurations.Count > 0)
{
foreach (var item in jT808ConfigurationOptionsMonitor.CurrentValue.SourcePackageDispatcherClientConfigurations)
{
try
{
Task.Run(async () =>
{
IChannel clientChannel = await bootstrap.ConnectAsync(item.EndPoint);
channels.TryAdd(item.EndPoint, clientChannel);
logger.LogInformation($"init remote link {item.EndPoint.ToString()}.");
});
}
catch (Exception ex)
{
logger.LogError(ex, $"there is an exception in sending data {item}.");
}
}
}
}

private Task StopAsync()
{
foreach (var channel in channels)
{
try
{
channel.Value.CloseAsync();
}
catch
{

}
}
group.ShutdownGracefullyAsync(jT808ConfigurationOptionsMonitor.CurrentValue.QuietPeriodTimeSpan, jT808ConfigurationOptionsMonitor.CurrentValue.ShutdownTimeoutTimeSpan);
return Task.CompletedTask;
}

/// <summary>
/// 动态删除远程服务器
/// </summary>
/// <param name="chgRemoteServers"></param>
private void DelRemoteServsers(List<JT808ClientConfiguration> chgRemoteServers)
{
var delChannels = channels.Keys.Except(chgRemoteServers.Select(s => s.EndPoint)).ToList();
foreach (var item in delChannels)
{
try
{
channels.TryRemove(item, out var channel);
channel.CloseAsync();
}
catch
{

}
}
}

/// <summary>
/// 动态添加远程服务器
/// </summary>
/// <param name="bootstrap"></param>
/// <param name="chgRemoteServers"></param>
private async void AddRemoteServsers(List<JT808ClientConfiguration> chgRemoteServers)
{
var addChannels = chgRemoteServers.Select(s => s.EndPoint).Except(channels.Keys).ToList();
foreach (var item in addChannels)
{
try
{
IChannel clientChannel = await bootstrap.ConnectAsync(item);
channels.TryAdd(item, clientChannel);
logger.LogInformation($"link to the remote server after the change {item}.");
}
catch (Exception ex)
{
logger.LogError(ex, $"reconnect the remote server after the exception changes {item}.");
}
}
}

public void Dispose()
{
jT808ConfigurationOptionsMonitorDisposable.Dispose();
StopAsync();
}
}
}

+ 6
- 141
src/JT808.DotNetty/Internal/JT808SourcePackageDispatcherDefaultImpl.cs Wyświetl plik

@@ -20,155 +20,20 @@ using System.Threading.Tasks;
namespace JT808.DotNetty.Internal
{
/// <summary>
/// 包分发器默认实现
/// 包分发器默认实现
/// </summary>
internal class JT808SourcePackageDispatcherDefaultImpl : IJT808SourcePackageDispatcher, IDisposable
internal class JT808SourcePackageDispatcherDefaultImpl : IJT808SourcePackageDispatcher
{
private readonly MultithreadEventLoopGroup group = new MultithreadEventLoopGroup();
internal readonly Bootstrap bootstrap = new Bootstrap();
internal readonly ConcurrentDictionary<EndPoint, IChannel> channels = new ConcurrentDictionary<EndPoint, IChannel>();
private readonly ILogger<JT808SourcePackageDispatcherDefaultImpl> logger;
private IOptionsMonitor<JT808Configuration> jT808ConfigurationOptionsMonitor;
internal readonly ILoggerFactory loggerFactory;
private readonly JT808SourcePackageChannelService jT808SourcePackageChannelService;

public JT808SourcePackageDispatcherDefaultImpl(ILoggerFactory loggerFactory,
IOptionsMonitor<JT808Configuration> jT808ConfigurationOptionsMonitor)
public JT808SourcePackageDispatcherDefaultImpl(JT808SourcePackageChannelService jT808SourcePackageChannelService)
{
this.loggerFactory = loggerFactory;
this.logger = loggerFactory.CreateLogger<JT808SourcePackageDispatcherDefaultImpl>();
this.jT808ConfigurationOptionsMonitor = jT808ConfigurationOptionsMonitor;
StartAsync();
this.jT808SourcePackageChannelService = jT808SourcePackageChannelService;
}

public async Task SendAsync(byte[] data)
{
foreach (var item in channels)
{
try
{
if (item.Value.Open && item.Value.Active)
{
await item.Value.WriteAndFlushAsync(Unpooled.WrappedBuffer(data));
}
else
{
logger.LogInformation($"{item} link closed.");
}
}
catch (Exception ex)
{
logger.LogError(ex,$"{item} Send Data Error.");
}
}
await Task.CompletedTask;
}

public void StartAsync()
{
bootstrap
.Group(group)
.Channel<TcpSocketChannel>()
.Option(ChannelOption.TcpNodelay, true)
.Handler(new ActionChannelInitializer<IChannel>(channel =>
{
channel.Pipeline.AddLast(new JT808SourcePackageDispatcherHandler(this));
}));
jT808ConfigurationOptionsMonitor.OnChange(options =>
{
List<JT808ClientConfiguration> chgRemoteServers = new List<JT808ClientConfiguration>();
if (jT808ConfigurationOptionsMonitor.CurrentValue.SourcePackageDispatcherClientConfigurations != null && jT808ConfigurationOptionsMonitor.CurrentValue.SourcePackageDispatcherClientConfigurations.Count > 0)
{
chgRemoteServers = options.SourcePackageDispatcherClientConfigurations;
}
DelRemoteServsers(chgRemoteServers);
AddRemoteServsers(chgRemoteServers);
});
if (jT808ConfigurationOptionsMonitor.CurrentValue.SourcePackageDispatcherClientConfigurations != null &&
jT808ConfigurationOptionsMonitor.CurrentValue.SourcePackageDispatcherClientConfigurations.Count > 0)
{
foreach (var item in jT808ConfigurationOptionsMonitor.CurrentValue.SourcePackageDispatcherClientConfigurations)
{
try
{
Task.Run(async () =>
{
IChannel clientChannel = await bootstrap.ConnectAsync(item.EndPoint);
channels.TryAdd(item.EndPoint, clientChannel);
logger.LogInformation($"init remote link {item.EndPoint.ToString()}.");
});
}
catch (Exception ex)
{
logger.LogError(ex,$"there is an exception in sending data {item}.");
}
}
}
}

public Task StopAsync()
{
foreach (var channel in channels)
{
try
{
channel.Value.CloseAsync();
}
catch
{

}
}
group.ShutdownGracefullyAsync(jT808ConfigurationOptionsMonitor.CurrentValue.QuietPeriodTimeSpan, jT808ConfigurationOptionsMonitor.CurrentValue.ShutdownTimeoutTimeSpan);
return Task.CompletedTask;
}

/// <summary>
/// 动态删除远程服务器
/// </summary>
/// <param name="chgRemoteServers"></param>
private void DelRemoteServsers(List<JT808ClientConfiguration> chgRemoteServers)
{
var delChannels = channels.Keys.Except(chgRemoteServers.Select(s=>s.EndPoint)).ToList();
foreach (var item in delChannels)
{
try
{
channels.TryRemove(item, out var channel);
channel.CloseAsync();
}
catch
{
}
}
}

/// <summary>
/// 动态添加远程服务器
/// </summary>
/// <param name="bootstrap"></param>
/// <param name="chgRemoteServers"></param>
private async void AddRemoteServsers(List<JT808ClientConfiguration> chgRemoteServers)
{
var addChannels = chgRemoteServers.Select(s=>s.EndPoint).Except(channels.Keys).ToList();
foreach (var item in addChannels)
{
try
{
IChannel clientChannel =await bootstrap.ConnectAsync(item);
channels.TryAdd(item, clientChannel);
logger.LogInformation($"link to the remote server after the change {item}.");
}
catch (Exception ex)
{
logger.LogError(ex,$"reconnect the remote server after the exception changes {item}.");
}
}
}

public void Dispose()
{
StopAsync();
await jT808SourcePackageChannelService.SendAsync(data);
}
}
}

+ 3
- 3
src/JT808.DotNetty/Internal/JT808UnificationSendServiceDefaultImpl.cs Wyświetl plik

@@ -25,12 +25,12 @@ namespace JT808.DotNetty.Internal
if (session != null)
{
session.Channel.WriteAndFlushAsync(Unpooled.WrappedBuffer(data));
resultDto.Code = 200;
resultDto.Code = JT808ResultCode.Ok;
resultDto.Data = true;
}
else
{
resultDto.Code = 200;
resultDto.Code = JT808ResultCode.Ok;
resultDto.Data = false;
resultDto.Message = "not session";
}
@@ -38,7 +38,7 @@ namespace JT808.DotNetty.Internal
catch (Exception ex)
{
resultDto.Data = false;
resultDto.Code = 500;
resultDto.Code = JT808ResultCode.Error;
resultDto.Message = Newtonsoft.Json.JsonConvert.SerializeObject(ex);
}
return resultDto;


src/JT808.DotNetty/JT808WebAPIService.cs → src/JT808.DotNetty/Internal/JT808WebAPIService.cs Wyświetl plik

@@ -6,7 +6,7 @@ using System;
using System.Collections.Generic;
using System.Text;

namespace JT808.DotNetty
namespace JT808.DotNetty.Internal
{
internal class JT808WebAPIService
{
@@ -16,25 +16,39 @@ namespace JT808.DotNetty

private const string sessionRoutePrefix = "Session";

private const string sourcePackagePrefix = "SourcePackage";

private readonly IJT808SessionService jT808SessionService;

private readonly IJT808UnificationSendService jT808UnificationSendService;

private readonly JT808AtomicCounterService jT808AtomicCounterService;

private readonly JT808SourcePackageChannelService jT808SourcePackageChannelService;

/// <summary>
/// 初始化消息处理业务
/// </summary>
public JT808WebAPIService(
public JT808WebAPIService(
JT808AtomicCounterService jT808AtomicCounterService,
JT808SourcePackageChannelService jT808SourcePackageChannelService,
IJT808SessionService jT808SessionService,
IJT808UnificationSendService jT808UnificationSendService)
{
this.jT808AtomicCounterService = jT808AtomicCounterService;
this.jT808SourcePackageChannelService = jT808SourcePackageChannelService;
this.jT808SessionService = jT808SessionService;
this.jT808UnificationSendService = jT808UnificationSendService;
HandlerDict = new Dictionary<string, Func<JT808HttpRequest, JT808HttpResponse>>
{
{$"{RouteTablePrefix}/UnificationSend", UnificationSend},
{$"{RouteTablePrefix}/{sessionRoutePrefix}/GetAll", GetAll},
{$"{RouteTablePrefix}/{sessionRoutePrefix}/GetAll", GetSessionAll},
{$"{RouteTablePrefix}/{sessionRoutePrefix}/RemoveByChannelId", RemoveByChannelId},
{$"{RouteTablePrefix}/{sessionRoutePrefix}/RemoveByTerminalPhoneNo", RemoveByTerminalPhoneNo},
{$"{RouteTablePrefix}/GetAtomicCounter", GetAtomicCounter},
{$"{RouteTablePrefix}/{sourcePackagePrefix}/Add", AddSourcePackageAddress},
{$"{RouteTablePrefix}/{sourcePackagePrefix}/Remove", RemoveSourcePackageAddress},
{$"{RouteTablePrefix}/{sourcePackagePrefix}/GetAll", GetSourcePackageAll}
};
}

@@ -59,7 +73,7 @@ namespace JT808.DotNetty
/// </summary>
/// <param name="request"></param>
/// <returns></returns>
public JT808HttpResponse GetAll(JT808HttpRequest request)
public JT808HttpResponse GetSessionAll(JT808HttpRequest request)
{
var result = jT808SessionService.GetAll();
return CreateJT808HttpResponse(result);
@@ -95,6 +109,59 @@ namespace JT808.DotNetty
return CreateJT808HttpResponse(result);
}

/// <summary>
/// 获取包计数器
/// </summary>
/// <param name="request"></param>
/// <returns></returns>
public JT808HttpResponse GetAtomicCounter(JT808HttpRequest request)
{
JT808AtomicCounterDto jT808AtomicCounterDto = new JT808AtomicCounterDto();
jT808AtomicCounterDto.MsgFailCount = jT808AtomicCounterService.MsgFailCount;
jT808AtomicCounterDto.MsgSuccessCount = jT808AtomicCounterService.MsgSuccessCount;
return CreateJT808HttpResponse(jT808AtomicCounterDto);
}

/// <summary>
/// 添加原包转发地址
/// </summary>
/// <param name="request"></param>
/// <returns></returns>
public JT808HttpResponse AddSourcePackageAddress(JT808HttpRequest request)
{
if (string.IsNullOrEmpty(request.Json))
{
return EmptyHttpResponse();
}
JT808IPAddressDto jT808IPAddressDto = JsonConvert.DeserializeObject<JT808IPAddressDto>(request.Json);
return CreateJT808HttpResponse(jT808SourcePackageChannelService.Add(jT808IPAddressDto).Result);
}

/// <summary>
/// 删除原包转发地址(不能删除在网关服务器配置文件配的地址)
/// </summary>
/// <param name="request"></param>
/// <returns></returns>
public JT808HttpResponse RemoveSourcePackageAddress(JT808HttpRequest request)
{
if (string.IsNullOrEmpty(request.Json))
{
return EmptyHttpResponse();
}
JT808IPAddressDto jT808IPAddressDto = JsonConvert.DeserializeObject<JT808IPAddressDto>(request.Json);
return CreateJT808HttpResponse(jT808SourcePackageChannelService.Remove(jT808IPAddressDto).Result);
}

/// <summary>
/// 获取原包信息集合
/// </summary>
/// <param name="request"></param>
/// <returns></returns>
public JT808HttpResponse GetSourcePackageAll(JT808HttpRequest request)
{
return CreateJT808HttpResponse(jT808SourcePackageChannelService.GetAll());
}

private JT808HttpResponse CreateJT808HttpResponse(dynamic dynamicObject)
{
byte[] data = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(dynamicObject));
@@ -114,8 +181,8 @@ namespace JT808.DotNetty
{
byte[] json = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(new JT808ResultDto<string>()
{
Code=201,
Message="内容为空",
Code = JT808ResultCode.Empty,
Message ="内容为空",
Data="Content Empty"
}));
return new JT808HttpResponse(json);
@@ -125,7 +192,7 @@ namespace JT808.DotNetty
{
byte[] json = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(new JT808ResultDto<string>()
{
Code=404,
Code= JT808ResultCode.NotFound,
Message="没有该服务",
Data= "没有该服务"
}));
@@ -136,7 +203,7 @@ namespace JT808.DotNetty
{
byte[] json = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(new JT808ResultDto<string>()
{
Code = 500,
Code = JT808ResultCode.Error,
Message = JsonConvert.SerializeObject(ex),
Data= ex.Message
}));

+ 1
- 0
src/JT808.DotNetty/JT808DotnettyExtensions.cs Wyświetl plik

@@ -38,6 +38,7 @@ namespace JT808.DotNetty
services.TryAddSingleton<JT808SessionManager>();
services.TryAddSingleton<JT808AtomicCounterService>();
services.TryAddSingleton<JT808MsgIdHandlerBase,JT808MsgIdDefaultHandler>();
services.TryAddSingleton<JT808SourcePackageChannelService>();
services.TryAddSingleton<IJT808SourcePackageDispatcher, JT808SourcePackageDispatcherDefaultImpl>();
services.TryAddScoped<JT808ConnectionHandler>();
services.TryAddScoped<JT808Decoder>();


+ 2
- 2
src/JT808.DotNetty/JT808WebAPIServerHost.cs Wyświetl plik

@@ -66,8 +66,8 @@ namespace JT808.DotNetty
pipeline.AddLast("http_jt808webapihandler", scope.ServiceProvider.GetRequiredService<JT808WebAPIServerHandler>());
}
}));
logger.LogInformation($"WebAPI Server start at {IPAddress.Any}:{configuration.WebAPIPort}.");
return bootstrap.BindAsync(configuration.WebAPIPort).ContinueWith(i => bootstrapChannel = i.Result);
logger.LogInformation($"WebAPI Server start at {IPAddress.Any}:{configuration.WebApiPort}.");
return bootstrap.BindAsync(configuration.WebApiPort).ContinueWith(i => bootstrapChannel = i.Result);
}

public async Task StopAsync(CancellationToken cancellationToken)


Ładowanie…
Anuluj
Zapisz