Pārlūkot izejas kodu

添加JT1078DotNetty项目

old
SmallChi pirms 5 gadiem
vecāks
revīzija
c4888990dd
45 mainītis faili ar 1807 papildinājumiem un 0 dzēšanām
  1. +20
    -0
      src/JT1078.DotNetty.Core/Codecs/JT1078TcpDecoder.cs
  2. +21
    -0
      src/JT1078.DotNetty.Core/Codecs/JT1078UdpDecoder.cs
  3. +31
    -0
      src/JT1078.DotNetty.Core/Configurations/JT1078ClientConfiguration.cs
  4. +36
    -0
      src/JT1078.DotNetty.Core/Configurations/JT1078Configuration.cs
  5. +12
    -0
      src/JT1078.DotNetty.Core/Configurations/JT1078RemoteServerOptions.cs
  6. +26
    -0
      src/JT1078.DotNetty.Core/Converters/JsonIPAddressConverter.cs
  7. +32
    -0
      src/JT1078.DotNetty.Core/Converters/JsonIPEndPointConverter.cs
  8. +15
    -0
      src/JT1078.DotNetty.Core/Enums/JT1078TransportProtocolType.cs
  9. +25
    -0
      src/JT1078.DotNetty.Core/Impl/JT1078BuilderDefault.cs
  10. +195
    -0
      src/JT1078.DotNetty.Core/Impl/JT1078SourcePackageDispatcherDefault.cs
  11. +14
    -0
      src/JT1078.DotNetty.Core/Interfaces/IJT1078Builder.cs
  12. +12
    -0
      src/JT1078.DotNetty.Core/Interfaces/IJT1078SourcePackageDispatcher.cs
  13. +14
    -0
      src/JT1078.DotNetty.Core/Interfaces/IJT1078TcpBuilder.cs
  14. +10
    -0
      src/JT1078.DotNetty.Core/Interfaces/IJT1078TcpMessageHandlers.cs
  15. +14
    -0
      src/JT1078.DotNetty.Core/Interfaces/IJT1078UdpBuilder.cs
  16. +10
    -0
      src/JT1078.DotNetty.Core/Interfaces/IJT1078UdpMessageHandlers.cs
  17. +20
    -0
      src/JT1078.DotNetty.Core/JT1078.DotNetty.Core.csproj
  18. +79
    -0
      src/JT1078.DotNetty.Core/JT1078CoreDotnettyExtensions.cs
  19. +49
    -0
      src/JT1078.DotNetty.Core/Metadata/JT1078AtomicCounter.cs
  20. +29
    -0
      src/JT1078.DotNetty.Core/Metadata/JT1078TcpSession.cs
  21. +20
    -0
      src/JT1078.DotNetty.Core/Metadata/JT1078UdpPackage.cs
  22. +35
    -0
      src/JT1078.DotNetty.Core/Metadata/JT1078UdpSession.cs
  23. +52
    -0
      src/JT1078.DotNetty.Core/Services/JT1078AtomicCounterService.cs
  24. +30
    -0
      src/JT1078.DotNetty.Core/Services/JT1078AtomicCounterServiceFactory.cs
  25. +100
    -0
      src/JT1078.DotNetty.Core/Session/JT1078TcpSessionManager.cs
  26. +116
    -0
      src/JT1078.DotNetty.Core/Session/JT1078UdpSessionManager.cs
  27. +99
    -0
      src/JT1078.DotNetty.Tcp/Handlers/JT1078TcpConnectionHandler.cs
  28. +17
    -0
      src/JT1078.DotNetty.Tcp/Handlers/JT1078TcpMessageProcessorEmptyImpl.cs
  29. +73
    -0
      src/JT1078.DotNetty.Tcp/Handlers/JT1078TcpServerHandler.cs
  30. +22
    -0
      src/JT1078.DotNetty.Tcp/JT1078.DotNetty.Tcp.csproj
  31. +30
    -0
      src/JT1078.DotNetty.Tcp/JT1078TcpBuilderDefault.cs
  32. +26
    -0
      src/JT1078.DotNetty.Tcp/JT1078TcpDotnettyExtensions.cs
  33. +94
    -0
      src/JT1078.DotNetty.Tcp/JT1078TcpServerHost.cs
  34. +27
    -0
      src/JT1078.DotNetty.TestHosting/Handlers/JT1078TcpMessageHandlers.cs
  35. +24
    -0
      src/JT1078.DotNetty.TestHosting/Handlers/JT1078UdpMessageHandlers.cs
  36. +27
    -0
      src/JT1078.DotNetty.TestHosting/JT1078.DotNetty.TestHosting.csproj
  37. +51
    -0
      src/JT1078.DotNetty.TestHosting/Program.cs
  38. +22
    -0
      src/JT1078.DotNetty.TestHosting/appsettings.json
  39. +17
    -0
      src/JT1078.DotNetty.Udp/Handlers/JT1078UdpMessageProcessorEmptyImpl.cs
  40. +75
    -0
      src/JT1078.DotNetty.Udp/Handlers/JT1078UdpServerHandler.cs
  41. +11
    -0
      src/JT1078.DotNetty.Udp/JT1078.DotNetty.Udp.csproj
  42. +30
    -0
      src/JT1078.DotNetty.Udp/JT1078UdpBuilderDefault.cs
  43. +26
    -0
      src/JT1078.DotNetty.Udp/JT1078UdpDotnettyExtensions.cs
  44. +76
    -0
      src/JT1078.DotNetty.Udp/JT1078UdpServerHost.cs
  45. +43
    -0
      src/JT1078DotNetty.sln

+ 20
- 0
src/JT1078.DotNetty.Core/Codecs/JT1078TcpDecoder.cs Parādīt failu

@@ -0,0 +1,20 @@
using DotNetty.Buffers;
using DotNetty.Codecs;
using System.Collections.Generic;
using DotNetty.Transport.Channels;
using JT1078.Protocol;
using System;

namespace JT1078.DotNetty.Core.Codecs
{
public class JT1078TcpDecoder : ByteToMessageDecoder
{
protected override void Decode(IChannelHandlerContext context, IByteBuffer input, List<object> output)
{
byte[] buffer = new byte[input.Capacity+4];
input.ReadBytes(buffer, 4, input.Capacity);
Array.Copy(JT1078Package.FH_Bytes, 0,buffer, 0, 4);
output.Add(buffer);
}
}
}

+ 21
- 0
src/JT1078.DotNetty.Core/Codecs/JT1078UdpDecoder.cs Parādīt failu

@@ -0,0 +1,21 @@
using DotNetty.Buffers;
using DotNetty.Codecs;
using DotNetty.Transport.Channels;
using System.Collections.Generic;
using DotNetty.Transport.Channels.Sockets;
using JT1078.DotNetty.Core.Metadata;

namespace JT1078.DotNetty.Core.Codecs
{
public class JT1078UdpDecoder : MessageToMessageDecoder<DatagramPacket>
{
protected override void Decode(IChannelHandlerContext context, DatagramPacket message, List<object> output)
{
if (!message.Content.IsReadable()) return;
IByteBuffer byteBuffer = message.Content;
byte[] buffer = new byte[byteBuffer.ReadableBytes];
byteBuffer.ReadBytes(buffer);
output.Add(new JT1078UdpPackage(buffer, message.Sender));
}
}
}

+ 31
- 0
src/JT1078.DotNetty.Core/Configurations/JT1078ClientConfiguration.cs Parādīt failu

@@ -0,0 +1,31 @@
using System;
using System.Collections.Generic;
using System.Net;
using System.Text;

namespace JT1078.DotNetty.Core.Configurations
{
public class JT1078ClientConfiguration
{
public string Host { get; set; }

public int Port { get; set; }

private EndPoint endPoint;

public EndPoint EndPoint
{
get
{
if (endPoint == null)
{
if (IPAddress.TryParse(Host, out IPAddress ip))
{
endPoint = new IPEndPoint(ip, Port);
}
}
return endPoint;
}
}
}
}

+ 36
- 0
src/JT1078.DotNetty.Core/Configurations/JT1078Configuration.cs Parādīt failu

@@ -0,0 +1,36 @@
using Microsoft.Extensions.Options;
using System;
using System.Collections.Generic;
using System.Text;

namespace JT1078.DotNetty.Core.Configurations
{
public class JT1078Configuration : IOptions<JT1078Configuration>
{
public int TcpPort { get; set; } = 1808;

public int UdpPort { get; set; } = 1808;

public int QuietPeriodSeconds { get; set; } = 1;

public TimeSpan QuietPeriodTimeSpan => TimeSpan.FromSeconds(QuietPeriodSeconds);

public int ShutdownTimeoutSeconds { get; set; } = 3;

public TimeSpan ShutdownTimeoutTimeSpan => TimeSpan.FromSeconds(ShutdownTimeoutSeconds);

public int SoBacklog { get; set; } = 8192;

public int EventLoopCount { get; set; } = Environment.ProcessorCount;

public int ReaderIdleTimeSeconds { get; set; } = 3600;

public int WriterIdleTimeSeconds { get; set; } = 3600;

public int AllIdleTimeSeconds { get; set; } = 3600;

public JT1078RemoteServerOptions RemoteServerOptions { get; set; }

public JT1078Configuration Value => this;
}
}

+ 12
- 0
src/JT1078.DotNetty.Core/Configurations/JT1078RemoteServerOptions.cs Parādīt failu

@@ -0,0 +1,12 @@
using Microsoft.Extensions.Options;
using System.Collections.Generic;

namespace JT1078.DotNetty.Core.Configurations
{
public class JT1078RemoteServerOptions:IOptions<JT1078RemoteServerOptions>
{
public List<string> RemoteServers { get; set; }

public JT1078RemoteServerOptions Value => this;
}
}

+ 26
- 0
src/JT1078.DotNetty.Core/Converters/JsonIPAddressConverter.cs Parādīt failu

@@ -0,0 +1,26 @@
using Newtonsoft.Json;
using System;
using System.Collections.Generic;
using System.Net;
using System.Text;

namespace JT1078.DotNetty.Core.Converters
{
public class JsonIPAddressConverter : JsonConverter
{
public override bool CanConvert(Type objectType)
{
return (objectType == typeof(IPAddress));
}

public override void WriteJson(JsonWriter writer, object value, JsonSerializer serializer)
{
writer.WriteValue(value.ToString());
}

public override object ReadJson(JsonReader reader, Type objectType, object existingValue, JsonSerializer serializer)
{
return IPAddress.Parse((string)reader.Value);
}
}
}

+ 32
- 0
src/JT1078.DotNetty.Core/Converters/JsonIPEndPointConverter.cs Parādīt failu

@@ -0,0 +1,32 @@
using Newtonsoft.Json;
using Newtonsoft.Json.Linq;
using System;
using System.Net;

namespace JT1078.DotNetty.Core.Converters
{
public class JsonIPEndPointConverter: JsonConverter
{
public override bool CanConvert(Type objectType)
{
return (objectType == typeof(IPEndPoint));
}

public override void WriteJson(JsonWriter writer, object value, JsonSerializer serializer)
{
IPEndPoint ep = (IPEndPoint)value;
JObject jo = new JObject();
jo.Add("Host", JToken.FromObject(ep.Address, serializer));
jo.Add("Port", ep.Port);
jo.WriteTo(writer);
}

public override object ReadJson(JsonReader reader, Type objectType, object existingValue, JsonSerializer serializer)
{
JObject jo = JObject.Load(reader);
IPAddress address = jo["Host"].ToObject<IPAddress>(serializer);
int port = (int)jo["Port"];
return new IPEndPoint(address, port);
}
}
}

+ 15
- 0
src/JT1078.DotNetty.Core/Enums/JT1078TransportProtocolType.cs Parādīt failu

@@ -0,0 +1,15 @@
using System;
using System.Collections.Generic;
using System.Text;

namespace JT1078.DotNetty.Core.Enums
{
/// <summary>
/// 传输协议类型
/// </summary>
public enum JT1078TransportProtocolType
{
tcp=1,
udp = 2
}
}

+ 25
- 0
src/JT1078.DotNetty.Core/Impl/JT1078BuilderDefault.cs Parādīt failu

@@ -0,0 +1,25 @@
using System;
using System.Collections.Generic;
using System.Text;
using JT1078.DotNetty.Core.Interfaces;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection.Extensions;

namespace JT1078.DotNetty.Core.Impl
{
sealed class JT1078BuilderDefault : IJT1078Builder
{
public IServiceCollection Services { get; }

public JT1078BuilderDefault(IServiceCollection services)
{
Services = services;
}

public IJT1078Builder Replace<T>() where T : IJT1078SourcePackageDispatcher
{
Services.Replace(new ServiceDescriptor(typeof(IJT1078SourcePackageDispatcher), typeof(T), ServiceLifetime.Singleton));
return this;
}
}
}

+ 195
- 0
src/JT1078.DotNetty.Core/Impl/JT1078SourcePackageDispatcherDefault.cs Parādīt failu

@@ -0,0 +1,195 @@
using JT1078.DotNetty.Core.Configurations;
using JT1078.DotNetty.Core.Interfaces;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Timers;

namespace JT1078.DotNetty.Core.Impl
{
class JT1078SourcePackageDispatcherDefault : IJT1078SourcePackageDispatcher,IDisposable
{
private readonly ILogger<JT1078SourcePackageDispatcherDefault> logger;
private IOptionsMonitor<JT1078Configuration> optionsMonitor;
private ConcurrentDictionary<string, TcpClient> channeldic = new ConcurrentDictionary<string, TcpClient>();
private Queue<string> reconnectionQueue = new Queue<string>();
public JT1078SourcePackageDispatcherDefault(ILoggerFactory loggerFactory,
IOptionsMonitor<JT1078Configuration> optionsMonitor)
{
logger = loggerFactory.CreateLogger<JT1078SourcePackageDispatcherDefault>();
this.optionsMonitor = optionsMonitor;
timer = new System.Timers.Timer(10000);
timer.Elapsed += new ElapsedEventHandler(timer_Elapsed);
timer.Start();
InitialDispatcherClient();
}

private System.Timers.Timer timer;

public Task SendAsync(byte[] data)
{
foreach (var item in channeldic)
{
try
{
if (item.Value.Connected)
{
item.Value.Client.Send(data);
}
else
{
logger.LogError($"{item}链接已关闭");
channeldic.TryRemove(item.Key, out _);
reconnectionQueue.Enqueue(item.Key);
}
}
catch (Exception ex)
{
logger.LogError($"{item}发送数据出现异常:{ex}");
reconnectionQueue.Enqueue(item.Key);
channeldic.TryRemove(item.Key, out _);
}
}
return Task.CompletedTask;
}

public void InitialDispatcherClient()
{
Task.Run(async () =>
{
optionsMonitor.OnChange(options =>
{
List<string> lastRemoteServers = new List<string>();
if (options.RemoteServerOptions.RemoteServers != null)
{
lastRemoteServers = options.RemoteServerOptions.RemoteServers;
}
DelRemoteServsers(lastRemoteServers);
AddRemoteServsers(lastRemoteServers);
});
await InitRemoteServsers();
});
}

void timer_Elapsed(object sender, ElapsedEventArgs e)
{
timer.Stop();
Thread.CurrentThread.IsBackground = true;
var ip = reconnectionQueue.Dequeue();
if (!string.IsNullOrEmpty(ip))
{
AddRemoteServsers(new List<string>() { ip });
}
timer.Start();
}

/// <summary>
/// 初始化远程服务器
/// </summary>
/// <param name="bootstrap"></param>
/// <param name="remoteServers"></param>
/// <returns></returns>
private async Task InitRemoteServsers()
{
List<string> remoteServers = new List<string>();
if (optionsMonitor.CurrentValue.RemoteServerOptions.RemoteServers != null)
{
remoteServers = optionsMonitor.CurrentValue.RemoteServerOptions.RemoteServers;
}
foreach (var item in remoteServers)
{
try
{
TcpClient client = new TcpClient();
client.Connect(new IPEndPoint(IPAddress.Parse(item.Split(':')[0]), int.Parse(item.Split(':')[1])));
if (client.Connected)
{
channeldic.TryAdd(item, client);
}
}
catch (Exception ex)
{
logger.LogError($"初始化配置链接远程服务端{item},链接异常:{ex}");
}
}
await Task.CompletedTask;
}

/// <summary>
/// 动态删除远程服务器
/// </summary>
/// <param name="lastRemoteServers"></param>
private void DelRemoteServsers(List<string> lastRemoteServers)
{
var delChannels = channeldic.Keys.Except(lastRemoteServers).ToList();
foreach (var item in delChannels)
{
channeldic[item].Close();
channeldic[item].Dispose();
channeldic.TryRemove(item, out var client);
}
}
/// <summary>
/// 动态添加远程服务器
/// </summary>
/// <param name="bootstrap"></param>
/// <param name="lastRemoteServers"></param>
private void AddRemoteServsers(List<string> lastRemoteServers)
{
var addChannels = lastRemoteServers.Except(channeldic.Keys).ToList();
foreach (var item in addChannels)
{
try
{
TcpClient client = new TcpClient();
client.Connect(new IPEndPoint(IPAddress.Parse(item.Split(':')[0]), int.Parse(item.Split(':')[1])));
if (client.Connected)
{
channeldic.TryAdd(item, client);
}
else
{
reconnectionQueue.Enqueue(item);
}
}
catch (Exception ex)
{
logger.LogError($"变更配置后链接远程服务端{item},重连异常:{ex}");
reconnectionQueue.Enqueue(item);
}
}
}

public void Dispose()
{
timer.Stop();
if (channeldic != null)
{
foreach (var item in channeldic)
{
try
{
if (item.Value.Connected)
{
item.Value.Close();
item.Value.Dispose();
}
}
catch (Exception)
{

}
}
}
timer.Dispose();
}
}
}

+ 14
- 0
src/JT1078.DotNetty.Core/Interfaces/IJT1078Builder.cs Parādīt failu

@@ -0,0 +1,14 @@
using Microsoft.Extensions.DependencyInjection;
using System;
using System.Collections.Generic;
using System.Text;

namespace JT1078.DotNetty.Core.Interfaces
{
public interface IJT1078Builder
{
IServiceCollection Services { get; }

IJT1078Builder Replace<T>() where T: IJT1078SourcePackageDispatcher;
}
}

+ 12
- 0
src/JT1078.DotNetty.Core/Interfaces/IJT1078SourcePackageDispatcher.cs Parādīt failu

@@ -0,0 +1,12 @@
using System.Threading.Tasks;

namespace JT1078.DotNetty.Core.Interfaces
{
/// <summary>
/// 源包分发器
/// </summary>
public interface IJT1078SourcePackageDispatcher
{
Task SendAsync(byte[] data);
}
}

+ 14
- 0
src/JT1078.DotNetty.Core/Interfaces/IJT1078TcpBuilder.cs Parādīt failu

@@ -0,0 +1,14 @@
using Microsoft.Extensions.DependencyInjection;
using System;
using System.Collections.Generic;
using System.Text;

namespace JT1078.DotNetty.Core.Interfaces
{
public interface IJT1078TcpBuilder
{
IJT1078Builder Instance { get;}
IJT1078Builder Builder();
IJT1078TcpBuilder Replace<T>() where T : IJT1078TcpMessageHandlers;
}
}

+ 10
- 0
src/JT1078.DotNetty.Core/Interfaces/IJT1078TcpMessageHandlers.cs Parādīt failu

@@ -0,0 +1,10 @@
using JT1078.Protocol;
using System.Threading.Tasks;

namespace JT1078.DotNetty.Core.Interfaces
{
public interface IJT1078TcpMessageHandlers
{
Task Processor(JT1078Package package);
}
}

+ 14
- 0
src/JT1078.DotNetty.Core/Interfaces/IJT1078UdpBuilder.cs Parādīt failu

@@ -0,0 +1,14 @@
using Microsoft.Extensions.DependencyInjection;
using System;
using System.Collections.Generic;
using System.Text;

namespace JT1078.DotNetty.Core.Interfaces
{
public interface IJT1078UdpBuilder
{
IJT1078Builder Instance { get; }
IJT1078Builder Builder();
IJT1078UdpBuilder Replace<T>() where T : IJT1078UdpMessageHandlers;
}
}

+ 10
- 0
src/JT1078.DotNetty.Core/Interfaces/IJT1078UdpMessageHandlers.cs Parādīt failu

@@ -0,0 +1,10 @@
using JT1078.Protocol;
using System.Threading.Tasks;

namespace JT1078.DotNetty.Core.Interfaces
{
public interface IJT1078UdpMessageHandlers
{
Task Processor(JT1078Package package);
}
}

+ 20
- 0
src/JT1078.DotNetty.Core/JT1078.DotNetty.Core.csproj Parādīt failu

@@ -0,0 +1,20 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>netstandard2.0</TargetFramework>
</PropertyGroup>


<ItemGroup>
<PackageReference Include="DotNetty.Handlers" Version="0.6.0" />
<PackageReference Include="DotNetty.Transport.Libuv" Version="0.6.0" />
<PackageReference Include="DotNetty.Codecs" Version="0.6.0" />
<PackageReference Include="JT1078" Version="1.0.0-preview1" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="2.2.0" />
<PackageReference Include="Microsoft.Extensions.Hosting.Abstractions" Version="2.2.0" />
<PackageReference Include="Microsoft.Extensions.Options.ConfigurationExtensions" Version="2.2.0" />
<PackageReference Include="Newtonsoft.Json" Version="12.0.2" />
<PackageReference Include="Microsoft.Extensions.Options" Version="2.2.0" />
</ItemGroup>
</Project>

+ 79
- 0
src/JT1078.DotNetty.Core/JT1078CoreDotnettyExtensions.cs Parādīt failu

@@ -0,0 +1,79 @@
using JT1078.DotNetty.Core.Configurations;
using JT1078.DotNetty.Core.Converters;
using JT1078.DotNetty.Core.Impl;
using JT1078.DotNetty.Core.Interfaces;
using JT1078.DotNetty.Core.Services;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection.Extensions;
using Microsoft.Extensions.Options;
using Newtonsoft.Json;
using System;
using System.Runtime.CompilerServices;

[assembly: InternalsVisibleTo("JT1078.DotNetty.Core.Test")]
[assembly: InternalsVisibleTo("JT1078.DotNetty.Tcp.Test")]
[assembly: InternalsVisibleTo("JT1078.DotNetty.Udp.Test")]
[assembly: InternalsVisibleTo("JT1078.DotNetty.Tcp")]
[assembly: InternalsVisibleTo("JT1078.DotNetty.Udp")]
namespace JT1078.DotNetty.Core
{
public static class JT1078CoreDotnettyExtensions
{
static JT1078CoreDotnettyExtensions()
{
JsonConvert.DefaultSettings = new Func<JsonSerializerSettings>(() =>
{
Newtonsoft.Json.JsonSerializerSettings settings = new Newtonsoft.Json.JsonSerializerSettings();
//日期类型默认格式化处理
settings.DateFormatHandling = Newtonsoft.Json.DateFormatHandling.MicrosoftDateFormat;
settings.DateFormatString = "yyyy-MM-dd HH:mm:ss";
//空值处理
settings.NullValueHandling = NullValueHandling.Ignore;
settings.ReferenceLoopHandling = ReferenceLoopHandling.Ignore;
settings.Converters.Add(new JsonIPAddressConverter());
settings.Converters.Add(new JsonIPEndPointConverter());
return settings;
});
}

public static IJT1078Builder AddJT1078Core(this IServiceCollection serviceDescriptors, IConfiguration configuration, Newtonsoft.Json.JsonSerializerSettings settings=null)
{
if (settings != null)
{
JsonConvert.DefaultSettings = new Func<JsonSerializerSettings>(() =>
{
settings.Converters.Add(new JsonIPAddressConverter());
settings.Converters.Add(new JsonIPEndPointConverter());
settings.ReferenceLoopHandling = ReferenceLoopHandling.Ignore;
return settings;
});
}
IJT1078Builder builder = new JT1078BuilderDefault(serviceDescriptors);
builder.Services.Configure<JT1078Configuration>(configuration.GetSection("JT1078Configuration"));
builder.Services.TryAddSingleton<IJT1078SourcePackageDispatcher, JT1078SourcePackageDispatcherDefault>();
builder.Services.TryAddSingleton<JT1078AtomicCounterServiceFactory>();
return builder;
}

public static IJT1078Builder AddJT1078Core(this IServiceCollection serviceDescriptors, Action<JT1078Configuration> jt1078Options, Newtonsoft.Json.JsonSerializerSettings settings = null)
{
if (settings != null)
{
JsonConvert.DefaultSettings = new Func<JsonSerializerSettings>(() =>
{
settings.Converters.Add(new JsonIPAddressConverter());
settings.Converters.Add(new JsonIPEndPointConverter());
settings.ReferenceLoopHandling = ReferenceLoopHandling.Ignore;
return settings;
});
}
IJT1078Builder builder = new JT1078BuilderDefault(serviceDescriptors);
builder.Services.Configure(jt1078Options);
builder.Services.TryAddSingleton<IJT1078SourcePackageDispatcher, JT1078SourcePackageDispatcherDefault>();
builder.Services.TryAddSingleton<JT1078AtomicCounterService>();
builder.Services.TryAddSingleton<JT1078AtomicCounterServiceFactory>();
return builder;
}
}
}

+ 49
- 0
src/JT1078.DotNetty.Core/Metadata/JT1078AtomicCounter.cs Parādīt failu

@@ -0,0 +1,49 @@
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;

namespace JT1078.DotNetty.Core.Metadata
{
/// <summary>
///
/// <see cref="Grpc.Core.Internal"/>
/// </summary>
internal class JT1078AtomicCounter
{
long counter = 0;

public JT1078AtomicCounter(long initialCount = 0)
{
this.counter = initialCount;
}

public void Reset()
{
Interlocked.Exchange(ref counter, 0);
}

public long Increment()
{
return Interlocked.Increment(ref counter);
}

public long Add(long len)
{
return Interlocked.Add(ref counter,len);
}

public long Decrement()
{
return Interlocked.Decrement(ref counter);
}

public long Count
{
get
{
return Interlocked.Read(ref counter);
}
}
}
}

+ 29
- 0
src/JT1078.DotNetty.Core/Metadata/JT1078TcpSession.cs Parādīt failu

@@ -0,0 +1,29 @@
using DotNetty.Transport.Channels;
using System;

namespace JT1078.DotNetty.Core.Metadata
{
public class JT1078TcpSession
{
public JT1078TcpSession(IChannel channel, string terminalPhoneNo)
{
Channel = channel;
TerminalPhoneNo = terminalPhoneNo;
StartTime = DateTime.Now;
LastActiveTime = DateTime.Now;
}

public JT1078TcpSession() { }

/// <summary>
/// 终端手机号
/// </summary>
public string TerminalPhoneNo { get; set; }

public IChannel Channel { get; set; }

public DateTime LastActiveTime { get; set; }

public DateTime StartTime { get; set; }
}
}

+ 20
- 0
src/JT1078.DotNetty.Core/Metadata/JT1078UdpPackage.cs Parādīt failu

@@ -0,0 +1,20 @@
using System;
using System.Collections.Generic;
using System.Net;
using System.Text;

namespace JT1078.DotNetty.Core.Metadata
{
public class JT1078UdpPackage
{
public JT1078UdpPackage(byte[] buffer, EndPoint sender)
{
Buffer = buffer;
Sender = sender;
}

public byte[] Buffer { get; }

public EndPoint Sender { get; }
}
}

+ 35
- 0
src/JT1078.DotNetty.Core/Metadata/JT1078UdpSession.cs Parādīt failu

@@ -0,0 +1,35 @@
using DotNetty.Transport.Channels;
using System;
using System.Net;

namespace JT1078.DotNetty.Core.Metadata
{
public class JT1078UdpSession
{
public JT1078UdpSession(IChannel channel,
EndPoint sender,
string terminalPhoneNo)
{
Channel = channel;
TerminalPhoneNo = terminalPhoneNo;
StartTime = DateTime.Now;
LastActiveTime = DateTime.Now;
Sender = sender;
}

public EndPoint Sender { get; set; }

public JT1078UdpSession() { }

/// <summary>
/// 终端手机号
/// </summary>
public string TerminalPhoneNo { get; set; }

public IChannel Channel { get; set; }

public DateTime LastActiveTime { get; set; }

public DateTime StartTime { get; set; }
}
}

+ 52
- 0
src/JT1078.DotNetty.Core/Services/JT1078AtomicCounterService.cs Parādīt failu

@@ -0,0 +1,52 @@
using JT1078.DotNetty.Core.Metadata;

namespace JT1078.DotNetty.Core.Services
{
/// <summary>
/// 计数包服务
/// </summary>
public class JT1078AtomicCounterService
{
private readonly JT1078AtomicCounter MsgSuccessCounter;

private readonly JT1078AtomicCounter MsgFailCounter;

public JT1078AtomicCounterService()
{
MsgSuccessCounter=new JT1078AtomicCounter();
MsgFailCounter = new JT1078AtomicCounter();
}

public void Reset()
{
MsgSuccessCounter.Reset();
MsgFailCounter.Reset();
}

public long MsgSuccessIncrement()
{
return MsgSuccessCounter.Increment();
}

public long MsgSuccessCount
{
get
{
return MsgSuccessCounter.Count;
}
}

public long MsgFailIncrement()
{
return MsgFailCounter.Increment();
}

public long MsgFailCount
{
get
{
return MsgFailCounter.Count;
}
}
}
}

+ 30
- 0
src/JT1078.DotNetty.Core/Services/JT1078AtomicCounterServiceFactory.cs Parādīt failu

@@ -0,0 +1,30 @@
using JT1078.DotNetty.Core.Enums;
using System;
using System.Collections.Concurrent;

namespace JT1078.DotNetty.Core.Services
{
public class JT1078AtomicCounterServiceFactory
{
private readonly ConcurrentDictionary<JT1078TransportProtocolType, JT1078AtomicCounterService> cache;

public JT1078AtomicCounterServiceFactory()
{
cache = new ConcurrentDictionary<JT1078TransportProtocolType, JT1078AtomicCounterService>();
}

public JT1078AtomicCounterService Create(JT1078TransportProtocolType type)
{
if(cache.TryGetValue(type,out var service))
{
return service;
}
else
{
var serviceNew = new JT1078AtomicCounterService();
cache.TryAdd(type, serviceNew);
return serviceNew;
}
}
}
}

+ 100
- 0
src/JT1078.DotNetty.Core/Session/JT1078TcpSessionManager.cs Parādīt failu

@@ -0,0 +1,100 @@
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using DotNetty.Transport.Channels;
using JT1078.DotNetty.Core.Metadata;

namespace JT1078.DotNetty.Core.Session
{
/// <summary>
/// JT1078 Tcp会话管理
/// </summary>
public class JT1078TcpSessionManager
{
private readonly ILogger<JT1078TcpSessionManager> logger;

public JT1078TcpSessionManager(
ILoggerFactory loggerFactory)
{
logger = loggerFactory.CreateLogger<JT1078TcpSessionManager>();
}

private ConcurrentDictionary<string, JT1078TcpSession> SessionIdDict = new ConcurrentDictionary<string, JT1078TcpSession>(StringComparer.OrdinalIgnoreCase);

public int SessionCount
{
get
{
return SessionIdDict.Count;
}
}

public JT1078TcpSession GetSession(string terminalPhoneNo)
{
if (string.IsNullOrEmpty(terminalPhoneNo))
return default;
if (SessionIdDict.TryGetValue(terminalPhoneNo, out JT1078TcpSession targetSession))
{
return targetSession;
}
else
{
return default;
}
}

public void TryAdd(string terminalPhoneNo,IChannel channel)
{
if (SessionIdDict.TryGetValue(terminalPhoneNo, out JT1078TcpSession oldSession))
{
oldSession.LastActiveTime = DateTime.Now;
oldSession.Channel = channel;
SessionIdDict.TryUpdate(terminalPhoneNo, oldSession, oldSession);
}
else
{
JT1078TcpSession session = new JT1078TcpSession(channel, terminalPhoneNo);
if (SessionIdDict.TryAdd(terminalPhoneNo, session))
{

}
}
}

public JT1078TcpSession RemoveSession(string terminalPhoneNo)
{
if (string.IsNullOrEmpty(terminalPhoneNo)) return default;
if (SessionIdDict.TryRemove(terminalPhoneNo, out JT1078TcpSession sessionRemove))
{
logger.LogInformation($">>>{terminalPhoneNo} Session Remove.");
return sessionRemove;
}
else
{
return default;
}
}

public void RemoveSessionByChannel(IChannel channel)
{
var terminalPhoneNos = SessionIdDict.Where(w => w.Value.Channel.Id == channel.Id).Select(s => s.Key).ToList();
if (terminalPhoneNos.Count > 0)
{
foreach (var key in terminalPhoneNos)
{
SessionIdDict.TryRemove(key, out JT1078TcpSession sessionRemove);
}
string nos = string.Join(",", terminalPhoneNos);
logger.LogInformation($">>>{nos} Channel Remove.");
}
}

public IEnumerable<JT1078TcpSession> GetAll()
{
return SessionIdDict.Select(s => s.Value).ToList();
}
}
}


+ 116
- 0
src/JT1078.DotNetty.Core/Session/JT1078UdpSessionManager.cs Parādīt failu

@@ -0,0 +1,116 @@
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using DotNetty.Transport.Channels;
using Microsoft.Extensions.Options;
using System.Net;
using JT1078.DotNetty.Core.Configurations;
using JT1078.DotNetty.Core.Metadata;

namespace JT1078.DotNetty.Core.Session
{
/// <summary>
/// JT1078 udp会话管理
/// 估计要轮询下
/// </summary>
public class JT1078UdpSessionManager
{
private readonly ILogger<JT1078UdpSessionManager> logger;

public JT1078UdpSessionManager(
ILoggerFactory loggerFactory)
{
logger = loggerFactory.CreateLogger<JT1078UdpSessionManager>();
}

private ConcurrentDictionary<string, JT1078UdpSession> SessionIdDict = new ConcurrentDictionary<string, JT1078UdpSession>(StringComparer.OrdinalIgnoreCase);

public int SessionCount
{
get
{
return SessionIdDict.Count;
}
}

public JT1078UdpSession GetSession(string terminalPhoneNo)
{
if (string.IsNullOrEmpty(terminalPhoneNo))
return default;
if (SessionIdDict.TryGetValue(terminalPhoneNo, out JT1078UdpSession targetSession))
{
return targetSession;
}
else
{
return default;
}
}

public void TryAdd(IChannel channel,EndPoint sender,string terminalPhoneNo)
{
//1.先判断是否在缓存里面
if (SessionIdDict.TryGetValue(terminalPhoneNo, out JT1078UdpSession UdpSession))
{
UdpSession.LastActiveTime=DateTime.Now;
UdpSession.Sender = sender;
UdpSession.Channel = channel;
SessionIdDict.TryUpdate(terminalPhoneNo, UdpSession, UdpSession);
}
else
{
SessionIdDict.TryAdd(terminalPhoneNo, new JT1078UdpSession(channel, sender, terminalPhoneNo));
}
}

public void Heartbeat(string terminalPhoneNo)
{
if (string.IsNullOrEmpty(terminalPhoneNo)) return;
if (SessionIdDict.TryGetValue(terminalPhoneNo, out JT1078UdpSession oldSession))
{
oldSession.LastActiveTime = DateTime.Now;
SessionIdDict.TryUpdate(terminalPhoneNo, oldSession, oldSession);
}
}

public JT1078UdpSession RemoveSession(string terminalPhoneNo)
{
//设备离线可以进行通知
//使用Redis 发布订阅
if (string.IsNullOrEmpty(terminalPhoneNo)) return default;
if (SessionIdDict.TryRemove(terminalPhoneNo, out JT1078UdpSession SessionRemove))
{
logger.LogInformation($">>>{terminalPhoneNo} Session Remove.");
return SessionRemove;
}
else
{
return default;
}
}

public void RemoveSessionByChannel(IChannel channel)
{
//设备离线可以进行通知
//使用Redis 发布订阅
var terminalPhoneNos = SessionIdDict.Where(w => w.Value.Channel.Id == channel.Id).Select(s => s.Key).ToList();
if (terminalPhoneNos.Count > 0)
{
foreach (var key in terminalPhoneNos)
{
SessionIdDict.TryRemove(key, out JT1078UdpSession SessionRemove);
}
string nos = string.Join(",", terminalPhoneNos);
logger.LogInformation($">>>{nos} Channel Remove.");
}
}

public IEnumerable<JT1078UdpSession> GetAll()
{
return SessionIdDict.Select(s => s.Value).ToList();
}
}
}


+ 99
- 0
src/JT1078.DotNetty.Tcp/Handlers/JT1078TcpConnectionHandler.cs Parādīt failu

@@ -0,0 +1,99 @@
using DotNetty.Handlers.Timeout;
using DotNetty.Transport.Channels;
using JT1078.DotNetty.Core.Session;
using Microsoft.Extensions.Logging;
using System;
using System.Threading.Tasks;

namespace JT1078.DotNetty.Tcp.Handlers
{
/// <summary>
/// JT1078服务通道处理程序
/// </summary>
internal class JT1078TcpConnectionHandler : ChannelHandlerAdapter
{
private readonly ILogger<JT1078TcpConnectionHandler> logger;

private readonly JT1078TcpSessionManager SessionManager;

public JT1078TcpConnectionHandler(
JT1078TcpSessionManager sessionManager,
ILoggerFactory loggerFactory)
{
this.SessionManager = sessionManager;
logger = loggerFactory.CreateLogger<JT1078TcpConnectionHandler>();
}

/// <summary>
/// 通道激活
/// </summary>
/// <param name="context"></param>
public override void ChannelActive(IChannelHandlerContext context)
{
string channelId = context.Channel.Id.AsShortText();
if (logger.IsEnabled(LogLevel.Debug))
logger.LogDebug($"<<<{ channelId } Successful client connection to server.");
base.ChannelActive(context);
}

/// <summary>
/// 设备主动断开
/// </summary>
/// <param name="context"></param>
public override void ChannelInactive(IChannelHandlerContext context)
{
string channelId = context.Channel.Id.AsShortText();
if (logger.IsEnabled(LogLevel.Debug))
logger.LogDebug($">>>{ channelId } The client disconnects from the server.");
SessionManager.RemoveSessionByChannel(context.Channel);
base.ChannelInactive(context);
}

/// <summary>
/// 服务器主动断开
/// </summary>
/// <param name="context"></param>
/// <returns></returns>
public override Task CloseAsync(IChannelHandlerContext context)
{
string channelId = context.Channel.Id.AsShortText();
if (logger.IsEnabled(LogLevel.Debug))
logger.LogDebug($"<<<{ channelId } The server disconnects from the client.");
SessionManager.RemoveSessionByChannel(context.Channel);
return base.CloseAsync(context);
}

public override void ChannelReadComplete(IChannelHandlerContext context)=> context.Flush();

/// <summary>
/// 超时策略
/// </summary>
/// <param name="context"></param>
/// <param name="evt"></param>
public override void UserEventTriggered(IChannelHandlerContext context, object evt)
{
IdleStateEvent idleStateEvent = evt as IdleStateEvent;
if (idleStateEvent != null)
{
if(idleStateEvent.State== IdleState.ReaderIdle)
{
string channelId = context.Channel.Id.AsShortText();
logger.LogInformation($"{idleStateEvent.State.ToString()}>>>{channelId}");
// 由于808是设备发心跳,如果很久没有上报数据,那么就由服务器主动关闭连接。
SessionManager.RemoveSessionByChannel(context.Channel);
context.CloseAsync();
}
}
base.UserEventTriggered(context, evt);
}

public override void ExceptionCaught(IChannelHandlerContext context, Exception exception)
{
string channelId = context.Channel.Id.AsShortText();
logger.LogError(exception,$"{channelId} {exception.Message}" );
SessionManager.RemoveSessionByChannel(context.Channel);
context.CloseAsync();
}
}
}


+ 17
- 0
src/JT1078.DotNetty.Tcp/Handlers/JT1078TcpMessageProcessorEmptyImpl.cs Parādīt failu

@@ -0,0 +1,17 @@
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
using JT1078.DotNetty.Core.Interfaces;
using JT1078.Protocol;

namespace JT1078.DotNetty.Tcp.Handlers
{
class JT1078TcpMessageProcessorEmptyImpl : IJT1078TcpMessageHandlers
{
public Task Processor(JT1078Package package)
{
return Task.CompletedTask;
}
}
}

+ 73
- 0
src/JT1078.DotNetty.Tcp/Handlers/JT1078TcpServerHandler.cs Parādīt failu

@@ -0,0 +1,73 @@
using DotNetty.Buffers;
using DotNetty.Transport.Channels;
using System;
using Microsoft.Extensions.Logging;
using JT1078.DotNetty.Core.Enums;
using JT1078.DotNetty.Core.Services;
using JT1078.Protocol;
using JT1078.DotNetty.Core.Interfaces;
using JT1078.DotNetty.Core.Session;

namespace JT1078.DotNetty.Tcp.Handlers
{
/// <summary>
/// JT1078 服务端处理程序
/// </summary>
internal class JT1078TcpServerHandler : SimpleChannelInboundHandler<byte[]>
{
private readonly JT1078TcpSessionManager SessionManager;

private readonly JT1078AtomicCounterService AtomicCounterService;

private readonly ILogger<JT1078TcpServerHandler> logger;

private readonly IJT1078TcpMessageHandlers handlers;

private readonly IJT1078SourcePackageDispatcher sourcePackageDispatcher;

public JT1078TcpServerHandler(
IJT1078SourcePackageDispatcher sourcePackageDispatcher,
IJT1078TcpMessageHandlers handlers,
ILoggerFactory loggerFactory,
JT1078AtomicCounterServiceFactory atomicCounterServiceFactory,
JT1078TcpSessionManager sessionManager)
{
this.sourcePackageDispatcher = sourcePackageDispatcher;
this.handlers = handlers;
this.SessionManager = sessionManager;
this.AtomicCounterService = atomicCounterServiceFactory.Create(JT1078TransportProtocolType.tcp);
logger = loggerFactory.CreateLogger<JT1078TcpServerHandler>();
}


protected override void ChannelRead0(IChannelHandlerContext ctx, byte[] msg)
{
try
{
sourcePackageDispatcher.SendAsync(msg);
if (logger.IsEnabled(LogLevel.Trace))
{
logger.LogTrace("accept package success count<<<" + AtomicCounterService.MsgSuccessCount.ToString());
logger.LogTrace("accept msg <<< " + ByteBufferUtil.HexDump(msg));
}
JT1078Package package = JT1078Serializer.Deserialize(msg);
AtomicCounterService.MsgSuccessIncrement();
SessionManager.TryAdd(package.SIM, ctx.Channel);
handlers.Processor(package);
if (logger.IsEnabled(LogLevel.Debug))
{
logger.LogDebug("accept package success count<<<" + AtomicCounterService.MsgSuccessCount.ToString());
}
}
catch (Exception ex)
{
AtomicCounterService.MsgFailIncrement();
if (logger.IsEnabled(LogLevel.Error))
{
logger.LogError("accept package fail count<<<" + AtomicCounterService.MsgFailCount.ToString());
logger.LogError(ex, "accept msg<<<" + ByteBufferUtil.HexDump(msg));
}
}
}
}
}

+ 22
- 0
src/JT1078.DotNetty.Tcp/JT1078.DotNetty.Tcp.csproj Parādīt failu

@@ -0,0 +1,22 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>netstandard2.0</TargetFramework>
</PropertyGroup>

<ItemGroup>
<ProjectReference Include="..\JT1078.DotNetty.Core\JT1078.DotNetty.Core.csproj" />
</ItemGroup>


<ItemGroup>
<PackageReference Include="DotNetty.Handlers" Version="0.6.0" />
<PackageReference Include="DotNetty.Transport.Libuv" Version="0.6.0" />
<PackageReference Include="DotNetty.Codecs" Version="0.6.0" />
<PackageReference Include="Microsoft.Extensions.Hosting.Abstractions" Version="2.2.0" />
<PackageReference Include="Microsoft.Extensions.Options.ConfigurationExtensions" Version="2.2.0" />
<PackageReference Include="Newtonsoft.Json" Version="12.0.2" />
<PackageReference Include="Microsoft.Extensions.Options" Version="2.2.0" />
</ItemGroup>

</Project>

+ 30
- 0
src/JT1078.DotNetty.Tcp/JT1078TcpBuilderDefault.cs Parādīt failu

@@ -0,0 +1,30 @@
using JT1078.DotNetty.Core.Interfaces;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection.Extensions;
using System;
using System.Collections.Generic;
using System.Text;

namespace JT1078.DotNetty.Tcp
{
class JT1078TcpBuilderDefault : IJT1078TcpBuilder
{
public IJT1078Builder Instance { get; }

public JT1078TcpBuilderDefault(IJT1078Builder builder)
{
Instance = builder;
}

public IJT1078Builder Builder()
{
return Instance;
}

public IJT1078TcpBuilder Replace<T>() where T : IJT1078TcpMessageHandlers
{
Instance.Services.Replace(new ServiceDescriptor(typeof(IJT1078TcpMessageHandlers), typeof(T), ServiceLifetime.Singleton));
return this;
}
}
}

+ 26
- 0
src/JT1078.DotNetty.Tcp/JT1078TcpDotnettyExtensions.cs Parādīt failu

@@ -0,0 +1,26 @@
using JT1078.DotNetty.Core.Codecs;
using JT1078.DotNetty.Core.Interfaces;
using JT1078.DotNetty.Core.Session;
using JT1078.DotNetty.Tcp.Handlers;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection.Extensions;
using System.Runtime.CompilerServices;

[assembly: InternalsVisibleTo("JT1078.DotNetty.Tcp.Test")]

namespace JT1078.DotNetty.Tcp
{
public static class JT1078TcpDotnettyExtensions
{
public static IJT1078TcpBuilder AddJT1078TcpHost(this IJT1078Builder builder)
{
builder.Services.TryAddSingleton<JT1078TcpSessionManager>();
builder.Services.TryAddScoped<JT1078TcpConnectionHandler>();
builder.Services.TryAddScoped<JT1078TcpDecoder>();
builder.Services.TryAddSingleton<IJT1078TcpMessageHandlers, JT1078TcpMessageProcessorEmptyImpl>();
builder.Services.TryAddScoped<JT1078TcpServerHandler>();
builder.Services.AddHostedService<JT1078TcpServerHost>();
return new JT1078TcpBuilderDefault(builder);
}
}
}

+ 94
- 0
src/JT1078.DotNetty.Tcp/JT1078TcpServerHost.cs Parādīt failu

@@ -0,0 +1,94 @@
using DotNetty.Buffers;
using DotNetty.Codecs;
using DotNetty.Handlers.Timeout;
using DotNetty.Transport.Bootstrapping;
using DotNetty.Transport.Channels;
using DotNetty.Transport.Libuv;
using JT1078.DotNetty.Core.Codecs;
using JT1078.DotNetty.Core.Configurations;
using JT1078.DotNetty.Tcp.Handlers;
using JT1078.Protocol;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using System;
using System.Net;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks;

namespace JT1078.DotNetty.Tcp
{
/// <summary>
/// JT1078 Tcp网关服务
/// </summary>
internal class JT1078TcpServerHost : IHostedService
{
private readonly IServiceProvider serviceProvider;
private readonly JT1078Configuration configuration;
private readonly ILogger<JT1078TcpServerHost> logger;
private DispatcherEventLoopGroup bossGroup;
private WorkerEventLoopGroup workerGroup;
private IChannel bootstrapChannel;
private IByteBufferAllocator serverBufferAllocator;

public JT1078TcpServerHost(
IServiceProvider provider,
ILoggerFactory loggerFactory,
IOptions<JT1078Configuration> configurationAccessor)
{
serviceProvider = provider;
configuration = configurationAccessor.Value;
logger=loggerFactory.CreateLogger<JT1078TcpServerHost>();
}

public Task StartAsync(CancellationToken cancellationToken)
{
bossGroup = new DispatcherEventLoopGroup();
workerGroup = new WorkerEventLoopGroup(bossGroup, configuration.EventLoopCount);
serverBufferAllocator = new PooledByteBufferAllocator();
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.Group(bossGroup, workerGroup);
bootstrap.Channel<TcpServerChannel>();
if (RuntimeInformation.IsOSPlatform(OSPlatform.Linux)
|| RuntimeInformation.IsOSPlatform(OSPlatform.OSX))
{
bootstrap
.Option(ChannelOption.SoReuseport, true)
.ChildOption(ChannelOption.SoReuseaddr, true);
}
bootstrap
.Option(ChannelOption.SoBacklog, configuration.SoBacklog)
.ChildOption(ChannelOption.Allocator, serverBufferAllocator)
.ChildHandler(new ActionChannelInitializer<IChannel>(channel =>
{
IChannelPipeline pipeline = channel.Pipeline;
using (var scope = serviceProvider.CreateScope())
{
channel.Pipeline.AddLast("JT1078TcpBuffer", new DelimiterBasedFrameDecoder(int.MaxValue,true,
Unpooled.CopiedBuffer(JT1078Package.FH_Bytes)));
channel.Pipeline.AddLast("JT1078TcpDecode", scope.ServiceProvider.GetRequiredService<JT1078TcpDecoder>());
channel.Pipeline.AddLast("JT1078SystemIdleState", new IdleStateHandler(
configuration.ReaderIdleTimeSeconds,
configuration.WriterIdleTimeSeconds,
configuration.AllIdleTimeSeconds));
channel.Pipeline.AddLast("JT1078TcpConnection", scope.ServiceProvider.GetRequiredService<JT1078TcpConnectionHandler>());
channel.Pipeline.AddLast("JT1078TcpService", scope.ServiceProvider.GetRequiredService<JT1078TcpServerHandler>());
}
}));
logger.LogInformation($"JT1078 TCP Server start at {IPAddress.Any}:{configuration.TcpPort}.");
return bootstrap.BindAsync(configuration.TcpPort)
.ContinueWith(i => bootstrapChannel = i.Result);
}

public async Task StopAsync(CancellationToken cancellationToken)
{
await bootstrapChannel.CloseAsync();
var quietPeriod = configuration.QuietPeriodTimeSpan;
var shutdownTimeout = configuration.ShutdownTimeoutTimeSpan;
await workerGroup.ShutdownGracefullyAsync(quietPeriod, shutdownTimeout);
await bossGroup.ShutdownGracefullyAsync(quietPeriod, shutdownTimeout);
}
}
}

+ 27
- 0
src/JT1078.DotNetty.TestHosting/Handlers/JT1078TcpMessageHandlers.cs Parādīt failu

@@ -0,0 +1,27 @@
using JT1078.DotNetty.Core.Interfaces;
using JT1078.Protocol;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;

namespace JT1078.DotNetty.TestHosting.Handlers
{
public class JT1078TcpMessageHandlers : IJT1078TcpMessageHandlers
{
private readonly ILogger<JT1078TcpMessageHandlers> logger;

public JT1078TcpMessageHandlers(ILoggerFactory loggerFactory)
{
logger = loggerFactory.CreateLogger<JT1078TcpMessageHandlers>();
}

public Task Processor(JT1078Package package)
{
logger.LogDebug(JsonConvert.SerializeObject(package));
return Task.CompletedTask;
}
}
}

+ 24
- 0
src/JT1078.DotNetty.TestHosting/Handlers/JT1078UdpMessageHandlers.cs Parādīt failu

@@ -0,0 +1,24 @@
using JT1078.DotNetty.Core.Interfaces;
using JT1078.Protocol;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using System.Threading.Tasks;

namespace JT1078.DotNetty.TestHosting.Handlers
{
public class JT1078UdpMessageHandlers : IJT1078UdpMessageHandlers
{
private readonly ILogger<JT1078UdpMessageHandlers> logger;

public JT1078UdpMessageHandlers(ILoggerFactory loggerFactory)
{
logger = loggerFactory.CreateLogger<JT1078UdpMessageHandlers>();
}

public Task Processor(JT1078Package package)
{
logger.LogDebug(JsonConvert.SerializeObject(package));
return Task.CompletedTask;
}
}
}

+ 27
- 0
src/JT1078.DotNetty.TestHosting/JT1078.DotNetty.TestHosting.csproj Parādīt failu

@@ -0,0 +1,27 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>netcoreapp2.2</TargetFramework>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Microsoft.Extensions.Configuration.Json" Version="2.2.0" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="2.2.0" />
<PackageReference Include="Microsoft.Extensions.Hosting" Version="2.2.0" />
<PackageReference Include="Microsoft.Extensions.Logging.Console" Version="2.2.0" />
<PackageReference Include="Microsoft.Extensions.Logging.Debug" Version="2.2.0" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\JT1078.DotNetty.Tcp\JT1078.DotNetty.Tcp.csproj" />
<ProjectReference Include="..\JT1078.DotNetty.Udp\JT1078.DotNetty.Udp.csproj" />
</ItemGroup>

<ItemGroup>
<None Update="appsettings.json">
<CopyToOutputDirectory>Always</CopyToOutputDirectory>
</None>
</ItemGroup>

</Project>

+ 51
- 0
src/JT1078.DotNetty.TestHosting/Program.cs Parādīt failu

@@ -0,0 +1,51 @@
using JT1078.DotNetty.Core;
using JT1078.DotNetty.Tcp;
using JT1078.DotNetty.TestHosting.Handlers;
using JT1078.DotNetty.Udp;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection.Extensions;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

namespace JT1078.DotNetty.TestHosting
{
class Program
{
static async Task Main(string[] args)
{
//3031636481E2108801123456781001100000016BB392CA7C02800028002E0000000161E1A2BF0098CFC0EE1E17283407788E39A403FDDBD1D546BFB063013F59AC34C97A021AB96A28A42C08
var serverHostBuilder = new HostBuilder()
.ConfigureAppConfiguration((hostingContext, config) =>
{
config.SetBasePath(AppDomain.CurrentDomain.BaseDirectory);
config.AddJsonFile("appsettings.json", optional: false, reloadOnChange: true);
})
.ConfigureLogging((context, logging) =>
{
logging.SetMinimumLevel(LogLevel.Trace);
logging.AddConsole();
})
.ConfigureServices((hostContext, services) =>
{
services.AddSingleton<ILoggerFactory, LoggerFactory>();
services.AddSingleton(typeof(ILogger<>), typeof(Logger<>));
services.AddJT1078Core(hostContext.Configuration)
.AddJT1078TcpHost()
.Replace<JT1078TcpMessageHandlers>()
.Builder()
.AddJT1078UdpHost()
.Replace<JT1078UdpMessageHandlers>()
.Builder();
});

await serverHostBuilder.RunConsoleAsync();
}
}
}

+ 22
- 0
src/JT1078.DotNetty.TestHosting/appsettings.json Parādīt failu

@@ -0,0 +1,22 @@
{
"Logging": {
"IncludeScopes": false,
"Debug": {
"LogLevel": {
"Default": "Trace"
}
},
"Console": {
"LogLevel": {
"Default": "Trace"
}
}
},
"JT1078Configuration": {
"TcpPort": 1808,
"UdpPort": 1808,
"RemoteServerOptions": {
"RemoteServers": ["172.16.19.209:16868"]
}
}
}

+ 17
- 0
src/JT1078.DotNetty.Udp/Handlers/JT1078UdpMessageProcessorEmptyImpl.cs Parādīt failu

@@ -0,0 +1,17 @@
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
using JT1078.DotNetty.Core.Interfaces;
using JT1078.Protocol;

namespace JT1078.DotNetty.Udp.Handlers
{
class JT1078UdpMessageProcessorEmptyImpl : IJT1078UdpMessageHandlers
{
public Task Processor(JT1078Package package)
{
return Task.CompletedTask;
}
}
}

+ 75
- 0
src/JT1078.DotNetty.Udp/Handlers/JT1078UdpServerHandler.cs Parādīt failu

@@ -0,0 +1,75 @@
using DotNetty.Buffers;
using DotNetty.Transport.Channels;
using System;
using Microsoft.Extensions.Logging;
using JT1078.DotNetty.Core.Metadata;
using JT1078.DotNetty.Core.Session;
using JT1078.DotNetty.Core.Services;
using JT1078.DotNetty.Core.Enums;
using JT1078.Protocol;
using JT1078.DotNetty.Core.Interfaces;

namespace JT1078.DotNetty.Udp.Handlers
{
/// <summary>
/// JT1078 Udp服务端处理程序
/// </summary>
internal class JT1078UdpServerHandler : SimpleChannelInboundHandler<JT1078UdpPackage>
{
private readonly ILogger<JT1078UdpServerHandler> logger;

private readonly JT1078UdpSessionManager SessionManager;

private readonly JT1078AtomicCounterService AtomicCounterService;

private readonly IJT1078UdpMessageHandlers handlers;

private readonly IJT1078SourcePackageDispatcher sourcePackageDispatcher;
public JT1078UdpServerHandler(
IJT1078SourcePackageDispatcher sourcePackageDispatcher,
ILoggerFactory loggerFactory,
JT1078AtomicCounterServiceFactory atomicCounterServiceFactory,
IJT1078UdpMessageHandlers handlers,
JT1078UdpSessionManager sessionManager)
{
this.sourcePackageDispatcher = sourcePackageDispatcher;
this.AtomicCounterService = atomicCounterServiceFactory.Create(JT1078TransportProtocolType.udp);
this.SessionManager = sessionManager;
logger = loggerFactory.CreateLogger<JT1078UdpServerHandler>();
this.handlers = handlers;
}

protected override void ChannelRead0(IChannelHandlerContext ctx, JT1078UdpPackage msg)
{
try
{
sourcePackageDispatcher.SendAsync(msg.Buffer);
if (logger.IsEnabled(LogLevel.Trace))
{
logger.LogTrace("accept package success count<<<" + AtomicCounterService.MsgSuccessCount.ToString());
logger.LogTrace("accept msg <<< " + ByteBufferUtil.HexDump(msg.Buffer));
}
JT1078Package package = JT1078Serializer.Deserialize(msg.Buffer);
AtomicCounterService.MsgSuccessIncrement();
SessionManager.TryAdd(ctx.Channel, msg.Sender, package.SIM);
handlers.Processor(package);
if (logger.IsEnabled(LogLevel.Debug))
{
logger.LogDebug("accept package success count<<<" + AtomicCounterService.MsgSuccessCount.ToString());
}
}
catch (Exception ex)
{
AtomicCounterService.MsgFailIncrement();
if (logger.IsEnabled(LogLevel.Error))
{
logger.LogError("accept package fail count<<<" + AtomicCounterService.MsgFailCount.ToString());
logger.LogError(ex, "accept msg<<<" + ByteBufferUtil.HexDump(msg.Buffer));
}
}
}

public override void ChannelReadComplete(IChannelHandlerContext context) => context.Flush();

}
}

+ 11
- 0
src/JT1078.DotNetty.Udp/JT1078.DotNetty.Udp.csproj Parādīt failu

@@ -0,0 +1,11 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>netstandard2.0</TargetFramework>
</PropertyGroup>

<ItemGroup>
<ProjectReference Include="..\JT1078.DotNetty.Core\JT1078.DotNetty.Core.csproj" />
</ItemGroup>

</Project>

+ 30
- 0
src/JT1078.DotNetty.Udp/JT1078UdpBuilderDefault.cs Parādīt failu

@@ -0,0 +1,30 @@
using JT1078.DotNetty.Core.Interfaces;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection.Extensions;
using System;
using System.Collections.Generic;
using System.Text;

namespace JT1078.DotNetty.Udp
{
class JT1078UdpBuilderDefault : IJT1078UdpBuilder
{
public IJT1078Builder Instance { get; }

public JT1078UdpBuilderDefault(IJT1078Builder builder)
{
Instance = builder;
}

public IJT1078Builder Builder()
{
return Instance;
}

public IJT1078UdpBuilder Replace<T>() where T : IJT1078UdpMessageHandlers
{
Instance.Services.Replace(new ServiceDescriptor(typeof(IJT1078UdpMessageHandlers), typeof(T), ServiceLifetime.Singleton));
return this;
}
}
}

+ 26
- 0
src/JT1078.DotNetty.Udp/JT1078UdpDotnettyExtensions.cs Parādīt failu

@@ -0,0 +1,26 @@
using JT1078.DotNetty.Core.Codecs;
using JT1078.DotNetty.Core.Interfaces;
using JT1078.DotNetty.Core.Session;
using JT1078.DotNetty.Udp;
using JT1078.DotNetty.Udp.Handlers;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection.Extensions;
using System.Runtime.CompilerServices;

[assembly: InternalsVisibleTo("JT1078.DotNetty.Udp.Test")]

namespace JT1078.DotNetty.Udp
{
public static class JT1078UdpDotnettyExtensions
{
public static IJT1078UdpBuilder AddJT1078UdpHost(this IJT1078Builder builder)
{
builder.Services.TryAddSingleton<JT1078UdpSessionManager>();
builder.Services.TryAddSingleton<IJT1078UdpMessageHandlers, JT1078UdpMessageProcessorEmptyImpl>();
builder.Services.TryAddScoped<JT1078UdpDecoder>();
builder.Services.TryAddScoped<JT1078UdpServerHandler>();
builder.Services.AddHostedService<JT1078UdpServerHost>();
return new JT1078UdpBuilderDefault(builder);
}
}
}

+ 76
- 0
src/JT1078.DotNetty.Udp/JT1078UdpServerHost.cs Parādīt failu

@@ -0,0 +1,76 @@
using DotNetty.Transport.Bootstrapping;
using DotNetty.Transport.Channels;
using DotNetty.Transport.Channels.Sockets;
using JT1078.DotNetty.Core.Codecs;
using JT1078.DotNetty.Core.Configurations;
using JT1078.DotNetty.Udp.Handlers;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using System;
using System.Net;
using System.Runtime.InteropServices;
using System.Threading;
using System.Threading.Tasks;

namespace JT1078.DotNetty.Udp
{
/// <summary>
/// JT1078 Udp网关服务
/// </summary>
internal class JT1078UdpServerHost : IHostedService
{
private readonly IServiceProvider serviceProvider;
private readonly JT1078Configuration configuration;
private readonly ILogger<JT1078UdpServerHost> logger;
private MultithreadEventLoopGroup group;
private IChannel bootstrapChannel;

public JT1078UdpServerHost(
IServiceProvider provider,
ILoggerFactory loggerFactory,
IOptions<JT1078Configuration> jT808ConfigurationAccessor)
{
serviceProvider = provider;
configuration = jT808ConfigurationAccessor.Value;
logger=loggerFactory.CreateLogger<JT1078UdpServerHost>();
}

public Task StartAsync(CancellationToken cancellationToken)
{
group = new MultithreadEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
bootstrap.Group(group);
bootstrap.Channel<SocketDatagramChannel>();
if (RuntimeInformation.IsOSPlatform(OSPlatform.Linux)
|| RuntimeInformation.IsOSPlatform(OSPlatform.OSX))
{
bootstrap
.Option(ChannelOption.SoReuseport, true);
}
bootstrap
.Option(ChannelOption.SoBroadcast, true)
.Handler(new ActionChannelInitializer<IChannel>(channel =>
{
IChannelPipeline pipeline = channel.Pipeline;
using (var scope = serviceProvider.CreateScope())
{
pipeline.AddLast("JT1078UdpDecoder", scope.ServiceProvider.GetRequiredService<JT1078UdpDecoder>());
pipeline.AddLast("JT1078UdpService", scope.ServiceProvider.GetRequiredService<JT1078UdpServerHandler>());
}
}));
logger.LogInformation($"JT1078 Udp Server start at {IPAddress.Any}:{configuration.UdpPort}.");
return bootstrap.BindAsync(configuration.UdpPort)
.ContinueWith(i => bootstrapChannel = i.Result);
}

public async Task StopAsync(CancellationToken cancellationToken)
{
await bootstrapChannel.CloseAsync();
var quietPeriod = configuration.QuietPeriodTimeSpan;
var shutdownTimeout = configuration.ShutdownTimeoutTimeSpan;
await group.ShutdownGracefullyAsync(quietPeriod, shutdownTimeout);
}
}
}

+ 43
- 0
src/JT1078DotNetty.sln Parādīt failu

@@ -0,0 +1,43 @@

Microsoft Visual Studio Solution File, Format Version 12.00
# Visual Studio Version 16
VisualStudioVersion = 16.0.29009.5
MinimumVisualStudioVersion = 10.0.40219.1
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JT1078.DotNetty.Core", "JT1078.DotNetty.Core\JT1078.DotNetty.Core.csproj", "{71D35280-24BE-4BF1-AFA4-07DFBC696FA5}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JT1078.DotNetty.Tcp", "JT1078.DotNetty.Tcp\JT1078.DotNetty.Tcp.csproj", "{C4B4BBEB-4597-469F-B99A-A9F380DDB8FB}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JT1078.DotNetty.TestHosting", "JT1078.DotNetty.TestHosting\JT1078.DotNetty.TestHosting.csproj", "{DCCA6BB7-C2B5-490A-B43F-C28ABEA922D7}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JT1078.DotNetty.Udp", "JT1078.DotNetty.Udp\JT1078.DotNetty.Udp.csproj", "{6405D7FA-3B6E-4545-827E-BA13EB5BB268}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Release|Any CPU = Release|Any CPU
EndGlobalSection
GlobalSection(ProjectConfigurationPlatforms) = postSolution
{71D35280-24BE-4BF1-AFA4-07DFBC696FA5}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{71D35280-24BE-4BF1-AFA4-07DFBC696FA5}.Debug|Any CPU.Build.0 = Debug|Any CPU
{71D35280-24BE-4BF1-AFA4-07DFBC696FA5}.Release|Any CPU.ActiveCfg = Release|Any CPU
{71D35280-24BE-4BF1-AFA4-07DFBC696FA5}.Release|Any CPU.Build.0 = Release|Any CPU
{C4B4BBEB-4597-469F-B99A-A9F380DDB8FB}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{C4B4BBEB-4597-469F-B99A-A9F380DDB8FB}.Debug|Any CPU.Build.0 = Debug|Any CPU
{C4B4BBEB-4597-469F-B99A-A9F380DDB8FB}.Release|Any CPU.ActiveCfg = Release|Any CPU
{C4B4BBEB-4597-469F-B99A-A9F380DDB8FB}.Release|Any CPU.Build.0 = Release|Any CPU
{DCCA6BB7-C2B5-490A-B43F-C28ABEA922D7}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{DCCA6BB7-C2B5-490A-B43F-C28ABEA922D7}.Debug|Any CPU.Build.0 = Debug|Any CPU
{DCCA6BB7-C2B5-490A-B43F-C28ABEA922D7}.Release|Any CPU.ActiveCfg = Release|Any CPU
{DCCA6BB7-C2B5-490A-B43F-C28ABEA922D7}.Release|Any CPU.Build.0 = Release|Any CPU
{6405D7FA-3B6E-4545-827E-BA13EB5BB268}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{6405D7FA-3B6E-4545-827E-BA13EB5BB268}.Debug|Any CPU.Build.0 = Debug|Any CPU
{6405D7FA-3B6E-4545-827E-BA13EB5BB268}.Release|Any CPU.ActiveCfg = Release|Any CPU
{6405D7FA-3B6E-4545-827E-BA13EB5BB268}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {07ED058A-2CEB-43FD-8478-7EEC7E56F868}
EndGlobalSection
EndGlobal

Notiek ielāde…
Atcelt
Saglabāt