diff --git a/README.md b/README.md index aab504a..498d2c9 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ -# JT1078DotNetty +# JT1078.Gateway ## 前提条件 diff --git a/src/JT1078.Gateway/Enums/JT1078TransportProtocolType.cs b/src/JT1078.Gateway.Abstractions/Enums/JT1078TransportProtocolType.cs similarity index 77% rename from src/JT1078.Gateway/Enums/JT1078TransportProtocolType.cs rename to src/JT1078.Gateway.Abstractions/Enums/JT1078TransportProtocolType.cs index ac581fa..6462b65 100644 --- a/src/JT1078.Gateway/Enums/JT1078TransportProtocolType.cs +++ b/src/JT1078.Gateway.Abstractions/Enums/JT1078TransportProtocolType.cs @@ -2,14 +2,14 @@ using System.Collections.Generic; using System.Text; -namespace JT1078.Gateway.Enums +namespace JT1078.Gateway.Abstractions.Enums { /// /// 传输协议类型 /// public enum JT1078TransportProtocolType { - tcp=1, + tcp = 1, udp = 2 } } diff --git a/src/JT1078.Gateway.Abstractions/Enums/JT1078UseType.cs b/src/JT1078.Gateway.Abstractions/Enums/JT1078UseType.cs new file mode 100644 index 0000000..8f6c35f --- /dev/null +++ b/src/JT1078.Gateway.Abstractions/Enums/JT1078UseType.cs @@ -0,0 +1,18 @@ +using System; +using System.Collections.Generic; +using System.Text; + +namespace JT1078.Gateway.Abstractions.Enums +{ + public enum JT1078UseType : byte + { + /// + /// 使用正常方式 + /// + Normal = 1, + /// + /// 使用队列方式 + /// + Queue = 2 + } +} diff --git a/src/JT1078.Gateway/Interfaces/IJT1078Builder.cs b/src/JT1078.Gateway.Abstractions/IJT1078Builder.cs similarity index 85% rename from src/JT1078.Gateway/Interfaces/IJT1078Builder.cs rename to src/JT1078.Gateway.Abstractions/IJT1078Builder.cs index 720fd49..8c90b8b 100644 --- a/src/JT1078.Gateway/Interfaces/IJT1078Builder.cs +++ b/src/JT1078.Gateway.Abstractions/IJT1078Builder.cs @@ -3,7 +3,7 @@ using System; using System.Collections.Generic; using System.Text; -namespace JT1078.Gateway.Interfaces +namespace JT1078.Gateway.Abstractions { public interface IJT1078Builder { diff --git a/src/JT1078.Gateway.Abstractions/IJT1078GatewayBuilder.cs b/src/JT1078.Gateway.Abstractions/IJT1078GatewayBuilder.cs new file mode 100644 index 0000000..ffdb120 --- /dev/null +++ b/src/JT1078.Gateway.Abstractions/IJT1078GatewayBuilder.cs @@ -0,0 +1,14 @@ +using JT1078.Protocol; +using Microsoft.Extensions.DependencyInjection; +using System; +using System.Collections.Generic; +using System.Text; + +namespace JT1078.Gateway.Abstractions +{ + public interface IJT1078GatewayBuilder + { + IJT1078Builder JT1078Builder { get; } + IJT1078Builder Builder(); + } +} diff --git a/src/JT1078.Gateway.Abstractions/IJT1078MsgConsumer.cs b/src/JT1078.Gateway.Abstractions/IJT1078MsgConsumer.cs new file mode 100644 index 0000000..53a391f --- /dev/null +++ b/src/JT1078.Gateway.Abstractions/IJT1078MsgConsumer.cs @@ -0,0 +1,15 @@ +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading; + +namespace JT1078.Gateway.Abstractions +{ + public interface IJT1078MsgConsumer : IJT1078PubSub, IDisposable + { + void OnMessage(Action<(string TerminalNo, byte[] Data)> callback); + CancellationTokenSource Cts { get; } + void Subscribe(); + void Unsubscribe(); + } +} diff --git a/src/JT1078.Gateway.Abstractions/IJT1078MsgProducer.cs b/src/JT1078.Gateway.Abstractions/IJT1078MsgProducer.cs new file mode 100644 index 0000000..30a557e --- /dev/null +++ b/src/JT1078.Gateway.Abstractions/IJT1078MsgProducer.cs @@ -0,0 +1,17 @@ +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading.Tasks; + +namespace JT1078.Gateway.Abstractions +{ + public interface IJT1078MsgProducer : IJT1078PubSub, IDisposable + { + /// + /// + /// + /// 设备终端号 + /// jt1078 hex data + ValueTask ProduceAsync(string terminalNo, byte[] data); + } +} diff --git a/src/JT1078.Gateway.Abstractions/IJT1078NormalGatewayBuilder.cs b/src/JT1078.Gateway.Abstractions/IJT1078NormalGatewayBuilder.cs new file mode 100644 index 0000000..fd574d1 --- /dev/null +++ b/src/JT1078.Gateway.Abstractions/IJT1078NormalGatewayBuilder.cs @@ -0,0 +1,12 @@ +using Microsoft.Extensions.DependencyInjection; +using System; +using System.Collections.Generic; +using System.Text; + +namespace JT1078.Gateway.Abstractions +{ + public interface IJT1078NormalGatewayBuilder: IJT1078GatewayBuilder + { + + } +} diff --git a/src/JT1078.Gateway.Abstractions/IJT1078PackageConsumer.cs b/src/JT1078.Gateway.Abstractions/IJT1078PackageConsumer.cs new file mode 100644 index 0000000..7b50be2 --- /dev/null +++ b/src/JT1078.Gateway.Abstractions/IJT1078PackageConsumer.cs @@ -0,0 +1,16 @@ +using JT1078.Protocol; +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading; + +namespace JT1078.Gateway.Abstractions +{ + public interface IJT1078PackageConsumer : IJT1078PubSub, IDisposable + { + void OnMessage(Action<(string TerminalNo, JT1078Package Data)> callback); + CancellationTokenSource Cts { get; } + void Subscribe(); + void Unsubscribe(); + } +} diff --git a/src/JT1078.Gateway.Abstractions/IJT1078PackageProducer.cs b/src/JT1078.Gateway.Abstractions/IJT1078PackageProducer.cs new file mode 100644 index 0000000..8f6e1d6 --- /dev/null +++ b/src/JT1078.Gateway.Abstractions/IJT1078PackageProducer.cs @@ -0,0 +1,18 @@ +using JT1078.Protocol; +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading.Tasks; + +namespace JT1078.Gateway.Abstractions +{ + public interface IJT1078PackageProducer : IJT1078PubSub, IDisposable + { + /// + /// + /// + /// 设备终端号 + /// jt1078 package data + ValueTask ProduceAsync(string terminalNo, JT1078Package data); + } +} diff --git a/src/JT1078.Gateway.Abstractions/IJT1078PubSub.cs b/src/JT1078.Gateway.Abstractions/IJT1078PubSub.cs new file mode 100644 index 0000000..fc4fb4a --- /dev/null +++ b/src/JT1078.Gateway.Abstractions/IJT1078PubSub.cs @@ -0,0 +1,11 @@ +using System; +using System.Collections.Generic; +using System.Text; + +namespace JT1078.Gateway.Abstractions +{ + public interface IJT1078PubSub + { + string TopicName { get; } + } +} diff --git a/src/JT1078.Gateway.Abstractions/IJT1078QueueGatewayBuilder.cs b/src/JT1078.Gateway.Abstractions/IJT1078QueueGatewayBuilder.cs new file mode 100644 index 0000000..84dc38e --- /dev/null +++ b/src/JT1078.Gateway.Abstractions/IJT1078QueueGatewayBuilder.cs @@ -0,0 +1,12 @@ +using Microsoft.Extensions.DependencyInjection; +using System; +using System.Collections.Generic; +using System.Text; + +namespace JT1078.Gateway.Abstractions +{ + public interface IJT1078QueueGatewayBuilder: IJT1078GatewayBuilder + { + + } +} diff --git a/src/JT1078.Gateway.Abstractions/IJT1078Session.cs b/src/JT1078.Gateway.Abstractions/IJT1078Session.cs new file mode 100644 index 0000000..45d4588 --- /dev/null +++ b/src/JT1078.Gateway.Abstractions/IJT1078Session.cs @@ -0,0 +1,26 @@ +using JT1078.Gateway.Abstractions.Enums; +using System; +using System.Collections.Generic; +using System.Net; +using System.Net.Sockets; +using System.Text; +using System.Threading; + +namespace JT1078.Gateway.Abstractions +{ + public interface IJT1078Session + { + /// + /// 终端手机号 + /// + string TerminalPhoneNo { get; set; } + string SessionID { get; } + Socket Client { get; set; } + DateTime StartTime { get; set; } + DateTime ActiveTime { get; set; } + JT1078TransportProtocolType TransportProtocolType { get;} + CancellationTokenSource ReceiveTimeout { get; set; } + EndPoint RemoteEndPoint { get; set; } + void Close(); + } +} diff --git a/src/JT1078.Gateway.Abstractions/JT1078.Gateway.Abstractions.csproj b/src/JT1078.Gateway.Abstractions/JT1078.Gateway.Abstractions.csproj new file mode 100644 index 0000000..d52d04b --- /dev/null +++ b/src/JT1078.Gateway.Abstractions/JT1078.Gateway.Abstractions.csproj @@ -0,0 +1,34 @@ + + + + netstandard2.1 + 8.0 + Copyright 2019. + SmallChi(Koike) + JT1078.Gateway.Abstractions + JT1078.Gateway.Abstractions + 基于Pipeline实现的JT1078Gateway的抽象库 + 基于Pipeline实现的JT1078Gateway的抽象库 + https://github.com/SmallChi/JT1078Gateway + https://github.com/SmallChi/JT1078Gateway + https://github.com/SmallChi/JT1078Gateway/blob/master/LICENSE + https://github.com/SmallChi/JT1078Gateway/blob/master/LICENSE + false + false + LICENSE + true + 1.0.0-preview1 + + + + + + + + + + + + + + diff --git a/src/JT1078.Gateway.Tests/JT1078.Gateway.Test/JT1078.Gateway.Test.csproj b/src/JT1078.Gateway.Tests/JT1078.Gateway.Test/JT1078.Gateway.Test.csproj new file mode 100644 index 0000000..5b454b9 --- /dev/null +++ b/src/JT1078.Gateway.Tests/JT1078.Gateway.Test/JT1078.Gateway.Test.csproj @@ -0,0 +1,27 @@ + + + + netcoreapp3.1 + + false + + + + + + + + all + runtime; build; native; contentfiles; analyzers; buildtransitive + + + all + runtime; build; native; contentfiles; analyzers; buildtransitive + + + + + + + + diff --git a/src/JT1078.Gateway.Tests/JT1078.Gateway.Test/PipeTest.cs b/src/JT1078.Gateway.Tests/JT1078.Gateway.Test/PipeTest.cs new file mode 100644 index 0000000..37a5957 --- /dev/null +++ b/src/JT1078.Gateway.Tests/JT1078.Gateway.Test/PipeTest.cs @@ -0,0 +1,244 @@ +using System; +using System.Buffers; +using JT1078.Protocol.Extensions; +using System.Collections.Generic; +using System.IO.Pipelines; +using System.Threading; +using System.Threading.Tasks; +using Xunit; +using JT1078.Protocol; +using System.Buffers.Binary; +using JT1078.Protocol.Enums; +using System.Linq; +using System.Text; + +namespace JT1078.Gateway.Test +{ + public class PipeTest + { + [Fact] + public void Test1() + { + var reader = new ReadOnlySequence("303163648162000000190130503701010000016f95973f840000000003b600000001674d001495a85825900000000168ee3c800000000106e5010d800000000165b80000090350bfdfe840b5b35c676079446e3ffe2f5240e25cc6b35cebac31720656a853ba1b571246fb858eaa5d8266acb95b92705fd187f1fd20ff0ca6b62c4cbcb3b662f5d61c016928ca82b411acdc4df6edb2034624b992eee9b6c241e1903bf9477c6e4293b65ba75e98d5a2566da6f71c85e1052a9d5ed35c393b1a73b181598749f3d26f6fbf48f0be61c673fcb9f2b0d305794bed03af5e3cedff7768bed3120261d6f3547a6d519943c2afcb80e423c9e6db088a06200dbfaa81edc5bc0de67957e791f67bf040ef944f7d62983d32517b2fb2d9572a71340c225617231bc0d98e66d19fe81a19b44280860b273f700bf3f3444a928e93fefc716e2af46995fbb658d0580a49e42f6835270c8c154abe28a17f76550b1b1fafe62945f80490b3f780fe9bb4d4b4107eac3d50b8c99d1a191f6754992096683fb0f599846bae759b06222079f5404be39e4416136c7c42255b0e7ca42d86fc2227892406d61f9816bc125d017989a671f13f2f4052e018b1fb02460802029a049a23d2ffeea6ac552109d35aa8731483fb2cae963987156056cafb32436a23a0dc918fb2440b14c9e6124441e7bb3b08706066d1ddab512267767b6e522f80732e67046ff5ad4d8193bf5cc5c05ccceb73a36b6c3ea39fa91bb308c8bb7bf88515d9c52409128e8b94e33e48a5396c35c20bd83b7c0e6d3d4a24bc14e84810066c686c6c04e687c41123fe87c89d5fa07b0095e7f82d3b07e72570163c47444bdde16ae9bfacd540df047e8ee34e98ff33178da5c7e6be9272e6dcfbb6db7e678a6d1d3832226c9bf85afa14feac15a270d5d3724a121b8fc9b40f0d37bb7f432de5421d286a65313a6efd251f7ed75b4ef6557975af5da5df2b87a0bbc1cb58183c4c1e24fdc4eb016777af1a6fa4a29d3eed7c4463482e591a6dc20540cabb6d7dd29cbb8ffdacafdaac2dd36db70fefe14fdeec85ef5fe01bb104d2d6439dbd7ceefc87007ce07b8409751dd7c21aa9a537f5fdefdef7d6ceba8d5ae876522f75dedd472e4dde1284e71380ee75ed313b2b9b9a94a56ebd03ae36b64a3b35abbdc7ba380016218201d156658ed9b5632f80f921879063e9037cd3509d01a2e91c17e03d892e2bc381ac723eba266497a1fbb0dc77ab3f4a9a981f95977b025b005a0e09b1add481888333927963fc5e5bf376655cb00e4ca8841fa450c8653f91cf2f3fb0247dbcace5dfde3af4a854f9fa2aaaa33706a78321332273ab4ee837ff4f8eba08676e7f889464a842b8e3e4a579d2".ToHexBytes()); + SequenceReader seqReader = new SequenceReader(reader); + long totalConsumed = 0; + List packages = new List(); + while (!seqReader.End) + { + var header = seqReader.Sequence.Slice(seqReader.Consumed, 4); + var headerValue = BinaryPrimitives.ReadUInt32BigEndian(header.FirstSpan); + Assert.Equal(JT1078Package.FH, headerValue); + if(JT1078Package.FH == headerValue) + { + //sim + var sim = ReadBCD(seqReader.Sequence.Slice(seqReader.Consumed + 8, 6).FirstSpan, 12); + Assert.Equal("1901305037", sim); + //ʹӦݳ + seqReader.Advance(15); + if(seqReader.TryRead(out byte dataType)) + { + JT1078Label3 label3 = new JT1078Label3(dataType); + Assert.Equal(JT1078DataType.ƵI֡, label3.DataType); + int bodyLength = 0; + //͸ʱûиֶ + if (label3.DataType!= JT1078DataType.͸) + { + //ʱ + bodyLength += 8; + } + //Ƶ֡ʱûиֶ + if (label3.DataType == JT1078DataType.ƵI֡ || + label3.DataType == JT1078DataType.ƵP֡ || + label3.DataType == JT1078DataType.ƵB֡) + { + //һؼ֡ + һ֡ = 2 + 2 + bodyLength += 4; + } + seqReader.Advance(bodyLength); + var bodyLengthFirstSpan = seqReader.Sequence.Slice(seqReader.Consumed, 2).FirstSpan; + //峤 + seqReader.Advance(2); + bodyLength = BinaryPrimitives.ReadUInt16BigEndian(bodyLengthFirstSpan); + Assert.Equal(950, bodyLength); + // + seqReader.Advance(bodyLength); + var package = seqReader.Sequence.Slice(totalConsumed, seqReader.Consumed - totalConsumed).ToArray(); + packages.Add(package); + totalConsumed += (seqReader.Consumed - totalConsumed); + if (seqReader.End) break; + } + } + } + Assert.Single(packages); + } + + [Fact] + public void Test2() + { + var data1 = "303163648162000000190130503701010000016f95973f840000000003b600000001674d001495a85825900000000168ee3c800000000106e5010d800000000165b80000090350bfdfe840b5b35c676079446e3ffe2f5240e25cc6b35cebac31720656a853ba1b571246fb858eaa5d8266acb95b92705fd187f1fd20ff0ca6b62c4cbcb3b662f5d61c016928ca82b411acdc4df6edb2034624b992eee9b6c241e1903bf9477c6e4293b65ba75e98d5a2566da6f71c85e1052a9d5ed35c393b1a73b181598749f3d26f6fbf48f0be61c673fcb9f2b0d305794bed03af5e3cedff7768bed3120261d6f3547a6d519943c2afcb80e423c9e6db088a06200dbfaa81edc5bc0de67957e791f67bf040ef944f7d62983d32517b2fb2d9572a71340c225617231bc0d98e66d19fe81a19b44280860b273f700bf3f3444a928e93fefc716e2af46995fbb658d0580a49e42f6835270c8c154abe28a17f76550b1b1fafe62945f80490b3f780fe9bb4d4b4107eac3d50b8c99d1a191f6754992096683fb0f599846bae759b06222079f5404be39e4416136c7c42255b0e7ca42d86fc2227892406d61f9816bc125d017989a671f13f2f4052e018b1fb02460802029a049a23d2ffeea6ac552109d35aa8731483fb2cae963987156056cafb32436a23a0dc918fb2440b14c9e6124441e7bb3b08706066d1ddab512267767b6e522f80732e67046ff5ad4d8193bf5cc5c05ccceb73a36b6c3ea39fa91bb308c8bb7bf88515d9c52409128e8b94e33e48a5396c35c20bd83b7c0e6d3d4a24bc14e84810066c686c6c04e687c41123fe87c89d5fa07b0095e7f82d3b07e72570163c47444bdde16ae9bfacd540df047e8ee34e98ff33178da5c7e6be9272e6dcfbb6db7e678a6d1d3832226c9bf85afa14feac15a270d5d3724a121b8fc9b40f0d37bb7f432de5421d286a65313a6efd251f7ed75b4ef6557975af5da5df2b87a0bbc1cb58183c4c1e24fdc4eb016777af1a6fa4a29d3eed7c4463482e591a6dc20540cabb6d7dd29cbb8ffdacafdaac2dd36db70fefe14fdeec85ef5fe01bb104d2d6439dbd7ceefc87007ce07b8409751dd7c21aa9a537f5fdefdef7d6ceba8d5ae876522f75dedd472e4dde1284e71380ee75ed313b2b9b9a94a56ebd03ae36b64a3b35abbdc7ba380016218201d156658ed9b5632f80f921879063e9037cd3509d01a2e91c17e03d892e2bc381ac723eba266497a1fbb0dc77ab3f4a9a981f95977b025b005a0e09b1add481888333927963fc5e5bf376655cb00e4ca8841fa450c8653f91cf2f3fb0247dbcace5dfde3af4a854f9fa2aaaa33706a78321332273ab4ee837ff4f8eba08676e7f889464a842b8e3e4a579d2".ToHexBytes(); + var data2 = "303163648162000100190130503701030000016f95973f840000000003b645f4927c25abe98be0b99f260c25cb3f712cdbad53f19a7b840297a2cafe6f7ef6733add48b133a12bf1b79059da8517d5a788f7df85c27c1c3dcfd3985de5dc01289483ff5a75c881bbcf4e348700159c194b3abef0e267d7ae9373947b5386708e21c17b9c63e8a7375441672754bc708299f9c9234834f9560c433a95e467a4e19ceba4344be62e4960a22ef4ec3665e79d8e561cd56f99b330334a84c860c77dcaa4bd989c29f19345af6996929871a4db81a54ebaacb846ec856f9afd661f5d2b632d7f49e6f2b6df7eec0d2df3bf7126db0e5622694861cf58c098520a17a010aaa9782b265bc9188c434405d00dfd70b98593d6937d3594549bdd0f4e2d1d1e4db56b8b5366fcb7d92f2a85c89af023eaffff6aa83417ffb9d463c5b6f5e2480dc022d50c7b7920295785c7b8af20a363e621a95018d9b595dc406a099f2221e1e10a2380776a701dc66c7da44d76d4b9d3aa1420ea30f366417a1f4ff34bb2b3aab2df46c45e0007cd2d571eb14de50a2ddf2a32bad57ab048278df10bfd25e1e557bd51c3e3e90e8c0951b11cb3611ed5f3939a4fed21c47b7a06cb447d7ac5cda7714eb35dc44a25b57faf3752e1b66d481b2d25b8fffb2ff0c27890162dc69180d9fb4ffffc3408ed41fb31dde78979f3ecffbea1c9e094480d838c91f36ce70498e6f1ff65803ffb67ecd1057f2a7648f1a4a4d190e447ed6d630b1d8c58808d0947fa463451f3f0002d03f269ef3d10679edbd8014bd0cf4a7c5d928be415498ec4356109d46dc4843f73e3c166087866e7f6b95de0fa7bca4b120fddc3d3a60fc09869353f5839c2cdc80270ae7f8014e940c403c75004cde11d1dbbbdc1ee5e1b1a7020fd053f7e2a2a30f926318d51f15893e5849a59df571f597557bc4cf2f192033a3b69643027ed6c538750f241be16e5652b66384d5d566604b2ac682df39284997cee7586a538ab9f37df53a2fd2ce547f94c11a6d301fd15b429cb57eac68b05c794f4c3db82f7f38a216adb592c0e09f16514018cb1d20ab36dd0e2bf96901a16fd6cdaf1dad409be144059554d3cd2562d332d9474498747a444f179298f74e72ff59e3c6a9727fd18a1d62e7f340ff93839a71509cad4d8fda099034e97eb3407ff91340f7590a0af188af847119a7fbe6e441da1dfb177be57a30018794f9ba6e131f5141636fb4df6117ce65148cf48e315739210373162239699ae8a3dc7ad624bdf818bc2230778990764c8c5f7cc13e3e74f1041e0be1b15bb0503a57091c3d89a6283820ecbaaa7ad76aa02df71a9979268a185a6a387216986651558e89d4f70fe18eff3b09a".ToHexBytes(); + var reader = new ReadOnlySequence(data1.Concat(data2).ToArray()); + SequenceReader seqReader = new SequenceReader(reader); + long totalConsumed = 0; + List packages = new List(); + while (!seqReader.End) + { + var header = seqReader.Sequence.Slice(seqReader.Consumed, 4); + var headerValue = BinaryPrimitives.ReadUInt32BigEndian(header.FirstSpan); + Assert.Equal(JT1078Package.FH, headerValue); + if (JT1078Package.FH == headerValue) + { + //ʹӦݳ + seqReader.Advance(15); + if (seqReader.TryRead(out byte dataType)) + { + JT1078Label3 label3 = new JT1078Label3(dataType); + int bodyLength = 0; + //͸ʱûиֶ + if (label3.DataType != JT1078DataType.͸) + { + //ʱ + bodyLength += 8; + } + //Ƶ֡ʱûиֶ + if (label3.DataType == JT1078DataType.ƵI֡ || + label3.DataType == JT1078DataType.ƵP֡ || + label3.DataType == JT1078DataType.ƵB֡) + { + //һؼ֡ + һ֡ = 2 + 2 + bodyLength += 4; + } + seqReader.Advance(bodyLength); + var bodyLengthFirstSpan = seqReader.Sequence.Slice(seqReader.Consumed, 2).FirstSpan; + //峤 + seqReader.Advance(2); + bodyLength = BinaryPrimitives.ReadUInt16BigEndian(bodyLengthFirstSpan); + // + seqReader.Advance(bodyLength); + var package = seqReader.Sequence.Slice(totalConsumed, seqReader.Consumed - totalConsumed).ToArray(); + packages.Add(package); + totalConsumed += (seqReader.Consumed - totalConsumed); + if (seqReader.End) break; + } + } + } + Assert.Equal(2,packages.Count); + } + + [Fact] + public void Test3() + { + Assert.Throws(() => + { + var data1 = "3031636481".ToHexBytes(); + var reader = new ReadOnlySequence(data1); + SequenceReader seqReader = new SequenceReader(reader); + while (!seqReader.End) + { + if ((seqReader.Length - seqReader.Consumed)< 15) + { + throw new ArgumentException("not jt1078 package"); + } + var header = seqReader.Sequence.Slice(seqReader.Consumed, 4); + var headerValue = BinaryPrimitives.ReadUInt32BigEndian(header.FirstSpan); + Assert.Equal(JT1078Package.FH, headerValue); + if (JT1078Package.FH == headerValue) + { + //ʹӦݳ + seqReader.Advance(15); + if (seqReader.TryRead(out byte dataType)) + { + JT1078Label3 label3 = new JT1078Label3(dataType); + if (seqReader.End) break; + } + } + else + { + throw new ArgumentException("not jt1078 package"); + } + } + }); + } + + [Fact] + public void Test4() + { + Assert.Throws(() => + { + var data1 = "303163648162000000190130503701010000016f95973f840000000003b600000001674d001495a85825900000000168ee3c800000000106e5010d800000000165b80000090350bfdfe840b5b35c676079446e3ffe2f5240e25cc6b35cebac31720656a853ba1b571246fb858eaa5d8266acb95b92705fd187f1fd20ff0ca6b62c4cbcb3b662f5d61c016928ca82b411acdc4df6edb2034624b992eee9b6c241e1903bf9477c6e4293b65ba75e98d5a2566da6f71c85e1052a9d5ed35c393b1a73b181598749f3d26f6fbf48f0be61c673fcb9f2b0d305794bed03af5e3cedff7768bed3120261d6f3547a6d519943c2afcb80e423c9e6db088a06200dbfaa81edc5bc0de67957e791f67bf040ef944f7d62983d32517b2fb2d9572a71340c225617231bc0d98e66d19fe81a19b44280860b273f700bf3f3444a928e93fefc716e2af46995fbb658d0580a49e42f6835270c8c154abe28a17f76550b1b1fafe62945f80490b3f780fe9bb4d4b4107eac3d50b8c99d1a191f6754992096683fb0f599846bae759b06222079f5404be39e4416136c7c42255b0e7ca42d86fc2227892406d61f9816bc125d017989a671f13f2f4052e018b1fb02460802029a049a23d2ffeea6ac552109d35aa8731483fb2cae963987156056cafb32436a23a0dc918fb2440b14c9e6124441e7bb3b08706066d1ddab512267767b6e522f80732e67046ff5ad4d8193bf5cc5c05ccceb73a36b6c3ea39fa91bb308c8bb7bf88515d9c52409128e8b94e33e48a5396c35c20bd83b7c0e6d3d4a24bc14e84810066c686c6c04e687c41123fe87c89d5fa07b0095e7f82d3b07e72570163c47444bdde16ae9bfacd540df047e8ee34e98ff33178da5c7e6be9272e6dcfbb6db7e678a6d1d3832226c9bf85afa14feac15a270d5d3724a121b8fc9b40f0d37bb7f432de5421d286a65313a6efd251f7ed75b4ef6557975af5da5df2b87a0bbc1cb58183c4c1e24fdc4eb016777af1a6fa4a29d3eed7c4463482e591a6dc20540cabb6d7dd29cbb8ffdacafdaac2dd36db70fefe14fdeec85ef5fe01bb104d2d6439dbd7ceefc87007ce07b8409751dd7c21aa9a537f5fdefdef7d6ceba8d5ae876522f75dedd472e4dde1284e71380ee75ed313b2b9b9a94a56ebd03ae36b64a3b35abbdc7ba380016218201d156658ed9b5632f80f921879063e9037cd3509d01a2e91c17e03d892e2bc381ac723eba266497a1fbb0dc77ab3f4a9a981f95977b025b005a0e09b1add481888333927963fc5e5bf376655cb00e4ca8841fa450c8653f91cf2f3fb0247dbcace5dfde3af4a854f9fa2aaaa33706a78321332273ab4ee837ff4f8eba08676e7f889464a842b8e3e4a579d2".ToHexBytes(); + //var data2 = "303163648162000100190130503701030000016f95973f840000000003b645f4927c25abe98be0b99f260c25cb3f712cdbad53f19a7b840297a2cafe6f7ef6733add48b133a12bf1b79059da8517d5a788f7df85c27c1c3dcfd3985de5dc01289483ff5a75c881bbcf4e348700159c194b3abef0e267d7ae9373947b5386708e21c17b9c63e8a7375441672754bc708299f9c9234834f9560c433a95e467a4e19ceba4344be62e4960a22ef4ec3665e79d8e561cd56f99b330334a84c860c77dcaa4bd989c29f19345af6996929871a4db81a54ebaacb846ec856f9afd661f5d2b632d7f49e6f2b6df7eec0d2df3bf7126db0e5622694861cf58c098520a17a010aaa9782b265bc9188c434405d00dfd70b98593d6937d3594549bdd0f4e2d1d1e4db56b8b5366fcb7d92f2a85c89af023eaffff6aa83417ffb9d463c5b6f5e2480dc022d50c7b7920295785c7b8af20a363e621a95018d9b595dc406a099f2221e1e10a2380776a701dc66c7da44d76d4b9d3aa1420ea30f366417a1f4ff34bb2b3aab2df46c45e0007cd2d571eb14de50a2ddf2a32bad57ab048278df10bfd25e1e557bd51c3e3e90e8c0951b11cb3611ed5f3939a4fed21c47b7a06cb447d7ac5cda7714eb35dc44a25b57faf3752e1b66d481b2d25b8fffb2ff0c27890162dc69180d9fb4ffffc3408ed41fb31dde78979f3ecffbea1c9e094480d838c91f36ce70498e6f1ff65803ffb67ecd1057f2a7648f1a4a4d190e447ed6d630b1d8c58808d0947fa463451f3f0002d03f269ef3d10679edbd8014bd0cf4a7c5d928be415498ec4356109d46dc4843f73e3c166087866e7f6b95de0fa7bca4b120fddc3d3a60fc09869353f5839c2cdc80270ae7f8014e940c403c75004cde11d1dbbbdc1ee5e1b1a7020fd053f7e2a2a30f926318d51f15893e5849a59df571f597557bc4cf2f192033a3b69643027ed6c538750f241be16e5652b66384d5d566604b2ac682df39284997cee7586a538ab9f37df53a2fd2ce547f94c11a6d301fd15b429cb57eac68b05c794f4c3db82f7f38a216adb592c0e09f16514018cb1d20ab36dd0e2bf96901a16fd6cdaf1dad409be144059554d3cd2562d332d9474498747a444f179298f74e72ff59e3c6a9727fd18a1d62e7f340ff93839a71509cad4d8fda099034e97eb3407ff91340f7590a0af188af847119a7fbe6e441da1dfb177be57a30018794f9ba6e131f5141636fb4df6117ce65148cf48e315739210373162239699ae8a3dc7ad624bdf818bc2230778990764c8c5f7cc13e3e74f1041e0be1b15bb0503a57091c3d89a6283820ecbaaa7ad76aa02df71a9979268a185a6a387216986651558e89d4f70fe18eff3b09a".ToHexBytes(); + var data2 = "8162000100190130503701030000016f95973f840000000003b645f4927c25abe98be0b99f260c25cb3f712cdbad53f19a7b840297a2cafe6f7ef6733add48b133a12bf1b79059da8517d5a788f7df85c27c1c3dcfd3985de5dc01289483ff5a75c881bbcf4e348700159c194b3abef0e267d7ae9373947b5386708e21c17b9c63e8a7375441672754bc708299f9c9234834f9560c433a95e467a4e19ceba4344be62e4960a22ef4ec3665e79d8e561cd56f99b330334a84c860c77dcaa4bd989c29f19345af6996929871a4db81a54ebaacb846ec856f9afd661f5d2b632d7f49e6f2b6df7eec0d2df3bf7126db0e5622694861cf58c098520a17a010aaa9782b265bc9188c434405d00dfd70b98593d6937d3594549bdd0f4e2d1d1e4db56b8b5366fcb7d92f2a85c89af023eaffff6aa83417ffb9d463c5b6f5e2480dc022d50c7b7920295785c7b8af20a363e621a95018d9b595dc406a099f2221e1e10a2380776a701dc66c7da44d76d4b9d3aa1420ea30f366417a1f4ff34bb2b3aab2df46c45e0007cd2d571eb14de50a2ddf2a32bad57ab048278df10bfd25e1e557bd51c3e3e90e8c0951b11cb3611ed5f3939a4fed21c47b7a06cb447d7ac5cda7714eb35dc44a25b57faf3752e1b66d481b2d25b8fffb2ff0c27890162dc69180d9fb4ffffc3408ed41fb31dde78979f3ecffbea1c9e094480d838c91f36ce70498e6f1ff65803ffb67ecd1057f2a7648f1a4a4d190e447ed6d630b1d8c58808d0947fa463451f3f0002d03f269ef3d10679edbd8014bd0cf4a7c5d928be415498ec4356109d46dc4843f73e3c166087866e7f6b95de0fa7bca4b120fddc3d3a60fc09869353f5839c2cdc80270ae7f8014e940c403c75004cde11d1dbbbdc1ee5e1b1a7020fd053f7e2a2a30f926318d51f15893e5849a59df571f597557bc4cf2f192033a3b69643027ed6c538750f241be16e5652b66384d5d566604b2ac682df39284997cee7586a538ab9f37df53a2fd2ce547f94c11a6d301fd15b429cb57eac68b05c794f4c3db82f7f38a216adb592c0e09f16514018cb1d20ab36dd0e2bf96901a16fd6cdaf1dad409be144059554d3cd2562d332d9474498747a444f179298f74e72ff59e3c6a9727fd18a1d62e7f340ff93839a71509cad4d8fda099034e97eb3407ff91340f7590a0af188af847119a7fbe6e441da1dfb177be57a30018794f9ba6e131f5141636fb4df6117ce65148cf48e315739210373162239699ae8a3dc7ad624bdf818bc2230778990764c8c5f7cc13e3e74f1041e0be1b15bb0503a57091c3d89a6283820ecbaaa7ad76aa02df71a9979268a185a6a387216986651558e89d4f70fe18eff3b09a".ToHexBytes(); + var reader = new ReadOnlySequence(data1.Concat(data2).ToArray()); + SequenceReader seqReader = new SequenceReader(reader); + long totalConsumed = 0; + List packages = new List(); + while (!seqReader.End) + { + if ((seqReader.Length - seqReader.Consumed) < 15) + { + throw new ArgumentException("not jt1078 package"); + } + var header = seqReader.Sequence.Slice(seqReader.Consumed, 4); + var headerValue = BinaryPrimitives.ReadUInt32BigEndian(header.FirstSpan); + if (JT1078Package.FH == headerValue) + { + //ʹӦݳ + seqReader.Advance(15); + if (seqReader.TryRead(out byte dataType)) + { + JT1078Label3 label3 = new JT1078Label3(dataType); + int bodyLength = 0; + //͸ʱûиֶ + if (label3.DataType != JT1078DataType.͸) + { + //ʱ + bodyLength += 8; + } + //Ƶ֡ʱûиֶ + if (label3.DataType == JT1078DataType.ƵI֡ || + label3.DataType == JT1078DataType.ƵP֡ || + label3.DataType == JT1078DataType.ƵB֡) + { + //һؼ֡ + һ֡ = 2 + 2 + bodyLength += 4; + } + seqReader.Advance(bodyLength); + var bodyLengthFirstSpan = seqReader.Sequence.Slice(seqReader.Consumed, 2).FirstSpan; + //峤 + seqReader.Advance(2); + bodyLength = BinaryPrimitives.ReadUInt16BigEndian(bodyLengthFirstSpan); + // + seqReader.Advance(bodyLength); + var package = seqReader.Sequence.Slice(totalConsumed, seqReader.Consumed - totalConsumed).ToArray(); + packages.Add(package); + totalConsumed += (seqReader.Consumed - totalConsumed); + if (seqReader.End) break; + } + } + else + { + throw new ArgumentException("not jt1078 package"); + } + } + }); + } + + public string ReadBCD(ReadOnlySpan readOnlySpan,int len) + { + int count = len / 2; + StringBuilder bcdSb = new StringBuilder(count); + for (int i = 0; i < count; i++) + { + bcdSb.Append(readOnlySpan[i].ToString("X2")); + } + return bcdSb.ToString().TrimStart('0'); + } + + [Fact] + public void Test5() + { + var empty = "000000000".TrimStart('0'); + Assert.Equal("", empty); + } + } +} diff --git a/src/JT1078.Gateway.sln b/src/JT1078.Gateway.sln index b6abe65..6d2021e 100644 --- a/src/JT1078.Gateway.sln +++ b/src/JT1078.Gateway.sln @@ -3,9 +3,13 @@ Microsoft Visual Studio Solution File, Format Version 12.00 # Visual Studio Version 16 VisualStudioVersion = 16.0.29418.71 MinimumVisualStudioVersion = 10.0.40219.1 -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "JT1078.Gateway", "JT1078.Gateway\JT1078.Gateway.csproj", "{39A90F1C-FC46-4905-81F1-3AEE44D6D86D}" +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JT1078.Gateway", "JT1078.Gateway\JT1078.Gateway.csproj", "{39A90F1C-FC46-4905-81F1-3AEE44D6D86D}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "JT1078.Gateway.SimpleServer", "JT1078.Gateway.SimpleServer\JT1078.Gateway.SimpleServer.csproj", "{F9DB75C7-BE07-416E-BE05-6BA170A0AAC6}" +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "JT1078.Gateway.Test", "JT1078.Gateway.Tests\JT1078.Gateway.Test\JT1078.Gateway.Test.csproj", "{DAC80A8E-4172-451F-910D-9032BF8640F9}" +EndProject +Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Tests", "Tests", "{04C0F72A-5BC4-4CEE-B1E9-CCFAA72E373E}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "JT1078.Gateway.Abstractions", "JT1078.Gateway.Abstractions\JT1078.Gateway.Abstractions.csproj", "{EE50F2A6-5F28-4640-BC20-44A8BED8F311}" EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution @@ -17,14 +21,21 @@ Global {39A90F1C-FC46-4905-81F1-3AEE44D6D86D}.Debug|Any CPU.Build.0 = Debug|Any CPU {39A90F1C-FC46-4905-81F1-3AEE44D6D86D}.Release|Any CPU.ActiveCfg = Release|Any CPU {39A90F1C-FC46-4905-81F1-3AEE44D6D86D}.Release|Any CPU.Build.0 = Release|Any CPU - {F9DB75C7-BE07-416E-BE05-6BA170A0AAC6}.Debug|Any CPU.ActiveCfg = Debug|Any CPU - {F9DB75C7-BE07-416E-BE05-6BA170A0AAC6}.Debug|Any CPU.Build.0 = Debug|Any CPU - {F9DB75C7-BE07-416E-BE05-6BA170A0AAC6}.Release|Any CPU.ActiveCfg = Release|Any CPU - {F9DB75C7-BE07-416E-BE05-6BA170A0AAC6}.Release|Any CPU.Build.0 = Release|Any CPU + {DAC80A8E-4172-451F-910D-9032BF8640F9}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {DAC80A8E-4172-451F-910D-9032BF8640F9}.Debug|Any CPU.Build.0 = Debug|Any CPU + {DAC80A8E-4172-451F-910D-9032BF8640F9}.Release|Any CPU.ActiveCfg = Release|Any CPU + {DAC80A8E-4172-451F-910D-9032BF8640F9}.Release|Any CPU.Build.0 = Release|Any CPU + {EE50F2A6-5F28-4640-BC20-44A8BED8F311}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {EE50F2A6-5F28-4640-BC20-44A8BED8F311}.Debug|Any CPU.Build.0 = Debug|Any CPU + {EE50F2A6-5F28-4640-BC20-44A8BED8F311}.Release|Any CPU.ActiveCfg = Release|Any CPU + {EE50F2A6-5F28-4640-BC20-44A8BED8F311}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE EndGlobalSection + GlobalSection(NestedProjects) = preSolution + {DAC80A8E-4172-451F-910D-9032BF8640F9} = {04C0F72A-5BC4-4CEE-B1E9-CCFAA72E373E} + EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {9172690A-1D5A-491A-ACDD-3AF893980E0B} EndGlobalSection diff --git a/src/JT1078.Gateway/Configurations/JT1078Configuration.cs b/src/JT1078.Gateway/Configurations/JT1078Configuration.cs index 7c17fa4..2793fcd 100644 --- a/src/JT1078.Gateway/Configurations/JT1078Configuration.cs +++ b/src/JT1078.Gateway/Configurations/JT1078Configuration.cs @@ -10,26 +10,25 @@ namespace JT1078.Gateway.Configurations public int TcpPort { get; set; } = 1078; public int UdpPort { get; set; } = 1078; public int HttpPort { get; set; } = 1079; - - public int QuietPeriodSeconds { get; set; } = 1; - - public TimeSpan QuietPeriodTimeSpan => TimeSpan.FromSeconds(QuietPeriodSeconds); - - public int ShutdownTimeoutSeconds { get; set; } = 3; - - public TimeSpan ShutdownTimeoutTimeSpan => TimeSpan.FromSeconds(ShutdownTimeoutSeconds); - public int SoBacklog { get; set; } = 8192; - - public int EventLoopCount { get; set; } = Environment.ProcessorCount; - - public int ReaderIdleTimeSeconds { get; set; } = 3600; - - public int WriterIdleTimeSeconds { get; set; } = 3600; - - public int AllIdleTimeSeconds { get; set; } = 3600; - - public JT1078RemoteServerOptions RemoteServerOptions { get; set; } + public int MiniNumBufferSize { get; set; } = 8096; + /// + /// Tcp读超时 + /// 默认10分钟检查一次 + /// + public int TcpReaderIdleTimeSeconds { get; set; } = 60 * 10; + /// + /// Tcp 60s检查一次 + /// + public int TcpReceiveTimeoutCheckTimeSeconds { get; set; } = 60; + /// + /// Udp读超时 + /// + public int UdpReaderIdleTimeSeconds { get; set; } = 60; + /// + /// Udp 60s检查一次 + /// + public int UdpReceiveTimeoutCheckTimeSeconds { get; set; } = 60; public JT1078Configuration Value => this; } diff --git a/src/JT1078.Gateway/Impl/JT1078BuilderDefault.cs b/src/JT1078.Gateway/Impl/JT1078BuilderDefault.cs index dd0fd0e..e9f9abf 100644 --- a/src/JT1078.Gateway/Impl/JT1078BuilderDefault.cs +++ b/src/JT1078.Gateway/Impl/JT1078BuilderDefault.cs @@ -1,9 +1,5 @@ -using System; -using System.Collections.Generic; -using System.Text; -using JT1078.Gateway.Interfaces; +using JT1078.Gateway.Abstractions; using Microsoft.Extensions.DependencyInjection; -using Microsoft.Extensions.DependencyInjection.Extensions; namespace JT1078.Gateway.Impl { diff --git a/src/JT1078.Gateway/Impl/JT1078GatewayBuilderDefault.cs b/src/JT1078.Gateway/Impl/JT1078GatewayBuilderDefault.cs new file mode 100644 index 0000000..29cb793 --- /dev/null +++ b/src/JT1078.Gateway/Impl/JT1078GatewayBuilderDefault.cs @@ -0,0 +1,20 @@ + +using JT1078.Gateway.Abstractions; + +namespace JT1078.Gateway.Impl +{ + public class JT1078GatewayBuilderDefault : IJT1078GatewayBuilder + { + public IJT1078Builder JT1078Builder { get; } + + public JT1078GatewayBuilderDefault(IJT1078Builder builder) + { + JT1078Builder = builder; + } + + public IJT1078Builder Builder() + { + return JT1078Builder; + } + } +} \ No newline at end of file diff --git a/src/JT1078.Gateway/Impl/JT1078NormalGatewayBuilderDefault.cs b/src/JT1078.Gateway/Impl/JT1078NormalGatewayBuilderDefault.cs new file mode 100644 index 0000000..e99363a --- /dev/null +++ b/src/JT1078.Gateway/Impl/JT1078NormalGatewayBuilderDefault.cs @@ -0,0 +1,19 @@ +using JT1078.Gateway.Abstractions; + +namespace JT1078.Gateway.Impl +{ + public class JT1078NormalGatewayBuilderDefault : IJT1078NormalGatewayBuilder + { + public IJT1078Builder JT1078Builder { get; } + + public JT1078NormalGatewayBuilderDefault(IJT1078Builder builder) + { + JT1078Builder = builder; + } + + public IJT1078Builder Builder() + { + return JT1078Builder; + } + } +} \ No newline at end of file diff --git a/src/JT1078.Gateway/Impl/JT1078QueueGatewayBuilderDefault.cs b/src/JT1078.Gateway/Impl/JT1078QueueGatewayBuilderDefault.cs new file mode 100644 index 0000000..faf51c0 --- /dev/null +++ b/src/JT1078.Gateway/Impl/JT1078QueueGatewayBuilderDefault.cs @@ -0,0 +1,19 @@ +using JT1078.Gateway.Abstractions; + +namespace JT1078.Gateway.Impl +{ + public class JT1078QueueGatewayBuilderDefault : IJT1078QueueGatewayBuilder + { + public IJT1078Builder JT1078Builder { get; } + + public JT1078QueueGatewayBuilderDefault(IJT1078Builder builder) + { + JT1078Builder = builder; + } + + public IJT1078Builder Builder() + { + return JT1078Builder; + } + } +} \ No newline at end of file diff --git a/src/JT1078.Gateway/Interfaces/IHttpMiddleware.cs b/src/JT1078.Gateway/Interfaces/IHttpMiddleware.cs deleted file mode 100644 index d1d0a33..0000000 --- a/src/JT1078.Gateway/Interfaces/IHttpMiddleware.cs +++ /dev/null @@ -1,14 +0,0 @@ -using DotNetty.Codecs.Http; -using DotNetty.Transport.Channels; -using System; -using System.Collections.Generic; -using System.Security.Principal; -using System.Text; - -namespace JT1078.Gateway.Interfaces -{ - public interface IHttpMiddleware - { - void Next(IChannelHandlerContext ctx, IFullHttpRequest req, IPrincipal principal); - } -} diff --git a/src/JT1078.Gateway/Interfaces/IJT1078Authorization.cs b/src/JT1078.Gateway/Interfaces/IJT1078Authorization.cs deleted file mode 100644 index b6b4d06..0000000 --- a/src/JT1078.Gateway/Interfaces/IJT1078Authorization.cs +++ /dev/null @@ -1,13 +0,0 @@ -using DotNetty.Codecs.Http; -using System; -using System.Collections.Generic; -using System.Security.Principal; -using System.Text; - -namespace JT1078.Gateway.Interfaces -{ - public interface IJT1078Authorization - { - bool Authorization(IFullHttpRequest request, out IPrincipal principal); - } -} diff --git a/src/JT1078.Gateway/Interfaces/IJT1078HttpBuilder.cs b/src/JT1078.Gateway/Interfaces/IJT1078HttpBuilder.cs deleted file mode 100644 index 8eab080..0000000 --- a/src/JT1078.Gateway/Interfaces/IJT1078HttpBuilder.cs +++ /dev/null @@ -1,15 +0,0 @@ -using Microsoft.Extensions.DependencyInjection; -using System; -using System.Collections.Generic; -using System.Text; - -namespace JT1078.Gateway.Interfaces -{ - public interface IJT1078HttpBuilder - { - IJT1078Builder Instance { get; } - IJT1078Builder Builder(); - IJT1078HttpBuilder Replace() where T : IJT1078Authorization; - IJT1078HttpBuilder UseHttpMiddleware() where T : IHttpMiddleware; - } -} diff --git a/src/JT1078.Gateway/Interfaces/IJT1078TcpBuilder.cs b/src/JT1078.Gateway/Interfaces/IJT1078TcpBuilder.cs deleted file mode 100644 index e7d3cd0..0000000 --- a/src/JT1078.Gateway/Interfaces/IJT1078TcpBuilder.cs +++ /dev/null @@ -1,14 +0,0 @@ -using Microsoft.Extensions.DependencyInjection; -using System; -using System.Collections.Generic; -using System.Text; - -namespace JT1078.Gateway.Interfaces -{ - public interface IJT1078TcpBuilder - { - IJT1078Builder Instance { get;} - IJT1078Builder Builder(); - IJT1078TcpBuilder Replace() where T : IJT1078TcpMessageHandlers; - } -} diff --git a/src/JT1078.Gateway/Interfaces/IJT1078TcpMessageHandlers.cs b/src/JT1078.Gateway/Interfaces/IJT1078TcpMessageHandlers.cs deleted file mode 100644 index d18a3ff..0000000 --- a/src/JT1078.Gateway/Interfaces/IJT1078TcpMessageHandlers.cs +++ /dev/null @@ -1,11 +0,0 @@ -using JT1078.Gateway.Metadata; -using JT1078.Protocol; -using System.Threading.Tasks; - -namespace JT1078.Gateway.Interfaces -{ - public interface IJT1078TcpMessageHandlers - { - Task Processor(JT1078Request request); - } -} diff --git a/src/JT1078.Gateway/Interfaces/IJT1078UdpBuilder.cs b/src/JT1078.Gateway/Interfaces/IJT1078UdpBuilder.cs deleted file mode 100644 index 25839a3..0000000 --- a/src/JT1078.Gateway/Interfaces/IJT1078UdpBuilder.cs +++ /dev/null @@ -1,14 +0,0 @@ -using Microsoft.Extensions.DependencyInjection; -using System; -using System.Collections.Generic; -using System.Text; - -namespace JT1078.Gateway.Interfaces -{ - public interface IJT1078UdpBuilder - { - IJT1078Builder Instance { get; } - IJT1078Builder Builder(); - IJT1078UdpBuilder Replace() where T : IJT1078UdpMessageHandlers; - } -} diff --git a/src/JT1078.Gateway/Interfaces/IJT1078UdpMessageHandlers.cs b/src/JT1078.Gateway/Interfaces/IJT1078UdpMessageHandlers.cs deleted file mode 100644 index 2e43cf1..0000000 --- a/src/JT1078.Gateway/Interfaces/IJT1078UdpMessageHandlers.cs +++ /dev/null @@ -1,11 +0,0 @@ -using JT1078.Gateway.Metadata; -using JT1078.Protocol; -using System.Threading.Tasks; - -namespace JT1078.Gateway.Interfaces -{ - public interface IJT1078UdpMessageHandlers - { - Task Processor(JT1078Request request); - } -} diff --git a/src/JT1078.Gateway/JT1078.Gateway.csproj b/src/JT1078.Gateway/JT1078.Gateway.csproj index 4eded5d..e4bf481 100644 --- a/src/JT1078.Gateway/JT1078.Gateway.csproj +++ b/src/JT1078.Gateway/JT1078.Gateway.csproj @@ -1,38 +1,56 @@  - netstandard2.0 + netstandard2.1 8.0 Copyright 2019. SmallChi(Koike) JT1078.Gateway JT1078.Gateway - 基于DotNetty实现的JT1078DotNetty的库 - 基于DotNetty实现的JT1078DotNetty的库 - https://github.com/SmallChi/JT1078DotNetty - https://github.com/SmallChi/JT1078DotNetty - https://github.com/SmallChi/JT1078DotNetty/blob/master/LICENSE - https://github.com/SmallChi/JT1078DotNetty/blob/master/LICENSE + 基于Pipeline实现的JT1078Gateway库 + 基于Pipeline实现的JT1078Gateway库 + https://github.com/SmallChi/JT1078Gateway + https://github.com/SmallChi/JT1078Gateway + https://github.com/SmallChi/JT1078Gateway/blob/master/LICENSE + https://github.com/SmallChi/JT1078Gateway/blob/master/LICENSE false false LICENSE true 1.0.0-preview1 + + + + + + + + + + + + + + + + + + + + + + + - - - - - - - - - - + + + + + diff --git a/src/JT1078.Gateway/JT1078GatewayExtensions.cs b/src/JT1078.Gateway/JT1078GatewayExtensions.cs index ab94a4c..c69709c 100644 --- a/src/JT1078.Gateway/JT1078GatewayExtensions.cs +++ b/src/JT1078.Gateway/JT1078GatewayExtensions.cs @@ -1,12 +1,10 @@ -using JT1078.Gateway.Configurations; +using JT1078.Gateway.Abstractions; +using JT1078.Gateway.Configurations; using JT1078.Gateway.Impl; -using JT1078.Gateway.Interfaces; -using JT1078.Gateway.Session.Services; using Microsoft.Extensions.Configuration; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection.Extensions; using Microsoft.Extensions.Options; -using Newtonsoft.Json; using System; using System.Runtime.CompilerServices; @@ -21,7 +19,6 @@ namespace JT1078.Gateway { IJT1078Builder builder = new JT1078BuilderDefault(serviceDescriptors); builder.Services.Configure(configuration.GetSection("JT1078Configuration")); - builder.Services.TryAddSingleton(); return builder; } @@ -29,8 +26,6 @@ namespace JT1078.Gateway { IJT1078Builder builder = new JT1078BuilderDefault(serviceDescriptors); builder.Services.Configure(jt1078Options); - builder.Services.TryAddSingleton(); - builder.Services.TryAddSingleton(); return builder; } } diff --git a/src/JT1078.Gateway/JT1078TcpServer.cs b/src/JT1078.Gateway/JT1078TcpServer.cs new file mode 100644 index 0000000..53f8d83 --- /dev/null +++ b/src/JT1078.Gateway/JT1078TcpServer.cs @@ -0,0 +1,298 @@ +using System; +using System.Buffers; +using System.Buffers.Binary; +using System.Collections.Generic; +using System.IO.Pipelines; +using System.Net; +using System.Net.Sockets; +using System.Text; +using System.Threading; +using System.Threading.Tasks; +using JT1078.Gateway.Abstractions; +using JT1078.Gateway.Abstractions.Enums; +using JT1078.Gateway.Configurations; +using JT1078.Gateway.Sessions; +using JT1078.Protocol; +using JT1078.Protocol.Enums; +using JT1078.Protocol.Extensions; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; + +namespace JT1078.Gateway +{ + public class JT1078TcpServer : IHostedService + { + private Socket server; + + private readonly ILogger Logger; + + private readonly JT1078Configuration Configuration; + + private readonly JT1078SessionManager SessionManager; + + private readonly IJT1078PackageProducer jT1078PackageProducer; + + private readonly IJT1078MsgProducer jT1078MsgProducer; + + private readonly JT1078UseType jT1078UseType; + + /// + /// 使用正常方式 + /// + /// + /// + /// + /// + public JT1078TcpServer( + IJT1078PackageProducer jT1078PackageProducer, + IOptions jT1078ConfigurationAccessor, + ILoggerFactory loggerFactory, + JT1078SessionManager jT1078SessionManager) + { + SessionManager = jT1078SessionManager; + jT1078UseType = JT1078UseType.Normal; + Logger = loggerFactory.CreateLogger("JT1078TcpServer"); + Configuration = jT1078ConfigurationAccessor.Value; + this.jT1078PackageProducer = jT1078PackageProducer; + InitServer(); + } + + /// + /// 使用队列方式 + /// + /// + /// + /// + /// + public JT1078TcpServer( + IJT1078MsgProducer jT1078MsgProducer, + IOptions jT1078ConfigurationAccessor, + ILoggerFactory loggerFactory, + JT1078SessionManager jT1078SessionManager) + { + SessionManager = jT1078SessionManager; + jT1078UseType = JT1078UseType.Queue; + Logger = loggerFactory.CreateLogger("JT1078TcpServer"); + Configuration = jT1078ConfigurationAccessor.Value; + this.jT1078MsgProducer = jT1078MsgProducer; + InitServer(); + } + + private void InitServer() + { + var IPEndPoint = new System.Net.IPEndPoint(IPAddress.Any, Configuration.TcpPort); + server = new Socket(IPEndPoint.AddressFamily, SocketType.Stream, ProtocolType.Tcp); + server.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.NoDelay, true); + server.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.KeepAlive, true); + server.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.ReceiveBuffer, Configuration.MiniNumBufferSize); + server.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.SendBuffer, Configuration.MiniNumBufferSize); + server.LingerState = new LingerOption(false, 0); + server.Bind(IPEndPoint); + server.Listen(Configuration.SoBacklog); + } + public Task StartAsync(CancellationToken cancellationToken) + { + Logger.LogInformation($"JT1078 Tcp Server start at {IPAddress.Any}:{Configuration.TcpPort}."); + Task.Factory.StartNew(async () => + { + while (!cancellationToken.IsCancellationRequested) + { + var socket = await server.AcceptAsync(); + JT1078TcpSession jT808TcpSession = new JT1078TcpSession(socket); + SessionManager.TryAdd(jT808TcpSession); + await Task.Factory.StartNew(async (state) => + { + var session = (JT1078TcpSession)state; + if (Logger.IsEnabled(LogLevel.Information)) + { + Logger.LogInformation($"[Connected]:{session.Client.RemoteEndPoint}"); + } + var pipe = new Pipe(); + Task writing = FillPipeAsync(session, pipe.Writer); + Task reading = ReadPipeAsync(session, pipe.Reader); + await Task.WhenAll(reading, writing); + SessionManager.RemoveBySessionId(session.SessionID); + }, jT808TcpSession); + } + }, cancellationToken); + return Task.CompletedTask; + } + private async Task FillPipeAsync(JT1078TcpSession session, PipeWriter writer) + { + while (true) + { + try + { + Memory memory = writer.GetMemory(Configuration.MiniNumBufferSize); + //设备多久没发数据就断开连接 Receive Timeout. + int bytesRead = await session.Client.ReceiveAsync(memory, SocketFlags.None, session.ReceiveTimeout.Token); + if (bytesRead == 0) + { + break; + } + writer.Advance(bytesRead); + } + catch (OperationCanceledException ex) + { + Logger.LogError($"[Receive Timeout]:{session.Client.RemoteEndPoint}"); + break; + } + catch (System.Net.Sockets.SocketException ex) + { + Logger.LogError($"[{ex.SocketErrorCode.ToString()},{ex.Message}]:{session.Client.RemoteEndPoint}"); + break; + } +#pragma warning disable CA1031 // Do not catch general exception types + catch (Exception ex) + { + Logger.LogError(ex, $"[Receive Error]:{session.Client.RemoteEndPoint}"); + break; + } +#pragma warning restore CA1031 // Do not catch general exception types + FlushResult result = await writer.FlushAsync(); + if (result.IsCompleted) + { + break; + } + } + writer.Complete(); + } + private async Task ReadPipeAsync(JT1078TcpSession session, PipeReader reader) + { + while (true) + { + ReadResult result = await reader.ReadAsync(); + if (result.IsCompleted) + { + break; + } + ReadOnlySequence buffer = result.Buffer; + SequencePosition consumed = buffer.Start; + SequencePosition examined = buffer.End; + try + { + if (result.IsCanceled) break; + if (buffer.Length > 0) + { + ReaderBuffer(ref buffer, session, out consumed, out examined); + } + } +#pragma warning disable CA1031 // Do not catch general exception types + catch (Exception ex) + { + Logger.LogError(ex, $"[ReadPipe Error]:{session.Client.RemoteEndPoint}"); + break; + } +#pragma warning restore CA1031 // Do not catch general exception types + finally + { + reader.AdvanceTo(consumed, examined); + } + } + reader.Complete(); + } + private void ReaderBuffer(ref ReadOnlySequence buffer, JT1078TcpSession session, out SequencePosition consumed, out SequencePosition examined) + { + consumed = buffer.Start; + examined = buffer.End; + if (buffer.Length < 15) + { + throw new ArgumentException("not JT1078 package"); + } + SequenceReader seqReader = new SequenceReader(buffer); + long totalConsumed = 0; + while (!seqReader.End) + { + if ((seqReader.Length - seqReader.Consumed) < 15) + { + throw new ArgumentException("not JT1078 package"); + } + var header = seqReader.Sequence.Slice(seqReader.Consumed, 4); + var headerValue = BinaryPrimitives.ReadUInt32BigEndian(header.FirstSpan); + if (JT1078Package.FH == headerValue) + { + //sim + var sim = ReadBCD(seqReader.Sequence.Slice(seqReader.Consumed + 8, 6).FirstSpan, 12); + //根据数据类型处理对应的数据长度 + seqReader.Advance(15); + if (seqReader.TryRead(out byte dataType)) + { + JT1078Label3 label3 = new JT1078Label3(dataType); + int bodyLength = 0; + //透传的时候没有该字段 + if (label3.DataType != JT1078DataType.透传数据) + { + //时间戳 + bodyLength += 8; + } + //非视频帧时没有该字段 + if (label3.DataType == JT1078DataType.视频I帧 || + label3.DataType == JT1078DataType.视频P帧 || + label3.DataType == JT1078DataType.视频B帧) + { + //上一个关键帧 + 上一帧 = 2 + 2 + bodyLength += 4; + } + seqReader.Advance(bodyLength); + var bodyLengthFirstSpan = seqReader.Sequence.Slice(seqReader.Consumed, 2).FirstSpan; + //数据体长度 + seqReader.Advance(2); + bodyLength = BinaryPrimitives.ReadUInt16BigEndian(bodyLengthFirstSpan); + //数据体 + seqReader.Advance(bodyLength); + if (string.IsNullOrEmpty(sim)) + { + sim = session.SessionID; + } + SessionManager.TryLink(sim, session); + var package = seqReader.Sequence.Slice(totalConsumed, seqReader.Consumed - totalConsumed); + try + { + if (jT1078UseType== JT1078UseType.Queue) + { + jT1078MsgProducer.ProduceAsync(sim, package.ToArray()); + } + else + { + jT1078PackageProducer.ProduceAsync(sim, JT1078Serializer.Deserialize(package.FirstSpan)); + } + } + catch (Exception ex) + { + Logger.LogError(ex, $"[Parse]:{package.ToArray().ToHexString()}"); + } + totalConsumed += (seqReader.Consumed - totalConsumed); + if (seqReader.End) break; + } + } + } + if (seqReader.Length == totalConsumed) + { + examined = consumed = buffer.End; + } + else + { + consumed = buffer.GetPosition(totalConsumed); + } + } + public Task StopAsync(CancellationToken cancellationToken) + { + Logger.LogInformation("JT1078 Tcp Server Stop"); + if (server?.Connected ?? false) + server.Shutdown(SocketShutdown.Both); + server?.Close(); + return Task.CompletedTask; + } + string ReadBCD(ReadOnlySpan readOnlySpan, int len) + { + int count = len / 2; + StringBuilder bcdSb = new StringBuilder(count); + for (int i = 0; i < count; i++) + { + bcdSb.Append(readOnlySpan[i].ToString("X2")); + } + return bcdSb.ToString().TrimStart('0'); + } + } +} diff --git a/src/JT1078.Gateway/Metadata/JT1078AtomicCounter.cs b/src/JT1078.Gateway/Metadata/JT1078AtomicCounter.cs deleted file mode 100644 index 5c2feb7..0000000 --- a/src/JT1078.Gateway/Metadata/JT1078AtomicCounter.cs +++ /dev/null @@ -1,49 +0,0 @@ -using System; -using System.Collections.Generic; -using System.Text; -using System.Threading; - -namespace JT1078.Gateway.Metadata -{ - /// - /// - /// - /// - internal class JT1078AtomicCounter - { - long counter = 0; - - public JT1078AtomicCounter(long initialCount = 0) - { - this.counter = initialCount; - } - - public void Reset() - { - Interlocked.Exchange(ref counter, 0); - } - - public long Increment() - { - return Interlocked.Increment(ref counter); - } - - public long Add(long len) - { - return Interlocked.Add(ref counter,len); - } - - public long Decrement() - { - return Interlocked.Decrement(ref counter); - } - - public long Count - { - get - { - return Interlocked.Read(ref counter); - } - } - } -} diff --git a/src/JT1078.Gateway/Metadata/JT1078TcpSession.cs b/src/JT1078.Gateway/Metadata/JT1078TcpSession.cs deleted file mode 100644 index 38dfb41..0000000 --- a/src/JT1078.Gateway/Metadata/JT1078TcpSession.cs +++ /dev/null @@ -1,29 +0,0 @@ -using DotNetty.Transport.Channels; -using System; - -namespace JT1078.Gateway.Metadata -{ - public class JT1078TcpSession - { - public JT1078TcpSession(IChannel channel, string terminalPhoneNo) - { - Channel = channel; - TerminalPhoneNo = terminalPhoneNo; - StartTime = DateTime.Now; - LastActiveTime = DateTime.Now; - } - - public JT1078TcpSession() { } - - /// - /// 终端手机号 - /// - public string TerminalPhoneNo { get; set; } - - public IChannel Channel { get; set; } - - public DateTime LastActiveTime { get; set; } - - public DateTime StartTime { get; set; } - } -} diff --git a/src/JT1078.Gateway/Metadata/JT1078UdpSession.cs b/src/JT1078.Gateway/Metadata/JT1078UdpSession.cs deleted file mode 100644 index ab7da52..0000000 --- a/src/JT1078.Gateway/Metadata/JT1078UdpSession.cs +++ /dev/null @@ -1,35 +0,0 @@ -using DotNetty.Transport.Channels; -using System; -using System.Net; - -namespace JT1078.Gateway.Metadata -{ - public class JT1078UdpSession - { - public JT1078UdpSession(IChannel channel, - EndPoint sender, - string terminalPhoneNo) - { - Channel = channel; - TerminalPhoneNo = terminalPhoneNo; - StartTime = DateTime.Now; - LastActiveTime = DateTime.Now; - Sender = sender; - } - - public EndPoint Sender { get; set; } - - public JT1078UdpSession() { } - - /// - /// 终端手机号 - /// - public string TerminalPhoneNo { get; set; } - - public IChannel Channel { get; set; } - - public DateTime LastActiveTime { get; set; } - - public DateTime StartTime { get; set; } - } -} diff --git a/src/JT1078.Gateway/Services/JT1078AtomicCounterService.cs b/src/JT1078.Gateway/Services/JT1078AtomicCounterService.cs deleted file mode 100644 index a949cf2..0000000 --- a/src/JT1078.Gateway/Services/JT1078AtomicCounterService.cs +++ /dev/null @@ -1,52 +0,0 @@ -using JT1078.Gateway.Metadata; - -namespace JT1078.Gateway.Session.Services -{ - /// - /// 计数包服务 - /// - public class JT1078AtomicCounterService - { - private readonly JT1078AtomicCounter MsgSuccessCounter; - - private readonly JT1078AtomicCounter MsgFailCounter; - - public JT1078AtomicCounterService() - { - MsgSuccessCounter=new JT1078AtomicCounter(); - MsgFailCounter = new JT1078AtomicCounter(); - } - - public void Reset() - { - MsgSuccessCounter.Reset(); - MsgFailCounter.Reset(); - } - - public long MsgSuccessIncrement() - { - return MsgSuccessCounter.Increment(); - } - - public long MsgSuccessCount - { - get - { - return MsgSuccessCounter.Count; - } - } - - public long MsgFailIncrement() - { - return MsgFailCounter.Increment(); - } - - public long MsgFailCount - { - get - { - return MsgFailCounter.Count; - } - } - } -} diff --git a/src/JT1078.Gateway/Services/JT1078AtomicCounterServiceFactory.cs b/src/JT1078.Gateway/Services/JT1078AtomicCounterServiceFactory.cs deleted file mode 100644 index e8fe808..0000000 --- a/src/JT1078.Gateway/Services/JT1078AtomicCounterServiceFactory.cs +++ /dev/null @@ -1,30 +0,0 @@ -using JT1078.Gateway.Enums; -using System; -using System.Collections.Concurrent; - -namespace JT1078.Gateway.Session.Services -{ - public class JT1078AtomicCounterServiceFactory - { - private readonly ConcurrentDictionary cache; - - public JT1078AtomicCounterServiceFactory() - { - cache = new ConcurrentDictionary(); - } - - public JT1078AtomicCounterService Create(JT1078TransportProtocolType type) - { - if(cache.TryGetValue(type,out var service)) - { - return service; - } - else - { - var serviceNew = new JT1078AtomicCounterService(); - cache.TryAdd(type, serviceNew); - return serviceNew; - } - } - } -} diff --git a/src/JT1078.Gateway/Sessions/JT1078SessionManager.cs b/src/JT1078.Gateway/Sessions/JT1078SessionManager.cs new file mode 100644 index 0000000..5775d78 --- /dev/null +++ b/src/JT1078.Gateway/Sessions/JT1078SessionManager.cs @@ -0,0 +1,246 @@ +using JT1078.Gateway.Abstractions; +using JT1078.Gateway.Abstractions.Enums; +using Microsoft.Extensions.Logging; +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Linq; +using System.Net; +using System.Net.Sockets; +using System.Threading.Tasks; + +namespace JT1078.Gateway.Sessions +{ + /// + /// + /// 不支持变态类型:既发TCP和UDP + /// + public class JT1078SessionManager + { + private readonly ILogger logger; + public ConcurrentDictionary Sessions { get; } + public ConcurrentDictionary TerminalPhoneNoSessions { get; } + public JT1078SessionManager(ILoggerFactory loggerFactory) + { + Sessions = new ConcurrentDictionary(StringComparer.OrdinalIgnoreCase); + TerminalPhoneNoSessions = new ConcurrentDictionary(StringComparer.OrdinalIgnoreCase); + logger = loggerFactory.CreateLogger("JT1078SessionManager"); + } + + public int TotalSessionCount + { + get + { + return Sessions.Count; + } + } + + public int TcpSessionCount + { + get + { + return Sessions.Where(w => w.Value.TransportProtocolType == JT1078TransportProtocolType.tcp).Count(); + } + } + + public int UdpSessionCount + { + get + { + return Sessions.Where(w => w.Value.TransportProtocolType == JT1078TransportProtocolType.udp).Count(); + } + } + + internal void TryLink(string terminalPhoneNo, IJT1078Session session) + { + DateTime curretDatetime= DateTime.Now; + if (TerminalPhoneNoSessions.TryGetValue(terminalPhoneNo,out IJT1078Session cacheSession)) + { + if (session.SessionID != cacheSession.SessionID) + { + //从转发到直连的数据需要更新缓存 + session.ActiveTime = curretDatetime; + TerminalPhoneNoSessions.TryUpdate(terminalPhoneNo, session, cacheSession); + //会话通知 + //todo: 会话通知 + + } + else + { + cacheSession.ActiveTime = curretDatetime; + TerminalPhoneNoSessions.TryUpdate(terminalPhoneNo, cacheSession, cacheSession); + } + } + else + { + session.TerminalPhoneNo = terminalPhoneNo; + if (TerminalPhoneNoSessions.TryAdd(terminalPhoneNo, session)) + { + //会话通知 + //todo: 会话通知 + + } + } + } + + internal void TryLink(IJT1078Session session) + { + DateTime curretDatetime = DateTime.Now; + if (TerminalPhoneNoSessions.TryGetValue(session.SessionID, out IJT1078Session cacheSession)) + { + if (session.SessionID != cacheSession.SessionID) + { + //从转发到直连的数据需要更新缓存 + session.ActiveTime = curretDatetime; + TerminalPhoneNoSessions.TryUpdate(session.SessionID, session, cacheSession); + //会话通知 + //todo: 会话通知 + + } + else + { + cacheSession.ActiveTime = curretDatetime; + TerminalPhoneNoSessions.TryUpdate(session.SessionID, cacheSession, cacheSession); + } + } + else + { + session.TerminalPhoneNo = session.SessionID; + if (TerminalPhoneNoSessions.TryAdd(session.SessionID, session)) + { + //会话通知 + //todo: 会话通知 + + } + } + } + + public IJT1078Session TryLink(string terminalPhoneNo, Socket socket, EndPoint remoteEndPoint) + { + if (TerminalPhoneNoSessions.TryGetValue(terminalPhoneNo, out IJT1078Session currentSession)) + { + currentSession.ActiveTime = DateTime.Now; + currentSession.TerminalPhoneNo = terminalPhoneNo; + currentSession.RemoteEndPoint = remoteEndPoint; + TerminalPhoneNoSessions.TryUpdate(terminalPhoneNo, currentSession, currentSession); + } + else + { + JT1078UdpSession session = new JT1078UdpSession(socket, remoteEndPoint); + session.TerminalPhoneNo = terminalPhoneNo; + Sessions.TryAdd(session.SessionID, session); + TerminalPhoneNoSessions.TryAdd(terminalPhoneNo, session); + currentSession = session; + } + //会话通知 + //todo: 会话通知 + return currentSession; + } + + internal bool TryAdd(IJT1078Session session) + { + return Sessions.TryAdd(session.SessionID, session); + } + + public async ValueTask TrySendByTerminalPhoneNoAsync(string terminalPhoneNo, byte[] data) + { + if(TerminalPhoneNoSessions.TryGetValue(terminalPhoneNo,out var session)) + { + if (session.TransportProtocolType == JT1078TransportProtocolType.tcp) + { + await session.Client.SendAsync(data, SocketFlags.None); + } + else + { + await session.Client.SendToAsync(data, SocketFlags.None, session.RemoteEndPoint); + } + return true; + } + else + { + return false; + } + } + + public async ValueTask TrySendBySessionIdAsync(string sessionId, byte[] data) + { + if (Sessions.TryGetValue(sessionId, out var session)) + { + if(session.TransportProtocolType== JT1078TransportProtocolType.tcp) + { + await session.Client.SendAsync(data, SocketFlags.None); + } + else + { + await session.Client.SendToAsync(data, SocketFlags.None, session.RemoteEndPoint); + } + return true; + } + else + { + return false; + } + } + + public void RemoveByTerminalPhoneNo(string terminalPhoneNo) + { + if (TerminalPhoneNoSessions.TryGetValue(terminalPhoneNo, out var removeTerminalPhoneNoSessions)) + { + // 处理转发过来的是数据 这时候通道对设备是1对多关系,需要清理垃圾数据 + //1.用当前会话的通道Id找出通过转发过来的其他设备的终端号 + var terminalPhoneNos = TerminalPhoneNoSessions.Where(w => w.Value.SessionID == removeTerminalPhoneNoSessions.SessionID).Select(s => s.Key).ToList(); + //2.存在则一个个移除 + string tmpTerminalPhoneNo = terminalPhoneNo; + if (terminalPhoneNos.Count > 0) + { + //3.移除包括当前的设备号 + foreach (var item in terminalPhoneNos) + { + TerminalPhoneNoSessions.TryRemove(item, out _); + } + tmpTerminalPhoneNo = string.Join(",", terminalPhoneNos); + } + if (Sessions.TryRemove(removeTerminalPhoneNoSessions.SessionID, out var removeSession)) + { + removeSession.Close(); + if (logger.IsEnabled(LogLevel.Information)) + logger.LogInformation($"[Session Remove]:{terminalPhoneNo}-{tmpTerminalPhoneNo}"); + //会话通知 + //todo: 会话通知 SessionOffline + + } + } + } + + public void RemoveBySessionId(string sessionId) + { + if (Sessions.TryRemove(sessionId, out var removeSession)) + { + var terminalPhoneNos = TerminalPhoneNoSessions.Where(w => w.Value.SessionID == sessionId).Select(s => s.Key).ToList(); + if (terminalPhoneNos.Count > 0) + { + foreach (var item in terminalPhoneNos) + { + TerminalPhoneNoSessions.TryRemove(item, out _); + } + var tmpTerminalPhoneNo = string.Join(",", terminalPhoneNos); + //会话通知 + //todo: 会话通知 SessionOffline tmpTerminalPhoneNo + if (logger.IsEnabled(LogLevel.Information)) + logger.LogInformation($"[Session Remove]:{tmpTerminalPhoneNo}"); + } + removeSession.Close(); + } + } + + public List GetTcpAll() + { + return TerminalPhoneNoSessions.Where(w => w.Value.TransportProtocolType == JT1078TransportProtocolType.tcp).Select(s => (JT1078TcpSession)s.Value).ToList(); + } + + public List GetUdpAll() + { + return TerminalPhoneNoSessions.Where(w => w.Value.TransportProtocolType == JT1078TransportProtocolType.udp).Select(s => (JT1078UdpSession)s.Value).ToList(); + } + } +} diff --git a/src/JT1078.Gateway/Sessions/JT1078TcpSession.cs b/src/JT1078.Gateway/Sessions/JT1078TcpSession.cs new file mode 100644 index 0000000..264eccb --- /dev/null +++ b/src/JT1078.Gateway/Sessions/JT1078TcpSession.cs @@ -0,0 +1,45 @@ +using JT1078.Gateway.Abstractions.Enums; +using JT1078.Gateway.Abstractions; +using System; +using System.Net; +using System.Net.Sockets; +using System.Threading; + +namespace JT1078.Gateway.Sessions +{ + public class JT1078TcpSession : IJT1078Session + { + public JT1078TcpSession(Socket client) + { + Client = client; + RemoteEndPoint = client.RemoteEndPoint; + ActiveTime = DateTime.Now; + StartTime = DateTime.Now; + SessionID = Guid.NewGuid().ToString("N"); + ReceiveTimeout = new CancellationTokenSource(); + } + /// + /// 终端手机号 + /// + public string TerminalPhoneNo { get; set; } + public DateTime ActiveTime { get; set; } + public DateTime StartTime { get; set; } + public JT1078TransportProtocolType TransportProtocolType { get;} = JT1078TransportProtocolType.tcp; + public string SessionID { get; } + public Socket Client { get; set; } + public CancellationTokenSource ReceiveTimeout { get; set; } + public EndPoint RemoteEndPoint { get ; set; } + public void Close() + { + try + { + Client.Shutdown(SocketShutdown.Both); + } + catch { } + finally + { + Client.Close(); + } + } + } +} diff --git a/src/JT1078.Gateway/Sessions/JT1078UdpSession.cs b/src/JT1078.Gateway/Sessions/JT1078UdpSession.cs new file mode 100644 index 0000000..c1e8596 --- /dev/null +++ b/src/JT1078.Gateway/Sessions/JT1078UdpSession.cs @@ -0,0 +1,38 @@ +using JT1078.Gateway.Abstractions.Enums; +using JT1078.Gateway.Abstractions; +using System; +using System.Net; +using System.Net.Sockets; +using System.Threading; + +namespace JT1078.Gateway.Sessions +{ + public class JT1078UdpSession: IJT1078Session + { + public JT1078UdpSession(Socket socket, EndPoint sender) + { + ActiveTime = DateTime.Now; + StartTime = DateTime.Now; + SessionID = Guid.NewGuid().ToString("N"); + ReceiveTimeout = new CancellationTokenSource(); + RemoteEndPoint = sender; + Client = socket; + } + + /// + /// 终端手机号 + /// + public string TerminalPhoneNo { get; set; } + public DateTime ActiveTime { get; set; } + public DateTime StartTime { get; set; } + public JT1078TransportProtocolType TransportProtocolType { get; set; } = JT1078TransportProtocolType.udp; + public string SessionID { get; } + public Socket Client { get; set; } + public CancellationTokenSource ReceiveTimeout { get; set; } + public EndPoint RemoteEndPoint { get; set ; } + public void Close() + { + + } + } +}