Procházet zdrojové kódy

增加源包转发默认实现

tags/v1.0.0
SmallChi před 6 roky
rodič
revize
e669485b8c
10 změnil soubory, kde provedl 266 přidání a 13 odebrání
  1. +8
    -1
      src/JT808.DotNetty/Codecs/JT808Decoder.cs
  2. +1
    -1
      src/JT808.DotNetty/Configurations/JT808ClientConfiguration.cs
  3. +4
    -0
      src/JT808.DotNetty/Configurations/JT808Configuration.cs
  4. +0
    -11
      src/JT808.DotNetty/Configurations/JT808SourcePackageDispatcherConfiguration.cs
  5. +71
    -0
      src/JT808.DotNetty/Handlers/JT808SourcePackageDispatcherHandler.cs
  6. +3
    -0
      src/JT808.DotNetty/Interfaces/IJT808SourcePackageDispatcher.cs
  7. +173
    -0
      src/JT808.DotNetty/Internal/JT808SourcePackageDispatcherDefaultImpl.cs
  8. +1
    -0
      src/JT808.DotNetty/JT808.DotNetty.csproj
  9. +2
    -0
      src/JT808.DotNetty/JT808DotnettyExtensions.cs
  10. +3
    -0
      src/JT808.DotNetty/JT808MsgIdHandlerBase.cs

+ 8
- 1
src/JT808.DotNetty/Codecs/JT808Decoder.cs Zobrazit soubor

@@ -7,6 +7,7 @@ using System.Collections.Generic;
using System.Text;
using JT808.Protocol;
using JT808.DotNetty.Internal;
using JT808.DotNetty.Interfaces;

namespace JT808.DotNetty.Codecs
{
@@ -17,9 +18,14 @@ namespace JT808.DotNetty.Codecs
{
private readonly ILogger<JT808Decoder> logger;

public JT808Decoder(ILoggerFactory loggerFactory)
private readonly IJT808SourcePackageDispatcher jT808SourcePackageDispatcher;

public JT808Decoder(
IJT808SourcePackageDispatcher jT808SourcePackageDispatcher,
ILoggerFactory loggerFactory)
{
this.logger = loggerFactory.CreateLogger<JT808Decoder>();
this.jT808SourcePackageDispatcher = jT808SourcePackageDispatcher;
}

private static readonly AtomicCounter MsgSuccessCounter = new AtomicCounter();
@@ -34,6 +40,7 @@ namespace JT808.DotNetty.Codecs
input.ReadBytes(buffer, 1, input.Capacity);
buffer[0] = JT808Package.BeginFlag;
buffer[input.Capacity + 1] = JT808Package.EndFlag;
jT808SourcePackageDispatcher?.SendAsync(buffer);
JT808Package jT808Package = JT808Serializer.Deserialize<JT808Package>(buffer);
output.Add(jT808Package);
MsgSuccessCounter.Increment();


src/JT808.DotNetty/Configurations/JT808SourcePackageDispatcherInfo.cs → src/JT808.DotNetty/Configurations/JT808ClientConfiguration.cs Zobrazit soubor

@@ -5,7 +5,7 @@ using System.Text;

namespace JT808.DotNetty.Configurations
{
public class JT808SourcePackageDispatcherInfo
public class JT808ClientConfiguration
{
public string Host { get; set; }


+ 4
- 0
src/JT808.DotNetty/Configurations/JT808Configuration.cs Zobrazit soubor

@@ -30,5 +30,9 @@ namespace JT808.DotNetty.Configurations
/// 默认5分钟
/// </summary>
public int SessionReportTime { get; set; } = 30000;
/// <summary>
/// 源包分发器配置
/// </summary>
public List<JT808ClientConfiguration> SourcePackageDispatcherClientConfigurations { get; set; }
}
}

+ 0
- 11
src/JT808.DotNetty/Configurations/JT808SourcePackageDispatcherConfiguration.cs Zobrazit soubor

@@ -1,11 +0,0 @@
using System;
using System.Collections.Generic;
using System.Text;

namespace JT808.DotNetty.Configurations
{
public class JT808SourcePackageDispatcherConfiguration
{
public IList<JT808SourcePackageDispatcherInfo> SourcePackageDispatchers { get; set; }
}
}

+ 71
- 0
src/JT808.DotNetty/Handlers/JT808SourcePackageDispatcherHandler.cs Zobrazit soubor

@@ -0,0 +1,71 @@
using DotNetty.Transport.Channels;
using JT808.DotNetty.Internal;
using Microsoft.Extensions.Logging;
using Polly;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Text;

namespace JT808.DotNetty.Handlers
{
internal class JT808SourcePackageDispatcherHandler: ChannelHandlerAdapter
{
private readonly ILogger<JT808SourcePackageDispatcherHandler> logger;

private readonly JT808SourcePackageDispatcherDefaultImpl jT808SourcePackageDispatcherDefaultImpl;

public JT808SourcePackageDispatcherHandler(JT808SourcePackageDispatcherDefaultImpl jT808SourcePackageDispatcherDefaultImpl)
{
logger=jT808SourcePackageDispatcherDefaultImpl.loggerFactory.CreateLogger<JT808SourcePackageDispatcherHandler>();
this.jT808SourcePackageDispatcherDefaultImpl = jT808SourcePackageDispatcherDefaultImpl;
}

public override void ChannelInactive(IChannelHandlerContext context)
{
Policy.HandleResult<bool>(context.Channel.Open)
.WaitAndRetryForeverAsync(retryAttempt =>
{
return retryAttempt > 20 ? TimeSpan.FromSeconds(Math.Pow(2, 50)) : TimeSpan.FromSeconds(Math.Pow(2, retryAttempt));//超过重试20次,接近12个小时链接一次
},(exception, timespan, ctx) =>
{
logger.LogError($"服务端断开{context.Channel.RemoteAddress.ToString()},重试结果{exception.Result},重试次数{timespan},下次重试间隔(s){ctx.TotalSeconds}");
})
.ExecuteAsync(async () =>
{
try
{
var newChannel = jT808SourcePackageDispatcherDefaultImpl.channels.FirstOrDefault(m => m.Value == context.Channel);
if (default(KeyValuePair<EndPoint, IChannel>).Equals(newChannel))
{
if(logger.IsEnabled(LogLevel.Debug))
logger.LogDebug($"服务器已经删除了{context.Channel.RemoteAddress.ToString()}远程服务器配置");
return true;
}
var channel = await jT808SourcePackageDispatcherDefaultImpl.bootstrap.ConnectAsync(context.Channel.RemoteAddress);
jT808SourcePackageDispatcherDefaultImpl.channels.AddOrUpdate(newChannel.Key, channel, (x, y) => channel);
return channel.Open;
}
catch (Exception ex)
{
logger.LogError($"服务端断开后{context.Channel.RemoteAddress.ToString()},重连异常:{ex}");
return false;
}
});
}

public override void ChannelRead(IChannelHandlerContext context, object message)
{
if(logger.IsEnabled(LogLevel.Debug))
logger.LogError($"服务端返回消息{message.ToString()}");
throw new Exception("test");
}

public override void ExceptionCaught(IChannelHandlerContext context, Exception exception)
{
logger.LogError(exception, context.Channel.RemoteAddress.ToString());
context.CloseAsync();
}
}
}

+ 3
- 0
src/JT808.DotNetty/Interfaces/IJT808SourcePackageDispatcher.cs Zobrazit soubor

@@ -7,6 +7,9 @@ namespace JT808.DotNetty.Interfaces
{
/// <summary>
/// 源包分发器
/// 自定义源包分发器业务
/// ConfigureServices:
/// services.Replace(new ServiceDescriptor(typeof(IJT808SourcePackageDispatcher),typeof(JT808SourcePackageDispatcherDefaultImpl),ServiceLifetime.Singleton));
/// </summary>
public interface IJT808SourcePackageDispatcher
{


+ 173
- 0
src/JT808.DotNetty/Internal/JT808SourcePackageDispatcherDefaultImpl.cs Zobrazit soubor

@@ -0,0 +1,173 @@
using DotNetty.Buffers;
using DotNetty.Transport.Bootstrapping;
using DotNetty.Transport.Channels;
using DotNetty.Transport.Channels.Sockets;
using JT808.DotNetty.Configurations;
using JT808.DotNetty.Handlers;
using JT808.DotNetty.Interfaces;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace JT808.DotNetty.Internal
{
/// <summary>
/// 源包分发器默认实现
/// </summary>
internal class JT808SourcePackageDispatcherDefaultImpl : IJT808SourcePackageDispatcher, IDisposable
{
private readonly MultithreadEventLoopGroup group = new MultithreadEventLoopGroup();
internal readonly Bootstrap bootstrap = new Bootstrap();
internal readonly ConcurrentDictionary<EndPoint, IChannel> channels = new ConcurrentDictionary<EndPoint, IChannel>();
private readonly ILogger<JT808SourcePackageDispatcherDefaultImpl> logger;
IOptionsMonitor<JT808Configuration> jT808ConfigurationOptionsMonitor;
internal readonly ILoggerFactory loggerFactory;

public JT808SourcePackageDispatcherDefaultImpl(ILoggerFactory loggerFactory,
IOptionsMonitor<JT808Configuration> jT808ConfigurationOptionsMonitor)
{
this.loggerFactory = loggerFactory;
this.logger = loggerFactory.CreateLogger<JT808SourcePackageDispatcherDefaultImpl>();
this.jT808ConfigurationOptionsMonitor = jT808ConfigurationOptionsMonitor;
StartAsync();
}

public async Task SendAsync(byte[] data)
{
foreach (var item in channels)
{
try
{
if (item.Value.Open && item.Value.Active)
{
await item.Value.WriteAndFlushAsync(Unpooled.WrappedBuffer(data));
}
else
{
logger.LogError($"{item}链接已关闭");
}
}
catch (Exception ex)
{
logger.LogError($"{item}发送数据出现异常:{ex}");
}
}
await Task.CompletedTask;
}

public void StartAsync()
{
bootstrap
.Group(group)
.Channel<TcpSocketChannel>()
.Option(ChannelOption.TcpNodelay, true)
.Handler(new ActionChannelInitializer<IChannel>(channel =>
{
channel.Pipeline.AddLast(new JT808SourcePackageDispatcherHandler(this));
}));
jT808ConfigurationOptionsMonitor.OnChange(options =>
{
List<JT808ClientConfiguration> chgRemoteServers = new List<JT808ClientConfiguration>();
if (jT808ConfigurationOptionsMonitor.CurrentValue.SourcePackageDispatcherClientConfigurations != null && jT808ConfigurationOptionsMonitor.CurrentValue.SourcePackageDispatcherClientConfigurations.Count > 0)
{
chgRemoteServers = options.SourcePackageDispatcherClientConfigurations;
}
DelRemoteServsers(chgRemoteServers);
AddRemoteServsers(chgRemoteServers);
});
if (jT808ConfigurationOptionsMonitor.CurrentValue.SourcePackageDispatcherClientConfigurations != null && jT808ConfigurationOptionsMonitor.CurrentValue.SourcePackageDispatcherClientConfigurations.Count > 0)
{
foreach (var item in jT808ConfigurationOptionsMonitor.CurrentValue.SourcePackageDispatcherClientConfigurations)
{
try
{
Task.Run(async () =>
{
IChannel clientChannel = await bootstrap.ConnectAsync(item.EndPoint);
channels.TryAdd(item.EndPoint, clientChannel);
logger.LogInformation($"初始化链接远程服务端{item.EndPoint.ToString()}");
});
}
catch (Exception ex)
{
logger.LogError($"初始化链接远程服务端{item},链接异常:{ex}");
}
}
}
}

public Task StopAsync()
{
foreach (var channel in channels)
{
try
{
channel.Value.CloseAsync();
}
catch
{

}
}
group.ShutdownGracefullyAsync(jT808ConfigurationOptionsMonitor.CurrentValue.QuietPeriodTimeSpan, jT808ConfigurationOptionsMonitor.CurrentValue.ShutdownTimeoutTimeSpan);
return Task.CompletedTask;
}

/// <summary>
/// 动态删除远程服务器
/// </summary>
/// <param name="chgRemoteServers"></param>
private void DelRemoteServsers(List<JT808ClientConfiguration> chgRemoteServers)
{
var delChannels = channels.Keys.Except(chgRemoteServers.Select(s=>s.EndPoint)).ToList();
foreach (var item in delChannels)
{
try
{
channels.TryRemove(item, out var channel);
channel.CloseAsync();
}
catch
{
}
}
}

/// <summary>
/// 动态添加远程服务器
/// </summary>
/// <param name="bootstrap"></param>
/// <param name="chgRemoteServers"></param>
private async void AddRemoteServsers(List<JT808ClientConfiguration> chgRemoteServers)
{
var addChannels = chgRemoteServers.Select(s=>s.EndPoint).Except(channels.Keys).ToList();
foreach (var item in addChannels)
{
try
{
IChannel clientChannel =await bootstrap.ConnectAsync(item);
channels.TryAdd(item, clientChannel);
logger.LogInformation($"变更后链接远程服务端{item}");
}
catch (Exception ex)
{
logger.LogError($"变更后链接远程服务端{item},重连异常:{ex}");
}
}
}

public void Dispose()
{
StopAsync();
}
}
}

+ 1
- 0
src/JT808.DotNetty/JT808.DotNetty.csproj Zobrazit soubor

@@ -14,6 +14,7 @@
<PackageReference Include="Microsoft.Extensions.Hosting.Abstractions" Version="2.1.1" />
<PackageReference Include="Microsoft.Extensions.Options" Version="2.1.1" />
<PackageReference Include="Microsoft.Extensions.Options.ConfigurationExtensions" Version="2.1.1" />
<PackageReference Include="Polly" Version="6.1.0" />
</ItemGroup>

</Project>

+ 2
- 0
src/JT808.DotNetty/JT808DotnettyExtensions.cs Zobrazit soubor

@@ -1,6 +1,7 @@
using JT808.DotNetty.Codecs;
using JT808.DotNetty.Configurations;
using JT808.DotNetty.Handlers;
using JT808.DotNetty.Interfaces;
using JT808.DotNetty.Internal;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection.Extensions;
@@ -19,6 +20,7 @@ namespace JT808.DotNetty
services.Configure<JT808Configuration>(hostContext.Configuration.GetSection("JT808Configuration"));
services.TryAddSingleton<JT808SessionManager>();
services.TryAddSingleton<JT808MsgIdHandlerBase,JT808MsgIdDefaultHandler>();
services.TryAddSingleton<IJT808SourcePackageDispatcher, JT808SourcePackageDispatcherDefaultImpl>();
services.TryAddScoped<JT808ConnectionHandler>();
services.TryAddScoped<JT808Decoder>();
services.TryAddScoped<JT808ServerHandler>();


+ 3
- 0
src/JT808.DotNetty/JT808MsgIdHandlerBase.cs Zobrazit soubor

@@ -11,6 +11,9 @@ namespace JT808.DotNetty
{
/// <summary>
/// 抽象消息处理业务
/// 自定义消息处理业务
/// ConfigureServices:
/// services.Replace(new ServiceDescriptor(typeof(JT808MsgIdHandlerBase),typeof(JT808MsgIdCustomHandlerImpl),ServiceLifetime.Singleton));
/// </summary>
public abstract class JT808MsgIdHandlerBase
{


Načítá se…
Zrušit
Uložit