diff --git a/README.md b/README.md index 62abed7..e7a5fcb 100644 --- a/README.md +++ b/README.md @@ -2,7 +2,7 @@ 基于DotNetty封装的JT808DotNetty支持TCP/UDP通用消息业务处理 -基于Pipeline封装的JT808DotNetty支持TCP/UDP通用消息业务处理 +基于Pipeline封装的JT808Pipeline支持TCP/UDP通用消息业务处理 [了解JT808协议进这边](https://github.com/SmallChi/JT808) @@ -81,6 +81,7 @@ | Package Name | Version | Downloads | | --------------------- | -------------------------------------------------- | --------------------------------------------------- | +| Install-Package JT808.Gateway.Abstractions| ![JT808.Gateway.Abstractions](https://img.shields.io/nuget/v/JT808.Gateway.Abstractions.svg) | ![JT808.Gateway.Abstractions](https://img.shields.io/nuget/dt/JT808.Gateway.Abstractions.svg) | | Install-Package JT808.Gateway | ![JT808.Gateway](https://img.shields.io/nuget/v/JT808.Gateway.svg) | ![JT808.Gateway](https://img.shields.io/nuget/dt/JT808.Gateway.svg) | | Install-Package JT808.Gateway.Kafka| ![JT808.Gateway.Kafka](https://img.shields.io/nuget/v/JT808.Gateway.Kafka.svg) | ![JT808.Gateway.Kafka](https://img.shields.io/nuget/dt/JT808.Gateway.Kafka.svg) | @@ -133,7 +134,7 @@ static async Task Main(string[] args) ``` 如图所示: -![demo1](https://github.com/SmallChi/JT808DotNetty/blob/master/doc/img/demo1.png) +![demo1](https://github.com/SmallChi/JT808DotNetty/blob/master/doc/dotnetty/demo1.png) ## 举个栗子2 @@ -144,4 +145,4 @@ static async Task Main(string[] args) 3.进入JT808.DotNetty.SimpleClient项目下的Debug目录运行客户端 如图所示: -![demo2](https://github.com/SmallChi/JT808DotNetty/blob/master/doc/img/demo2.png) \ No newline at end of file +![demo2](https://github.com/SmallChi/JT808DotNetty/blob/master/doc/dotnetty/demo2.png) diff --git a/src/.dockerignore b/src/.dockerignore new file mode 100644 index 0000000..3729ff0 --- /dev/null +++ b/src/.dockerignore @@ -0,0 +1,25 @@ +**/.classpath +**/.dockerignore +**/.env +**/.git +**/.gitignore +**/.project +**/.settings +**/.toolstarget +**/.vs +**/.vscode +**/*.*proj.user +**/*.dbmdl +**/*.jfm +**/azds.yaml +**/bin +**/charts +**/docker-compose* +**/Dockerfile* +**/node_modules +**/npm-debug.log +**/obj +**/secrets.dev.yaml +**/values.dev.yaml +LICENSE +README.md \ No newline at end of file diff --git a/src/JT808.Gateway.Abstractions/Enums/JT808TransportProtocolType.cs b/src/JT808.Gateway.Abstractions/Enums/JT808TransportProtocolType.cs new file mode 100644 index 0000000..4924be1 --- /dev/null +++ b/src/JT808.Gateway.Abstractions/Enums/JT808TransportProtocolType.cs @@ -0,0 +1,15 @@ +using System; +using System.Collections.Generic; +using System.Text; + +namespace JT808.Gateway.Abstractions.Enums +{ + /// + /// 传输协议类型 + /// + public enum JT808TransportProtocolType + { + tcp=1, + udp = 2 + } +} diff --git a/src/JT808.Gateway.Abstractions/IJT808ClientBuilder.cs b/src/JT808.Gateway.Abstractions/IJT808ClientBuilder.cs new file mode 100644 index 0000000..9574094 --- /dev/null +++ b/src/JT808.Gateway.Abstractions/IJT808ClientBuilder.cs @@ -0,0 +1,14 @@ +using JT808.Protocol; +using Microsoft.Extensions.DependencyInjection; +using System; +using System.Collections.Generic; +using System.Text; + +namespace JT808.Gateway.Abstractions +{ + public interface IJT808ClientBuilder + { + IJT808Builder JT808Builder { get; } + IJT808Builder Builder(); + } +} diff --git a/src/JT808.Gateway.Abstractions/IJT808GatewayBuilder.cs b/src/JT808.Gateway.Abstractions/IJT808GatewayBuilder.cs new file mode 100644 index 0000000..72f6bbd --- /dev/null +++ b/src/JT808.Gateway.Abstractions/IJT808GatewayBuilder.cs @@ -0,0 +1,14 @@ +using JT808.Protocol; +using Microsoft.Extensions.DependencyInjection; +using System; +using System.Collections.Generic; +using System.Text; + +namespace JT808.Gateway.Abstractions +{ + public interface IJT808GatewayBuilder + { + IJT808Builder JT808Builder { get; } + IJT808Builder Builder(); + } +} diff --git a/src/JT808.Gateway.Abstractions/IJT808MsgConsumer.cs b/src/JT808.Gateway.Abstractions/IJT808MsgConsumer.cs new file mode 100644 index 0000000..1e38db3 --- /dev/null +++ b/src/JT808.Gateway.Abstractions/IJT808MsgConsumer.cs @@ -0,0 +1,15 @@ +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading; + +namespace JT808.Gateway.Abstractions +{ + public interface IJT808MsgConsumer : IJT808PubSub, IDisposable + { + void OnMessage(Action<(string TerminalNo, byte[] Data)> callback); + CancellationTokenSource Cts { get; } + void Subscribe(); + void Unsubscribe(); + } +} diff --git a/src/JT808.Gateway.Abstractions/IJT808MsgProducer.cs b/src/JT808.Gateway.Abstractions/IJT808MsgProducer.cs new file mode 100644 index 0000000..1962f55 --- /dev/null +++ b/src/JT808.Gateway.Abstractions/IJT808MsgProducer.cs @@ -0,0 +1,17 @@ +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading.Tasks; + +namespace JT808.Gateway.Abstractions +{ + public interface IJT808MsgProducer : IJT808PubSub, IDisposable + { + /// + /// + /// + /// 设备终端号 + /// 808 hex data + ValueTask ProduceAsync(string terminalNo, byte[] data); + } +} diff --git a/src/JT808.Gateway.Abstractions/IJT808MsgReplyConsumer.cs b/src/JT808.Gateway.Abstractions/IJT808MsgReplyConsumer.cs new file mode 100644 index 0000000..3b5acb5 --- /dev/null +++ b/src/JT808.Gateway.Abstractions/IJT808MsgReplyConsumer.cs @@ -0,0 +1,15 @@ +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading; + +namespace JT808.Gateway.Abstractions +{ + public interface IJT808MsgReplyConsumer : IJT808PubSub, IDisposable + { + void OnMessage(Action<(string TerminalNo, byte[] Data)> callback); + CancellationTokenSource Cts { get; } + void Subscribe(); + void Unsubscribe(); + } +} diff --git a/src/JT808.Gateway.Abstractions/IJT808MsgReplyProducer.cs b/src/JT808.Gateway.Abstractions/IJT808MsgReplyProducer.cs new file mode 100644 index 0000000..39f0366 --- /dev/null +++ b/src/JT808.Gateway.Abstractions/IJT808MsgReplyProducer.cs @@ -0,0 +1,17 @@ +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading.Tasks; + +namespace JT808.Gateway.Abstractions +{ + public interface IJT808MsgReplyProducer : IJT808PubSub, IDisposable + { + /// + /// + /// + /// 设备终端号 + /// 808 hex data + ValueTask ProduceAsync(string terminalNo, byte[] data); + } +} diff --git a/src/JT808.Gateway.Abstractions/IJT808PubSub.cs b/src/JT808.Gateway.Abstractions/IJT808PubSub.cs new file mode 100644 index 0000000..7b1a327 --- /dev/null +++ b/src/JT808.Gateway.Abstractions/IJT808PubSub.cs @@ -0,0 +1,11 @@ +using System; +using System.Collections.Generic; +using System.Text; + +namespace JT808.Gateway.Abstractions +{ + public interface IJT808PubSub + { + string TopicName { get; } + } +} diff --git a/src/JT808.Gateway.Abstractions/IJT808SessionConsumer.cs b/src/JT808.Gateway.Abstractions/IJT808SessionConsumer.cs new file mode 100644 index 0000000..78f405f --- /dev/null +++ b/src/JT808.Gateway.Abstractions/IJT808SessionConsumer.cs @@ -0,0 +1,18 @@ +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading; + +namespace JT808.Gateway.Abstractions +{ + /// + /// 会话通知(在线/离线) + /// + public interface IJT808SessionConsumer : IJT808PubSub, IDisposable + { + void OnMessage(Action<(string Notice, string TerminalNo)> callback); + CancellationTokenSource Cts { get; } + void Subscribe(); + void Unsubscribe(); + } +} diff --git a/src/JT808.Gateway.Abstractions/IJT808SessionProducer.cs b/src/JT808.Gateway.Abstractions/IJT808SessionProducer.cs new file mode 100644 index 0000000..cecae48 --- /dev/null +++ b/src/JT808.Gateway.Abstractions/IJT808SessionProducer.cs @@ -0,0 +1,13 @@ +using System; +using System.Threading.Tasks; + +namespace JT808.Gateway.Abstractions +{ + /// + /// 会话通知(在线/离线) + /// + public interface IJT808SessionProducer : IJT808PubSub, IDisposable + { + ValueTask ProduceAsync(string notice,string terminalNo); + } +} diff --git a/src/JT808.Gateway.Abstractions/JT808.Gateway.Abstractions.csproj b/src/JT808.Gateway.Abstractions/JT808.Gateway.Abstractions.csproj new file mode 100644 index 0000000..de3eb76 --- /dev/null +++ b/src/JT808.Gateway.Abstractions/JT808.Gateway.Abstractions.csproj @@ -0,0 +1,33 @@ + + + netstandard2.1 + 8.0 + Copyright 2019. + SmallChi(Koike) + false + false + LICENSE + true + 基于Pipeline实现的JT808Gateway的抽象库 + 基于Pipeline实现的JT808Gateway的抽象库 + JT808.Gateway.Abstractions + JT808.Gateway.Abstractions + 1.0.0-preview2 + + + + + + + + + all + runtime; build; native; contentfiles; analyzers; buildtransitive + + + + + + + + diff --git a/src/JT808.Gateway.Abstractions/JT808GatewayConstants.cs b/src/JT808.Gateway.Abstractions/JT808GatewayConstants.cs new file mode 100644 index 0000000..6c631de --- /dev/null +++ b/src/JT808.Gateway.Abstractions/JT808GatewayConstants.cs @@ -0,0 +1,11 @@ +namespace JT808.Gateway.Abstractions +{ + public static class JT808GatewayConstants + { + public const string SessionOnline= "JT808SessionOnline"; + public const string SessionOffline = "JT808SessionOffline"; + public const string SessionTopic = "jt808session"; + public const string MsgTopic = "jt808msgdefault"; + public const string MsgReplyTopic = "jt808msgreplydefault"; + } +} diff --git a/src/JT808.Gateway.Abstractions/Protos/JT808Gateway.proto b/src/JT808.Gateway.Abstractions/Protos/JT808Gateway.proto new file mode 100644 index 0000000..042fc89 --- /dev/null +++ b/src/JT808.Gateway.Abstractions/Protos/JT808Gateway.proto @@ -0,0 +1,63 @@ +syntax = "proto3"; + +option csharp_namespace = "JT808.Gateway.GrpcService"; + +package JT808GatewayGrpc; + +service JT808Gateway{ + // 会话服务-获取会话服务集合 + rpc GetTcpSessionAll(Empty) returns (TcpSessionInfoReply); + // 会话服务-通过设备终端号移除对应会话 + rpc RemoveSessionByTerminalPhoneNo(SessionRemoveRequest) returns (SessionRemoveReply); + // 统一下发信息 + rpc UnificationSend(UnificationSendRequest) returns (UnificationSendReply); + // 获取Tcp包计数器 + rpc GetTcpAtomicCounter(Empty) returns (TcpAtomicCounterReply); + // 会话服务-获取会话服务集合 + rpc GetUdpSessionAll(Empty) returns (UdpSessionInfoReply); + // 获取Udp包计数器 + rpc GetUdpAtomicCounter(Empty) returns (UdpAtomicCounterReply); +} + +message Empty{} + +message TcpSessionInfoReply{ + repeated SessionInfo TcpSessions=1; +} +message UdpSessionInfoReply{ + repeated SessionInfo UdpSessions=1; +} + +message SessionInfo{ + string StartTime=1; + string LastActiveTime=2; + string TerminalPhoneNo=3; + string RemoteAddressIP=4; +} + +message SessionRemoveRequest{ + string TerminalPhoneNo=1; +} + +message SessionRemoveReply{ + bool Success = 1; +} + +message UnificationSendRequest{ + string TerminalPhoneNo=1; + bytes Data=2; +} + +message UnificationSendReply{ + bool Success = 1; +} + +message TcpAtomicCounterReply{ + int64 MsgSuccessCount=1; + int64 MsgFailCount=2; +} + +message UdpAtomicCounterReply{ + int64 MsgSuccessCount=1; + int64 MsgFailCount=2; +} \ No newline at end of file diff --git a/src/JT808.Gateway.Test/JT808.Gateway.Test.csproj b/src/JT808.Gateway.Test/JT808.Gateway.Test.csproj new file mode 100644 index 0000000..e07a52b --- /dev/null +++ b/src/JT808.Gateway.Test/JT808.Gateway.Test.csproj @@ -0,0 +1,28 @@ + + + + netcoreapp3.1 + + false + + + + + + + + + all + runtime; build; native; contentfiles; analyzers; buildtransitive + + + all + runtime; build; native; contentfiles; analyzers; buildtransitive + + + + + + + + diff --git a/src/JT808.Gateway.Test/PipeTest.cs b/src/JT808.Gateway.Test/PipeTest.cs new file mode 100644 index 0000000..470de6d --- /dev/null +++ b/src/JT808.Gateway.Test/PipeTest.cs @@ -0,0 +1,100 @@ +using System; +using System.Buffers; +using System.Collections.Generic; +using System.IO.Pipelines; +using System.Threading; +using System.Threading.Tasks; +using Xunit; + +namespace JT808.Gateway.Test +{ + public class PipeTest + { + [Fact] + public void Test1() + { + var reader = new ReadOnlySequence(new byte[] { 0x7E, 0, 1, 2, 0x7E}); + SequenceReader seqReader = new SequenceReader(reader); + int index = 0; + byte mark = 0; + long totalConsumed = 0; + List packages = new List(); + while (!seqReader.End) + { + if (seqReader.IsNext(0x7E, advancePast: true)) + { + if (mark == 1) + { + var package = seqReader.Sequence.Slice(totalConsumed, seqReader.Consumed - totalConsumed).ToArray(); + packages.Add(package); + totalConsumed += (seqReader.Consumed - totalConsumed); + index++; + if (seqReader.End) break; + seqReader.Advance(1); + mark = 0; + } + mark++; + } + else + { + seqReader.Advance(1); + } + index++; + } + Assert.Equal(5, index); + Assert.Single(packages); + Assert.Equal(5, seqReader.Consumed); + } + + [Fact] + public void Test2() + { + var reader = new ReadOnlySequence(new byte[] { 0x7E, 0, 1, 2, 0x7E, 0x7E, 0, 1, 0x7E, 0x7E, 2, 2, 2 }); + SequenceReader seqReader = new SequenceReader(reader); + int index = 0; + byte mark = 0; + long totalConsumed = 0; + List packages = new List(); + while (!seqReader.End) + { + if (seqReader.IsNext(0x7E, advancePast: true)) + { + if (mark == 1) + { + var package = seqReader.Sequence.Slice(totalConsumed, seqReader.Consumed - totalConsumed).ToArray(); + packages.Add(package); + totalConsumed += (seqReader.Consumed - totalConsumed); + index++; + if (seqReader.End) break; + seqReader.Advance(1); + mark = 0; + } + mark++; + } + else + { + seqReader.Advance(1); + } + index++; + } + Assert.Equal(13, index); + Assert.Equal(2,packages.Count); + Assert.Equal(9, totalConsumed); + Assert.Equal(13, seqReader.Consumed); + } + + [Fact] + public void Test3() + { + Assert.Throws(() => + { + var reader = new ReadOnlySequence(new byte[] { 0, 1, 2, 0x7E }); + SequenceReader seqReader = new SequenceReader(reader); + if (seqReader.TryPeek(out byte beginMark)) + { + if (beginMark != 0x7E) throw new ArgumentException("not 808 packages"); + } + }); + } + } +} diff --git a/src/JT808.Gateway.Test/Session/JT808SessionManagerTest.cs b/src/JT808.Gateway.Test/Session/JT808SessionManagerTest.cs new file mode 100644 index 0000000..6c2eabd --- /dev/null +++ b/src/JT808.Gateway.Test/Session/JT808SessionManagerTest.cs @@ -0,0 +1,187 @@ +using JT808.Gateway.Session; +using System; +using System.Collections.Generic; +using System.Text; +using Xunit; +using Microsoft.Extensions.Logging; +using System.Net.Sockets; + +namespace JT808.Gateway.Test.Session +{ + public class JT808SessionManagerTest + { + [Fact] + public void TryAddTest() + { + JT808SessionManager jT808SessionManager = new JT808SessionManager(new LoggerFactory()); + var result=jT808SessionManager.TryAdd(new JT808TcpSession(new Socket(SocketType.Stream, ProtocolType.Tcp))); + Assert.True(result); + Assert.Equal(1, jT808SessionManager.TotalSessionCount); + } + + [Fact] + public void TryLinkTest() + { + string tno = "123456"; + JT808SessionManager jT808SessionManager = new JT808SessionManager(new LoggerFactory()); + var session = new JT808TcpSession(new Socket(SocketType.Stream, ProtocolType.Tcp)); + var result1 = jT808SessionManager.TryAdd(session); + jT808SessionManager.TryLink(tno, session); + Assert.True(result1); + Assert.Equal(1, jT808SessionManager.TotalSessionCount); + Assert.True(jT808SessionManager.TerminalPhoneNoSessions.ContainsKey(tno)); + } + + /// + /// 用于转发过来的车辆 + /// + [Fact] + public void TryLinkTest1_1_N() + { + string tno1 = "123456"; + string tno2 = "123457"; + string tno3 = "123458"; + JT808SessionManager jT808SessionManager = new JT808SessionManager(new LoggerFactory()); + var session = new JT808TcpSession(new Socket(SocketType.Stream, ProtocolType.Tcp)); + var result1 = jT808SessionManager.TryAdd(session); + jT808SessionManager.TryLink(tno1, session); + jT808SessionManager.TryLink(tno2, session); + jT808SessionManager.TryLink(tno3, session); + Assert.True(result1); + Assert.Equal(1, jT808SessionManager.TotalSessionCount); + Assert.Equal(3,jT808SessionManager.TerminalPhoneNoSessions.Count); + jT808SessionManager.RemoveBySessionId(session.SessionID); + Assert.Equal(0, jT808SessionManager.TotalSessionCount); + Assert.Empty(jT808SessionManager.TerminalPhoneNoSessions); + } + + /// + /// 用于转发过来的车辆 + /// + [Fact] + public void TryLinkTest2_1_N() + { + string tno1 = "123456"; + string tno2 = "123457"; + string tno3 = "123458"; + JT808SessionManager jT808SessionManager = new JT808SessionManager(new LoggerFactory()); + var session = new JT808TcpSession(new Socket(SocketType.Stream, ProtocolType.Tcp)); + var result1 = jT808SessionManager.TryAdd(session); + jT808SessionManager.TryLink(tno1, session); + jT808SessionManager.TryLink(tno2, session); + jT808SessionManager.TryLink(tno3, session); + Assert.True(result1); + Assert.Equal(1, jT808SessionManager.TotalSessionCount); + Assert.Equal(3, jT808SessionManager.TerminalPhoneNoSessions.Count); + jT808SessionManager.RemoveByTerminalPhoneNo(tno1); + Assert.Equal(0, jT808SessionManager.TotalSessionCount); + Assert.Empty(jT808SessionManager.TerminalPhoneNoSessions); + } + + /// + /// 转发过来的车辆切换为直连车辆 + /// + [Fact] + public void UpdateLinkTest2_1_N() + { + string tno1 = "123456"; + string tno2 = "123457"; + string tno3 = "123458"; + JT808SessionManager jT808SessionManager = new JT808SessionManager(new LoggerFactory()); + var session1 = new JT808TcpSession(new Socket(SocketType.Stream, ProtocolType.Tcp)); + var session2 = new JT808TcpSession(new Socket(SocketType.Stream, ProtocolType.Tcp)); + var result1 = jT808SessionManager.TryAdd(session1); + var result2 = jT808SessionManager.TryAdd(session2); + //转发车辆 + jT808SessionManager.TryLink(tno1, session1); + jT808SessionManager.TryLink(tno2, session1); + //直连车辆 + jT808SessionManager.TryLink(tno3, session2); + + Assert.True(result1); + Assert.True(result2); + Assert.Equal(2, jT808SessionManager.TotalSessionCount); + Assert.Equal(3, jT808SessionManager.TerminalPhoneNoSessions.Count); + + //将tno2切换为直连车辆 + var session3 = new JT808TcpSession(new Socket(SocketType.Stream, ProtocolType.Tcp)); + var result3 = jT808SessionManager.TryAdd(session3); + jT808SessionManager.TryLink(tno2, session3); + Assert.True(result3); + if (jT808SessionManager.TerminalPhoneNoSessions.TryGetValue(tno2,out string sessionid)) + { + //实际的通道Id + Assert.Equal(session3.SessionID, sessionid); + } + Assert.Equal(3, jT808SessionManager.TotalSessionCount); + Assert.Equal(3, jT808SessionManager.TerminalPhoneNoSessions.Count); + + jT808SessionManager.RemoveByTerminalPhoneNo(tno1); + Assert.Equal(2, jT808SessionManager.TotalSessionCount); + Assert.Equal(2,jT808SessionManager.TerminalPhoneNoSessions.Count); + } + + [Fact] + public void RemoveBySessionIdTest() + { + JT808SessionManager jT808SessionManager = new JT808SessionManager(new LoggerFactory()); + var session = new JT808TcpSession(new Socket(SocketType.Stream, ProtocolType.Tcp)); + var result1 = jT808SessionManager.TryAdd(session); + Assert.True(result1); + Assert.Equal(1, jT808SessionManager.TotalSessionCount); + jT808SessionManager.RemoveBySessionId(session.SessionID); + Assert.Equal(0, jT808SessionManager.TotalSessionCount); + } + + [Fact] + public void RemoveByTerminalPhoneNoTest() + { + string tno = "123456"; + JT808SessionManager jT808SessionManager = new JT808SessionManager(new LoggerFactory()); + var session = new JT808TcpSession(new Socket(SocketType.Stream, ProtocolType.Tcp)); + var result1 = jT808SessionManager.TryAdd(session); + jT808SessionManager.TryLink(tno, session); + Assert.True(result1); + Assert.Equal(1, jT808SessionManager.TotalSessionCount); + jT808SessionManager.RemoveByTerminalPhoneNo(tno); + Assert.False(jT808SessionManager.TerminalPhoneNoSessions.ContainsKey(tno)); + Assert.Equal(0, jT808SessionManager.TotalSessionCount); + } + + [Fact] + public void SendTest() + { + Assert.Throws(() => + { + string tno = "123456"; + JT808SessionManager jT808SessionManager = new JT808SessionManager(new LoggerFactory()); + var session = new JT808TcpSession(new Socket(SocketType.Stream, ProtocolType.Tcp)); + var result1 = jT808SessionManager.TryAdd(session); + jT808SessionManager.TryLink(tno, session); + jT808SessionManager.TrySendByTerminalPhoneNo(tno, new byte[] { 0x7e, 0, 0, 0x7e }); + }); + } + + [Fact] + public void GetTcpAllTest() + { + string tno1 = "123456"; + string tno2 = "123457"; + JT808SessionManager jT808SessionManager = new JT808SessionManager(new LoggerFactory()); + var session1 = new JT808TcpSession(new Socket(SocketType.Stream, ProtocolType.Tcp)); + var session2 = new JT808TcpSession(new Socket(SocketType.Stream, ProtocolType.Tcp)); + var result1 = jT808SessionManager.TryAdd(session1); + var result2 = jT808SessionManager.TryAdd(session2); + jT808SessionManager.TryLink(tno1, session1); + jT808SessionManager.TryLink(tno2, session2); + Assert.True(result1); + Assert.True(result2); + Assert.Equal(2, jT808SessionManager.TotalSessionCount); + Assert.True(jT808SessionManager.TerminalPhoneNoSessions.ContainsKey(tno1)); + Assert.True(jT808SessionManager.TerminalPhoneNoSessions.ContainsKey(tno2)); + var sessions = jT808SessionManager.GetTcpAll(); + Assert.Equal(session1.SessionID, sessions[0].SessionID); + Assert.Equal(session2.SessionID, sessions[1].SessionID); + } + } +} diff --git a/src/JT808.Gateway.TestHosting/Configs/NLog.xsd b/src/JT808.Gateway.TestHosting/Configs/NLog.xsd new file mode 100644 index 0000000..2f57d09 --- /dev/null +++ b/src/JT808.Gateway.TestHosting/Configs/NLog.xsd @@ -0,0 +1,3106 @@ + + + + + + + + + + + + + + + Watch config file for changes and reload automatically. + + + + + Print internal NLog messages to the console. Default value is: false + + + + + Print internal NLog messages to the console error output. Default value is: false + + + + + Write internal NLog messages to the specified file. + + + + + Log level threshold for internal log messages. Default value is: Info. + + + + + Global log level threshold for application log messages. Messages below this level won't be logged.. + + + + + Throw an exception when there is an internal error. Default value is: false. + + + + + Throw an exception when there is a configuration error. If not set, determined by throwExceptions. + + + + + Gets or sets a value indicating whether Variables should be kept on configuration reload. Default value is: false. + + + + + Write internal NLog messages to the System.Diagnostics.Trace. Default value is: false. + + + + + Write timestamps for internal NLog messages. Default value is: true. + + + + + Use InvariantCulture as default culture instead of CurrentCulture. Default value is: false. + + + + + Perform mesage template parsing and formatting of LogEvent messages (true = Always, false = Never, empty = Auto Detect). Default value is: empty. + + + + + + + + + + + + + + Make all targets within this section asynchronous (creates additional threads but the calling thread isn't blocked by any target writes). + + + + + + + + + + + + + + + + + Prefix for targets/layout renderers/filters/conditions loaded from this assembly. + + + + + Load NLog extensions from the specified file (*.dll) + + + + + Load NLog extensions from the specified assembly. Assembly name should be fully qualified. + + + + + + + + + + Name of the logger. May include '*' character which acts like a wildcard. Allowed forms are: *, Name, *Name, Name* and *Name* + + + + + Comma separated list of levels that this rule matches. + + + + + Minimum level that this rule matches. + + + + + Maximum level that this rule matches. + + + + + Level that this rule matches. + + + + + Comma separated list of target names. + + + + + Ignore further rules if this one matches. + + + + + Enable or disable logging rule. Disabled rules are ignored. + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + Name of the file to be included. You could use * wildcard. The name is relative to the name of the current config file. + + + + + Ignore any errors in the include file. + + + + + + + Variable name. + + + + + Variable value. + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + Name of the target. + + + + + Number of log events that should be processed in a batch by the lazy writer thread. + + + + + Limit of full s to write before yielding into Performance is better when writing many small batches, than writing a single large batch + + + + + Action to be taken when the lazy writer thread request queue count exceeds the set limit. + + + + + Limit on the number of requests in the lazy writer thread request queue. + + + + + Time in milliseconds to sleep between batches. + + + + + Target supports reuse of internal buffers, and doesn't have to constantly allocate new buffers Required for legacy NLog-targets, that expects buffers to remain stable after Write-method exit + + + + + + + + + + + + + + + + + + + + + + + + Name of the target. + + + + + Delay the flush until the LogEvent has been confirmed as written + + + + + Condition expression. Log events who meet this condition will cause a flush on the wrapped target. + + + + + Target supports reuse of internal buffers, and doesn't have to constantly allocate new buffers Required for legacy NLog-targets, that expects buffers to remain stable after Write-method exit + + + + + + + + + + + + + + + + + + + Name of the target. + + + + + Number of log events to be buffered. + + + + + Timeout (in milliseconds) after which the contents of buffer will be flushed if there's no write in the specified period of time. Use -1 to disable timed flushes. + + + + + Indicates whether to use sliding timeout. + + + + + Action to take if the buffer overflows. + + + + + Target supports reuse of internal buffers, and doesn't have to constantly allocate new buffers Required for legacy NLog-targets, that expects buffers to remain stable after Write-method exit + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + Name of the target. + + + + + Encoding to be used. + + + + + Instance of that is used to format log messages. + + + + + End of line value if a newline is appended at the end of log message . + + + + + Maximum message size in bytes. + + + + + Indicates whether to append newline at the end of log message. + + + + + Action that should be taken if the will be more connections than . + + + + + Action that should be taken if the message is larger than maxMessageSize. + + + + + Maximum current connections. 0 = no maximum. + + + + + Indicates whether to keep connection open whenever possible. + + + + + Size of the connection cache (number of connections which are kept alive). + + + + + Network address. + + + + + Maximum queue size. + + + + + NDC item separator. + + + + + Indicates whether to include source info (file name and line number) in the information sent over the network. + + + + + Indicates whether to include dictionary contents. + + + + + Indicates whether to include contents of the stack. + + + + + Indicates whether to include stack contents. + + + + + Indicates whether to include dictionary contents. + + + + + Indicates whether to include call site (class and method name) in the information sent over the network. + + + + + Option to include all properties from the log events + + + + + AppInfo field. By default it's the friendly name of the current AppDomain. + + + + + Indicates whether to include NLog-specific extensions to log4j schema. + + + + + Target supports reuse of internal buffers, and doesn't have to constantly allocate new buffers Required for legacy NLog-targets, that expects buffers to remain stable after Write-method exit + + + + + + + + + + + + + + + + + + + + + + + + + + + Layout that should be use to calcuate the value for the parameter. + + + + + Viewer parameter name. + + + + + + + + + + + + + + + + + + + + + + Name of the target. + + + + + Text to be rendered. + + + + + Header. + + + + + Footer. + + + + + Indicates whether to use default row highlighting rules. + + + + + Indicates whether to auto-check if the console is available. - Disables console writing if Environment.UserInteractive = False (Windows Service) - Disables console writing if Console Standard Input is not available (Non-Console-App) + + + + + The encoding for writing messages to the . + + + + + Indicates whether the error stream (stderr) should be used instead of the output stream (stdout). + + + + + Target supports reuse of internal buffers, and doesn't have to constantly allocate new buffers Required for legacy NLog-targets, that expects buffers to remain stable after Write-method exit + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + Condition that must be met in order to set the specified foreground and background color. + + + + + Background color. + + + + + Foreground color. + + + + + + + + + + + + + + + + Indicates whether to ignore case when comparing texts. + + + + + Regular expression to be matched. You must specify either text or regex. + + + + + Text to be matched. You must specify either text or regex. + + + + + Indicates whether to match whole words only. + + + + + Compile the ? This can improve the performance, but at the costs of more memory usage. If false, the Regex Cache is used. + + + + + Background color. + + + + + Foreground color. + + + + + + + + + + + + + + + + + + + Name of the target. + + + + + Text to be rendered. + + + + + Header. + + + + + Footer. + + + + + Indicates whether to send the log messages to the standard error instead of the standard output. + + + + + Indicates whether to auto-check if the console is available - Disables console writing if Environment.UserInteractive = False (Windows Service) - Disables console writing if Console Standard Input is not available (Non-Console-App) + + + + + The encoding for writing messages to the . + + + + + Target supports reuse of internal buffers, and doesn't have to constantly allocate new buffers Required for legacy NLog-targets, that expects buffers to remain stable after Write-method exit + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + Name of the target. + + + + + Obsolete - value will be ignored! The logging code always runs outside of transaction. Gets or sets a value indicating whether to use database transactions. Some data providers require this. + + + + + Database user name. If the ConnectionString is not provided this value will be used to construct the "User ID=" part of the connection string. + + + + + Name of the database provider. + + + + + Database password. If the ConnectionString is not provided this value will be used to construct the "Password=" part of the connection string. + + + + + Indicates whether to keep the database connection open between the log events. + + + + + Database name. If the ConnectionString is not provided this value will be used to construct the "Database=" part of the connection string. + + + + + Name of the connection string (as specified in <connectionStrings> configuration section. + + + + + Connection string. When provided, it overrides the values specified in DBHost, DBUserName, DBPassword, DBDatabase. + + + + + Database host name. If the ConnectionString is not provided this value will be used to construct the "Server=" part of the connection string. + + + + + Connection string using for installation and uninstallation. If not provided, regular ConnectionString is being used. + + + + + Target supports reuse of internal buffers, and doesn't have to constantly allocate new buffers Required for legacy NLog-targets, that expects buffers to remain stable after Write-method exit + + + + + Text of the SQL command to be run on each log level. + + + + + Type of the SQL command to be run on each log level. + + + + + + + + + + + + + + + + + + + + + + + Type of the command. + + + + + Connection string to run the command against. If not provided, connection string from the target is used. + + + + + Indicates whether to ignore failures. + + + + + Command text. + + + + + + + + + + + + + + Layout that should be use to calcuate the value for the parameter. + + + + + Database parameter name. + + + + + Database parameter precision. + + + + + Database parameter scale. + + + + + Database parameter size. + + + + + + + + + + + + + + + + Name of the target. + + + + + Text to be rendered. + + + + + Header. + + + + + Footer. + + + + + Target supports reuse of internal buffers, and doesn't have to constantly allocate new buffers Required for legacy NLog-targets, that expects buffers to remain stable after Write-method exit + + + + + + + + + + + + + + + + Name of the target. + + + + + Layout used to format log messages. + + + + + Target supports reuse of internal buffers, and doesn't have to constantly allocate new buffers Required for legacy NLog-targets, that expects buffers to remain stable after Write-method exit + + + + + + + + + + + + + + + + + + + + + + + + + Name of the target. + + + + + Layout used to format log messages. + + + + + Layout that renders event Category. + + + + + Layout that renders event ID. + + + + + Name of the Event Log to write to. This can be System, Application or any user-defined name. + + + + + Name of the machine on which Event Log service is running. + + + + + Value to be used as the event Source. + + + + + Action to take if the message is larger than the option. + + + + + Optional entrytype. When not set, or when not convertable to then determined by + + + + + Maximum Event log size in kilobytes. If null, the value won't be set. Default is 512 Kilobytes as specified by Eventlog API + + + + + Message length limit to write to the Event Log. + + + + + Target supports reuse of internal buffers, and doesn't have to constantly allocate new buffers Required for legacy NLog-targets, that expects buffers to remain stable after Write-method exit + + + + + + + + + + + + + + + + + + + + + + + Name of the target. + + + + + Indicates whether to return to the first target after any successful write. + + + + + Target supports reuse of internal buffers, and doesn't have to constantly allocate new buffers Required for legacy NLog-targets, that expects buffers to remain stable after Write-method exit + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + Name of the target. + + + + + Text to be rendered. + + + + + Header. + + + + + Footer. + + + + + File encoding. + + + + + Line ending mode. + + + + + Way file archives are numbered. + + + + + Name of the file to be used for an archive. + + + + + Indicates whether to automatically archive log files every time the specified time passes. + + + + + Size in bytes above which log files will be automatically archived. Warning: combining this with isn't supported. We cannot create multiple archive files, if they should have the same name. Choose: + + + + + Indicates whether to compress archive files into the zip archive format. + + + + + Maximum number of archive files that should be kept. + + + + + Gets or set a value indicating whether a managed file stream is forced, instead of using the native implementation. + + + + + Is the an absolute or relative path? + + + + + Cleanup invalid values in a filename, e.g. slashes in a filename. If set to true, this can impact the performance of massive writes. If set to false, nothing gets written when the filename is wrong. + + + + + Whether or not this target should just discard all data that its asked to write. Mostly used for when testing NLog Stack except final write + + + + + Is the an absolute or relative path? + + + + + Value indicationg whether file creation calls should be synchronized by a system global mutex. + + + + + Maximum number of log filenames that should be stored as existing. + + + + + Indicates whether the footer should be written only when the file is archived. + + + + + Name of the file to write to. + + + + + Value specifying the date format to use when archiving files. + + + + + Indicates whether to archive old log file on startup. + + + + + Indicates whether to create directories if they do not exist. + + + + + File attributes (Windows only). + + + + + Indicates whether to delete old log file on startup. + + + + + Indicates whether to replace file contents on each write instead of appending log message at the end. + + + + + Indicates whether to enable log file(s) to be deleted. + + + + + Number of times the write is appended on the file before NLog discards the log message. + + + + + Indicates whether concurrent writes to the log file by multiple processes on the same host. + + + + + Indicates whether to keep log file open instead of opening and closing it on each logging event. + + + + + Indicates whether concurrent writes to the log file by multiple processes on different network hosts. + + + + + Number of files to be kept open. Setting this to a higher value may improve performance in a situation where a single File target is writing to many files (such as splitting by level or by logger). + + + + + Maximum number of seconds that files are kept open. If this number is negative the files are not automatically closed after a period of inactivity. + + + + + Target supports reuse of internal buffers, and doesn't have to constantly allocate new buffers Required for legacy NLog-targets, that expects buffers to remain stable after Write-method exit + + + + + Log file buffer size in bytes. + + + + + Indicates whether to automatically flush the file buffers after each log message. + + + + + Delay in milliseconds to wait before attempting to write to the file again. + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + Name of the target. + + + + + Condition expression. Log events who meet this condition will be forwarded to the wrapped target. + + + + + Target supports reuse of internal buffers, and doesn't have to constantly allocate new buffers Required for legacy NLog-targets, that expects buffers to remain stable after Write-method exit + + + + + + + + + + + + + + + + + + + + + + Name of the target. + + + + + Windows domain name to change context to. + + + + + Required impersonation level. + + + + + Type of the logon provider. + + + + + Logon Type. + + + + + User account password. + + + + + Indicates whether to revert to the credentials of the process instead of impersonating another user. + + + + + Username to change context to. + + + + + Target supports reuse of internal buffers, and doesn't have to constantly allocate new buffers Required for legacy NLog-targets, that expects buffers to remain stable after Write-method exit + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + Name of the target. + + + + + Interval in which messages will be written up to the number of messages. + + + + + Maximum allowed number of messages written per . + + + + + Target supports reuse of internal buffers, and doesn't have to constantly allocate new buffers Required for legacy NLog-targets, that expects buffers to remain stable after Write-method exit + + + + + + + + + + + + + + + + + + + + + + Name of the target. + + + + + Endpoint address. + + + + + Name of the endpoint configuration in WCF configuration file. + + + + + Indicates whether to use a WCF service contract that is one way (fire and forget) or two way (request-reply) + + + + + Client ID. + + + + + Indicates whether to include per-event properties in the payload sent to the server. + + + + + Indicates whether to use binary message encoding. + + + + + Target supports reuse of internal buffers, and doesn't have to constantly allocate new buffers Required for legacy NLog-targets, that expects buffers to remain stable after Write-method exit + + + + + + + + + + + + + + + + Layout that should be use to calculate the value for the parameter. + + + + + Name of the parameter. + + + + + Type of the parameter. + + + + + Type of the parameter. Obsolete alias for + + + + + Parameter can combine multiple LogEvents into a single parameter value + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + Name of the target. + + + + + Text to be rendered. + + + + + Header. + + + + + Footer. + + + + + Indicates whether to send message as HTML instead of plain text. + + + + + Encoding to be used for sending e-mail. + + + + + Indicates whether to add new lines between log entries. + + + + + CC email addresses separated by semicolons (e.g. john@domain.com;jane@domain.com). + + + + + Recipients' email addresses separated by semicolons (e.g. john@domain.com;jane@domain.com). + + + + + BCC email addresses separated by semicolons (e.g. john@domain.com;jane@domain.com). + + + + + Mail message body (repeated for each log message send in one mail). + + + + + Mail subject. + + + + + Sender's email address (e.g. joe@domain.com). + + + + + Indicates the SMTP client timeout. + + + + + Priority used for sending mails. + + + + + Indicates whether NewLine characters in the body should be replaced with tags. + + + + + Target supports reuse of internal buffers, and doesn't have to constantly allocate new buffers Required for legacy NLog-targets, that expects buffers to remain stable after Write-method exit + + + + + SMTP Server to be used for sending. + + + + + SMTP Authentication mode. + + + + + Username used to connect to SMTP server (used when SmtpAuthentication is set to "basic"). + + + + + Password used to authenticate against SMTP server (used when SmtpAuthentication is set to "basic"). + + + + + Indicates whether SSL (secure sockets layer) should be used when communicating with SMTP server. + + + + + Port number that SMTP Server is listening on. + + + + + Indicates whether the default Settings from System.Net.MailSettings should be used. + + + + + Folder where applications save mail messages to be processed by the local SMTP server. + + + + + Specifies how outgoing email messages will be handled. + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + Name of the target. + + + + + Layout used to format log messages. + + + + + Target supports reuse of internal buffers, and doesn't have to constantly allocate new buffers Required for legacy NLog-targets, that expects buffers to remain stable after Write-method exit + + + + + + + + + + + + + + + + + + Name of the target. + + + + + Class name. + + + + + Method name. The method must be public and static. Use the AssemblyQualifiedName , https://msdn.microsoft.com/en-us/library/system.type.assemblyqualifiedname(v=vs.110).aspx e.g. + + + + + Target supports reuse of internal buffers, and doesn't have to constantly allocate new buffers Required for legacy NLog-targets, that expects buffers to remain stable after Write-method exit + + + + + + + + + + + + + + + + + + + + + + + + + + + Name of the target. + + + + + Layout used to format log messages. + + + + + Encoding to be used. + + + + + End of line value if a newline is appended at the end of log message . + + + + + Maximum message size in bytes. + + + + + Indicates whether to append newline at the end of log message. + + + + + Action that should be taken if the will be more connections than . + + + + + Action that should be taken if the message is larger than maxMessageSize. + + + + + Network address. + + + + + Size of the connection cache (number of connections which are kept alive). + + + + + Indicates whether to keep connection open whenever possible. + + + + + Maximum current connections. 0 = no maximum. + + + + + Maximum queue size. + + + + + Target supports reuse of internal buffers, and doesn't have to constantly allocate new buffers Required for legacy NLog-targets, that expects buffers to remain stable after Write-method exit + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + Name of the target. + + + + + Encoding to be used. + + + + + Instance of that is used to format log messages. + + + + + End of line value if a newline is appended at the end of log message . + + + + + Maximum message size in bytes. + + + + + Indicates whether to append newline at the end of log message. + + + + + Action that should be taken if the will be more connections than . + + + + + Action that should be taken if the message is larger than maxMessageSize. + + + + + Maximum current connections. 0 = no maximum. + + + + + Indicates whether to keep connection open whenever possible. + + + + + Size of the connection cache (number of connections which are kept alive). + + + + + Network address. + + + + + Maximum queue size. + + + + + NDC item separator. + + + + + Indicates whether to include source info (file name and line number) in the information sent over the network. + + + + + Indicates whether to include dictionary contents. + + + + + Indicates whether to include contents of the stack. + + + + + Indicates whether to include stack contents. + + + + + Indicates whether to include dictionary contents. + + + + + Indicates whether to include call site (class and method name) in the information sent over the network. + + + + + Option to include all properties from the log events + + + + + AppInfo field. By default it's the friendly name of the current AppDomain. + + + + + Indicates whether to include NLog-specific extensions to log4j schema. + + + + + Target supports reuse of internal buffers, and doesn't have to constantly allocate new buffers Required for legacy NLog-targets, that expects buffers to remain stable after Write-method exit + + + + + + + + + + + + + + + + + Name of the target. + + + + + Layout used to format log messages. + + + + + Indicates whether to perform layout calculation. + + + + + Target supports reuse of internal buffers, and doesn't have to constantly allocate new buffers Required for legacy NLog-targets, that expects buffers to remain stable after Write-method exit + + + + + + + + + + + + + + + + Name of the target. + + + + + Layout used to format log messages. + + + + + Target supports reuse of internal buffers, and doesn't have to constantly allocate new buffers Required for legacy NLog-targets, that expects buffers to remain stable after Write-method exit + + + + + + + + + + + + + + + + + + + + + + Name of the target. + + + + + Indicates whether performance counter should be automatically created. + + + + + Name of the performance counter category. + + + + + Counter help text. + + + + + Name of the performance counter. + + + + + Performance counter type. + + + + + The value by which to increment the counter. + + + + + Performance counter instance name. + + + + + Target supports reuse of internal buffers, and doesn't have to constantly allocate new buffers Required for legacy NLog-targets, that expects buffers to remain stable after Write-method exit + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + Name of the target. + + + + + Default filter to be applied when no specific rule matches. + + + + + Target supports reuse of internal buffers, and doesn't have to constantly allocate new buffers Required for legacy NLog-targets, that expects buffers to remain stable after Write-method exit + + + + + + + + + + + + + Condition to be tested. + + + + + Resulting filter to be applied when the condition matches. + + + + + + + + + + + + + Name of the target. + + + + + Target supports reuse of internal buffers, and doesn't have to constantly allocate new buffers Required for legacy NLog-targets, that expects buffers to remain stable after Write-method exit + + + + + + + + + + + + + + + + Name of the target. + + + + + Target supports reuse of internal buffers, and doesn't have to constantly allocate new buffers Required for legacy NLog-targets, that expects buffers to remain stable after Write-method exit + + + + + Number of times to repeat each log message. + + + + + + + + + + + + + + + + + Name of the target. + + + + + Target supports reuse of internal buffers, and doesn't have to constantly allocate new buffers Required for legacy NLog-targets, that expects buffers to remain stable after Write-method exit + + + + + Number of retries that should be attempted on the wrapped target in case of a failure. + + + + + Time to wait between retries in milliseconds. + + + + + + + + + + + + + + + Name of the target. + + + + + Target supports reuse of internal buffers, and doesn't have to constantly allocate new buffers Required for legacy NLog-targets, that expects buffers to remain stable after Write-method exit + + + + + + + + + + + + + + + Name of the target. + + + + + Target supports reuse of internal buffers, and doesn't have to constantly allocate new buffers Required for legacy NLog-targets, that expects buffers to remain stable after Write-method exit + + + + + + + + + + + + + + + + + Name of the target. + + + + + Layout used to format log messages. + + + + + Always use independent of + + + + + Target supports reuse of internal buffers, and doesn't have to constantly allocate new buffers Required for legacy NLog-targets, that expects buffers to remain stable after Write-method exit + + + + + + + + + + + + + + + + + + + + + + + + + + + + Name of the target. + + + + + Should we include the BOM (Byte-order-mark) for UTF? Influences the property. This will only work for UTF-8. + + + + + Target supports reuse of internal buffers, and doesn't have to constantly allocate new buffers Required for legacy NLog-targets, that expects buffers to remain stable after Write-method exit + + + + + Encoding. + + + + + Value whether escaping be done according to the old NLog style (Very non-standard) + + + + + Value whether escaping be done according to Rfc3986 (Supports Internationalized Resource Identifiers - IRIs) + + + + + Web service method name. Only used with Soap. + + + + + Web service namespace. Only used with Soap. + + + + + Indicates whether to pre-authenticate the HttpWebRequest (Requires 'Authorization' in parameters) + + + + + Protocol to be used when calling web service. + + + + + Web service URL. + + + + + Name of the root XML element, if POST of XML document chosen. If so, this property must not be null. (see and ). + + + + + (optional) root namespace of the XML document, if POST of XML document chosen. (see and ). + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + Footer layout. + + + + + Header layout. + + + + + Body layout (can be repeated multiple times). + + + + + Custom column delimiter value (valid when ColumnDelimiter is set to 'Custom'). + + + + + Column delimiter. + + + + + Quote Character. + + + + + Quoting mode. + + + + + Indicates whether CVS should include header. + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + Layout of the column. + + + + + Name of the column. + + + + + + + + + + + + + + + + + + List of property names to exclude when is true + + + + + Option to include all properties from the log events + + + + + Indicates whether to include contents of the dictionary. + + + + + Indicates whether to include contents of the dictionary. + + + + + Option to render the empty object value {} + + + + + Option to suppress the extra spaces in the output json + + + + + + + + + + + + + + + Determines wether or not this attribute will be Json encoded. + + + + + Indicates whether to escape non-ascii characters + + + + + Layout that will be rendered as the attribute's value. + + + + + Name of the attribute. + + + + + + + + + + + + + + Footer layout. + + + + + Header layout. + + + + + Body layout (can be repeated multiple times). + + + + + + + + + + + + + + + + + + Option to include all properties from the log events + + + + + Indicates whether to include contents of the dictionary. + + + + + Indicates whether to include contents of the dictionary. + + + + + Indicates whether to include contents of the stack. + + + + + Indicates whether to include contents of the stack. + + + + + + + + + + + + + + Layout text. + + + + + + + + + + + + + + + Action to be taken when filter matches. + + + + + Condition expression. + + + + + + + + + + + + + + + + + + + + + + + + + + Action to be taken when filter matches. + + + + + Indicates whether to ignore case when comparing strings. + + + + + Layout to be used to filter log messages. + + + + + Substring to be matched. + + + + + + + + + + + + + + + + + Action to be taken when filter matches. + + + + + String to compare the layout to. + + + + + Indicates whether to ignore case when comparing strings. + + + + + Layout to be used to filter log messages. + + + + + + + + + + + + + + + + + Action to be taken when filter matches. + + + + + Indicates whether to ignore case when comparing strings. + + + + + Layout to be used to filter log messages. + + + + + Substring to be matched. + + + + + + + + + + + + + + + + + Action to be taken when filter matches. + + + + + String to compare the layout to. + + + + + Indicates whether to ignore case when comparing strings. + + + + + Layout to be used to filter log messages. + + + + + + + + + + + + + + + + + + + + + + + + Action to be taken when filter matches. + + + + + Layout to be used to filter log messages. + + + + + Default number of unique filter values to expect, will automatically increase if needed + + + + + Append FilterCount to the when an event is no longer filtered + + + + + Insert FilterCount value into when an event is no longer filtered + + + + + Applies the configured action to the initial logevent that starts the timeout period. Used to configure that it should ignore all events until timeout. + + + + + Max number of unique filter values to expect simultaneously + + + + + Max length of filter values, will truncate if above limit + + + + + Default buffer size for the internal buffers + + + + + Reuse internal buffers, and doesn't have to constantly allocate new buffers + + + + + How long before a filter expires, and logging is accepted again + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/src/JT808.Gateway.TestHosting/Configs/nlog.Unix.config b/src/JT808.Gateway.TestHosting/Configs/nlog.Unix.config new file mode 100644 index 0000000..8e74617 --- /dev/null +++ b/src/JT808.Gateway.TestHosting/Configs/nlog.Unix.config @@ -0,0 +1,36 @@ + + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/src/JT808.Gateway.TestHosting/Configs/nlog.Win32NT.config b/src/JT808.Gateway.TestHosting/Configs/nlog.Win32NT.config new file mode 100644 index 0000000..5a7e44e --- /dev/null +++ b/src/JT808.Gateway.TestHosting/Configs/nlog.Win32NT.config @@ -0,0 +1,36 @@ + + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/src/JT808.Gateway.TestHosting/Dockerfile b/src/JT808.Gateway.TestHosting/Dockerfile new file mode 100644 index 0000000..6a33faf --- /dev/null +++ b/src/JT808.Gateway.TestHosting/Dockerfile @@ -0,0 +1,23 @@ +#See https://aka.ms/containerfastmode to understand how Visual Studio uses this Dockerfile to build your images for faster debugging. + +FROM mcr.microsoft.com/dotnet/core/runtime:3.1-buster-slim AS base +EXPOSE 808/tcp +WORKDIR /app + +FROM mcr.microsoft.com/dotnet/core/sdk:3.1-buster AS build +WORKDIR /src +COPY ["JT808.Gateway.TestHosting/JT808.Gateway.TestHosting.csproj", "JT808.Gateway.TestHosting/"] +COPY ["JT808.Gateway/JT808.Gateway.csproj", "JT808.Gateway/"] +COPY ["JT808.Gateway.Abstractions/JT808.Gateway.Abstractions.csproj", "JT808.Gateway.Abstractions/"] +RUN dotnet restore "JT808.Gateway.TestHosting/JT808.Gateway.TestHosting.csproj" +COPY . . +WORKDIR "/src/JT808.Gateway.TestHosting" +RUN dotnet build "JT808.Gateway.TestHosting.csproj" -c Release -o /app/build + +FROM build AS publish +RUN dotnet publish "JT808.Gateway.TestHosting.csproj" -c Release -o /app/publish + +FROM base AS final +WORKDIR /app +COPY --from=publish /app/publish . +ENTRYPOINT ["dotnet", "JT808.Gateway.TestHosting.dll"] \ No newline at end of file diff --git a/src/JT808.Gateway.TestHosting/JT808.Gateway.TestHosting.csproj b/src/JT808.Gateway.TestHosting/JT808.Gateway.TestHosting.csproj new file mode 100644 index 0000000..8d084b2 --- /dev/null +++ b/src/JT808.Gateway.TestHosting/JT808.Gateway.TestHosting.csproj @@ -0,0 +1,35 @@ + + + + Exe + netcoreapp3.1 + Linux + + + + + + + + + + + + + + + + Always + + + Always + + + Always + + + Always + + + + diff --git a/src/JT808.Gateway.TestHosting/Jobs/CallGrpcClientJob.cs b/src/JT808.Gateway.TestHosting/Jobs/CallGrpcClientJob.cs new file mode 100644 index 0000000..c233069 --- /dev/null +++ b/src/JT808.Gateway.TestHosting/Jobs/CallGrpcClientJob.cs @@ -0,0 +1,69 @@ +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading; +using System.Threading.Tasks; +using Grpc.Core; +using JT808.Gateway.Configurations; +using JT808.Gateway.GrpcService; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; +using System.Text.Json; + +namespace JT808.Gateway.TestHosting.Jobs +{ + public class CallGrpcClientJob :IHostedService + { + private Channel channel; + private readonly ILogger Logger; + private Grpc.Core.Metadata AuthMetadata; + public CallGrpcClientJob( + ILoggerFactory loggerFactory, + JT808Configuration configuration) + { + Logger = loggerFactory.CreateLogger("CallGrpcClientJob"); + channel = new Channel($"{configuration.WebApiHost}:{configuration.WebApiPort}", + ChannelCredentials.Insecure); + AuthMetadata = new Grpc.Core.Metadata(); + AuthMetadata.Add("token", configuration.WebApiToken); + } + + public Task StartAsync(CancellationToken cancellationToken) + { + Task.Run(() => + { + while (!cancellationToken.IsCancellationRequested) + { + JT808Gateway.JT808GatewayClient jT808GatewayClient = new JT808Gateway.JT808GatewayClient(channel); + try + { + var result1 = jT808GatewayClient.GetTcpAtomicCounter(new Empty(), AuthMetadata); + var result2 = jT808GatewayClient.GetTcpSessionAll(new Empty(), AuthMetadata); + Logger.LogInformation($"[GetTcpAtomicCounter]:{JsonSerializer.Serialize(result1)}"); + Logger.LogInformation($"[GetTcpSessionAll]:{JsonSerializer.Serialize(result2)}"); + } + catch (Exception ex) + { + Logger.LogError(ex, "Call Grpc Error"); + } + try + { + var result1 = jT808GatewayClient.GetTcpAtomicCounter(new Empty()); + } + catch (RpcException ex) + { + Logger.LogError($"{ex.StatusCode.ToString()}-{ex.Message}"); + } + Thread.Sleep(3000); + } + }, cancellationToken); + return Task.CompletedTask; + } + + public Task StopAsync(CancellationToken cancellationToken) + { + channel.ShutdownAsync(); + return Task.CompletedTask; + } + } +} diff --git a/src/JT808.Gateway.TestHosting/Program.cs b/src/JT808.Gateway.TestHosting/Program.cs new file mode 100644 index 0000000..7d8573e --- /dev/null +++ b/src/JT808.Gateway.TestHosting/Program.cs @@ -0,0 +1,46 @@ +using System; +using System.Threading.Tasks; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; +using JT808.Protocol; +using Microsoft.Extensions.Configuration; +using NLog.Extensions.Logging; +using JT808.Gateway.TestHosting.Jobs; + +namespace JT808.Gateway.TestHosting +{ + class Program + { + static async Task Main(string[] args) + { + var serverHostBuilder = new HostBuilder() + .ConfigureAppConfiguration((hostingContext, config) => + { + config.SetBasePath(AppDomain.CurrentDomain.BaseDirectory) + .AddJsonFile("appsettings.json", optional: false, reloadOnChange: true) + .AddJsonFile($"appsettings.{ hostingContext.HostingEnvironment.EnvironmentName}.json", optional: true, reloadOnChange: true); + }) + .ConfigureLogging((context, logging) => + { + Console.WriteLine($"Environment.OSVersion.Platform:{Environment.OSVersion.Platform.ToString()}"); + NLog.LogManager.LoadConfiguration($"Configs/nlog.{Environment.OSVersion.Platform.ToString()}.config"); + logging.AddNLog(new NLogProviderOptions { CaptureMessageTemplates = true, CaptureMessageProperties = true }); + logging.SetMinimumLevel(LogLevel.Trace); + }) + .ConfigureServices((hostContext, services) => + { + services.AddSingleton(); + services.AddSingleton(typeof(ILogger<>), typeof(Logger<>)); + services.AddJT808Configure() + .AddJT808Gateway() + .AddTcp() + .AddUdp() + .AddGrpc(); + //services.AddHostedService(); + }); + + await serverHostBuilder.RunConsoleAsync(); + } + } +} diff --git a/src/JT808.Gateway.TestHosting/startup.txt b/src/JT808.Gateway.TestHosting/startup.txt new file mode 100644 index 0000000..0a51e3b --- /dev/null +++ b/src/JT808.Gateway.TestHosting/startup.txt @@ -0,0 +1,4 @@ +pm2 start "dotnet JT808.DotNetty.CleintBenchmark.dll ASPNETCORE_ENVIRONMENT=Production" --max-restarts=1 -n "JT808.DotNetty.CleintBenchmark" -o "/data/pm2Logs/JT808.DotNetty.CleintBenchmark/out.log" -e "/data/pm2Logs/JT808.DotNetty.CleintBenchmark/error.log" + + +pm2 start "dotnet JT808.Gateway.TestHosting.dll ASPNETCORE_ENVIRONMENT=Production" --max-restarts=1 -n "JT808.Gateway.808" -o "/data/pm2Logs/JT808.Gateway/out.log" -e "/data/pm2Logs/JT808.Gateway/error.log" \ No newline at end of file diff --git a/src/JT808.Gateway.sln b/src/JT808.Gateway.sln new file mode 100644 index 0000000..3719e74 --- /dev/null +++ b/src/JT808.Gateway.sln @@ -0,0 +1,43 @@ + +Microsoft Visual Studio Solution File, Format Version 12.00 +# Visual Studio Version 16 +VisualStudioVersion = 16.0.29409.12 +MinimumVisualStudioVersion = 10.0.40219.1 +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JT808.Gateway", "JT808.Gateway\JT808.Gateway.csproj", "{4C8A2546-8333-416D-B123-91062B630087}" +EndProject +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JT808.Gateway.TestHosting", "JT808.Gateway.TestHosting\JT808.Gateway.TestHosting.csproj", "{AE40AFE0-0950-442C-A74C-10CDF53E9F36}" +EndProject +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JT808.Gateway.Test", "JT808.Gateway.Test\JT808.Gateway.Test.csproj", "{28C9BC4D-7B28-4E65-971A-7156E5EE0ADF}" +EndProject +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JT808.Gateway.Abstractions", "JT808.Gateway.Abstractions\JT808.Gateway.Abstractions.csproj", "{3AA17DF7-A1B3-449C-93C2-45B051C32933}" +EndProject +Global + GlobalSection(SolutionConfigurationPlatforms) = preSolution + Debug|Any CPU = Debug|Any CPU + Release|Any CPU = Release|Any CPU + EndGlobalSection + GlobalSection(ProjectConfigurationPlatforms) = postSolution + {4C8A2546-8333-416D-B123-91062B630087}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {4C8A2546-8333-416D-B123-91062B630087}.Debug|Any CPU.Build.0 = Debug|Any CPU + {4C8A2546-8333-416D-B123-91062B630087}.Release|Any CPU.ActiveCfg = Release|Any CPU + {4C8A2546-8333-416D-B123-91062B630087}.Release|Any CPU.Build.0 = Release|Any CPU + {AE40AFE0-0950-442C-A74C-10CDF53E9F36}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {AE40AFE0-0950-442C-A74C-10CDF53E9F36}.Debug|Any CPU.Build.0 = Debug|Any CPU + {AE40AFE0-0950-442C-A74C-10CDF53E9F36}.Release|Any CPU.ActiveCfg = Release|Any CPU + {AE40AFE0-0950-442C-A74C-10CDF53E9F36}.Release|Any CPU.Build.0 = Release|Any CPU + {28C9BC4D-7B28-4E65-971A-7156E5EE0ADF}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {28C9BC4D-7B28-4E65-971A-7156E5EE0ADF}.Debug|Any CPU.Build.0 = Debug|Any CPU + {28C9BC4D-7B28-4E65-971A-7156E5EE0ADF}.Release|Any CPU.ActiveCfg = Release|Any CPU + {28C9BC4D-7B28-4E65-971A-7156E5EE0ADF}.Release|Any CPU.Build.0 = Release|Any CPU + {3AA17DF7-A1B3-449C-93C2-45B051C32933}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {3AA17DF7-A1B3-449C-93C2-45B051C32933}.Debug|Any CPU.Build.0 = Debug|Any CPU + {3AA17DF7-A1B3-449C-93C2-45B051C32933}.Release|Any CPU.ActiveCfg = Release|Any CPU + {3AA17DF7-A1B3-449C-93C2-45B051C32933}.Release|Any CPU.Build.0 = Release|Any CPU + EndGlobalSection + GlobalSection(SolutionProperties) = preSolution + HideSolutionNode = FALSE + EndGlobalSection + GlobalSection(ExtensibilityGlobals) = postSolution + SolutionGuid = {AA9303A7-6FB3-4572-88AA-3302E85330D1} + EndGlobalSection +EndGlobal diff --git a/src/JT808.Gateway/Configurations/JT808Configuration.cs b/src/JT808.Gateway/Configurations/JT808Configuration.cs new file mode 100644 index 0000000..c6d3edd --- /dev/null +++ b/src/JT808.Gateway/Configurations/JT808Configuration.cs @@ -0,0 +1,43 @@ +using JT808.Gateway.Enums; +using System; +using System.Collections.Generic; +using System.Text; + +namespace JT808.Gateway.Configurations +{ + public class JT808Configuration + { + public int TcpPort { get; set; } = 808; + public int UdpPort { get; set; } = 808; + public int WebApiPort { get; set; } = 828; + public string WebApiHost{ get; set; } = "localhost"; + /// + /// WebApi 默认token 123456 + /// + public string WebApiToken { get; set; } = "123456"; + public int SoBacklog { get; set; } = 8192; + public int MiniNumBufferSize { get; set; } = 8096; + /// + /// Tcp读超时 + /// 默认10分钟检查一次 + /// + public int TcpReaderIdleTimeSeconds { get; set; } = 60*10; + /// + /// Tcp 60s检查一次 + /// + public int TcpReceiveTimeoutCheckTimeSeconds { get; set; } = 60; + /// + /// Udp读超时 + /// + public int UdpReaderIdleTimeSeconds { get; set; } = 60; + /// + /// Udp 60s检查一次 + /// + public int UdpReceiveTimeoutCheckTimeSeconds { get; set; } = 60; + /// + /// 队列类型 + /// 默认内存队列 + /// + public JT808MessageQueueType MessageQueueType { get; set; } = JT808MessageQueueType.InMemory; + } +} diff --git a/src/JT808.Gateway/Enums/JT808MessageQueueType.cs b/src/JT808.Gateway/Enums/JT808MessageQueueType.cs new file mode 100644 index 0000000..a83f6a0 --- /dev/null +++ b/src/JT808.Gateway/Enums/JT808MessageQueueType.cs @@ -0,0 +1,18 @@ +using System; +using System.Collections.Generic; +using System.Text; + +namespace JT808.Gateway.Enums +{ + public enum JT808MessageQueueType:byte + { + /// + /// 使用内存队列 + /// + InMemory=1, + /// + /// 使用第三方队列 + /// + InPlug=2 + } +} diff --git a/src/JT808.Gateway/Interfaces/IJT808Session.cs b/src/JT808.Gateway/Interfaces/IJT808Session.cs new file mode 100644 index 0000000..14e1b0d --- /dev/null +++ b/src/JT808.Gateway/Interfaces/IJT808Session.cs @@ -0,0 +1,26 @@ +using JT808.Gateway.Abstractions.Enums; +using System; +using System.Collections.Generic; +using System.Net; +using System.Net.Sockets; +using System.Text; +using System.Threading; + +namespace JT808.Gateway.Interfaces +{ + public interface IJT808Session + { + /// + /// 终端手机号 + /// + string TerminalPhoneNo { get; set; } + string SessionID { get; } + Socket Client { get; set; } + DateTime StartTime { get; set; } + DateTime ActiveTime { get; set; } + JT808TransportProtocolType TransportProtocolType { get;} + CancellationTokenSource ReceiveTimeout { get; set; } + EndPoint RemoteEndPoint { get; set; } + void Close(); + } +} diff --git a/src/JT808.Gateway/Internal/JT808GatewayBuilderDefault.cs b/src/JT808.Gateway/Internal/JT808GatewayBuilderDefault.cs new file mode 100644 index 0000000..1ef7110 --- /dev/null +++ b/src/JT808.Gateway/Internal/JT808GatewayBuilderDefault.cs @@ -0,0 +1,20 @@ +using JT808.Gateway.Abstractions; +using JT808.Protocol; + +namespace JT808.Gateway.Internal +{ + public class JT808GatewayBuilderDefault : IJT808GatewayBuilder + { + public IJT808Builder JT808Builder { get; } + + public JT808GatewayBuilderDefault(IJT808Builder builder) + { + JT808Builder = builder; + } + + public IJT808Builder Builder() + { + return JT808Builder; + } + } +} \ No newline at end of file diff --git a/src/JT808.Gateway/Internal/JT808MsgProducerDefault.cs b/src/JT808.Gateway/Internal/JT808MsgProducerDefault.cs new file mode 100644 index 0000000..610dd97 --- /dev/null +++ b/src/JT808.Gateway/Internal/JT808MsgProducerDefault.cs @@ -0,0 +1,28 @@ +using JT808.Gateway.Abstractions; +using JT808.Gateway.Internal; +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading.Tasks; + +namespace JT808.Gateway.Internal +{ + internal class JT808MsgProducerDefault : IJT808MsgProducer + { + private readonly JT808MsgService JT808MsgService; + public string TopicName => JT808GatewayConstants.MsgTopic; + public JT808MsgProducerDefault(JT808MsgService jT808MsgService) + { + JT808MsgService = jT808MsgService; + } + public void Dispose() + { + + } + public ValueTask ProduceAsync(string terminalNo, byte[] data) + { + JT808MsgService.MsgQueue.Add((terminalNo, data)); + return default; + } + } +} diff --git a/src/JT808.Gateway/Internal/JT808MsgReplyConsumerDefault.cs b/src/JT808.Gateway/Internal/JT808MsgReplyConsumerDefault.cs new file mode 100644 index 0000000..4276be3 --- /dev/null +++ b/src/JT808.Gateway/Internal/JT808MsgReplyConsumerDefault.cs @@ -0,0 +1,201 @@ +using JT808.Gateway.Abstractions; +using JT808.Protocol; +using JT808.Protocol.Enums; +using JT808.Protocol.Extensions; +using JT808.Protocol.MessageBody; +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading; +using System.Threading.Tasks; + +namespace JT808.Gateway.Internal +{ + internal class JT808MsgReplyConsumerDefault : IJT808MsgReplyConsumer + { + private readonly JT808MsgService JT808MsgService; + + private readonly JT808Serializer JT808Serializer; + + private delegate byte[] MethodDelegate(JT808HeaderPackage headerPackage); + + private Dictionary HandlerDict; + public JT808MsgReplyConsumerDefault( + IJT808Config jT808Config, + JT808MsgService jT808MsgService) + { + JT808MsgService = jT808MsgService; + this.JT808Serializer = jT808Config.GetSerializer(); + HandlerDict = new Dictionary { + {JT808MsgId.终端通用应答.ToUInt16Value(), Msg0x0001}, + {JT808MsgId.终端鉴权.ToUInt16Value(), Msg0x0102}, + {JT808MsgId.终端心跳.ToUInt16Value(), Msg0x0002}, + {JT808MsgId.终端注销.ToUInt16Value(), Msg0x0003}, + {JT808MsgId.终端注册.ToUInt16Value(), Msg0x0100}, + {JT808MsgId.位置信息汇报.ToUInt16Value(),Msg0x0200 }, + {JT808MsgId.定位数据批量上传.ToUInt16Value(),Msg0x0704 }, + {JT808MsgId.数据上行透传.ToUInt16Value(),Msg0x0900 } + }; + } + public CancellationTokenSource Cts =>new CancellationTokenSource(); + + public string TopicName => JT808GatewayConstants.MsgReplyTopic; + + public void Dispose() + { + Cts.Dispose(); + } + + public void OnMessage(Action<(string TerminalNo, byte[] Data)> callback) + { + Task.Run(() => + { + foreach(var item in JT808MsgService.MsgQueue.GetConsumingEnumerable()) + { + try + { + var package = JT808Serializer.HeaderDeserialize(item.Data); + if (HandlerDict.TryGetValue(package.Header.MsgId, out var func)) + { + var buffer = func(package); + if (buffer != null) + { + callback((item.TerminalNo, buffer)); + } + } + } + catch + { + + } + } + }, Cts.Token); + } + + public void Subscribe() + { + + } + + public void Unsubscribe() + { + Cts.Cancel(); + } + + /// + /// 终端通用应答 + /// 平台无需回复 + /// 实现自己的业务 + /// + /// + /// + public byte[] Msg0x0001(JT808HeaderPackage request) + { + return null; + } + + /// + /// 终端心跳 + /// + /// + /// + public byte[] Msg0x0002(JT808HeaderPackage request) + { + return JT808Serializer.Serialize(JT808MsgId.平台通用应答.Create(request.Header.TerminalPhoneNo, new JT808_0x8001() + { + AckMsgId = request.Header.MsgId, + JT808PlatformResult = JT808PlatformResult.成功, + MsgNum = request.Header.MsgNum + })); + } + + /// + /// 终端注销 + /// + /// + /// + public byte[] Msg0x0003(JT808HeaderPackage request) + { + return JT808Serializer.Serialize(JT808MsgId.平台通用应答.Create(request.Header.TerminalPhoneNo, new JT808_0x8001() + { + AckMsgId = request.Header.MsgId, + JT808PlatformResult = JT808PlatformResult.成功, + MsgNum = request.Header.MsgNum + })); + } + + /// + /// 终端注册 + /// + /// + /// + public byte[] Msg0x0100(JT808HeaderPackage request) + { + return JT808Serializer.Serialize(JT808MsgId.终端注册应答.Create(request.Header.TerminalPhoneNo, new JT808_0x8100() + { + Code = "J" + request.Header.TerminalPhoneNo, + JT808TerminalRegisterResult = JT808TerminalRegisterResult.成功, + AckMsgNum = request.Header.MsgNum + })); + } + /// + /// 终端鉴权 + /// + /// + /// + public byte[] Msg0x0102(JT808HeaderPackage request) + { + return JT808Serializer.Serialize(JT808MsgId.平台通用应答.Create(request.Header.TerminalPhoneNo, new JT808_0x8001() + { + AckMsgId = request.Header.MsgId, + JT808PlatformResult = JT808PlatformResult.成功, + MsgNum = request.Header.MsgNum + })); + } + + /// + /// 位置信息汇报 + /// + /// + /// + public byte[] Msg0x0200(JT808HeaderPackage request) + { + return JT808Serializer.Serialize(JT808MsgId.平台通用应答.Create(request.Header.TerminalPhoneNo, new JT808_0x8001() + { + AckMsgId = request.Header.MsgId, + JT808PlatformResult = JT808PlatformResult.成功, + MsgNum = request.Header.MsgNum + })); + } + + /// + /// 定位数据批量上传 + /// + /// + /// + public byte[] Msg0x0704(JT808HeaderPackage request) + { + return JT808Serializer.Serialize(JT808MsgId.平台通用应答.Create(request.Header.TerminalPhoneNo, new JT808_0x8001() + { + AckMsgId = request.Header.MsgId, + JT808PlatformResult = JT808PlatformResult.成功, + MsgNum = request.Header.MsgNum + })); + } + + /// + /// 数据上行透传 + /// + /// + /// + public byte[] Msg0x0900(JT808HeaderPackage request) + { + return JT808Serializer.Serialize(JT808MsgId.平台通用应答.Create(request.Header.TerminalPhoneNo, new JT808_0x8001() + { + AckMsgId = request.Header.MsgId, + JT808PlatformResult = JT808PlatformResult.成功, + MsgNum = request.Header.MsgNum + })); + } + } +} diff --git a/src/JT808.Gateway/Internal/JT808MsgService.cs b/src/JT808.Gateway/Internal/JT808MsgService.cs new file mode 100644 index 0000000..c1d6095 --- /dev/null +++ b/src/JT808.Gateway/Internal/JT808MsgService.cs @@ -0,0 +1,11 @@ +using System; +using System.Collections.Generic; +using System.Text; + +namespace JT808.Gateway.Internal +{ + internal class JT808MsgService + { + public System.Collections.Concurrent.BlockingCollection<(string TerminalNo, byte[] Data)> MsgQueue { get; set; } = new System.Collections.Concurrent.BlockingCollection<(string TerminalNo, byte[] Data)>(); + } +} diff --git a/src/JT808.Gateway/JT808.Gateway.csproj b/src/JT808.Gateway/JT808.Gateway.csproj new file mode 100644 index 0000000..fdbfd6e --- /dev/null +++ b/src/JT808.Gateway/JT808.Gateway.csproj @@ -0,0 +1,31 @@ + + + + netstandard2.1 + 8.0 + Copyright 2019. + SmallChi(Koike) + false + false + LICENSE + true + 基于Pipeline实现的JT808Gateway的网络库 + 基于Pipeline实现的JT808Gateway的网络库 + JT808.Gateway + JT808.Gateway + 1.0.0-preview2 + + + + + + + + + + + + + + + diff --git a/src/JT808.Gateway/JT808GatewayExtensions.cs b/src/JT808.Gateway/JT808GatewayExtensions.cs new file mode 100644 index 0000000..0f2891d --- /dev/null +++ b/src/JT808.Gateway/JT808GatewayExtensions.cs @@ -0,0 +1,75 @@ +using JT808.Gateway.Abstractions; +using JT808.Gateway.Configurations; +using JT808.Gateway.Internal; +using JT808.Gateway.Services; +using JT808.Gateway.Session; +using JT808.Protocol; +using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.DependencyInjection.Extensions; +using System; +using System.Runtime.CompilerServices; + +[assembly: InternalsVisibleTo("JT808.Gateway.TestHosting")] +[assembly: InternalsVisibleTo("JT808.Gateway.Test")] +namespace JT808.Gateway +{ + public static partial class JT808GatewayExtensions + { + public static IJT808GatewayBuilder AddJT808Gateway(this IJT808Builder jt808Builder) + { + IJT808GatewayBuilder server = new JT808GatewayBuilderDefault(jt808Builder); + server.JT808Builder.Services.TryAddSingleton(); + server.AddJT808Core(); + return server; + } + + public static IJT808GatewayBuilder AddJT808Gateway(this IJT808Builder jt808Builder,Action config) + { + IJT808GatewayBuilder server = new JT808GatewayBuilderDefault(jt808Builder); + server.JT808Builder.Services.Configure(config); + server.AddJT808Core(); + return server; + } + + public static IJT808GatewayBuilder AddJT808Gateway(this IJT808Builder jt808Builder, IConfiguration configuration) + { + IJT808GatewayBuilder server = new JT808GatewayBuilderDefault(jt808Builder); + server.JT808Builder.Services.Configure(configuration.GetSection("JT808Configuration")); + server.AddJT808Core(); + return server; + } + + public static IJT808GatewayBuilder AddTcp(this IJT808GatewayBuilder config) + { + config.JT808Builder.Services.AddHostedService(); + config.JT808Builder.Services.AddHostedService(); + return config; + } + + public static IJT808GatewayBuilder AddUdp(this IJT808GatewayBuilder config) + { + config.JT808Builder.Services.AddHostedService(); + config.JT808Builder.Services.AddHostedService(); + return config; + } + + public static IJT808GatewayBuilder AddGrpc(this IJT808GatewayBuilder config) + { + config.JT808Builder.Services.AddHostedService(); + return config; + } + + private static IJT808GatewayBuilder AddJT808Core(this IJT808GatewayBuilder config) + { + config.JT808Builder.Services.TryAddSingleton(); + config.JT808Builder.Services.TryAddSingleton(); + config.JT808Builder.Services.TryAddSingleton(); + config.JT808Builder.Services.TryAddSingleton(); + config.JT808Builder.Services.TryAddSingleton(); + config.JT808Builder.Services.TryAddSingleton(); + config.JT808Builder.Services.AddHostedService(); + return config; + } + } +} \ No newline at end of file diff --git a/src/JT808.Gateway/JT808GrpcServer.cs b/src/JT808.Gateway/JT808GrpcServer.cs new file mode 100644 index 0000000..e430873 --- /dev/null +++ b/src/JT808.Gateway/JT808GrpcServer.cs @@ -0,0 +1,54 @@ +using System; +using System.Threading; +using System.Threading.Tasks; +using Grpc.Core; +using JT808.Gateway.Configurations; +using JT808.Gateway.GrpcService; +using JT808.Gateway.Services; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; + +namespace JT808.Gateway +{ + public class JT808GrpcServer : IHostedService + { + private readonly ILogger Logger; + private readonly JT808Configuration Configuration; + private readonly IServiceProvider ServiceProvider; + private Server server; + public JT808GrpcServer( + IServiceProvider serviceProvider, + JT808Configuration jT808Configuration, + ILoggerFactory loggerFactory) + { + Logger = loggerFactory.CreateLogger("JT808GrpcServer"); + Configuration = jT808Configuration; + ServiceProvider = serviceProvider; + } + + public Task StartAsync(CancellationToken cancellationToken) + { + server = new Server + { + Services = { JT808Gateway.BindService(new JT808GatewayService(ServiceProvider)) }, + Ports = { new ServerPort(Configuration.WebApiHost, Configuration.WebApiPort, ServerCredentials.Insecure) } + }; + Logger.LogInformation($"JT808 Grpc Server start at {Configuration.WebApiHost}:{Configuration.WebApiPort}."); + try + { + server.Start(); + } + catch (Exception ex) + { + Logger.LogError(ex, "JT808 Grpc Server start error"); + } + return Task.CompletedTask; + } + public Task StopAsync(CancellationToken cancellationToken) + { + Logger.LogInformation("JT808 Grpc Server Stop"); + server.ShutdownAsync(); + return Task.CompletedTask; + } + } +} diff --git a/src/JT808.Gateway/JT808TcpServer.cs b/src/JT808.Gateway/JT808TcpServer.cs new file mode 100644 index 0000000..969cab8 --- /dev/null +++ b/src/JT808.Gateway/JT808TcpServer.cs @@ -0,0 +1,226 @@ +using System; +using System.Buffers; +using System.Collections.Generic; +using System.IO.Pipelines; +using System.Net; +using System.Net.Sockets; +using System.Text; +using System.Threading; +using System.Threading.Tasks; +using JT808.Gateway.Abstractions; +using JT808.Gateway.Abstractions.Enums; +using JT808.Gateway.Configurations; +using JT808.Gateway.Enums; +using JT808.Gateway.Services; +using JT808.Gateway.Session; +using JT808.Protocol; +using JT808.Protocol.Exceptions; +using JT808.Protocol.Extensions; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; + +namespace JT808.Gateway +{ + public class JT808TcpServer:IHostedService + { + private Socket server; + + private const byte beginAndEndMark = 0x7e; + + private readonly ILogger Logger; + + private readonly JT808SessionManager SessionManager; + + private readonly IJT808MsgProducer MsgProducer; + + private readonly JT808Serializer Serializer; + + private readonly JT808AtomicCounterService AtomicCounterService; + + private readonly JT808Configuration Configuration; + + public JT808TcpServer( + JT808Configuration jT808Configuration, + IJT808Config jT808Config, + ILoggerFactory loggerFactory, + JT808SessionManager jT808SessionManager, + IJT808MsgProducer jT808MsgProducer, + JT808AtomicCounterServiceFactory jT808AtomicCounterServiceFactory) + { + SessionManager = jT808SessionManager; + Logger = loggerFactory.CreateLogger("JT808TcpServer"); + Serializer = jT808Config.GetSerializer(); + MsgProducer = jT808MsgProducer; + AtomicCounterService = jT808AtomicCounterServiceFactory.Create(JT808TransportProtocolType.tcp); + Configuration = jT808Configuration; + var IPEndPoint = new System.Net.IPEndPoint(IPAddress.Any, jT808Configuration.TcpPort); + server = new Socket(IPEndPoint.AddressFamily, SocketType.Stream, ProtocolType.Tcp); + server.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.NoDelay, true); + server.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.KeepAlive, true); + server.LingerState = new LingerOption(false, 0); + server.Bind(IPEndPoint); + server.Listen(jT808Configuration.SoBacklog); + } + + public Task StartAsync(CancellationToken cancellationToken) + { + Logger.LogInformation($"JT808 TCP Server start at {IPAddress.Any}:{Configuration.TcpPort}."); + Task.Run(async() => { + while (!cancellationToken.IsCancellationRequested) + { + var socket = await server.AcceptAsync(); + JT808TcpSession jT808TcpSession = new JT808TcpSession(socket); + await Task.Factory.StartNew(async (state) => + { + var session = (JT808TcpSession)state; + SessionManager.TryAdd(session); + if (Logger.IsEnabled(LogLevel.Information)) + { + Logger.LogInformation($"[Connected]:{session.Client.RemoteEndPoint}"); + } + var pipe = new Pipe(); + Task writing = FillPipeAsync(session, pipe.Writer); + Task reading = ReadPipeAsync(session, pipe.Reader); + await Task.WhenAll(reading, writing); + SessionManager.RemoveBySessionId(session.SessionID); + }, jT808TcpSession); + } + }, cancellationToken); + return Task.CompletedTask; + } + private async Task FillPipeAsync(JT808TcpSession session, PipeWriter writer) + { + while (true) + { + try + { + Memory memory = writer.GetMemory(Configuration.MiniNumBufferSize); + //设备多久没发数据就断开连接 Receive Timeout. + int bytesRead = await session.Client.ReceiveAsync(memory, SocketFlags.None, session.ReceiveTimeout.Token); + if (bytesRead == 0) + { + break; + } + writer.Advance(bytesRead); + } + catch(OperationCanceledException) + { + Logger.LogError($"[Receive Timeout]:{session.Client.RemoteEndPoint}"); + break; + } + catch (Exception ex) + { + Logger.LogError(ex, $"[Receive Error]:{session.Client.RemoteEndPoint}"); + break; + } + FlushResult result = await writer.FlushAsync(); + if (result.IsCompleted) + { + break; + } + } + writer.Complete(); + } + private async Task ReadPipeAsync(JT808TcpSession session, PipeReader reader) + { + while (true) + { + ReadResult result = await reader.ReadAsync(); + if (result.IsCompleted) + { + break; + } + ReadOnlySequence buffer = result.Buffer; + SequencePosition consumed = buffer.Start; + SequencePosition examined = buffer.End; + try + { + if (result.IsCanceled) break; + if (buffer.Length > 0) + { + ReaderBuffer(ref buffer, session,out consumed, out examined); + } + } + catch (Exception ex) + { + SessionManager.RemoveBySessionId(session.SessionID); + break; + } + finally + { + reader.AdvanceTo(consumed, examined); + } + } + reader.Complete(); + } + private void ReaderBuffer(ref ReadOnlySequence buffer, JT808TcpSession session, out SequencePosition consumed, out SequencePosition examined) + { + consumed = buffer.Start; + examined = buffer.End; + SequenceReader seqReader = new SequenceReader(buffer); + if (seqReader.TryPeek(out byte beginMark)) + { + if (beginMark != beginAndEndMark) throw new ArgumentException("Not JT808 Packages."); + } + byte mark = 0; + long totalConsumed = 0; + while (!seqReader.End) + { + if (seqReader.IsNext(beginAndEndMark, advancePast: true)) + { + if (mark == 1) + { + try + { + var package = Serializer.HeaderDeserialize(seqReader.Sequence.Slice(totalConsumed, seqReader.Consumed - totalConsumed).FirstSpan); + AtomicCounterService.MsgSuccessIncrement(); + if (Logger.IsEnabled(LogLevel.Debug)) Logger.LogDebug($"[Atomic Success Counter]:{AtomicCounterService.MsgSuccessCount}"); + if (Logger.IsEnabled(LogLevel.Trace)) Logger.LogTrace($"[Accept Hex {session.Client.RemoteEndPoint}]:{package.OriginalData.ToArray().ToHexString()}"); + //设直连模式和转发模式的会话如何处理 + SessionManager.TryLink(package.Header.TerminalPhoneNo, session); + if(Configuration.MessageQueueType == JT808MessageQueueType.InMemory) + { + MsgProducer.ProduceAsync(session.SessionID, package.OriginalData.ToArray()); + } + else + { + MsgProducer.ProduceAsync(package.Header.TerminalPhoneNo, package.OriginalData.ToArray()); + } + } + catch (JT808Exception ex) + { + AtomicCounterService.MsgFailIncrement(); + if (Logger.IsEnabled(LogLevel.Debug)) Logger.LogDebug($"[Atomic Fail Counter]:{AtomicCounterService.MsgFailCount}"); + Logger.LogError(ex,$"[HeaderDeserialize ErrorCode]:{ ex.ErrorCode}"); + } + totalConsumed += (seqReader.Consumed - totalConsumed); + if (seqReader.End) break; + seqReader.Advance(1); + mark = 0; + } + mark++; + } + else + { + seqReader.Advance(1); + } + } + if (seqReader.Length== totalConsumed) + { + examined = consumed = buffer.End; + } + else + { + consumed = buffer.GetPosition(totalConsumed); + } + } + public Task StopAsync(CancellationToken cancellationToken) + { + Logger.LogInformation("808 Tcp Server Stop"); + if (server?.Connected ?? false) + server.Shutdown(SocketShutdown.Both); + server?.Close(); + return Task.CompletedTask; + } + } +} diff --git a/src/JT808.Gateway/JT808UdpServer.cs b/src/JT808.Gateway/JT808UdpServer.cs new file mode 100644 index 0000000..749bcb0 --- /dev/null +++ b/src/JT808.Gateway/JT808UdpServer.cs @@ -0,0 +1,134 @@ +using System; +using System.Buffers; +using System.Collections.Generic; +using System.IO.Pipelines; +using System.Net; +using System.Net.Sockets; +using System.Text; +using System.Threading; +using System.Threading.Tasks; +using JT808.Gateway.Abstractions; +using JT808.Gateway.Abstractions.Enums; +using JT808.Gateway.Configurations; +using JT808.Gateway.Enums; +using JT808.Gateway.Services; +using JT808.Gateway.Session; +using JT808.Protocol; +using JT808.Protocol.Exceptions; +using JT808.Protocol.Extensions; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; + +namespace JT808.Gateway +{ + public class JT808UdpServer : IHostedService + { + private Socket server; + + private readonly ILogger Logger; + + private readonly JT808SessionManager SessionManager; + + private readonly IJT808MsgProducer MsgProducer; + + private readonly JT808Serializer Serializer; + + private readonly JT808AtomicCounterService AtomicCounterService; + + private readonly JT808Configuration Configuration; + + private IPEndPoint LocalIPEndPoint; + + public JT808UdpServer( + JT808Configuration jT808Configuration, + IJT808Config jT808Config, + ILoggerFactory loggerFactory, + JT808SessionManager jT808SessionManager, + IJT808MsgProducer jT808MsgProducer, + JT808AtomicCounterServiceFactory jT808AtomicCounterServiceFactory) + { + SessionManager = jT808SessionManager; + Logger = loggerFactory.CreateLogger("JT808UdpServer"); + Serializer = jT808Config.GetSerializer(); + MsgProducer = jT808MsgProducer; + AtomicCounterService = jT808AtomicCounterServiceFactory.Create(JT808TransportProtocolType.udp); + Configuration = jT808Configuration; + LocalIPEndPoint = new System.Net.IPEndPoint(IPAddress.Any, jT808Configuration.UdpPort); + server = new Socket(LocalIPEndPoint.AddressFamily, SocketType.Dgram, ProtocolType.Udp); + server.Bind(LocalIPEndPoint); + } + + public Task StartAsync(CancellationToken cancellationToken) + { + Logger.LogInformation($"JT808 Udp Server start at {IPAddress.Any}:{Configuration.UdpPort}."); + Task.Run(async() => { + while (!cancellationToken.IsCancellationRequested) + { + var buffer = ArrayPool.Shared.Rent(Configuration.MiniNumBufferSize); + try + { + var segment = new ArraySegment(buffer); + SocketReceiveMessageFromResult result = await server.ReceiveMessageFromAsync(segment, SocketFlags.None, LocalIPEndPoint); + ReaderBuffer(buffer.AsSpan(0, result.ReceivedBytes), server, result); + } + catch(AggregateException ex) + { + Logger.LogError(ex, "Receive MessageFrom Async"); + } + catch(Exception ex) + { + Logger.LogError(ex, $"Received Bytes"); + } + finally + { + ArrayPool.Shared.Return(buffer); + } + } + }, cancellationToken); + return Task.CompletedTask; + } + private void ReaderBuffer(ReadOnlySpan buffer, Socket socket,SocketReceiveMessageFromResult receiveMessageFromResult) + { + try + { + var package = Serializer.HeaderDeserialize(buffer); + AtomicCounterService.MsgSuccessIncrement(); + if (Logger.IsEnabled(LogLevel.Debug)) Logger.LogDebug($"[Atomic Success Counter]:{AtomicCounterService.MsgSuccessCount}"); + if (Logger.IsEnabled(LogLevel.Trace)) Logger.LogTrace($"[Accept Hex {receiveMessageFromResult.RemoteEndPoint}]:{package.OriginalData.ToArray().ToHexString()}"); + //设直连模式和转发模式的会话如何处理 + string sessionId= SessionManager.TryLink(package.Header.TerminalPhoneNo, socket, receiveMessageFromResult.RemoteEndPoint); + if (Logger.IsEnabled(LogLevel.Information)) + { + Logger.LogInformation($"[Connected]:{receiveMessageFromResult.RemoteEndPoint}"); + } + if (Configuration.MessageQueueType == JT808MessageQueueType.InMemory) + { + MsgProducer.ProduceAsync(sessionId, package.OriginalData.ToArray()); + } + else + { + MsgProducer.ProduceAsync(package.Header.TerminalPhoneNo, package.OriginalData.ToArray()); + } + } + catch (JT808Exception ex) + { + AtomicCounterService.MsgFailIncrement(); + if (Logger.IsEnabled(LogLevel.Debug)) Logger.LogDebug($"[Atomic Fail Counter]:{AtomicCounterService.MsgFailCount}"); + Logger.LogError(ex, $"[HeaderDeserialize ErrorCode]:{ ex.ErrorCode}-{buffer.ToArray().ToHexString()}"); + } + catch (Exception ex) + { + if (Logger.IsEnabled(LogLevel.Debug)) Logger.LogDebug($"[Atomic Fail Counter]:{AtomicCounterService.MsgFailCount}"); + Logger.LogError(ex, $"[ReaderBuffer]:{ buffer.ToArray().ToHexString()}"); + } + } + public Task StopAsync(CancellationToken cancellationToken) + { + Logger.LogInformation("808 Udp Server Stop"); + if (server?.Connected ?? false) + server.Shutdown(SocketShutdown.Both); + server?.Close(); + return Task.CompletedTask; + } + } +} diff --git a/src/JT808.Gateway/Metadata/JT808AtomicCounter.cs b/src/JT808.Gateway/Metadata/JT808AtomicCounter.cs new file mode 100644 index 0000000..6362905 --- /dev/null +++ b/src/JT808.Gateway/Metadata/JT808AtomicCounter.cs @@ -0,0 +1,49 @@ +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading; + +namespace JT808.Gateway.Metadata +{ + /// + /// + /// + /// + internal class JT808AtomicCounter + { + long counter = 0; + + public JT808AtomicCounter(long initialCount = 0) + { + this.counter = initialCount; + } + + public void Reset() + { + Interlocked.Exchange(ref counter, 0); + } + + public long Increment() + { + return Interlocked.Increment(ref counter); + } + + public long Add(long len) + { + return Interlocked.Add(ref counter,len); + } + + public long Decrement() + { + return Interlocked.Decrement(ref counter); + } + + public long Count + { + get + { + return Interlocked.Read(ref counter); + } + } + } +} diff --git a/src/JT808.Gateway/Services/JT808AtomicCounterService.cs b/src/JT808.Gateway/Services/JT808AtomicCounterService.cs new file mode 100644 index 0000000..cddd31f --- /dev/null +++ b/src/JT808.Gateway/Services/JT808AtomicCounterService.cs @@ -0,0 +1,52 @@ +using JT808.Gateway.Metadata; + +namespace JT808.Gateway.Services +{ + /// + /// 计数包服务 + /// + public class JT808AtomicCounterService + { + private readonly JT808AtomicCounter MsgSuccessCounter; + + private readonly JT808AtomicCounter MsgFailCounter; + + public JT808AtomicCounterService() + { + MsgSuccessCounter=new JT808AtomicCounter(); + MsgFailCounter = new JT808AtomicCounter(); + } + + public void Reset() + { + MsgSuccessCounter.Reset(); + MsgFailCounter.Reset(); + } + + public long MsgSuccessIncrement() + { + return MsgSuccessCounter.Increment(); + } + + public long MsgSuccessCount + { + get + { + return MsgSuccessCounter.Count; + } + } + + public long MsgFailIncrement() + { + return MsgFailCounter.Increment(); + } + + public long MsgFailCount + { + get + { + return MsgFailCounter.Count; + } + } + } +} diff --git a/src/JT808.Gateway/Services/JT808AtomicCounterServiceFactory.cs b/src/JT808.Gateway/Services/JT808AtomicCounterServiceFactory.cs new file mode 100644 index 0000000..e4c9536 --- /dev/null +++ b/src/JT808.Gateway/Services/JT808AtomicCounterServiceFactory.cs @@ -0,0 +1,30 @@ +using JT808.Gateway.Abstractions.Enums; +using System; +using System.Collections.Concurrent; + +namespace JT808.Gateway.Services +{ + public class JT808AtomicCounterServiceFactory + { + private readonly ConcurrentDictionary cache; + + public JT808AtomicCounterServiceFactory() + { + cache = new ConcurrentDictionary(); + } + + public JT808AtomicCounterService Create(JT808TransportProtocolType type) + { + if(cache.TryGetValue(type,out var service)) + { + return service; + } + else + { + var serviceNew = new JT808AtomicCounterService(); + cache.TryAdd(type, serviceNew); + return serviceNew; + } + } + } +} diff --git a/src/JT808.Gateway/Services/JT808GatewayService.cs b/src/JT808.Gateway/Services/JT808GatewayService.cs new file mode 100644 index 0000000..cf0b7d5 --- /dev/null +++ b/src/JT808.Gateway/Services/JT808GatewayService.cs @@ -0,0 +1,144 @@ +using System; +using System.Linq; +using JT808.Gateway.GrpcService; +using Grpc.Core; +using System.Threading.Tasks; +using JT808.Gateway.Abstractions.Enums; +using JT808.Gateway.Session; +using Microsoft.Extensions.DependencyInjection; +using static Grpc.Core.Metadata; +using Microsoft.Extensions.Options; +using JT808.Gateway.Configurations; + +namespace JT808.Gateway.Services +{ + public class JT808GatewayService: JT808Gateway.JT808GatewayBase + { + private readonly JT808AtomicCounterService jT808TcpAtomicCounterService; + + private readonly JT808AtomicCounterService jT808UdpAtomicCounterService; + + private readonly JT808SessionManager jT808SessionManager; + + private readonly IOptionsMonitor ConfigurationOptionsMonitor; + + public JT808GatewayService( + IOptionsMonitor configurationOptionsMonitor, + JT808SessionManager jT808SessionManager, + JT808AtomicCounterServiceFactory jT808AtomicCounterServiceFactory + ) + { + this.jT808SessionManager = jT808SessionManager; + this.ConfigurationOptionsMonitor = configurationOptionsMonitor; + this.jT808TcpAtomicCounterService = jT808AtomicCounterServiceFactory.Create(JT808TransportProtocolType.tcp); + this.jT808UdpAtomicCounterService = jT808AtomicCounterServiceFactory.Create(JT808TransportProtocolType.udp); + } + + public JT808GatewayService(IServiceProvider serviceProvider) + { + this.jT808SessionManager = serviceProvider.GetRequiredService(); + this.jT808TcpAtomicCounterService = serviceProvider.GetRequiredService().Create(JT808TransportProtocolType.tcp); + this.jT808UdpAtomicCounterService = serviceProvider.GetRequiredService().Create(JT808TransportProtocolType.udp); + this.ConfigurationOptionsMonitor = serviceProvider.GetRequiredService>(); + } + + public override Task GetTcpSessionAll(Empty request, ServerCallContext context) + { + Auth(context); + var result = jT808SessionManager.GetTcpAll(); + TcpSessionInfoReply reply = new TcpSessionInfoReply(); + foreach (var item in result) + { + reply.TcpSessions.Add(new SessionInfo + { + LastActiveTime = item.ActiveTime.ToString("yyyy-MM-dd HH:mm:ss"), + StartTime = item.StartTime.ToString("yyyy-MM-dd HH:mm:ss"), + RemoteAddressIP = item.RemoteEndPoint.ToString(), + TerminalPhoneNo = item.TerminalPhoneNo + }); + } + return Task.FromResult(reply); + } + + public override Task RemoveSessionByTerminalPhoneNo(SessionRemoveRequest request, ServerCallContext context) + { + Auth(context); + try + { + jT808SessionManager.RemoveByTerminalPhoneNo(request.TerminalPhoneNo); + return Task.FromResult(new SessionRemoveReply { Success = true }); + } + catch (Exception) + { + return Task.FromResult(new SessionRemoveReply { Success = false }); + } + } + + public override Task GetUdpSessionAll(Empty request, ServerCallContext context) + { + Auth(context); + var result = jT808SessionManager.GetUdpAll(); + UdpSessionInfoReply reply = new UdpSessionInfoReply(); + foreach (var item in result) + { + reply.UdpSessions.Add(new SessionInfo + { + LastActiveTime = item.ActiveTime.ToString("yyyy-MM-dd HH:mm:ss"), + StartTime = item.StartTime.ToString("yyyy-MM-dd HH:mm:ss"), + RemoteAddressIP = item.RemoteEndPoint.ToString(), + TerminalPhoneNo = item.TerminalPhoneNo + }); + } + + return Task.FromResult(reply); + } + + public override Task UnificationSend(UnificationSendRequest request, ServerCallContext context) + { + Auth(context); + try + { + var flag = jT808SessionManager.TrySendByTerminalPhoneNo(request.TerminalPhoneNo, request.Data.ToByteArray()); + return Task.FromResult(new UnificationSendReply { Success = flag }); + } + catch (Exception) + { + return Task.FromResult(new UnificationSendReply { Success = false }); + } + } + + public override Task GetTcpAtomicCounter(Empty request, ServerCallContext context) + { + Auth(context); + TcpAtomicCounterReply reply = new TcpAtomicCounterReply(); + reply.MsgFailCount=jT808TcpAtomicCounterService.MsgFailCount; + reply.MsgSuccessCount=jT808TcpAtomicCounterService.MsgSuccessCount; + return Task.FromResult(reply); + } + + public override Task GetUdpAtomicCounter(Empty request, ServerCallContext context) + { + Auth(context); + UdpAtomicCounterReply reply = new UdpAtomicCounterReply(); + reply.MsgFailCount = jT808UdpAtomicCounterService.MsgFailCount; + reply.MsgSuccessCount = jT808UdpAtomicCounterService.MsgSuccessCount; + return Task.FromResult(reply); + } + + private void Auth(ServerCallContext context) + { + Entry tokenEntry = context.RequestHeaders.FirstOrDefault(w => w.Key == "token"); + if (tokenEntry != null) + { + if(tokenEntry.Value != ConfigurationOptionsMonitor.CurrentValue.WebApiToken) + { + throw new Grpc.Core.RpcException(new Status(StatusCode.Unauthenticated, "token error")); + } + } + else + { + throw new Grpc.Core.RpcException(new Status(StatusCode.Unauthenticated,"token empty")); + } + } + } +} diff --git a/src/JT808.Gateway/Services/JT808MsgReplyHostedService.cs b/src/JT808.Gateway/Services/JT808MsgReplyHostedService.cs new file mode 100644 index 0000000..d163715 --- /dev/null +++ b/src/JT808.Gateway/Services/JT808MsgReplyHostedService.cs @@ -0,0 +1,52 @@ +using JT808.Gateway.Abstractions; +using JT808.Gateway.Configurations; +using JT808.Gateway.Enums; +using JT808.Gateway.Session; +using Microsoft.Extensions.Hosting; +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading; +using System.Threading.Tasks; + +namespace JT808.Gateway.Services +{ + internal class JT808MsgReplyHostedService : IHostedService + { + private readonly JT808SessionManager JT808SessionManager; + + private readonly IJT808MsgReplyConsumer JT808MsgReplyConsumer; + private readonly JT808Configuration Configuration; + public JT808MsgReplyHostedService( + JT808Configuration jT808Configuration, + IJT808MsgReplyConsumer jT808MsgReplyConsumer, + JT808SessionManager jT808SessionManager) + { + JT808MsgReplyConsumer = jT808MsgReplyConsumer; + JT808SessionManager = jT808SessionManager; + Configuration = jT808Configuration; + } + + public Task StartAsync(CancellationToken cancellationToken) + { + if(Configuration.MessageQueueType== JT808MessageQueueType.InMemory) + { + JT808MsgReplyConsumer.OnMessage(item => + { + JT808SessionManager.TrySendBySessionId(item.TerminalNo, item.Data); + }); + JT808MsgReplyConsumer.Subscribe(); + } + return Task.CompletedTask; + } + + public Task StopAsync(CancellationToken cancellationToken) + { + if (Configuration.MessageQueueType == JT808MessageQueueType.InMemory) + { + JT808MsgReplyConsumer.Unsubscribe(); + } + return Task.CompletedTask; + } + } +} diff --git a/src/JT808.Gateway/Services/JT808TcpReceiveTimeoutHostedService.cs b/src/JT808.Gateway/Services/JT808TcpReceiveTimeoutHostedService.cs new file mode 100644 index 0000000..8fdda8c --- /dev/null +++ b/src/JT808.Gateway/Services/JT808TcpReceiveTimeoutHostedService.cs @@ -0,0 +1,58 @@ +using JT808.Gateway.Configurations; +using JT808.Gateway.Session; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading; +using System.Threading.Tasks; + +namespace JT808.Gateway.Services +{ + internal class JT808TcpReceiveTimeoutHostedService : BackgroundService + { + private readonly ILogger Logger; + + private readonly JT808SessionManager SessionManager; + + private readonly JT808Configuration Configuration; + public JT808TcpReceiveTimeoutHostedService( + JT808Configuration jT808Configuration, + ILoggerFactory loggerFactory, + JT808SessionManager jT808SessionManager + ) + { + SessionManager = jT808SessionManager; + Logger = loggerFactory.CreateLogger("JT808TcpReceiveTimeout"); + Configuration = jT808Configuration; + } + + protected override async Task ExecuteAsync(CancellationToken stoppingToken) + { + while (!stoppingToken.IsCancellationRequested) + { + try + { + foreach (var item in SessionManager.GetTcpAll()) + { + if (item.ActiveTime.AddSeconds(Configuration.TcpReaderIdleTimeSeconds) < DateTime.Now) + { + item.ReceiveTimeout.Cancel(); + } + } + Logger.LogInformation($"[Check Receive Timeout]"); + Logger.LogInformation($"[Session Online Count]:{SessionManager.TcpSessionCount}"); + } + catch (Exception ex) + { + Logger.LogError(ex, $"[Receive Timeout]"); + } + finally + { + await Task.Delay(TimeSpan.FromSeconds(Configuration.TcpReceiveTimeoutCheckTimeSeconds), stoppingToken); + } + } + } + } +} diff --git a/src/JT808.Gateway/Services/JT808UdpReceiveTimeoutHostedService.cs b/src/JT808.Gateway/Services/JT808UdpReceiveTimeoutHostedService.cs new file mode 100644 index 0000000..3f04f30 --- /dev/null +++ b/src/JT808.Gateway/Services/JT808UdpReceiveTimeoutHostedService.cs @@ -0,0 +1,63 @@ +using JT808.Gateway.Configurations; +using JT808.Gateway.Session; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading; +using System.Threading.Tasks; + +namespace JT808.Gateway.Services +{ + internal class JT808UdpReceiveTimeoutHostedService : BackgroundService + { + private readonly ILogger Logger; + + private readonly JT808SessionManager SessionManager; + + private readonly JT808Configuration Configuration; + public JT808UdpReceiveTimeoutHostedService( + JT808Configuration jT808Configuration, + ILoggerFactory loggerFactory, + JT808SessionManager jT808SessionManager + ) + { + SessionManager = jT808SessionManager; + Logger = loggerFactory.CreateLogger("JT808UdpReceiveTimeout"); + Configuration = jT808Configuration; + } + + protected override async Task ExecuteAsync(CancellationToken stoppingToken) + { + while (!stoppingToken.IsCancellationRequested) + { + try + { + List sessionIds = new List(); + foreach (var item in SessionManager.GetUdpAll()) + { + if (item.ActiveTime.AddSeconds(Configuration.UdpReaderIdleTimeSeconds) < DateTime.Now) + { + sessionIds.Add(item.SessionID); + } + } + foreach(var item in sessionIds) + { + SessionManager.RemoveBySessionId(item); + } + Logger.LogInformation($"[Check Receive Timeout]"); + Logger.LogInformation($"[Session Online Count]:{SessionManager.UdpSessionCount}"); + } + catch (Exception ex) + { + Logger.LogError(ex, $"[Receive Timeout]"); + } + finally + { + await Task.Delay(TimeSpan.FromSeconds(Configuration.UdpReceiveTimeoutCheckTimeSeconds), stoppingToken); + } + } + } + } +} diff --git a/src/JT808.Gateway/Session/JT808SessionManager.cs b/src/JT808.Gateway/Session/JT808SessionManager.cs new file mode 100644 index 0000000..a7f74cc --- /dev/null +++ b/src/JT808.Gateway/Session/JT808SessionManager.cs @@ -0,0 +1,224 @@ +using JT808.Gateway.Abstractions; +using JT808.Gateway.Abstractions.Enums; +using JT808.Gateway.Interfaces; +using Microsoft.Extensions.Logging; +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Linq; +using System.Net; +using System.Net.Sockets; +using System.Text; +using System.Threading.Tasks; + +namespace JT808.Gateway.Session +{ + /// + /// + /// 不支持变态类型:既发TCP和UDP + /// + public class JT808SessionManager + { + private readonly ILogger logger; + private readonly IJT808SessionProducer JT808SessionProducer; + public ConcurrentDictionary Sessions { get; } + public ConcurrentDictionary TerminalPhoneNoSessions { get; } + public JT808SessionManager( + IJT808SessionProducer jT808SessionProducer, + ILoggerFactory loggerFactory + ) + { + JT808SessionProducer = jT808SessionProducer; + Sessions = new ConcurrentDictionary(StringComparer.OrdinalIgnoreCase); + TerminalPhoneNoSessions = new ConcurrentDictionary(StringComparer.OrdinalIgnoreCase); + logger = loggerFactory.CreateLogger("JT808SessionManager"); + } + + public JT808SessionManager(ILoggerFactory loggerFactory) + { + Sessions = new ConcurrentDictionary(StringComparer.OrdinalIgnoreCase); + TerminalPhoneNoSessions = new ConcurrentDictionary(StringComparer.OrdinalIgnoreCase); + logger = loggerFactory.CreateLogger("JT808SessionManager"); + } + + public int TotalSessionCount + { + get + { + return Sessions.Count; + } + } + + public int TcpSessionCount + { + get + { + return Sessions.Where(w => w.Value.TransportProtocolType == JT808TransportProtocolType.tcp).Count(); + } + } + + public int UdpSessionCount + { + get + { + return Sessions.Where(w => w.Value.TransportProtocolType == JT808TransportProtocolType.udp).Count(); + } + } + + internal void TryLink(string terminalPhoneNo, IJT808Session session) + { + session.ActiveTime = DateTime.Now; + session.TerminalPhoneNo = terminalPhoneNo; + Sessions.TryUpdate(session.SessionID, session, session); + TerminalPhoneNoSessions.AddOrUpdate(terminalPhoneNo, session.SessionID, (key, oldValue)=> + { + if(session.SessionID!= oldValue) + { + //会话通知 + JT808SessionProducer?.ProduceAsync(JT808GatewayConstants.SessionOnline, key); + return session.SessionID; + } + return oldValue; + }); + } + + public string TryLink(string terminalPhoneNo, Socket socket, EndPoint remoteEndPoint) + { + string sessionId = string.Empty; + if(TerminalPhoneNoSessions.TryGetValue(terminalPhoneNo,out sessionId)) + { + if (Sessions.TryGetValue(sessionId, out IJT808Session sessionInfo)) + { + sessionInfo.ActiveTime = DateTime.Now; + sessionInfo.TerminalPhoneNo = terminalPhoneNo; + sessionInfo.RemoteEndPoint = remoteEndPoint; + Sessions.TryUpdate(sessionId, sessionInfo, sessionInfo); + } + } + else + { + JT808UdpSession session = new JT808UdpSession(socket, remoteEndPoint); + Sessions.TryAdd(session.SessionID, session); + TerminalPhoneNoSessions.TryAdd(terminalPhoneNo, session.SessionID); + sessionId = session.SessionID; + } + //会话通知 + //使用场景: + //部标的超长待机设备,不会像正常的设备一样一直连着,可能10几分钟连上了,然后发完就关闭连接, + //这时候想下发数据需要知道设备什么时候上线,在这边做通知最好不过了。 + //有设备关联上来可以进行通知 例如:使用Redis发布订阅 + JT808SessionProducer?.ProduceAsync(JT808GatewayConstants.SessionOnline, terminalPhoneNo); + return sessionId; + } + + internal bool TryAdd(IJT808Session session) + { + return Sessions.TryAdd(session.SessionID, session); + } + + public bool TrySendByTerminalPhoneNo(string terminalPhoneNo, byte[] data) + { + if(TerminalPhoneNoSessions.TryGetValue(terminalPhoneNo,out var sessionid)) + { + if (Sessions.TryGetValue(sessionid, out var session)) + { + if (session.TransportProtocolType == JT808TransportProtocolType.tcp) + { + session.Client.Send(data, SocketFlags.None); + } + else + { + session.Client.SendTo(data, SocketFlags.None, session.RemoteEndPoint); + } + return true; + } + else + { + return false; + } + } + else + { + return false; + } + } + + public bool TrySendBySessionId(string sessionId, byte[] data) + { + if (Sessions.TryGetValue(sessionId, out var session)) + { + if(session.TransportProtocolType== JT808TransportProtocolType.tcp) + { + session.Client.Send(data, SocketFlags.None); + } + else + { + session.Client.SendTo(data, SocketFlags.None, session.RemoteEndPoint); + } + return true; + } + else + { + return false; + } + } + + public void RemoveByTerminalPhoneNo(string terminalPhoneNo) + { + if (TerminalPhoneNoSessions.TryGetValue(terminalPhoneNo, out var removeSessionId)) + { + // 处理转发过来的是数据 这时候通道对设备是1对多关系,需要清理垃圾数据 + //1.用当前会话的通道Id找出通过转发过来的其他设备的终端号 + var terminalPhoneNos = TerminalPhoneNoSessions.Where(w => w.Value == removeSessionId).Select(s => s.Key).ToList(); + //2.存在则一个个移除 + string tmpTerminalPhoneNo = terminalPhoneNo; + if (terminalPhoneNos.Count > 0) + { + //3.移除包括当前的设备号 + foreach (var item in terminalPhoneNos) + { + TerminalPhoneNoSessions.TryRemove(item, out _); + } + tmpTerminalPhoneNo = string.Join(",", terminalPhoneNos); + } + if (Sessions.TryRemove(removeSessionId, out var removeSession)) + { + removeSession.Close(); + if(logger.IsEnabled(LogLevel.Information)) + logger.LogInformation($"[Session Remove]:{terminalPhoneNo}-{tmpTerminalPhoneNo}"); + JT808SessionProducer?.ProduceAsync(JT808GatewayConstants.SessionOffline, tmpTerminalPhoneNo); + } + } + } + + public void RemoveBySessionId(string sessionId) + { + if(Sessions.TryRemove(sessionId,out var removeSession)) + { + var terminalPhoneNos = TerminalPhoneNoSessions.Where(w => w.Value == sessionId).Select(s => s.Key).ToList(); + if (terminalPhoneNos.Count > 0) + { + foreach (var item in terminalPhoneNos) + { + TerminalPhoneNoSessions.TryRemove(item, out _); + } + var tmpTerminalPhoneNo = string.Join(",", terminalPhoneNos); + JT808SessionProducer?.ProduceAsync(JT808GatewayConstants.SessionOffline, tmpTerminalPhoneNo); + if (logger.IsEnabled(LogLevel.Information)) + logger.LogInformation($"[Session Remove]:{tmpTerminalPhoneNo}"); + } + removeSession.Close(); + } + } + + public List GetTcpAll() + { + return Sessions.Where(w => w.Value.TransportProtocolType == JT808TransportProtocolType.tcp).Select(s => (JT808TcpSession)s.Value).ToList(); + } + + public List GetUdpAll() + { + return Sessions.Where(w => w.Value.TransportProtocolType == JT808TransportProtocolType.udp).Select(s => (JT808UdpSession)s.Value).ToList(); + } + } +} diff --git a/src/JT808.Gateway/Session/JT808TcpSession.cs b/src/JT808.Gateway/Session/JT808TcpSession.cs new file mode 100644 index 0000000..70d044a --- /dev/null +++ b/src/JT808.Gateway/Session/JT808TcpSession.cs @@ -0,0 +1,47 @@ +using JT808.Gateway.Abstractions.Enums; +using JT808.Gateway.Interfaces; +using System; +using System.Net; +using System.Net.Sockets; +using System.Threading; + +namespace JT808.Gateway.Session +{ + public class JT808TcpSession: IJT808Session + { + public JT808TcpSession(Socket client) + { + Client = client; + RemoteEndPoint = client.RemoteEndPoint; + ActiveTime = DateTime.Now; + StartTime = DateTime.Now; + SessionID = Guid.NewGuid().ToString("N"); + ReceiveTimeout = new CancellationTokenSource(); + } + + /// + /// 终端手机号 + /// + public string TerminalPhoneNo { get; set; } + public DateTime ActiveTime { get; set; } + public DateTime StartTime { get; set; } + public JT808TransportProtocolType TransportProtocolType { get;} = JT808TransportProtocolType.tcp; + public string SessionID { get; } + public Socket Client { get; set; } + public CancellationTokenSource ReceiveTimeout { get; set; } + public EndPoint RemoteEndPoint { get ; set; } + + public void Close() + { + try + { + Client.Shutdown(SocketShutdown.Both); + } + catch { } + finally + { + Client.Close(); + } + } + } +} diff --git a/src/JT808.Gateway/Session/JT808UdpSession.cs b/src/JT808.Gateway/Session/JT808UdpSession.cs new file mode 100644 index 0000000..bdd4c87 --- /dev/null +++ b/src/JT808.Gateway/Session/JT808UdpSession.cs @@ -0,0 +1,41 @@ +using JT808.Gateway.Abstractions.Enums; +using JT808.Gateway.Interfaces; +using System; +using System.Net; +using System.Net.Sockets; +using System.Threading; + +namespace JT808.Gateway.Session +{ + public class JT808UdpSession: IJT808Session + { + public JT808UdpSession(Socket socket, EndPoint sender) + { + ActiveTime = DateTime.Now; + StartTime = DateTime.Now; + SessionID = Guid.NewGuid().ToString("N"); + ReceiveTimeout = new CancellationTokenSource(); + RemoteEndPoint = sender; + Client = socket; + } + + /// + /// 终端手机号 + /// + public string TerminalPhoneNo { get; set; } + public DateTime ActiveTime { get; set; } + public DateTime StartTime { get; set; } + public JT808TransportProtocolType TransportProtocolType { get; set; } = JT808TransportProtocolType.udp; + + public string SessionID { get; } + + public Socket Client { get; set; } + public CancellationTokenSource ReceiveTimeout { get; set; } + public EndPoint RemoteEndPoint { get; set ; } + + public void Close() + { + + } + } +}