@@ -1,4 +1,4 @@ | |||||
# JT1078DotNetty | # JT1078.Gateway | ||||
## 前提条件 | ## 前提条件 | ||||
@@ -2,14 +2,14 @@ | |||||
using System.Collections.Generic; | using System.Collections.Generic; | ||||
using System.Text; | using System.Text; | ||||
namespace JT1078.Gateway.Enums | namespace JT1078.Gateway.Abstractions.Enums | ||||
{ | { | ||||
/// <summary> | /// <summary> | ||||
/// 传输协议类型 | /// 传输协议类型 | ||||
/// </summary> | /// </summary> | ||||
public enum JT1078TransportProtocolType | public enum JT1078TransportProtocolType | ||||
{ | { | ||||
tcp=1, | tcp = 1, | ||||
udp = 2 | udp = 2 | ||||
} | } | ||||
} | } |
@@ -0,0 +1,18 @@ | |||||
using System; | |||||
using System.Collections.Generic; | |||||
using System.Text; | |||||
namespace JT1078.Gateway.Abstractions.Enums | |||||
{ | |||||
public enum JT1078UseType : byte | |||||
{ | |||||
/// <summary> | |||||
/// 使用正常方式 | |||||
/// </summary> | |||||
Normal = 1, | |||||
/// <summary> | |||||
/// 使用队列方式 | |||||
/// </summary> | |||||
Queue = 2 | |||||
} | |||||
} |
@@ -3,7 +3,7 @@ using System; | |||||
using System.Collections.Generic; | using System.Collections.Generic; | ||||
using System.Text; | using System.Text; | ||||
namespace JT1078.Gateway.Interfaces | namespace JT1078.Gateway.Abstractions | ||||
{ | { | ||||
public interface IJT1078Builder | public interface IJT1078Builder | ||||
{ | { |
@@ -0,0 +1,14 @@ | |||||
using JT1078.Protocol; | |||||
using Microsoft.Extensions.DependencyInjection; | |||||
using System; | |||||
using System.Collections.Generic; | |||||
using System.Text; | |||||
namespace JT1078.Gateway.Abstractions | |||||
{ | |||||
public interface IJT1078GatewayBuilder | |||||
{ | |||||
IJT1078Builder JT1078Builder { get; } | |||||
IJT1078Builder Builder(); | |||||
} | |||||
} |
@@ -0,0 +1,15 @@ | |||||
using System; | |||||
using System.Collections.Generic; | |||||
using System.Text; | |||||
using System.Threading; | |||||
namespace JT1078.Gateway.Abstractions | |||||
{ | |||||
public interface IJT1078MsgConsumer : IJT1078PubSub, IDisposable | |||||
{ | |||||
void OnMessage(Action<(string TerminalNo, byte[] Data)> callback); | |||||
CancellationTokenSource Cts { get; } | |||||
void Subscribe(); | |||||
void Unsubscribe(); | |||||
} | |||||
} |
@@ -0,0 +1,17 @@ | |||||
using System; | |||||
using System.Collections.Generic; | |||||
using System.Text; | |||||
using System.Threading.Tasks; | |||||
namespace JT1078.Gateway.Abstractions | |||||
{ | |||||
public interface IJT1078MsgProducer : IJT1078PubSub, IDisposable | |||||
{ | |||||
/// <summary> | |||||
/// | |||||
/// </summary> | |||||
/// <param name="terminalNo">设备终端号</param> | |||||
/// <param name="data">jt1078 hex data</param> | |||||
ValueTask ProduceAsync(string terminalNo, byte[] data); | |||||
} | |||||
} |
@@ -0,0 +1,12 @@ | |||||
using Microsoft.Extensions.DependencyInjection; | |||||
using System; | |||||
using System.Collections.Generic; | |||||
using System.Text; | |||||
namespace JT1078.Gateway.Abstractions | |||||
{ | |||||
public interface IJT1078NormalGatewayBuilder: IJT1078GatewayBuilder | |||||
{ | |||||
} | |||||
} |
@@ -0,0 +1,16 @@ | |||||
using JT1078.Protocol; | |||||
using System; | |||||
using System.Collections.Generic; | |||||
using System.Text; | |||||
using System.Threading; | |||||
namespace JT1078.Gateway.Abstractions | |||||
{ | |||||
public interface IJT1078PackageConsumer : IJT1078PubSub, IDisposable | |||||
{ | |||||
void OnMessage(Action<(string TerminalNo, JT1078Package Data)> callback); | |||||
CancellationTokenSource Cts { get; } | |||||
void Subscribe(); | |||||
void Unsubscribe(); | |||||
} | |||||
} |
@@ -0,0 +1,18 @@ | |||||
using JT1078.Protocol; | |||||
using System; | |||||
using System.Collections.Generic; | |||||
using System.Text; | |||||
using System.Threading.Tasks; | |||||
namespace JT1078.Gateway.Abstractions | |||||
{ | |||||
public interface IJT1078PackageProducer : IJT1078PubSub, IDisposable | |||||
{ | |||||
/// <summary> | |||||
/// | |||||
/// </summary> | |||||
/// <param name="terminalNo">设备终端号</param> | |||||
/// <param name="data">jt1078 package data</param> | |||||
ValueTask ProduceAsync(string terminalNo, JT1078Package data); | |||||
} | |||||
} |
@@ -0,0 +1,11 @@ | |||||
using System; | |||||
using System.Collections.Generic; | |||||
using System.Text; | |||||
namespace JT1078.Gateway.Abstractions | |||||
{ | |||||
public interface IJT1078PubSub | |||||
{ | |||||
string TopicName { get; } | |||||
} | |||||
} |
@@ -0,0 +1,12 @@ | |||||
using Microsoft.Extensions.DependencyInjection; | |||||
using System; | |||||
using System.Collections.Generic; | |||||
using System.Text; | |||||
namespace JT1078.Gateway.Abstractions | |||||
{ | |||||
public interface IJT1078QueueGatewayBuilder: IJT1078GatewayBuilder | |||||
{ | |||||
} | |||||
} |
@@ -0,0 +1,26 @@ | |||||
using JT1078.Gateway.Abstractions.Enums; | |||||
using System; | |||||
using System.Collections.Generic; | |||||
using System.Net; | |||||
using System.Net.Sockets; | |||||
using System.Text; | |||||
using System.Threading; | |||||
namespace JT1078.Gateway.Abstractions | |||||
{ | |||||
public interface IJT1078Session | |||||
{ | |||||
/// <summary> | |||||
/// 终端手机号 | |||||
/// </summary> | |||||
string TerminalPhoneNo { get; set; } | |||||
string SessionID { get; } | |||||
Socket Client { get; set; } | |||||
DateTime StartTime { get; set; } | |||||
DateTime ActiveTime { get; set; } | |||||
JT1078TransportProtocolType TransportProtocolType { get;} | |||||
CancellationTokenSource ReceiveTimeout { get; set; } | |||||
EndPoint RemoteEndPoint { get; set; } | |||||
void Close(); | |||||
} | |||||
} |
@@ -0,0 +1,34 @@ | |||||
<Project Sdk="Microsoft.NET.Sdk"> | |||||
<PropertyGroup> | |||||
<TargetFramework>netstandard2.1</TargetFramework> | |||||
<LangVersion>8.0</LangVersion> | |||||
<Copyright>Copyright 2019.</Copyright> | |||||
<Authors>SmallChi(Koike)</Authors> | |||||
<PackageId>JT1078.Gateway.Abstractions</PackageId> | |||||
<Product>JT1078.Gateway.Abstractions</Product> | |||||
<Description>基于Pipeline实现的JT1078Gateway的抽象库</Description> | |||||
<PackageReleaseNotes>基于Pipeline实现的JT1078Gateway的抽象库</PackageReleaseNotes> | |||||
<RepositoryUrl>https://github.com/SmallChi/JT1078Gateway</RepositoryUrl> | |||||
<PackageProjectUrl>https://github.com/SmallChi/JT1078Gateway</PackageProjectUrl> | |||||
<licenseUrl>https://github.com/SmallChi/JT1078Gateway/blob/master/LICENSE</licenseUrl> | |||||
<license>https://github.com/SmallChi/JT1078Gateway/blob/master/LICENSE</license> | |||||
<GeneratePackageOnBuild>false</GeneratePackageOnBuild> | |||||
<SignAssembly>false</SignAssembly> | |||||
<PackageLicenseFile>LICENSE</PackageLicenseFile> | |||||
<PackageRequireLicenseAcceptance>true</PackageRequireLicenseAcceptance> | |||||
<Version>1.0.0-preview1</Version> | |||||
</PropertyGroup> | |||||
<ItemGroup> | |||||
<None Include="..\..\LICENSE" Pack="true" PackagePath="" /> | |||||
</ItemGroup> | |||||
<ItemGroup> | |||||
<PackageReference Include="JT1078" Version="1.0.2" /> | |||||
<PackageReference Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="3.1.5" /> | |||||
<PackageReference Include="Microsoft.Extensions.Hosting.Abstractions" Version="3.1.5" /> | |||||
<PackageReference Include="Microsoft.Extensions.Options.ConfigurationExtensions" Version="3.1.5" /> | |||||
<PackageReference Include="Microsoft.Extensions.Options" Version="3.1.5" /> | |||||
</ItemGroup> | |||||
</Project> |
@@ -0,0 +1,27 @@ | |||||
<Project Sdk="Microsoft.NET.Sdk"> | |||||
<PropertyGroup> | |||||
<TargetFramework>netcoreapp3.1</TargetFramework> | |||||
<IsPackable>false</IsPackable> | |||||
</PropertyGroup> | |||||
<ItemGroup> | |||||
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="16.6.1" /> | |||||
<PackageReference Include="System.IO.Pipelines" Version="4.7.2" /> | |||||
<PackageReference Include="xunit" Version="2.4.1" /> | |||||
<PackageReference Include="xunit.runner.visualstudio" Version="2.4.2"> | |||||
<PrivateAssets>all</PrivateAssets> | |||||
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets> | |||||
</PackageReference> | |||||
<PackageReference Include="coverlet.collector" Version="1.3.0"> | |||||
<PrivateAssets>all</PrivateAssets> | |||||
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets> | |||||
</PackageReference> | |||||
</ItemGroup> | |||||
<ItemGroup> | |||||
<ProjectReference Include="..\..\JT1078.Gateway\JT1078.Gateway.csproj" /> | |||||
</ItemGroup> | |||||
</Project> |
@@ -0,0 +1,244 @@ | |||||
using System; | |||||
using System.Buffers; | |||||
using JT1078.Protocol.Extensions; | |||||
using System.Collections.Generic; | |||||
using System.IO.Pipelines; | |||||
using System.Threading; | |||||
using System.Threading.Tasks; | |||||
using Xunit; | |||||
using JT1078.Protocol; | |||||
using System.Buffers.Binary; | |||||
using JT1078.Protocol.Enums; | |||||
using System.Linq; | |||||
using System.Text; | |||||
namespace JT1078.Gateway.Test | |||||
{ | |||||
public class PipeTest | |||||
{ | |||||
[Fact] | |||||
public void Test1() | |||||
{ | |||||
var reader = new ReadOnlySequence<byte>("303163648162000000190130503701010000016f95973f840000000003b600000001674d001495a85825900000000168ee3c800000000106e5010d800000000165b80000090350bfdfe840b5b35c676079446e3ffe2f5240e25cc6b35cebac31720656a853ba1b571246fb858eaa5d8266acb95b92705fd187f1fd20ff0ca6b62c4cbcb3b662f5d61c016928ca82b411acdc4df6edb2034624b992eee9b6c241e1903bf9477c6e4293b65ba75e98d5a2566da6f71c85e1052a9d5ed35c393b1a73b181598749f3d26f6fbf48f0be61c673fcb9f2b0d305794bed03af5e3cedff7768bed3120261d6f3547a6d519943c2afcb80e423c9e6db088a06200dbfaa81edc5bc0de67957e791f67bf040ef944f7d62983d32517b2fb2d9572a71340c225617231bc0d98e66d19fe81a19b44280860b273f700bf3f3444a928e93fefc716e2af46995fbb658d0580a49e42f6835270c8c154abe28a17f76550b1b1fafe62945f80490b3f780fe9bb4d4b4107eac3d50b8c99d1a191f6754992096683fb0f599846bae759b06222079f5404be39e4416136c7c42255b0e7ca42d86fc2227892406d61f9816bc125d017989a671f13f2f4052e018b1fb02460802029a049a23d2ffeea6ac552109d35aa8731483fb2cae963987156056cafb32436a23a0dc918fb2440b14c9e6124441e7bb3b08706066d1ddab512267767b6e522f80732e67046ff5ad4d8193bf5cc5c05ccceb73a36b6c3ea39fa91bb308c8bb7bf88515d9c52409128e8b94e33e48a5396c35c20bd83b7c0e6d3d4a24bc14e84810066c686c6c04e687c41123fe87c89d5fa07b0095e7f82d3b07e72570163c47444bdde16ae9bfacd540df047e8ee34e98ff33178da5c7e6be9272e6dcfbb6db7e678a6d1d3832226c9bf85afa14feac15a270d5d3724a121b8fc9b40f0d37bb7f432de5421d286a65313a6efd251f7ed75b4ef6557975af5da5df2b87a0bbc1cb58183c4c1e24fdc4eb016777af1a6fa4a29d3eed7c4463482e591a6dc20540cabb6d7dd29cbb8ffdacafdaac2dd36db70fefe14fdeec85ef5fe01bb104d2d6439dbd7ceefc87007ce07b8409751dd7c21aa9a537f5fdefdef7d6ceba8d5ae876522f75dedd472e4dde1284e71380ee75ed313b2b9b9a94a56ebd03ae36b64a3b35abbdc7ba380016218201d156658ed9b5632f80f921879063e9037cd3509d01a2e91c17e03d892e2bc381ac723eba266497a1fbb0dc77ab3f4a9a981f95977b025b005a0e09b1add481888333927963fc5e5bf376655cb00e4ca8841fa450c8653f91cf2f3fb0247dbcace5dfde3af4a854f9fa2aaaa33706a78321332273ab4ee837ff4f8eba08676e7f889464a842b8e3e4a579d2".ToHexBytes()); | |||||
SequenceReader<byte> seqReader = new SequenceReader<byte>(reader); | |||||
long totalConsumed = 0; | |||||
List<byte[]> packages = new List<byte[]>(); | |||||
while (!seqReader.End) | |||||
{ | |||||
var header = seqReader.Sequence.Slice(seqReader.Consumed, 4); | |||||
var headerValue = BinaryPrimitives.ReadUInt32BigEndian(header.FirstSpan); | |||||
Assert.Equal(JT1078Package.FH, headerValue); | |||||
if(JT1078Package.FH == headerValue) | |||||
{ | |||||
//sim | |||||
var sim = ReadBCD(seqReader.Sequence.Slice(seqReader.Consumed + 8, 6).FirstSpan, 12); | |||||
Assert.Equal("1901305037", sim); | |||||
//根据数据类型处理对应的数据长度 | |||||
seqReader.Advance(15); | |||||
if(seqReader.TryRead(out byte dataType)) | |||||
{ | |||||
JT1078Label3 label3 = new JT1078Label3(dataType); | |||||
Assert.Equal(JT1078DataType.视频I帧, label3.DataType); | |||||
int bodyLength = 0; | |||||
//透传的时候没有该字段 | |||||
if (label3.DataType!= JT1078DataType.透传数据) | |||||
{ | |||||
//时间戳 | |||||
bodyLength += 8; | |||||
} | |||||
//非视频帧时没有该字段 | |||||
if (label3.DataType == JT1078DataType.视频I帧 || | |||||
label3.DataType == JT1078DataType.视频P帧 || | |||||
label3.DataType == JT1078DataType.视频B帧) | |||||
{ | |||||
//上一个关键帧 + 上一帧 = 2 + 2 | |||||
bodyLength += 4; | |||||
} | |||||
seqReader.Advance(bodyLength); | |||||
var bodyLengthFirstSpan = seqReader.Sequence.Slice(seqReader.Consumed, 2).FirstSpan; | |||||
//数据体长度 | |||||
seqReader.Advance(2); | |||||
bodyLength = BinaryPrimitives.ReadUInt16BigEndian(bodyLengthFirstSpan); | |||||
Assert.Equal(950, bodyLength); | |||||
//数据体 | |||||
seqReader.Advance(bodyLength); | |||||
var package = seqReader.Sequence.Slice(totalConsumed, seqReader.Consumed - totalConsumed).ToArray(); | |||||
packages.Add(package); | |||||
totalConsumed += (seqReader.Consumed - totalConsumed); | |||||
if (seqReader.End) break; | |||||
} | |||||
} | |||||
} | |||||
Assert.Single(packages); | |||||
} | |||||
[Fact] | |||||
public void Test2() | |||||
{ | |||||
var data1 = "303163648162000000190130503701010000016f95973f840000000003b600000001674d001495a85825900000000168ee3c800000000106e5010d800000000165b80000090350bfdfe840b5b35c676079446e3ffe2f5240e25cc6b35cebac31720656a853ba1b571246fb858eaa5d8266acb95b92705fd187f1fd20ff0ca6b62c4cbcb3b662f5d61c016928ca82b411acdc4df6edb2034624b992eee9b6c241e1903bf9477c6e4293b65ba75e98d5a2566da6f71c85e1052a9d5ed35c393b1a73b181598749f3d26f6fbf48f0be61c673fcb9f2b0d305794bed03af5e3cedff7768bed3120261d6f3547a6d519943c2afcb80e423c9e6db088a06200dbfaa81edc5bc0de67957e791f67bf040ef944f7d62983d32517b2fb2d9572a71340c225617231bc0d98e66d19fe81a19b44280860b273f700bf3f3444a928e93fefc716e2af46995fbb658d0580a49e42f6835270c8c154abe28a17f76550b1b1fafe62945f80490b3f780fe9bb4d4b4107eac3d50b8c99d1a191f6754992096683fb0f599846bae759b06222079f5404be39e4416136c7c42255b0e7ca42d86fc2227892406d61f9816bc125d017989a671f13f2f4052e018b1fb02460802029a049a23d2ffeea6ac552109d35aa8731483fb2cae963987156056cafb32436a23a0dc918fb2440b14c9e6124441e7bb3b08706066d1ddab512267767b6e522f80732e67046ff5ad4d8193bf5cc5c05ccceb73a36b6c3ea39fa91bb308c8bb7bf88515d9c52409128e8b94e33e48a5396c35c20bd83b7c0e6d3d4a24bc14e84810066c686c6c04e687c41123fe87c89d5fa07b0095e7f82d3b07e72570163c47444bdde16ae9bfacd540df047e8ee34e98ff33178da5c7e6be9272e6dcfbb6db7e678a6d1d3832226c9bf85afa14feac15a270d5d3724a121b8fc9b40f0d37bb7f432de5421d286a65313a6efd251f7ed75b4ef6557975af5da5df2b87a0bbc1cb58183c4c1e24fdc4eb016777af1a6fa4a29d3eed7c4463482e591a6dc20540cabb6d7dd29cbb8ffdacafdaac2dd36db70fefe14fdeec85ef5fe01bb104d2d6439dbd7ceefc87007ce07b8409751dd7c21aa9a537f5fdefdef7d6ceba8d5ae876522f75dedd472e4dde1284e71380ee75ed313b2b9b9a94a56ebd03ae36b64a3b35abbdc7ba380016218201d156658ed9b5632f80f921879063e9037cd3509d01a2e91c17e03d892e2bc381ac723eba266497a1fbb0dc77ab3f4a9a981f95977b025b005a0e09b1add481888333927963fc5e5bf376655cb00e4ca8841fa450c8653f91cf2f3fb0247dbcace5dfde3af4a854f9fa2aaaa33706a78321332273ab4ee837ff4f8eba08676e7f889464a842b8e3e4a579d2".ToHexBytes(); | |||||
var data2 = "303163648162000100190130503701030000016f95973f840000000003b645f4927c25abe98be0b99f260c25cb3f712cdbad53f19a7b840297a2cafe6f7ef6733add48b133a12bf1b79059da8517d5a788f7df85c27c1c3dcfd3985de5dc01289483ff5a75c881bbcf4e348700159c194b3abef0e267d7ae9373947b5386708e21c17b9c63e8a7375441672754bc708299f9c9234834f9560c433a95e467a4e19ceba4344be62e4960a22ef4ec3665e79d8e561cd56f99b330334a84c860c77dcaa4bd989c29f19345af6996929871a4db81a54ebaacb846ec856f9afd661f5d2b632d7f49e6f2b6df7eec0d2df3bf7126db0e5622694861cf58c098520a17a010aaa9782b265bc9188c434405d00dfd70b98593d6937d3594549bdd0f4e2d1d1e4db56b8b5366fcb7d92f2a85c89af023eaffff6aa83417ffb9d463c5b6f5e2480dc022d50c7b7920295785c7b8af20a363e621a95018d9b595dc406a099f2221e1e10a2380776a701dc66c7da44d76d4b9d3aa1420ea30f366417a1f4ff34bb2b3aab2df46c45e0007cd2d571eb14de50a2ddf2a32bad57ab048278df10bfd25e1e557bd51c3e3e90e8c0951b11cb3611ed5f3939a4fed21c47b7a06cb447d7ac5cda7714eb35dc44a25b57faf3752e1b66d481b2d25b8fffb2ff0c27890162dc69180d9fb4ffffc3408ed41fb31dde78979f3ecffbea1c9e094480d838c91f36ce70498e6f1ff65803ffb67ecd1057f2a7648f1a4a4d190e447ed6d630b1d8c58808d0947fa463451f3f0002d03f269ef3d10679edbd8014bd0cf4a7c5d928be415498ec4356109d46dc4843f73e3c166087866e7f6b95de0fa7bca4b120fddc3d3a60fc09869353f5839c2cdc80270ae7f8014e940c403c75004cde11d1dbbbdc1ee5e1b1a7020fd053f7e2a2a30f926318d51f15893e5849a59df571f597557bc4cf2f192033a3b69643027ed6c538750f241be16e5652b66384d5d566604b2ac682df39284997cee7586a538ab9f37df53a2fd2ce547f94c11a6d301fd15b429cb57eac68b05c794f4c3db82f7f38a216adb592c0e09f16514018cb1d20ab36dd0e2bf96901a16fd6cdaf1dad409be144059554d3cd2562d332d9474498747a444f179298f74e72ff59e3c6a9727fd18a1d62e7f340ff93839a71509cad4d8fda099034e97eb3407ff91340f7590a0af188af847119a7fbe6e441da1dfb177be57a30018794f9ba6e131f5141636fb4df6117ce65148cf48e315739210373162239699ae8a3dc7ad624bdf818bc2230778990764c8c5f7cc13e3e74f1041e0be1b15bb0503a57091c3d89a6283820ecbaaa7ad76aa02df71a9979268a185a6a387216986651558e89d4f70fe18eff3b09a".ToHexBytes(); | |||||
var reader = new ReadOnlySequence<byte>(data1.Concat(data2).ToArray()); | |||||
SequenceReader<byte> seqReader = new SequenceReader<byte>(reader); | |||||
long totalConsumed = 0; | |||||
List<byte[]> packages = new List<byte[]>(); | |||||
while (!seqReader.End) | |||||
{ | |||||
var header = seqReader.Sequence.Slice(seqReader.Consumed, 4); | |||||
var headerValue = BinaryPrimitives.ReadUInt32BigEndian(header.FirstSpan); | |||||
Assert.Equal(JT1078Package.FH, headerValue); | |||||
if (JT1078Package.FH == headerValue) | |||||
{ | |||||
//根据数据类型处理对应的数据长度 | |||||
seqReader.Advance(15); | |||||
if (seqReader.TryRead(out byte dataType)) | |||||
{ | |||||
JT1078Label3 label3 = new JT1078Label3(dataType); | |||||
int bodyLength = 0; | |||||
//透传的时候没有该字段 | |||||
if (label3.DataType != JT1078DataType.透传数据) | |||||
{ | |||||
//时间戳 | |||||
bodyLength += 8; | |||||
} | |||||
//非视频帧时没有该字段 | |||||
if (label3.DataType == JT1078DataType.视频I帧 || | |||||
label3.DataType == JT1078DataType.视频P帧 || | |||||
label3.DataType == JT1078DataType.视频B帧) | |||||
{ | |||||
//上一个关键帧 + 上一帧 = 2 + 2 | |||||
bodyLength += 4; | |||||
} | |||||
seqReader.Advance(bodyLength); | |||||
var bodyLengthFirstSpan = seqReader.Sequence.Slice(seqReader.Consumed, 2).FirstSpan; | |||||
//数据体长度 | |||||
seqReader.Advance(2); | |||||
bodyLength = BinaryPrimitives.ReadUInt16BigEndian(bodyLengthFirstSpan); | |||||
//数据体 | |||||
seqReader.Advance(bodyLength); | |||||
var package = seqReader.Sequence.Slice(totalConsumed, seqReader.Consumed - totalConsumed).ToArray(); | |||||
packages.Add(package); | |||||
totalConsumed += (seqReader.Consumed - totalConsumed); | |||||
if (seqReader.End) break; | |||||
} | |||||
} | |||||
} | |||||
Assert.Equal(2,packages.Count); | |||||
} | |||||
[Fact] | |||||
public void Test3() | |||||
{ | |||||
Assert.Throws<ArgumentException>(() => | |||||
{ | |||||
var data1 = "3031636481".ToHexBytes(); | |||||
var reader = new ReadOnlySequence<byte>(data1); | |||||
SequenceReader<byte> seqReader = new SequenceReader<byte>(reader); | |||||
while (!seqReader.End) | |||||
{ | |||||
if ((seqReader.Length - seqReader.Consumed)< 15) | |||||
{ | |||||
throw new ArgumentException("not jt1078 package"); | |||||
} | |||||
var header = seqReader.Sequence.Slice(seqReader.Consumed, 4); | |||||
var headerValue = BinaryPrimitives.ReadUInt32BigEndian(header.FirstSpan); | |||||
Assert.Equal(JT1078Package.FH, headerValue); | |||||
if (JT1078Package.FH == headerValue) | |||||
{ | |||||
//根据数据类型处理对应的数据长度 | |||||
seqReader.Advance(15); | |||||
if (seqReader.TryRead(out byte dataType)) | |||||
{ | |||||
JT1078Label3 label3 = new JT1078Label3(dataType); | |||||
if (seqReader.End) break; | |||||
} | |||||
} | |||||
else | |||||
{ | |||||
throw new ArgumentException("not jt1078 package"); | |||||
} | |||||
} | |||||
}); | |||||
} | |||||
[Fact] | |||||
public void Test4() | |||||
{ | |||||
Assert.Throws<ArgumentException>(() => | |||||
{ | |||||
var data1 = "303163648162000000190130503701010000016f95973f840000000003b600000001674d001495a85825900000000168ee3c800000000106e5010d800000000165b80000090350bfdfe840b5b35c676079446e3ffe2f5240e25cc6b35cebac31720656a853ba1b571246fb858eaa5d8266acb95b92705fd187f1fd20ff0ca6b62c4cbcb3b662f5d61c016928ca82b411acdc4df6edb2034624b992eee9b6c241e1903bf9477c6e4293b65ba75e98d5a2566da6f71c85e1052a9d5ed35c393b1a73b181598749f3d26f6fbf48f0be61c673fcb9f2b0d305794bed03af5e3cedff7768bed3120261d6f3547a6d519943c2afcb80e423c9e6db088a06200dbfaa81edc5bc0de67957e791f67bf040ef944f7d62983d32517b2fb2d9572a71340c225617231bc0d98e66d19fe81a19b44280860b273f700bf3f3444a928e93fefc716e2af46995fbb658d0580a49e42f6835270c8c154abe28a17f76550b1b1fafe62945f80490b3f780fe9bb4d4b4107eac3d50b8c99d1a191f6754992096683fb0f599846bae759b06222079f5404be39e4416136c7c42255b0e7ca42d86fc2227892406d61f9816bc125d017989a671f13f2f4052e018b1fb02460802029a049a23d2ffeea6ac552109d35aa8731483fb2cae963987156056cafb32436a23a0dc918fb2440b14c9e6124441e7bb3b08706066d1ddab512267767b6e522f80732e67046ff5ad4d8193bf5cc5c05ccceb73a36b6c3ea39fa91bb308c8bb7bf88515d9c52409128e8b94e33e48a5396c35c20bd83b7c0e6d3d4a24bc14e84810066c686c6c04e687c41123fe87c89d5fa07b0095e7f82d3b07e72570163c47444bdde16ae9bfacd540df047e8ee34e98ff33178da5c7e6be9272e6dcfbb6db7e678a6d1d3832226c9bf85afa14feac15a270d5d3724a121b8fc9b40f0d37bb7f432de5421d286a65313a6efd251f7ed75b4ef6557975af5da5df2b87a0bbc1cb58183c4c1e24fdc4eb016777af1a6fa4a29d3eed7c4463482e591a6dc20540cabb6d7dd29cbb8ffdacafdaac2dd36db70fefe14fdeec85ef5fe01bb104d2d6439dbd7ceefc87007ce07b8409751dd7c21aa9a537f5fdefdef7d6ceba8d5ae876522f75dedd472e4dde1284e71380ee75ed313b2b9b9a94a56ebd03ae36b64a3b35abbdc7ba380016218201d156658ed9b5632f80f921879063e9037cd3509d01a2e91c17e03d892e2bc381ac723eba266497a1fbb0dc77ab3f4a9a981f95977b025b005a0e09b1add481888333927963fc5e5bf376655cb00e4ca8841fa450c8653f91cf2f3fb0247dbcace5dfde3af4a854f9fa2aaaa33706a78321332273ab4ee837ff4f8eba08676e7f889464a842b8e3e4a579d2".ToHexBytes(); | |||||
//var data2 = "303163648162000100190130503701030000016f95973f840000000003b645f4927c25abe98be0b99f260c25cb3f712cdbad53f19a7b840297a2cafe6f7ef6733add48b133a12bf1b79059da8517d5a788f7df85c27c1c3dcfd3985de5dc01289483ff5a75c881bbcf4e348700159c194b3abef0e267d7ae9373947b5386708e21c17b9c63e8a7375441672754bc708299f9c9234834f9560c433a95e467a4e19ceba4344be62e4960a22ef4ec3665e79d8e561cd56f99b330334a84c860c77dcaa4bd989c29f19345af6996929871a4db81a54ebaacb846ec856f9afd661f5d2b632d7f49e6f2b6df7eec0d2df3bf7126db0e5622694861cf58c098520a17a010aaa9782b265bc9188c434405d00dfd70b98593d6937d3594549bdd0f4e2d1d1e4db56b8b5366fcb7d92f2a85c89af023eaffff6aa83417ffb9d463c5b6f5e2480dc022d50c7b7920295785c7b8af20a363e621a95018d9b595dc406a099f2221e1e10a2380776a701dc66c7da44d76d4b9d3aa1420ea30f366417a1f4ff34bb2b3aab2df46c45e0007cd2d571eb14de50a2ddf2a32bad57ab048278df10bfd25e1e557bd51c3e3e90e8c0951b11cb3611ed5f3939a4fed21c47b7a06cb447d7ac5cda7714eb35dc44a25b57faf3752e1b66d481b2d25b8fffb2ff0c27890162dc69180d9fb4ffffc3408ed41fb31dde78979f3ecffbea1c9e094480d838c91f36ce70498e6f1ff65803ffb67ecd1057f2a7648f1a4a4d190e447ed6d630b1d8c58808d0947fa463451f3f0002d03f269ef3d10679edbd8014bd0cf4a7c5d928be415498ec4356109d46dc4843f73e3c166087866e7f6b95de0fa7bca4b120fddc3d3a60fc09869353f5839c2cdc80270ae7f8014e940c403c75004cde11d1dbbbdc1ee5e1b1a7020fd053f7e2a2a30f926318d51f15893e5849a59df571f597557bc4cf2f192033a3b69643027ed6c538750f241be16e5652b66384d5d566604b2ac682df39284997cee7586a538ab9f37df53a2fd2ce547f94c11a6d301fd15b429cb57eac68b05c794f4c3db82f7f38a216adb592c0e09f16514018cb1d20ab36dd0e2bf96901a16fd6cdaf1dad409be144059554d3cd2562d332d9474498747a444f179298f74e72ff59e3c6a9727fd18a1d62e7f340ff93839a71509cad4d8fda099034e97eb3407ff91340f7590a0af188af847119a7fbe6e441da1dfb177be57a30018794f9ba6e131f5141636fb4df6117ce65148cf48e315739210373162239699ae8a3dc7ad624bdf818bc2230778990764c8c5f7cc13e3e74f1041e0be1b15bb0503a57091c3d89a6283820ecbaaa7ad76aa02df71a9979268a185a6a387216986651558e89d4f70fe18eff3b09a".ToHexBytes(); | |||||
var data2 = "8162000100190130503701030000016f95973f840000000003b645f4927c25abe98be0b99f260c25cb3f712cdbad53f19a7b840297a2cafe6f7ef6733add48b133a12bf1b79059da8517d5a788f7df85c27c1c3dcfd3985de5dc01289483ff5a75c881bbcf4e348700159c194b3abef0e267d7ae9373947b5386708e21c17b9c63e8a7375441672754bc708299f9c9234834f9560c433a95e467a4e19ceba4344be62e4960a22ef4ec3665e79d8e561cd56f99b330334a84c860c77dcaa4bd989c29f19345af6996929871a4db81a54ebaacb846ec856f9afd661f5d2b632d7f49e6f2b6df7eec0d2df3bf7126db0e5622694861cf58c098520a17a010aaa9782b265bc9188c434405d00dfd70b98593d6937d3594549bdd0f4e2d1d1e4db56b8b5366fcb7d92f2a85c89af023eaffff6aa83417ffb9d463c5b6f5e2480dc022d50c7b7920295785c7b8af20a363e621a95018d9b595dc406a099f2221e1e10a2380776a701dc66c7da44d76d4b9d3aa1420ea30f366417a1f4ff34bb2b3aab2df46c45e0007cd2d571eb14de50a2ddf2a32bad57ab048278df10bfd25e1e557bd51c3e3e90e8c0951b11cb3611ed5f3939a4fed21c47b7a06cb447d7ac5cda7714eb35dc44a25b57faf3752e1b66d481b2d25b8fffb2ff0c27890162dc69180d9fb4ffffc3408ed41fb31dde78979f3ecffbea1c9e094480d838c91f36ce70498e6f1ff65803ffb67ecd1057f2a7648f1a4a4d190e447ed6d630b1d8c58808d0947fa463451f3f0002d03f269ef3d10679edbd8014bd0cf4a7c5d928be415498ec4356109d46dc4843f73e3c166087866e7f6b95de0fa7bca4b120fddc3d3a60fc09869353f5839c2cdc80270ae7f8014e940c403c75004cde11d1dbbbdc1ee5e1b1a7020fd053f7e2a2a30f926318d51f15893e5849a59df571f597557bc4cf2f192033a3b69643027ed6c538750f241be16e5652b66384d5d566604b2ac682df39284997cee7586a538ab9f37df53a2fd2ce547f94c11a6d301fd15b429cb57eac68b05c794f4c3db82f7f38a216adb592c0e09f16514018cb1d20ab36dd0e2bf96901a16fd6cdaf1dad409be144059554d3cd2562d332d9474498747a444f179298f74e72ff59e3c6a9727fd18a1d62e7f340ff93839a71509cad4d8fda099034e97eb3407ff91340f7590a0af188af847119a7fbe6e441da1dfb177be57a30018794f9ba6e131f5141636fb4df6117ce65148cf48e315739210373162239699ae8a3dc7ad624bdf818bc2230778990764c8c5f7cc13e3e74f1041e0be1b15bb0503a57091c3d89a6283820ecbaaa7ad76aa02df71a9979268a185a6a387216986651558e89d4f70fe18eff3b09a".ToHexBytes(); | |||||
var reader = new ReadOnlySequence<byte>(data1.Concat(data2).ToArray()); | |||||
SequenceReader<byte> seqReader = new SequenceReader<byte>(reader); | |||||
long totalConsumed = 0; | |||||
List<byte[]> packages = new List<byte[]>(); | |||||
while (!seqReader.End) | |||||
{ | |||||
if ((seqReader.Length - seqReader.Consumed) < 15) | |||||
{ | |||||
throw new ArgumentException("not jt1078 package"); | |||||
} | |||||
var header = seqReader.Sequence.Slice(seqReader.Consumed, 4); | |||||
var headerValue = BinaryPrimitives.ReadUInt32BigEndian(header.FirstSpan); | |||||
if (JT1078Package.FH == headerValue) | |||||
{ | |||||
//根据数据类型处理对应的数据长度 | |||||
seqReader.Advance(15); | |||||
if (seqReader.TryRead(out byte dataType)) | |||||
{ | |||||
JT1078Label3 label3 = new JT1078Label3(dataType); | |||||
int bodyLength = 0; | |||||
//透传的时候没有该字段 | |||||
if (label3.DataType != JT1078DataType.透传数据) | |||||
{ | |||||
//时间戳 | |||||
bodyLength += 8; | |||||
} | |||||
//非视频帧时没有该字段 | |||||
if (label3.DataType == JT1078DataType.视频I帧 || | |||||
label3.DataType == JT1078DataType.视频P帧 || | |||||
label3.DataType == JT1078DataType.视频B帧) | |||||
{ | |||||
//上一个关键帧 + 上一帧 = 2 + 2 | |||||
bodyLength += 4; | |||||
} | |||||
seqReader.Advance(bodyLength); | |||||
var bodyLengthFirstSpan = seqReader.Sequence.Slice(seqReader.Consumed, 2).FirstSpan; | |||||
//数据体长度 | |||||
seqReader.Advance(2); | |||||
bodyLength = BinaryPrimitives.ReadUInt16BigEndian(bodyLengthFirstSpan); | |||||
//数据体 | |||||
seqReader.Advance(bodyLength); | |||||
var package = seqReader.Sequence.Slice(totalConsumed, seqReader.Consumed - totalConsumed).ToArray(); | |||||
packages.Add(package); | |||||
totalConsumed += (seqReader.Consumed - totalConsumed); | |||||
if (seqReader.End) break; | |||||
} | |||||
} | |||||
else | |||||
{ | |||||
throw new ArgumentException("not jt1078 package"); | |||||
} | |||||
} | |||||
}); | |||||
} | |||||
public string ReadBCD(ReadOnlySpan<byte> readOnlySpan,int len) | |||||
{ | |||||
int count = len / 2; | |||||
StringBuilder bcdSb = new StringBuilder(count); | |||||
for (int i = 0; i < count; i++) | |||||
{ | |||||
bcdSb.Append(readOnlySpan[i].ToString("X2")); | |||||
} | |||||
return bcdSb.ToString().TrimStart('0'); | |||||
} | |||||
[Fact] | |||||
public void Test5() | |||||
{ | |||||
var empty = "000000000".TrimStart('0'); | |||||
Assert.Equal("", empty); | |||||
} | |||||
} | |||||
} |
@@ -3,9 +3,13 @@ Microsoft Visual Studio Solution File, Format Version 12.00 | |||||
# Visual Studio Version 16 | # Visual Studio Version 16 | ||||
VisualStudioVersion = 16.0.29418.71 | VisualStudioVersion = 16.0.29418.71 | ||||
MinimumVisualStudioVersion = 10.0.40219.1 | MinimumVisualStudioVersion = 10.0.40219.1 | ||||
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "JT1078.Gateway", "JT1078.Gateway\JT1078.Gateway.csproj", "{39A90F1C-FC46-4905-81F1-3AEE44D6D86D}" | Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JT1078.Gateway", "JT1078.Gateway\JT1078.Gateway.csproj", "{39A90F1C-FC46-4905-81F1-3AEE44D6D86D}" | ||||
EndProject | EndProject | ||||
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "JT1078.Gateway.SimpleServer", "JT1078.Gateway.SimpleServer\JT1078.Gateway.SimpleServer.csproj", "{F9DB75C7-BE07-416E-BE05-6BA170A0AAC6}" | Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "JT1078.Gateway.Test", "JT1078.Gateway.Tests\JT1078.Gateway.Test\JT1078.Gateway.Test.csproj", "{DAC80A8E-4172-451F-910D-9032BF8640F9}" | ||||
EndProject | |||||
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Tests", "Tests", "{04C0F72A-5BC4-4CEE-B1E9-CCFAA72E373E}" | |||||
EndProject | |||||
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "JT1078.Gateway.Abstractions", "JT1078.Gateway.Abstractions\JT1078.Gateway.Abstractions.csproj", "{EE50F2A6-5F28-4640-BC20-44A8BED8F311}" | |||||
EndProject | EndProject | ||||
Global | Global | ||||
GlobalSection(SolutionConfigurationPlatforms) = preSolution | GlobalSection(SolutionConfigurationPlatforms) = preSolution | ||||
@@ -17,14 +21,21 @@ Global | |||||
{39A90F1C-FC46-4905-81F1-3AEE44D6D86D}.Debug|Any CPU.Build.0 = Debug|Any CPU | {39A90F1C-FC46-4905-81F1-3AEE44D6D86D}.Debug|Any CPU.Build.0 = Debug|Any CPU | ||||
{39A90F1C-FC46-4905-81F1-3AEE44D6D86D}.Release|Any CPU.ActiveCfg = Release|Any CPU | {39A90F1C-FC46-4905-81F1-3AEE44D6D86D}.Release|Any CPU.ActiveCfg = Release|Any CPU | ||||
{39A90F1C-FC46-4905-81F1-3AEE44D6D86D}.Release|Any CPU.Build.0 = Release|Any CPU | {39A90F1C-FC46-4905-81F1-3AEE44D6D86D}.Release|Any CPU.Build.0 = Release|Any CPU | ||||
{F9DB75C7-BE07-416E-BE05-6BA170A0AAC6}.Debug|Any CPU.ActiveCfg = Debug|Any CPU | {DAC80A8E-4172-451F-910D-9032BF8640F9}.Debug|Any CPU.ActiveCfg = Debug|Any CPU | ||||
{F9DB75C7-BE07-416E-BE05-6BA170A0AAC6}.Debug|Any CPU.Build.0 = Debug|Any CPU | {DAC80A8E-4172-451F-910D-9032BF8640F9}.Debug|Any CPU.Build.0 = Debug|Any CPU | ||||
{F9DB75C7-BE07-416E-BE05-6BA170A0AAC6}.Release|Any CPU.ActiveCfg = Release|Any CPU | {DAC80A8E-4172-451F-910D-9032BF8640F9}.Release|Any CPU.ActiveCfg = Release|Any CPU | ||||
{F9DB75C7-BE07-416E-BE05-6BA170A0AAC6}.Release|Any CPU.Build.0 = Release|Any CPU | {DAC80A8E-4172-451F-910D-9032BF8640F9}.Release|Any CPU.Build.0 = Release|Any CPU | ||||
{EE50F2A6-5F28-4640-BC20-44A8BED8F311}.Debug|Any CPU.ActiveCfg = Debug|Any CPU | |||||
{EE50F2A6-5F28-4640-BC20-44A8BED8F311}.Debug|Any CPU.Build.0 = Debug|Any CPU | |||||
{EE50F2A6-5F28-4640-BC20-44A8BED8F311}.Release|Any CPU.ActiveCfg = Release|Any CPU | |||||
{EE50F2A6-5F28-4640-BC20-44A8BED8F311}.Release|Any CPU.Build.0 = Release|Any CPU | |||||
EndGlobalSection | EndGlobalSection | ||||
GlobalSection(SolutionProperties) = preSolution | GlobalSection(SolutionProperties) = preSolution | ||||
HideSolutionNode = FALSE | HideSolutionNode = FALSE | ||||
EndGlobalSection | EndGlobalSection | ||||
GlobalSection(NestedProjects) = preSolution | |||||
{DAC80A8E-4172-451F-910D-9032BF8640F9} = {04C0F72A-5BC4-4CEE-B1E9-CCFAA72E373E} | |||||
EndGlobalSection | |||||
GlobalSection(ExtensibilityGlobals) = postSolution | GlobalSection(ExtensibilityGlobals) = postSolution | ||||
SolutionGuid = {9172690A-1D5A-491A-ACDD-3AF893980E0B} | SolutionGuid = {9172690A-1D5A-491A-ACDD-3AF893980E0B} | ||||
EndGlobalSection | EndGlobalSection | ||||
@@ -10,26 +10,25 @@ namespace JT1078.Gateway.Configurations | |||||
public int TcpPort { get; set; } = 1078; | public int TcpPort { get; set; } = 1078; | ||||
public int UdpPort { get; set; } = 1078; | public int UdpPort { get; set; } = 1078; | ||||
public int HttpPort { get; set; } = 1079; | public int HttpPort { get; set; } = 1079; | ||||
public int QuietPeriodSeconds { get; set; } = 1; | |||||
public TimeSpan QuietPeriodTimeSpan => TimeSpan.FromSeconds(QuietPeriodSeconds); | |||||
public int ShutdownTimeoutSeconds { get; set; } = 3; | |||||
public TimeSpan ShutdownTimeoutTimeSpan => TimeSpan.FromSeconds(ShutdownTimeoutSeconds); | |||||
public int SoBacklog { get; set; } = 8192; | public int SoBacklog { get; set; } = 8192; | ||||
public int MiniNumBufferSize { get; set; } = 8096; | |||||
public int EventLoopCount { get; set; } = Environment.ProcessorCount; | /// <summary> | ||||
/// Tcp读超时 | |||||
public int ReaderIdleTimeSeconds { get; set; } = 3600; | /// 默认10分钟检查一次 | ||||
/// </summary> | |||||
public int WriterIdleTimeSeconds { get; set; } = 3600; | public int TcpReaderIdleTimeSeconds { get; set; } = 60 * 10; | ||||
/// <summary> | |||||
public int AllIdleTimeSeconds { get; set; } = 3600; | /// Tcp 60s检查一次 | ||||
/// </summary> | |||||
public JT1078RemoteServerOptions RemoteServerOptions { get; set; } | public int TcpReceiveTimeoutCheckTimeSeconds { get; set; } = 60; | ||||
/// <summary> | |||||
/// Udp读超时 | |||||
/// </summary> | |||||
public int UdpReaderIdleTimeSeconds { get; set; } = 60; | |||||
/// <summary> | |||||
/// Udp 60s检查一次 | |||||
/// </summary> | |||||
public int UdpReceiveTimeoutCheckTimeSeconds { get; set; } = 60; | |||||
public JT1078Configuration Value => this; | public JT1078Configuration Value => this; | ||||
} | } | ||||
@@ -1,9 +1,5 @@ | |||||
using System; | using JT1078.Gateway.Abstractions; | ||||
using System.Collections.Generic; | |||||
using System.Text; | |||||
using JT1078.Gateway.Interfaces; | |||||
using Microsoft.Extensions.DependencyInjection; | using Microsoft.Extensions.DependencyInjection; | ||||
using Microsoft.Extensions.DependencyInjection.Extensions; | |||||
namespace JT1078.Gateway.Impl | namespace JT1078.Gateway.Impl | ||||
{ | { | ||||
@@ -0,0 +1,20 @@ | |||||
| |||||
using JT1078.Gateway.Abstractions; | |||||
namespace JT1078.Gateway.Impl | |||||
{ | |||||
public class JT1078GatewayBuilderDefault : IJT1078GatewayBuilder | |||||
{ | |||||
public IJT1078Builder JT1078Builder { get; } | |||||
public JT1078GatewayBuilderDefault(IJT1078Builder builder) | |||||
{ | |||||
JT1078Builder = builder; | |||||
} | |||||
public IJT1078Builder Builder() | |||||
{ | |||||
return JT1078Builder; | |||||
} | |||||
} | |||||
} |
@@ -0,0 +1,19 @@ | |||||
using JT1078.Gateway.Abstractions; | |||||
namespace JT1078.Gateway.Impl | |||||
{ | |||||
public class JT1078NormalGatewayBuilderDefault : IJT1078NormalGatewayBuilder | |||||
{ | |||||
public IJT1078Builder JT1078Builder { get; } | |||||
public JT1078NormalGatewayBuilderDefault(IJT1078Builder builder) | |||||
{ | |||||
JT1078Builder = builder; | |||||
} | |||||
public IJT1078Builder Builder() | |||||
{ | |||||
return JT1078Builder; | |||||
} | |||||
} | |||||
} |
@@ -0,0 +1,19 @@ | |||||
using JT1078.Gateway.Abstractions; | |||||
namespace JT1078.Gateway.Impl | |||||
{ | |||||
public class JT1078QueueGatewayBuilderDefault : IJT1078QueueGatewayBuilder | |||||
{ | |||||
public IJT1078Builder JT1078Builder { get; } | |||||
public JT1078QueueGatewayBuilderDefault(IJT1078Builder builder) | |||||
{ | |||||
JT1078Builder = builder; | |||||
} | |||||
public IJT1078Builder Builder() | |||||
{ | |||||
return JT1078Builder; | |||||
} | |||||
} | |||||
} |
@@ -1,14 +0,0 @@ | |||||
using DotNetty.Codecs.Http; | |||||
using DotNetty.Transport.Channels; | |||||
using System; | |||||
using System.Collections.Generic; | |||||
using System.Security.Principal; | |||||
using System.Text; | |||||
namespace JT1078.Gateway.Interfaces | |||||
{ | |||||
public interface IHttpMiddleware | |||||
{ | |||||
void Next(IChannelHandlerContext ctx, IFullHttpRequest req, IPrincipal principal); | |||||
} | |||||
} |
@@ -1,13 +0,0 @@ | |||||
using DotNetty.Codecs.Http; | |||||
using System; | |||||
using System.Collections.Generic; | |||||
using System.Security.Principal; | |||||
using System.Text; | |||||
namespace JT1078.Gateway.Interfaces | |||||
{ | |||||
public interface IJT1078Authorization | |||||
{ | |||||
bool Authorization(IFullHttpRequest request, out IPrincipal principal); | |||||
} | |||||
} |
@@ -1,15 +0,0 @@ | |||||
using Microsoft.Extensions.DependencyInjection; | |||||
using System; | |||||
using System.Collections.Generic; | |||||
using System.Text; | |||||
namespace JT1078.Gateway.Interfaces | |||||
{ | |||||
public interface IJT1078HttpBuilder | |||||
{ | |||||
IJT1078Builder Instance { get; } | |||||
IJT1078Builder Builder(); | |||||
IJT1078HttpBuilder Replace<T>() where T : IJT1078Authorization; | |||||
IJT1078HttpBuilder UseHttpMiddleware<T>() where T : IHttpMiddleware; | |||||
} | |||||
} |
@@ -1,14 +0,0 @@ | |||||
using Microsoft.Extensions.DependencyInjection; | |||||
using System; | |||||
using System.Collections.Generic; | |||||
using System.Text; | |||||
namespace JT1078.Gateway.Interfaces | |||||
{ | |||||
public interface IJT1078TcpBuilder | |||||
{ | |||||
IJT1078Builder Instance { get;} | |||||
IJT1078Builder Builder(); | |||||
IJT1078TcpBuilder Replace<T>() where T : IJT1078TcpMessageHandlers; | |||||
} | |||||
} |
@@ -1,11 +0,0 @@ | |||||
using JT1078.Gateway.Metadata; | |||||
using JT1078.Protocol; | |||||
using System.Threading.Tasks; | |||||
namespace JT1078.Gateway.Interfaces | |||||
{ | |||||
public interface IJT1078TcpMessageHandlers | |||||
{ | |||||
Task<JT1078Response> Processor(JT1078Request request); | |||||
} | |||||
} |
@@ -1,14 +0,0 @@ | |||||
using Microsoft.Extensions.DependencyInjection; | |||||
using System; | |||||
using System.Collections.Generic; | |||||
using System.Text; | |||||
namespace JT1078.Gateway.Interfaces | |||||
{ | |||||
public interface IJT1078UdpBuilder | |||||
{ | |||||
IJT1078Builder Instance { get; } | |||||
IJT1078Builder Builder(); | |||||
IJT1078UdpBuilder Replace<T>() where T : IJT1078UdpMessageHandlers; | |||||
} | |||||
} |
@@ -1,11 +0,0 @@ | |||||
using JT1078.Gateway.Metadata; | |||||
using JT1078.Protocol; | |||||
using System.Threading.Tasks; | |||||
namespace JT1078.Gateway.Interfaces | |||||
{ | |||||
public interface IJT1078UdpMessageHandlers | |||||
{ | |||||
Task<JT1078Response> Processor(JT1078Request request); | |||||
} | |||||
} |
@@ -1,38 +1,56 @@ | |||||
<Project Sdk="Microsoft.NET.Sdk"> | <Project Sdk="Microsoft.NET.Sdk"> | ||||
<PropertyGroup> | <PropertyGroup> | ||||
<TargetFramework>netstandard2.0</TargetFramework> | <TargetFramework>netstandard2.1</TargetFramework> | ||||
<LangVersion>8.0</LangVersion> | <LangVersion>8.0</LangVersion> | ||||
<Copyright>Copyright 2019.</Copyright> | <Copyright>Copyright 2019.</Copyright> | ||||
<Authors>SmallChi(Koike)</Authors> | <Authors>SmallChi(Koike)</Authors> | ||||
<PackageId>JT1078.Gateway</PackageId> | <PackageId>JT1078.Gateway</PackageId> | ||||
<Product>JT1078.Gateway</Product> | <Product>JT1078.Gateway</Product> | ||||
<Description>基于DotNetty实现的JT1078DotNetty的库</Description> | <Description>基于Pipeline实现的JT1078Gateway库</Description> | ||||
<PackageReleaseNotes>基于DotNetty实现的JT1078DotNetty的库</PackageReleaseNotes> | <PackageReleaseNotes>基于Pipeline实现的JT1078Gateway库</PackageReleaseNotes> | ||||
<RepositoryUrl>https://github.com/SmallChi/JT1078DotNetty</RepositoryUrl> | <RepositoryUrl>https://github.com/SmallChi/JT1078Gateway</RepositoryUrl> | ||||
<PackageProjectUrl>https://github.com/SmallChi/JT1078DotNetty</PackageProjectUrl> | <PackageProjectUrl>https://github.com/SmallChi/JT1078Gateway</PackageProjectUrl> | ||||
<licenseUrl>https://github.com/SmallChi/JT1078DotNetty/blob/master/LICENSE</licenseUrl> | <licenseUrl>https://github.com/SmallChi/JT1078Gateway/blob/master/LICENSE</licenseUrl> | ||||
<license>https://github.com/SmallChi/JT1078DotNetty/blob/master/LICENSE</license> | <license>https://github.com/SmallChi/JT1078Gateway/blob/master/LICENSE</license> | ||||
<GeneratePackageOnBuild>false</GeneratePackageOnBuild> | <GeneratePackageOnBuild>false</GeneratePackageOnBuild> | ||||
<SignAssembly>false</SignAssembly> | <SignAssembly>false</SignAssembly> | ||||
<PackageLicenseFile>LICENSE</PackageLicenseFile> | <PackageLicenseFile>LICENSE</PackageLicenseFile> | ||||
<PackageRequireLicenseAcceptance>true</PackageRequireLicenseAcceptance> | <PackageRequireLicenseAcceptance>true</PackageRequireLicenseAcceptance> | ||||
<Version>1.0.0-preview1</Version> | <Version>1.0.0-preview1</Version> | ||||
</PropertyGroup> | </PropertyGroup> | ||||
<ItemGroup> | |||||
<Compile Remove="Codecs\**" /> | |||||
<Compile Remove="Extensions\**" /> | |||||
<Compile Remove="Http\**" /> | |||||
<Compile Remove="Session\**" /> | |||||
<Compile Remove="Tcp\**" /> | |||||
<Compile Remove="Udp\**" /> | |||||
<EmbeddedResource Remove="Codecs\**" /> | |||||
<EmbeddedResource Remove="Extensions\**" /> | |||||
<EmbeddedResource Remove="Http\**" /> | |||||
<EmbeddedResource Remove="Session\**" /> | |||||
<EmbeddedResource Remove="Tcp\**" /> | |||||
<EmbeddedResource Remove="Udp\**" /> | |||||
<None Remove="Codecs\**" /> | |||||
<None Remove="Extensions\**" /> | |||||
<None Remove="Http\**" /> | |||||
<None Remove="Session\**" /> | |||||
<None Remove="Tcp\**" /> | |||||
<None Remove="Udp\**" /> | |||||
</ItemGroup> | |||||
<ItemGroup> | |||||
<Compile Remove="Metadata\JT1078HttpSession.cs" /> | |||||
</ItemGroup> | |||||
<ItemGroup> | <ItemGroup> | ||||
<PackageReference Include="DotNetty.Codecs.Http" Version="0.6.0" /> | <PackageReference Include="System.IO.Pipelines" Version="4.7.2" /> | ||||
<PackageReference Include="DotNetty.Handlers" Version="0.6.0" /> | |||||
<PackageReference Include="DotNetty.Transport.Libuv" Version="0.6.0" /> | |||||
<PackageReference Include="DotNetty.Codecs" Version="0.6.0" /> | |||||
<PackageReference Include="JT1078" Version="1.0.2" /> | |||||
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="3.0.0" /> | |||||
<PackageReference Include="Microsoft.Extensions.Hosting.Abstractions" Version="3.0.0" /> | |||||
<PackageReference Include="Microsoft.Extensions.Options.ConfigurationExtensions" Version="3.0.0" /> | |||||
<PackageReference Include="Newtonsoft.Json" Version="12.0.2" /> | |||||
<PackageReference Include="Microsoft.Extensions.Options" Version="3.0.0" /> | |||||
</ItemGroup> | </ItemGroup> | ||||
<ItemGroup> | <ItemGroup> | ||||
<None Include="..\..\LICENSE" Pack="true" PackagePath="" /> | <None Include="..\..\LICENSE" Pack="true" PackagePath="" /> | ||||
</ItemGroup> | </ItemGroup> | ||||
<ItemGroup> | |||||
<ProjectReference Include="..\JT1078.Gateway.Abstractions\JT1078.Gateway.Abstractions.csproj" /> | |||||
</ItemGroup> | |||||
</Project> | </Project> |
@@ -1,12 +1,10 @@ | |||||
using JT1078.Gateway.Configurations; | using JT1078.Gateway.Abstractions; | ||||
using JT1078.Gateway.Configurations; | |||||
using JT1078.Gateway.Impl; | using JT1078.Gateway.Impl; | ||||
using JT1078.Gateway.Interfaces; | |||||
using JT1078.Gateway.Session.Services; | |||||
using Microsoft.Extensions.Configuration; | using Microsoft.Extensions.Configuration; | ||||
using Microsoft.Extensions.DependencyInjection; | using Microsoft.Extensions.DependencyInjection; | ||||
using Microsoft.Extensions.DependencyInjection.Extensions; | using Microsoft.Extensions.DependencyInjection.Extensions; | ||||
using Microsoft.Extensions.Options; | using Microsoft.Extensions.Options; | ||||
using Newtonsoft.Json; | |||||
using System; | using System; | ||||
using System.Runtime.CompilerServices; | using System.Runtime.CompilerServices; | ||||
@@ -21,7 +19,6 @@ namespace JT1078.Gateway | |||||
{ | { | ||||
IJT1078Builder builder = new JT1078BuilderDefault(serviceDescriptors); | IJT1078Builder builder = new JT1078BuilderDefault(serviceDescriptors); | ||||
builder.Services.Configure<JT1078Configuration>(configuration.GetSection("JT1078Configuration")); | builder.Services.Configure<JT1078Configuration>(configuration.GetSection("JT1078Configuration")); | ||||
builder.Services.TryAddSingleton<JT1078AtomicCounterServiceFactory>(); | |||||
return builder; | return builder; | ||||
} | } | ||||
@@ -29,8 +26,6 @@ namespace JT1078.Gateway | |||||
{ | { | ||||
IJT1078Builder builder = new JT1078BuilderDefault(serviceDescriptors); | IJT1078Builder builder = new JT1078BuilderDefault(serviceDescriptors); | ||||
builder.Services.Configure(jt1078Options); | builder.Services.Configure(jt1078Options); | ||||
builder.Services.TryAddSingleton<JT1078AtomicCounterService>(); | |||||
builder.Services.TryAddSingleton<JT1078AtomicCounterServiceFactory>(); | |||||
return builder; | return builder; | ||||
} | } | ||||
} | } |
@@ -0,0 +1,298 @@ | |||||
using System; | |||||
using System.Buffers; | |||||
using System.Buffers.Binary; | |||||
using System.Collections.Generic; | |||||
using System.IO.Pipelines; | |||||
using System.Net; | |||||
using System.Net.Sockets; | |||||
using System.Text; | |||||
using System.Threading; | |||||
using System.Threading.Tasks; | |||||
using JT1078.Gateway.Abstractions; | |||||
using JT1078.Gateway.Abstractions.Enums; | |||||
using JT1078.Gateway.Configurations; | |||||
using JT1078.Gateway.Sessions; | |||||
using JT1078.Protocol; | |||||
using JT1078.Protocol.Enums; | |||||
using JT1078.Protocol.Extensions; | |||||
using Microsoft.Extensions.Hosting; | |||||
using Microsoft.Extensions.Logging; | |||||
using Microsoft.Extensions.Options; | |||||
namespace JT1078.Gateway | |||||
{ | |||||
public class JT1078TcpServer : IHostedService | |||||
{ | |||||
private Socket server; | |||||
private readonly ILogger Logger; | |||||
private readonly JT1078Configuration Configuration; | |||||
private readonly JT1078SessionManager SessionManager; | |||||
private readonly IJT1078PackageProducer jT1078PackageProducer; | |||||
private readonly IJT1078MsgProducer jT1078MsgProducer; | |||||
private readonly JT1078UseType jT1078UseType; | |||||
/// <summary> | |||||
/// 使用正常方式 | |||||
/// </summary> | |||||
/// <param name="jT1078PackageProducer"></param> | |||||
/// <param name="jT1078ConfigurationAccessor"></param> | |||||
/// <param name="loggerFactory"></param> | |||||
/// <param name="jT1078SessionManager"></param> | |||||
public JT1078TcpServer( | |||||
IJT1078PackageProducer jT1078PackageProducer, | |||||
IOptions<JT1078Configuration> jT1078ConfigurationAccessor, | |||||
ILoggerFactory loggerFactory, | |||||
JT1078SessionManager jT1078SessionManager) | |||||
{ | |||||
SessionManager = jT1078SessionManager; | |||||
jT1078UseType = JT1078UseType.Normal; | |||||
Logger = loggerFactory.CreateLogger("JT1078TcpServer"); | |||||
Configuration = jT1078ConfigurationAccessor.Value; | |||||
this.jT1078PackageProducer = jT1078PackageProducer; | |||||
InitServer(); | |||||
} | |||||
/// <summary> | |||||
/// 使用队列方式 | |||||
/// </summary> | |||||
/// <param name="jT1078MsgProducer"></param> | |||||
/// <param name="jT1078ConfigurationAccessor"></param> | |||||
/// <param name="loggerFactory"></param> | |||||
/// <param name="jT1078SessionManager"></param> | |||||
public JT1078TcpServer( | |||||
IJT1078MsgProducer jT1078MsgProducer, | |||||
IOptions<JT1078Configuration> jT1078ConfigurationAccessor, | |||||
ILoggerFactory loggerFactory, | |||||
JT1078SessionManager jT1078SessionManager) | |||||
{ | |||||
SessionManager = jT1078SessionManager; | |||||
jT1078UseType = JT1078UseType.Queue; | |||||
Logger = loggerFactory.CreateLogger("JT1078TcpServer"); | |||||
Configuration = jT1078ConfigurationAccessor.Value; | |||||
this.jT1078MsgProducer = jT1078MsgProducer; | |||||
InitServer(); | |||||
} | |||||
private void InitServer() | |||||
{ | |||||
var IPEndPoint = new System.Net.IPEndPoint(IPAddress.Any, Configuration.TcpPort); | |||||
server = new Socket(IPEndPoint.AddressFamily, SocketType.Stream, ProtocolType.Tcp); | |||||
server.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.NoDelay, true); | |||||
server.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.KeepAlive, true); | |||||
server.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.ReceiveBuffer, Configuration.MiniNumBufferSize); | |||||
server.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.SendBuffer, Configuration.MiniNumBufferSize); | |||||
server.LingerState = new LingerOption(false, 0); | |||||
server.Bind(IPEndPoint); | |||||
server.Listen(Configuration.SoBacklog); | |||||
} | |||||
public Task StartAsync(CancellationToken cancellationToken) | |||||
{ | |||||
Logger.LogInformation($"JT1078 Tcp Server start at {IPAddress.Any}:{Configuration.TcpPort}."); | |||||
Task.Factory.StartNew(async () => | |||||
{ | |||||
while (!cancellationToken.IsCancellationRequested) | |||||
{ | |||||
var socket = await server.AcceptAsync(); | |||||
JT1078TcpSession jT808TcpSession = new JT1078TcpSession(socket); | |||||
SessionManager.TryAdd(jT808TcpSession); | |||||
await Task.Factory.StartNew(async (state) => | |||||
{ | |||||
var session = (JT1078TcpSession)state; | |||||
if (Logger.IsEnabled(LogLevel.Information)) | |||||
{ | |||||
Logger.LogInformation($"[Connected]:{session.Client.RemoteEndPoint}"); | |||||
} | |||||
var pipe = new Pipe(); | |||||
Task writing = FillPipeAsync(session, pipe.Writer); | |||||
Task reading = ReadPipeAsync(session, pipe.Reader); | |||||
await Task.WhenAll(reading, writing); | |||||
SessionManager.RemoveBySessionId(session.SessionID); | |||||
}, jT808TcpSession); | |||||
} | |||||
}, cancellationToken); | |||||
return Task.CompletedTask; | |||||
} | |||||
private async Task FillPipeAsync(JT1078TcpSession session, PipeWriter writer) | |||||
{ | |||||
while (true) | |||||
{ | |||||
try | |||||
{ | |||||
Memory<byte> memory = writer.GetMemory(Configuration.MiniNumBufferSize); | |||||
//设备多久没发数据就断开连接 Receive Timeout. | |||||
int bytesRead = await session.Client.ReceiveAsync(memory, SocketFlags.None, session.ReceiveTimeout.Token); | |||||
if (bytesRead == 0) | |||||
{ | |||||
break; | |||||
} | |||||
writer.Advance(bytesRead); | |||||
} | |||||
catch (OperationCanceledException ex) | |||||
{ | |||||
Logger.LogError($"[Receive Timeout]:{session.Client.RemoteEndPoint}"); | |||||
break; | |||||
} | |||||
catch (System.Net.Sockets.SocketException ex) | |||||
{ | |||||
Logger.LogError($"[{ex.SocketErrorCode.ToString()},{ex.Message}]:{session.Client.RemoteEndPoint}"); | |||||
break; | |||||
} | |||||
#pragma warning disable CA1031 // Do not catch general exception types | |||||
catch (Exception ex) | |||||
{ | |||||
Logger.LogError(ex, $"[Receive Error]:{session.Client.RemoteEndPoint}"); | |||||
break; | |||||
} | |||||
#pragma warning restore CA1031 // Do not catch general exception types | |||||
FlushResult result = await writer.FlushAsync(); | |||||
if (result.IsCompleted) | |||||
{ | |||||
break; | |||||
} | |||||
} | |||||
writer.Complete(); | |||||
} | |||||
private async Task ReadPipeAsync(JT1078TcpSession session, PipeReader reader) | |||||
{ | |||||
while (true) | |||||
{ | |||||
ReadResult result = await reader.ReadAsync(); | |||||
if (result.IsCompleted) | |||||
{ | |||||
break; | |||||
} | |||||
ReadOnlySequence<byte> buffer = result.Buffer; | |||||
SequencePosition consumed = buffer.Start; | |||||
SequencePosition examined = buffer.End; | |||||
try | |||||
{ | |||||
if (result.IsCanceled) break; | |||||
if (buffer.Length > 0) | |||||
{ | |||||
ReaderBuffer(ref buffer, session, out consumed, out examined); | |||||
} | |||||
} | |||||
#pragma warning disable CA1031 // Do not catch general exception types | |||||
catch (Exception ex) | |||||
{ | |||||
Logger.LogError(ex, $"[ReadPipe Error]:{session.Client.RemoteEndPoint}"); | |||||
break; | |||||
} | |||||
#pragma warning restore CA1031 // Do not catch general exception types | |||||
finally | |||||
{ | |||||
reader.AdvanceTo(consumed, examined); | |||||
} | |||||
} | |||||
reader.Complete(); | |||||
} | |||||
private void ReaderBuffer(ref ReadOnlySequence<byte> buffer, JT1078TcpSession session, out SequencePosition consumed, out SequencePosition examined) | |||||
{ | |||||
consumed = buffer.Start; | |||||
examined = buffer.End; | |||||
if (buffer.Length < 15) | |||||
{ | |||||
throw new ArgumentException("not JT1078 package"); | |||||
} | |||||
SequenceReader<byte> seqReader = new SequenceReader<byte>(buffer); | |||||
long totalConsumed = 0; | |||||
while (!seqReader.End) | |||||
{ | |||||
if ((seqReader.Length - seqReader.Consumed) < 15) | |||||
{ | |||||
throw new ArgumentException("not JT1078 package"); | |||||
} | |||||
var header = seqReader.Sequence.Slice(seqReader.Consumed, 4); | |||||
var headerValue = BinaryPrimitives.ReadUInt32BigEndian(header.FirstSpan); | |||||
if (JT1078Package.FH == headerValue) | |||||
{ | |||||
//sim | |||||
var sim = ReadBCD(seqReader.Sequence.Slice(seqReader.Consumed + 8, 6).FirstSpan, 12); | |||||
//根据数据类型处理对应的数据长度 | |||||
seqReader.Advance(15); | |||||
if (seqReader.TryRead(out byte dataType)) | |||||
{ | |||||
JT1078Label3 label3 = new JT1078Label3(dataType); | |||||
int bodyLength = 0; | |||||
//透传的时候没有该字段 | |||||
if (label3.DataType != JT1078DataType.透传数据) | |||||
{ | |||||
//时间戳 | |||||
bodyLength += 8; | |||||
} | |||||
//非视频帧时没有该字段 | |||||
if (label3.DataType == JT1078DataType.视频I帧 || | |||||
label3.DataType == JT1078DataType.视频P帧 || | |||||
label3.DataType == JT1078DataType.视频B帧) | |||||
{ | |||||
//上一个关键帧 + 上一帧 = 2 + 2 | |||||
bodyLength += 4; | |||||
} | |||||
seqReader.Advance(bodyLength); | |||||
var bodyLengthFirstSpan = seqReader.Sequence.Slice(seqReader.Consumed, 2).FirstSpan; | |||||
//数据体长度 | |||||
seqReader.Advance(2); | |||||
bodyLength = BinaryPrimitives.ReadUInt16BigEndian(bodyLengthFirstSpan); | |||||
//数据体 | |||||
seqReader.Advance(bodyLength); | |||||
if (string.IsNullOrEmpty(sim)) | |||||
{ | |||||
sim = session.SessionID; | |||||
} | |||||
SessionManager.TryLink(sim, session); | |||||
var package = seqReader.Sequence.Slice(totalConsumed, seqReader.Consumed - totalConsumed); | |||||
try | |||||
{ | |||||
if (jT1078UseType== JT1078UseType.Queue) | |||||
{ | |||||
jT1078MsgProducer.ProduceAsync(sim, package.ToArray()); | |||||
} | |||||
else | |||||
{ | |||||
jT1078PackageProducer.ProduceAsync(sim, JT1078Serializer.Deserialize(package.FirstSpan)); | |||||
} | |||||
} | |||||
catch (Exception ex) | |||||
{ | |||||
Logger.LogError(ex, $"[Parse]:{package.ToArray().ToHexString()}"); | |||||
} | |||||
totalConsumed += (seqReader.Consumed - totalConsumed); | |||||
if (seqReader.End) break; | |||||
} | |||||
} | |||||
} | |||||
if (seqReader.Length == totalConsumed) | |||||
{ | |||||
examined = consumed = buffer.End; | |||||
} | |||||
else | |||||
{ | |||||
consumed = buffer.GetPosition(totalConsumed); | |||||
} | |||||
} | |||||
public Task StopAsync(CancellationToken cancellationToken) | |||||
{ | |||||
Logger.LogInformation("JT1078 Tcp Server Stop"); | |||||
if (server?.Connected ?? false) | |||||
server.Shutdown(SocketShutdown.Both); | |||||
server?.Close(); | |||||
return Task.CompletedTask; | |||||
} | |||||
string ReadBCD(ReadOnlySpan<byte> readOnlySpan, int len) | |||||
{ | |||||
int count = len / 2; | |||||
StringBuilder bcdSb = new StringBuilder(count); | |||||
for (int i = 0; i < count; i++) | |||||
{ | |||||
bcdSb.Append(readOnlySpan[i].ToString("X2")); | |||||
} | |||||
return bcdSb.ToString().TrimStart('0'); | |||||
} | |||||
} | |||||
} |
@@ -1,49 +0,0 @@ | |||||
using System; | |||||
using System.Collections.Generic; | |||||
using System.Text; | |||||
using System.Threading; | |||||
namespace JT1078.Gateway.Metadata | |||||
{ | |||||
/// <summary> | |||||
/// | |||||
/// <see cref="Grpc.Core.Internal"/> | |||||
/// </summary> | |||||
internal class JT1078AtomicCounter | |||||
{ | |||||
long counter = 0; | |||||
public JT1078AtomicCounter(long initialCount = 0) | |||||
{ | |||||
this.counter = initialCount; | |||||
} | |||||
public void Reset() | |||||
{ | |||||
Interlocked.Exchange(ref counter, 0); | |||||
} | |||||
public long Increment() | |||||
{ | |||||
return Interlocked.Increment(ref counter); | |||||
} | |||||
public long Add(long len) | |||||
{ | |||||
return Interlocked.Add(ref counter,len); | |||||
} | |||||
public long Decrement() | |||||
{ | |||||
return Interlocked.Decrement(ref counter); | |||||
} | |||||
public long Count | |||||
{ | |||||
get | |||||
{ | |||||
return Interlocked.Read(ref counter); | |||||
} | |||||
} | |||||
} | |||||
} |
@@ -1,29 +0,0 @@ | |||||
using DotNetty.Transport.Channels; | |||||
using System; | |||||
namespace JT1078.Gateway.Metadata | |||||
{ | |||||
public class JT1078TcpSession | |||||
{ | |||||
public JT1078TcpSession(IChannel channel, string terminalPhoneNo) | |||||
{ | |||||
Channel = channel; | |||||
TerminalPhoneNo = terminalPhoneNo; | |||||
StartTime = DateTime.Now; | |||||
LastActiveTime = DateTime.Now; | |||||
} | |||||
public JT1078TcpSession() { } | |||||
/// <summary> | |||||
/// 终端手机号 | |||||
/// </summary> | |||||
public string TerminalPhoneNo { get; set; } | |||||
public IChannel Channel { get; set; } | |||||
public DateTime LastActiveTime { get; set; } | |||||
public DateTime StartTime { get; set; } | |||||
} | |||||
} |
@@ -1,35 +0,0 @@ | |||||
using DotNetty.Transport.Channels; | |||||
using System; | |||||
using System.Net; | |||||
namespace JT1078.Gateway.Metadata | |||||
{ | |||||
public class JT1078UdpSession | |||||
{ | |||||
public JT1078UdpSession(IChannel channel, | |||||
EndPoint sender, | |||||
string terminalPhoneNo) | |||||
{ | |||||
Channel = channel; | |||||
TerminalPhoneNo = terminalPhoneNo; | |||||
StartTime = DateTime.Now; | |||||
LastActiveTime = DateTime.Now; | |||||
Sender = sender; | |||||
} | |||||
public EndPoint Sender { get; set; } | |||||
public JT1078UdpSession() { } | |||||
/// <summary> | |||||
/// 终端手机号 | |||||
/// </summary> | |||||
public string TerminalPhoneNo { get; set; } | |||||
public IChannel Channel { get; set; } | |||||
public DateTime LastActiveTime { get; set; } | |||||
public DateTime StartTime { get; set; } | |||||
} | |||||
} |
@@ -1,52 +0,0 @@ | |||||
using JT1078.Gateway.Metadata; | |||||
namespace JT1078.Gateway.Session.Services | |||||
{ | |||||
/// <summary> | |||||
/// 计数包服务 | |||||
/// </summary> | |||||
public class JT1078AtomicCounterService | |||||
{ | |||||
private readonly JT1078AtomicCounter MsgSuccessCounter; | |||||
private readonly JT1078AtomicCounter MsgFailCounter; | |||||
public JT1078AtomicCounterService() | |||||
{ | |||||
MsgSuccessCounter=new JT1078AtomicCounter(); | |||||
MsgFailCounter = new JT1078AtomicCounter(); | |||||
} | |||||
public void Reset() | |||||
{ | |||||
MsgSuccessCounter.Reset(); | |||||
MsgFailCounter.Reset(); | |||||
} | |||||
public long MsgSuccessIncrement() | |||||
{ | |||||
return MsgSuccessCounter.Increment(); | |||||
} | |||||
public long MsgSuccessCount | |||||
{ | |||||
get | |||||
{ | |||||
return MsgSuccessCounter.Count; | |||||
} | |||||
} | |||||
public long MsgFailIncrement() | |||||
{ | |||||
return MsgFailCounter.Increment(); | |||||
} | |||||
public long MsgFailCount | |||||
{ | |||||
get | |||||
{ | |||||
return MsgFailCounter.Count; | |||||
} | |||||
} | |||||
} | |||||
} |
@@ -1,30 +0,0 @@ | |||||
using JT1078.Gateway.Enums; | |||||
using System; | |||||
using System.Collections.Concurrent; | |||||
namespace JT1078.Gateway.Session.Services | |||||
{ | |||||
public class JT1078AtomicCounterServiceFactory | |||||
{ | |||||
private readonly ConcurrentDictionary<JT1078TransportProtocolType, JT1078AtomicCounterService> cache; | |||||
public JT1078AtomicCounterServiceFactory() | |||||
{ | |||||
cache = new ConcurrentDictionary<JT1078TransportProtocolType, JT1078AtomicCounterService>(); | |||||
} | |||||
public JT1078AtomicCounterService Create(JT1078TransportProtocolType type) | |||||
{ | |||||
if(cache.TryGetValue(type,out var service)) | |||||
{ | |||||
return service; | |||||
} | |||||
else | |||||
{ | |||||
var serviceNew = new JT1078AtomicCounterService(); | |||||
cache.TryAdd(type, serviceNew); | |||||
return serviceNew; | |||||
} | |||||
} | |||||
} | |||||
} |
@@ -0,0 +1,246 @@ | |||||
using JT1078.Gateway.Abstractions; | |||||
using JT1078.Gateway.Abstractions.Enums; | |||||
using Microsoft.Extensions.Logging; | |||||
using System; | |||||
using System.Collections.Concurrent; | |||||
using System.Collections.Generic; | |||||
using System.Linq; | |||||
using System.Net; | |||||
using System.Net.Sockets; | |||||
using System.Threading.Tasks; | |||||
namespace JT1078.Gateway.Sessions | |||||
{ | |||||
/// <summary> | |||||
/// | |||||
/// <remark>不支持变态类型:既发TCP和UDP</remark> | |||||
/// </summary> | |||||
public class JT1078SessionManager | |||||
{ | |||||
private readonly ILogger logger; | |||||
public ConcurrentDictionary<string, IJT1078Session> Sessions { get; } | |||||
public ConcurrentDictionary<string, IJT1078Session> TerminalPhoneNoSessions { get; } | |||||
public JT1078SessionManager(ILoggerFactory loggerFactory) | |||||
{ | |||||
Sessions = new ConcurrentDictionary<string, IJT1078Session>(StringComparer.OrdinalIgnoreCase); | |||||
TerminalPhoneNoSessions = new ConcurrentDictionary<string, IJT1078Session>(StringComparer.OrdinalIgnoreCase); | |||||
logger = loggerFactory.CreateLogger("JT1078SessionManager"); | |||||
} | |||||
public int TotalSessionCount | |||||
{ | |||||
get | |||||
{ | |||||
return Sessions.Count; | |||||
} | |||||
} | |||||
public int TcpSessionCount | |||||
{ | |||||
get | |||||
{ | |||||
return Sessions.Where(w => w.Value.TransportProtocolType == JT1078TransportProtocolType.tcp).Count(); | |||||
} | |||||
} | |||||
public int UdpSessionCount | |||||
{ | |||||
get | |||||
{ | |||||
return Sessions.Where(w => w.Value.TransportProtocolType == JT1078TransportProtocolType.udp).Count(); | |||||
} | |||||
} | |||||
internal void TryLink(string terminalPhoneNo, IJT1078Session session) | |||||
{ | |||||
DateTime curretDatetime= DateTime.Now; | |||||
if (TerminalPhoneNoSessions.TryGetValue(terminalPhoneNo,out IJT1078Session cacheSession)) | |||||
{ | |||||
if (session.SessionID != cacheSession.SessionID) | |||||
{ | |||||
//从转发到直连的数据需要更新缓存 | |||||
session.ActiveTime = curretDatetime; | |||||
TerminalPhoneNoSessions.TryUpdate(terminalPhoneNo, session, cacheSession); | |||||
//会话通知 | |||||
//todo: 会话通知 | |||||
} | |||||
else | |||||
{ | |||||
cacheSession.ActiveTime = curretDatetime; | |||||
TerminalPhoneNoSessions.TryUpdate(terminalPhoneNo, cacheSession, cacheSession); | |||||
} | |||||
} | |||||
else | |||||
{ | |||||
session.TerminalPhoneNo = terminalPhoneNo; | |||||
if (TerminalPhoneNoSessions.TryAdd(terminalPhoneNo, session)) | |||||
{ | |||||
//会话通知 | |||||
//todo: 会话通知 | |||||
} | |||||
} | |||||
} | |||||
internal void TryLink(IJT1078Session session) | |||||
{ | |||||
DateTime curretDatetime = DateTime.Now; | |||||
if (TerminalPhoneNoSessions.TryGetValue(session.SessionID, out IJT1078Session cacheSession)) | |||||
{ | |||||
if (session.SessionID != cacheSession.SessionID) | |||||
{ | |||||
//从转发到直连的数据需要更新缓存 | |||||
session.ActiveTime = curretDatetime; | |||||
TerminalPhoneNoSessions.TryUpdate(session.SessionID, session, cacheSession); | |||||
//会话通知 | |||||
//todo: 会话通知 | |||||
} | |||||
else | |||||
{ | |||||
cacheSession.ActiveTime = curretDatetime; | |||||
TerminalPhoneNoSessions.TryUpdate(session.SessionID, cacheSession, cacheSession); | |||||
} | |||||
} | |||||
else | |||||
{ | |||||
session.TerminalPhoneNo = session.SessionID; | |||||
if (TerminalPhoneNoSessions.TryAdd(session.SessionID, session)) | |||||
{ | |||||
//会话通知 | |||||
//todo: 会话通知 | |||||
} | |||||
} | |||||
} | |||||
public IJT1078Session TryLink(string terminalPhoneNo, Socket socket, EndPoint remoteEndPoint) | |||||
{ | |||||
if (TerminalPhoneNoSessions.TryGetValue(terminalPhoneNo, out IJT1078Session currentSession)) | |||||
{ | |||||
currentSession.ActiveTime = DateTime.Now; | |||||
currentSession.TerminalPhoneNo = terminalPhoneNo; | |||||
currentSession.RemoteEndPoint = remoteEndPoint; | |||||
TerminalPhoneNoSessions.TryUpdate(terminalPhoneNo, currentSession, currentSession); | |||||
} | |||||
else | |||||
{ | |||||
JT1078UdpSession session = new JT1078UdpSession(socket, remoteEndPoint); | |||||
session.TerminalPhoneNo = terminalPhoneNo; | |||||
Sessions.TryAdd(session.SessionID, session); | |||||
TerminalPhoneNoSessions.TryAdd(terminalPhoneNo, session); | |||||
currentSession = session; | |||||
} | |||||
//会话通知 | |||||
//todo: 会话通知 | |||||
return currentSession; | |||||
} | |||||
internal bool TryAdd(IJT1078Session session) | |||||
{ | |||||
return Sessions.TryAdd(session.SessionID, session); | |||||
} | |||||
public async ValueTask<bool> TrySendByTerminalPhoneNoAsync(string terminalPhoneNo, byte[] data) | |||||
{ | |||||
if(TerminalPhoneNoSessions.TryGetValue(terminalPhoneNo,out var session)) | |||||
{ | |||||
if (session.TransportProtocolType == JT1078TransportProtocolType.tcp) | |||||
{ | |||||
await session.Client.SendAsync(data, SocketFlags.None); | |||||
} | |||||
else | |||||
{ | |||||
await session.Client.SendToAsync(data, SocketFlags.None, session.RemoteEndPoint); | |||||
} | |||||
return true; | |||||
} | |||||
else | |||||
{ | |||||
return false; | |||||
} | |||||
} | |||||
public async ValueTask<bool> TrySendBySessionIdAsync(string sessionId, byte[] data) | |||||
{ | |||||
if (Sessions.TryGetValue(sessionId, out var session)) | |||||
{ | |||||
if(session.TransportProtocolType== JT1078TransportProtocolType.tcp) | |||||
{ | |||||
await session.Client.SendAsync(data, SocketFlags.None); | |||||
} | |||||
else | |||||
{ | |||||
await session.Client.SendToAsync(data, SocketFlags.None, session.RemoteEndPoint); | |||||
} | |||||
return true; | |||||
} | |||||
else | |||||
{ | |||||
return false; | |||||
} | |||||
} | |||||
public void RemoveByTerminalPhoneNo(string terminalPhoneNo) | |||||
{ | |||||
if (TerminalPhoneNoSessions.TryGetValue(terminalPhoneNo, out var removeTerminalPhoneNoSessions)) | |||||
{ | |||||
// 处理转发过来的是数据 这时候通道对设备是1对多关系,需要清理垃圾数据 | |||||
//1.用当前会话的通道Id找出通过转发过来的其他设备的终端号 | |||||
var terminalPhoneNos = TerminalPhoneNoSessions.Where(w => w.Value.SessionID == removeTerminalPhoneNoSessions.SessionID).Select(s => s.Key).ToList(); | |||||
//2.存在则一个个移除 | |||||
string tmpTerminalPhoneNo = terminalPhoneNo; | |||||
if (terminalPhoneNos.Count > 0) | |||||
{ | |||||
//3.移除包括当前的设备号 | |||||
foreach (var item in terminalPhoneNos) | |||||
{ | |||||
TerminalPhoneNoSessions.TryRemove(item, out _); | |||||
} | |||||
tmpTerminalPhoneNo = string.Join(",", terminalPhoneNos); | |||||
} | |||||
if (Sessions.TryRemove(removeTerminalPhoneNoSessions.SessionID, out var removeSession)) | |||||
{ | |||||
removeSession.Close(); | |||||
if (logger.IsEnabled(LogLevel.Information)) | |||||
logger.LogInformation($"[Session Remove]:{terminalPhoneNo}-{tmpTerminalPhoneNo}"); | |||||
//会话通知 | |||||
//todo: 会话通知 SessionOffline | |||||
} | |||||
} | |||||
} | |||||
public void RemoveBySessionId(string sessionId) | |||||
{ | |||||
if (Sessions.TryRemove(sessionId, out var removeSession)) | |||||
{ | |||||
var terminalPhoneNos = TerminalPhoneNoSessions.Where(w => w.Value.SessionID == sessionId).Select(s => s.Key).ToList(); | |||||
if (terminalPhoneNos.Count > 0) | |||||
{ | |||||
foreach (var item in terminalPhoneNos) | |||||
{ | |||||
TerminalPhoneNoSessions.TryRemove(item, out _); | |||||
} | |||||
var tmpTerminalPhoneNo = string.Join(",", terminalPhoneNos); | |||||
//会话通知 | |||||
//todo: 会话通知 SessionOffline tmpTerminalPhoneNo | |||||
if (logger.IsEnabled(LogLevel.Information)) | |||||
logger.LogInformation($"[Session Remove]:{tmpTerminalPhoneNo}"); | |||||
} | |||||
removeSession.Close(); | |||||
} | |||||
} | |||||
public List<JT1078TcpSession> GetTcpAll() | |||||
{ | |||||
return TerminalPhoneNoSessions.Where(w => w.Value.TransportProtocolType == JT1078TransportProtocolType.tcp).Select(s => (JT1078TcpSession)s.Value).ToList(); | |||||
} | |||||
public List<JT1078UdpSession> GetUdpAll() | |||||
{ | |||||
return TerminalPhoneNoSessions.Where(w => w.Value.TransportProtocolType == JT1078TransportProtocolType.udp).Select(s => (JT1078UdpSession)s.Value).ToList(); | |||||
} | |||||
} | |||||
} |
@@ -0,0 +1,45 @@ | |||||
using JT1078.Gateway.Abstractions.Enums; | |||||
using JT1078.Gateway.Abstractions; | |||||
using System; | |||||
using System.Net; | |||||
using System.Net.Sockets; | |||||
using System.Threading; | |||||
namespace JT1078.Gateway.Sessions | |||||
{ | |||||
public class JT1078TcpSession : IJT1078Session | |||||
{ | |||||
public JT1078TcpSession(Socket client) | |||||
{ | |||||
Client = client; | |||||
RemoteEndPoint = client.RemoteEndPoint; | |||||
ActiveTime = DateTime.Now; | |||||
StartTime = DateTime.Now; | |||||
SessionID = Guid.NewGuid().ToString("N"); | |||||
ReceiveTimeout = new CancellationTokenSource(); | |||||
} | |||||
/// <summary> | |||||
/// 终端手机号 | |||||
/// </summary> | |||||
public string TerminalPhoneNo { get; set; } | |||||
public DateTime ActiveTime { get; set; } | |||||
public DateTime StartTime { get; set; } | |||||
public JT1078TransportProtocolType TransportProtocolType { get;} = JT1078TransportProtocolType.tcp; | |||||
public string SessionID { get; } | |||||
public Socket Client { get; set; } | |||||
public CancellationTokenSource ReceiveTimeout { get; set; } | |||||
public EndPoint RemoteEndPoint { get ; set; } | |||||
public void Close() | |||||
{ | |||||
try | |||||
{ | |||||
Client.Shutdown(SocketShutdown.Both); | |||||
} | |||||
catch { } | |||||
finally | |||||
{ | |||||
Client.Close(); | |||||
} | |||||
} | |||||
} | |||||
} |
@@ -0,0 +1,38 @@ | |||||
using JT1078.Gateway.Abstractions.Enums; | |||||
using JT1078.Gateway.Abstractions; | |||||
using System; | |||||
using System.Net; | |||||
using System.Net.Sockets; | |||||
using System.Threading; | |||||
namespace JT1078.Gateway.Sessions | |||||
{ | |||||
public class JT1078UdpSession: IJT1078Session | |||||
{ | |||||
public JT1078UdpSession(Socket socket, EndPoint sender) | |||||
{ | |||||
ActiveTime = DateTime.Now; | |||||
StartTime = DateTime.Now; | |||||
SessionID = Guid.NewGuid().ToString("N"); | |||||
ReceiveTimeout = new CancellationTokenSource(); | |||||
RemoteEndPoint = sender; | |||||
Client = socket; | |||||
} | |||||
/// <summary> | |||||
/// 终端手机号 | |||||
/// </summary> | |||||
public string TerminalPhoneNo { get; set; } | |||||
public DateTime ActiveTime { get; set; } | |||||
public DateTime StartTime { get; set; } | |||||
public JT1078TransportProtocolType TransportProtocolType { get; set; } = JT1078TransportProtocolType.udp; | |||||
public string SessionID { get; } | |||||
public Socket Client { get; set; } | |||||
public CancellationTokenSource ReceiveTimeout { get; set; } | |||||
public EndPoint RemoteEndPoint { get; set ; } | |||||
public void Close() | |||||
{ | |||||
} | |||||
} | |||||
} |