浏览代码

1.增加简单的内存队列实现各个服务

2.将流量统计换成接口方式,使用内存作为默认实现
3.修改文档
tags/pipeline-1.0.0
smallchi(Koike) 5 年前
父节点
当前提交
136132548c
共有 56 个文件被更改,包括 1719 次插入146 次删除
  1. +18
    -18
      README.md
  2. +18
    -0
      src/JT808.Gateway.Abstractions/Enums/JT808ConsumerType.cs
  3. +12
    -0
      src/JT808.Gateway.Abstractions/IJT808MsgConsumerFactory.cs
  4. +13
    -0
      src/JT808.Gateway.Abstractions/IJT808MsgReplyConsumerFactory.cs
  5. +60
    -0
      src/JT808.Gateway.InMemoryMQ/Consumers/JT808MsgIdHandlerConsumer.cs
  6. +60
    -0
      src/JT808.Gateway.InMemoryMQ/Consumers/JT808MsgLoggingConsumer.cs
  7. +60
    -0
      src/JT808.Gateway.InMemoryMQ/Consumers/JT808MsgReplyMessageConsumer.cs
  8. +60
    -0
      src/JT808.Gateway.InMemoryMQ/Consumers/JT808MsgReplyMessageLoggingConsumer.cs
  9. +60
    -0
      src/JT808.Gateway.InMemoryMQ/Consumers/JT808MsgTrafficConsumer.cs
  10. +60
    -0
      src/JT808.Gateway.InMemoryMQ/Consumers/JT808MsgTransmitConsumer.cs
  11. +13
    -1
      src/JT808.Gateway.InMemoryMQ/JT808MsgConsumer.cs
  12. +23
    -0
      src/JT808.Gateway.InMemoryMQ/JT808MsgConsumerFactory.cs
  13. +32
    -0
      src/JT808.Gateway.InMemoryMQ/JT808MsgConsumerInMemoryHostedService.cs
  14. +23
    -0
      src/JT808.Gateway.InMemoryMQ/JT808MsgReplyConsumerFactory.cs
  15. +17
    -1
      src/JT808.Gateway.InMemoryMQ/JT808MsgReplyProducer.cs
  16. +339
    -1
      src/JT808.Gateway.InMemoryMQ/JT808ServerInMemoryMQExtensions.cs
  17. +63
    -0
      src/JT808.Gateway.InMemoryMQ/JT808SessionConsumer.cs
  18. +30
    -0
      src/JT808.Gateway.InMemoryMQ/JT808SessionProducer.cs
  19. +14
    -0
      src/JT808.Gateway.InMemoryMQ/Services/JT808MsgIdHandlerService.cs
  20. +14
    -0
      src/JT808.Gateway.InMemoryMQ/Services/JT808MsgLoggingService.cs
  21. +14
    -0
      src/JT808.Gateway.InMemoryMQ/Services/JT808MsgReplyMessageLoggingService.cs
  22. +14
    -0
      src/JT808.Gateway.InMemoryMQ/Services/JT808MsgReplyMessageService.cs
  23. +2
    -17
      src/JT808.Gateway.InMemoryMQ/Services/JT808MsgService.cs
  24. +29
    -0
      src/JT808.Gateway.InMemoryMQ/Services/JT808MsgServiceBase.cs
  25. +14
    -0
      src/JT808.Gateway.InMemoryMQ/Services/JT808MsgTrafficService.cs
  26. +14
    -0
      src/JT808.Gateway.InMemoryMQ/Services/JT808MsgTransmitService.cs
  27. +2
    -16
      src/JT808.Gateway.InMemoryMQ/Services/JT808ReplyMsgService.cs
  28. +28
    -0
      src/JT808.Gateway.InMemoryMQ/Services/JT808SessionService.cs
  29. +9
    -1
      src/JT808.Gateway.Services/JT808.Gateway.MsgIdHandler/JT808MsgIdHandlerExtensions.cs
  30. +35
    -0
      src/JT808.Gateway.Services/JT808.Gateway.MsgIdHandler/JT808MsgIdHandlerInMemoryHostedService.cs
  31. +37
    -0
      src/JT808.Gateway.Services/JT808.Gateway.MsgLogging/JT808MsgDownLoggingInMemoryHostedService.cs
  32. +10
    -1
      src/JT808.Gateway.Services/JT808.Gateway.MsgLogging/JT808MsgLoggingExtensions.cs
  33. +37
    -0
      src/JT808.Gateway.Services/JT808.Gateway.MsgLogging/JT808MsgUpLoggingInMemoryHostedService.cs
  34. +5
    -5
      src/JT808.Gateway.Services/JT808.Gateway.ReplyMessage/JT808ReplyMessageExtensions.cs
  35. +35
    -0
      src/JT808.Gateway.Services/JT808.Gateway.ReplyMessage/JT808ReplyMessageInMemoryHostedService.cs
  36. +9
    -9
      src/JT808.Gateway.Services/JT808.Gateway.SessionNotice/JT808SessionNoticeExtensions.cs
  37. +37
    -0
      src/JT808.Gateway.Services/JT808.Gateway.Traffic/IJT808Traffic.cs
  38. +0
    -4
      src/JT808.Gateway.Services/JT808.Gateway.Traffic/JT808.Gateway.Traffic.csproj
  39. +0
    -32
      src/JT808.Gateway.Services/JT808.Gateway.Traffic/JT808TrafficService.cs
  40. +21
    -8
      src/JT808.Gateway.Services/JT808.Gateway.Traffic/JT808TrafficServiceExtensions.cs
  41. +5
    -4
      src/JT808.Gateway.Services/JT808.Gateway.Traffic/JT808TrafficServiceHostedService.cs
  42. +40
    -0
      src/JT808.Gateway.Services/JT808.Gateway.Traffic/JT808TrafficServiceInMemoryHostedService.cs
  43. +0
    -9
      src/JT808.Gateway.Services/JT808.Gateway.Traffic/TrafficRedisClient.cs
  44. +8
    -8
      src/JT808.Gateway.Services/JT808.Gateway.Transmit/JT808TransmitExtensions.cs
  45. +34
    -0
      src/JT808.Gateway.Services/JT808.Gateway.Transmit/JT808TransmitInMemoryHostedService.cs
  46. +3
    -0
      src/JT808.Gateway.Tests/JT808.Gateway.InMemoryMQ.Test/JT808.Gateway.InMemoryMQ.Test.csproj
  47. +124
    -0
      src/JT808.Gateway.Tests/JT808.Gateway.InMemoryMQ.Test/JT808MsgProducerTest.cs
  48. +32
    -0
      src/JT808.Gateway.Tests/JT808.Gateway.InMemoryMQ.Test/Services/JT808SessionServiceTest.cs
  49. +22
    -0
      src/JT808.Gateway.Tests/JT808.Gateway.TestHosting/Impl/JT808MsgIdHandler.cs
  50. +22
    -0
      src/JT808.Gateway.Tests/JT808.Gateway.TestHosting/Impl/JT808MsgLogging.cs
  51. +5
    -0
      src/JT808.Gateway.Tests/JT808.Gateway.TestHosting/JT808.Gateway.TestHosting.csproj
  52. +49
    -0
      src/JT808.Gateway.Tests/JT808.Gateway.TestHosting/Jobs/TrafficJob.cs
  53. +21
    -2
      src/JT808.Gateway.Tests/JT808.Gateway.TestHosting/Program.cs
  54. +1
    -1
      src/JT808.Gateway.sln
  55. +11
    -7
      src/JT808.Gateway/JT808TcpServer.cs
  56. +1
    -1
      src/Version.props

+ 18
- 18
README.md 查看文件

@@ -1,9 +1,9 @@
# JT808Gateway

基于DotNetty封装的JT808DotNetty支持TCP/UDP通用消息业务处理

基于Pipeline封装的JT808Pipeline支持TCP/UDP通用消息业务处理

基于DotNetty封装的JT808DotNetty支持TCP/UDP通用消息业务处理

[了解JT808协议进这边](https://github.com/SmallChi/JT808)

[了解JT809协议进这边](https://github.com/SmallChi/JT809)
@@ -59,6 +59,22 @@

[GRPC消息业务处理协议](https://github.com/SmallChi/JT808Gateway/blob/master/src/JT808.Gateway.Abstractions/Protos/JT808Gateway.proto)

## 基于core 3.1 Pipeline的NuGet安装

| Package Name | Version | Downloads |
| --------------------- | -------------------------------------------------- | --------------------------------------------------- |
| Install-Package JT808.Gateway.Abstractions| ![JT808.Gateway.Abstractions](https://img.shields.io/nuget/v/JT808.Gateway.Abstractions.svg) | ![JT808.Gateway.Abstractions](https://img.shields.io/nuget/dt/JT808.Gateway.Abstractions.svg) |
| Install-Package JT808.Gateway | ![JT808.Gateway](https://img.shields.io/nuget/v/JT808.Gateway.svg) | ![JT808.Gateway](https://img.shields.io/nuget/dt/JT808.Gateway.svg) |
| Install-Package JT808.Gateway.Client| ![JT808.Gateway.Client](https://img.shields.io/nuget/v/JT808.Gateway.Client.svg) | ![JT808.Gateway.Client](https://img.shields.io/nuget/dt/JT808.Gateway.Client.svg) |
| Install-Package JT808.Gateway.InMemoryMQ| ![JT808.Gateway.InMemoryMQ](https://img.shields.io/nuget/v/JT808.Gateway.InMemoryMQ.svg) | ![JT808.Gateway.InMemoryMQ](https://img.shields.io/nuget/dt/JT808.Gateway.InMemoryMQ.svg) |
| Install-Package JT808.Gateway.Kafka| ![JT808.Gateway.Kafka](https://img.shields.io/nuget/v/JT808.Gateway.Kafka.svg) | ![JT808.Gateway.Kafka](https://img.shields.io/nuget/dt/JT808.Gateway.Kafka.svg) |
| Install-Package JT808.Gateway.Transmit | ![JT808.Gateway.Transmit](https://img.shields.io/nuget/v/JT808.Gateway.Transmit.svg) | ![JT808.Gateway.Transmit](https://img.shields.io/nuget/dt/JT808.Gateway.Transmit.svg) |
| Install-Package JT808.Gateway.Traffic | ![JT808.Gateway.Traffic](https://img.shields.io/nuget/v/JT808.Gateway.Traffic.svg) | ![JT808.Gateway.Traffic](https://img.shields.io/nuget/dt/JT808.Gateway.Traffic.svg)|
| Install-Package JT808.Gateway.SessionNotice | ![JT808.Gateway.SessionNotice](https://img.shields.io/nuget/v/JT808.Gateway.SessionNotice.svg) | ![JT808.Gateway.SessionNotice](https://img.shields.io/nuget/dt/JT808.Gateway.SessionNotice.svg)|
| Install-Package JT808.Gateway.ReplyMessage | ![JT808.Gateway.ReplyMessage](https://img.shields.io/nuget/v/JT808.Gateway.ReplyMessage.svg) | ![JT808.Gateway.ReplyMessage](https://img.shields.io/nuget/dt/JT808.Gateway.ReplyMessage.svg)|
| Install-Package JT808.Gateway.MsgLogging | ![JT808.Gateway.MsgLogging](https://img.shields.io/nuget/v/JT808.Gateway.MsgLogging.svg) | ![JT808.Gateway.MsgLogging](https://img.shields.io/nuget/dt/JT808.Gateway.MsgLogging.svg)|
| Install-Package JT808.Gateway.MsgIdHandler | ![JT808.Gateway.MsgIdHandler](https://img.shields.io/nuget/v/JT808.Gateway.MsgIdHandler.svg) | ![JT808.Gateway.MsgIdHandler](https://img.shields.io/nuget/dt/JT808.Gateway.MsgIdHandler.svg)|

## 基于DotNetty的NuGet安装

| Package Name | Version | Downloads |
@@ -79,22 +95,6 @@
| Install-Package JT808.DotNetty.Kafka | ![JT808.DotNetty.Kafka](https://img.shields.io/nuget/v/JT808.DotNetty.Kafka.svg) | ![JT808.DotNetty.Kafka](https://img.shields.io/nuget/dt/JT808.DotNetty.Kafka.svg) |
| Install-Package JT808.DotNetty.RabbitMQ | ![JT808.DotNetty.RabbitMQ](https://img.shields.io/nuget/v/JT808.DotNetty.RabbitMQ.svg) | ![JT808.DotNetty.RabbitMQ](https://img.shields.io/nuget/dt/JT808.DotNetty.RabbitMQ.svg) |

## 基于core 3.1 Pipeline的NuGet安装

| Package Name | Version | Downloads |
| --------------------- | -------------------------------------------------- | --------------------------------------------------- |
| Install-Package JT808.Gateway.Abstractions| ![JT808.Gateway.Abstractions](https://img.shields.io/nuget/v/JT808.Gateway.Abstractions.svg) | ![JT808.Gateway.Abstractions](https://img.shields.io/nuget/dt/JT808.Gateway.Abstractions.svg) |
| Install-Package JT808.Gateway | ![JT808.Gateway](https://img.shields.io/nuget/v/JT808.Gateway.svg) | ![JT808.Gateway](https://img.shields.io/nuget/dt/JT808.Gateway.svg) |
| Install-Package JT808.Gateway.Client| ![JT808.Gateway.Client](https://img.shields.io/nuget/v/JT808.Gateway.Client.svg) | ![JT808.Gateway.Client](https://img.shields.io/nuget/dt/JT808.Gateway.Client.svg) |
| Install-Package JT808.Gateway.InMemoryMQ| ![JT808.Gateway.InMemoryMQ](https://img.shields.io/nuget/v/JT808.Gateway.InMemoryMQ.svg) | ![JT808.Gateway.InMemoryMQ](https://img.shields.io/nuget/dt/JT808.Gateway.InMemoryMQ.svg) |
| Install-Package JT808.Gateway.Kafka| ![JT808.Gateway.Kafka](https://img.shields.io/nuget/v/JT808.Gateway.Kafka.svg) | ![JT808.Gateway.Kafka](https://img.shields.io/nuget/dt/JT808.Gateway.Kafka.svg) |
| Install-Package JT808.Gateway.Transmit | ![JT808.Gateway.Transmit](https://img.shields.io/nuget/v/JT808.Gateway.Transmit.svg) | ![JT808.Gateway.Transmit](https://img.shields.io/nuget/dt/JT808.Gateway.Transmit.svg) |
| Install-Package JT808.Gateway.Traffic | ![JT808.Gateway.Traffic](https://img.shields.io/nuget/v/JT808.Gateway.Traffic.svg) | ![JT808.Gateway.Traffic](https://img.shields.io/nuget/dt/JT808.Gateway.Traffic.svg)|
| Install-Package JT808.Gateway.SessionNotice | ![JT808.Gateway.SessionNotice](https://img.shields.io/nuget/v/JT808.Gateway.SessionNotice.svg) | ![JT808.Gateway.SessionNotice](https://img.shields.io/nuget/dt/JT808.Gateway.SessionNotice.svg)|
| Install-Package JT808.Gateway.ReplyMessage | ![JT808.Gateway.ReplyMessage](https://img.shields.io/nuget/v/JT808.Gateway.ReplyMessage.svg) | ![JT808.Gateway.ReplyMessage](https://img.shields.io/nuget/dt/JT808.Gateway.ReplyMessage.svg)|
| Install-Package JT808.Gateway.MsgLogging | ![JT808.Gateway.MsgLogging](https://img.shields.io/nuget/v/JT808.Gateway.MsgLogging.svg) | ![JT808.Gateway.MsgLogging](https://img.shields.io/nuget/dt/JT808.Gateway.MsgLogging.svg)|
| Install-Package JT808.Gateway.MsgIdHandler | ![JT808.Gateway.MsgIdHandler](https://img.shields.io/nuget/v/JT808.Gateway.MsgIdHandler.svg) | ![JT808.Gateway.MsgIdHandler](https://img.shields.io/nuget/dt/JT808.Gateway.MsgIdHandler.svg)|

## 举个栗子

### Pipeline


+ 18
- 0
src/JT808.Gateway.Abstractions/Enums/JT808ConsumerType.cs 查看文件

@@ -0,0 +1,18 @@
using System;
using System.Collections.Generic;
using System.Text;

namespace JT808.Gateway.Abstractions.Enums
{
[Flags]
public enum JT808ConsumerType:int
{
MsgIdHandlerConsumer=1,
MsgLoggingConsumer=2,
ReplyMessageConsumer=4,
TrafficConsumer=8,
TransmitConsumer=16,
ReplyMessageLoggingConsumer = 32,
All = 64,
}
}

+ 12
- 0
src/JT808.Gateway.Abstractions/IJT808MsgConsumerFactory.cs 查看文件

@@ -0,0 +1,12 @@
using JT808.Gateway.Abstractions.Enums;
using System;
using System.Collections.Generic;
using System.Text;

namespace JT808.Gateway.Abstractions
{
public interface IJT808MsgConsumerFactory
{
IJT808MsgConsumer Create(JT808ConsumerType consumerType);
}
}

+ 13
- 0
src/JT808.Gateway.Abstractions/IJT808MsgReplyConsumerFactory.cs 查看文件

@@ -0,0 +1,13 @@
using JT808.Gateway.Abstractions.Enums;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;

namespace JT808.Gateway.Abstractions
{
public interface IJT808MsgReplyConsumerFactory
{
IJT808MsgReplyConsumer Create(JT808ConsumerType consumerType);
}
}

+ 60
- 0
src/JT808.Gateway.InMemoryMQ/Consumers/JT808MsgIdHandlerConsumer.cs 查看文件

@@ -0,0 +1,60 @@
using JT808.Gateway.Abstractions;
using JT808.Gateway.InMemoryMQ.Services;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace JT808.Gateway.InMemoryMQ
{
public class JT808MsgIdHandlerConsumer : IJT808MsgConsumer
{
private readonly JT808MsgIdHandlerService JT808MsgService;
public CancellationTokenSource Cts => new CancellationTokenSource();
private readonly ILogger logger;
public string TopicName => JT808GatewayConstants.MsgTopic;
public JT808MsgIdHandlerConsumer(
JT808MsgIdHandlerService jT808MsgService,
ILoggerFactory loggerFactory)
{
JT808MsgService = jT808MsgService;
logger = loggerFactory.CreateLogger("JT808MsgIdHandlerConsumer");
}

public void OnMessage(Action<(string TerminalNo, byte[] Data)> callback)
{
Task.Run(async() =>
{
while (!Cts.IsCancellationRequested)
{
try
{
var item = await JT808MsgService.ReadAsync(Cts.Token);
callback(item);
}
catch(Exception ex)
{
logger.LogError(ex, "");
}
}
}, Cts.Token);
}

public void Subscribe()
{

}

public void Unsubscribe()
{
Cts.Cancel();
}

public void Dispose()
{
Cts.Dispose();
}
}
}

+ 60
- 0
src/JT808.Gateway.InMemoryMQ/Consumers/JT808MsgLoggingConsumer.cs 查看文件

@@ -0,0 +1,60 @@
using JT808.Gateway.Abstractions;
using JT808.Gateway.InMemoryMQ.Services;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace JT808.Gateway.InMemoryMQ
{
public class JT808MsgLoggingConsumer : IJT808MsgConsumer
{
private readonly JT808MsgLoggingService JT808MsgService;
public CancellationTokenSource Cts => new CancellationTokenSource();
private readonly ILogger logger;
public string TopicName => JT808GatewayConstants.MsgTopic;
public JT808MsgLoggingConsumer(
JT808MsgLoggingService jT808MsgService,
ILoggerFactory loggerFactory)
{
JT808MsgService = jT808MsgService;
logger = loggerFactory.CreateLogger("JT808MsgLoggingConsumer");
}

public void OnMessage(Action<(string TerminalNo, byte[] Data)> callback)
{
Task.Run(async() =>
{
while (!Cts.IsCancellationRequested)
{
try
{
var item = await JT808MsgService.ReadAsync(Cts.Token);
callback(item);
}
catch(Exception ex)
{
logger.LogError(ex, "");
}
}
}, Cts.Token);
}

public void Subscribe()
{

}

public void Unsubscribe()
{
Cts.Cancel();
}

public void Dispose()
{
Cts.Dispose();
}
}
}

+ 60
- 0
src/JT808.Gateway.InMemoryMQ/Consumers/JT808MsgReplyMessageConsumer.cs 查看文件

@@ -0,0 +1,60 @@
using JT808.Gateway.Abstractions;
using JT808.Gateway.InMemoryMQ.Services;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace JT808.Gateway.InMemoryMQ
{
public class JT808MsgReplyMessageConsumer : IJT808MsgConsumer
{
private readonly JT808MsgReplyMessageService JT808MsgService;
public CancellationTokenSource Cts => new CancellationTokenSource();
private readonly ILogger logger;
public string TopicName => JT808GatewayConstants.MsgTopic;
public JT808MsgReplyMessageConsumer(
JT808MsgReplyMessageService jT808MsgService,
ILoggerFactory loggerFactory)
{
JT808MsgService = jT808MsgService;
logger = loggerFactory.CreateLogger("JT808MsgReplyMessageConsumer");
}

public void OnMessage(Action<(string TerminalNo, byte[] Data)> callback)
{
Task.Run(async() =>
{
while (!Cts.IsCancellationRequested)
{
try
{
var item = await JT808MsgService.ReadAsync(Cts.Token);
callback(item);
}
catch(Exception ex)
{
logger.LogError(ex, "");
}
}
}, Cts.Token);
}

public void Subscribe()
{

}

public void Unsubscribe()
{
Cts.Cancel();
}

public void Dispose()
{
Cts.Dispose();
}
}
}

+ 60
- 0
src/JT808.Gateway.InMemoryMQ/Consumers/JT808MsgReplyMessageLoggingConsumer.cs 查看文件

@@ -0,0 +1,60 @@
using JT808.Gateway.Abstractions;
using JT808.Gateway.InMemoryMQ.Services;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace JT808.Gateway.InMemoryMQ
{
public class JT808MsgReplyMessageLoggingConsumer : IJT808MsgReplyConsumer
{
private readonly JT808MsgReplyMessageLoggingService JT808MsgService;
public CancellationTokenSource Cts => new CancellationTokenSource();
private readonly ILogger logger;
public string TopicName => JT808GatewayConstants.MsgTopic;
public JT808MsgReplyMessageLoggingConsumer(
JT808MsgReplyMessageLoggingService jT808MsgService,
ILoggerFactory loggerFactory)
{
JT808MsgService = jT808MsgService;
logger = loggerFactory.CreateLogger("JT808MsgReplyMessageLoggingConsumer");
}

public void OnMessage(Action<(string TerminalNo, byte[] Data)> callback)
{
Task.Run(async() =>
{
while (!Cts.IsCancellationRequested)
{
try
{
var item = await JT808MsgService.ReadAsync(Cts.Token);
callback(item);
}
catch(Exception ex)
{
logger.LogError(ex, "");
}
}
}, Cts.Token);
}

public void Subscribe()
{

}

public void Unsubscribe()
{
Cts.Cancel();
}

public void Dispose()
{
Cts.Dispose();
}
}
}

+ 60
- 0
src/JT808.Gateway.InMemoryMQ/Consumers/JT808MsgTrafficConsumer.cs 查看文件

@@ -0,0 +1,60 @@
using JT808.Gateway.Abstractions;
using JT808.Gateway.InMemoryMQ.Services;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace JT808.Gateway.InMemoryMQ
{
public class JT808MsgTrafficConsumer : IJT808MsgConsumer
{
private readonly JT808MsgTrafficService JT808MsgService;
public CancellationTokenSource Cts => new CancellationTokenSource();
private readonly ILogger logger;
public string TopicName => JT808GatewayConstants.MsgTopic;
public JT808MsgTrafficConsumer(
JT808MsgTrafficService jT808MsgService,
ILoggerFactory loggerFactory)
{
JT808MsgService = jT808MsgService;
logger = loggerFactory.CreateLogger("JT808MsgTrafficConsumer");
}

public void OnMessage(Action<(string TerminalNo, byte[] Data)> callback)
{
Task.Run(async() =>
{
while (!Cts.IsCancellationRequested)
{
try
{
var item = await JT808MsgService.ReadAsync(Cts.Token);
callback(item);
}
catch(Exception ex)
{
logger.LogError(ex, "");
}
}
}, Cts.Token);
}

public void Subscribe()
{

}

public void Unsubscribe()
{
Cts.Cancel();
}

public void Dispose()
{
Cts.Dispose();
}
}
}

+ 60
- 0
src/JT808.Gateway.InMemoryMQ/Consumers/JT808MsgTransmitConsumer.cs 查看文件

@@ -0,0 +1,60 @@
using JT808.Gateway.Abstractions;
using JT808.Gateway.InMemoryMQ.Services;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace JT808.Gateway.InMemoryMQ
{
public class JT808MsgTransmitConsumer : IJT808MsgConsumer
{
private readonly JT808MsgTransmitService JT808MsgService;
public CancellationTokenSource Cts => new CancellationTokenSource();
private readonly ILogger logger;
public string TopicName => JT808GatewayConstants.MsgTopic;
public JT808MsgTransmitConsumer(
JT808MsgTransmitService jT808MsgService,
ILoggerFactory loggerFactory)
{
JT808MsgService = jT808MsgService;
logger = loggerFactory.CreateLogger("JT808MsgTransmitConsumer");
}

public void OnMessage(Action<(string TerminalNo, byte[] Data)> callback)
{
Task.Run(async() =>
{
while (!Cts.IsCancellationRequested)
{
try
{
var item = await JT808MsgService.ReadAsync(Cts.Token);
callback(item);
}
catch(Exception ex)
{
logger.LogError(ex, "");
}
}
}, Cts.Token);
}

public void Subscribe()
{

}

public void Unsubscribe()
{
Cts.Cancel();
}

public void Dispose()
{
Cts.Dispose();
}
}
}

+ 13
- 1
src/JT808.Gateway.InMemoryMQ/JT808MsgConsumer.cs 查看文件

@@ -1,4 +1,5 @@
using JT808.Gateway.Abstractions;
using JT808.Gateway.Abstractions.Enums;
using JT808.Gateway.InMemoryMQ.Services;
using Microsoft.Extensions.Logging;
using System;
@@ -12,14 +13,17 @@ namespace JT808.Gateway.InMemoryMQ
public class JT808MsgConsumer : IJT808MsgConsumer
{
private readonly JT808MsgService JT808MsgService;
private readonly Func<JT808ConsumerType, JT808MsgServiceBase> func;
public CancellationTokenSource Cts => new CancellationTokenSource();
private readonly ILogger logger;
public string TopicName => JT808GatewayConstants.MsgTopic;
public JT808MsgConsumer(
Func<JT808ConsumerType, JT808MsgServiceBase> func,
JT808MsgService jT808MsgService,
ILoggerFactory loggerFactory)
{
JT808MsgService = jT808MsgService;
this.func = func;
logger = loggerFactory.CreateLogger("JT808MsgConsumer");
}

@@ -32,7 +36,15 @@ namespace JT808.Gateway.InMemoryMQ
try
{
var item = await JT808MsgService.ReadAsync(Cts.Token);
callback(item);
foreach(var type in JT808ServerInMemoryMQExtensions.ConsumerTypes)
{
var method = func(type);
if (method != null)
{
await method.WriteAsync(item.TerminalNo, item.Data);
}
}
//callback(item);
}
catch(Exception ex)
{


+ 23
- 0
src/JT808.Gateway.InMemoryMQ/JT808MsgConsumerFactory.cs 查看文件

@@ -0,0 +1,23 @@
using JT808.Gateway.Abstractions;
using JT808.Gateway.Abstractions.Enums;
using System;
using System.Collections.Generic;
using System.Text;

namespace JT808.Gateway.InMemoryMQ
{
public class JT808MsgConsumerFactory : IJT808MsgConsumerFactory
{
private readonly Func<JT808ConsumerType, IJT808MsgConsumer> factory;

public JT808MsgConsumerFactory(Func<JT808ConsumerType, IJT808MsgConsumer> accesor)
{
factory = accesor;
}

public IJT808MsgConsumer Create(JT808ConsumerType consumerType)
{
return factory(consumerType);
}
}
}

+ 32
- 0
src/JT808.Gateway.InMemoryMQ/JT808MsgConsumerInMemoryHostedService.cs 查看文件

@@ -0,0 +1,32 @@
using System.Threading.Tasks;
using Microsoft.Extensions.Hosting;
using System.Threading;
using JT808.Gateway.Abstractions;
using JT808.Gateway.Abstractions.Enums;

namespace JT808.Gateway.InMemoryMQ
{
public class JT808MsgConsumerInMemoryHostedService : IHostedService
{
private readonly IJT808MsgConsumer jT808MsgConsumer;

public JT808MsgConsumerInMemoryHostedService(
IJT808MsgConsumer jT808MsgConsumer)
{
this.jT808MsgConsumer = jT808MsgConsumer;
}

public Task StartAsync(CancellationToken cancellationToken)
{
jT808MsgConsumer.Subscribe();
jT808MsgConsumer.OnMessage(null);
return Task.CompletedTask;
}

public Task StopAsync(CancellationToken cancellationToken)
{
jT808MsgConsumer.Unsubscribe();
return Task.CompletedTask;
}
}
}

+ 23
- 0
src/JT808.Gateway.InMemoryMQ/JT808MsgReplyConsumerFactory.cs 查看文件

@@ -0,0 +1,23 @@
using JT808.Gateway.Abstractions;
using JT808.Gateway.Abstractions.Enums;
using System;
using System.Collections.Generic;
using System.Text;

namespace JT808.Gateway.InMemoryMQ
{
public class JT808MsgReplyConsumerFactory : IJT808MsgReplyConsumerFactory
{
private readonly Func<JT808ConsumerType, IJT808MsgReplyConsumer> factory;

public JT808MsgReplyConsumerFactory(Func<JT808ConsumerType, IJT808MsgReplyConsumer> accesor)
{
factory = accesor;
}

public IJT808MsgReplyConsumer Create(JT808ConsumerType consumerType)
{
return factory(consumerType);
}
}
}

+ 17
- 1
src/JT808.Gateway.InMemoryMQ/JT808MsgReplyProducer.cs 查看文件

@@ -1,4 +1,5 @@
using JT808.Gateway.Abstractions;
using JT808.Gateway.Abstractions.Enums;
using JT808.Gateway.InMemoryMQ.Services;
using System;
using System.Collections.Generic;
@@ -10,14 +11,29 @@ namespace JT808.Gateway.InMemoryMQ
public class JT808MsgReplyProducer : IJT808MsgReplyProducer
{
public string TopicName => JT808GatewayConstants.MsgReplyTopic;

//JT808ServerInMemoryMQExtensions
private readonly JT808ReplyMsgService JT808ReplyMsgService;
public JT808MsgReplyProducer(JT808ReplyMsgService jT808ReplyMsgService)

private readonly Func<JT808ConsumerType, JT808MsgServiceBase> func;
public JT808MsgReplyProducer(
Func<JT808ConsumerType, JT808MsgServiceBase> func,
JT808ReplyMsgService jT808ReplyMsgService)
{
this.func = func;
JT808ReplyMsgService = jT808ReplyMsgService;
}
public async ValueTask ProduceAsync(string terminalNo, byte[] data)
{
await JT808ReplyMsgService.WriteAsync(terminalNo, data);
if (JT808ServerInMemoryMQExtensions.ReplyMessageLoggingConsumer.HasValue)
{
var method = func(JT808ConsumerType.ReplyMessageLoggingConsumer);
if (method != null)
{
await method.WriteAsync(terminalNo, data);
}
}
}
public void Dispose()
{


+ 339
- 1
src/JT808.Gateway.InMemoryMQ/JT808ServerInMemoryMQExtensions.cs 查看文件

@@ -1,25 +1,363 @@
using JT808.Gateway.Abstractions;
using JT808.Gateway.Abstractions.Enums;
using JT808.Gateway.InMemoryMQ.Services;
using Microsoft.Extensions.DependencyInjection;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Runtime.CompilerServices;

[assembly: InternalsVisibleTo("JT808.Gateway.InMemoryMQ.Test")]
namespace JT808.Gateway.InMemoryMQ
{
public static class JT808ServerInMemoryMQExtensions
{
internal static List<JT808ConsumerType> ConsumerTypes { get; private set; }

static JT808ServerInMemoryMQExtensions()
{
ConsumerTypes = new List<JT808ConsumerType>();
}

internal static JT808ConsumerType? ReplyMessageLoggingConsumer { get; private set; }

/// <summary>
///
/// </summary>
/// <param name="jT808GatewayBuilder"></param>
/// <returns></returns>
public static IJT808GatewayBuilder AddServerInMemoryMQ(this IJT808GatewayBuilder jT808GatewayBuilder, JT808ConsumerType consumerType)
{
if ((consumerType & JT808ConsumerType.All) == JT808ConsumerType.All)
{
ConsumerTypes.Add(JT808ConsumerType.MsgIdHandlerConsumer);
ConsumerTypes.Add(JT808ConsumerType.MsgLoggingConsumer);
ConsumerTypes.Add(JT808ConsumerType.ReplyMessageConsumer);
ConsumerTypes.Add(JT808ConsumerType.TrafficConsumer);
ConsumerTypes.Add(JT808ConsumerType.TransmitConsumer);
ConsumerTypes.Add(JT808ConsumerType.ReplyMessageLoggingConsumer);
}
else
{
if ((consumerType & JT808ConsumerType.MsgLoggingConsumer) == JT808ConsumerType.MsgLoggingConsumer)
{
ConsumerTypes.Add(JT808ConsumerType.MsgLoggingConsumer);
}
if ((consumerType & JT808ConsumerType.MsgIdHandlerConsumer) == JT808ConsumerType.MsgIdHandlerConsumer)
{
ConsumerTypes.Add(JT808ConsumerType.MsgIdHandlerConsumer);
}
if ((consumerType & JT808ConsumerType.ReplyMessageConsumer) == JT808ConsumerType.ReplyMessageConsumer)
{
ConsumerTypes.Add(JT808ConsumerType.ReplyMessageConsumer);
}
if ((consumerType & JT808ConsumerType.TrafficConsumer) == JT808ConsumerType.TrafficConsumer)
{
ConsumerTypes.Add(JT808ConsumerType.TrafficConsumer);
}
if ((consumerType & JT808ConsumerType.TransmitConsumer) == JT808ConsumerType.TransmitConsumer)
{
ConsumerTypes.Add(JT808ConsumerType.TransmitConsumer);
}
if ((consumerType & JT808ConsumerType.ReplyMessageLoggingConsumer) == JT808ConsumerType.ReplyMessageLoggingConsumer)
{
//
ReplyMessageLoggingConsumer = JT808ConsumerType.ReplyMessageLoggingConsumer;
}
}
jT808GatewayBuilder.AddServerInMemoryConsumers();
jT808GatewayBuilder.JT808Builder.Services.AddSingleton<JT808MsgService>();
jT808GatewayBuilder.JT808Builder.Services.AddSingleton<JT808ReplyMsgService>();
jT808GatewayBuilder.JT808Builder.Services.AddSingleton<JT808SessionService>();
jT808GatewayBuilder.JT808Builder.Services.AddSingleton<IJT808MsgProducer, JT808MsgProducer>();
jT808GatewayBuilder.JT808Builder.Services.AddSingleton<IJT808MsgConsumer, JT808MsgConsumer>();
jT808GatewayBuilder.JT808Builder.Services.AddSingleton<IJT808MsgReplyProducer, JT808MsgReplyProducer>();
jT808GatewayBuilder.JT808Builder.Services.AddSingleton<IJT808MsgReplyConsumer, JT808MsgReplyConsumer>();
jT808GatewayBuilder.JT808Builder.Services.AddSingleton<IJT808SessionProducer, JT808SessionProducer>();
jT808GatewayBuilder.JT808Builder.Services.AddSingleton<IJT808SessionConsumer, JT808SessionConsumer>();
jT808GatewayBuilder.JT808Builder.Services.AddSingleton<IJT808MsgConsumerFactory, JT808MsgConsumerFactory>();
jT808GatewayBuilder.JT808Builder.Services.AddSingleton<IJT808MsgReplyConsumerFactory, JT808MsgReplyConsumerFactory>();
jT808GatewayBuilder.JT808Builder.Services.AddHostedService<JT808MsgConsumerInMemoryHostedService>();
return jT808GatewayBuilder;
}

/// <summary>
///
/// </summary>
/// <param name="jT808GatewayBuilder"></param>
/// <returns></returns>
public static IJT808GatewayBuilder AddServerInMemoryMQ(this IJT808GatewayBuilder jT808GatewayBuilder)
public static IJT808GatewayBuilder AddServerInMemoryMQ(this IJT808GatewayBuilder jT808GatewayBuilder,params JT808ConsumerType[] consumerTypes)
{
if (consumerTypes == null)
{
throw new ArgumentNullException("消费类型不为空!");
}
ConsumerTypes = consumerTypes.ToList();
jT808GatewayBuilder.AddServerInMemoryConsumers();
jT808GatewayBuilder.JT808Builder.Services.AddSingleton<JT808MsgService>();
jT808GatewayBuilder.JT808Builder.Services.AddSingleton<JT808ReplyMsgService>();
jT808GatewayBuilder.JT808Builder.Services.AddSingleton<JT808SessionService>();
jT808GatewayBuilder.JT808Builder.Services.AddSingleton<IJT808MsgProducer, JT808MsgProducer>();
jT808GatewayBuilder.JT808Builder.Services.AddSingleton<IJT808MsgConsumer, JT808MsgConsumer>();
jT808GatewayBuilder.JT808Builder.Services.AddSingleton<IJT808MsgReplyProducer, JT808MsgReplyProducer>();
jT808GatewayBuilder.JT808Builder.Services.AddSingleton<IJT808MsgReplyConsumer, JT808MsgReplyConsumer>();
jT808GatewayBuilder.JT808Builder.Services.AddSingleton<IJT808SessionProducer, JT808SessionProducer>();
jT808GatewayBuilder.JT808Builder.Services.AddSingleton<IJT808SessionConsumer, JT808SessionConsumer>();
jT808GatewayBuilder.JT808Builder.Services.AddSingleton<IJT808MsgConsumerFactory, JT808MsgConsumerFactory>();
jT808GatewayBuilder.JT808Builder.Services.AddSingleton<IJT808MsgReplyConsumerFactory, JT808MsgReplyConsumerFactory>();
jT808GatewayBuilder.JT808Builder.Services.AddHostedService<JT808MsgConsumerInMemoryHostedService>();
return jT808GatewayBuilder;
}

/// <summary>
///
/// </summary>
/// <param name="serviceDescriptors"></param>
/// <returns></returns>
internal static IServiceCollection AddServerInMemoryMQ(this IServiceCollection serviceDescriptors, JT808ConsumerType consumerType)
{
if ((consumerType & JT808ConsumerType.All) == JT808ConsumerType.All)
{
ConsumerTypes.Add(JT808ConsumerType.MsgIdHandlerConsumer);
ConsumerTypes.Add(JT808ConsumerType.MsgLoggingConsumer);
ConsumerTypes.Add(JT808ConsumerType.ReplyMessageConsumer);
ConsumerTypes.Add(JT808ConsumerType.TrafficConsumer);
ConsumerTypes.Add(JT808ConsumerType.TransmitConsumer);
}
else
{
if ((consumerType & JT808ConsumerType.MsgLoggingConsumer) == JT808ConsumerType.MsgLoggingConsumer)
{
ConsumerTypes.Add(JT808ConsumerType.MsgLoggingConsumer);
}
if ((consumerType & JT808ConsumerType.MsgIdHandlerConsumer) == JT808ConsumerType.MsgIdHandlerConsumer)
{
ConsumerTypes.Add(JT808ConsumerType.MsgIdHandlerConsumer);
}
if ((consumerType & JT808ConsumerType.ReplyMessageConsumer) == JT808ConsumerType.ReplyMessageConsumer)
{
ConsumerTypes.Add(JT808ConsumerType.ReplyMessageConsumer);
}
if ((consumerType & JT808ConsumerType.TrafficConsumer) == JT808ConsumerType.TrafficConsumer)
{
ConsumerTypes.Add(JT808ConsumerType.TrafficConsumer);
}
if ((consumerType & JT808ConsumerType.TransmitConsumer) == JT808ConsumerType.TransmitConsumer)
{
ConsumerTypes.Add(JT808ConsumerType.TransmitConsumer);
}
if ((consumerType & JT808ConsumerType.ReplyMessageLoggingConsumer) == JT808ConsumerType.ReplyMessageLoggingConsumer)
{
ReplyMessageLoggingConsumer = JT808ConsumerType.ReplyMessageLoggingConsumer;
}
}
serviceDescriptors.AddServerInMemoryConsumers();
serviceDescriptors.AddSingleton<JT808MsgService>();
serviceDescriptors.AddSingleton<JT808ReplyMsgService>();
serviceDescriptors.AddSingleton<JT808SessionService>();
serviceDescriptors.AddSingleton<IJT808MsgProducer, JT808MsgProducer>();
serviceDescriptors.AddSingleton<IJT808MsgConsumer, JT808MsgConsumer>();
serviceDescriptors.AddSingleton<IJT808MsgReplyProducer, JT808MsgReplyProducer>();
serviceDescriptors.AddSingleton<IJT808MsgReplyConsumer, JT808MsgReplyConsumer>();
serviceDescriptors.AddSingleton<IJT808SessionProducer, JT808SessionProducer>();
serviceDescriptors.AddSingleton<IJT808SessionConsumer, JT808SessionConsumer>();
serviceDescriptors.AddSingleton<IJT808MsgConsumerFactory, JT808MsgConsumerFactory>();
serviceDescriptors.AddSingleton<IJT808MsgReplyConsumerFactory, JT808MsgReplyConsumerFactory>();
serviceDescriptors.AddHostedService<JT808MsgConsumerInMemoryHostedService>();
return serviceDescriptors;
}

/// <summary>
///
/// </summary>
/// <param name="serviceDescriptors"></param>
/// <returns></returns>
internal static IServiceCollection AddServerInMemoryMQ(this IServiceCollection serviceDescriptors, params JT808ConsumerType[] consumerTypes)
{
if (consumerTypes == null)
{
throw new ArgumentNullException("消费类型不为空!");
}
ConsumerTypes = consumerTypes.ToList();
serviceDescriptors.AddServerInMemoryConsumers();
serviceDescriptors.AddSingleton<JT808MsgService>();
serviceDescriptors.AddSingleton<JT808ReplyMsgService>();
serviceDescriptors.AddSingleton<JT808SessionService>();
serviceDescriptors.AddSingleton<IJT808MsgProducer, JT808MsgProducer>();
serviceDescriptors.AddSingleton<IJT808MsgConsumer, JT808MsgConsumer>();
serviceDescriptors.AddSingleton<IJT808MsgReplyProducer, JT808MsgReplyProducer>();
serviceDescriptors.AddSingleton<IJT808MsgReplyConsumer, JT808MsgReplyConsumer>();
serviceDescriptors.AddSingleton<IJT808SessionProducer, JT808SessionProducer>();
serviceDescriptors.AddSingleton<IJT808SessionConsumer, JT808SessionConsumer>();
serviceDescriptors.AddSingleton<IJT808MsgConsumerFactory, JT808MsgConsumerFactory>();
serviceDescriptors.AddSingleton<IJT808MsgReplyConsumerFactory, JT808MsgReplyConsumerFactory>();
serviceDescriptors.AddHostedService<JT808MsgConsumerInMemoryHostedService>();
return serviceDescriptors;
}


/// <summary>
///
/// </summary>
/// <param name="jT808GatewayBuilder"></param>
/// <returns></returns>
private static IJT808GatewayBuilder AddServerInMemoryConsumers(this IJT808GatewayBuilder jT808GatewayBuilder)
{
jT808GatewayBuilder.JT808Builder.Services.AddSingleton<JT808MsgIdHandlerService>();
jT808GatewayBuilder.JT808Builder.Services.AddSingleton<JT808MsgLoggingService>();
jT808GatewayBuilder.JT808Builder.Services.AddSingleton<JT808MsgReplyMessageService>();
jT808GatewayBuilder.JT808Builder.Services.AddSingleton<JT808MsgReplyMessageLoggingService>();
jT808GatewayBuilder.JT808Builder.Services.AddSingleton<JT808MsgTrafficService>();
jT808GatewayBuilder.JT808Builder.Services.AddSingleton<JT808MsgTransmitService>();
jT808GatewayBuilder.JT808Builder.Services.AddSingleton<JT808MsgIdHandlerConsumer>();
jT808GatewayBuilder.JT808Builder.Services.AddSingleton<JT808MsgLoggingConsumer>();
jT808GatewayBuilder.JT808Builder.Services.AddSingleton<JT808MsgReplyMessageConsumer>();
jT808GatewayBuilder.JT808Builder.Services.AddSingleton<JT808MsgTrafficConsumer>();
jT808GatewayBuilder.JT808Builder.Services.AddSingleton<JT808MsgTransmitConsumer>();
jT808GatewayBuilder.JT808Builder.Services.AddSingleton<JT808MsgReplyMessageLoggingConsumer>();
jT808GatewayBuilder.JT808Builder.Services.AddSingleton((factory) =>
{
Func<JT808ConsumerType, IJT808MsgConsumer> accesor = type =>
{
switch (type)
{
case JT808ConsumerType.MsgIdHandlerConsumer:
return factory.GetRequiredService<JT808MsgIdHandlerConsumer>();
case JT808ConsumerType.MsgLoggingConsumer:
return factory.GetRequiredService<JT808MsgLoggingConsumer>();
case JT808ConsumerType.TrafficConsumer:
return factory.GetRequiredService<JT808MsgTrafficConsumer>();
case JT808ConsumerType.TransmitConsumer:
return factory.GetRequiredService<JT808MsgTransmitConsumer>();
case JT808ConsumerType.ReplyMessageConsumer:
return factory.GetRequiredService<JT808MsgReplyMessageConsumer>();
default:
return default;
}
};
return accesor;
});
jT808GatewayBuilder.JT808Builder.Services.AddSingleton((factory) =>
{
Func<JT808ConsumerType, IJT808MsgReplyConsumer> accesor = type =>
{
switch (type)
{
case JT808ConsumerType.ReplyMessageLoggingConsumer:
return factory.GetRequiredService<JT808MsgReplyMessageLoggingConsumer>();
default:
return default;
}
};
return accesor;
});
jT808GatewayBuilder.JT808Builder.Services.AddSingleton((factory) =>
{
Func<JT808ConsumerType, JT808MsgServiceBase> accesor = type =>
{
switch (type)
{
case JT808ConsumerType.MsgIdHandlerConsumer:
return factory.GetRequiredService<JT808MsgIdHandlerService>();
case JT808ConsumerType.MsgLoggingConsumer:
return factory.GetRequiredService<JT808MsgLoggingService>();
case JT808ConsumerType.TrafficConsumer:
return factory.GetRequiredService<JT808MsgTrafficService>();
case JT808ConsumerType.TransmitConsumer:
return factory.GetRequiredService<JT808MsgTransmitService>();
case JT808ConsumerType.ReplyMessageConsumer:
return factory.GetRequiredService<JT808MsgReplyMessageService>();
case JT808ConsumerType.ReplyMessageLoggingConsumer:
return factory.GetRequiredService<JT808MsgReplyMessageLoggingService>();
default:
return default;
}
};
return accesor;
});
return jT808GatewayBuilder;
}

/// <summary>
///
/// </summary>
/// <param name="jT808GatewayBuilder"></param>
/// <returns></returns>
private static IServiceCollection AddServerInMemoryConsumers(this IServiceCollection serviceDescriptors)
{
serviceDescriptors.AddSingleton<JT808MsgIdHandlerService>();
serviceDescriptors.AddSingleton<JT808MsgLoggingService>();
serviceDescriptors.AddSingleton<JT808MsgReplyMessageService>();
serviceDescriptors.AddSingleton<JT808MsgReplyMessageLoggingService>();
serviceDescriptors.AddSingleton<JT808MsgTrafficService>();
serviceDescriptors.AddSingleton<JT808MsgTransmitService>();
serviceDescriptors.AddSingleton<JT808MsgIdHandlerConsumer>();
serviceDescriptors.AddSingleton<JT808MsgLoggingConsumer>();
serviceDescriptors.AddSingleton<JT808MsgReplyMessageConsumer>();
serviceDescriptors.AddSingleton<JT808MsgTrafficConsumer>();
serviceDescriptors.AddSingleton<JT808MsgTransmitConsumer>();
serviceDescriptors.AddSingleton<JT808MsgReplyMessageLoggingConsumer>();
serviceDescriptors.AddSingleton((factory) =>
{
Func<JT808ConsumerType, IJT808MsgConsumer> accesor = type =>
{
switch (type)
{
case JT808ConsumerType.MsgIdHandlerConsumer:
return factory.GetRequiredService<JT808MsgIdHandlerConsumer>();
case JT808ConsumerType.MsgLoggingConsumer:
return factory.GetRequiredService<JT808MsgLoggingConsumer>();
case JT808ConsumerType.TrafficConsumer:
return factory.GetRequiredService<JT808MsgTrafficConsumer>();
case JT808ConsumerType.TransmitConsumer:
return factory.GetRequiredService<JT808MsgTransmitConsumer>();
case JT808ConsumerType.ReplyMessageConsumer:
return factory.GetRequiredService<JT808MsgReplyMessageConsumer>();
default:
return default;
}
};
return accesor;
});
serviceDescriptors.AddSingleton((factory) =>
{
Func<JT808ConsumerType, IJT808MsgReplyConsumer> accesor = type =>
{
switch (type)
{
case JT808ConsumerType.ReplyMessageLoggingConsumer:
return factory.GetRequiredService<JT808MsgReplyMessageLoggingConsumer>();
default:
return default;
}
};
return accesor;
});
serviceDescriptors.AddSingleton((factory) =>
{
Func<JT808ConsumerType, JT808MsgServiceBase> accesor = type =>
{
switch (type)
{
case JT808ConsumerType.MsgIdHandlerConsumer:
return factory.GetRequiredService<JT808MsgIdHandlerService>();
case JT808ConsumerType.MsgLoggingConsumer:
return factory.GetRequiredService<JT808MsgLoggingService>();
case JT808ConsumerType.TrafficConsumer:
return factory.GetRequiredService<JT808MsgTrafficService>();
case JT808ConsumerType.TransmitConsumer:
return factory.GetRequiredService<JT808MsgTransmitService>();
case JT808ConsumerType.ReplyMessageConsumer:
return factory.GetRequiredService<JT808MsgReplyMessageService>();
case JT808ConsumerType.ReplyMessageLoggingConsumer:
return factory.GetRequiredService<JT808MsgReplyMessageLoggingService>();
default:
return default;
}
};
return accesor;
});
return serviceDescriptors;
}
}
}

+ 63
- 0
src/JT808.Gateway.InMemoryMQ/JT808SessionConsumer.cs 查看文件

@@ -0,0 +1,63 @@
using JT808.Gateway.Abstractions;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using JT808.Gateway.InMemoryMQ.Services;

namespace JT808.Gateway.InMemoryMQ
{
public class JT808SessionConsumer : IJT808SessionConsumer
{
public CancellationTokenSource Cts => new CancellationTokenSource();

private readonly ILogger logger;

public string TopicName { get; } = JT808GatewayConstants.SessionTopic;

private readonly JT808SessionService JT808SessionService;
public JT808SessionConsumer(
JT808SessionService jT808SessionService,
ILoggerFactory loggerFactory)
{
logger = loggerFactory.CreateLogger("JT808SessionConsumer");
JT808SessionService = jT808SessionService;
}

public void OnMessage(Action<(string Notice, string TerminalNo)> callback)
{
Task.Run(async () =>
{
while (!Cts.IsCancellationRequested)
{
try
{
var item = await JT808SessionService.ReadAsync(Cts.Token);
callback(item);
}
catch (Exception ex)
{
logger.LogError(ex, "");
}
}
}, Cts.Token);
}

public void Unsubscribe()
{
Cts.Cancel();
}

public void Dispose()
{
Cts.Dispose();
}

public void Subscribe()
{

}
}
}

+ 30
- 0
src/JT808.Gateway.InMemoryMQ/JT808SessionProducer.cs 查看文件

@@ -0,0 +1,30 @@
using JT808.Gateway.Abstractions;
using JT808.Gateway.InMemoryMQ.Services;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;

namespace JT808.Gateway.InMemoryMQ
{
public class JT808SessionProducer : IJT808SessionProducer
{
public string TopicName { get; } = JT808GatewayConstants.SessionTopic;

private readonly JT808SessionService JT808SessionService;

public JT808SessionProducer(JT808SessionService jT808SessionService)
{
JT808SessionService = jT808SessionService;
}

public async ValueTask ProduceAsync(string notice,string terminalNo)
{
await JT808SessionService.WriteAsync(notice, terminalNo);
}

public void Dispose()
{
}
}
}

+ 14
- 0
src/JT808.Gateway.InMemoryMQ/Services/JT808MsgIdHandlerService.cs 查看文件

@@ -0,0 +1,14 @@
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

namespace JT808.Gateway.InMemoryMQ.Services
{
public class JT808MsgIdHandlerService: JT808MsgServiceBase
{
}
}

+ 14
- 0
src/JT808.Gateway.InMemoryMQ/Services/JT808MsgLoggingService.cs 查看文件

@@ -0,0 +1,14 @@
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

namespace JT808.Gateway.InMemoryMQ.Services
{
public class JT808MsgLoggingService: JT808MsgServiceBase
{
}
}

+ 14
- 0
src/JT808.Gateway.InMemoryMQ/Services/JT808MsgReplyMessageLoggingService.cs 查看文件

@@ -0,0 +1,14 @@
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

namespace JT808.Gateway.InMemoryMQ.Services
{
public class JT808MsgReplyMessageLoggingService : JT808MsgServiceBase
{
}
}

+ 14
- 0
src/JT808.Gateway.InMemoryMQ/Services/JT808MsgReplyMessageService.cs 查看文件

@@ -0,0 +1,14 @@
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

namespace JT808.Gateway.InMemoryMQ.Services
{
public class JT808MsgReplyMessageService: JT808MsgServiceBase
{
}
}

+ 2
- 17
src/JT808.Gateway.InMemoryMQ/Services/JT808MsgService.cs 查看文件

@@ -7,23 +7,8 @@ using System.Threading.Tasks;

namespace JT808.Gateway.InMemoryMQ.Services
{
public class JT808MsgService
public class JT808MsgService: JT808MsgServiceBase
{
private readonly Channel<(string TerminalNo, byte[] Data)> _channel;

public JT808MsgService()
{
_channel = Channel.CreateUnbounded<(string TerminalNo, byte[] Data)>();
}

public async ValueTask WriteAsync(string terminalNo, byte[] data)
{
await _channel.Writer.WriteAsync((terminalNo, data));
}

public async ValueTask<(string TerminalNo, byte[] Data)> ReadAsync(CancellationToken cancellationToken)
{
return await _channel.Reader.ReadAsync(cancellationToken);
}
}
}

+ 29
- 0
src/JT808.Gateway.InMemoryMQ/Services/JT808MsgServiceBase.cs 查看文件

@@ -0,0 +1,29 @@
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

namespace JT808.Gateway.InMemoryMQ.Services
{
public class JT808MsgServiceBase
{
private readonly Channel<(string TerminalNo, byte[] Data)> _channel;

public JT808MsgServiceBase()
{
_channel = Channel.CreateUnbounded<(string TerminalNo, byte[] Data)>();
}

public async ValueTask WriteAsync(string terminalNo, byte[] data)
{
await _channel.Writer.WriteAsync((terminalNo, data));
}

public async ValueTask<(string TerminalNo, byte[] Data)> ReadAsync(CancellationToken cancellationToken)
{
return await _channel.Reader.ReadAsync(cancellationToken);
}
}
}

+ 14
- 0
src/JT808.Gateway.InMemoryMQ/Services/JT808MsgTrafficService.cs 查看文件

@@ -0,0 +1,14 @@
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

namespace JT808.Gateway.InMemoryMQ.Services
{
public class JT808MsgTrafficService: JT808MsgServiceBase
{
}
}

+ 14
- 0
src/JT808.Gateway.InMemoryMQ/Services/JT808MsgTransmitService.cs 查看文件

@@ -0,0 +1,14 @@
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

namespace JT808.Gateway.InMemoryMQ.Services
{
public class JT808MsgTransmitService: JT808MsgServiceBase
{
}
}

+ 2
- 16
src/JT808.Gateway.InMemoryMQ/Services/JT808ReplyMsgService.cs 查看文件

@@ -7,22 +7,8 @@ using System.Threading.Tasks;

namespace JT808.Gateway.InMemoryMQ.Services
{
public class JT808ReplyMsgService
public class JT808ReplyMsgService: JT808MsgServiceBase
{
private readonly Channel<(string TerminalNo, byte[] Data)> _channel;

public JT808ReplyMsgService()
{
_channel = Channel.CreateUnbounded<(string TerminalNo, byte[] Data)>();
}

public async ValueTask WriteAsync(string terminalNo, byte[] data)
{
await _channel.Writer.WriteAsync((terminalNo, data));
}
public async ValueTask<(string TerminalNo, byte[] Data)> ReadAsync(CancellationToken cancellationToken)
{
return await _channel.Reader.ReadAsync(cancellationToken);
}
}
}

+ 28
- 0
src/JT808.Gateway.InMemoryMQ/Services/JT808SessionService.cs 查看文件

@@ -0,0 +1,28 @@
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

namespace JT808.Gateway.InMemoryMQ.Services
{
public class JT808SessionService
{
private readonly Channel<(string Notice, string TerminalNo)> _channel;

public JT808SessionService()
{
_channel = Channel.CreateUnbounded<(string Notice, string TerminalNo)>();
}

public async ValueTask WriteAsync(string notice, string terminalNo)
{
await _channel.Writer.WriteAsync((notice, terminalNo));
}
public async ValueTask<(string Notice, string TerminalNo)> ReadAsync(CancellationToken cancellationToken)
{
return await _channel.Reader.ReadAsync(cancellationToken);
}
}
}

+ 9
- 1
src/JT808.Gateway.Services/JT808.Gateway.MsgIdHandler/JT808MsgIdHandlerExtensions.cs 查看文件

@@ -8,12 +8,20 @@ namespace JT808.Gateway.MsgIdHandler
{
public static class JT808MsgIdHandlerExtensions
{
public static IJT808ClientBuilder AddJT808MsgIdHandler<TJT808MsgIdHandler>(this IJT808ClientBuilder jT808ClientBuilder)
public static IJT808ClientBuilder AddMsgIdHandler<TJT808MsgIdHandler>(this IJT808ClientBuilder jT808ClientBuilder)
where TJT808MsgIdHandler: IJT808MsgIdHandler
{
jT808ClientBuilder.JT808Builder.Services.AddSingleton(typeof(IJT808MsgIdHandler),typeof(TJT808MsgIdHandler));
jT808ClientBuilder.JT808Builder.Services.AddHostedService<JT808MsgIdHandlerHostedService>();
return jT808ClientBuilder;
}

public static IJT808GatewayBuilder AddInMemoryMsgIdHandler<TJT808MsgIdHandler>(this IJT808GatewayBuilder jT808GatewayBuilder)
where TJT808MsgIdHandler : IJT808MsgIdHandler
{
jT808GatewayBuilder.JT808Builder.Services.AddSingleton(typeof(IJT808MsgIdHandler), typeof(TJT808MsgIdHandler));
jT808GatewayBuilder.JT808Builder.Services.AddHostedService<JT808MsgIdHandlerInMemoryHostedService>();
return jT808GatewayBuilder;
}
}
}

+ 35
- 0
src/JT808.Gateway.Services/JT808.Gateway.MsgIdHandler/JT808MsgIdHandlerInMemoryHostedService.cs 查看文件

@@ -0,0 +1,35 @@
using System.Threading.Tasks;
using Microsoft.Extensions.Hosting;
using System.Threading;
using JT808.Gateway.Abstractions;
using JT808.Gateway.Abstractions.Enums;

namespace JT808.Gateway.MsgIdHandler
{
public class JT808MsgIdHandlerInMemoryHostedService : IHostedService
{
private readonly IJT808MsgConsumer jT808MsgConsumer;

private readonly IJT808MsgIdHandler jT808MsgIdHandler;
public JT808MsgIdHandlerInMemoryHostedService(
IJT808MsgIdHandler jT808MsgIdHandler,
IJT808MsgConsumerFactory jT808MsgConsumerFactory)
{
this.jT808MsgIdHandler = jT808MsgIdHandler;
this.jT808MsgConsumer = jT808MsgConsumerFactory.Create(JT808ConsumerType.MsgIdHandlerConsumer);
}

public Task StartAsync(CancellationToken cancellationToken)
{
jT808MsgConsumer.Subscribe();
jT808MsgConsumer.OnMessage(jT808MsgIdHandler.Processor);
return Task.CompletedTask;
}

public Task StopAsync(CancellationToken cancellationToken)
{
jT808MsgConsumer.Unsubscribe();
return Task.CompletedTask;
}
}
}

+ 37
- 0
src/JT808.Gateway.Services/JT808.Gateway.MsgLogging/JT808MsgDownLoggingInMemoryHostedService.cs 查看文件

@@ -0,0 +1,37 @@
using System.Threading.Tasks;
using Microsoft.Extensions.Hosting;
using System.Threading;
using JT808.Gateway.Abstractions;
using JT808.Gateway.Abstractions.Enums;

namespace JT808.Gateway.MsgLogging
{
public class JT808MsgDownLoggingInMemoryHostedService : IHostedService
{
private readonly IJT808MsgReplyConsumer jT808MsgReplyConsumer;
private readonly IJT808MsgLogging jT808MsgLogging;
public JT808MsgDownLoggingInMemoryHostedService(
IJT808MsgLogging jT808MsgLogging,
IJT808MsgReplyConsumerFactory jT808MsgReplyConsumerFactory)
{
this.jT808MsgReplyConsumer = jT808MsgReplyConsumerFactory.Create(JT808ConsumerType.ReplyMessageLoggingConsumer);
this.jT808MsgLogging = jT808MsgLogging;
}

public Task StartAsync(CancellationToken cancellationToken)
{
jT808MsgReplyConsumer.Subscribe();
jT808MsgReplyConsumer.OnMessage(item=>
{
jT808MsgLogging.Processor(item, JT808MsgLoggingType.down);
});
return Task.CompletedTask;
}

public Task StopAsync(CancellationToken cancellationToken)
{
jT808MsgReplyConsumer.Unsubscribe();
return Task.CompletedTask;
}
}
}

+ 10
- 1
src/JT808.Gateway.Services/JT808.Gateway.MsgLogging/JT808MsgLoggingExtensions.cs 查看文件

@@ -8,7 +8,7 @@ namespace JT808.Gateway.MsgLogging
{
public static class JT808MsgLoggingExtensions
{
public static IJT808ClientBuilder AddJT808MsgLogging<TJT808MsgLogging>(this IJT808ClientBuilder jT808ClientBuilder)
public static IJT808ClientBuilder AddMsgLogging<TJT808MsgLogging>(this IJT808ClientBuilder jT808ClientBuilder)
where TJT808MsgLogging: IJT808MsgLogging
{
jT808ClientBuilder.JT808Builder.Services.AddSingleton(typeof(IJT808MsgLogging),typeof(TJT808MsgLogging));
@@ -16,5 +16,14 @@ namespace JT808.Gateway.MsgLogging
jT808ClientBuilder.JT808Builder.Services.AddHostedService<JT808MsgUpLoggingHostedService>();
return jT808ClientBuilder;
}

public static IJT808GatewayBuilder AddInMemoryMsgLogging<TJT808MsgLogging>(this IJT808GatewayBuilder jT808GatewayBuilder)
where TJT808MsgLogging : IJT808MsgLogging
{
jT808GatewayBuilder.JT808Builder.Services.AddSingleton(typeof(IJT808MsgLogging), typeof(TJT808MsgLogging));
jT808GatewayBuilder.JT808Builder.Services.AddHostedService<JT808MsgDownLoggingInMemoryHostedService>();
jT808GatewayBuilder.JT808Builder.Services.AddHostedService<JT808MsgUpLoggingInMemoryHostedService>();
return jT808GatewayBuilder;
}
}
}

+ 37
- 0
src/JT808.Gateway.Services/JT808.Gateway.MsgLogging/JT808MsgUpLoggingInMemoryHostedService.cs 查看文件

@@ -0,0 +1,37 @@
using System.Threading.Tasks;
using Microsoft.Extensions.Hosting;
using System.Threading;
using JT808.Gateway.Abstractions;
using JT808.Gateway.Abstractions.Enums;

namespace JT808.Gateway.MsgLogging
{
public class JT808MsgUpLoggingInMemoryHostedService : IHostedService
{
private readonly IJT808MsgConsumer jT808MsgConsumer;
private readonly IJT808MsgLogging jT808MsgLogging;
public JT808MsgUpLoggingInMemoryHostedService(
IJT808MsgLogging jT808MsgLogging,
IJT808MsgConsumerFactory jT808MsgConsumerFactory)
{
this.jT808MsgConsumer = jT808MsgConsumerFactory.Create(JT808ConsumerType.MsgLoggingConsumer);
this.jT808MsgLogging = jT808MsgLogging;
}

public Task StartAsync(CancellationToken cancellationToken)
{
jT808MsgConsumer.Subscribe();
jT808MsgConsumer.OnMessage(item=>
{
jT808MsgLogging.Processor(item, JT808MsgLoggingType.up);
});
return Task.CompletedTask;
}

public Task StopAsync(CancellationToken cancellationToken)
{
jT808MsgConsumer.Unsubscribe();
return Task.CompletedTask;
}
}
}

+ 5
- 5
src/JT808.Gateway.Services/JT808.Gateway.ReplyMessage/JT808ReplyMessageExtensions.cs 查看文件

@@ -14,7 +14,7 @@ namespace JT808.Gateway.ReplyMessage
/// </summary>
/// <param name="jT808ClientBuilder"></param>
/// <returns></returns>
public static IJT808ClientBuilder AddJT808InPlugReplyMessage(this IJT808ClientBuilder jT808ClientBuilder)
public static IJT808ClientBuilder AddInPlugReplyMessage(this IJT808ClientBuilder jT808ClientBuilder)
{
jT808ClientBuilder.JT808Builder.Services.AddSingleton<JT808ReplyMessageHandler>();
jT808ClientBuilder.JT808Builder.Services.AddHostedService<JT808ReplyMessageHostedService>();
@@ -26,7 +26,7 @@ namespace JT808.Gateway.ReplyMessage
/// <typeparam name="TReplyMessageService">自定义消息回复服务</typeparam>
/// <param name="jT808ClientBuilder"></param>
/// <returns></returns>
public static IJT808ClientBuilder AddJT808InPlugReplyMessage<TReplyMessageHandler>(this IJT808ClientBuilder jT808ClientBuilder)
public static IJT808ClientBuilder AddInPlugReplyMessage<TReplyMessageHandler>(this IJT808ClientBuilder jT808ClientBuilder)
where TReplyMessageHandler : JT808ReplyMessageHandler
{
jT808ClientBuilder.JT808Builder.Services.AddSingleton<JT808ReplyMessageHandler, TReplyMessageHandler>();
@@ -39,11 +39,11 @@ namespace JT808.Gateway.ReplyMessage
/// <typeparam name="TReplyMessageService">自定义消息回复服务</typeparam>
/// <param name="jT808GatewayBuilder"></param>
/// <returns></returns>
public static IJT808GatewayBuilder AddJT808InMemoryReplyMessage<TReplyMessageHandler>(this IJT808GatewayBuilder jT808GatewayBuilder)
public static IJT808GatewayBuilder AddInMemoryReplyMessage<TReplyMessageHandler>(this IJT808GatewayBuilder jT808GatewayBuilder)
where TReplyMessageHandler : JT808ReplyMessageHandler
{
jT808GatewayBuilder.JT808Builder.Services.AddSingleton<JT808ReplyMessageHandler, TReplyMessageHandler>();
jT808GatewayBuilder.JT808Builder.Services.AddHostedService<JT808ReplyMessageHostedService>();
jT808GatewayBuilder.JT808Builder.Services.AddHostedService<JT808ReplyMessageInMemoryHostedService>();
return jT808GatewayBuilder;
}
/// <summary>
@@ -54,7 +54,7 @@ namespace JT808.Gateway.ReplyMessage
public static IJT808GatewayBuilder AddInMemoryReplyMessage(this IJT808GatewayBuilder jT808GatewayBuilder)
{
jT808GatewayBuilder.JT808Builder.Services.AddSingleton<JT808ReplyMessageHandler>();
jT808GatewayBuilder.JT808Builder.Services.AddHostedService<JT808ReplyMessageHostedService>();
jT808GatewayBuilder.JT808Builder.Services.AddHostedService<JT808ReplyMessageInMemoryHostedService>();
return jT808GatewayBuilder;
}
}


+ 35
- 0
src/JT808.Gateway.Services/JT808.Gateway.ReplyMessage/JT808ReplyMessageInMemoryHostedService.cs 查看文件

@@ -0,0 +1,35 @@
using System.Threading.Tasks;
using Microsoft.Extensions.Hosting;
using System.Threading;
using JT808.Gateway.Abstractions;
using JT808.Gateway.Abstractions.Enums;

namespace JT808.Gateway.ReplyMessage
{
public class JT808ReplyMessageInMemoryHostedService : IHostedService
{
private readonly IJT808MsgConsumer jT808MsgConsumer;
private readonly JT808ReplyMessageHandler jT808ReplyMessageHandler;

public JT808ReplyMessageInMemoryHostedService(
JT808ReplyMessageHandler jT808ReplyMessageHandler,
IJT808MsgConsumerFactory jT808MsgConsumerFactory)
{
this.jT808MsgConsumer = jT808MsgConsumerFactory.Create(JT808ConsumerType.ReplyMessageConsumer);
this.jT808ReplyMessageHandler = jT808ReplyMessageHandler;
}

public Task StartAsync(CancellationToken cancellationToken)
{
jT808MsgConsumer.Subscribe();
jT808MsgConsumer.OnMessage(jT808ReplyMessageHandler.Processor);
return Task.CompletedTask;
}

public Task StopAsync(CancellationToken cancellationToken)
{
jT808MsgConsumer.Unsubscribe();
return Task.CompletedTask;
}
}
}

+ 9
- 9
src/JT808.Gateway.Services/JT808.Gateway.SessionNotice/JT808SessionNoticeExtensions.cs 查看文件

@@ -14,7 +14,7 @@ namespace JT808.Gateway.SessionNotice
/// </summary>
/// <param name="jT808ClientBuilder"></param>
/// <returns></returns>
public static IJT808ClientBuilder AddJT808InPlugSessionNotice(this IJT808ClientBuilder jT808ClientBuilder)
public static IJT808ClientBuilder AddInPlugSessionNotice(this IJT808ClientBuilder jT808ClientBuilder)
{
jT808ClientBuilder.JT808Builder.Services.AddSingleton<JT808SessionNoticeService>();
jT808ClientBuilder.JT808Builder.Services.AddHostedService<JT808SessionNoticeHostedService>();
@@ -27,7 +27,7 @@ namespace JT808.Gateway.SessionNotice
/// <typeparam name="TSessionNoticeService">自定义会话通知服务</typeparam>
/// <param name="jT808ClientBuilder"></param>
/// <returns></returns>
public static IJT808ClientBuilder AddJT808InPlugSessionNotice<TSessionNoticeService>(this IJT808ClientBuilder jT808ClientBuilder)
public static IJT808ClientBuilder AddInPlugSessionNotice<TSessionNoticeService>(this IJT808ClientBuilder jT808ClientBuilder)
where TSessionNoticeService : JT808SessionNoticeService
{
jT808ClientBuilder.JT808Builder.Services.AddSingleton<JT808SessionNoticeService,TSessionNoticeService>();
@@ -41,7 +41,7 @@ namespace JT808.Gateway.SessionNotice
/// <typeparam name="TSessionNoticeService">自定义会话通知服务</typeparam>
/// <param name="jT808GatewayBuilder"></param>
/// <returns></returns>
public static IJT808GatewayBuilder AddJT808InMemorySessionNotice<TSessionNoticeService>(this IJT808GatewayBuilder jT808GatewayBuilder)
public static IJT808GatewayBuilder AddInMemorySessionNotice<TSessionNoticeService>(this IJT808GatewayBuilder jT808GatewayBuilder)
where TSessionNoticeService : JT808SessionNoticeService
{
jT808GatewayBuilder.JT808Builder.Services.AddSingleton<JT808SessionNoticeService, TSessionNoticeService>();
@@ -54,11 +54,11 @@ namespace JT808.Gateway.SessionNotice
/// </summary>
/// <param name="jT808GatewayBuilder"></param>
/// <returns></returns>
//public static IJT808GatewayBuilder AddJT808InMemorySessionNotice(this IJT808GatewayBuilder jT808GatewayBuilder)
//{
// jT808GatewayBuilder.JT808Builder.Services.AddSingleton<JT808SessionNoticeService>();
// jT808GatewayBuilder.JT808Builder.Services.AddHostedService<JT808SessionNoticeHostedService>();
// return jT808GatewayBuilder;
//}
public static IJT808GatewayBuilder AddInMemorySessionNotice(this IJT808GatewayBuilder jT808GatewayBuilder)
{
jT808GatewayBuilder.JT808Builder.Services.AddSingleton<JT808SessionNoticeService>();
jT808GatewayBuilder.JT808Builder.Services.AddHostedService<JT808SessionNoticeHostedService>();
return jT808GatewayBuilder;
}
}
}

+ 37
- 0
src/JT808.Gateway.Services/JT808.Gateway.Traffic/IJT808Traffic.cs 查看文件

@@ -0,0 +1,37 @@
using System;
using System.Collections.Generic;
using System.Text;
using System.Collections.Concurrent;
using System.Linq;

namespace JT808.Gateway.Traffic
{
public interface IJT808Traffic
{
long Get(string key);
long Increment(string terminalNo, string field, int len);
List<(string,long)> GetAll();
}

public class JT808TrafficDefault : IJT808Traffic
{
private ConcurrentDictionary<string, long> dict = new ConcurrentDictionary<string, long>();

public long Get(string key)
{
long value;
dict.TryGetValue(key, out value);
return value;
}

public List<(string, long)> GetAll()
{
return dict.Select(s => (s.Key, s.Value)).ToList();
}

public long Increment(string terminalNo, string field, int len)
{
return dict.AddOrUpdate($"{terminalNo}_{field}", len, (id, count) => count + len);
}
}
}

+ 0
- 4
src/JT808.Gateway.Services/JT808.Gateway.Traffic/JT808.Gateway.Traffic.csproj 查看文件

@@ -20,10 +20,6 @@
<PackageReleaseNotes>基于JT808设备流量统计服务</PackageReleaseNotes>
<PackageLicenseFile>LICENSE</PackageLicenseFile>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Microsoft.Extensions.Options.ConfigurationExtensions" Version="3.1.1" />
<PackageReference Include="CSRedisCore" Version="3.3.0" />
</ItemGroup>
<ItemGroup>
<None Include="..\..\..\LICENSE" Pack="true" PackagePath="" />
</ItemGroup>


+ 0
- 32
src/JT808.Gateway.Services/JT808.Gateway.Traffic/JT808TrafficService.cs 查看文件

@@ -1,32 +0,0 @@
using Microsoft.Extensions.Configuration;
using System;
using System.Collections.Generic;
using System.Text;

namespace JT808.Gateway.Traffic
{
public class JT808TrafficService:IDisposable
{
private readonly CSRedis.CSRedisClient redisClien;
public JT808TrafficService(IConfiguration configuration)
{
redisClien = new CSRedis.CSRedisClient(configuration.GetConnectionString("TrafficRedisHost"));
TrafficRedisClient.Initialization(redisClien);
}

public void Dispose()
{
redisClien.Dispose();
}

/// <summary>
/// 按设备每天统计sim卡流量
/// </summary>
/// <param name="terminalNo"></param>
/// <param name="len"></param>
public void Processor(string terminalNo,int len)
{
TrafficRedisClient.HIncrBy(terminalNo, DateTime.Now.ToString("yyyyMMdd"), len);
}
}
}

+ 21
- 8
src/JT808.Gateway.Services/JT808.Gateway.Traffic/JT808TrafficServiceExtensions.cs 查看文件

@@ -13,9 +13,10 @@ namespace JT808.Gateway.Traffic
/// </summary>
/// <param name="jT808ClientBuilder"></param>
/// <returns></returns>
public static IJT808ClientBuilder AddJT808InPlugTraffic(this IJT808ClientBuilder jT808ClientBuilder)
public static IJT808ClientBuilder AddInPlugTraffic<TIJT808Traffic>(this IJT808ClientBuilder jT808ClientBuilder)
where TIJT808Traffic:IJT808Traffic
{
jT808ClientBuilder.JT808Builder.Services.AddSingleton<JT808TrafficService>();
jT808ClientBuilder.JT808Builder.Services.AddSingleton(typeof(IJT808Traffic), typeof(TIJT808Traffic));
jT808ClientBuilder.JT808Builder.Services.AddHostedService<JT808TrafficServiceHostedService>();
return jT808ClientBuilder;
}
@@ -25,11 +26,23 @@ namespace JT808.Gateway.Traffic
/// <typeparam name="TReplyMessageService"></typeparam>
/// <param name="jT808GatewayBuilder"></param>
/// <returns></returns>
//public static IJT808GatewayBuilder AddJT808InMemoryTraffic(this IJT808GatewayBuilder jT808GatewayBuilder)
//{
// jT808GatewayBuilder.JT808Builder.Services.AddSingleton<JT808TrafficService>();
// jT808GatewayBuilder.JT808Builder.Services.AddHostedService<JT808TrafficServiceHostedService>();
// return jT808GatewayBuilder;
//}
public static IJT808GatewayBuilder AddInMemoryTraffic(this IJT808GatewayBuilder jT808GatewayBuilder)
{
jT808GatewayBuilder.JT808Builder.Services.AddSingleton(typeof(IJT808Traffic), typeof(JT808TrafficDefault));
jT808GatewayBuilder.JT808Builder.Services.AddHostedService<JT808TrafficServiceInMemoryHostedService>();
return jT808GatewayBuilder;
}
/// <summary>
/// 消息流量统计服务(消费者单实例)
/// </summary>
/// <typeparam name="TReplyMessageService"></typeparam>
/// <param name="jT808GatewayBuilder"></param>
/// <returns></returns>
public static IJT808GatewayBuilder AddInMemoryTraffic<TIJT808Traffic>(this IJT808GatewayBuilder jT808GatewayBuilder)
{
jT808GatewayBuilder.JT808Builder.Services.AddSingleton(typeof(IJT808Traffic), typeof(TIJT808Traffic));
jT808GatewayBuilder.JT808Builder.Services.AddHostedService<JT808TrafficServiceInMemoryHostedService>();
return jT808GatewayBuilder;
}
}
}

+ 5
- 4
src/JT808.Gateway.Services/JT808.Gateway.Traffic/JT808TrafficServiceHostedService.cs 查看文件

@@ -3,20 +3,21 @@ using Microsoft.Extensions.Hosting;
using System.Threading;
using JT808.Protocol.Extensions;
using JT808.Gateway.Abstractions;
using System;

namespace JT808.Gateway.Traffic
{
public class JT808TrafficServiceHostedService : IHostedService
{
private readonly IJT808MsgConsumer jT808MsgConsumer;
private readonly JT808TrafficService jT808TrafficService;
private readonly IJT808Traffic jT808Traffic;

public JT808TrafficServiceHostedService(
JT808TrafficService jT808TrafficService,
IJT808Traffic jT808Traffic,
IJT808MsgConsumer jT808MsgConsumer)
{
this.jT808MsgConsumer = jT808MsgConsumer;
this.jT808TrafficService = jT808TrafficService;
this.jT808Traffic = jT808Traffic;
}

public Task StartAsync(CancellationToken cancellationToken)
@@ -24,7 +25,7 @@ namespace JT808.Gateway.Traffic
jT808MsgConsumer.Subscribe();
jT808MsgConsumer.OnMessage((item)=> {
//string str = item.Data.ToHexString();
jT808TrafficService.Processor(item.TerminalNo, item.Data.Length);
jT808Traffic.Increment(item.TerminalNo,DateTime.Now.ToString("yyyyMMdd"), item.Data.Length);
});
return Task.CompletedTask;
}


+ 40
- 0
src/JT808.Gateway.Services/JT808.Gateway.Traffic/JT808TrafficServiceInMemoryHostedService.cs 查看文件

@@ -0,0 +1,40 @@
using System.Threading.Tasks;
using Microsoft.Extensions.Hosting;
using System.Threading;
using JT808.Protocol.Extensions;
using JT808.Gateway.Abstractions;
using JT808.Gateway.Abstractions.Enums;
using System;

namespace JT808.Gateway.Traffic
{
public class JT808TrafficServiceInMemoryHostedService : IHostedService
{
private readonly IJT808MsgConsumer jT808MsgConsumer;
private readonly IJT808Traffic jT808Traffic;

public JT808TrafficServiceInMemoryHostedService(
IJT808Traffic jT808Traffic,
IJT808MsgConsumerFactory jT808MsgConsumerFactory)
{
this.jT808MsgConsumer = jT808MsgConsumerFactory.Create(JT808ConsumerType.TrafficConsumer);
this.jT808Traffic = jT808Traffic;
}

public Task StartAsync(CancellationToken cancellationToken)
{
jT808MsgConsumer.Subscribe();
jT808MsgConsumer.OnMessage((item)=> {
//string str = item.Data.ToHexString();
jT808Traffic.Increment(item.TerminalNo, DateTime.Now.ToString("yyyyMMdd"), item.Data.Length);
});
return Task.CompletedTask;
}

public Task StopAsync(CancellationToken cancellationToken)
{
jT808MsgConsumer.Unsubscribe();
return Task.CompletedTask;
}
}
}

+ 0
- 9
src/JT808.Gateway.Services/JT808.Gateway.Traffic/TrafficRedisClient.cs 查看文件

@@ -1,9 +0,0 @@
using System;
using System.Collections.Generic;
using System.Text;

namespace JT808.Gateway.Traffic
{
class TrafficRedisClient: RedisHelper<TrafficRedisClient>
{ }
}

+ 8
- 8
src/JT808.Gateway.Services/JT808.Gateway.Transmit/JT808TransmitExtensions.cs 查看文件

@@ -17,7 +17,7 @@ namespace JT808.Gateway.Transmit
/// <param name="jT808ClientBuilder"></param>
/// <param name="configuration"></param>
/// <returns></returns>
public static IJT808ClientBuilder AddJT808InPlugTransmit(this IJT808ClientBuilder jT808ClientBuilder,IConfiguration configuration)
public static IJT808ClientBuilder AddInPlugTransmit(this IJT808ClientBuilder jT808ClientBuilder,IConfiguration configuration)
{
jT808ClientBuilder.JT808Builder.Services.Configure<RemoteServerOptions>(configuration.GetSection("RemoteServerOptions"));
jT808ClientBuilder.JT808Builder.Services.AddSingleton<JT808TransmitService>();
@@ -30,12 +30,12 @@ namespace JT808.Gateway.Transmit
/// <param name="jT808GatewayBuilder"></param>
/// <param name="configuration"></param>
/// <returns></returns>
//public static IJT808GatewayBuilder AddJT808InMemoryTransmit(this IJT808GatewayBuilder jT808GatewayBuilder, IConfiguration configuration)
//{
// jT808GatewayBuilder.JT808Builder.Services.Configure<RemoteServerOptions>(configuration.GetSection("RemoteServerOptions"));
// jT808GatewayBuilder.JT808Builder.Services.AddSingleton<JT808TransmitService>();
// jT808GatewayBuilder.JT808Builder.Services.AddHostedService<JT808TransmitHostedService>();
// return jT808GatewayBuilder;
//}
public static IJT808GatewayBuilder AddInMemoryTransmit(this IJT808GatewayBuilder jT808GatewayBuilder, IConfiguration configuration)
{
jT808GatewayBuilder.JT808Builder.Services.Configure<RemoteServerOptions>(configuration.GetSection("RemoteServerOptions"));
jT808GatewayBuilder.JT808Builder.Services.AddSingleton<JT808TransmitService>();
jT808GatewayBuilder.JT808Builder.Services.AddHostedService<JT808TransmitInMemoryHostedService>();
return jT808GatewayBuilder;
}
}
}

+ 34
- 0
src/JT808.Gateway.Services/JT808.Gateway.Transmit/JT808TransmitInMemoryHostedService.cs 查看文件

@@ -0,0 +1,34 @@
using System.Threading.Tasks;
using Microsoft.Extensions.Hosting;
using System.Threading;
using JT808.Gateway.Abstractions;
using JT808.Gateway.Abstractions.Enums;

namespace JT808.Gateway.Transmit
{
public class JT808TransmitInMemoryHostedService : IHostedService
{
private readonly JT808TransmitService jT808TransmitService;
private readonly IJT808MsgConsumer jT808MsgConsumer;
public JT808TransmitInMemoryHostedService(
IJT808MsgConsumerFactory jT808MsgConsumerFactory,
JT808TransmitService jT808TransmitService)
{
this.jT808TransmitService = jT808TransmitService;
this.jT808MsgConsumer = jT808MsgConsumerFactory.Create(JT808ConsumerType.TransmitConsumer);
}

public Task StartAsync(CancellationToken cancellationToken)
{
jT808MsgConsumer.Subscribe();
jT808MsgConsumer.OnMessage(jT808TransmitService.Send);
return Task.CompletedTask;
}

public Task StopAsync(CancellationToken cancellationToken)
{
jT808MsgConsumer.Unsubscribe();
return Task.CompletedTask;
}
}
}

+ 3
- 0
src/JT808.Gateway.Tests/JT808.Gateway.InMemoryMQ.Test/JT808.Gateway.InMemoryMQ.Test.csproj 查看文件

@@ -7,6 +7,8 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="3.1.1" />
<PackageReference Include="Microsoft.Extensions.Logging" Version="3.1.1" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="16.2.0" />
<PackageReference Include="xunit" Version="2.4.0" />
<PackageReference Include="xunit.runner.visualstudio" Version="2.4.0" />
@@ -15,6 +17,7 @@

<ItemGroup>
<ProjectReference Include="..\..\JT808.Gateway.InMemoryMQ\JT808.Gateway.InMemoryMQ.csproj" />
<ProjectReference Include="..\..\JT808.Gateway\JT808.Gateway.csproj" />
</ItemGroup>

</Project>

+ 124
- 0
src/JT808.Gateway.Tests/JT808.Gateway.InMemoryMQ.Test/JT808MsgProducerTest.cs 查看文件

@@ -0,0 +1,124 @@
using JT808.Gateway.Abstractions;
using JT808.Gateway.Abstractions.Enums;
using JT808.Gateway.Internal;
using JT808.Protocol;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.Text;
using Xunit;

namespace JT808.Gateway.InMemoryMQ.Test
{
public class JT808MsgProducerTest
{
[Fact]
public void Test1()
{
IServiceCollection serviceDescriptors = new ServiceCollection();
serviceDescriptors.AddSingleton<ILoggerFactory, LoggerFactory>();
serviceDescriptors.AddSingleton(typeof(ILogger<>), typeof(Logger<>));
serviceDescriptors.AddServerInMemoryMQ(JT808ConsumerType.MsgIdHandlerConsumer | JT808ConsumerType.ReplyMessageConsumer);
IServiceProvider serviceProvider = serviceDescriptors.BuildServiceProvider();
IJT808MsgProducer producer = serviceProvider.GetRequiredService<IJT808MsgProducer>();
producer.ProduceAsync("123", new byte[] { 1, 2, 3, 4 });
IJT808MsgConsumer consumer = serviceProvider.GetRequiredService<IJT808MsgConsumer>();
consumer.OnMessage((item) => {
Assert.Equal("123", item.TerminalNo);
Assert.Equal(new byte[] { 1, 2, 3, 4 }, item.Data);
});
IJT808MsgConsumerFactory consumerFactory = serviceProvider.GetRequiredService<IJT808MsgConsumerFactory>();
var msgIdHandlerConsumer = consumerFactory.Create(JT808ConsumerType.MsgIdHandlerConsumer);
msgIdHandlerConsumer.OnMessage((item) =>
{
Assert.Equal("123", item.TerminalNo);
Assert.Equal(new byte[] { 1, 2, 3, 4 }, item.Data);
});
var replyMessageConsumer = consumerFactory.Create(JT808ConsumerType.ReplyMessageConsumer);
replyMessageConsumer.OnMessage((item) =>
{
Assert.Equal("123", item.TerminalNo);
Assert.Equal(new byte[] { 1, 2, 3, 4 }, item.Data);
});
}

[Fact]
public void Test2()
{
IServiceCollection serviceDescriptors = new ServiceCollection();
serviceDescriptors.AddSingleton<ILoggerFactory, LoggerFactory>();
serviceDescriptors.AddSingleton(typeof(ILogger<>), typeof(Logger<>));
serviceDescriptors.AddServerInMemoryMQ(JT808ConsumerType.MsgIdHandlerConsumer,JT808ConsumerType.ReplyMessageConsumer);
IServiceProvider serviceProvider = serviceDescriptors.BuildServiceProvider();
IJT808MsgProducer producer = serviceProvider.GetRequiredService<IJT808MsgProducer>();
producer.ProduceAsync("123", new byte[] { 1, 2, 3, 4 });
IJT808MsgConsumer consumer = serviceProvider.GetRequiredService<IJT808MsgConsumer>();
consumer.OnMessage((item) => {
Assert.Equal("123", item.TerminalNo);
Assert.Equal(new byte[] { 1, 2, 3, 4 }, item.Data);
});
IJT808MsgConsumerFactory consumerFactory = serviceProvider.GetRequiredService<IJT808MsgConsumerFactory>();
var msgIdHandlerConsumer = consumerFactory.Create(JT808ConsumerType.MsgIdHandlerConsumer);
msgIdHandlerConsumer.OnMessage((item) =>
{
Assert.Equal("123", item.TerminalNo);
Assert.Equal(new byte[] { 1, 2, 3, 4 }, item.Data);
});
var replyMessageConsumer = consumerFactory.Create(JT808ConsumerType.ReplyMessageConsumer);
replyMessageConsumer.OnMessage((item) =>
{
Assert.Equal("123", item.TerminalNo);
Assert.Equal(new byte[] { 1, 2, 3, 4 }, item.Data);
});
}

[Fact]
public void Test3()
{
IServiceCollection serviceDescriptors = new ServiceCollection();
serviceDescriptors.AddSingleton<ILoggerFactory, LoggerFactory>();
serviceDescriptors.AddSingleton(typeof(ILogger<>), typeof(Logger<>));
serviceDescriptors.AddServerInMemoryMQ(JT808ConsumerType.All);
IServiceProvider serviceProvider = serviceDescriptors.BuildServiceProvider();
IJT808MsgProducer producer = serviceProvider.GetRequiredService<IJT808MsgProducer>();
producer.ProduceAsync("123", new byte[] { 1, 2, 3, 4 });
IJT808MsgConsumer consumer = serviceProvider.GetRequiredService<IJT808MsgConsumer>();
consumer.OnMessage((item) => {
Assert.Equal("123", item.TerminalNo);
Assert.Equal(new byte[] { 1, 2, 3, 4 }, item.Data);
});
IJT808MsgConsumerFactory consumerFactory = serviceProvider.GetRequiredService<IJT808MsgConsumerFactory>();
var msgIdHandlerConsumer = consumerFactory.Create(JT808ConsumerType.MsgIdHandlerConsumer);
msgIdHandlerConsumer.OnMessage((item) =>
{
Assert.Equal("123", item.TerminalNo);
Assert.Equal(new byte[] { 1, 2, 3, 4 }, item.Data);
});
var replyMessageConsumer = consumerFactory.Create(JT808ConsumerType.ReplyMessageConsumer);
replyMessageConsumer.OnMessage((item) =>
{
Assert.Equal("123", item.TerminalNo);
Assert.Equal(new byte[] { 1, 2, 3, 4 }, item.Data);
});
var msgLoggingConsumer = consumerFactory.Create(JT808ConsumerType.MsgLoggingConsumer);
msgLoggingConsumer.OnMessage((item) =>
{
Assert.Equal("123", item.TerminalNo);
Assert.Equal(new byte[] { 1, 2, 3, 4 }, item.Data);
});
var trafficConsumer = consumerFactory.Create(JT808ConsumerType.TrafficConsumer);
trafficConsumer.OnMessage((item) =>
{
Assert.Equal("123", item.TerminalNo);
Assert.Equal(new byte[] { 1, 2, 3, 4 }, item.Data);
});
var transmitConsumer = consumerFactory.Create(JT808ConsumerType.TransmitConsumer);
transmitConsumer.OnMessage((item) =>
{
Assert.Equal("123", item.TerminalNo);
Assert.Equal(new byte[] { 1, 2, 3, 4 }, item.Data);
});
}
}
}

+ 32
- 0
src/JT808.Gateway.Tests/JT808.Gateway.InMemoryMQ.Test/Services/JT808SessionServiceTest.cs 查看文件

@@ -0,0 +1,32 @@
using JT808.Gateway.Abstractions;
using JT808.Gateway.InMemoryMQ.Services;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using Xunit;

namespace JT808.Gateway.InMemoryMQ.Test.Services
{
public class JT808SessionServiceTest
{
[Fact]
public void Test1()
{
JT808SessionService jT808SessionService = new JT808SessionService();
jT808SessionService.WriteAsync(JT808GatewayConstants.SessionOnline, "123456").GetAwaiter().GetResult();
jT808SessionService.WriteAsync(JT808GatewayConstants.SessionOffline, "123457").GetAwaiter().GetResult();
jT808SessionService.WriteAsync(JT808GatewayConstants.SessionOnline, "123456,123457").GetAwaiter().GetResult();
var result1 = jT808SessionService.ReadAsync(CancellationToken.None).GetAwaiter().GetResult();
var result2 = jT808SessionService.ReadAsync(CancellationToken.None).GetAwaiter().GetResult();
var result3 = jT808SessionService.ReadAsync(CancellationToken.None).GetAwaiter().GetResult();
Assert.Equal(JT808GatewayConstants.SessionOnline, result1.Notice);
Assert.Equal("123456", result1.TerminalNo);
Assert.Equal(JT808GatewayConstants.SessionOffline, result2.Notice);
Assert.Equal("123457", result2.TerminalNo);
//转发
Assert.Equal(JT808GatewayConstants.SessionOnline, result3.Notice);
Assert.Equal("123456,123457", result3.TerminalNo);
}
}
}

+ 22
- 0
src/JT808.Gateway.Tests/JT808.Gateway.TestHosting/Impl/JT808MsgIdHandler.cs 查看文件

@@ -0,0 +1,22 @@
using JT808.Gateway.MsgIdHandler;
using JT808.Protocol.Extensions;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.Text;

namespace JT808.Gateway.TestHosting.Impl
{
public class JT808MsgIdHandler : IJT808MsgIdHandler
{
private readonly ILogger Logger;
public JT808MsgIdHandler(ILoggerFactory loggerFactory)
{
Logger = loggerFactory.CreateLogger("JT808MsgIdHandler");
}
public void Processor((string TerminalNo, byte[] Data) parameter)
{
Logger.LogDebug($"{parameter.TerminalNo}-{parameter.Data.ToHexString()}");
}
}
}

+ 22
- 0
src/JT808.Gateway.Tests/JT808.Gateway.TestHosting/Impl/JT808MsgLogging.cs 查看文件

@@ -0,0 +1,22 @@
using JT808.Gateway.MsgLogging;
using JT808.Protocol.Extensions;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.Text;

namespace JT808.Gateway.TestHosting.Impl
{
public class JT808MsgLogging : IJT808MsgLogging
{
private readonly ILogger Logger;
public JT808MsgLogging(ILoggerFactory loggerFactory)
{
Logger = loggerFactory.CreateLogger("JT808MsgLogging");
}
public void Processor((string TerminalNo, byte[] Data) parameter, JT808MsgLoggingType jT808MsgLoggingType)
{
Logger.LogDebug($"{jT808MsgLoggingType.ToString()}-{parameter.TerminalNo}-{parameter.Data.ToHexString()}");
}
}
}

+ 5
- 0
src/JT808.Gateway.Tests/JT808.Gateway.TestHosting/JT808.Gateway.TestHosting.csproj 查看文件

@@ -16,6 +16,11 @@
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\..\JT808.Gateway.Services\JT808.Gateway.MsgIdHandler\JT808.Gateway.MsgIdHandler.csproj" />
<ProjectReference Include="..\..\JT808.Gateway.Services\JT808.Gateway.MsgLogging\JT808.Gateway.MsgLogging.csproj" />
<ProjectReference Include="..\..\JT808.Gateway.Services\JT808.Gateway.SessionNotice\JT808.Gateway.SessionNotice.csproj" />
<ProjectReference Include="..\..\JT808.Gateway.Services\JT808.Gateway.Traffic\JT808.Gateway.Traffic.csproj" />
<ProjectReference Include="..\..\JT808.Gateway.Services\JT808.Gateway.Transmit\JT808.Gateway.Transmit.csproj" />
<ProjectReference Include="..\..\JT808.Gateway\JT808.Gateway.csproj" />
<ProjectReference Include="..\..\JT808.Gateway.Client\JT808.Gateway.Client.csproj" />
<ProjectReference Include="..\..\JT808.Gateway.InMemoryMQ\JT808.Gateway.InMemoryMQ.csproj" />


+ 49
- 0
src/JT808.Gateway.Tests/JT808.Gateway.TestHosting/Jobs/TrafficJob.cs 查看文件

@@ -0,0 +1,49 @@
using JT808.Gateway.Client;
using JT808.Gateway.Traffic;
using JT808.Protocol.Enums;
using JT808.Protocol.Extensions;
using JT808.Protocol.MessageBody;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace JT808.Gateway.TestHosting.Jobs
{
public class TrafficJob : IHostedService
{
private readonly IJT808Traffic jT808Traffic;
private readonly ILogger Logger;
public TrafficJob(
ILoggerFactory loggerFactory,
IJT808Traffic jT808Traffic)
{
Logger = loggerFactory.CreateLogger("TrafficJob");
this.jT808Traffic = jT808Traffic;
}

public Task StartAsync(CancellationToken cancellationToken)
{
Task.Run(async () =>
{
while (!cancellationToken.IsCancellationRequested)
{
await Task.Delay(2 * 1000);
foreach (var item in jT808Traffic.GetAll())
{
Logger.LogDebug($"{item.Item1}-{item.Item2}");
}
}
}, cancellationToken);
return Task.CompletedTask;
}

public Task StopAsync(CancellationToken cancellationToken)
{
return Task.CompletedTask;
}
}
}

+ 21
- 2
src/JT808.Gateway.Tests/JT808.Gateway.TestHosting/Program.cs 查看文件

@@ -11,6 +11,13 @@ using JT808.Gateway.Kafka;
using JT808.Gateway.InMemoryMQ;
using JT808.Gateway.ReplyMessage;
using JT808.Gateway.Client;
using JT808.Gateway.SessionNotice;
using JT808.Gateway.Abstractions.Enums;
using JT808.Gateway.MsgIdHandler;
using JT808.Gateway.MsgLogging;
using JT808.Gateway.Traffic;
using JT808.Gateway.Transmit;
using JT808.Gateway.TestHosting.Impl;

namespace JT808.Gateway.TestHosting
{
@@ -48,14 +55,26 @@ namespace JT808.Gateway.TestHosting
.AddTcp()
.AddUdp()
.AddGrpc()
//InMemoryMQ
.AddServerInMemoryMQ()
//InMemoryMQ 按需要加载对应的服务
//注意:不需要的就不用add进来了
.AddServerInMemoryMQ(JT808ConsumerType.All)
//方式1
//.AddServerInMemoryMQ(JT808ConsumerType.MsgIdHandlerConsumer| JT808ConsumerType.ReplyMessageConsumer)
//方式2
//.AddServerInMemoryMQ(JT808ConsumerType.MsgIdHandlerConsumer,JT808ConsumerType.ReplyMessageConsumer)
.AddInMemoryTraffic()
.AddInMemoryTransmit(hostContext.Configuration)
.AddInMemoryMsgIdHandler<JT808MsgIdHandler>()
.AddInMemoryMsgLogging<JT808MsgLogging>()
.AddInMemorySessionNotice()
.AddInMemoryReplyMessage()
//kafka插件
//.AddServerKafkaMsgProducer(hostContext.Configuration)
//.AddServerKafkaMsgReplyConsumer(hostContext.Configuration)
//.AddServerKafkaSessionProducer(hostContext.Configuration)
;
//流量统计
//services.AddHostedService<TrafficJob>();
//grpc客户端调用
//services.AddHostedService<CallGrpcClientJob>();
//客户端测试


+ 1
- 1
src/JT808.Gateway.sln 查看文件

@@ -35,7 +35,7 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JT808.Gateway.Test", "JT808
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JT808.Gateway.TestHosting", "JT808.Gateway.Tests\JT808.Gateway.TestHosting\JT808.Gateway.TestHosting.csproj", "{69C815FE-3C32-473E-99C9-F3C4B3BCFF81}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "JT808.Gateway.InMemoryMQ.Test", "JT808.Gateway.Tests\JT808.Gateway.InMemoryMQ.Test\JT808.Gateway.InMemoryMQ.Test.csproj", "{E103E89D-3069-4AAE-99CE-2AD633AD351E}"
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JT808.Gateway.InMemoryMQ.Test", "JT808.Gateway.Tests\JT808.Gateway.InMemoryMQ.Test\JT808.Gateway.InMemoryMQ.Test.csproj", "{E103E89D-3069-4AAE-99CE-2AD633AD351E}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution


+ 11
- 7
src/JT808.Gateway/JT808TcpServer.cs 查看文件

@@ -45,7 +45,6 @@ namespace JT808.Gateway
IJT808MsgProducer jT808MsgProducer,
JT808AtomicCounterServiceFactory jT808AtomicCounterServiceFactory)
{
SessionManager = jT808SessionManager;
Logger = loggerFactory.CreateLogger("JT808TcpServer");
Serializer = jT808Config.GetSerializer();
@@ -185,12 +184,17 @@ namespace JT808.Gateway
try
{
contentSpan = seqReader.Sequence.Slice(totalConsumed, seqReader.Consumed - totalConsumed).FirstSpan;
var package = Serializer.HeaderDeserialize(contentSpan, minBufferSize: 10240);
AtomicCounterService.MsgSuccessIncrement();
if (Logger.IsEnabled(LogLevel.Debug)) Logger.LogDebug($"[Atomic Success Counter]:{AtomicCounterService.MsgSuccessCount}");
if (Logger.IsEnabled(LogLevel.Trace)) Logger.LogTrace($"[Accept Hex {session.Client.RemoteEndPoint}]:{package.OriginalData.ToArray().ToHexString()}");
SessionManager.TryLink(package.Header.TerminalPhoneNo, session);
MsgProducer.ProduceAsync(package.Header.TerminalPhoneNo, package.OriginalData.ToArray());
//过滤掉不是808标准包(14)
//(头)1+(消息 ID )2+(消息体属性)2+(终端手机号)6+(消息流水号)2+(检验码 )1+(尾)1
if (contentSpan.Length > 14)
{
var package = Serializer.HeaderDeserialize(contentSpan, minBufferSize: 10240);
AtomicCounterService.MsgSuccessIncrement();
if (Logger.IsEnabled(LogLevel.Debug)) Logger.LogDebug($"[Atomic Success Counter]:{AtomicCounterService.MsgSuccessCount}");
if (Logger.IsEnabled(LogLevel.Trace)) Logger.LogTrace($"[Accept Hex {session.Client.RemoteEndPoint}]:{package.OriginalData.ToArray().ToHexString()}");
SessionManager.TryLink(package.Header.TerminalPhoneNo, session);
MsgProducer.ProduceAsync(package.Header.TerminalPhoneNo, package.OriginalData.ToArray());
}
}
catch (JT808Exception ex)
{


+ 1
- 1
src/Version.props 查看文件

@@ -1,6 +1,6 @@
<Project>
<PropertyGroup>
<JT808DotNettyPackageVersion>2.3.1</JT808DotNettyPackageVersion>
<JT808GatewayPackageVersion>1.0.0-preview4</JT808GatewayPackageVersion>
<JT808GatewayPackageVersion>1.0.0-preview5</JT808GatewayPackageVersion>
</PropertyGroup>
</Project>

正在加载...
取消
保存