Browse Source

用简单粗暴的方式实现pipeline

tags/v2.3.1
SmallChi(Koike) 5 years ago
parent
commit
15ddc833fa
49 changed files with 5625 additions and 3 deletions
  1. +4
    -3
      README.md
  2. +25
    -0
      src/.dockerignore
  3. +15
    -0
      src/JT808.Gateway.Abstractions/Enums/JT808TransportProtocolType.cs
  4. +14
    -0
      src/JT808.Gateway.Abstractions/IJT808ClientBuilder.cs
  5. +14
    -0
      src/JT808.Gateway.Abstractions/IJT808GatewayBuilder.cs
  6. +15
    -0
      src/JT808.Gateway.Abstractions/IJT808MsgConsumer.cs
  7. +17
    -0
      src/JT808.Gateway.Abstractions/IJT808MsgProducer.cs
  8. +15
    -0
      src/JT808.Gateway.Abstractions/IJT808MsgReplyConsumer.cs
  9. +17
    -0
      src/JT808.Gateway.Abstractions/IJT808MsgReplyProducer.cs
  10. +11
    -0
      src/JT808.Gateway.Abstractions/IJT808PubSub.cs
  11. +18
    -0
      src/JT808.Gateway.Abstractions/IJT808SessionConsumer.cs
  12. +13
    -0
      src/JT808.Gateway.Abstractions/IJT808SessionProducer.cs
  13. +33
    -0
      src/JT808.Gateway.Abstractions/JT808.Gateway.Abstractions.csproj
  14. +11
    -0
      src/JT808.Gateway.Abstractions/JT808GatewayConstants.cs
  15. +63
    -0
      src/JT808.Gateway.Abstractions/Protos/JT808Gateway.proto
  16. +28
    -0
      src/JT808.Gateway.Test/JT808.Gateway.Test.csproj
  17. +100
    -0
      src/JT808.Gateway.Test/PipeTest.cs
  18. +187
    -0
      src/JT808.Gateway.Test/Session/JT808SessionManagerTest.cs
  19. +3106
    -0
      src/JT808.Gateway.TestHosting/Configs/NLog.xsd
  20. +36
    -0
      src/JT808.Gateway.TestHosting/Configs/nlog.Unix.config
  21. +36
    -0
      src/JT808.Gateway.TestHosting/Configs/nlog.Win32NT.config
  22. +23
    -0
      src/JT808.Gateway.TestHosting/Dockerfile
  23. +35
    -0
      src/JT808.Gateway.TestHosting/JT808.Gateway.TestHosting.csproj
  24. +69
    -0
      src/JT808.Gateway.TestHosting/Jobs/CallGrpcClientJob.cs
  25. +46
    -0
      src/JT808.Gateway.TestHosting/Program.cs
  26. +4
    -0
      src/JT808.Gateway.TestHosting/startup.txt
  27. +43
    -0
      src/JT808.Gateway.sln
  28. +43
    -0
      src/JT808.Gateway/Configurations/JT808Configuration.cs
  29. +18
    -0
      src/JT808.Gateway/Enums/JT808MessageQueueType.cs
  30. +26
    -0
      src/JT808.Gateway/Interfaces/IJT808Session.cs
  31. +20
    -0
      src/JT808.Gateway/Internal/JT808GatewayBuilderDefault.cs
  32. +28
    -0
      src/JT808.Gateway/Internal/JT808MsgProducerDefault.cs
  33. +201
    -0
      src/JT808.Gateway/Internal/JT808MsgReplyConsumerDefault.cs
  34. +11
    -0
      src/JT808.Gateway/Internal/JT808MsgService.cs
  35. +31
    -0
      src/JT808.Gateway/JT808.Gateway.csproj
  36. +75
    -0
      src/JT808.Gateway/JT808GatewayExtensions.cs
  37. +54
    -0
      src/JT808.Gateway/JT808GrpcServer.cs
  38. +226
    -0
      src/JT808.Gateway/JT808TcpServer.cs
  39. +134
    -0
      src/JT808.Gateway/JT808UdpServer.cs
  40. +49
    -0
      src/JT808.Gateway/Metadata/JT808AtomicCounter.cs
  41. +52
    -0
      src/JT808.Gateway/Services/JT808AtomicCounterService.cs
  42. +30
    -0
      src/JT808.Gateway/Services/JT808AtomicCounterServiceFactory.cs
  43. +144
    -0
      src/JT808.Gateway/Services/JT808GatewayService.cs
  44. +52
    -0
      src/JT808.Gateway/Services/JT808MsgReplyHostedService.cs
  45. +58
    -0
      src/JT808.Gateway/Services/JT808TcpReceiveTimeoutHostedService.cs
  46. +63
    -0
      src/JT808.Gateway/Services/JT808UdpReceiveTimeoutHostedService.cs
  47. +224
    -0
      src/JT808.Gateway/Session/JT808SessionManager.cs
  48. +47
    -0
      src/JT808.Gateway/Session/JT808TcpSession.cs
  49. +41
    -0
      src/JT808.Gateway/Session/JT808UdpSession.cs

+ 4
- 3
README.md View File

@@ -2,7 +2,7 @@


基于DotNetty封装的JT808DotNetty支持TCP/UDP通用消息业务处理 基于DotNetty封装的JT808DotNetty支持TCP/UDP通用消息业务处理


基于Pipeline封装的JT808DotNetty支持TCP/UDP通用消息业务处理
基于Pipeline封装的JT808Pipeline支持TCP/UDP通用消息业务处理


[了解JT808协议进这边](https://github.com/SmallChi/JT808) [了解JT808协议进这边](https://github.com/SmallChi/JT808)


@@ -81,6 +81,7 @@


| Package Name | Version | Downloads | | Package Name | Version | Downloads |
| --------------------- | -------------------------------------------------- | --------------------------------------------------- | | --------------------- | -------------------------------------------------- | --------------------------------------------------- |
| Install-Package JT808.Gateway.Abstractions| ![JT808.Gateway.Abstractions](https://img.shields.io/nuget/v/JT808.Gateway.Abstractions.svg) | ![JT808.Gateway.Abstractions](https://img.shields.io/nuget/dt/JT808.Gateway.Abstractions.svg) |
| Install-Package JT808.Gateway | ![JT808.Gateway](https://img.shields.io/nuget/v/JT808.Gateway.svg) | ![JT808.Gateway](https://img.shields.io/nuget/dt/JT808.Gateway.svg) | | Install-Package JT808.Gateway | ![JT808.Gateway](https://img.shields.io/nuget/v/JT808.Gateway.svg) | ![JT808.Gateway](https://img.shields.io/nuget/dt/JT808.Gateway.svg) |
| Install-Package JT808.Gateway.Kafka| ![JT808.Gateway.Kafka](https://img.shields.io/nuget/v/JT808.Gateway.Kafka.svg) | ![JT808.Gateway.Kafka](https://img.shields.io/nuget/dt/JT808.Gateway.Kafka.svg) | | Install-Package JT808.Gateway.Kafka| ![JT808.Gateway.Kafka](https://img.shields.io/nuget/v/JT808.Gateway.Kafka.svg) | ![JT808.Gateway.Kafka](https://img.shields.io/nuget/dt/JT808.Gateway.Kafka.svg) |


@@ -133,7 +134,7 @@ static async Task Main(string[] args)
``` ```


如图所示: 如图所示:
![demo1](https://github.com/SmallChi/JT808DotNetty/blob/master/doc/img/demo1.png)
![demo1](https://github.com/SmallChi/JT808DotNetty/blob/master/doc/dotnetty/demo1.png)


## 举个栗子2 ## 举个栗子2


@@ -144,4 +145,4 @@ static async Task Main(string[] args)
3.进入JT808.DotNetty.SimpleClient项目下的Debug目录运行客户端 3.进入JT808.DotNetty.SimpleClient项目下的Debug目录运行客户端


如图所示: 如图所示:
![demo2](https://github.com/SmallChi/JT808DotNetty/blob/master/doc/img/demo2.png)
![demo2](https://github.com/SmallChi/JT808DotNetty/blob/master/doc/dotnetty/demo2.png)

+ 25
- 0
src/.dockerignore View File

@@ -0,0 +1,25 @@
**/.classpath
**/.dockerignore
**/.env
**/.git
**/.gitignore
**/.project
**/.settings
**/.toolstarget
**/.vs
**/.vscode
**/*.*proj.user
**/*.dbmdl
**/*.jfm
**/azds.yaml
**/bin
**/charts
**/docker-compose*
**/Dockerfile*
**/node_modules
**/npm-debug.log
**/obj
**/secrets.dev.yaml
**/values.dev.yaml
LICENSE
README.md

+ 15
- 0
src/JT808.Gateway.Abstractions/Enums/JT808TransportProtocolType.cs View File

@@ -0,0 +1,15 @@
using System;
using System.Collections.Generic;
using System.Text;

namespace JT808.Gateway.Abstractions.Enums
{
/// <summary>
/// 传输协议类型
/// </summary>
public enum JT808TransportProtocolType
{
tcp=1,
udp = 2
}
}

+ 14
- 0
src/JT808.Gateway.Abstractions/IJT808ClientBuilder.cs View File

@@ -0,0 +1,14 @@
using JT808.Protocol;
using Microsoft.Extensions.DependencyInjection;
using System;
using System.Collections.Generic;
using System.Text;

namespace JT808.Gateway.Abstractions
{
public interface IJT808ClientBuilder
{
IJT808Builder JT808Builder { get; }
IJT808Builder Builder();
}
}

+ 14
- 0
src/JT808.Gateway.Abstractions/IJT808GatewayBuilder.cs View File

@@ -0,0 +1,14 @@
using JT808.Protocol;
using Microsoft.Extensions.DependencyInjection;
using System;
using System.Collections.Generic;
using System.Text;

namespace JT808.Gateway.Abstractions
{
public interface IJT808GatewayBuilder
{
IJT808Builder JT808Builder { get; }
IJT808Builder Builder();
}
}

+ 15
- 0
src/JT808.Gateway.Abstractions/IJT808MsgConsumer.cs View File

@@ -0,0 +1,15 @@
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;

namespace JT808.Gateway.Abstractions
{
public interface IJT808MsgConsumer : IJT808PubSub, IDisposable
{
void OnMessage(Action<(string TerminalNo, byte[] Data)> callback);
CancellationTokenSource Cts { get; }
void Subscribe();
void Unsubscribe();
}
}

+ 17
- 0
src/JT808.Gateway.Abstractions/IJT808MsgProducer.cs View File

@@ -0,0 +1,17 @@
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;

namespace JT808.Gateway.Abstractions
{
public interface IJT808MsgProducer : IJT808PubSub, IDisposable
{
/// <summary>
///
/// </summary>
/// <param name="terminalNo">设备终端号</param>
/// <param name="data">808 hex data</param>
ValueTask ProduceAsync(string terminalNo, byte[] data);
}
}

+ 15
- 0
src/JT808.Gateway.Abstractions/IJT808MsgReplyConsumer.cs View File

@@ -0,0 +1,15 @@
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;

namespace JT808.Gateway.Abstractions
{
public interface IJT808MsgReplyConsumer : IJT808PubSub, IDisposable
{
void OnMessage(Action<(string TerminalNo, byte[] Data)> callback);
CancellationTokenSource Cts { get; }
void Subscribe();
void Unsubscribe();
}
}

+ 17
- 0
src/JT808.Gateway.Abstractions/IJT808MsgReplyProducer.cs View File

@@ -0,0 +1,17 @@
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;

namespace JT808.Gateway.Abstractions
{
public interface IJT808MsgReplyProducer : IJT808PubSub, IDisposable
{
/// <summary>
///
/// </summary>
/// <param name="terminalNo">设备终端号</param>
/// <param name="data">808 hex data</param>
ValueTask ProduceAsync(string terminalNo, byte[] data);
}
}

+ 11
- 0
src/JT808.Gateway.Abstractions/IJT808PubSub.cs View File

@@ -0,0 +1,11 @@
using System;
using System.Collections.Generic;
using System.Text;

namespace JT808.Gateway.Abstractions
{
public interface IJT808PubSub
{
string TopicName { get; }
}
}

+ 18
- 0
src/JT808.Gateway.Abstractions/IJT808SessionConsumer.cs View File

@@ -0,0 +1,18 @@
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;

namespace JT808.Gateway.Abstractions
{
/// <summary>
/// 会话通知(在线/离线)
/// </summary>
public interface IJT808SessionConsumer : IJT808PubSub, IDisposable
{
void OnMessage(Action<(string Notice, string TerminalNo)> callback);
CancellationTokenSource Cts { get; }
void Subscribe();
void Unsubscribe();
}
}

+ 13
- 0
src/JT808.Gateway.Abstractions/IJT808SessionProducer.cs View File

@@ -0,0 +1,13 @@
using System;
using System.Threading.Tasks;

namespace JT808.Gateway.Abstractions
{
/// <summary>
/// 会话通知(在线/离线)
/// </summary>
public interface IJT808SessionProducer : IJT808PubSub, IDisposable
{
ValueTask ProduceAsync(string notice,string terminalNo);
}
}

+ 33
- 0
src/JT808.Gateway.Abstractions/JT808.Gateway.Abstractions.csproj View File

@@ -0,0 +1,33 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFramework>netstandard2.1</TargetFramework>
<LangVersion>8.0</LangVersion>
<Copyright>Copyright 2019.</Copyright>
<Authors>SmallChi(Koike)</Authors>
<GeneratePackageOnBuild>false</GeneratePackageOnBuild>
<SignAssembly>false</SignAssembly>
<PackageLicenseFile>LICENSE</PackageLicenseFile>
<PackageRequireLicenseAcceptance>true</PackageRequireLicenseAcceptance>
<Description>基于Pipeline实现的JT808Gateway的抽象库</Description>
<PackageReleaseNotes>基于Pipeline实现的JT808Gateway的抽象库</PackageReleaseNotes>
<PackageId>JT808.Gateway.Abstractions</PackageId>
<Product>JT808.Gateway.Abstractions</Product>
<Version>1.0.0-preview2</Version>
</PropertyGroup>
<ItemGroup>
<Protobuf Include="Protos\JT808Gateway.proto" />
</ItemGroup>
<ItemGroup>
<PackageReference Include="Google.Protobuf" Version="3.11.2" />
<PackageReference Include="Grpc.Core" Version="2.25.0" />
<PackageReference Include="Grpc.Tools" Version="2.25.0">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageReference>
<PackageReference Include="JT808" Version="2.2.2" />
<PackageReference Include="Microsoft.Extensions.Hosting.Abstractions" Version="3.1.0" />
</ItemGroup>
<ItemGroup>
<None Include="..\..\LICENSE" Pack="true" PackagePath="" />
</ItemGroup>
</Project>

+ 11
- 0
src/JT808.Gateway.Abstractions/JT808GatewayConstants.cs View File

@@ -0,0 +1,11 @@
namespace JT808.Gateway.Abstractions
{
public static class JT808GatewayConstants
{
public const string SessionOnline= "JT808SessionOnline";
public const string SessionOffline = "JT808SessionOffline";
public const string SessionTopic = "jt808session";
public const string MsgTopic = "jt808msgdefault";
public const string MsgReplyTopic = "jt808msgreplydefault";
}
}

+ 63
- 0
src/JT808.Gateway.Abstractions/Protos/JT808Gateway.proto View File

@@ -0,0 +1,63 @@
syntax = "proto3";

option csharp_namespace = "JT808.Gateway.GrpcService";

package JT808GatewayGrpc;

service JT808Gateway{
// 会话服务-获取会话服务集合
rpc GetTcpSessionAll(Empty) returns (TcpSessionInfoReply);
// 会话服务-通过设备终端号移除对应会话
rpc RemoveSessionByTerminalPhoneNo(SessionRemoveRequest) returns (SessionRemoveReply);
// 统一下发信息
rpc UnificationSend(UnificationSendRequest) returns (UnificationSendReply);
// 获取Tcp包计数器
rpc GetTcpAtomicCounter(Empty) returns (TcpAtomicCounterReply);
// 会话服务-获取会话服务集合
rpc GetUdpSessionAll(Empty) returns (UdpSessionInfoReply);
// 获取Udp包计数器
rpc GetUdpAtomicCounter(Empty) returns (UdpAtomicCounterReply);
}

message Empty{}

message TcpSessionInfoReply{
repeated SessionInfo TcpSessions=1;
}
message UdpSessionInfoReply{
repeated SessionInfo UdpSessions=1;
}

message SessionInfo{
string StartTime=1;
string LastActiveTime=2;
string TerminalPhoneNo=3;
string RemoteAddressIP=4;
}

message SessionRemoveRequest{
string TerminalPhoneNo=1;
}

message SessionRemoveReply{
bool Success = 1;
}

message UnificationSendRequest{
string TerminalPhoneNo=1;
bytes Data=2;
}

message UnificationSendReply{
bool Success = 1;
}

message TcpAtomicCounterReply{
int64 MsgSuccessCount=1;
int64 MsgFailCount=2;
}

message UdpAtomicCounterReply{
int64 MsgSuccessCount=1;
int64 MsgFailCount=2;
}

+ 28
- 0
src/JT808.Gateway.Test/JT808.Gateway.Test.csproj View File

@@ -0,0 +1,28 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>netcoreapp3.1</TargetFramework>

<IsPackable>false</IsPackable>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Microsoft.Extensions.Logging.Debug" Version="3.1.0" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="16.4.0" />
<PackageReference Include="System.IO.Pipelines" Version="4.7.0" />
<PackageReference Include="xunit" Version="2.4.1" />
<PackageReference Include="xunit.runner.visualstudio" Version="2.4.1">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageReference>
<PackageReference Include="coverlet.collector" Version="1.1.0">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageReference>
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\JT808.Gateway\JT808.Gateway.csproj" />
</ItemGroup>

</Project>

+ 100
- 0
src/JT808.Gateway.Test/PipeTest.cs View File

@@ -0,0 +1,100 @@
using System;
using System.Buffers;
using System.Collections.Generic;
using System.IO.Pipelines;
using System.Threading;
using System.Threading.Tasks;
using Xunit;

namespace JT808.Gateway.Test
{
public class PipeTest
{
[Fact]
public void Test1()
{
var reader = new ReadOnlySequence<byte>(new byte[] { 0x7E, 0, 1, 2, 0x7E});
SequenceReader<byte> seqReader = new SequenceReader<byte>(reader);
int index = 0;
byte mark = 0;
long totalConsumed = 0;
List<byte[]> packages = new List<byte[]>();
while (!seqReader.End)
{
if (seqReader.IsNext(0x7E, advancePast: true))
{
if (mark == 1)
{
var package = seqReader.Sequence.Slice(totalConsumed, seqReader.Consumed - totalConsumed).ToArray();
packages.Add(package);
totalConsumed += (seqReader.Consumed - totalConsumed);
index++;
if (seqReader.End) break;
seqReader.Advance(1);
mark = 0;
}
mark++;
}
else
{
seqReader.Advance(1);
}
index++;
}
Assert.Equal(5, index);
Assert.Single(packages);
Assert.Equal(5, seqReader.Consumed);
}

[Fact]
public void Test2()
{
var reader = new ReadOnlySequence<byte>(new byte[] { 0x7E, 0, 1, 2, 0x7E, 0x7E, 0, 1, 0x7E, 0x7E, 2, 2, 2 });
SequenceReader<byte> seqReader = new SequenceReader<byte>(reader);
int index = 0;
byte mark = 0;
long totalConsumed = 0;
List<byte[]> packages = new List<byte[]>();
while (!seqReader.End)
{
if (seqReader.IsNext(0x7E, advancePast: true))
{
if (mark == 1)
{
var package = seqReader.Sequence.Slice(totalConsumed, seqReader.Consumed - totalConsumed).ToArray();
packages.Add(package);
totalConsumed += (seqReader.Consumed - totalConsumed);
index++;
if (seqReader.End) break;
seqReader.Advance(1);
mark = 0;
}
mark++;
}
else
{
seqReader.Advance(1);
}
index++;
}
Assert.Equal(13, index);
Assert.Equal(2,packages.Count);
Assert.Equal(9, totalConsumed);
Assert.Equal(13, seqReader.Consumed);
}

[Fact]
public void Test3()
{
Assert.Throws<Exception>(() =>
{
var reader = new ReadOnlySequence<byte>(new byte[] { 0, 1, 2, 0x7E });
SequenceReader<byte> seqReader = new SequenceReader<byte>(reader);
if (seqReader.TryPeek(out byte beginMark))
{
if (beginMark != 0x7E) throw new ArgumentException("not 808 packages");
}
});
}
}
}

+ 187
- 0
src/JT808.Gateway.Test/Session/JT808SessionManagerTest.cs View File

@@ -0,0 +1,187 @@
using JT808.Gateway.Session;
using System;
using System.Collections.Generic;
using System.Text;
using Xunit;
using Microsoft.Extensions.Logging;
using System.Net.Sockets;

namespace JT808.Gateway.Test.Session
{
public class JT808SessionManagerTest
{
[Fact]
public void TryAddTest()
{
JT808SessionManager jT808SessionManager = new JT808SessionManager(new LoggerFactory());
var result=jT808SessionManager.TryAdd(new JT808TcpSession(new Socket(SocketType.Stream, ProtocolType.Tcp)));
Assert.True(result);
Assert.Equal(1, jT808SessionManager.TotalSessionCount);
}

[Fact]
public void TryLinkTest()
{
string tno = "123456";
JT808SessionManager jT808SessionManager = new JT808SessionManager(new LoggerFactory());
var session = new JT808TcpSession(new Socket(SocketType.Stream, ProtocolType.Tcp));
var result1 = jT808SessionManager.TryAdd(session);
jT808SessionManager.TryLink(tno, session);
Assert.True(result1);
Assert.Equal(1, jT808SessionManager.TotalSessionCount);
Assert.True(jT808SessionManager.TerminalPhoneNoSessions.ContainsKey(tno));
}

/// <summary>
/// 用于转发过来的车辆
/// </summary>
[Fact]
public void TryLinkTest1_1_N()
{
string tno1 = "123456";
string tno2 = "123457";
string tno3 = "123458";
JT808SessionManager jT808SessionManager = new JT808SessionManager(new LoggerFactory());
var session = new JT808TcpSession(new Socket(SocketType.Stream, ProtocolType.Tcp));
var result1 = jT808SessionManager.TryAdd(session);
jT808SessionManager.TryLink(tno1, session);
jT808SessionManager.TryLink(tno2, session);
jT808SessionManager.TryLink(tno3, session);
Assert.True(result1);
Assert.Equal(1, jT808SessionManager.TotalSessionCount);
Assert.Equal(3,jT808SessionManager.TerminalPhoneNoSessions.Count);
jT808SessionManager.RemoveBySessionId(session.SessionID);
Assert.Equal(0, jT808SessionManager.TotalSessionCount);
Assert.Empty(jT808SessionManager.TerminalPhoneNoSessions);
}

/// <summary>
/// 用于转发过来的车辆
/// </summary>
[Fact]
public void TryLinkTest2_1_N()
{
string tno1 = "123456";
string tno2 = "123457";
string tno3 = "123458";
JT808SessionManager jT808SessionManager = new JT808SessionManager(new LoggerFactory());
var session = new JT808TcpSession(new Socket(SocketType.Stream, ProtocolType.Tcp));
var result1 = jT808SessionManager.TryAdd(session);
jT808SessionManager.TryLink(tno1, session);
jT808SessionManager.TryLink(tno2, session);
jT808SessionManager.TryLink(tno3, session);
Assert.True(result1);
Assert.Equal(1, jT808SessionManager.TotalSessionCount);
Assert.Equal(3, jT808SessionManager.TerminalPhoneNoSessions.Count);
jT808SessionManager.RemoveByTerminalPhoneNo(tno1);
Assert.Equal(0, jT808SessionManager.TotalSessionCount);
Assert.Empty(jT808SessionManager.TerminalPhoneNoSessions);
}

/// <summary>
/// 转发过来的车辆切换为直连车辆
/// </summary>
[Fact]
public void UpdateLinkTest2_1_N()
{
string tno1 = "123456";
string tno2 = "123457";
string tno3 = "123458";
JT808SessionManager jT808SessionManager = new JT808SessionManager(new LoggerFactory());
var session1 = new JT808TcpSession(new Socket(SocketType.Stream, ProtocolType.Tcp));
var session2 = new JT808TcpSession(new Socket(SocketType.Stream, ProtocolType.Tcp));
var result1 = jT808SessionManager.TryAdd(session1);
var result2 = jT808SessionManager.TryAdd(session2);
//转发车辆
jT808SessionManager.TryLink(tno1, session1);
jT808SessionManager.TryLink(tno2, session1);
//直连车辆
jT808SessionManager.TryLink(tno3, session2);

Assert.True(result1);
Assert.True(result2);
Assert.Equal(2, jT808SessionManager.TotalSessionCount);
Assert.Equal(3, jT808SessionManager.TerminalPhoneNoSessions.Count);

//将tno2切换为直连车辆
var session3 = new JT808TcpSession(new Socket(SocketType.Stream, ProtocolType.Tcp));
var result3 = jT808SessionManager.TryAdd(session3);
jT808SessionManager.TryLink(tno2, session3);
Assert.True(result3);
if (jT808SessionManager.TerminalPhoneNoSessions.TryGetValue(tno2,out string sessionid))
{
//实际的通道Id
Assert.Equal(session3.SessionID, sessionid);
}
Assert.Equal(3, jT808SessionManager.TotalSessionCount);
Assert.Equal(3, jT808SessionManager.TerminalPhoneNoSessions.Count);

jT808SessionManager.RemoveByTerminalPhoneNo(tno1);
Assert.Equal(2, jT808SessionManager.TotalSessionCount);
Assert.Equal(2,jT808SessionManager.TerminalPhoneNoSessions.Count);
}

[Fact]
public void RemoveBySessionIdTest()
{
JT808SessionManager jT808SessionManager = new JT808SessionManager(new LoggerFactory());
var session = new JT808TcpSession(new Socket(SocketType.Stream, ProtocolType.Tcp));
var result1 = jT808SessionManager.TryAdd(session);
Assert.True(result1);
Assert.Equal(1, jT808SessionManager.TotalSessionCount);
jT808SessionManager.RemoveBySessionId(session.SessionID);
Assert.Equal(0, jT808SessionManager.TotalSessionCount);
}

[Fact]
public void RemoveByTerminalPhoneNoTest()
{
string tno = "123456";
JT808SessionManager jT808SessionManager = new JT808SessionManager(new LoggerFactory());
var session = new JT808TcpSession(new Socket(SocketType.Stream, ProtocolType.Tcp));
var result1 = jT808SessionManager.TryAdd(session);
jT808SessionManager.TryLink(tno, session);
Assert.True(result1);
Assert.Equal(1, jT808SessionManager.TotalSessionCount);
jT808SessionManager.RemoveByTerminalPhoneNo(tno);
Assert.False(jT808SessionManager.TerminalPhoneNoSessions.ContainsKey(tno));
Assert.Equal(0, jT808SessionManager.TotalSessionCount);
}

[Fact]
public void SendTest()
{
Assert.Throws<SocketException>(() =>
{
string tno = "123456";
JT808SessionManager jT808SessionManager = new JT808SessionManager(new LoggerFactory());
var session = new JT808TcpSession(new Socket(SocketType.Stream, ProtocolType.Tcp));
var result1 = jT808SessionManager.TryAdd(session);
jT808SessionManager.TryLink(tno, session);
jT808SessionManager.TrySendByTerminalPhoneNo(tno, new byte[] { 0x7e, 0, 0, 0x7e });
});
}

[Fact]
public void GetTcpAllTest()
{
string tno1 = "123456";
string tno2 = "123457";
JT808SessionManager jT808SessionManager = new JT808SessionManager(new LoggerFactory());
var session1 = new JT808TcpSession(new Socket(SocketType.Stream, ProtocolType.Tcp));
var session2 = new JT808TcpSession(new Socket(SocketType.Stream, ProtocolType.Tcp));
var result1 = jT808SessionManager.TryAdd(session1);
var result2 = jT808SessionManager.TryAdd(session2);
jT808SessionManager.TryLink(tno1, session1);
jT808SessionManager.TryLink(tno2, session2);
Assert.True(result1);
Assert.True(result2);
Assert.Equal(2, jT808SessionManager.TotalSessionCount);
Assert.True(jT808SessionManager.TerminalPhoneNoSessions.ContainsKey(tno1));
Assert.True(jT808SessionManager.TerminalPhoneNoSessions.ContainsKey(tno2));
var sessions = jT808SessionManager.GetTcpAll();
Assert.Equal(session1.SessionID, sessions[0].SessionID);
Assert.Equal(session2.SessionID, sessions[1].SessionID);
}
}
}

+ 3106
- 0
src/JT808.Gateway.TestHosting/Configs/NLog.xsd
File diff suppressed because it is too large
View File


+ 36
- 0
src/JT808.Gateway.TestHosting/Configs/nlog.Unix.config View File

@@ -0,0 +1,36 @@
<?xml version="1.0" encoding="utf-8" ?>
<!--
参考:http://www.cnblogs.com/fuchongjundream/p/3936431.html
autoReload:自动再配置
internalLogFile:可以让NLog把内部的调试和异常信息都写入指定文件里程序没问题了,日志却出了问题。这个该怎么办,到底是哪里不正确了?假如日志本身除了bug该如何解决?这就需要日志排错。把日志的错误信息写入日志。
<nlog throwExceptions="true" />
<nlog internalLogFile="file.txt" />- 设置internalLogFile属性可以让NLog把内部的调试和异常信息都写入指定文件里。
<nlog internalLogLevel="Trace|Debug|Info|Warn|Error|Fatal" /> - 决定内部日志的级别,级别越高,输出的日志信息越简洁。
<nlog internalLogToConsole="false|true" /> - 是否把内部日志输出到标准控制台。
<nlog internalLogToConsoleError="false|true" /> - 是否把内部日志输出到标准错误控制台 (stderr)。
设置throwExceptions属性为“true”可以让NLog不再阻挡这类异常,而是把它们抛给调用者。在部署是这样做可以帮我们快速定位问题。一旦应用程序已经正确配置了,我们建议把throwExceptions的值设为“false”,这样由于日志引发的问题不至于导致应用程序的崩溃。
-->
<nlog xmlns="http://www.nlog-project.org/schemas/NLog.xsd" xsi:schemaLocation="NLog NLog.xsd"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
autoReload="true"
internalLogFile="/data/serviceslogs/JT808.Gateway/internalLog.txt"
internalLogLevel="Debug" >
<variable name="Directory" value="/data/serviceslogs/JT808.Gateway"/>
<targets>
<target name="Gateway" xsi:type="File"
fileName="${Directory}/Gateway.${shortdate}.log"
layout="${date:format=yyyyMMddHHmmss} ${callsite} ${level}:${message} ${onexception:${exception:format=tostring} ${newline} ${stacktrace} ${newline}"/>
<target name="console" xsi:type="ColoredConsole"
useDefaultRowHighlightingRules="false"
layout="${date:format=yyyyMMddHHmmss} ${callsite} ${level} ${message} ${onexception:${exception:format=tostring} ${newline} ${stacktrace} ${newline}">
<highlight-row condition="level == LogLevel.Debug" foregroundColor="DarkGray" />
<highlight-row condition="level == LogLevel.Info" foregroundColor="Gray" />
<highlight-row condition="level == LogLevel.Warn" foregroundColor="Yellow" />
<highlight-row condition="level == LogLevel.Error" foregroundColor="Red" />
<highlight-row condition="level == LogLevel.Fatal" foregroundColor="Red" backgroundColor="White" />
</target>
</targets>
<rules>
<logger name="*" minlevel="Debug" maxlevel="Fatal" writeTo="Gateway"/>
</rules>
</nlog>

+ 36
- 0
src/JT808.Gateway.TestHosting/Configs/nlog.Win32NT.config View File

@@ -0,0 +1,36 @@
<?xml version="1.0" encoding="utf-8" ?>
<!--
参考:http://www.cnblogs.com/fuchongjundream/p/3936431.html
autoReload:自动再配置
internalLogFile:可以让NLog把内部的调试和异常信息都写入指定文件里程序没问题了,日志却出了问题。这个该怎么办,到底是哪里不正确了?假如日志本身除了bug该如何解决?这就需要日志排错。把日志的错误信息写入日志。
<nlog throwExceptions="true" />
<nlog internalLogFile="file.txt" />- 设置internalLogFile属性可以让NLog把内部的调试和异常信息都写入指定文件里。
<nlog internalLogLevel="Trace|Debug|Info|Warn|Error|Fatal" /> - 决定内部日志的级别,级别越高,输出的日志信息越简洁。
<nlog internalLogToConsole="false|true" /> - 是否把内部日志输出到标准控制台。
<nlog internalLogToConsoleError="false|true" /> - 是否把内部日志输出到标准错误控制台 (stderr)。
设置throwExceptions属性为“true”可以让NLog不再阻挡这类异常,而是把它们抛给调用者。在部署是这样做可以帮我们快速定位问题。一旦应用程序已经正确配置了,我们建议把throwExceptions的值设为“false”,这样由于日志引发的问题不至于导致应用程序的崩溃。
-->
<nlog xmlns="http://www.nlog-project.org/schemas/NLog.xsd" xsi:schemaLocation="NLog NLog.xsd"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
autoReload="true"
internalLogFile="wwwroot/logs/JT808.Gateway/internalLog.txt"
internalLogLevel="Debug" >
<variable name="Directory" value="/data/logs/JT808.Gateway"/>
<targets>
<target name="Gateway" xsi:type="File"
fileName="${Directory}/Gateway.${shortdate}.log"
layout="${date:format=yyyyMMddHHmmss} ${callsite} ${level}:${message} ${onexception:${exception:format=tostring} ${newline} ${stacktrace} ${newline}"/>
<target name="console" xsi:type="ColoredConsole"
useDefaultRowHighlightingRules="false"
layout="${date:format=yyyyMMddHHmmss} ${callsite} ${level} ${message} ${onexception:${exception:format=tostring} ${newline} ${stacktrace} ${newline}">
<highlight-row condition="level == LogLevel.Debug" foregroundColor="DarkGray" />
<highlight-row condition="level == LogLevel.Info" foregroundColor="Gray" />
<highlight-row condition="level == LogLevel.Warn" foregroundColor="Yellow" />
<highlight-row condition="level == LogLevel.Error" foregroundColor="Red" />
<highlight-row condition="level == LogLevel.Fatal" foregroundColor="Red" backgroundColor="White" />
</target>
</targets>
<rules>
<logger name="*" minlevel="Debug" maxlevel="Fatal" writeTo="Gateway,console"/>
</rules>
</nlog>

+ 23
- 0
src/JT808.Gateway.TestHosting/Dockerfile View File

@@ -0,0 +1,23 @@
#See https://aka.ms/containerfastmode to understand how Visual Studio uses this Dockerfile to build your images for faster debugging.

FROM mcr.microsoft.com/dotnet/core/runtime:3.1-buster-slim AS base
EXPOSE 808/tcp
WORKDIR /app

FROM mcr.microsoft.com/dotnet/core/sdk:3.1-buster AS build
WORKDIR /src
COPY ["JT808.Gateway.TestHosting/JT808.Gateway.TestHosting.csproj", "JT808.Gateway.TestHosting/"]
COPY ["JT808.Gateway/JT808.Gateway.csproj", "JT808.Gateway/"]
COPY ["JT808.Gateway.Abstractions/JT808.Gateway.Abstractions.csproj", "JT808.Gateway.Abstractions/"]
RUN dotnet restore "JT808.Gateway.TestHosting/JT808.Gateway.TestHosting.csproj"
COPY . .
WORKDIR "/src/JT808.Gateway.TestHosting"
RUN dotnet build "JT808.Gateway.TestHosting.csproj" -c Release -o /app/build

FROM build AS publish
RUN dotnet publish "JT808.Gateway.TestHosting.csproj" -c Release -o /app/publish

FROM base AS final
WORKDIR /app
COPY --from=publish /app/publish .
ENTRYPOINT ["dotnet", "JT808.Gateway.TestHosting.dll"]

+ 35
- 0
src/JT808.Gateway.TestHosting/JT808.Gateway.TestHosting.csproj View File

@@ -0,0 +1,35 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>netcoreapp3.1</TargetFramework>
<DockerDefaultTargetOS>Linux</DockerDefaultTargetOS>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="3.1.0" />
<PackageReference Include="Microsoft.Extensions.Hosting" Version="3.1.0" />
<PackageReference Include="Microsoft.VisualStudio.Azure.Containers.Tools.Targets" Version="1.9.10" />
<PackageReference Include="NLog.Extensions.Logging" Version="1.6.1" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\JT808.Gateway\JT808.Gateway.csproj" />
</ItemGroup>

<ItemGroup>
<None Update="appsettings.json">
<CopyToOutputDirectory>Always</CopyToOutputDirectory>
</None>
<None Update="Configs\nlog.Unix.config">
<CopyToOutputDirectory>Always</CopyToOutputDirectory>
</None>
<None Update="Configs\nlog.Win32NT.config">
<CopyToOutputDirectory>Always</CopyToOutputDirectory>
</None>
<None Update="Configs\NLog.xsd">
<CopyToOutputDirectory>Always</CopyToOutputDirectory>
</None>
</ItemGroup>

</Project>

+ 69
- 0
src/JT808.Gateway.TestHosting/Jobs/CallGrpcClientJob.cs View File

@@ -0,0 +1,69 @@
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Grpc.Core;
using JT808.Gateway.Configurations;
using JT808.Gateway.GrpcService;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using System.Text.Json;

namespace JT808.Gateway.TestHosting.Jobs
{
public class CallGrpcClientJob :IHostedService
{
private Channel channel;
private readonly ILogger Logger;
private Grpc.Core.Metadata AuthMetadata;
public CallGrpcClientJob(
ILoggerFactory loggerFactory,
JT808Configuration configuration)
{
Logger = loggerFactory.CreateLogger("CallGrpcClientJob");
channel = new Channel($"{configuration.WebApiHost}:{configuration.WebApiPort}",
ChannelCredentials.Insecure);
AuthMetadata = new Grpc.Core.Metadata();
AuthMetadata.Add("token", configuration.WebApiToken);
}

public Task StartAsync(CancellationToken cancellationToken)
{
Task.Run(() =>
{
while (!cancellationToken.IsCancellationRequested)
{
JT808Gateway.JT808GatewayClient jT808GatewayClient = new JT808Gateway.JT808GatewayClient(channel);
try
{
var result1 = jT808GatewayClient.GetTcpAtomicCounter(new Empty(), AuthMetadata);
var result2 = jT808GatewayClient.GetTcpSessionAll(new Empty(), AuthMetadata);
Logger.LogInformation($"[GetTcpAtomicCounter]:{JsonSerializer.Serialize(result1)}");
Logger.LogInformation($"[GetTcpSessionAll]:{JsonSerializer.Serialize(result2)}");
}
catch (Exception ex)
{
Logger.LogError(ex, "Call Grpc Error");
}
try
{
var result1 = jT808GatewayClient.GetTcpAtomicCounter(new Empty());
}
catch (RpcException ex)
{
Logger.LogError($"{ex.StatusCode.ToString()}-{ex.Message}");
}
Thread.Sleep(3000);
}
}, cancellationToken);
return Task.CompletedTask;
}

public Task StopAsync(CancellationToken cancellationToken)
{
channel.ShutdownAsync();
return Task.CompletedTask;
}
}
}

+ 46
- 0
src/JT808.Gateway.TestHosting/Program.cs View File

@@ -0,0 +1,46 @@
using System;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using JT808.Protocol;
using Microsoft.Extensions.Configuration;
using NLog.Extensions.Logging;
using JT808.Gateway.TestHosting.Jobs;

namespace JT808.Gateway.TestHosting
{
class Program
{
static async Task Main(string[] args)
{
var serverHostBuilder = new HostBuilder()
.ConfigureAppConfiguration((hostingContext, config) =>
{
config.SetBasePath(AppDomain.CurrentDomain.BaseDirectory)
.AddJsonFile("appsettings.json", optional: false, reloadOnChange: true)
.AddJsonFile($"appsettings.{ hostingContext.HostingEnvironment.EnvironmentName}.json", optional: true, reloadOnChange: true);
})
.ConfigureLogging((context, logging) =>
{
Console.WriteLine($"Environment.OSVersion.Platform:{Environment.OSVersion.Platform.ToString()}");
NLog.LogManager.LoadConfiguration($"Configs/nlog.{Environment.OSVersion.Platform.ToString()}.config");
logging.AddNLog(new NLogProviderOptions { CaptureMessageTemplates = true, CaptureMessageProperties = true });
logging.SetMinimumLevel(LogLevel.Trace);
})
.ConfigureServices((hostContext, services) =>
{
services.AddSingleton<ILoggerFactory, LoggerFactory>();
services.AddSingleton(typeof(ILogger<>), typeof(Logger<>));
services.AddJT808Configure()
.AddJT808Gateway()
.AddTcp()
.AddUdp()
.AddGrpc();
//services.AddHostedService<CallGrpcClientJob>();
});

await serverHostBuilder.RunConsoleAsync();
}
}
}

+ 4
- 0
src/JT808.Gateway.TestHosting/startup.txt View File

@@ -0,0 +1,4 @@
pm2 start "dotnet JT808.DotNetty.CleintBenchmark.dll ASPNETCORE_ENVIRONMENT=Production" --max-restarts=1 -n "JT808.DotNetty.CleintBenchmark" -o "/data/pm2Logs/JT808.DotNetty.CleintBenchmark/out.log" -e "/data/pm2Logs/JT808.DotNetty.CleintBenchmark/error.log"


pm2 start "dotnet JT808.Gateway.TestHosting.dll ASPNETCORE_ENVIRONMENT=Production" --max-restarts=1 -n "JT808.Gateway.808" -o "/data/pm2Logs/JT808.Gateway/out.log" -e "/data/pm2Logs/JT808.Gateway/error.log"

+ 43
- 0
src/JT808.Gateway.sln View File

@@ -0,0 +1,43 @@

Microsoft Visual Studio Solution File, Format Version 12.00
# Visual Studio Version 16
VisualStudioVersion = 16.0.29409.12
MinimumVisualStudioVersion = 10.0.40219.1
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JT808.Gateway", "JT808.Gateway\JT808.Gateway.csproj", "{4C8A2546-8333-416D-B123-91062B630087}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JT808.Gateway.TestHosting", "JT808.Gateway.TestHosting\JT808.Gateway.TestHosting.csproj", "{AE40AFE0-0950-442C-A74C-10CDF53E9F36}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JT808.Gateway.Test", "JT808.Gateway.Test\JT808.Gateway.Test.csproj", "{28C9BC4D-7B28-4E65-971A-7156E5EE0ADF}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JT808.Gateway.Abstractions", "JT808.Gateway.Abstractions\JT808.Gateway.Abstractions.csproj", "{3AA17DF7-A1B3-449C-93C2-45B051C32933}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Release|Any CPU = Release|Any CPU
EndGlobalSection
GlobalSection(ProjectConfigurationPlatforms) = postSolution
{4C8A2546-8333-416D-B123-91062B630087}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{4C8A2546-8333-416D-B123-91062B630087}.Debug|Any CPU.Build.0 = Debug|Any CPU
{4C8A2546-8333-416D-B123-91062B630087}.Release|Any CPU.ActiveCfg = Release|Any CPU
{4C8A2546-8333-416D-B123-91062B630087}.Release|Any CPU.Build.0 = Release|Any CPU
{AE40AFE0-0950-442C-A74C-10CDF53E9F36}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{AE40AFE0-0950-442C-A74C-10CDF53E9F36}.Debug|Any CPU.Build.0 = Debug|Any CPU
{AE40AFE0-0950-442C-A74C-10CDF53E9F36}.Release|Any CPU.ActiveCfg = Release|Any CPU
{AE40AFE0-0950-442C-A74C-10CDF53E9F36}.Release|Any CPU.Build.0 = Release|Any CPU
{28C9BC4D-7B28-4E65-971A-7156E5EE0ADF}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{28C9BC4D-7B28-4E65-971A-7156E5EE0ADF}.Debug|Any CPU.Build.0 = Debug|Any CPU
{28C9BC4D-7B28-4E65-971A-7156E5EE0ADF}.Release|Any CPU.ActiveCfg = Release|Any CPU
{28C9BC4D-7B28-4E65-971A-7156E5EE0ADF}.Release|Any CPU.Build.0 = Release|Any CPU
{3AA17DF7-A1B3-449C-93C2-45B051C32933}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{3AA17DF7-A1B3-449C-93C2-45B051C32933}.Debug|Any CPU.Build.0 = Debug|Any CPU
{3AA17DF7-A1B3-449C-93C2-45B051C32933}.Release|Any CPU.ActiveCfg = Release|Any CPU
{3AA17DF7-A1B3-449C-93C2-45B051C32933}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {AA9303A7-6FB3-4572-88AA-3302E85330D1}
EndGlobalSection
EndGlobal

+ 43
- 0
src/JT808.Gateway/Configurations/JT808Configuration.cs View File

@@ -0,0 +1,43 @@
using JT808.Gateway.Enums;
using System;
using System.Collections.Generic;
using System.Text;

namespace JT808.Gateway.Configurations
{
public class JT808Configuration
{
public int TcpPort { get; set; } = 808;
public int UdpPort { get; set; } = 808;
public int WebApiPort { get; set; } = 828;
public string WebApiHost{ get; set; } = "localhost";
/// <summary>
/// WebApi 默认token 123456
/// </summary>
public string WebApiToken { get; set; } = "123456";
public int SoBacklog { get; set; } = 8192;
public int MiniNumBufferSize { get; set; } = 8096;
/// <summary>
/// Tcp读超时
/// 默认10分钟检查一次
/// </summary>
public int TcpReaderIdleTimeSeconds { get; set; } = 60*10;
/// <summary>
/// Tcp 60s检查一次
/// </summary>
public int TcpReceiveTimeoutCheckTimeSeconds { get; set; } = 60;
/// <summary>
/// Udp读超时
/// </summary>
public int UdpReaderIdleTimeSeconds { get; set; } = 60;
/// <summary>
/// Udp 60s检查一次
/// </summary>
public int UdpReceiveTimeoutCheckTimeSeconds { get; set; } = 60;
/// <summary>
/// 队列类型
/// 默认内存队列
/// </summary>
public JT808MessageQueueType MessageQueueType { get; set; } = JT808MessageQueueType.InMemory;
}
}

+ 18
- 0
src/JT808.Gateway/Enums/JT808MessageQueueType.cs View File

@@ -0,0 +1,18 @@
using System;
using System.Collections.Generic;
using System.Text;

namespace JT808.Gateway.Enums
{
public enum JT808MessageQueueType:byte
{
/// <summary>
/// 使用内存队列
/// </summary>
InMemory=1,
/// <summary>
/// 使用第三方队列
/// </summary>
InPlug=2
}
}

+ 26
- 0
src/JT808.Gateway/Interfaces/IJT808Session.cs View File

@@ -0,0 +1,26 @@
using JT808.Gateway.Abstractions.Enums;
using System;
using System.Collections.Generic;
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Threading;

namespace JT808.Gateway.Interfaces
{
public interface IJT808Session
{
/// <summary>
/// 终端手机号
/// </summary>
string TerminalPhoneNo { get; set; }
string SessionID { get; }
Socket Client { get; set; }
DateTime StartTime { get; set; }
DateTime ActiveTime { get; set; }
JT808TransportProtocolType TransportProtocolType { get;}
CancellationTokenSource ReceiveTimeout { get; set; }
EndPoint RemoteEndPoint { get; set; }
void Close();
}
}

+ 20
- 0
src/JT808.Gateway/Internal/JT808GatewayBuilderDefault.cs View File

@@ -0,0 +1,20 @@
using JT808.Gateway.Abstractions;
using JT808.Protocol;

namespace JT808.Gateway.Internal
{
public class JT808GatewayBuilderDefault : IJT808GatewayBuilder
{
public IJT808Builder JT808Builder { get; }

public JT808GatewayBuilderDefault(IJT808Builder builder)
{
JT808Builder = builder;
}

public IJT808Builder Builder()
{
return JT808Builder;
}
}
}

+ 28
- 0
src/JT808.Gateway/Internal/JT808MsgProducerDefault.cs View File

@@ -0,0 +1,28 @@
using JT808.Gateway.Abstractions;
using JT808.Gateway.Internal;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;

namespace JT808.Gateway.Internal
{
internal class JT808MsgProducerDefault : IJT808MsgProducer
{
private readonly JT808MsgService JT808MsgService;
public string TopicName => JT808GatewayConstants.MsgTopic;
public JT808MsgProducerDefault(JT808MsgService jT808MsgService)
{
JT808MsgService = jT808MsgService;
}
public void Dispose()
{
}
public ValueTask ProduceAsync(string terminalNo, byte[] data)
{
JT808MsgService.MsgQueue.Add((terminalNo, data));
return default;
}
}
}

+ 201
- 0
src/JT808.Gateway/Internal/JT808MsgReplyConsumerDefault.cs View File

@@ -0,0 +1,201 @@
using JT808.Gateway.Abstractions;
using JT808.Protocol;
using JT808.Protocol.Enums;
using JT808.Protocol.Extensions;
using JT808.Protocol.MessageBody;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace JT808.Gateway.Internal
{
internal class JT808MsgReplyConsumerDefault : IJT808MsgReplyConsumer
{
private readonly JT808MsgService JT808MsgService;

private readonly JT808Serializer JT808Serializer;

private delegate byte[] MethodDelegate(JT808HeaderPackage headerPackage);

private Dictionary<ushort, MethodDelegate> HandlerDict;
public JT808MsgReplyConsumerDefault(
IJT808Config jT808Config,
JT808MsgService jT808MsgService)
{
JT808MsgService = jT808MsgService;
this.JT808Serializer = jT808Config.GetSerializer();
HandlerDict = new Dictionary<ushort, MethodDelegate> {
{JT808MsgId.终端通用应答.ToUInt16Value(), Msg0x0001},
{JT808MsgId.终端鉴权.ToUInt16Value(), Msg0x0102},
{JT808MsgId.终端心跳.ToUInt16Value(), Msg0x0002},
{JT808MsgId.终端注销.ToUInt16Value(), Msg0x0003},
{JT808MsgId.终端注册.ToUInt16Value(), Msg0x0100},
{JT808MsgId.位置信息汇报.ToUInt16Value(),Msg0x0200 },
{JT808MsgId.定位数据批量上传.ToUInt16Value(),Msg0x0704 },
{JT808MsgId.数据上行透传.ToUInt16Value(),Msg0x0900 }
};
}
public CancellationTokenSource Cts =>new CancellationTokenSource();

public string TopicName => JT808GatewayConstants.MsgReplyTopic;

public void Dispose()
{
Cts.Dispose();
}

public void OnMessage(Action<(string TerminalNo, byte[] Data)> callback)
{
Task.Run(() =>
{
foreach(var item in JT808MsgService.MsgQueue.GetConsumingEnumerable())
{
try
{
var package = JT808Serializer.HeaderDeserialize(item.Data);
if (HandlerDict.TryGetValue(package.Header.MsgId, out var func))
{
var buffer = func(package);
if (buffer != null)
{
callback((item.TerminalNo, buffer));
}
}
}
catch
{

}
}
}, Cts.Token);
}

public void Subscribe()
{
}

public void Unsubscribe()
{
Cts.Cancel();
}

/// <summary>
/// 终端通用应答
/// 平台无需回复
/// 实现自己的业务
/// </summary>
/// <param name="request"></param>
/// <returns></returns>
public byte[] Msg0x0001(JT808HeaderPackage request)
{
return null;
}

/// <summary>
/// 终端心跳
/// </summary>
/// <param name="request"></param>
/// <returns></returns>
public byte[] Msg0x0002(JT808HeaderPackage request)
{
return JT808Serializer.Serialize(JT808MsgId.平台通用应答.Create(request.Header.TerminalPhoneNo, new JT808_0x8001()
{
AckMsgId = request.Header.MsgId,
JT808PlatformResult = JT808PlatformResult.成功,
MsgNum = request.Header.MsgNum
}));
}

/// <summary>
/// 终端注销
/// </summary>
/// <param name="request"></param>
/// <returns></returns>
public byte[] Msg0x0003(JT808HeaderPackage request)
{
return JT808Serializer.Serialize(JT808MsgId.平台通用应答.Create(request.Header.TerminalPhoneNo, new JT808_0x8001()
{
AckMsgId = request.Header.MsgId,
JT808PlatformResult = JT808PlatformResult.成功,
MsgNum = request.Header.MsgNum
}));
}

/// <summary>
/// 终端注册
/// </summary>
/// <param name="request"></param>
/// <returns></returns>
public byte[] Msg0x0100(JT808HeaderPackage request)
{
return JT808Serializer.Serialize(JT808MsgId.终端注册应答.Create(request.Header.TerminalPhoneNo, new JT808_0x8100()
{
Code = "J" + request.Header.TerminalPhoneNo,
JT808TerminalRegisterResult = JT808TerminalRegisterResult.成功,
AckMsgNum = request.Header.MsgNum
}));
}
/// <summary>
/// 终端鉴权
/// </summary>
/// <param name="request"></param>
/// <returns></returns>
public byte[] Msg0x0102(JT808HeaderPackage request)
{
return JT808Serializer.Serialize(JT808MsgId.平台通用应答.Create(request.Header.TerminalPhoneNo, new JT808_0x8001()
{
AckMsgId = request.Header.MsgId,
JT808PlatformResult = JT808PlatformResult.成功,
MsgNum = request.Header.MsgNum
}));
}

/// <summary>
/// 位置信息汇报
/// </summary>
/// <param name="request"></param>
/// <returns></returns>
public byte[] Msg0x0200(JT808HeaderPackage request)
{
return JT808Serializer.Serialize(JT808MsgId.平台通用应答.Create(request.Header.TerminalPhoneNo, new JT808_0x8001()
{
AckMsgId = request.Header.MsgId,
JT808PlatformResult = JT808PlatformResult.成功,
MsgNum = request.Header.MsgNum
}));
}

/// <summary>
/// 定位数据批量上传
/// </summary>
/// <param name="request"></param>
/// <returns></returns>
public byte[] Msg0x0704(JT808HeaderPackage request)
{
return JT808Serializer.Serialize(JT808MsgId.平台通用应答.Create(request.Header.TerminalPhoneNo, new JT808_0x8001()
{
AckMsgId = request.Header.MsgId,
JT808PlatformResult = JT808PlatformResult.成功,
MsgNum = request.Header.MsgNum
}));
}

/// <summary>
/// 数据上行透传
/// </summary>
/// <param name="request"></param>
/// <returns></returns>
public byte[] Msg0x0900(JT808HeaderPackage request)
{
return JT808Serializer.Serialize(JT808MsgId.平台通用应答.Create(request.Header.TerminalPhoneNo, new JT808_0x8001()
{
AckMsgId = request.Header.MsgId,
JT808PlatformResult = JT808PlatformResult.成功,
MsgNum = request.Header.MsgNum
}));
}
}
}

+ 11
- 0
src/JT808.Gateway/Internal/JT808MsgService.cs View File

@@ -0,0 +1,11 @@
using System;
using System.Collections.Generic;
using System.Text;

namespace JT808.Gateway.Internal
{
internal class JT808MsgService
{
public System.Collections.Concurrent.BlockingCollection<(string TerminalNo, byte[] Data)> MsgQueue { get; set; } = new System.Collections.Concurrent.BlockingCollection<(string TerminalNo, byte[] Data)>();
}
}

+ 31
- 0
src/JT808.Gateway/JT808.Gateway.csproj View File

@@ -0,0 +1,31 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>netstandard2.1</TargetFramework>
<LangVersion>8.0</LangVersion>
<Copyright>Copyright 2019.</Copyright>
<Authors>SmallChi(Koike)</Authors>
<GeneratePackageOnBuild>false</GeneratePackageOnBuild>
<SignAssembly>false</SignAssembly>
<PackageLicenseFile>LICENSE</PackageLicenseFile>
<PackageRequireLicenseAcceptance>true</PackageRequireLicenseAcceptance>
<Description>基于Pipeline实现的JT808Gateway的网络库</Description>
<PackageReleaseNotes>基于Pipeline实现的JT808Gateway的网络库</PackageReleaseNotes>
<PackageId>JT808.Gateway</PackageId>
<Product>JT808.Gateway</Product>
<Version>1.0.0-preview2</Version>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Microsoft.Extensions.Options" Version="3.1.0" />
<PackageReference Include="Microsoft.Extensions.Options.ConfigurationExtensions" Version="3.1.0" />
<PackageReference Include="System.IO.Pipelines" Version="4.7.0" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\JT808.Gateway.Abstractions\JT808.Gateway.Abstractions.csproj" />
</ItemGroup>
<ItemGroup>
<None Include="..\..\LICENSE" Pack="true" PackagePath="" />
</ItemGroup>
</Project>

+ 75
- 0
src/JT808.Gateway/JT808GatewayExtensions.cs View File

@@ -0,0 +1,75 @@
using JT808.Gateway.Abstractions;
using JT808.Gateway.Configurations;
using JT808.Gateway.Internal;
using JT808.Gateway.Services;
using JT808.Gateway.Session;
using JT808.Protocol;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection.Extensions;
using System;
using System.Runtime.CompilerServices;

[assembly: InternalsVisibleTo("JT808.Gateway.TestHosting")]
[assembly: InternalsVisibleTo("JT808.Gateway.Test")]
namespace JT808.Gateway
{
public static partial class JT808GatewayExtensions
{
public static IJT808GatewayBuilder AddJT808Gateway(this IJT808Builder jt808Builder)
{
IJT808GatewayBuilder server = new JT808GatewayBuilderDefault(jt808Builder);
server.JT808Builder.Services.TryAddSingleton<JT808Configuration>();
server.AddJT808Core();
return server;
}

public static IJT808GatewayBuilder AddJT808Gateway(this IJT808Builder jt808Builder,Action<JT808Configuration> config)
{
IJT808GatewayBuilder server = new JT808GatewayBuilderDefault(jt808Builder);
server.JT808Builder.Services.Configure(config);
server.AddJT808Core();
return server;
}

public static IJT808GatewayBuilder AddJT808Gateway(this IJT808Builder jt808Builder, IConfiguration configuration)
{
IJT808GatewayBuilder server = new JT808GatewayBuilderDefault(jt808Builder);
server.JT808Builder.Services.Configure<JT808Configuration>(configuration.GetSection("JT808Configuration"));
server.AddJT808Core();
return server;
}

public static IJT808GatewayBuilder AddTcp(this IJT808GatewayBuilder config)
{
config.JT808Builder.Services.AddHostedService<JT808TcpServer>();
config.JT808Builder.Services.AddHostedService<JT808TcpReceiveTimeoutHostedService>();
return config;
}

public static IJT808GatewayBuilder AddUdp(this IJT808GatewayBuilder config)
{
config.JT808Builder.Services.AddHostedService<JT808UdpServer>();
config.JT808Builder.Services.AddHostedService<JT808UdpReceiveTimeoutHostedService>();
return config;
}

public static IJT808GatewayBuilder AddGrpc(this IJT808GatewayBuilder config)
{
config.JT808Builder.Services.AddHostedService<JT808GrpcServer>();
return config;
}

private static IJT808GatewayBuilder AddJT808Core(this IJT808GatewayBuilder config)
{
config.JT808Builder.Services.TryAddSingleton<JT808Configuration>();
config.JT808Builder.Services.TryAddSingleton<JT808AtomicCounterServiceFactory>();
config.JT808Builder.Services.TryAddSingleton<IJT808MsgProducer, JT808MsgProducerDefault>();
config.JT808Builder.Services.TryAddSingleton<IJT808MsgReplyConsumer, JT808MsgReplyConsumerDefault>();
config.JT808Builder.Services.TryAddSingleton<JT808SessionManager>();
config.JT808Builder.Services.TryAddSingleton<JT808MsgService>();
config.JT808Builder.Services.AddHostedService<JT808MsgReplyHostedService>();
return config;
}
}
}

+ 54
- 0
src/JT808.Gateway/JT808GrpcServer.cs View File

@@ -0,0 +1,54 @@
using System;
using System.Threading;
using System.Threading.Tasks;
using Grpc.Core;
using JT808.Gateway.Configurations;
using JT808.Gateway.GrpcService;
using JT808.Gateway.Services;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;

namespace JT808.Gateway
{
public class JT808GrpcServer : IHostedService
{
private readonly ILogger Logger;
private readonly JT808Configuration Configuration;
private readonly IServiceProvider ServiceProvider;
private Server server;
public JT808GrpcServer(
IServiceProvider serviceProvider,
JT808Configuration jT808Configuration,
ILoggerFactory loggerFactory)
{
Logger = loggerFactory.CreateLogger("JT808GrpcServer");
Configuration = jT808Configuration;
ServiceProvider = serviceProvider;
}

public Task StartAsync(CancellationToken cancellationToken)
{
server = new Server
{
Services = { JT808Gateway.BindService(new JT808GatewayService(ServiceProvider)) },
Ports = { new ServerPort(Configuration.WebApiHost, Configuration.WebApiPort, ServerCredentials.Insecure) }
};
Logger.LogInformation($"JT808 Grpc Server start at {Configuration.WebApiHost}:{Configuration.WebApiPort}.");
try
{
server.Start();
}
catch (Exception ex)
{
Logger.LogError(ex, "JT808 Grpc Server start error");
}
return Task.CompletedTask;
}
public Task StopAsync(CancellationToken cancellationToken)
{
Logger.LogInformation("JT808 Grpc Server Stop");
server.ShutdownAsync();
return Task.CompletedTask;
}
}
}

+ 226
- 0
src/JT808.Gateway/JT808TcpServer.cs View File

@@ -0,0 +1,226 @@
using System;
using System.Buffers;
using System.Collections.Generic;
using System.IO.Pipelines;
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using JT808.Gateway.Abstractions;
using JT808.Gateway.Abstractions.Enums;
using JT808.Gateway.Configurations;
using JT808.Gateway.Enums;
using JT808.Gateway.Services;
using JT808.Gateway.Session;
using JT808.Protocol;
using JT808.Protocol.Exceptions;
using JT808.Protocol.Extensions;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;

namespace JT808.Gateway
{
public class JT808TcpServer:IHostedService
{
private Socket server;

private const byte beginAndEndMark = 0x7e;

private readonly ILogger Logger;

private readonly JT808SessionManager SessionManager;

private readonly IJT808MsgProducer MsgProducer;

private readonly JT808Serializer Serializer;

private readonly JT808AtomicCounterService AtomicCounterService;

private readonly JT808Configuration Configuration;

public JT808TcpServer(
JT808Configuration jT808Configuration,
IJT808Config jT808Config,
ILoggerFactory loggerFactory,
JT808SessionManager jT808SessionManager,
IJT808MsgProducer jT808MsgProducer,
JT808AtomicCounterServiceFactory jT808AtomicCounterServiceFactory)
{
SessionManager = jT808SessionManager;
Logger = loggerFactory.CreateLogger("JT808TcpServer");
Serializer = jT808Config.GetSerializer();
MsgProducer = jT808MsgProducer;
AtomicCounterService = jT808AtomicCounterServiceFactory.Create(JT808TransportProtocolType.tcp);
Configuration = jT808Configuration;
var IPEndPoint = new System.Net.IPEndPoint(IPAddress.Any, jT808Configuration.TcpPort);
server = new Socket(IPEndPoint.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
server.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.NoDelay, true);
server.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.KeepAlive, true);
server.LingerState = new LingerOption(false, 0);
server.Bind(IPEndPoint);
server.Listen(jT808Configuration.SoBacklog);
}

public Task StartAsync(CancellationToken cancellationToken)
{
Logger.LogInformation($"JT808 TCP Server start at {IPAddress.Any}:{Configuration.TcpPort}.");
Task.Run(async() => {
while (!cancellationToken.IsCancellationRequested)
{
var socket = await server.AcceptAsync();
JT808TcpSession jT808TcpSession = new JT808TcpSession(socket);
await Task.Factory.StartNew(async (state) =>
{
var session = (JT808TcpSession)state;
SessionManager.TryAdd(session);
if (Logger.IsEnabled(LogLevel.Information))
{
Logger.LogInformation($"[Connected]:{session.Client.RemoteEndPoint}");
}
var pipe = new Pipe();
Task writing = FillPipeAsync(session, pipe.Writer);
Task reading = ReadPipeAsync(session, pipe.Reader);
await Task.WhenAll(reading, writing);
SessionManager.RemoveBySessionId(session.SessionID);
}, jT808TcpSession);
}
}, cancellationToken);
return Task.CompletedTask;
}
private async Task FillPipeAsync(JT808TcpSession session, PipeWriter writer)
{
while (true)
{
try
{
Memory<byte> memory = writer.GetMemory(Configuration.MiniNumBufferSize);
//设备多久没发数据就断开连接 Receive Timeout.
int bytesRead = await session.Client.ReceiveAsync(memory, SocketFlags.None, session.ReceiveTimeout.Token);
if (bytesRead == 0)
{
break;
}
writer.Advance(bytesRead);
}
catch(OperationCanceledException)
{
Logger.LogError($"[Receive Timeout]:{session.Client.RemoteEndPoint}");
break;
}
catch (Exception ex)
{
Logger.LogError(ex, $"[Receive Error]:{session.Client.RemoteEndPoint}");
break;
}
FlushResult result = await writer.FlushAsync();
if (result.IsCompleted)
{
break;
}
}
writer.Complete();
}
private async Task ReadPipeAsync(JT808TcpSession session, PipeReader reader)
{
while (true)
{
ReadResult result = await reader.ReadAsync();
if (result.IsCompleted)
{
break;
}
ReadOnlySequence<byte> buffer = result.Buffer;
SequencePosition consumed = buffer.Start;
SequencePosition examined = buffer.End;
try
{
if (result.IsCanceled) break;
if (buffer.Length > 0)
{
ReaderBuffer(ref buffer, session,out consumed, out examined);
}
}
catch (Exception ex)
{
SessionManager.RemoveBySessionId(session.SessionID);
break;
}
finally
{
reader.AdvanceTo(consumed, examined);
}
}
reader.Complete();
}
private void ReaderBuffer(ref ReadOnlySequence<byte> buffer, JT808TcpSession session, out SequencePosition consumed, out SequencePosition examined)
{
consumed = buffer.Start;
examined = buffer.End;
SequenceReader<byte> seqReader = new SequenceReader<byte>(buffer);
if (seqReader.TryPeek(out byte beginMark))
{
if (beginMark != beginAndEndMark) throw new ArgumentException("Not JT808 Packages.");
}
byte mark = 0;
long totalConsumed = 0;
while (!seqReader.End)
{
if (seqReader.IsNext(beginAndEndMark, advancePast: true))
{
if (mark == 1)
{
try
{
var package = Serializer.HeaderDeserialize(seqReader.Sequence.Slice(totalConsumed, seqReader.Consumed - totalConsumed).FirstSpan);
AtomicCounterService.MsgSuccessIncrement();
if (Logger.IsEnabled(LogLevel.Debug)) Logger.LogDebug($"[Atomic Success Counter]:{AtomicCounterService.MsgSuccessCount}");
if (Logger.IsEnabled(LogLevel.Trace)) Logger.LogTrace($"[Accept Hex {session.Client.RemoteEndPoint}]:{package.OriginalData.ToArray().ToHexString()}");
//设直连模式和转发模式的会话如何处理
SessionManager.TryLink(package.Header.TerminalPhoneNo, session);
if(Configuration.MessageQueueType == JT808MessageQueueType.InMemory)
{
MsgProducer.ProduceAsync(session.SessionID, package.OriginalData.ToArray());
}
else
{
MsgProducer.ProduceAsync(package.Header.TerminalPhoneNo, package.OriginalData.ToArray());
}
}
catch (JT808Exception ex)
{
AtomicCounterService.MsgFailIncrement();
if (Logger.IsEnabled(LogLevel.Debug)) Logger.LogDebug($"[Atomic Fail Counter]:{AtomicCounterService.MsgFailCount}");
Logger.LogError(ex,$"[HeaderDeserialize ErrorCode]:{ ex.ErrorCode}");
}
totalConsumed += (seqReader.Consumed - totalConsumed);
if (seqReader.End) break;
seqReader.Advance(1);
mark = 0;
}
mark++;
}
else
{
seqReader.Advance(1);
}
}
if (seqReader.Length== totalConsumed)
{
examined = consumed = buffer.End;
}
else
{
consumed = buffer.GetPosition(totalConsumed);
}
}
public Task StopAsync(CancellationToken cancellationToken)
{
Logger.LogInformation("808 Tcp Server Stop");
if (server?.Connected ?? false)
server.Shutdown(SocketShutdown.Both);
server?.Close();
return Task.CompletedTask;
}
}
}

+ 134
- 0
src/JT808.Gateway/JT808UdpServer.cs View File

@@ -0,0 +1,134 @@
using System;
using System.Buffers;
using System.Collections.Generic;
using System.IO.Pipelines;
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using JT808.Gateway.Abstractions;
using JT808.Gateway.Abstractions.Enums;
using JT808.Gateway.Configurations;
using JT808.Gateway.Enums;
using JT808.Gateway.Services;
using JT808.Gateway.Session;
using JT808.Protocol;
using JT808.Protocol.Exceptions;
using JT808.Protocol.Extensions;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;

namespace JT808.Gateway
{
public class JT808UdpServer : IHostedService
{
private Socket server;

private readonly ILogger Logger;

private readonly JT808SessionManager SessionManager;

private readonly IJT808MsgProducer MsgProducer;

private readonly JT808Serializer Serializer;

private readonly JT808AtomicCounterService AtomicCounterService;

private readonly JT808Configuration Configuration;

private IPEndPoint LocalIPEndPoint;

public JT808UdpServer(
JT808Configuration jT808Configuration,
IJT808Config jT808Config,
ILoggerFactory loggerFactory,
JT808SessionManager jT808SessionManager,
IJT808MsgProducer jT808MsgProducer,
JT808AtomicCounterServiceFactory jT808AtomicCounterServiceFactory)
{
SessionManager = jT808SessionManager;
Logger = loggerFactory.CreateLogger("JT808UdpServer");
Serializer = jT808Config.GetSerializer();
MsgProducer = jT808MsgProducer;
AtomicCounterService = jT808AtomicCounterServiceFactory.Create(JT808TransportProtocolType.udp);
Configuration = jT808Configuration;
LocalIPEndPoint = new System.Net.IPEndPoint(IPAddress.Any, jT808Configuration.UdpPort);
server = new Socket(LocalIPEndPoint.AddressFamily, SocketType.Dgram, ProtocolType.Udp);
server.Bind(LocalIPEndPoint);
}

public Task StartAsync(CancellationToken cancellationToken)
{
Logger.LogInformation($"JT808 Udp Server start at {IPAddress.Any}:{Configuration.UdpPort}.");
Task.Run(async() => {
while (!cancellationToken.IsCancellationRequested)
{
var buffer = ArrayPool<byte>.Shared.Rent(Configuration.MiniNumBufferSize);
try
{
var segment = new ArraySegment<byte>(buffer);
SocketReceiveMessageFromResult result = await server.ReceiveMessageFromAsync(segment, SocketFlags.None, LocalIPEndPoint);
ReaderBuffer(buffer.AsSpan(0, result.ReceivedBytes), server, result);
}
catch(AggregateException ex)
{
Logger.LogError(ex, "Receive MessageFrom Async");
}
catch(Exception ex)
{
Logger.LogError(ex, $"Received Bytes");
}
finally
{
ArrayPool<byte>.Shared.Return(buffer);
}
}
}, cancellationToken);
return Task.CompletedTask;
}
private void ReaderBuffer(ReadOnlySpan<byte> buffer, Socket socket,SocketReceiveMessageFromResult receiveMessageFromResult)
{
try
{
var package = Serializer.HeaderDeserialize(buffer);
AtomicCounterService.MsgSuccessIncrement();
if (Logger.IsEnabled(LogLevel.Debug)) Logger.LogDebug($"[Atomic Success Counter]:{AtomicCounterService.MsgSuccessCount}");
if (Logger.IsEnabled(LogLevel.Trace)) Logger.LogTrace($"[Accept Hex {receiveMessageFromResult.RemoteEndPoint}]:{package.OriginalData.ToArray().ToHexString()}");
//设直连模式和转发模式的会话如何处理
string sessionId= SessionManager.TryLink(package.Header.TerminalPhoneNo, socket, receiveMessageFromResult.RemoteEndPoint);
if (Logger.IsEnabled(LogLevel.Information))
{
Logger.LogInformation($"[Connected]:{receiveMessageFromResult.RemoteEndPoint}");
}
if (Configuration.MessageQueueType == JT808MessageQueueType.InMemory)
{
MsgProducer.ProduceAsync(sessionId, package.OriginalData.ToArray());
}
else
{
MsgProducer.ProduceAsync(package.Header.TerminalPhoneNo, package.OriginalData.ToArray());
}
}
catch (JT808Exception ex)
{
AtomicCounterService.MsgFailIncrement();
if (Logger.IsEnabled(LogLevel.Debug)) Logger.LogDebug($"[Atomic Fail Counter]:{AtomicCounterService.MsgFailCount}");
Logger.LogError(ex, $"[HeaderDeserialize ErrorCode]:{ ex.ErrorCode}-{buffer.ToArray().ToHexString()}");
}
catch (Exception ex)
{
if (Logger.IsEnabled(LogLevel.Debug)) Logger.LogDebug($"[Atomic Fail Counter]:{AtomicCounterService.MsgFailCount}");
Logger.LogError(ex, $"[ReaderBuffer]:{ buffer.ToArray().ToHexString()}");
}
}
public Task StopAsync(CancellationToken cancellationToken)
{
Logger.LogInformation("808 Udp Server Stop");
if (server?.Connected ?? false)
server.Shutdown(SocketShutdown.Both);
server?.Close();
return Task.CompletedTask;
}
}
}

+ 49
- 0
src/JT808.Gateway/Metadata/JT808AtomicCounter.cs View File

@@ -0,0 +1,49 @@
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;

namespace JT808.Gateway.Metadata
{
/// <summary>
///
/// <see cref="Grpc.Core.Internal"/>
/// </summary>
internal class JT808AtomicCounter
{
long counter = 0;

public JT808AtomicCounter(long initialCount = 0)
{
this.counter = initialCount;
}

public void Reset()
{
Interlocked.Exchange(ref counter, 0);
}

public long Increment()
{
return Interlocked.Increment(ref counter);
}

public long Add(long len)
{
return Interlocked.Add(ref counter,len);
}

public long Decrement()
{
return Interlocked.Decrement(ref counter);
}

public long Count
{
get
{
return Interlocked.Read(ref counter);
}
}
}
}

+ 52
- 0
src/JT808.Gateway/Services/JT808AtomicCounterService.cs View File

@@ -0,0 +1,52 @@
using JT808.Gateway.Metadata;

namespace JT808.Gateway.Services
{
/// <summary>
/// 计数包服务
/// </summary>
public class JT808AtomicCounterService
{
private readonly JT808AtomicCounter MsgSuccessCounter;

private readonly JT808AtomicCounter MsgFailCounter;

public JT808AtomicCounterService()
{
MsgSuccessCounter=new JT808AtomicCounter();
MsgFailCounter = new JT808AtomicCounter();
}

public void Reset()
{
MsgSuccessCounter.Reset();
MsgFailCounter.Reset();
}

public long MsgSuccessIncrement()
{
return MsgSuccessCounter.Increment();
}

public long MsgSuccessCount
{
get
{
return MsgSuccessCounter.Count;
}
}

public long MsgFailIncrement()
{
return MsgFailCounter.Increment();
}

public long MsgFailCount
{
get
{
return MsgFailCounter.Count;
}
}
}
}

+ 30
- 0
src/JT808.Gateway/Services/JT808AtomicCounterServiceFactory.cs View File

@@ -0,0 +1,30 @@
using JT808.Gateway.Abstractions.Enums;
using System;
using System.Collections.Concurrent;

namespace JT808.Gateway.Services
{
public class JT808AtomicCounterServiceFactory
{
private readonly ConcurrentDictionary<JT808TransportProtocolType, JT808AtomicCounterService> cache;

public JT808AtomicCounterServiceFactory()
{
cache = new ConcurrentDictionary<JT808TransportProtocolType, JT808AtomicCounterService>();
}

public JT808AtomicCounterService Create(JT808TransportProtocolType type)
{
if(cache.TryGetValue(type,out var service))
{
return service;
}
else
{
var serviceNew = new JT808AtomicCounterService();
cache.TryAdd(type, serviceNew);
return serviceNew;
}
}
}
}

+ 144
- 0
src/JT808.Gateway/Services/JT808GatewayService.cs View File

@@ -0,0 +1,144 @@
using System;
using System.Linq;
using JT808.Gateway.GrpcService;
using Grpc.Core;
using System.Threading.Tasks;
using JT808.Gateway.Abstractions.Enums;
using JT808.Gateway.Session;
using Microsoft.Extensions.DependencyInjection;
using static Grpc.Core.Metadata;
using Microsoft.Extensions.Options;
using JT808.Gateway.Configurations;

namespace JT808.Gateway.Services
{
public class JT808GatewayService: JT808Gateway.JT808GatewayBase
{
private readonly JT808AtomicCounterService jT808TcpAtomicCounterService;

private readonly JT808AtomicCounterService jT808UdpAtomicCounterService;

private readonly JT808SessionManager jT808SessionManager;

private readonly IOptionsMonitor<JT808Configuration> ConfigurationOptionsMonitor;

public JT808GatewayService(
IOptionsMonitor<JT808Configuration> configurationOptionsMonitor,
JT808SessionManager jT808SessionManager,
JT808AtomicCounterServiceFactory jT808AtomicCounterServiceFactory
)
{
this.jT808SessionManager = jT808SessionManager;
this.ConfigurationOptionsMonitor = configurationOptionsMonitor;
this.jT808TcpAtomicCounterService = jT808AtomicCounterServiceFactory.Create(JT808TransportProtocolType.tcp);
this.jT808UdpAtomicCounterService = jT808AtomicCounterServiceFactory.Create(JT808TransportProtocolType.udp);
}

public JT808GatewayService(IServiceProvider serviceProvider)
{
this.jT808SessionManager = serviceProvider.GetRequiredService<JT808SessionManager>();
this.jT808TcpAtomicCounterService = serviceProvider.GetRequiredService<JT808AtomicCounterServiceFactory>().Create(JT808TransportProtocolType.tcp);
this.jT808UdpAtomicCounterService = serviceProvider.GetRequiredService<JT808AtomicCounterServiceFactory>().Create(JT808TransportProtocolType.udp);
this.ConfigurationOptionsMonitor = serviceProvider.GetRequiredService<IOptionsMonitor<JT808Configuration>>();
}

public override Task<TcpSessionInfoReply> GetTcpSessionAll(Empty request, ServerCallContext context)
{
Auth(context);
var result = jT808SessionManager.GetTcpAll();
TcpSessionInfoReply reply = new TcpSessionInfoReply();
foreach (var item in result)
{
reply.TcpSessions.Add(new SessionInfo
{
LastActiveTime = item.ActiveTime.ToString("yyyy-MM-dd HH:mm:ss"),
StartTime = item.StartTime.ToString("yyyy-MM-dd HH:mm:ss"),
RemoteAddressIP = item.RemoteEndPoint.ToString(),
TerminalPhoneNo = item.TerminalPhoneNo
});
}
return Task.FromResult(reply);
}

public override Task<SessionRemoveReply> RemoveSessionByTerminalPhoneNo(SessionRemoveRequest request, ServerCallContext context)
{
Auth(context);
try
{
jT808SessionManager.RemoveByTerminalPhoneNo(request.TerminalPhoneNo);
return Task.FromResult(new SessionRemoveReply { Success = true });
}
catch (Exception)
{
return Task.FromResult(new SessionRemoveReply { Success = false });
}
}

public override Task<UdpSessionInfoReply> GetUdpSessionAll(Empty request, ServerCallContext context)
{
Auth(context);
var result = jT808SessionManager.GetUdpAll();
UdpSessionInfoReply reply = new UdpSessionInfoReply();
foreach (var item in result)
{
reply.UdpSessions.Add(new SessionInfo
{
LastActiveTime = item.ActiveTime.ToString("yyyy-MM-dd HH:mm:ss"),
StartTime = item.StartTime.ToString("yyyy-MM-dd HH:mm:ss"),
RemoteAddressIP = item.RemoteEndPoint.ToString(),
TerminalPhoneNo = item.TerminalPhoneNo
});
}
return Task.FromResult(reply);
}

public override Task<UnificationSendReply> UnificationSend(UnificationSendRequest request, ServerCallContext context)
{
Auth(context);
try
{
var flag = jT808SessionManager.TrySendByTerminalPhoneNo(request.TerminalPhoneNo, request.Data.ToByteArray());
return Task.FromResult(new UnificationSendReply { Success = flag });
}
catch (Exception)
{
return Task.FromResult(new UnificationSendReply { Success = false });
}
}

public override Task<TcpAtomicCounterReply> GetTcpAtomicCounter(Empty request, ServerCallContext context)
{
Auth(context);
TcpAtomicCounterReply reply = new TcpAtomicCounterReply();
reply.MsgFailCount=jT808TcpAtomicCounterService.MsgFailCount;
reply.MsgSuccessCount=jT808TcpAtomicCounterService.MsgSuccessCount;
return Task.FromResult(reply);
}

public override Task<UdpAtomicCounterReply> GetUdpAtomicCounter(Empty request, ServerCallContext context)
{
Auth(context);
UdpAtomicCounterReply reply = new UdpAtomicCounterReply();
reply.MsgFailCount = jT808UdpAtomicCounterService.MsgFailCount;
reply.MsgSuccessCount = jT808UdpAtomicCounterService.MsgSuccessCount;
return Task.FromResult(reply);
}

private void Auth(ServerCallContext context)
{
Entry tokenEntry = context.RequestHeaders.FirstOrDefault(w => w.Key == "token");
if (tokenEntry != null)
{
if(tokenEntry.Value != ConfigurationOptionsMonitor.CurrentValue.WebApiToken)
{
throw new Grpc.Core.RpcException(new Status(StatusCode.Unauthenticated, "token error"));
}
}
else
{
throw new Grpc.Core.RpcException(new Status(StatusCode.Unauthenticated,"token empty"));
}
}
}
}

+ 52
- 0
src/JT808.Gateway/Services/JT808MsgReplyHostedService.cs View File

@@ -0,0 +1,52 @@
using JT808.Gateway.Abstractions;
using JT808.Gateway.Configurations;
using JT808.Gateway.Enums;
using JT808.Gateway.Session;
using Microsoft.Extensions.Hosting;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace JT808.Gateway.Services
{
internal class JT808MsgReplyHostedService : IHostedService
{
private readonly JT808SessionManager JT808SessionManager;

private readonly IJT808MsgReplyConsumer JT808MsgReplyConsumer;
private readonly JT808Configuration Configuration;
public JT808MsgReplyHostedService(
JT808Configuration jT808Configuration,
IJT808MsgReplyConsumer jT808MsgReplyConsumer,
JT808SessionManager jT808SessionManager)
{
JT808MsgReplyConsumer = jT808MsgReplyConsumer;
JT808SessionManager = jT808SessionManager;
Configuration = jT808Configuration;
}

public Task StartAsync(CancellationToken cancellationToken)
{
if(Configuration.MessageQueueType== JT808MessageQueueType.InMemory)
{
JT808MsgReplyConsumer.OnMessage(item =>
{
JT808SessionManager.TrySendBySessionId(item.TerminalNo, item.Data);
});
JT808MsgReplyConsumer.Subscribe();
}
return Task.CompletedTask;
}

public Task StopAsync(CancellationToken cancellationToken)
{
if (Configuration.MessageQueueType == JT808MessageQueueType.InMemory)
{
JT808MsgReplyConsumer.Unsubscribe();
}
return Task.CompletedTask;
}
}
}

+ 58
- 0
src/JT808.Gateway/Services/JT808TcpReceiveTimeoutHostedService.cs View File

@@ -0,0 +1,58 @@
using JT808.Gateway.Configurations;
using JT808.Gateway.Session;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace JT808.Gateway.Services
{
internal class JT808TcpReceiveTimeoutHostedService : BackgroundService
{
private readonly ILogger Logger;

private readonly JT808SessionManager SessionManager;

private readonly JT808Configuration Configuration;
public JT808TcpReceiveTimeoutHostedService(
JT808Configuration jT808Configuration,
ILoggerFactory loggerFactory,
JT808SessionManager jT808SessionManager
)
{
SessionManager = jT808SessionManager;
Logger = loggerFactory.CreateLogger("JT808TcpReceiveTimeout");
Configuration = jT808Configuration;
}

protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
while (!stoppingToken.IsCancellationRequested)
{
try
{
foreach (var item in SessionManager.GetTcpAll())
{
if (item.ActiveTime.AddSeconds(Configuration.TcpReaderIdleTimeSeconds) < DateTime.Now)
{
item.ReceiveTimeout.Cancel();
}
}
Logger.LogInformation($"[Check Receive Timeout]");
Logger.LogInformation($"[Session Online Count]:{SessionManager.TcpSessionCount}");
}
catch (Exception ex)
{
Logger.LogError(ex, $"[Receive Timeout]");
}
finally
{
await Task.Delay(TimeSpan.FromSeconds(Configuration.TcpReceiveTimeoutCheckTimeSeconds), stoppingToken);
}
}
}
}
}

+ 63
- 0
src/JT808.Gateway/Services/JT808UdpReceiveTimeoutHostedService.cs View File

@@ -0,0 +1,63 @@
using JT808.Gateway.Configurations;
using JT808.Gateway.Session;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace JT808.Gateway.Services
{
internal class JT808UdpReceiveTimeoutHostedService : BackgroundService
{
private readonly ILogger Logger;

private readonly JT808SessionManager SessionManager;

private readonly JT808Configuration Configuration;
public JT808UdpReceiveTimeoutHostedService(
JT808Configuration jT808Configuration,
ILoggerFactory loggerFactory,
JT808SessionManager jT808SessionManager
)
{
SessionManager = jT808SessionManager;
Logger = loggerFactory.CreateLogger("JT808UdpReceiveTimeout");
Configuration = jT808Configuration;
}

protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
while (!stoppingToken.IsCancellationRequested)
{
try
{
List<string> sessionIds = new List<string>();
foreach (var item in SessionManager.GetUdpAll())
{
if (item.ActiveTime.AddSeconds(Configuration.UdpReaderIdleTimeSeconds) < DateTime.Now)
{
sessionIds.Add(item.SessionID);
}
}
foreach(var item in sessionIds)
{
SessionManager.RemoveBySessionId(item);
}
Logger.LogInformation($"[Check Receive Timeout]");
Logger.LogInformation($"[Session Online Count]:{SessionManager.UdpSessionCount}");
}
catch (Exception ex)
{
Logger.LogError(ex, $"[Receive Timeout]");
}
finally
{
await Task.Delay(TimeSpan.FromSeconds(Configuration.UdpReceiveTimeoutCheckTimeSeconds), stoppingToken);
}
}
}
}
}

+ 224
- 0
src/JT808.Gateway/Session/JT808SessionManager.cs View File

@@ -0,0 +1,224 @@
using JT808.Gateway.Abstractions;
using JT808.Gateway.Abstractions.Enums;
using JT808.Gateway.Interfaces;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Threading.Tasks;

namespace JT808.Gateway.Session
{
/// <summary>
///
/// <remark>不支持变态类型:既发TCP和UDP</remark>
/// </summary>
public class JT808SessionManager
{
private readonly ILogger logger;
private readonly IJT808SessionProducer JT808SessionProducer;
public ConcurrentDictionary<string, IJT808Session> Sessions { get; }
public ConcurrentDictionary<string, string> TerminalPhoneNoSessions { get; }
public JT808SessionManager(
IJT808SessionProducer jT808SessionProducer,
ILoggerFactory loggerFactory
)
{
JT808SessionProducer = jT808SessionProducer;
Sessions = new ConcurrentDictionary<string, IJT808Session>(StringComparer.OrdinalIgnoreCase);
TerminalPhoneNoSessions = new ConcurrentDictionary<string, string>(StringComparer.OrdinalIgnoreCase);
logger = loggerFactory.CreateLogger("JT808SessionManager");
}

public JT808SessionManager(ILoggerFactory loggerFactory)
{
Sessions = new ConcurrentDictionary<string, IJT808Session>(StringComparer.OrdinalIgnoreCase);
TerminalPhoneNoSessions = new ConcurrentDictionary<string, string>(StringComparer.OrdinalIgnoreCase);
logger = loggerFactory.CreateLogger("JT808SessionManager");
}

public int TotalSessionCount
{
get
{
return Sessions.Count;
}
}

public int TcpSessionCount
{
get
{
return Sessions.Where(w => w.Value.TransportProtocolType == JT808TransportProtocolType.tcp).Count();
}
}

public int UdpSessionCount
{
get
{
return Sessions.Where(w => w.Value.TransportProtocolType == JT808TransportProtocolType.udp).Count();
}
}

internal void TryLink(string terminalPhoneNo, IJT808Session session)
{
session.ActiveTime = DateTime.Now;
session.TerminalPhoneNo = terminalPhoneNo;
Sessions.TryUpdate(session.SessionID, session, session);
TerminalPhoneNoSessions.AddOrUpdate(terminalPhoneNo, session.SessionID, (key, oldValue)=>
{
if(session.SessionID!= oldValue)
{
//会话通知
JT808SessionProducer?.ProduceAsync(JT808GatewayConstants.SessionOnline, key);
return session.SessionID;
}
return oldValue;
});
}

public string TryLink(string terminalPhoneNo, Socket socket, EndPoint remoteEndPoint)
{
string sessionId = string.Empty;
if(TerminalPhoneNoSessions.TryGetValue(terminalPhoneNo,out sessionId))
{
if (Sessions.TryGetValue(sessionId, out IJT808Session sessionInfo))
{
sessionInfo.ActiveTime = DateTime.Now;
sessionInfo.TerminalPhoneNo = terminalPhoneNo;
sessionInfo.RemoteEndPoint = remoteEndPoint;
Sessions.TryUpdate(sessionId, sessionInfo, sessionInfo);
}
}
else
{
JT808UdpSession session = new JT808UdpSession(socket, remoteEndPoint);
Sessions.TryAdd(session.SessionID, session);
TerminalPhoneNoSessions.TryAdd(terminalPhoneNo, session.SessionID);
sessionId = session.SessionID;
}
//会话通知
//使用场景:
//部标的超长待机设备,不会像正常的设备一样一直连着,可能10几分钟连上了,然后发完就关闭连接,
//这时候想下发数据需要知道设备什么时候上线,在这边做通知最好不过了。
//有设备关联上来可以进行通知 例如:使用Redis发布订阅
JT808SessionProducer?.ProduceAsync(JT808GatewayConstants.SessionOnline, terminalPhoneNo);
return sessionId;
}

internal bool TryAdd(IJT808Session session)
{
return Sessions.TryAdd(session.SessionID, session);
}

public bool TrySendByTerminalPhoneNo(string terminalPhoneNo, byte[] data)
{
if(TerminalPhoneNoSessions.TryGetValue(terminalPhoneNo,out var sessionid))
{
if (Sessions.TryGetValue(sessionid, out var session))
{
if (session.TransportProtocolType == JT808TransportProtocolType.tcp)
{
session.Client.Send(data, SocketFlags.None);
}
else
{
session.Client.SendTo(data, SocketFlags.None, session.RemoteEndPoint);
}
return true;
}
else
{
return false;
}
}
else
{
return false;
}
}

public bool TrySendBySessionId(string sessionId, byte[] data)
{
if (Sessions.TryGetValue(sessionId, out var session))
{
if(session.TransportProtocolType== JT808TransportProtocolType.tcp)
{
session.Client.Send(data, SocketFlags.None);
}
else
{
session.Client.SendTo(data, SocketFlags.None, session.RemoteEndPoint);
}
return true;
}
else
{
return false;
}
}

public void RemoveByTerminalPhoneNo(string terminalPhoneNo)
{
if (TerminalPhoneNoSessions.TryGetValue(terminalPhoneNo, out var removeSessionId))
{
// 处理转发过来的是数据 这时候通道对设备是1对多关系,需要清理垃圾数据
//1.用当前会话的通道Id找出通过转发过来的其他设备的终端号
var terminalPhoneNos = TerminalPhoneNoSessions.Where(w => w.Value == removeSessionId).Select(s => s.Key).ToList();
//2.存在则一个个移除
string tmpTerminalPhoneNo = terminalPhoneNo;
if (terminalPhoneNos.Count > 0)
{
//3.移除包括当前的设备号
foreach (var item in terminalPhoneNos)
{
TerminalPhoneNoSessions.TryRemove(item, out _);
}
tmpTerminalPhoneNo = string.Join(",", terminalPhoneNos);
}
if (Sessions.TryRemove(removeSessionId, out var removeSession))
{
removeSession.Close();
if(logger.IsEnabled(LogLevel.Information))
logger.LogInformation($"[Session Remove]:{terminalPhoneNo}-{tmpTerminalPhoneNo}");
JT808SessionProducer?.ProduceAsync(JT808GatewayConstants.SessionOffline, tmpTerminalPhoneNo);
}
}
}

public void RemoveBySessionId(string sessionId)
{
if(Sessions.TryRemove(sessionId,out var removeSession))
{
var terminalPhoneNos = TerminalPhoneNoSessions.Where(w => w.Value == sessionId).Select(s => s.Key).ToList();
if (terminalPhoneNos.Count > 0)
{
foreach (var item in terminalPhoneNos)
{
TerminalPhoneNoSessions.TryRemove(item, out _);
}
var tmpTerminalPhoneNo = string.Join(",", terminalPhoneNos);
JT808SessionProducer?.ProduceAsync(JT808GatewayConstants.SessionOffline, tmpTerminalPhoneNo);
if (logger.IsEnabled(LogLevel.Information))
logger.LogInformation($"[Session Remove]:{tmpTerminalPhoneNo}");
}
removeSession.Close();
}
}

public List<JT808TcpSession> GetTcpAll()
{
return Sessions.Where(w => w.Value.TransportProtocolType == JT808TransportProtocolType.tcp).Select(s => (JT808TcpSession)s.Value).ToList();
}

public List<JT808UdpSession> GetUdpAll()
{
return Sessions.Where(w => w.Value.TransportProtocolType == JT808TransportProtocolType.udp).Select(s => (JT808UdpSession)s.Value).ToList();
}
}
}

+ 47
- 0
src/JT808.Gateway/Session/JT808TcpSession.cs View File

@@ -0,0 +1,47 @@
using JT808.Gateway.Abstractions.Enums;
using JT808.Gateway.Interfaces;
using System;
using System.Net;
using System.Net.Sockets;
using System.Threading;

namespace JT808.Gateway.Session
{
public class JT808TcpSession: IJT808Session
{
public JT808TcpSession(Socket client)
{
Client = client;
RemoteEndPoint = client.RemoteEndPoint;
ActiveTime = DateTime.Now;
StartTime = DateTime.Now;
SessionID = Guid.NewGuid().ToString("N");
ReceiveTimeout = new CancellationTokenSource();
}

/// <summary>
/// 终端手机号
/// </summary>
public string TerminalPhoneNo { get; set; }
public DateTime ActiveTime { get; set; }
public DateTime StartTime { get; set; }
public JT808TransportProtocolType TransportProtocolType { get;} = JT808TransportProtocolType.tcp;
public string SessionID { get; }
public Socket Client { get; set; }
public CancellationTokenSource ReceiveTimeout { get; set; }
public EndPoint RemoteEndPoint { get ; set; }

public void Close()
{
try
{
Client.Shutdown(SocketShutdown.Both);
}
catch { }
finally
{
Client.Close();
}
}
}
}

+ 41
- 0
src/JT808.Gateway/Session/JT808UdpSession.cs View File

@@ -0,0 +1,41 @@
using JT808.Gateway.Abstractions.Enums;
using JT808.Gateway.Interfaces;
using System;
using System.Net;
using System.Net.Sockets;
using System.Threading;

namespace JT808.Gateway.Session
{
public class JT808UdpSession: IJT808Session
{
public JT808UdpSession(Socket socket, EndPoint sender)
{
ActiveTime = DateTime.Now;
StartTime = DateTime.Now;
SessionID = Guid.NewGuid().ToString("N");
ReceiveTimeout = new CancellationTokenSource();
RemoteEndPoint = sender;
Client = socket;
}

/// <summary>
/// 终端手机号
/// </summary>
public string TerminalPhoneNo { get; set; }
public DateTime ActiveTime { get; set; }
public DateTime StartTime { get; set; }
public JT808TransportProtocolType TransportProtocolType { get; set; } = JT808TransportProtocolType.udp;

public string SessionID { get; }

public Socket Client { get; set; }
public CancellationTokenSource ReceiveTimeout { get; set; }
public EndPoint RemoteEndPoint { get; set ; }

public void Close()
{
}
}
}

Loading…
Cancel
Save