Browse Source

增加808转发服务

tags/v2.2.0
smallchi 5 years ago
parent
commit
d1b6022532
10 changed files with 465 additions and 6 deletions
  1. +15
    -0
      src/JT808.DotNetty.Abstractions/IJT808ClientBuilder.cs
  2. +0
    -6
      src/JT808.DotNetty.Core/Impls/JT808NettyBuilderDefault.cs
  3. +13
    -0
      src/JT808.DotNetty.Services/JT808.DotNetty.Transmit/Configs/DataTransferOptions.cs
  4. +11
    -0
      src/JT808.DotNetty.Services/JT808.DotNetty.Transmit/Configs/RemoteServerOptions.cs
  5. +76
    -0
      src/JT808.DotNetty.Services/JT808.DotNetty.Transmit/Handlers/ClientConnectionHandler.cs
  6. +30
    -0
      src/JT808.DotNetty.Services/JT808.DotNetty.Transmit/JT808.DotNetty.Transmit.csproj
  7. +23
    -0
      src/JT808.DotNetty.Services/JT808.DotNetty.Transmit/JT808DotNettyTransmitExtensions.cs
  8. +52
    -0
      src/JT808.DotNetty.Services/JT808.DotNetty.Transmit/JT808DotNettyTransmitHostedService.cs
  9. +236
    -0
      src/JT808.DotNetty.Services/JT808.DotNetty.Transmit/JT808DotNettyTransmitService.cs
  10. +9
    -0
      src/JT808.DotNetty.sln

+ 15
- 0
src/JT808.DotNetty.Abstractions/IJT808ClientBuilder.cs View File

@@ -0,0 +1,15 @@
using JT808.DotNetty.Abstractions;
using JT808.Protocol;
using Microsoft.Extensions.DependencyInjection;
using System;
using System.Collections.Generic;
using System.Text;

namespace JT808.DotNetty.Abstractions
{
public interface IJT808ClientBuilder
{
IJT808Builder JT808Builder { get; }
IJT808Builder Builder();
}
}

+ 0
- 6
src/JT808.DotNetty.Core/Impls/JT808NettyBuilderDefault.cs View File

@@ -22,11 +22,5 @@ namespace JT808.DotNetty.Core.Impls
{ {
return JT808Builder; return JT808Builder;
} }

//public IJT808NettyBuilder ReplaceSessionPublishing<T>() where T : IJT808SessionPublishing
//{
// JT808Builder.Services.Replace(new ServiceDescriptor(typeof(IJT808SessionPublishing), typeof(T), ServiceLifetime.Singleton));
// return this;
//}
} }
} }

+ 13
- 0
src/JT808.DotNetty.Services/JT808.DotNetty.Transmit/Configs/DataTransferOptions.cs View File

@@ -0,0 +1,13 @@
using System;
using System.Collections.Generic;
using System.Text;

namespace JT808.DotNetty.Transmit.Configs
{
public class DataTransferOptions
{
public string Host { get; set; }

public List<string> TerminalNos { get; set; }
}
}

+ 11
- 0
src/JT808.DotNetty.Services/JT808.DotNetty.Transmit/Configs/RemoteServerOptions.cs View File

@@ -0,0 +1,11 @@
using System;
using System.Collections.Generic;
using System.Text;

namespace JT808.DotNetty.Transmit.Configs
{
public class RemoteServerOptions
{
public List<DataTransferOptions> DataTransfer { get; set; }
}
}

+ 76
- 0
src/JT808.DotNetty.Services/JT808.DotNetty.Transmit/Handlers/ClientConnectionHandler.cs View File

@@ -0,0 +1,76 @@
using DotNetty.Transport.Bootstrapping;
using DotNetty.Transport.Channels;
using DotNetty.Transport.Channels.Sockets;
using Polly;
using System;
using System.Linq;
using System.Collections.Generic;
using System.Net;
using System.Text;
using Microsoft.Extensions.Logging;

namespace JT808.DotNetty.Transmit.Handlers
{
public class ClientConnectionHandler : ChannelHandlerAdapter
{
private readonly Bootstrap bootstrap;
public Dictionary<string, IChannel> channeldic;
private readonly ILogger<ClientConnectionHandler> logger;
public ClientConnectionHandler(Bootstrap bootstrap,
Dictionary<string, IChannel> channeldic,
ILoggerFactory loggerFactory)
{
this.bootstrap = bootstrap;
this.channeldic = channeldic;
logger = loggerFactory.CreateLogger<ClientConnectionHandler>();
}
public override void ChannelInactive(IChannelHandlerContext context)
{
Policy.HandleResult<bool>(context.Channel.Open)
.WaitAndRetryForeverAsync(retryAttempt =>
{
return retryAttempt > 20 ? TimeSpan.FromSeconds(Math.Pow(2, 50)) : TimeSpan.FromSeconds(Math.Pow(2, retryAttempt));//超过重试20次,之后重试都是接近12个小时重试一次
},
(exception, timespan, ctx) =>
{
logger.LogError($"服务端断开{context.Channel.RemoteAddress},重试结果{exception.Result},重试次数{timespan},下次重试间隔(s){ctx.TotalSeconds}");
})
.ExecuteAsync(async () =>
{
try
{
var oldChannel = channeldic.FirstOrDefault(m => m.Value == context.Channel);
if (default(KeyValuePair<string, IChannel>).Equals(oldChannel))
{
if(logger.IsEnabled( LogLevel.Debug))
logger.LogDebug($"服务器已经删除了{oldChannel.Key}远程服务器配置");
return true;
}
var channel = await bootstrap.ConnectAsync(context.Channel.RemoteAddress);
channeldic.Remove(oldChannel.Key);
channeldic.Add(oldChannel.Key, channel);
return channel.Open;
}
catch (Exception ex)
{
logger.LogError($"服务端断开后{context.Channel.RemoteAddress},重连异常:{ex}");
return false;
}
});
}

public override void ChannelRead(IChannelHandlerContext context, object message)
{
if (logger.IsEnabled(LogLevel.Debug))
{
logger.LogError($"服务端返回消息{message}");
}
}

public override void ExceptionCaught(IChannelHandlerContext context, Exception exception)
{
logger.LogError($"服务端Exception: {exception}");
context.CloseAsync();
}
}
}

+ 30
- 0
src/JT808.DotNetty.Services/JT808.DotNetty.Transmit/JT808.DotNetty.Transmit.csproj View File

@@ -0,0 +1,30 @@
<Project Sdk="Microsoft.NET.Sdk">

<Import Project="..\..\SharedProperties.props" />
<PropertyGroup>
<PackageId>JT808.DotNetty.Transmit</PackageId>
<Product>JT808.DotNetty.Transmit</Product>
<Description>基于DotNetty实现的JT808数据转发服务</Description>
<PackageReleaseNotes>基于DotNetty实现的JT808数据转发服务</PackageReleaseNotes>
<PackageLicenseFile>LICENSE</PackageLicenseFile>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="DotNetty.Buffers" Version="0.6.0" />
<PackageReference Include="DotNetty.Handlers" Version="0.6.0" />
<PackageReference Include="DotNetty.Transport" Version="0.6.0" />
<PackageReference Include="Microsoft.Extensions.Hosting.Abstractions" Version="2.2.0" />
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="2.2.0" />
<PackageReference Include="Microsoft.Extensions.Options.ConfigurationExtensions" Version="2.2.0" />
<PackageReference Include="Polly" Version="7.1.0" />
</ItemGroup>
<ItemGroup>
<None Include="..\..\..\LICENSE">
<Pack>True</Pack>
<PackagePath></PackagePath>
</None>
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\..\JT808.DotNetty.Abstractions\JT808.DotNetty.Abstractions.csproj" />
</ItemGroup>

</Project>

+ 23
- 0
src/JT808.DotNetty.Services/JT808.DotNetty.Transmit/JT808DotNettyTransmitExtensions.cs View File

@@ -0,0 +1,23 @@
using Microsoft.Extensions.DependencyInjection;
using System;
using System.Collections.Generic;
using System.Text;
using JT808.Protocol;
using JT808.DotNetty.Abstractions;
using JT808.DotNetty.Transmit;
using Microsoft.Extensions.Configuration;
using JT808.DotNetty.Transmit.Configs;

namespace JT808.DotNetty.Client
{
public static class JT808DotNettyTransmitExtensions
{
public static IJT808ClientBuilder AddJT808Transmit(this IJT808ClientBuilder jT808ClientBuilder,IConfiguration configuration)
{
jT808ClientBuilder.JT808Builder.Services.Configure<RemoteServerOptions>(configuration.GetSection("RemoteServerOptions"));
jT808ClientBuilder.JT808Builder.Services.AddSingleton<JT808DotNettyTransmitService>();
jT808ClientBuilder.JT808Builder.Services.AddHostedService<JT808DotNettyTransmitHostedService>();
return jT808ClientBuilder;
}
}
}

+ 52
- 0
src/JT808.DotNetty.Services/JT808.DotNetty.Transmit/JT808DotNettyTransmitHostedService.cs View File

@@ -0,0 +1,52 @@
using DotNetty.Buffers;
using DotNetty.Transport.Bootstrapping;
using DotNetty.Transport.Channels;
using DotNetty.Transport.Channels.Sockets;
using System;
using System.Collections.Generic;
using System.Net;
using System.Text;
using System.Threading.Tasks;
using DotNetty.Handlers.Logging;
using Polly;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using JT808.DotNetty.Transmit.Configs;
using System.Linq;
using JT808.DotNetty.Transmit.Handlers;
using JT808.DotNetty.Abstractions;
using JT808.Protocol;
using JT808.Protocol.Interfaces;
using Microsoft.Extensions.Hosting;
using System.Threading;

namespace JT808.DotNetty.Transmit
{
public class JT808DotNettyTransmitHostedService:IHostedService
{
private readonly JT808DotNettyTransmitService jT808DotNettyTransmitService;
private readonly IJT808MsgConsumer jT808MsgConsumer;
public JT808DotNettyTransmitHostedService(
IJT808MsgConsumer jT808MsgConsumer,
JT808DotNettyTransmitService jT808DotNettyTransmitService)
{
this.jT808DotNettyTransmitService = jT808DotNettyTransmitService;
this.jT808MsgConsumer = jT808MsgConsumer;
}

public Task StartAsync(CancellationToken cancellationToken)
{
jT808MsgConsumer.Subscribe();
jT808MsgConsumer.OnMessage(item=> {
jT808DotNettyTransmitService.SendAsync(item.TerminalNo,item.Data);
});
return Task.CompletedTask;
}

public Task StopAsync(CancellationToken cancellationToken)
{
jT808MsgConsumer.Unsubscribe();
return Task.CompletedTask;
}
}
}

+ 236
- 0
src/JT808.DotNetty.Services/JT808.DotNetty.Transmit/JT808DotNettyTransmitService.cs View File

@@ -0,0 +1,236 @@
using DotNetty.Buffers;
using DotNetty.Transport.Bootstrapping;
using DotNetty.Transport.Channels;
using DotNetty.Transport.Channels.Sockets;
using System;
using System.Collections.Generic;
using System.Net;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using JT808.DotNetty.Transmit.Configs;
using System.Linq;
using JT808.DotNetty.Transmit.Handlers;

namespace JT808.DotNetty.Transmit
{
public class JT808DotNettyTransmitService
{
private readonly ILogger logger;
private readonly ILoggerFactory loggerFactory;
private IOptionsMonitor<RemoteServerOptions> optionsMonitor;
public Dictionary<string, IChannel> channeldic = new Dictionary<string, IChannel>();
public JT808DotNettyTransmitService(ILoggerFactory loggerFactory,
IOptionsMonitor<RemoteServerOptions> optionsMonitor)
{
this.loggerFactory = loggerFactory;
logger = loggerFactory.CreateLogger("JT808DotNettyTransmitService");
this.optionsMonitor = optionsMonitor;
InitialDispatcherClient();
}
public void SendAsync(string terminalNo,byte[] data)
{
if (optionsMonitor.CurrentValue.DataTransfer != null)
{
foreach (var item in optionsMonitor.CurrentValue.DataTransfer)
{
if (channeldic.TryGetValue($"all_{item.Host}", out var allClientChannel))
{
try
{
if (allClientChannel.Open)
{
if (logger.IsEnabled(Microsoft.Extensions.Logging.LogLevel.Debug))
{
logger.LogDebug($"转发所有数据到该网关{item.Host}");
}
allClientChannel.WriteAndFlushAsync(Unpooled.WrappedBuffer(data));
}
else
{
logger.LogError($"{item.Host}链接已关闭");
}
}
catch (Exception ex)
{
logger.LogError($"{item.Host}发送数据出现异常:{ex}");
}
}
else
{
if (item.TerminalNos.Contains(terminalNo) && channeldic.TryGetValue($"{terminalNo}_{item.Host}", out var clientChannel))
{
try
{
if (clientChannel.Open)
{
if (logger.IsEnabled(Microsoft.Extensions.Logging.LogLevel.Debug))
logger.LogDebug($"转发{terminalNo}到该网关{item.Host}");
clientChannel.WriteAndFlushAsync(Unpooled.WrappedBuffer(data));
}
else
{
logger.LogError($"{item.Host},{terminalNo}链接已关闭");
}
}
catch (Exception ex)
{
logger.LogError($"{item.Host},{terminalNo}发送数据出现异常:{ex}");
}
}
}
}
}
}

public void InitialDispatcherClient()
{
Task.Run(async () =>
{
var group = new MultithreadEventLoopGroup();
var bootstrap = new Bootstrap();
bootstrap.Group(group)
.Channel<TcpSocketChannel>()
.Option(ChannelOption.TcpNodelay, true)
.Handler(new ActionChannelInitializer<ISocketChannel>(channel =>
{
IChannelPipeline pipeline = channel.Pipeline;
pipeline.AddLast(new ClientConnectionHandler(bootstrap, channeldic, loggerFactory));
}));
optionsMonitor.OnChange(options =>
{
List<string> lastRemoteServers = new List<string>();
if (options.DataTransfer != null)
{
if (options.DataTransfer.Any())
{
foreach (var item in options.DataTransfer)
{
if (item.TerminalNos != null)
{
if (item.TerminalNos.Any())
{
foreach (var terminal in item.TerminalNos)
{
lastRemoteServers.Add($"{terminal}_{item.Host}");
}
}
else
{
lastRemoteServers.Add($"all_{item.Host}");
}
}
else
{
lastRemoteServers.Add($"all_{item.Host}");
}
}
}
}
DelRemoteServsers(lastRemoteServers);
AddRemoteServsers(bootstrap, lastRemoteServers);
});
await InitRemoteServsers(bootstrap);
});
}
/// <summary>
/// 初始化远程服务器
/// </summary>
/// <param name="bootstrap"></param>
/// <param name="remoteServers"></param>
/// <returns></returns>
private async Task InitRemoteServsers(Bootstrap bootstrap)
{
List<string> remoteServers = new List<string>();
if (optionsMonitor.CurrentValue.DataTransfer != null)
{
if (optionsMonitor.CurrentValue.DataTransfer.Any())
{
foreach (var item in optionsMonitor.CurrentValue.DataTransfer)
{
if (item.TerminalNos != null)
{
if (item.TerminalNos.Any())
{
foreach (var terminal in item.TerminalNos)
{
remoteServers.Add($"{terminal}_{item.Host}");
}
}
else
{
remoteServers.Add($"all_{item.Host}");
}
}
else
{
remoteServers.Add($"all_{item.Host}");
}
}
}
}
foreach (var item in remoteServers)
{
try
{
string ip_port = item.Split('_')[1];
IChannel clientChannel = await bootstrap.ConnectAsync(new IPEndPoint(IPAddress.Parse(ip_port.Split(':')[0]), int.Parse(ip_port.Split(':')[1])));
channeldic.Add(item, clientChannel);
if (clientChannel.Open)
{
if (logger.IsEnabled(Microsoft.Extensions.Logging.LogLevel.Debug))
{
logger.LogDebug($"该终端{item.Replace("_", "已连接上该服务器")}");
}
}
}
catch (Exception ex)
{
logger.LogError($"初始化配置链接远程服务端{item},链接异常:{ex}");
}
}
await Task.CompletedTask;
}
/// <summary>
/// 动态删除远程服务器
/// </summary>
/// <param name="lastRemoteServers"></param>
private void DelRemoteServsers(List<string> lastRemoteServers)
{
var delChannels = channeldic.Keys.Except(lastRemoteServers).ToList();
foreach (var item in delChannels)
{
channeldic[item].CloseAsync();
channeldic.Remove(item);
}
}
/// <summary>
/// 动态添加远程服务器
/// </summary>
/// <param name="bootstrap"></param>
/// <param name="lastRemoteServers"></param>
private void AddRemoteServsers(Bootstrap bootstrap, List<string> lastRemoteServers)
{
var addChannels = lastRemoteServers.Except(channeldic.Keys).ToList();
foreach (var item in addChannels)
{
try
{
var ip_port = item.Split('_')[1];
IChannel clientChannel = bootstrap.ConnectAsync(new IPEndPoint(IPAddress.Parse(ip_port.Split(':')[0]), int.Parse(ip_port.Split(':')[1]))).Result;
channeldic.Add(item, clientChannel);
if (clientChannel.Open) {
if (logger.IsEnabled(Microsoft.Extensions.Logging.LogLevel.Debug))
{
logger.LogDebug($"该终端{item.Replace("_", "已连接上该服务器")}");
}
}
}
catch (Exception ex)
{
logger.LogError($"变更配置后链接远程服务端{item},重连异常:{ex}");
}
}
}
}
}

+ 9
- 0
src/JT808.DotNetty.sln View File

@@ -43,6 +43,10 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JT808.DotNetty.Kafka.Test",
EndProject EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JT808.DotNetty.RabbitMQ.Test", "JT808.DotNetty.Tests\JT808.DotNetty.RabbitMQ.Test\JT808.DotNetty.RabbitMQ.Test.csproj", "{D3CA0D73-1CCF-41BA-88D8-5BE50515CA64}" Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JT808.DotNetty.RabbitMQ.Test", "JT808.DotNetty.Tests\JT808.DotNetty.RabbitMQ.Test\JT808.DotNetty.RabbitMQ.Test.csproj", "{D3CA0D73-1CCF-41BA-88D8-5BE50515CA64}"
EndProject EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Services", "Services", "{7F077BD5-8E4C-402A-9E24-DECAF251A420}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "JT808.DotNetty.Transmit", "JT808.DotNetty.Services\JT808.DotNetty.Transmit\JT808.DotNetty.Transmit.csproj", "{A3A0D0E1-F9AC-4004-A306-9491D2EA6883}"
EndProject
Global Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU Debug|Any CPU = Debug|Any CPU
@@ -121,6 +125,10 @@ Global
{D3CA0D73-1CCF-41BA-88D8-5BE50515CA64}.Debug|Any CPU.Build.0 = Debug|Any CPU {D3CA0D73-1CCF-41BA-88D8-5BE50515CA64}.Debug|Any CPU.Build.0 = Debug|Any CPU
{D3CA0D73-1CCF-41BA-88D8-5BE50515CA64}.Release|Any CPU.ActiveCfg = Release|Any CPU {D3CA0D73-1CCF-41BA-88D8-5BE50515CA64}.Release|Any CPU.ActiveCfg = Release|Any CPU
{D3CA0D73-1CCF-41BA-88D8-5BE50515CA64}.Release|Any CPU.Build.0 = Release|Any CPU {D3CA0D73-1CCF-41BA-88D8-5BE50515CA64}.Release|Any CPU.Build.0 = Release|Any CPU
{A3A0D0E1-F9AC-4004-A306-9491D2EA6883}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{A3A0D0E1-F9AC-4004-A306-9491D2EA6883}.Debug|Any CPU.Build.0 = Debug|Any CPU
{A3A0D0E1-F9AC-4004-A306-9491D2EA6883}.Release|Any CPU.ActiveCfg = Release|Any CPU
{A3A0D0E1-F9AC-4004-A306-9491D2EA6883}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection EndGlobalSection
GlobalSection(SolutionProperties) = preSolution GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE HideSolutionNode = FALSE
@@ -135,6 +143,7 @@ Global
{CCE6AEFB-1AB0-4BD9-8EA2-8B4CDD097E88} = {2459FB59-8A33-49A4-ADBC-A0B12C5886A6} {CCE6AEFB-1AB0-4BD9-8EA2-8B4CDD097E88} = {2459FB59-8A33-49A4-ADBC-A0B12C5886A6}
{50A94BD5-5CDF-4777-AE4C-80BA769AEDAB} = {3BD7FF02-8516-4A77-A385-9FDCDD792E22} {50A94BD5-5CDF-4777-AE4C-80BA769AEDAB} = {3BD7FF02-8516-4A77-A385-9FDCDD792E22}
{D3CA0D73-1CCF-41BA-88D8-5BE50515CA64} = {3BD7FF02-8516-4A77-A385-9FDCDD792E22} {D3CA0D73-1CCF-41BA-88D8-5BE50515CA64} = {3BD7FF02-8516-4A77-A385-9FDCDD792E22}
{A3A0D0E1-F9AC-4004-A306-9491D2EA6883} = {7F077BD5-8E4C-402A-9E24-DECAF251A420}
EndGlobalSection EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {FC0FFCEA-E1EF-4C97-A1C5-F89418B6834B} SolutionGuid = {FC0FFCEA-E1EF-4C97-A1C5-F89418B6834B}


Loading…
Cancel
Save