diff --git a/src/JT1078.DotNetty.Core/Codecs/JT1078TcpDecoder.cs b/src/JT1078.DotNetty.Core/Codecs/JT1078TcpDecoder.cs new file mode 100644 index 0000000..600623e --- /dev/null +++ b/src/JT1078.DotNetty.Core/Codecs/JT1078TcpDecoder.cs @@ -0,0 +1,20 @@ +using DotNetty.Buffers; +using DotNetty.Codecs; +using System.Collections.Generic; +using DotNetty.Transport.Channels; +using JT1078.Protocol; +using System; + +namespace JT1078.DotNetty.Core.Codecs +{ + public class JT1078TcpDecoder : ByteToMessageDecoder + { + protected override void Decode(IChannelHandlerContext context, IByteBuffer input, List output) + { + byte[] buffer = new byte[input.Capacity+4]; + input.ReadBytes(buffer, 4, input.Capacity); + Array.Copy(JT1078Package.FH_Bytes, 0,buffer, 0, 4); + output.Add(buffer); + } + } +} diff --git a/src/JT1078.DotNetty.Core/Codecs/JT1078UdpDecoder.cs b/src/JT1078.DotNetty.Core/Codecs/JT1078UdpDecoder.cs new file mode 100644 index 0000000..0891cb9 --- /dev/null +++ b/src/JT1078.DotNetty.Core/Codecs/JT1078UdpDecoder.cs @@ -0,0 +1,21 @@ +using DotNetty.Buffers; +using DotNetty.Codecs; +using DotNetty.Transport.Channels; +using System.Collections.Generic; +using DotNetty.Transport.Channels.Sockets; +using JT1078.DotNetty.Core.Metadata; + +namespace JT1078.DotNetty.Core.Codecs +{ + public class JT1078UdpDecoder : MessageToMessageDecoder + { + protected override void Decode(IChannelHandlerContext context, DatagramPacket message, List output) + { + if (!message.Content.IsReadable()) return; + IByteBuffer byteBuffer = message.Content; + byte[] buffer = new byte[byteBuffer.ReadableBytes]; + byteBuffer.ReadBytes(buffer); + output.Add(new JT1078UdpPackage(buffer, message.Sender)); + } + } +} diff --git a/src/JT1078.DotNetty.Core/Configurations/JT1078ClientConfiguration.cs b/src/JT1078.DotNetty.Core/Configurations/JT1078ClientConfiguration.cs new file mode 100644 index 0000000..dae4820 --- /dev/null +++ b/src/JT1078.DotNetty.Core/Configurations/JT1078ClientConfiguration.cs @@ -0,0 +1,31 @@ +using System; +using System.Collections.Generic; +using System.Net; +using System.Text; + +namespace JT1078.DotNetty.Core.Configurations +{ + public class JT1078ClientConfiguration + { + public string Host { get; set; } + + public int Port { get; set; } + + private EndPoint endPoint; + + public EndPoint EndPoint + { + get + { + if (endPoint == null) + { + if (IPAddress.TryParse(Host, out IPAddress ip)) + { + endPoint = new IPEndPoint(ip, Port); + } + } + return endPoint; + } + } + } +} diff --git a/src/JT1078.DotNetty.Core/Configurations/JT1078Configuration.cs b/src/JT1078.DotNetty.Core/Configurations/JT1078Configuration.cs new file mode 100644 index 0000000..a98f3e0 --- /dev/null +++ b/src/JT1078.DotNetty.Core/Configurations/JT1078Configuration.cs @@ -0,0 +1,36 @@ +using Microsoft.Extensions.Options; +using System; +using System.Collections.Generic; +using System.Text; + +namespace JT1078.DotNetty.Core.Configurations +{ + public class JT1078Configuration : IOptions + { + public int TcpPort { get; set; } = 1808; + + public int UdpPort { get; set; } = 1808; + + public int QuietPeriodSeconds { get; set; } = 1; + + public TimeSpan QuietPeriodTimeSpan => TimeSpan.FromSeconds(QuietPeriodSeconds); + + public int ShutdownTimeoutSeconds { get; set; } = 3; + + public TimeSpan ShutdownTimeoutTimeSpan => TimeSpan.FromSeconds(ShutdownTimeoutSeconds); + + public int SoBacklog { get; set; } = 8192; + + public int EventLoopCount { get; set; } = Environment.ProcessorCount; + + public int ReaderIdleTimeSeconds { get; set; } = 3600; + + public int WriterIdleTimeSeconds { get; set; } = 3600; + + public int AllIdleTimeSeconds { get; set; } = 3600; + + public JT1078RemoteServerOptions RemoteServerOptions { get; set; } + + public JT1078Configuration Value => this; + } +} diff --git a/src/JT1078.DotNetty.Core/Configurations/JT1078RemoteServerOptions.cs b/src/JT1078.DotNetty.Core/Configurations/JT1078RemoteServerOptions.cs new file mode 100644 index 0000000..7c9d47e --- /dev/null +++ b/src/JT1078.DotNetty.Core/Configurations/JT1078RemoteServerOptions.cs @@ -0,0 +1,12 @@ +using Microsoft.Extensions.Options; +using System.Collections.Generic; + +namespace JT1078.DotNetty.Core.Configurations +{ + public class JT1078RemoteServerOptions:IOptions + { + public List RemoteServers { get; set; } + + public JT1078RemoteServerOptions Value => this; + } +} diff --git a/src/JT1078.DotNetty.Core/Converters/JsonIPAddressConverter.cs b/src/JT1078.DotNetty.Core/Converters/JsonIPAddressConverter.cs new file mode 100644 index 0000000..46f1e9f --- /dev/null +++ b/src/JT1078.DotNetty.Core/Converters/JsonIPAddressConverter.cs @@ -0,0 +1,26 @@ +using Newtonsoft.Json; +using System; +using System.Collections.Generic; +using System.Net; +using System.Text; + +namespace JT1078.DotNetty.Core.Converters +{ + public class JsonIPAddressConverter : JsonConverter + { + public override bool CanConvert(Type objectType) + { + return (objectType == typeof(IPAddress)); + } + + public override void WriteJson(JsonWriter writer, object value, JsonSerializer serializer) + { + writer.WriteValue(value.ToString()); + } + + public override object ReadJson(JsonReader reader, Type objectType, object existingValue, JsonSerializer serializer) + { + return IPAddress.Parse((string)reader.Value); + } + } +} diff --git a/src/JT1078.DotNetty.Core/Converters/JsonIPEndPointConverter.cs b/src/JT1078.DotNetty.Core/Converters/JsonIPEndPointConverter.cs new file mode 100644 index 0000000..178a0a1 --- /dev/null +++ b/src/JT1078.DotNetty.Core/Converters/JsonIPEndPointConverter.cs @@ -0,0 +1,32 @@ +using Newtonsoft.Json; +using Newtonsoft.Json.Linq; +using System; +using System.Net; + +namespace JT1078.DotNetty.Core.Converters +{ + public class JsonIPEndPointConverter: JsonConverter + { + public override bool CanConvert(Type objectType) + { + return (objectType == typeof(IPEndPoint)); + } + + public override void WriteJson(JsonWriter writer, object value, JsonSerializer serializer) + { + IPEndPoint ep = (IPEndPoint)value; + JObject jo = new JObject(); + jo.Add("Host", JToken.FromObject(ep.Address, serializer)); + jo.Add("Port", ep.Port); + jo.WriteTo(writer); + } + + public override object ReadJson(JsonReader reader, Type objectType, object existingValue, JsonSerializer serializer) + { + JObject jo = JObject.Load(reader); + IPAddress address = jo["Host"].ToObject(serializer); + int port = (int)jo["Port"]; + return new IPEndPoint(address, port); + } + } +} diff --git a/src/JT1078.DotNetty.Core/Enums/JT1078TransportProtocolType.cs b/src/JT1078.DotNetty.Core/Enums/JT1078TransportProtocolType.cs new file mode 100644 index 0000000..7886520 --- /dev/null +++ b/src/JT1078.DotNetty.Core/Enums/JT1078TransportProtocolType.cs @@ -0,0 +1,15 @@ +using System; +using System.Collections.Generic; +using System.Text; + +namespace JT1078.DotNetty.Core.Enums +{ + /// + /// 传输协议类型 + /// + public enum JT1078TransportProtocolType + { + tcp=1, + udp = 2 + } +} diff --git a/src/JT1078.DotNetty.Core/Impl/JT1078BuilderDefault.cs b/src/JT1078.DotNetty.Core/Impl/JT1078BuilderDefault.cs new file mode 100644 index 0000000..a34b4f7 --- /dev/null +++ b/src/JT1078.DotNetty.Core/Impl/JT1078BuilderDefault.cs @@ -0,0 +1,25 @@ +using System; +using System.Collections.Generic; +using System.Text; +using JT1078.DotNetty.Core.Interfaces; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.DependencyInjection.Extensions; + +namespace JT1078.DotNetty.Core.Impl +{ + sealed class JT1078BuilderDefault : IJT1078Builder + { + public IServiceCollection Services { get; } + + public JT1078BuilderDefault(IServiceCollection services) + { + Services = services; + } + + public IJT1078Builder Replace() where T : IJT1078SourcePackageDispatcher + { + Services.Replace(new ServiceDescriptor(typeof(IJT1078SourcePackageDispatcher), typeof(T), ServiceLifetime.Singleton)); + return this; + } + } +} diff --git a/src/JT1078.DotNetty.Core/Impl/JT1078SourcePackageDispatcherDefault.cs b/src/JT1078.DotNetty.Core/Impl/JT1078SourcePackageDispatcherDefault.cs new file mode 100644 index 0000000..1a6d353 --- /dev/null +++ b/src/JT1078.DotNetty.Core/Impl/JT1078SourcePackageDispatcherDefault.cs @@ -0,0 +1,195 @@ +using JT1078.DotNetty.Core.Configurations; +using JT1078.DotNetty.Core.Interfaces; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Linq; +using System.Net; +using System.Net.Sockets; +using System.Text; +using System.Threading; +using System.Threading.Tasks; +using System.Timers; + +namespace JT1078.DotNetty.Core.Impl +{ + class JT1078SourcePackageDispatcherDefault : IJT1078SourcePackageDispatcher,IDisposable + { + private readonly ILogger logger; + private IOptionsMonitor optionsMonitor; + private ConcurrentDictionary channeldic = new ConcurrentDictionary(); + private Queue reconnectionQueue = new Queue(); + public JT1078SourcePackageDispatcherDefault(ILoggerFactory loggerFactory, + IOptionsMonitor optionsMonitor) + { + logger = loggerFactory.CreateLogger(); + this.optionsMonitor = optionsMonitor; + timer = new System.Timers.Timer(10000); + timer.Elapsed += new ElapsedEventHandler(timer_Elapsed); + timer.Start(); + InitialDispatcherClient(); + } + + private System.Timers.Timer timer; + + public Task SendAsync(byte[] data) + { + foreach (var item in channeldic) + { + try + { + if (item.Value.Connected) + { + item.Value.Client.Send(data); + } + else + { + logger.LogError($"{item}链接已关闭"); + channeldic.TryRemove(item.Key, out _); + reconnectionQueue.Enqueue(item.Key); + } + } + catch (Exception ex) + { + logger.LogError($"{item}发送数据出现异常:{ex}"); + reconnectionQueue.Enqueue(item.Key); + channeldic.TryRemove(item.Key, out _); + } + } + return Task.CompletedTask; + } + + public void InitialDispatcherClient() + { + Task.Run(async () => + { + optionsMonitor.OnChange(options => + { + List lastRemoteServers = new List(); + if (options.RemoteServerOptions.RemoteServers != null) + { + lastRemoteServers = options.RemoteServerOptions.RemoteServers; + } + DelRemoteServsers(lastRemoteServers); + AddRemoteServsers(lastRemoteServers); + }); + await InitRemoteServsers(); + }); + } + + void timer_Elapsed(object sender, ElapsedEventArgs e) + { + timer.Stop(); + Thread.CurrentThread.IsBackground = true; + var ip = reconnectionQueue.Dequeue(); + if (!string.IsNullOrEmpty(ip)) + { + AddRemoteServsers(new List() { ip }); + } + timer.Start(); + } + + /// + /// 初始化远程服务器 + /// + /// + /// + /// + private async Task InitRemoteServsers() + { + List remoteServers = new List(); + if (optionsMonitor.CurrentValue.RemoteServerOptions.RemoteServers != null) + { + remoteServers = optionsMonitor.CurrentValue.RemoteServerOptions.RemoteServers; + } + foreach (var item in remoteServers) + { + try + { + TcpClient client = new TcpClient(); + client.Connect(new IPEndPoint(IPAddress.Parse(item.Split(':')[0]), int.Parse(item.Split(':')[1]))); + if (client.Connected) + { + channeldic.TryAdd(item, client); + } + } + catch (Exception ex) + { + logger.LogError($"初始化配置链接远程服务端{item},链接异常:{ex}"); + } + } + await Task.CompletedTask; + } + + /// + /// 动态删除远程服务器 + /// + /// + private void DelRemoteServsers(List lastRemoteServers) + { + var delChannels = channeldic.Keys.Except(lastRemoteServers).ToList(); + foreach (var item in delChannels) + { + channeldic[item].Close(); + channeldic[item].Dispose(); + channeldic.TryRemove(item, out var client); + } + } + /// + /// 动态添加远程服务器 + /// + /// + /// + private void AddRemoteServsers(List lastRemoteServers) + { + var addChannels = lastRemoteServers.Except(channeldic.Keys).ToList(); + foreach (var item in addChannels) + { + try + { + TcpClient client = new TcpClient(); + client.Connect(new IPEndPoint(IPAddress.Parse(item.Split(':')[0]), int.Parse(item.Split(':')[1]))); + if (client.Connected) + { + channeldic.TryAdd(item, client); + } + else + { + reconnectionQueue.Enqueue(item); + } + } + catch (Exception ex) + { + logger.LogError($"变更配置后链接远程服务端{item},重连异常:{ex}"); + reconnectionQueue.Enqueue(item); + } + } + } + + public void Dispose() + { + timer.Stop(); + if (channeldic != null) + { + foreach (var item in channeldic) + { + try + { + if (item.Value.Connected) + { + item.Value.Close(); + item.Value.Dispose(); + } + } + catch (Exception) + { + + } + } + } + timer.Dispose(); + } + } +} diff --git a/src/JT1078.DotNetty.Core/Interfaces/IJT1078Builder.cs b/src/JT1078.DotNetty.Core/Interfaces/IJT1078Builder.cs new file mode 100644 index 0000000..633d30a --- /dev/null +++ b/src/JT1078.DotNetty.Core/Interfaces/IJT1078Builder.cs @@ -0,0 +1,14 @@ +using Microsoft.Extensions.DependencyInjection; +using System; +using System.Collections.Generic; +using System.Text; + +namespace JT1078.DotNetty.Core.Interfaces +{ + public interface IJT1078Builder + { + IServiceCollection Services { get; } + + IJT1078Builder Replace() where T: IJT1078SourcePackageDispatcher; + } +} diff --git a/src/JT1078.DotNetty.Core/Interfaces/IJT1078SourcePackageDispatcher.cs b/src/JT1078.DotNetty.Core/Interfaces/IJT1078SourcePackageDispatcher.cs new file mode 100644 index 0000000..3df9f76 --- /dev/null +++ b/src/JT1078.DotNetty.Core/Interfaces/IJT1078SourcePackageDispatcher.cs @@ -0,0 +1,12 @@ +using System.Threading.Tasks; + +namespace JT1078.DotNetty.Core.Interfaces +{ + /// + /// 源包分发器 + /// + public interface IJT1078SourcePackageDispatcher + { + Task SendAsync(byte[] data); + } +} diff --git a/src/JT1078.DotNetty.Core/Interfaces/IJT1078TcpBuilder.cs b/src/JT1078.DotNetty.Core/Interfaces/IJT1078TcpBuilder.cs new file mode 100644 index 0000000..321fdc0 --- /dev/null +++ b/src/JT1078.DotNetty.Core/Interfaces/IJT1078TcpBuilder.cs @@ -0,0 +1,14 @@ +using Microsoft.Extensions.DependencyInjection; +using System; +using System.Collections.Generic; +using System.Text; + +namespace JT1078.DotNetty.Core.Interfaces +{ + public interface IJT1078TcpBuilder + { + IJT1078Builder Instance { get;} + IJT1078Builder Builder(); + IJT1078TcpBuilder Replace() where T : IJT1078TcpMessageHandlers; + } +} diff --git a/src/JT1078.DotNetty.Core/Interfaces/IJT1078TcpMessageHandlers.cs b/src/JT1078.DotNetty.Core/Interfaces/IJT1078TcpMessageHandlers.cs new file mode 100644 index 0000000..bf9badf --- /dev/null +++ b/src/JT1078.DotNetty.Core/Interfaces/IJT1078TcpMessageHandlers.cs @@ -0,0 +1,10 @@ +using JT1078.Protocol; +using System.Threading.Tasks; + +namespace JT1078.DotNetty.Core.Interfaces +{ + public interface IJT1078TcpMessageHandlers + { + Task Processor(JT1078Package package); + } +} diff --git a/src/JT1078.DotNetty.Core/Interfaces/IJT1078UdpBuilder.cs b/src/JT1078.DotNetty.Core/Interfaces/IJT1078UdpBuilder.cs new file mode 100644 index 0000000..2cded08 --- /dev/null +++ b/src/JT1078.DotNetty.Core/Interfaces/IJT1078UdpBuilder.cs @@ -0,0 +1,14 @@ +using Microsoft.Extensions.DependencyInjection; +using System; +using System.Collections.Generic; +using System.Text; + +namespace JT1078.DotNetty.Core.Interfaces +{ + public interface IJT1078UdpBuilder + { + IJT1078Builder Instance { get; } + IJT1078Builder Builder(); + IJT1078UdpBuilder Replace() where T : IJT1078UdpMessageHandlers; + } +} diff --git a/src/JT1078.DotNetty.Core/Interfaces/IJT1078UdpMessageHandlers.cs b/src/JT1078.DotNetty.Core/Interfaces/IJT1078UdpMessageHandlers.cs new file mode 100644 index 0000000..558b1d2 --- /dev/null +++ b/src/JT1078.DotNetty.Core/Interfaces/IJT1078UdpMessageHandlers.cs @@ -0,0 +1,10 @@ +using JT1078.Protocol; +using System.Threading.Tasks; + +namespace JT1078.DotNetty.Core.Interfaces +{ + public interface IJT1078UdpMessageHandlers + { + Task Processor(JT1078Package package); + } +} diff --git a/src/JT1078.DotNetty.Core/JT1078.DotNetty.Core.csproj b/src/JT1078.DotNetty.Core/JT1078.DotNetty.Core.csproj new file mode 100644 index 0000000..dd57a78 --- /dev/null +++ b/src/JT1078.DotNetty.Core/JT1078.DotNetty.Core.csproj @@ -0,0 +1,20 @@ + + + + netstandard2.0 + + + + + + + + + + + + + + + + diff --git a/src/JT1078.DotNetty.Core/JT1078CoreDotnettyExtensions.cs b/src/JT1078.DotNetty.Core/JT1078CoreDotnettyExtensions.cs new file mode 100644 index 0000000..d416ec0 --- /dev/null +++ b/src/JT1078.DotNetty.Core/JT1078CoreDotnettyExtensions.cs @@ -0,0 +1,79 @@ +using JT1078.DotNetty.Core.Configurations; +using JT1078.DotNetty.Core.Converters; +using JT1078.DotNetty.Core.Impl; +using JT1078.DotNetty.Core.Interfaces; +using JT1078.DotNetty.Core.Services; +using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.DependencyInjection.Extensions; +using Microsoft.Extensions.Options; +using Newtonsoft.Json; +using System; +using System.Runtime.CompilerServices; + +[assembly: InternalsVisibleTo("JT1078.DotNetty.Core.Test")] +[assembly: InternalsVisibleTo("JT1078.DotNetty.Tcp.Test")] +[assembly: InternalsVisibleTo("JT1078.DotNetty.Udp.Test")] +[assembly: InternalsVisibleTo("JT1078.DotNetty.Tcp")] +[assembly: InternalsVisibleTo("JT1078.DotNetty.Udp")] +namespace JT1078.DotNetty.Core +{ + public static class JT1078CoreDotnettyExtensions + { + static JT1078CoreDotnettyExtensions() + { + JsonConvert.DefaultSettings = new Func(() => + { + Newtonsoft.Json.JsonSerializerSettings settings = new Newtonsoft.Json.JsonSerializerSettings(); + //日期类型默认格式化处理 + settings.DateFormatHandling = Newtonsoft.Json.DateFormatHandling.MicrosoftDateFormat; + settings.DateFormatString = "yyyy-MM-dd HH:mm:ss"; + //空值处理 + settings.NullValueHandling = NullValueHandling.Ignore; + settings.ReferenceLoopHandling = ReferenceLoopHandling.Ignore; + settings.Converters.Add(new JsonIPAddressConverter()); + settings.Converters.Add(new JsonIPEndPointConverter()); + return settings; + }); + } + + public static IJT1078Builder AddJT1078Core(this IServiceCollection serviceDescriptors, IConfiguration configuration, Newtonsoft.Json.JsonSerializerSettings settings=null) + { + if (settings != null) + { + JsonConvert.DefaultSettings = new Func(() => + { + settings.Converters.Add(new JsonIPAddressConverter()); + settings.Converters.Add(new JsonIPEndPointConverter()); + settings.ReferenceLoopHandling = ReferenceLoopHandling.Ignore; + return settings; + }); + } + IJT1078Builder builder = new JT1078BuilderDefault(serviceDescriptors); + builder.Services.Configure(configuration.GetSection("JT1078Configuration")); + builder.Services.TryAddSingleton(); + builder.Services.TryAddSingleton(); + return builder; + } + + public static IJT1078Builder AddJT1078Core(this IServiceCollection serviceDescriptors, Action jt1078Options, Newtonsoft.Json.JsonSerializerSettings settings = null) + { + if (settings != null) + { + JsonConvert.DefaultSettings = new Func(() => + { + settings.Converters.Add(new JsonIPAddressConverter()); + settings.Converters.Add(new JsonIPEndPointConverter()); + settings.ReferenceLoopHandling = ReferenceLoopHandling.Ignore; + return settings; + }); + } + IJT1078Builder builder = new JT1078BuilderDefault(serviceDescriptors); + builder.Services.Configure(jt1078Options); + builder.Services.TryAddSingleton(); + builder.Services.TryAddSingleton(); + builder.Services.TryAddSingleton(); + return builder; + } + } +} \ No newline at end of file diff --git a/src/JT1078.DotNetty.Core/Metadata/JT1078AtomicCounter.cs b/src/JT1078.DotNetty.Core/Metadata/JT1078AtomicCounter.cs new file mode 100644 index 0000000..b9258bd --- /dev/null +++ b/src/JT1078.DotNetty.Core/Metadata/JT1078AtomicCounter.cs @@ -0,0 +1,49 @@ +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading; + +namespace JT1078.DotNetty.Core.Metadata +{ + /// + /// + /// + /// + internal class JT1078AtomicCounter + { + long counter = 0; + + public JT1078AtomicCounter(long initialCount = 0) + { + this.counter = initialCount; + } + + public void Reset() + { + Interlocked.Exchange(ref counter, 0); + } + + public long Increment() + { + return Interlocked.Increment(ref counter); + } + + public long Add(long len) + { + return Interlocked.Add(ref counter,len); + } + + public long Decrement() + { + return Interlocked.Decrement(ref counter); + } + + public long Count + { + get + { + return Interlocked.Read(ref counter); + } + } + } +} diff --git a/src/JT1078.DotNetty.Core/Metadata/JT1078TcpSession.cs b/src/JT1078.DotNetty.Core/Metadata/JT1078TcpSession.cs new file mode 100644 index 0000000..16b3b9c --- /dev/null +++ b/src/JT1078.DotNetty.Core/Metadata/JT1078TcpSession.cs @@ -0,0 +1,29 @@ +using DotNetty.Transport.Channels; +using System; + +namespace JT1078.DotNetty.Core.Metadata +{ + public class JT1078TcpSession + { + public JT1078TcpSession(IChannel channel, string terminalPhoneNo) + { + Channel = channel; + TerminalPhoneNo = terminalPhoneNo; + StartTime = DateTime.Now; + LastActiveTime = DateTime.Now; + } + + public JT1078TcpSession() { } + + /// + /// 终端手机号 + /// + public string TerminalPhoneNo { get; set; } + + public IChannel Channel { get; set; } + + public DateTime LastActiveTime { get; set; } + + public DateTime StartTime { get; set; } + } +} diff --git a/src/JT1078.DotNetty.Core/Metadata/JT1078UdpPackage.cs b/src/JT1078.DotNetty.Core/Metadata/JT1078UdpPackage.cs new file mode 100644 index 0000000..80e2a6d --- /dev/null +++ b/src/JT1078.DotNetty.Core/Metadata/JT1078UdpPackage.cs @@ -0,0 +1,20 @@ +using System; +using System.Collections.Generic; +using System.Net; +using System.Text; + +namespace JT1078.DotNetty.Core.Metadata +{ + public class JT1078UdpPackage + { + public JT1078UdpPackage(byte[] buffer, EndPoint sender) + { + Buffer = buffer; + Sender = sender; + } + + public byte[] Buffer { get; } + + public EndPoint Sender { get; } + } +} diff --git a/src/JT1078.DotNetty.Core/Metadata/JT1078UdpSession.cs b/src/JT1078.DotNetty.Core/Metadata/JT1078UdpSession.cs new file mode 100644 index 0000000..5c2dcad --- /dev/null +++ b/src/JT1078.DotNetty.Core/Metadata/JT1078UdpSession.cs @@ -0,0 +1,35 @@ +using DotNetty.Transport.Channels; +using System; +using System.Net; + +namespace JT1078.DotNetty.Core.Metadata +{ + public class JT1078UdpSession + { + public JT1078UdpSession(IChannel channel, + EndPoint sender, + string terminalPhoneNo) + { + Channel = channel; + TerminalPhoneNo = terminalPhoneNo; + StartTime = DateTime.Now; + LastActiveTime = DateTime.Now; + Sender = sender; + } + + public EndPoint Sender { get; set; } + + public JT1078UdpSession() { } + + /// + /// 终端手机号 + /// + public string TerminalPhoneNo { get; set; } + + public IChannel Channel { get; set; } + + public DateTime LastActiveTime { get; set; } + + public DateTime StartTime { get; set; } + } +} diff --git a/src/JT1078.DotNetty.Core/Services/JT1078AtomicCounterService.cs b/src/JT1078.DotNetty.Core/Services/JT1078AtomicCounterService.cs new file mode 100644 index 0000000..1cc6df0 --- /dev/null +++ b/src/JT1078.DotNetty.Core/Services/JT1078AtomicCounterService.cs @@ -0,0 +1,52 @@ +using JT1078.DotNetty.Core.Metadata; + +namespace JT1078.DotNetty.Core.Services +{ + /// + /// 计数包服务 + /// + public class JT1078AtomicCounterService + { + private readonly JT1078AtomicCounter MsgSuccessCounter; + + private readonly JT1078AtomicCounter MsgFailCounter; + + public JT1078AtomicCounterService() + { + MsgSuccessCounter=new JT1078AtomicCounter(); + MsgFailCounter = new JT1078AtomicCounter(); + } + + public void Reset() + { + MsgSuccessCounter.Reset(); + MsgFailCounter.Reset(); + } + + public long MsgSuccessIncrement() + { + return MsgSuccessCounter.Increment(); + } + + public long MsgSuccessCount + { + get + { + return MsgSuccessCounter.Count; + } + } + + public long MsgFailIncrement() + { + return MsgFailCounter.Increment(); + } + + public long MsgFailCount + { + get + { + return MsgFailCounter.Count; + } + } + } +} diff --git a/src/JT1078.DotNetty.Core/Services/JT1078AtomicCounterServiceFactory.cs b/src/JT1078.DotNetty.Core/Services/JT1078AtomicCounterServiceFactory.cs new file mode 100644 index 0000000..6763f39 --- /dev/null +++ b/src/JT1078.DotNetty.Core/Services/JT1078AtomicCounterServiceFactory.cs @@ -0,0 +1,30 @@ +using JT1078.DotNetty.Core.Enums; +using System; +using System.Collections.Concurrent; + +namespace JT1078.DotNetty.Core.Services +{ + public class JT1078AtomicCounterServiceFactory + { + private readonly ConcurrentDictionary cache; + + public JT1078AtomicCounterServiceFactory() + { + cache = new ConcurrentDictionary(); + } + + public JT1078AtomicCounterService Create(JT1078TransportProtocolType type) + { + if(cache.TryGetValue(type,out var service)) + { + return service; + } + else + { + var serviceNew = new JT1078AtomicCounterService(); + cache.TryAdd(type, serviceNew); + return serviceNew; + } + } + } +} diff --git a/src/JT1078.DotNetty.Core/Session/JT1078TcpSessionManager.cs b/src/JT1078.DotNetty.Core/Session/JT1078TcpSessionManager.cs new file mode 100644 index 0000000..94bce5e --- /dev/null +++ b/src/JT1078.DotNetty.Core/Session/JT1078TcpSessionManager.cs @@ -0,0 +1,100 @@ +using Microsoft.Extensions.Logging; +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Linq; +using DotNetty.Transport.Channels; +using JT1078.DotNetty.Core.Metadata; + +namespace JT1078.DotNetty.Core.Session +{ + /// + /// JT1078 Tcp会话管理 + /// + public class JT1078TcpSessionManager + { + private readonly ILogger logger; + + public JT1078TcpSessionManager( + ILoggerFactory loggerFactory) + { + logger = loggerFactory.CreateLogger(); + } + + private ConcurrentDictionary SessionIdDict = new ConcurrentDictionary(StringComparer.OrdinalIgnoreCase); + + public int SessionCount + { + get + { + return SessionIdDict.Count; + } + } + + public JT1078TcpSession GetSession(string terminalPhoneNo) + { + if (string.IsNullOrEmpty(terminalPhoneNo)) + return default; + if (SessionIdDict.TryGetValue(terminalPhoneNo, out JT1078TcpSession targetSession)) + { + return targetSession; + } + else + { + return default; + } + } + + public void TryAdd(string terminalPhoneNo,IChannel channel) + { + if (SessionIdDict.TryGetValue(terminalPhoneNo, out JT1078TcpSession oldSession)) + { + oldSession.LastActiveTime = DateTime.Now; + oldSession.Channel = channel; + SessionIdDict.TryUpdate(terminalPhoneNo, oldSession, oldSession); + } + else + { + JT1078TcpSession session = new JT1078TcpSession(channel, terminalPhoneNo); + if (SessionIdDict.TryAdd(terminalPhoneNo, session)) + { + + } + } + } + + public JT1078TcpSession RemoveSession(string terminalPhoneNo) + { + if (string.IsNullOrEmpty(terminalPhoneNo)) return default; + if (SessionIdDict.TryRemove(terminalPhoneNo, out JT1078TcpSession sessionRemove)) + { + logger.LogInformation($">>>{terminalPhoneNo} Session Remove."); + return sessionRemove; + } + else + { + return default; + } + } + + public void RemoveSessionByChannel(IChannel channel) + { + var terminalPhoneNos = SessionIdDict.Where(w => w.Value.Channel.Id == channel.Id).Select(s => s.Key).ToList(); + if (terminalPhoneNos.Count > 0) + { + foreach (var key in terminalPhoneNos) + { + SessionIdDict.TryRemove(key, out JT1078TcpSession sessionRemove); + } + string nos = string.Join(",", terminalPhoneNos); + logger.LogInformation($">>>{nos} Channel Remove."); + } + } + + public IEnumerable GetAll() + { + return SessionIdDict.Select(s => s.Value).ToList(); + } + } +} + diff --git a/src/JT1078.DotNetty.Core/Session/JT1078UdpSessionManager.cs b/src/JT1078.DotNetty.Core/Session/JT1078UdpSessionManager.cs new file mode 100644 index 0000000..f6cacb4 --- /dev/null +++ b/src/JT1078.DotNetty.Core/Session/JT1078UdpSessionManager.cs @@ -0,0 +1,116 @@ +using Microsoft.Extensions.Logging; +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Linq; +using DotNetty.Transport.Channels; +using Microsoft.Extensions.Options; +using System.Net; +using JT1078.DotNetty.Core.Configurations; +using JT1078.DotNetty.Core.Metadata; + +namespace JT1078.DotNetty.Core.Session +{ + /// + /// JT1078 udp会话管理 + /// 估计要轮询下 + /// + public class JT1078UdpSessionManager + { + private readonly ILogger logger; + + public JT1078UdpSessionManager( + ILoggerFactory loggerFactory) + { + logger = loggerFactory.CreateLogger(); + } + + private ConcurrentDictionary SessionIdDict = new ConcurrentDictionary(StringComparer.OrdinalIgnoreCase); + + public int SessionCount + { + get + { + return SessionIdDict.Count; + } + } + + public JT1078UdpSession GetSession(string terminalPhoneNo) + { + if (string.IsNullOrEmpty(terminalPhoneNo)) + return default; + if (SessionIdDict.TryGetValue(terminalPhoneNo, out JT1078UdpSession targetSession)) + { + return targetSession; + } + else + { + return default; + } + } + + public void TryAdd(IChannel channel,EndPoint sender,string terminalPhoneNo) + { + //1.先判断是否在缓存里面 + if (SessionIdDict.TryGetValue(terminalPhoneNo, out JT1078UdpSession UdpSession)) + { + UdpSession.LastActiveTime=DateTime.Now; + UdpSession.Sender = sender; + UdpSession.Channel = channel; + SessionIdDict.TryUpdate(terminalPhoneNo, UdpSession, UdpSession); + } + else + { + SessionIdDict.TryAdd(terminalPhoneNo, new JT1078UdpSession(channel, sender, terminalPhoneNo)); + } + } + + public void Heartbeat(string terminalPhoneNo) + { + if (string.IsNullOrEmpty(terminalPhoneNo)) return; + if (SessionIdDict.TryGetValue(terminalPhoneNo, out JT1078UdpSession oldSession)) + { + oldSession.LastActiveTime = DateTime.Now; + SessionIdDict.TryUpdate(terminalPhoneNo, oldSession, oldSession); + } + } + + public JT1078UdpSession RemoveSession(string terminalPhoneNo) + { + //设备离线可以进行通知 + //使用Redis 发布订阅 + if (string.IsNullOrEmpty(terminalPhoneNo)) return default; + if (SessionIdDict.TryRemove(terminalPhoneNo, out JT1078UdpSession SessionRemove)) + { + logger.LogInformation($">>>{terminalPhoneNo} Session Remove."); + return SessionRemove; + } + else + { + return default; + } + } + + public void RemoveSessionByChannel(IChannel channel) + { + //设备离线可以进行通知 + //使用Redis 发布订阅 + var terminalPhoneNos = SessionIdDict.Where(w => w.Value.Channel.Id == channel.Id).Select(s => s.Key).ToList(); + if (terminalPhoneNos.Count > 0) + { + foreach (var key in terminalPhoneNos) + { + SessionIdDict.TryRemove(key, out JT1078UdpSession SessionRemove); + } + string nos = string.Join(",", terminalPhoneNos); + logger.LogInformation($">>>{nos} Channel Remove."); + } + } + + public IEnumerable GetAll() + { + return SessionIdDict.Select(s => s.Value).ToList(); + } + } +} + diff --git a/src/JT1078.DotNetty.Tcp/Handlers/JT1078TcpConnectionHandler.cs b/src/JT1078.DotNetty.Tcp/Handlers/JT1078TcpConnectionHandler.cs new file mode 100644 index 0000000..fd14847 --- /dev/null +++ b/src/JT1078.DotNetty.Tcp/Handlers/JT1078TcpConnectionHandler.cs @@ -0,0 +1,99 @@ +using DotNetty.Handlers.Timeout; +using DotNetty.Transport.Channels; +using JT1078.DotNetty.Core.Session; +using Microsoft.Extensions.Logging; +using System; +using System.Threading.Tasks; + +namespace JT1078.DotNetty.Tcp.Handlers +{ + /// + /// JT1078服务通道处理程序 + /// + internal class JT1078TcpConnectionHandler : ChannelHandlerAdapter + { + private readonly ILogger logger; + + private readonly JT1078TcpSessionManager SessionManager; + + public JT1078TcpConnectionHandler( + JT1078TcpSessionManager sessionManager, + ILoggerFactory loggerFactory) + { + this.SessionManager = sessionManager; + logger = loggerFactory.CreateLogger(); + } + + /// + /// 通道激活 + /// + /// + public override void ChannelActive(IChannelHandlerContext context) + { + string channelId = context.Channel.Id.AsShortText(); + if (logger.IsEnabled(LogLevel.Debug)) + logger.LogDebug($"<<<{ channelId } Successful client connection to server."); + base.ChannelActive(context); + } + + /// + /// 设备主动断开 + /// + /// + public override void ChannelInactive(IChannelHandlerContext context) + { + string channelId = context.Channel.Id.AsShortText(); + if (logger.IsEnabled(LogLevel.Debug)) + logger.LogDebug($">>>{ channelId } The client disconnects from the server."); + SessionManager.RemoveSessionByChannel(context.Channel); + base.ChannelInactive(context); + } + + /// + /// 服务器主动断开 + /// + /// + /// + public override Task CloseAsync(IChannelHandlerContext context) + { + string channelId = context.Channel.Id.AsShortText(); + if (logger.IsEnabled(LogLevel.Debug)) + logger.LogDebug($"<<<{ channelId } The server disconnects from the client."); + SessionManager.RemoveSessionByChannel(context.Channel); + return base.CloseAsync(context); + } + + public override void ChannelReadComplete(IChannelHandlerContext context)=> context.Flush(); + + /// + /// 超时策略 + /// + /// + /// + public override void UserEventTriggered(IChannelHandlerContext context, object evt) + { + IdleStateEvent idleStateEvent = evt as IdleStateEvent; + if (idleStateEvent != null) + { + if(idleStateEvent.State== IdleState.ReaderIdle) + { + string channelId = context.Channel.Id.AsShortText(); + logger.LogInformation($"{idleStateEvent.State.ToString()}>>>{channelId}"); + // 由于808是设备发心跳,如果很久没有上报数据,那么就由服务器主动关闭连接。 + SessionManager.RemoveSessionByChannel(context.Channel); + context.CloseAsync(); + } + } + base.UserEventTriggered(context, evt); + } + + public override void ExceptionCaught(IChannelHandlerContext context, Exception exception) + { + string channelId = context.Channel.Id.AsShortText(); + logger.LogError(exception,$"{channelId} {exception.Message}" ); + SessionManager.RemoveSessionByChannel(context.Channel); + context.CloseAsync(); + } + } +} + diff --git a/src/JT1078.DotNetty.Tcp/Handlers/JT1078TcpMessageProcessorEmptyImpl.cs b/src/JT1078.DotNetty.Tcp/Handlers/JT1078TcpMessageProcessorEmptyImpl.cs new file mode 100644 index 0000000..1502807 --- /dev/null +++ b/src/JT1078.DotNetty.Tcp/Handlers/JT1078TcpMessageProcessorEmptyImpl.cs @@ -0,0 +1,17 @@ +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading.Tasks; +using JT1078.DotNetty.Core.Interfaces; +using JT1078.Protocol; + +namespace JT1078.DotNetty.Tcp.Handlers +{ + class JT1078TcpMessageProcessorEmptyImpl : IJT1078TcpMessageHandlers + { + public Task Processor(JT1078Package package) + { + return Task.CompletedTask; + } + } +} diff --git a/src/JT1078.DotNetty.Tcp/Handlers/JT1078TcpServerHandler.cs b/src/JT1078.DotNetty.Tcp/Handlers/JT1078TcpServerHandler.cs new file mode 100644 index 0000000..0a9f3db --- /dev/null +++ b/src/JT1078.DotNetty.Tcp/Handlers/JT1078TcpServerHandler.cs @@ -0,0 +1,73 @@ +using DotNetty.Buffers; +using DotNetty.Transport.Channels; +using System; +using Microsoft.Extensions.Logging; +using JT1078.DotNetty.Core.Enums; +using JT1078.DotNetty.Core.Services; +using JT1078.Protocol; +using JT1078.DotNetty.Core.Interfaces; +using JT1078.DotNetty.Core.Session; + +namespace JT1078.DotNetty.Tcp.Handlers +{ + /// + /// JT1078 服务端处理程序 + /// + internal class JT1078TcpServerHandler : SimpleChannelInboundHandler + { + private readonly JT1078TcpSessionManager SessionManager; + + private readonly JT1078AtomicCounterService AtomicCounterService; + + private readonly ILogger logger; + + private readonly IJT1078TcpMessageHandlers handlers; + + private readonly IJT1078SourcePackageDispatcher sourcePackageDispatcher; + + public JT1078TcpServerHandler( + IJT1078SourcePackageDispatcher sourcePackageDispatcher, + IJT1078TcpMessageHandlers handlers, + ILoggerFactory loggerFactory, + JT1078AtomicCounterServiceFactory atomicCounterServiceFactory, + JT1078TcpSessionManager sessionManager) + { + this.sourcePackageDispatcher = sourcePackageDispatcher; + this.handlers = handlers; + this.SessionManager = sessionManager; + this.AtomicCounterService = atomicCounterServiceFactory.Create(JT1078TransportProtocolType.tcp); + logger = loggerFactory.CreateLogger(); + } + + + protected override void ChannelRead0(IChannelHandlerContext ctx, byte[] msg) + { + try + { + sourcePackageDispatcher.SendAsync(msg); + if (logger.IsEnabled(LogLevel.Trace)) + { + logger.LogTrace("accept package success count<<<" + AtomicCounterService.MsgSuccessCount.ToString()); + logger.LogTrace("accept msg <<< " + ByteBufferUtil.HexDump(msg)); + } + JT1078Package package = JT1078Serializer.Deserialize(msg); + AtomicCounterService.MsgSuccessIncrement(); + SessionManager.TryAdd(package.SIM, ctx.Channel); + handlers.Processor(package); + if (logger.IsEnabled(LogLevel.Debug)) + { + logger.LogDebug("accept package success count<<<" + AtomicCounterService.MsgSuccessCount.ToString()); + } + } + catch (Exception ex) + { + AtomicCounterService.MsgFailIncrement(); + if (logger.IsEnabled(LogLevel.Error)) + { + logger.LogError("accept package fail count<<<" + AtomicCounterService.MsgFailCount.ToString()); + logger.LogError(ex, "accept msg<<<" + ByteBufferUtil.HexDump(msg)); + } + } + } + } +} diff --git a/src/JT1078.DotNetty.Tcp/JT1078.DotNetty.Tcp.csproj b/src/JT1078.DotNetty.Tcp/JT1078.DotNetty.Tcp.csproj new file mode 100644 index 0000000..7c229d6 --- /dev/null +++ b/src/JT1078.DotNetty.Tcp/JT1078.DotNetty.Tcp.csproj @@ -0,0 +1,22 @@ + + + + netstandard2.0 + + + + + + + + + + + + + + + + + + diff --git a/src/JT1078.DotNetty.Tcp/JT1078TcpBuilderDefault.cs b/src/JT1078.DotNetty.Tcp/JT1078TcpBuilderDefault.cs new file mode 100644 index 0000000..5e6f279 --- /dev/null +++ b/src/JT1078.DotNetty.Tcp/JT1078TcpBuilderDefault.cs @@ -0,0 +1,30 @@ +using JT1078.DotNetty.Core.Interfaces; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.DependencyInjection.Extensions; +using System; +using System.Collections.Generic; +using System.Text; + +namespace JT1078.DotNetty.Tcp +{ + class JT1078TcpBuilderDefault : IJT1078TcpBuilder + { + public IJT1078Builder Instance { get; } + + public JT1078TcpBuilderDefault(IJT1078Builder builder) + { + Instance = builder; + } + + public IJT1078Builder Builder() + { + return Instance; + } + + public IJT1078TcpBuilder Replace() where T : IJT1078TcpMessageHandlers + { + Instance.Services.Replace(new ServiceDescriptor(typeof(IJT1078TcpMessageHandlers), typeof(T), ServiceLifetime.Singleton)); + return this; + } + } +} diff --git a/src/JT1078.DotNetty.Tcp/JT1078TcpDotnettyExtensions.cs b/src/JT1078.DotNetty.Tcp/JT1078TcpDotnettyExtensions.cs new file mode 100644 index 0000000..11faf55 --- /dev/null +++ b/src/JT1078.DotNetty.Tcp/JT1078TcpDotnettyExtensions.cs @@ -0,0 +1,26 @@ +using JT1078.DotNetty.Core.Codecs; +using JT1078.DotNetty.Core.Interfaces; +using JT1078.DotNetty.Core.Session; +using JT1078.DotNetty.Tcp.Handlers; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.DependencyInjection.Extensions; +using System.Runtime.CompilerServices; + +[assembly: InternalsVisibleTo("JT1078.DotNetty.Tcp.Test")] + +namespace JT1078.DotNetty.Tcp +{ + public static class JT1078TcpDotnettyExtensions + { + public static IJT1078TcpBuilder AddJT1078TcpHost(this IJT1078Builder builder) + { + builder.Services.TryAddSingleton(); + builder.Services.TryAddScoped(); + builder.Services.TryAddScoped(); + builder.Services.TryAddSingleton(); + builder.Services.TryAddScoped(); + builder.Services.AddHostedService(); + return new JT1078TcpBuilderDefault(builder); + } + } +} \ No newline at end of file diff --git a/src/JT1078.DotNetty.Tcp/JT1078TcpServerHost.cs b/src/JT1078.DotNetty.Tcp/JT1078TcpServerHost.cs new file mode 100644 index 0000000..1a66e98 --- /dev/null +++ b/src/JT1078.DotNetty.Tcp/JT1078TcpServerHost.cs @@ -0,0 +1,94 @@ +using DotNetty.Buffers; +using DotNetty.Codecs; +using DotNetty.Handlers.Timeout; +using DotNetty.Transport.Bootstrapping; +using DotNetty.Transport.Channels; +using DotNetty.Transport.Libuv; +using JT1078.DotNetty.Core.Codecs; +using JT1078.DotNetty.Core.Configurations; +using JT1078.DotNetty.Tcp.Handlers; +using JT1078.Protocol; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using System; +using System.Net; +using System.Runtime.InteropServices; +using System.Threading; +using System.Threading.Tasks; + +namespace JT1078.DotNetty.Tcp +{ + /// + /// JT1078 Tcp网关服务 + /// + internal class JT1078TcpServerHost : IHostedService + { + private readonly IServiceProvider serviceProvider; + private readonly JT1078Configuration configuration; + private readonly ILogger logger; + private DispatcherEventLoopGroup bossGroup; + private WorkerEventLoopGroup workerGroup; + private IChannel bootstrapChannel; + private IByteBufferAllocator serverBufferAllocator; + + public JT1078TcpServerHost( + IServiceProvider provider, + ILoggerFactory loggerFactory, + IOptions configurationAccessor) + { + serviceProvider = provider; + configuration = configurationAccessor.Value; + logger=loggerFactory.CreateLogger(); + } + + public Task StartAsync(CancellationToken cancellationToken) + { + bossGroup = new DispatcherEventLoopGroup(); + workerGroup = new WorkerEventLoopGroup(bossGroup, configuration.EventLoopCount); + serverBufferAllocator = new PooledByteBufferAllocator(); + ServerBootstrap bootstrap = new ServerBootstrap(); + bootstrap.Group(bossGroup, workerGroup); + bootstrap.Channel(); + if (RuntimeInformation.IsOSPlatform(OSPlatform.Linux) + || RuntimeInformation.IsOSPlatform(OSPlatform.OSX)) + { + bootstrap + .Option(ChannelOption.SoReuseport, true) + .ChildOption(ChannelOption.SoReuseaddr, true); + } + bootstrap + .Option(ChannelOption.SoBacklog, configuration.SoBacklog) + .ChildOption(ChannelOption.Allocator, serverBufferAllocator) + .ChildHandler(new ActionChannelInitializer(channel => + { + IChannelPipeline pipeline = channel.Pipeline; + using (var scope = serviceProvider.CreateScope()) + { + channel.Pipeline.AddLast("JT1078TcpBuffer", new DelimiterBasedFrameDecoder(int.MaxValue,true, + Unpooled.CopiedBuffer(JT1078Package.FH_Bytes))); + channel.Pipeline.AddLast("JT1078TcpDecode", scope.ServiceProvider.GetRequiredService()); + channel.Pipeline.AddLast("JT1078SystemIdleState", new IdleStateHandler( + configuration.ReaderIdleTimeSeconds, + configuration.WriterIdleTimeSeconds, + configuration.AllIdleTimeSeconds)); + channel.Pipeline.AddLast("JT1078TcpConnection", scope.ServiceProvider.GetRequiredService()); + channel.Pipeline.AddLast("JT1078TcpService", scope.ServiceProvider.GetRequiredService()); + } + })); + logger.LogInformation($"JT1078 TCP Server start at {IPAddress.Any}:{configuration.TcpPort}."); + return bootstrap.BindAsync(configuration.TcpPort) + .ContinueWith(i => bootstrapChannel = i.Result); + } + + public async Task StopAsync(CancellationToken cancellationToken) + { + await bootstrapChannel.CloseAsync(); + var quietPeriod = configuration.QuietPeriodTimeSpan; + var shutdownTimeout = configuration.ShutdownTimeoutTimeSpan; + await workerGroup.ShutdownGracefullyAsync(quietPeriod, shutdownTimeout); + await bossGroup.ShutdownGracefullyAsync(quietPeriod, shutdownTimeout); + } + } +} diff --git a/src/JT1078.DotNetty.TestHosting/Handlers/JT1078TcpMessageHandlers.cs b/src/JT1078.DotNetty.TestHosting/Handlers/JT1078TcpMessageHandlers.cs new file mode 100644 index 0000000..47faf3b --- /dev/null +++ b/src/JT1078.DotNetty.TestHosting/Handlers/JT1078TcpMessageHandlers.cs @@ -0,0 +1,27 @@ +using JT1078.DotNetty.Core.Interfaces; +using JT1078.Protocol; +using Microsoft.Extensions.Logging; +using Newtonsoft.Json; +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading.Tasks; + +namespace JT1078.DotNetty.TestHosting.Handlers +{ + public class JT1078TcpMessageHandlers : IJT1078TcpMessageHandlers + { + private readonly ILogger logger; + + public JT1078TcpMessageHandlers(ILoggerFactory loggerFactory) + { + logger = loggerFactory.CreateLogger(); + } + + public Task Processor(JT1078Package package) + { + logger.LogDebug(JsonConvert.SerializeObject(package)); + return Task.CompletedTask; + } + } +} diff --git a/src/JT1078.DotNetty.TestHosting/Handlers/JT1078UdpMessageHandlers.cs b/src/JT1078.DotNetty.TestHosting/Handlers/JT1078UdpMessageHandlers.cs new file mode 100644 index 0000000..3c0aefa --- /dev/null +++ b/src/JT1078.DotNetty.TestHosting/Handlers/JT1078UdpMessageHandlers.cs @@ -0,0 +1,24 @@ +using JT1078.DotNetty.Core.Interfaces; +using JT1078.Protocol; +using Microsoft.Extensions.Logging; +using Newtonsoft.Json; +using System.Threading.Tasks; + +namespace JT1078.DotNetty.TestHosting.Handlers +{ + public class JT1078UdpMessageHandlers : IJT1078UdpMessageHandlers + { + private readonly ILogger logger; + + public JT1078UdpMessageHandlers(ILoggerFactory loggerFactory) + { + logger = loggerFactory.CreateLogger(); + } + + public Task Processor(JT1078Package package) + { + logger.LogDebug(JsonConvert.SerializeObject(package)); + return Task.CompletedTask; + } + } +} diff --git a/src/JT1078.DotNetty.TestHosting/JT1078.DotNetty.TestHosting.csproj b/src/JT1078.DotNetty.TestHosting/JT1078.DotNetty.TestHosting.csproj new file mode 100644 index 0000000..071e711 --- /dev/null +++ b/src/JT1078.DotNetty.TestHosting/JT1078.DotNetty.TestHosting.csproj @@ -0,0 +1,27 @@ + + + + Exe + netcoreapp2.2 + + + + + + + + + + + + + + + + + + Always + + + + diff --git a/src/JT1078.DotNetty.TestHosting/Program.cs b/src/JT1078.DotNetty.TestHosting/Program.cs new file mode 100644 index 0000000..3514ef0 --- /dev/null +++ b/src/JT1078.DotNetty.TestHosting/Program.cs @@ -0,0 +1,51 @@ +using JT1078.DotNetty.Core; +using JT1078.DotNetty.Tcp; +using JT1078.DotNetty.TestHosting.Handlers; +using JT1078.DotNetty.Udp; +using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.DependencyInjection.Extensions; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Diagnostics; +using System.Threading; +using System.Threading.Tasks; + +namespace JT1078.DotNetty.TestHosting +{ + class Program + { + static async Task Main(string[] args) + { + //3031636481E2108801123456781001100000016BB392CA7C02800028002E0000000161E1A2BF0098CFC0EE1E17283407788E39A403FDDBD1D546BFB063013F59AC34C97A021AB96A28A42C08 + var serverHostBuilder = new HostBuilder() + .ConfigureAppConfiguration((hostingContext, config) => + { + config.SetBasePath(AppDomain.CurrentDomain.BaseDirectory); + config.AddJsonFile("appsettings.json", optional: false, reloadOnChange: true); + }) + .ConfigureLogging((context, logging) => + { + logging.SetMinimumLevel(LogLevel.Trace); + logging.AddConsole(); + }) + .ConfigureServices((hostContext, services) => + { + services.AddSingleton(); + services.AddSingleton(typeof(ILogger<>), typeof(Logger<>)); + services.AddJT1078Core(hostContext.Configuration) + .AddJT1078TcpHost() + .Replace() + .Builder() + .AddJT1078UdpHost() + .Replace() + .Builder(); + }); + + await serverHostBuilder.RunConsoleAsync(); + } + } +} diff --git a/src/JT1078.DotNetty.TestHosting/appsettings.json b/src/JT1078.DotNetty.TestHosting/appsettings.json new file mode 100644 index 0000000..8a684ec --- /dev/null +++ b/src/JT1078.DotNetty.TestHosting/appsettings.json @@ -0,0 +1,22 @@ +{ + "Logging": { + "IncludeScopes": false, + "Debug": { + "LogLevel": { + "Default": "Trace" + } + }, + "Console": { + "LogLevel": { + "Default": "Trace" + } + } + }, + "JT1078Configuration": { + "TcpPort": 1808, + "UdpPort": 1808, + "RemoteServerOptions": { + "RemoteServers": ["172.16.19.209:16868"] + } + } +} diff --git a/src/JT1078.DotNetty.Udp/Handlers/JT1078UdpMessageProcessorEmptyImpl.cs b/src/JT1078.DotNetty.Udp/Handlers/JT1078UdpMessageProcessorEmptyImpl.cs new file mode 100644 index 0000000..9982724 --- /dev/null +++ b/src/JT1078.DotNetty.Udp/Handlers/JT1078UdpMessageProcessorEmptyImpl.cs @@ -0,0 +1,17 @@ +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading.Tasks; +using JT1078.DotNetty.Core.Interfaces; +using JT1078.Protocol; + +namespace JT1078.DotNetty.Udp.Handlers +{ + class JT1078UdpMessageProcessorEmptyImpl : IJT1078UdpMessageHandlers + { + public Task Processor(JT1078Package package) + { + return Task.CompletedTask; + } + } +} diff --git a/src/JT1078.DotNetty.Udp/Handlers/JT1078UdpServerHandler.cs b/src/JT1078.DotNetty.Udp/Handlers/JT1078UdpServerHandler.cs new file mode 100644 index 0000000..1908df7 --- /dev/null +++ b/src/JT1078.DotNetty.Udp/Handlers/JT1078UdpServerHandler.cs @@ -0,0 +1,75 @@ +using DotNetty.Buffers; +using DotNetty.Transport.Channels; +using System; +using Microsoft.Extensions.Logging; +using JT1078.DotNetty.Core.Metadata; +using JT1078.DotNetty.Core.Session; +using JT1078.DotNetty.Core.Services; +using JT1078.DotNetty.Core.Enums; +using JT1078.Protocol; +using JT1078.DotNetty.Core.Interfaces; + +namespace JT1078.DotNetty.Udp.Handlers +{ + /// + /// JT1078 Udp服务端处理程序 + /// + internal class JT1078UdpServerHandler : SimpleChannelInboundHandler + { + private readonly ILogger logger; + + private readonly JT1078UdpSessionManager SessionManager; + + private readonly JT1078AtomicCounterService AtomicCounterService; + + private readonly IJT1078UdpMessageHandlers handlers; + + private readonly IJT1078SourcePackageDispatcher sourcePackageDispatcher; + public JT1078UdpServerHandler( + IJT1078SourcePackageDispatcher sourcePackageDispatcher, + ILoggerFactory loggerFactory, + JT1078AtomicCounterServiceFactory atomicCounterServiceFactory, + IJT1078UdpMessageHandlers handlers, + JT1078UdpSessionManager sessionManager) + { + this.sourcePackageDispatcher = sourcePackageDispatcher; + this.AtomicCounterService = atomicCounterServiceFactory.Create(JT1078TransportProtocolType.udp); + this.SessionManager = sessionManager; + logger = loggerFactory.CreateLogger(); + this.handlers = handlers; + } + + protected override void ChannelRead0(IChannelHandlerContext ctx, JT1078UdpPackage msg) + { + try + { + sourcePackageDispatcher.SendAsync(msg.Buffer); + if (logger.IsEnabled(LogLevel.Trace)) + { + logger.LogTrace("accept package success count<<<" + AtomicCounterService.MsgSuccessCount.ToString()); + logger.LogTrace("accept msg <<< " + ByteBufferUtil.HexDump(msg.Buffer)); + } + JT1078Package package = JT1078Serializer.Deserialize(msg.Buffer); + AtomicCounterService.MsgSuccessIncrement(); + SessionManager.TryAdd(ctx.Channel, msg.Sender, package.SIM); + handlers.Processor(package); + if (logger.IsEnabled(LogLevel.Debug)) + { + logger.LogDebug("accept package success count<<<" + AtomicCounterService.MsgSuccessCount.ToString()); + } + } + catch (Exception ex) + { + AtomicCounterService.MsgFailIncrement(); + if (logger.IsEnabled(LogLevel.Error)) + { + logger.LogError("accept package fail count<<<" + AtomicCounterService.MsgFailCount.ToString()); + logger.LogError(ex, "accept msg<<<" + ByteBufferUtil.HexDump(msg.Buffer)); + } + } + } + + public override void ChannelReadComplete(IChannelHandlerContext context) => context.Flush(); + + } +} diff --git a/src/JT1078.DotNetty.Udp/JT1078.DotNetty.Udp.csproj b/src/JT1078.DotNetty.Udp/JT1078.DotNetty.Udp.csproj new file mode 100644 index 0000000..a91073d --- /dev/null +++ b/src/JT1078.DotNetty.Udp/JT1078.DotNetty.Udp.csproj @@ -0,0 +1,11 @@ + + + + netstandard2.0 + + + + + + + diff --git a/src/JT1078.DotNetty.Udp/JT1078UdpBuilderDefault.cs b/src/JT1078.DotNetty.Udp/JT1078UdpBuilderDefault.cs new file mode 100644 index 0000000..e6b30de --- /dev/null +++ b/src/JT1078.DotNetty.Udp/JT1078UdpBuilderDefault.cs @@ -0,0 +1,30 @@ +using JT1078.DotNetty.Core.Interfaces; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.DependencyInjection.Extensions; +using System; +using System.Collections.Generic; +using System.Text; + +namespace JT1078.DotNetty.Udp +{ + class JT1078UdpBuilderDefault : IJT1078UdpBuilder + { + public IJT1078Builder Instance { get; } + + public JT1078UdpBuilderDefault(IJT1078Builder builder) + { + Instance = builder; + } + + public IJT1078Builder Builder() + { + return Instance; + } + + public IJT1078UdpBuilder Replace() where T : IJT1078UdpMessageHandlers + { + Instance.Services.Replace(new ServiceDescriptor(typeof(IJT1078UdpMessageHandlers), typeof(T), ServiceLifetime.Singleton)); + return this; + } + } +} diff --git a/src/JT1078.DotNetty.Udp/JT1078UdpDotnettyExtensions.cs b/src/JT1078.DotNetty.Udp/JT1078UdpDotnettyExtensions.cs new file mode 100644 index 0000000..4c589ae --- /dev/null +++ b/src/JT1078.DotNetty.Udp/JT1078UdpDotnettyExtensions.cs @@ -0,0 +1,26 @@ +using JT1078.DotNetty.Core.Codecs; +using JT1078.DotNetty.Core.Interfaces; +using JT1078.DotNetty.Core.Session; +using JT1078.DotNetty.Udp; +using JT1078.DotNetty.Udp.Handlers; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.DependencyInjection.Extensions; +using System.Runtime.CompilerServices; + +[assembly: InternalsVisibleTo("JT1078.DotNetty.Udp.Test")] + +namespace JT1078.DotNetty.Udp +{ + public static class JT1078UdpDotnettyExtensions + { + public static IJT1078UdpBuilder AddJT1078UdpHost(this IJT1078Builder builder) + { + builder.Services.TryAddSingleton(); + builder.Services.TryAddSingleton(); + builder.Services.TryAddScoped(); + builder.Services.TryAddScoped(); + builder.Services.AddHostedService(); + return new JT1078UdpBuilderDefault(builder); + } + } +} \ No newline at end of file diff --git a/src/JT1078.DotNetty.Udp/JT1078UdpServerHost.cs b/src/JT1078.DotNetty.Udp/JT1078UdpServerHost.cs new file mode 100644 index 0000000..2c4fa2b --- /dev/null +++ b/src/JT1078.DotNetty.Udp/JT1078UdpServerHost.cs @@ -0,0 +1,76 @@ +using DotNetty.Transport.Bootstrapping; +using DotNetty.Transport.Channels; +using DotNetty.Transport.Channels.Sockets; +using JT1078.DotNetty.Core.Codecs; +using JT1078.DotNetty.Core.Configurations; +using JT1078.DotNetty.Udp.Handlers; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using System; +using System.Net; +using System.Runtime.InteropServices; +using System.Threading; +using System.Threading.Tasks; + +namespace JT1078.DotNetty.Udp +{ + /// + /// JT1078 Udp网关服务 + /// + internal class JT1078UdpServerHost : IHostedService + { + private readonly IServiceProvider serviceProvider; + private readonly JT1078Configuration configuration; + private readonly ILogger logger; + private MultithreadEventLoopGroup group; + private IChannel bootstrapChannel; + + public JT1078UdpServerHost( + IServiceProvider provider, + ILoggerFactory loggerFactory, + IOptions jT808ConfigurationAccessor) + { + serviceProvider = provider; + configuration = jT808ConfigurationAccessor.Value; + logger=loggerFactory.CreateLogger(); + } + + public Task StartAsync(CancellationToken cancellationToken) + { + group = new MultithreadEventLoopGroup(); + Bootstrap bootstrap = new Bootstrap(); + bootstrap.Group(group); + bootstrap.Channel(); + if (RuntimeInformation.IsOSPlatform(OSPlatform.Linux) + || RuntimeInformation.IsOSPlatform(OSPlatform.OSX)) + { + bootstrap + .Option(ChannelOption.SoReuseport, true); + } + bootstrap + .Option(ChannelOption.SoBroadcast, true) + .Handler(new ActionChannelInitializer(channel => + { + IChannelPipeline pipeline = channel.Pipeline; + using (var scope = serviceProvider.CreateScope()) + { + pipeline.AddLast("JT1078UdpDecoder", scope.ServiceProvider.GetRequiredService()); + pipeline.AddLast("JT1078UdpService", scope.ServiceProvider.GetRequiredService()); + } + })); + logger.LogInformation($"JT1078 Udp Server start at {IPAddress.Any}:{configuration.UdpPort}."); + return bootstrap.BindAsync(configuration.UdpPort) + .ContinueWith(i => bootstrapChannel = i.Result); + } + + public async Task StopAsync(CancellationToken cancellationToken) + { + await bootstrapChannel.CloseAsync(); + var quietPeriod = configuration.QuietPeriodTimeSpan; + var shutdownTimeout = configuration.ShutdownTimeoutTimeSpan; + await group.ShutdownGracefullyAsync(quietPeriod, shutdownTimeout); + } + } +} diff --git a/src/JT1078DotNetty.sln b/src/JT1078DotNetty.sln new file mode 100644 index 0000000..3ea800e --- /dev/null +++ b/src/JT1078DotNetty.sln @@ -0,0 +1,43 @@ + +Microsoft Visual Studio Solution File, Format Version 12.00 +# Visual Studio Version 16 +VisualStudioVersion = 16.0.29009.5 +MinimumVisualStudioVersion = 10.0.40219.1 +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JT1078.DotNetty.Core", "JT1078.DotNetty.Core\JT1078.DotNetty.Core.csproj", "{71D35280-24BE-4BF1-AFA4-07DFBC696FA5}" +EndProject +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JT1078.DotNetty.Tcp", "JT1078.DotNetty.Tcp\JT1078.DotNetty.Tcp.csproj", "{C4B4BBEB-4597-469F-B99A-A9F380DDB8FB}" +EndProject +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JT1078.DotNetty.TestHosting", "JT1078.DotNetty.TestHosting\JT1078.DotNetty.TestHosting.csproj", "{DCCA6BB7-C2B5-490A-B43F-C28ABEA922D7}" +EndProject +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JT1078.DotNetty.Udp", "JT1078.DotNetty.Udp\JT1078.DotNetty.Udp.csproj", "{6405D7FA-3B6E-4545-827E-BA13EB5BB268}" +EndProject +Global + GlobalSection(SolutionConfigurationPlatforms) = preSolution + Debug|Any CPU = Debug|Any CPU + Release|Any CPU = Release|Any CPU + EndGlobalSection + GlobalSection(ProjectConfigurationPlatforms) = postSolution + {71D35280-24BE-4BF1-AFA4-07DFBC696FA5}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {71D35280-24BE-4BF1-AFA4-07DFBC696FA5}.Debug|Any CPU.Build.0 = Debug|Any CPU + {71D35280-24BE-4BF1-AFA4-07DFBC696FA5}.Release|Any CPU.ActiveCfg = Release|Any CPU + {71D35280-24BE-4BF1-AFA4-07DFBC696FA5}.Release|Any CPU.Build.0 = Release|Any CPU + {C4B4BBEB-4597-469F-B99A-A9F380DDB8FB}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {C4B4BBEB-4597-469F-B99A-A9F380DDB8FB}.Debug|Any CPU.Build.0 = Debug|Any CPU + {C4B4BBEB-4597-469F-B99A-A9F380DDB8FB}.Release|Any CPU.ActiveCfg = Release|Any CPU + {C4B4BBEB-4597-469F-B99A-A9F380DDB8FB}.Release|Any CPU.Build.0 = Release|Any CPU + {DCCA6BB7-C2B5-490A-B43F-C28ABEA922D7}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {DCCA6BB7-C2B5-490A-B43F-C28ABEA922D7}.Debug|Any CPU.Build.0 = Debug|Any CPU + {DCCA6BB7-C2B5-490A-B43F-C28ABEA922D7}.Release|Any CPU.ActiveCfg = Release|Any CPU + {DCCA6BB7-C2B5-490A-B43F-C28ABEA922D7}.Release|Any CPU.Build.0 = Release|Any CPU + {6405D7FA-3B6E-4545-827E-BA13EB5BB268}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {6405D7FA-3B6E-4545-827E-BA13EB5BB268}.Debug|Any CPU.Build.0 = Debug|Any CPU + {6405D7FA-3B6E-4545-827E-BA13EB5BB268}.Release|Any CPU.ActiveCfg = Release|Any CPU + {6405D7FA-3B6E-4545-827E-BA13EB5BB268}.Release|Any CPU.Build.0 = Release|Any CPU + EndGlobalSection + GlobalSection(SolutionProperties) = preSolution + HideSolutionNode = FALSE + EndGlobalSection + GlobalSection(ExtensibilityGlobals) = postSolution + SolutionGuid = {07ED058A-2CEB-43FD-8478-7EEC7E56F868} + EndGlobalSection +EndGlobal