瀏覽代碼

1.提取内存队列到单独库中

2.修改服务的扩展方法
3.增加github actions配置
4.修改文档
tags/pipeline-1.0.0
smallchi(Koike) 5 年之前
父節點
當前提交
4aa060466d
共有 45 個文件被更改,包括 460 次插入438 次删除
  1. +41
    -0
      .github/workflows/dotnetcore.yml
  2. +20
    -19
      README.md
  3. +1
    -0
      publish.gateway.bat
  4. +2
    -2
      src/JT808.Gateway.Abstractions/JT808.Gateway.Abstractions.csproj
  5. +4
    -5
      src/JT808.Gateway.Abstractions/JT808ReplyMessageHandler.cs
  6. +34
    -0
      src/JT808.Gateway.InMemoryMQ/JT808.Gateway.InMemoryMQ.csproj
  7. +62
    -0
      src/JT808.Gateway.InMemoryMQ/JT808MsgConsumer.cs
  8. +8
    -8
      src/JT808.Gateway.InMemoryMQ/JT808MsgProducer.cs
  9. +65
    -0
      src/JT808.Gateway.InMemoryMQ/JT808MsgReplyConsumer.cs
  10. +27
    -0
      src/JT808.Gateway.InMemoryMQ/JT808MsgReplyProducer.cs
  11. +26
    -0
      src/JT808.Gateway.InMemoryMQ/JT808ServerInMemoryMQExtensions.cs
  12. +2
    -2
      src/JT808.Gateway.InMemoryMQ/Services/JT808MsgService.cs
  13. +29
    -0
      src/JT808.Gateway.InMemoryMQ/Services/JT808ReplyMsgService.cs
  14. +5
    -5
      src/JT808.Gateway.Kafka/JT808.Gateway.Kafka.csproj
  15. +3
    -3
      src/JT808.Gateway.Kafka/JT808ServerKafkaExtensions.cs
  16. +0
    -4
      src/JT808.Gateway.Services/JT808.Gateway.MsgIdHandler/JT808.Gateway.MsgIdHandler.csproj
  17. +3
    -3
      src/JT808.Gateway.Services/JT808.Gateway.MsgIdHandler/JT808MsgIdHandlerExtensions.cs
  18. +1
    -3
      src/JT808.Gateway.Services/JT808.Gateway.MsgLogging/JT808.Gateway.MsgLogging.csproj
  19. +1
    -3
      src/JT808.Gateway.Services/JT808.Gateway.ReplyMessage/JT808.Gateway.ReplyMessage.csproj
  20. +20
    -18
      src/JT808.Gateway.Services/JT808.Gateway.ReplyMessage/JT808ReplyMessageExtensions.cs
  21. +4
    -4
      src/JT808.Gateway.Services/JT808.Gateway.ReplyMessage/JT808ReplyMessageHostedService.cs
  22. +1
    -3
      src/JT808.Gateway.Services/JT808.Gateway.SessionNotice/JT808.Gateway.SessionNotice.csproj
  23. +18
    -16
      src/JT808.Gateway.Services/JT808.Gateway.SessionNotice/JT808SessionNoticeExtensions.cs
  24. +2
    -4
      src/JT808.Gateway.Services/JT808.Gateway.Traffic/JT808.Gateway.Traffic.csproj
  25. +10
    -9
      src/JT808.Gateway.Services/JT808.Gateway.Traffic/JT808TrafficServiceExtensions.cs
  26. +1
    -1
      src/JT808.Gateway.Services/JT808.Gateway.Traffic/JT808TrafficServiceHostedService.cs
  27. +1
    -3
      src/JT808.Gateway.Services/JT808.Gateway.Transmit/JT808.Gateway.Transmit.csproj
  28. +11
    -10
      src/JT808.Gateway.Services/JT808.Gateway.Transmit/JT808TransmitExtensions.cs
  29. +2
    -2
      src/JT808.Gateway.Test/JT808.Gateway.Test.csproj
  30. +1
    -1
      src/JT808.Gateway.TestHosting/Configs/nlog.Win32NT.config
  31. +4
    -2
      src/JT808.Gateway.TestHosting/JT808.Gateway.TestHosting.csproj
  32. +9
    -8
      src/JT808.Gateway.TestHosting/Program.cs
  33. +3
    -6
      src/JT808.Gateway.TestHosting/appsettings.json
  34. +6
    -0
      src/JT808.Gateway.sln
  35. +1
    -7
      src/JT808.Gateway/Configurations/JT808Configuration.cs
  36. +0
    -18
      src/JT808.Gateway/Enums/JT808MessageQueueType.cs
  37. +0
    -204
      src/JT808.Gateway/Internal/JT808MsgReplyConsumerDefault.cs
  38. +2
    -3
      src/JT808.Gateway/JT808.Gateway.csproj
  39. +0
    -11
      src/JT808.Gateway/JT808GatewayExtensions.cs
  40. +3
    -2
      src/JT808.Gateway/JT808GrpcServer.cs
  41. +11
    -14
      src/JT808.Gateway/JT808TcpServer.cs
  42. +5
    -13
      src/JT808.Gateway/JT808UdpServer.cs
  43. +5
    -18
      src/JT808.Gateway/Services/JT808MsgReplyHostedService.cs
  44. +3
    -2
      src/JT808.Gateway/Services/JT808TcpReceiveTimeoutHostedService.cs
  45. +3
    -2
      src/JT808.Gateway/Services/JT808UdpReceiveTimeoutHostedService.cs

+ 41
- 0
.github/workflows/dotnetcore.yml 查看文件

@@ -0,0 +1,41 @@
name: .NET Core

on: [push]

jobs:
build:

runs-on: ubuntu-latest

steps:
- uses: actions/checkout@master
- name: Setup .NET Core
uses: actions/setup-dotnet@master
with:
dotnet-version: 3.1.101
- name: dotnet info
run: dotnet --info
- name: dotnet JT808.DotNetty restore
run: dotnet restore ./src/JT808.DotNetty.sln
- name: dotnet JT808.DotNetty.Core build
run: dotnet build ./src/JT808.DotNetty.Tests/JT808.DotNetty.Core.Test/JT808.DotNetty.Core.Test.csproj
- name: dotnet JT808.DotNetty.Core test
run: dotnet test ./src/JT808.DotNetty.Tests/JT808.DotNetty.Core.Test/JT808.DotNetty.Core.Test.csproj
- name: dotnet JT808.DotNetty.Tcp build
run: dotnet build ./src/JT808.DotNetty.Tests/JT808.DotNetty.Tcp.Test/JT808.DotNetty.Tcp.Test.csproj
- name: dotnet JT808.DotNetty.Tcp test
run: dotnet test ./src/JT808.DotNetty.Tests/JT808.DotNetty.Tcp.Test/JT808.DotNetty.Tcp.Test.csproj
- name: dotnet JT808.DotNetty.Udp build
run: dotnet build ./src/JT808.DotNetty.Tests/JT808.DotNetty.Udp.Test/JT808.DotNetty.Udp.Test.csproj
- name: dotnet JT808.DotNetty.Udp test
run: dotnet test ./src/JT808.DotNetty.Tests/JT808.DotNetty.Udp.Test/JT808.DotNetty.Udp.Test.csproj
- name: dotnet JT808.DotNetty.WebApi build
run: dotnet build ./src/JT808.DotNetty.Tests/JT808.DotNetty.WebApi.Test/JT808.DotNetty.WebApi.Test.csproj
- name: dotnet JT808.DotNetty.WebApi test
run: dotnet test ./src/JT808.DotNetty.Tests/JT808.DotNetty.WebApi.Test/JT808.DotNetty.WebApi.Test.csproj
- name: dotnet JT808.Gateway restore
run: dotnet restore ./src/JT808.Gateway.sln
- name: dotnet JT808.Gateway build
run: dotnet build ./src/JT808.Gateway.Test/JT808.Gateway.Test.csproj
- name: dotnet test
run: dotnet test ./src/JT808.Gateway.Test/JT808.Gateway.Test.csproj

+ 20
- 19
README.md 查看文件

@@ -14,7 +14,7 @@

[玩一玩压力测试](https://github.com/SmallChi/JT808DotNetty/blob/master/doc/README.md)

[![MIT Licence](https://img.shields.io/github/license/mashape/apistatus.svg)](https://github.com/SmallChi/JT808DotNetty/blob/master/LICENSE)[![Build Status](https://travis-ci.org/SmallChi/JT808DotNetty.svg?branch=master)](https://travis-ci.org/SmallChi/JT808DotNetty)
[![MIT Licence](https://img.shields.io/github/license/mashape/apistatus.svg)](https://github.com/SmallChi/JT808DotNetty/blob/master/LICENSE)[![Build Status](https://travis-ci.org/SmallChi/JT808DotNetty.svg?branch=master)](https://travis-ci.org/SmallChi/JT808DotNetty)[![Github Build status](https://github.com/SmallChi/JT808Gateway/workflows/.NET%20Core/badge.svg)]()

## 新网关的优势

@@ -57,7 +57,7 @@

## 基于GRPC的消息业务处理程序

[GRPC协议](https://github.com/SmallChi/JT808Gateway/blob/master/src/JT808.Gateway.Abstractions/Protos/JT808Gateway.proto)
[GRPC消息业务处理协议](https://github.com/SmallChi/JT808Gateway/blob/master/src/JT808.Gateway.Abstractions/Protos/JT808Gateway.proto)

## 基于DotNetty的NuGet安装

@@ -86,6 +86,7 @@
| Install-Package JT808.Gateway.Abstractions| ![JT808.Gateway.Abstractions](https://img.shields.io/nuget/v/JT808.Gateway.Abstractions.svg) | ![JT808.Gateway.Abstractions](https://img.shields.io/nuget/dt/JT808.Gateway.Abstractions.svg) |
| Install-Package JT808.Gateway | ![JT808.Gateway](https://img.shields.io/nuget/v/JT808.Gateway.svg) | ![JT808.Gateway](https://img.shields.io/nuget/dt/JT808.Gateway.svg) |
| Install-Package JT808.Gateway.Kafka| ![JT808.Gateway.Kafka](https://img.shields.io/nuget/v/JT808.Gateway.Kafka.svg) | ![JT808.Gateway.Kafka](https://img.shields.io/nuget/dt/JT808.Gateway.Kafka.svg) |
| Install-Package JT808.Gateway.InMemoryMQ| ![JT808.Gateway.InMemoryMQ](https://img.shields.io/nuget/v/JT808.Gateway.InMemoryMQ.svg) | ![JT808.Gateway.InMemoryMQ](https://img.shields.io/nuget/dt/JT808.Gateway.InMemoryMQ.svg) |
| Install-Package JT808.Gateway.Transmit | ![JT808.Gateway.Transmit](https://img.shields.io/nuget/v/JT808.Gateway.Transmit.svg) | ![JT808.Gateway.Transmit](https://img.shields.io/nuget/dt/JT808.Gateway.Transmit.svg) |
| Install-Package JT808.Gateway.Traffic | ![JT808.Gateway.Traffic](https://img.shields.io/nuget/v/JT808.Gateway.Traffic.svg) | ![JT808.Gateway.Traffic](https://img.shields.io/nuget/dt/JT808.Gateway.Traffic.svg)|
| Install-Package JT808.Gateway.SessionNotice | ![JT808.Gateway.SessionNotice](https://img.shields.io/nuget/v/JT808.Gateway.SessionNotice.svg) | ![JT808.Gateway.SessionNotice](https://img.shields.io/nuget/dt/JT808.Gateway.SessionNotice.svg)|
@@ -168,23 +169,23 @@ static async Task Main(string[] args)
services.AddSingleton<ILoggerFactory, LoggerFactory>();
services.AddSingleton(typeof(ILogger<>), typeof(Logger<>));
services.AddJT808Configure()
//用于测试网关
.AddJT808DevelopmentGateway()
//用于生产环境
//.AddJT808Gateway(options =>
//{
// options.TcpPort=8086;
// options.UdpPort=8086;
// options.MessageQueueType = JT808MessageQueueType.InPlug;
//})
.AddTcp()
.AddUdp()
.AddGrpc()
//kafka插件
//.AddJT808ServerKafkaMsgProducer(hostContext.Configuration)
//.AddJT808ServerKafkaMsgReplyConsumer(hostContext.Configuration)
//.AddJT808ServerKafkaSessionProducer(hostContext.Configuration)
;
//.AddJT808Gateway(options =>
//{
// options.TcpPort = 808;
// options.UdpPort = 808;
//})
.AddJT808Gateway(hostContext.Configuration)
.AddTcp()
.AddUdp()
.AddGrpc()
//InMemoryMQ
.AddJT808ServerInMemoryMQ()
.AddJT808InMemoryReplyMessage()
//kafka插件
//.AddJT808ServerKafkaMsgProducer(hostContext.Configuration)
//.AddJT808ServerKafkaMsgReplyConsumer(hostContext.Configuration)
//.AddJT808ServerKafkaSessionProducer(hostContext.Configuration)
;
//services.AddHostedService<CallGrpcClientJob>();
});



+ 1
- 0
publish.gateway.bat 查看文件

@@ -1,5 +1,6 @@
dotnet pack .\src\JT808.Gateway\JT808.Gateway.csproj --no-build --output ../../nupkgs
dotnet pack .\src\JT808.Gateway.Kafka\JT808.Gateway.Kafka.csproj --no-build --output ../../nupkgs
dotnet pack .\src\JT808.Gateway.InMemoryMQ\JT808.Gateway.InMemoryMQ.csproj --no-build --output ../../nupkgs
dotnet pack .\src\JT808.Gateway.Abstractions\JT808.Gateway.Abstractions.csproj --no-build --output ../../nupkgs

echo 'push service pacakge...'


+ 2
- 2
src/JT808.Gateway.Abstractions/JT808.Gateway.Abstractions.csproj 查看文件

@@ -29,8 +29,8 @@
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageReference>
<PackageReference Include="JT808" Version="2.2.3" />
<PackageReference Include="Microsoft.Extensions.Hosting.Abstractions" Version="3.1.0" />
<PackageReference Include="JT808" Version="2.2.6" />
<PackageReference Include="Microsoft.Extensions.Hosting.Abstractions" Version="3.1.1" />
</ItemGroup>
<ItemGroup>
<None Include="..\..\LICENSE" Pack="true" PackagePath="" />


src/JT808.Gateway.Services/JT808.Gateway.ReplyMessage/JT808ReplyMessageService.cs → src/JT808.Gateway.Abstractions/JT808ReplyMessageHandler.cs 查看文件

@@ -1,5 +1,4 @@
using JT808.Gateway.Abstractions;
using JT808.Protocol;
using JT808.Protocol;
using JT808.Protocol.Enums;
using JT808.Protocol.Extensions;
using JT808.Protocol.MessageBody;
@@ -7,16 +6,16 @@ using System;
using System.Collections.Generic;
using System.Text;

namespace JT808.Gateway.ReplyMessage
namespace JT808.Gateway.Abstractions
{
public class JT808ReplyMessageService
public class JT808ReplyMessageHandler
{
protected Dictionary<ushort, MsgIdMethodDelegate> HandlerDict { get; }

protected delegate byte[] MsgIdMethodDelegate(JT808HeaderPackage package);
protected JT808Serializer JT808Serializer { get; }
protected IJT808MsgReplyProducer JT808MsgReplyProducer { get; }
public JT808ReplyMessageService(
public JT808ReplyMessageHandler(
IJT808Config jT808Config,
IJT808MsgReplyProducer jT808MsgReplyProducer)
{

+ 34
- 0
src/JT808.Gateway.InMemoryMQ/JT808.Gateway.InMemoryMQ.csproj 查看文件

@@ -0,0 +1,34 @@
<Project Sdk="Microsoft.NET.Sdk">

<Import Project="..\Version.props" />
<PropertyGroup>
<TargetFramework>netstandard2.1</TargetFramework>
<LangVersion>8.0</LangVersion>
<Copyright>Copyright 2019.</Copyright>
<Authors>SmallChi(Koike)</Authors>
<RepositoryUrl>https://github.com/SmallChi/JT808Gateway</RepositoryUrl>
<PackageProjectUrl>https://github.com/SmallChi/JT808Gateway</PackageProjectUrl>
<licenseUrl>https://github.com/SmallChi/JT808Gateway/blob/master/LICENSE</licenseUrl>
<license>https://github.com/SmallChi/JT808Gateway/blob/master/LICENSE</license>
<GeneratePackageOnBuild>false</GeneratePackageOnBuild>
<SignAssembly>false</SignAssembly>
<PackageLicenseFile>LICENSE</PackageLicenseFile>
<PackageRequireLicenseAcceptance>true</PackageRequireLicenseAcceptance>
<PackageId>JT808.Gateway.InMemoryMQ</PackageId>
<Product>JT808.Gateway.InMemoryMQ</Product>
<Description>基于InMemory的JT808消息发布与订阅</Description>
<PackageReleaseNotes>基于InMemory的JT808消息发布与订阅</PackageReleaseNotes>
<Version>$(JT808GatewayPackageVersion)</Version>
</PropertyGroup>

<ItemGroup>
<None Include="..\..\LICENSE" Pack="true" PackagePath="" />
</ItemGroup>
<ItemGroup>
<PackageReference Include="System.Threading.Channels" Version="4.7.0" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\JT808.Gateway.Abstractions\JT808.Gateway.Abstractions.csproj" />
</ItemGroup>
</Project>

+ 62
- 0
src/JT808.Gateway.InMemoryMQ/JT808MsgConsumer.cs 查看文件

@@ -0,0 +1,62 @@
using JT808.Gateway.Abstractions;
using JT808.Gateway.InMemoryMQ.Services;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace JT808.Gateway.InMemoryMQ
{
public class JT808MsgConsumer : IJT808MsgConsumer
{
private readonly JT808MsgService JT808MsgService;
public CancellationTokenSource Cts => new CancellationTokenSource();
private readonly ILogger logger;
public string TopicName => JT808GatewayConstants.MsgTopic;
public JT808MsgConsumer(
JT808MsgService jT808MsgService,
ILoggerFactory loggerFactory)
{
JT808MsgService = jT808MsgService;
logger = loggerFactory.CreateLogger("JT808MsgConsumer");
}

public void OnMessage(Action<(string TerminalNo, byte[] Data)> callback)
{
Task.Run(() =>
{
while (!Cts.IsCancellationRequested)
{
try
{
if (JT808MsgService.TryRead(out var item))
{
callback(item);
}
}
catch(Exception ex)
{
logger.LogError(ex, "");
}
}
}, Cts.Token);
}

public void Subscribe()
{

}

public void Unsubscribe()
{
Cts.Cancel();
}

public void Dispose()
{
Cts.Dispose();
}
}
}

src/JT808.Gateway/Internal/JT808MsgProducerDefault.cs → src/JT808.Gateway.InMemoryMQ/JT808MsgProducer.cs 查看文件

@@ -1,27 +1,27 @@
using JT808.Gateway.Abstractions;
using JT808.Gateway.Internal;
using JT808.Gateway.InMemoryMQ.Services;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;

namespace JT808.Gateway.Internal
namespace JT808.Gateway.InMemoryMQ
{
internal class JT808MsgProducerDefault : IJT808MsgProducer
public class JT808MsgProducer : IJT808MsgProducer
{
private readonly JT808MsgService JT808MsgService;
public string TopicName => JT808GatewayConstants.MsgTopic;
public JT808MsgProducerDefault(JT808MsgService jT808MsgService)
public JT808MsgProducer(JT808MsgService jT808MsgService)
{
JT808MsgService = jT808MsgService;
}
public void Dispose()
{
}
public async ValueTask ProduceAsync(string terminalNo, byte[] data)
{
await JT808MsgService.WriteAsync(terminalNo, data);
}
public void Dispose()
{

}
}
}

+ 65
- 0
src/JT808.Gateway.InMemoryMQ/JT808MsgReplyConsumer.cs 查看文件

@@ -0,0 +1,65 @@
using JT808.Gateway.Abstractions;
using JT808.Gateway.InMemoryMQ.Services;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace JT808.Gateway.InMemoryMQ
{
public class JT808MsgReplyConsumer : IJT808MsgReplyConsumer
{
private readonly JT808ReplyMsgService JT808ReplyMsgService;
public CancellationTokenSource Cts => new CancellationTokenSource();

private readonly ILogger logger;

public string TopicName => JT808GatewayConstants.MsgReplyTopic;

public JT808MsgReplyConsumer(
JT808ReplyMsgService jT808ReplyMsgService,
ILoggerFactory loggerFactory)
{
logger = loggerFactory.CreateLogger("JT808MsgReplyConsumer");
JT808ReplyMsgService = jT808ReplyMsgService;
}

public void OnMessage(Action<(string TerminalNo, byte[] Data)> callback)
{
Task.Run(() =>
{
while (!Cts.IsCancellationRequested)
{
try
{
if (JT808ReplyMsgService.TryRead(out var item))
{
callback(item);
}
}
catch (Exception ex)
{
logger.LogError(ex, "");
}
}
}, Cts.Token);
}

public void Subscribe()
{

}

public void Unsubscribe()
{
Cts.Cancel();
}

public void Dispose()
{
Cts.Dispose();
}
}
}

+ 27
- 0
src/JT808.Gateway.InMemoryMQ/JT808MsgReplyProducer.cs 查看文件

@@ -0,0 +1,27 @@
using JT808.Gateway.Abstractions;
using JT808.Gateway.InMemoryMQ.Services;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;

namespace JT808.Gateway.InMemoryMQ
{
public class JT808MsgReplyProducer : IJT808MsgReplyProducer
{
public string TopicName => JT808GatewayConstants.MsgReplyTopic;
private readonly JT808ReplyMsgService JT808ReplyMsgService;
public JT808MsgReplyProducer(JT808ReplyMsgService jT808ReplyMsgService)
{
JT808ReplyMsgService = jT808ReplyMsgService;
}
public async ValueTask ProduceAsync(string terminalNo, byte[] data)
{
await JT808ReplyMsgService.WriteAsync(terminalNo, data);
}
public void Dispose()
{

}
}
}

+ 26
- 0
src/JT808.Gateway.InMemoryMQ/JT808ServerInMemoryMQExtensions.cs 查看文件

@@ -0,0 +1,26 @@
using JT808.Gateway.Abstractions;
using JT808.Gateway.InMemoryMQ.Services;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection.Extensions;

namespace JT808.Gateway.InMemoryMQ
{
public static class JT808ServerInMemoryMQExtensions
{
/// <summary>
///
/// </summary>
/// <param name="jT808GatewayBuilder"></param>
/// <returns></returns>
public static IJT808GatewayBuilder AddJT808ServerInMemoryMQ(this IJT808GatewayBuilder jT808GatewayBuilder)
{
jT808GatewayBuilder.JT808Builder.Services.AddSingleton<JT808MsgService>();
jT808GatewayBuilder.JT808Builder.Services.AddSingleton<JT808ReplyMsgService>();
jT808GatewayBuilder.JT808Builder.Services.Replace(new ServiceDescriptor(typeof(IJT808MsgProducer), typeof(JT808MsgProducer), ServiceLifetime.Singleton));
jT808GatewayBuilder.JT808Builder.Services.Replace(new ServiceDescriptor(typeof(IJT808MsgConsumer), typeof(JT808MsgConsumer), ServiceLifetime.Singleton));
jT808GatewayBuilder.JT808Builder.Services.Replace(new ServiceDescriptor(typeof(IJT808MsgReplyProducer), typeof(JT808MsgReplyProducer), ServiceLifetime.Singleton));
jT808GatewayBuilder.JT808Builder.Services.Replace(new ServiceDescriptor(typeof(IJT808MsgReplyConsumer), typeof(JT808MsgReplyConsumer), ServiceLifetime.Singleton));
return jT808GatewayBuilder;
}
}
}

src/JT808.Gateway/Internal/JT808MsgService.cs → src/JT808.Gateway.InMemoryMQ/Services/JT808MsgService.cs 查看文件

@@ -5,9 +5,9 @@ using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

namespace JT808.Gateway.Internal
namespace JT808.Gateway.InMemoryMQ.Services
{
internal class JT808MsgService
public class JT808MsgService
{
private readonly Channel<(string TerminalNo, byte[] Data)> _channel;


+ 29
- 0
src/JT808.Gateway.InMemoryMQ/Services/JT808ReplyMsgService.cs 查看文件

@@ -0,0 +1,29 @@
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

namespace JT808.Gateway.InMemoryMQ.Services
{
public class JT808ReplyMsgService
{
private readonly Channel<(string TerminalNo, byte[] Data)> _channel;

public JT808ReplyMsgService()
{
_channel = Channel.CreateUnbounded<(string TerminalNo, byte[] Data)>();
}

public async ValueTask WriteAsync(string terminalNo, byte[] data)
{
await _channel.Writer.WriteAsync((terminalNo, data));
}

public bool TryRead(out (string TerminalNo, byte[] Data) item)
{
return _channel.Reader.TryRead(out item);
}
}
}

+ 5
- 5
src/JT808.Gateway.Kafka/JT808.Gateway.Kafka.csproj 查看文件

@@ -21,11 +21,11 @@
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Confluent.Kafka" Version="1.3.0" />
<PackageReference Include="Microsoft.Extensions.Configuration.Abstractions" Version="3.1.0" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="3.1.0" />
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="3.1.0" />
<PackageReference Include="Microsoft.Extensions.Options" Version="3.1.0" />
<PackageReference Include="Microsoft.Extensions.Options.ConfigurationExtensions" Version="3.1.0" />
<PackageReference Include="Microsoft.Extensions.Configuration.Abstractions" Version="3.1.1" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="3.1.1" />
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="3.1.1" />
<PackageReference Include="Microsoft.Extensions.Options" Version="3.1.1" />
<PackageReference Include="Microsoft.Extensions.Options.ConfigurationExtensions" Version="3.1.1" />
</ItemGroup>

<ItemGroup>


+ 3
- 3
src/JT808.Gateway.Kafka/JT808ServerKafkaExtensions.cs 查看文件

@@ -11,7 +11,7 @@ namespace JT808.Gateway.Kafka
/// <summary>
///
/// </summary>
/// <param name="jT808NettyBuilder"></param>
/// <param name="jT808GatewayBuilder"></param>
/// <param name="configuration">GetSection("JT808MsgProducerConfig")</param>
/// <returns></returns>
public static IJT808GatewayBuilder AddJT808ServerKafkaMsgProducer(this IJT808GatewayBuilder jT808GatewayBuilder, IConfiguration configuration)
@@ -23,7 +23,7 @@ namespace JT808.Gateway.Kafka
/// <summary>
///
/// </summary>
/// <param name="jT808NettyBuilder"></param>
/// <param name="jT808GatewayBuilder"></param>
/// <param name="configuration">GetSection("JT808MsgReplyConsumerConfig")</param>
/// <returns></returns>
public static IJT808GatewayBuilder AddJT808ServerKafkaMsgReplyConsumer(this IJT808GatewayBuilder jT808GatewayBuilder, IConfiguration configuration)
@@ -35,7 +35,7 @@ namespace JT808.Gateway.Kafka
/// <summary>
///
/// </summary>
/// <param name="jT808NettyBuilder"></param>
/// <param name="jT808GatewayBuilder"></param>
/// <param name="configuration">GetSection("JT808SessionProducerConfig")</param>
/// <returns></returns>
public static IJT808GatewayBuilder AddJT808ServerKafkaSessionProducer(this IJT808GatewayBuilder jT808GatewayBuilder, IConfiguration configuration)


+ 0
- 4
src/JT808.Gateway.Services/JT808.Gateway.MsgIdHandler/JT808.Gateway.MsgIdHandler.csproj 查看文件

@@ -21,10 +21,6 @@
<PackageReleaseNotes>基于JT808消息业务处理程序服务</PackageReleaseNotes>
<PackageLicenseFile>LICENSE</PackageLicenseFile>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Microsoft.Extensions.Hosting.Abstractions" Version="3.1.0" />
</ItemGroup>
<ItemGroup>
<ProjectReference Include="..\..\JT808.Gateway.Abstractions\JT808.Gateway.Abstractions.csproj" />
</ItemGroup>


+ 3
- 3
src/JT808.Gateway.Services/JT808.Gateway.MsgIdHandler/JT808MsgIdHandlerExtensions.cs 查看文件

@@ -8,10 +8,10 @@ namespace JT808.Gateway.MsgIdHandler
{
public static class JT808MsgIdHandlerExtensions
{
public static IJT808ClientBuilder AddJT808MsgIdHandler<TJT808DotNettyMsgIdHandler>(this IJT808ClientBuilder jT808ClientBuilder)
where TJT808DotNettyMsgIdHandler: IJT808MsgIdHandler
public static IJT808ClientBuilder AddJT808MsgIdHandler<TJT808MsgIdHandler>(this IJT808ClientBuilder jT808ClientBuilder)
where TJT808MsgIdHandler: IJT808MsgIdHandler
{
jT808ClientBuilder.JT808Builder.Services.AddSingleton(typeof(IJT808MsgIdHandler),typeof(TJT808DotNettyMsgIdHandler));
jT808ClientBuilder.JT808Builder.Services.AddSingleton(typeof(IJT808MsgIdHandler),typeof(TJT808MsgIdHandler));
jT808ClientBuilder.JT808Builder.Services.AddHostedService<JT808MsgIdHandlerHostedService>();
return jT808ClientBuilder;
}


+ 1
- 3
src/JT808.Gateway.Services/JT808.Gateway.MsgLogging/JT808.Gateway.MsgLogging.csproj 查看文件

@@ -22,9 +22,7 @@
<PackageLicenseFile>LICENSE</PackageLicenseFile>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Microsoft.Extensions.Hosting.Abstractions" Version="3.1.0" />
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="3.1.0" />
<PackageReference Include="Microsoft.Extensions.Options.ConfigurationExtensions" Version="3.1.0" />
<PackageReference Include="Microsoft.Extensions.Options.ConfigurationExtensions" Version="3.1.1" />
</ItemGroup>
<ItemGroup>
<None Include="..\..\..\LICENSE" Pack="true" PackagePath="" />


+ 1
- 3
src/JT808.Gateway.Services/JT808.Gateway.ReplyMessage/JT808.Gateway.ReplyMessage.csproj 查看文件

@@ -21,9 +21,7 @@
<PackageLicenseFile>LICENSE</PackageLicenseFile>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Microsoft.Extensions.Hosting.Abstractions" Version="3.1.0" />
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="3.1.0" />
<PackageReference Include="Microsoft.Extensions.Options.ConfigurationExtensions" Version="3.1.0" />
<PackageReference Include="Microsoft.Extensions.Options.ConfigurationExtensions" Version="3.1.1" />
</ItemGroup>
<ItemGroup>
<None Include="..\..\..\LICENSE" Pack="true" PackagePath="" />


+ 20
- 18
src/JT808.Gateway.Services/JT808.Gateway.ReplyMessage/JT808ReplyMessageExtensions.cs 查看文件

@@ -10,50 +10,52 @@ namespace JT808.Gateway.ReplyMessage
public static class JT808ReplyMessageExtensions
{
/// <summary>
/// 独享消息应答服务(不同的消费者实例)
/// 消息应答服务(不同的消费者实例)
/// </summary>
/// <param name="jT808ClientBuilder"></param>
/// <returns></returns>
public static IJT808ClientBuilder AddInprocJT808ReplyMessage(this IJT808ClientBuilder jT808ClientBuilder)
public static IJT808ClientBuilder AddJT808InPlugReplyMessage(this IJT808ClientBuilder jT808ClientBuilder)
{
jT808ClientBuilder.JT808Builder.Services.AddSingleton<JT808ReplyMessageService>();
jT808ClientBuilder.JT808Builder.Services.AddSingleton<JT808ReplyMessageHandler>();
jT808ClientBuilder.JT808Builder.Services.AddHostedService<JT808ReplyMessageHostedService>();
return jT808ClientBuilder;
}
/// <summary>
/// 独享消息应答服务(不同的消费者实例)
/// 消息应答服务(不同的消费者实例)
/// </summary>
/// <typeparam name="TReplyMessageService">自定义消息回复服务</typeparam>
/// <param name="jT808ClientBuilder"></param>
/// <returns></returns>
public static IJT808ClientBuilder AddInprocJT808ReplyMessage<TReplyMessageService>(this IJT808ClientBuilder jT808ClientBuilder)
where TReplyMessageService : JT808ReplyMessageService
public static IJT808ClientBuilder AddJT808InPlugReplyMessage<TReplyMessageHandler>(this IJT808ClientBuilder jT808ClientBuilder)
where TReplyMessageHandler : JT808ReplyMessageHandler
{
jT808ClientBuilder.JT808Builder.Services.AddSingleton<JT808ReplyMessageService,TReplyMessageService>();
jT808ClientBuilder.JT808Builder.Services.AddSingleton<JT808ReplyMessageHandler, TReplyMessageHandler>();
jT808ClientBuilder.JT808Builder.Services.AddHostedService<JT808ReplyMessageHostedService>();
return jT808ClientBuilder;
}
/// <summary>
/// 共享消息应答服务(消费者单实例)
/// 消息应答服务(消费者单实例)
/// </summary>
/// <typeparam name="TReplyMessageService">自定义消息回复服务</typeparam>
/// <param name="jT808ClientBuilder"></param>
/// <param name="jT808GatewayBuilder"></param>
/// <returns></returns>
public static IJT808ClientBuilder AddShareJT808ReplyMessage<TReplyMessageService>(this IJT808ClientBuilder jT808ClientBuilder)
where TReplyMessageService : JT808ReplyMessageService
public static IJT808GatewayBuilder AddJT808InMemoryReplyMessage<TReplyMessageHandler>(this IJT808GatewayBuilder jT808GatewayBuilder)
where TReplyMessageHandler : JT808ReplyMessageHandler
{
jT808ClientBuilder.JT808Builder.Services.AddSingleton<JT808ReplyMessageService, TReplyMessageService>();
return jT808ClientBuilder;
jT808GatewayBuilder.JT808Builder.Services.AddSingleton<JT808ReplyMessageHandler, TReplyMessageHandler>();
jT808GatewayBuilder.JT808Builder.Services.AddHostedService<JT808ReplyMessageHostedService>();
return jT808GatewayBuilder;
}
/// <summary>
/// 共享消息应答服务(消费者单实例)
/// 消息应答服务(消费者单实例)
/// </summary>
/// <param name="jT808ClientBuilder"></param>
/// <param name="jT808GatewayBuilder"></param>
/// <returns></returns>
public static IJT808ClientBuilder AddShareJT808ReplyMessage(this IJT808ClientBuilder jT808ClientBuilder)
public static IJT808GatewayBuilder AddJT808InMemoryReplyMessage(this IJT808GatewayBuilder jT808GatewayBuilder)
{
jT808ClientBuilder.JT808Builder.Services.AddSingleton<JT808ReplyMessageService>();
return jT808ClientBuilder;
jT808GatewayBuilder.JT808Builder.Services.AddSingleton<JT808ReplyMessageHandler>();
jT808GatewayBuilder.JT808Builder.Services.AddHostedService<JT808ReplyMessageHostedService>();
return jT808GatewayBuilder;
}
}
}

+ 4
- 4
src/JT808.Gateway.Services/JT808.Gateway.ReplyMessage/JT808ReplyMessageHostedService.cs 查看文件

@@ -8,20 +8,20 @@ namespace JT808.Gateway.ReplyMessage
public class JT808ReplyMessageHostedService : IHostedService
{
private readonly IJT808MsgConsumer jT808MsgConsumer;
private readonly JT808ReplyMessageService jT808ReplyMessageService;
private readonly JT808ReplyMessageHandler jT808ReplyMessageHandler;

public JT808ReplyMessageHostedService(
JT808ReplyMessageService jT808ReplyMessageService,
JT808ReplyMessageHandler jT808ReplyMessageHandler,
IJT808MsgConsumer jT808MsgConsumer)
{
this.jT808MsgConsumer = jT808MsgConsumer;
this.jT808ReplyMessageService = jT808ReplyMessageService;
this.jT808ReplyMessageHandler = jT808ReplyMessageHandler;
}

public Task StartAsync(CancellationToken cancellationToken)
{
jT808MsgConsumer.Subscribe();
jT808MsgConsumer.OnMessage(jT808ReplyMessageService.Processor);
jT808MsgConsumer.OnMessage(jT808ReplyMessageHandler.Processor);
return Task.CompletedTask;
}



+ 1
- 3
src/JT808.Gateway.Services/JT808.Gateway.SessionNotice/JT808.Gateway.SessionNotice.csproj 查看文件

@@ -22,9 +22,7 @@
<PackageLicenseFile>LICENSE</PackageLicenseFile>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Microsoft.Extensions.Hosting.Abstractions" Version="3.1.0" />
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="3.1.0" />
<PackageReference Include="Microsoft.Extensions.Options.ConfigurationExtensions" Version="3.1.0" />
<PackageReference Include="Microsoft.Extensions.Options.ConfigurationExtensions" Version="3.1.1" />
</ItemGroup>
<ItemGroup>
<None Include="..\..\..\LICENSE" Pack="true" PackagePath="" />


+ 18
- 16
src/JT808.Gateway.Services/JT808.Gateway.SessionNotice/JT808SessionNoticeExtensions.cs 查看文件

@@ -10,11 +10,11 @@ namespace JT808.Gateway.SessionNotice
public static class JT808SessionNoticeExtensions
{
/// <summary>
/// 独享消息会话通知服务(不同的消费者实例)
/// 会话通知服务(不同的消费者实例)
/// </summary>
/// <param name="jT808ClientBuilder"></param>
/// <returns></returns>
public static IJT808ClientBuilder AddInprocJT808SessionNotice(this IJT808ClientBuilder jT808ClientBuilder)
public static IJT808ClientBuilder AddJT808InPlugSessionNotice(this IJT808ClientBuilder jT808ClientBuilder)
{
jT808ClientBuilder.JT808Builder.Services.AddSingleton<JT808SessionNoticeService>();
jT808ClientBuilder.JT808Builder.Services.AddHostedService<JT808SessionNoticeHostedService>();
@@ -22,12 +22,12 @@ namespace JT808.Gateway.SessionNotice
}

/// <summary>
/// 独享消息会话通知服务(不同的消费者实例)
/// 消息会话通知服务(不同的消费者实例)
/// </summary>
/// <typeparam name="TSessionNoticeService">自定义会话通知服务</typeparam>
/// <param name="jT808ClientBuilder"></param>
/// <returns></returns>
public static IJT808ClientBuilder AddInprocJT808SessionNotice<TSessionNoticeService>(this IJT808ClientBuilder jT808ClientBuilder)
public static IJT808ClientBuilder AddJT808InPlugSessionNotice<TSessionNoticeService>(this IJT808ClientBuilder jT808ClientBuilder)
where TSessionNoticeService : JT808SessionNoticeService
{
jT808ClientBuilder.JT808Builder.Services.AddSingleton<JT808SessionNoticeService,TSessionNoticeService>();
@@ -36,27 +36,29 @@ namespace JT808.Gateway.SessionNotice
}

/// <summary>
/// 共享消息会话通知服务(消费者单实例)
/// 消息会话通知服务(消费者单实例)
/// </summary>
/// <typeparam name="TSessionNoticeService">自定义会话通知服务</typeparam>
/// <param name="jT808ClientBuilder"></param>
/// <param name="jT808GatewayBuilder"></param>
/// <returns></returns>
public static IJT808ClientBuilder AddShareJT808SessionNotice<TSessionNoticeService>(this IJT808ClientBuilder jT808ClientBuilder)
public static IJT808GatewayBuilder AddJT808InMemorySessionNotice<TSessionNoticeService>(this IJT808GatewayBuilder jT808GatewayBuilder)
where TSessionNoticeService : JT808SessionNoticeService
{
jT808ClientBuilder.JT808Builder.Services.AddSingleton<JT808SessionNoticeService, TSessionNoticeService>();
return jT808ClientBuilder;
jT808GatewayBuilder.JT808Builder.Services.AddSingleton<JT808SessionNoticeService, TSessionNoticeService>();
jT808GatewayBuilder.JT808Builder.Services.AddHostedService<JT808SessionNoticeHostedService>();
return jT808GatewayBuilder;
}

/// <summary>
/// 共享消息会话通知服务(消费者单实例)
/// 消息会话通知服务(消费者单实例)
/// </summary>
/// <param name="jT808ClientBuilder"></param>
/// <param name="jT808GatewayBuilder"></param>
/// <returns></returns>
public static IJT808ClientBuilder AddShareJT808SessionNotice(this IJT808ClientBuilder jT808ClientBuilder)
{
jT808ClientBuilder.JT808Builder.Services.AddSingleton<JT808SessionNoticeService>();
return jT808ClientBuilder;
}
//public static IJT808GatewayBuilder AddJT808InMemorySessionNotice(this IJT808GatewayBuilder jT808GatewayBuilder)
//{
// jT808GatewayBuilder.JT808Builder.Services.AddSingleton<JT808SessionNoticeService>();
// jT808GatewayBuilder.JT808Builder.Services.AddHostedService<JT808SessionNoticeHostedService>();
// return jT808GatewayBuilder;
//}
}
}

+ 2
- 4
src/JT808.Gateway.Services/JT808.Gateway.Traffic/JT808.Gateway.Traffic.csproj 查看文件

@@ -21,10 +21,8 @@
<PackageLicenseFile>LICENSE</PackageLicenseFile>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Microsoft.Extensions.Hosting.Abstractions" Version="3.1.0" />
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="3.1.0" />
<PackageReference Include="Microsoft.Extensions.Options.ConfigurationExtensions" Version="3.1.0" />
<PackageReference Include="CSRedisCore" Version="3.2.1" />
<PackageReference Include="Microsoft.Extensions.Options.ConfigurationExtensions" Version="3.1.1" />
<PackageReference Include="CSRedisCore" Version="3.3.0" />
</ItemGroup>
<ItemGroup>
<None Include="..\..\..\LICENSE" Pack="true" PackagePath="" />


+ 10
- 9
src/JT808.Gateway.Services/JT808.Gateway.Traffic/JT808TrafficServiceExtensions.cs 查看文件

@@ -9,26 +9,27 @@ namespace JT808.Gateway.Traffic
public static class JT808TrafficServiceExtensions
{
/// <summary>
/// 独享消息流量统计服务(不同的消费者实例)
/// 消息流量统计服务(不同的消费者实例)
/// </summary>
/// <param name="jT808ClientBuilder"></param>
/// <returns></returns>
public static IJT808ClientBuilder AddInprocJT808Traffic(this IJT808ClientBuilder jT808ClientBuilder)
public static IJT808ClientBuilder AddJT808InPlugTraffic(this IJT808ClientBuilder jT808ClientBuilder)
{
jT808ClientBuilder.JT808Builder.Services.AddSingleton<JT808TrafficService>();
jT808ClientBuilder.JT808Builder.Services.AddHostedService<JT808TrafficServiceHostedService>();
return jT808ClientBuilder;
}
/// <summary>
/// 共享消息流量统计服务(消费者单实例)
/// 消息流量统计服务(消费者单实例)
/// </summary>
/// <typeparam name="TReplyMessageService"></typeparam>
/// <param name="jT808ClientBuilder"></param>
/// <param name="jT808GatewayBuilder"></param>
/// <returns></returns>
public static IJT808ClientBuilder AddShareJT808Traffic(this IJT808ClientBuilder jT808ClientBuilder)
{
jT808ClientBuilder.JT808Builder.Services.AddSingleton<JT808TrafficService>();
return jT808ClientBuilder;
}
//public static IJT808GatewayBuilder AddJT808InMemoryTraffic(this IJT808GatewayBuilder jT808GatewayBuilder)
//{
// jT808GatewayBuilder.JT808Builder.Services.AddSingleton<JT808TrafficService>();
// jT808GatewayBuilder.JT808Builder.Services.AddHostedService<JT808TrafficServiceHostedService>();
// return jT808GatewayBuilder;
//}
}
}

+ 1
- 1
src/JT808.Gateway.Services/JT808.Gateway.Traffic/JT808TrafficServiceHostedService.cs 查看文件

@@ -23,7 +23,7 @@ namespace JT808.Gateway.Traffic
{
jT808MsgConsumer.Subscribe();
jT808MsgConsumer.OnMessage((item)=> {
string str = item.Data.ToHexString();
//string str = item.Data.ToHexString();
jT808TrafficService.Processor(item.TerminalNo, item.Data.Length);
});
return Task.CompletedTask;


+ 1
- 3
src/JT808.Gateway.Services/JT808.Gateway.Transmit/JT808.Gateway.Transmit.csproj 查看文件

@@ -25,9 +25,7 @@
<PackageReference Include="DotNetty.Buffers" Version="0.6.0" />
<PackageReference Include="DotNetty.Handlers" Version="0.6.0" />
<PackageReference Include="DotNetty.Transport" Version="0.6.0" />
<PackageReference Include="Microsoft.Extensions.Hosting.Abstractions" Version="3.1.0" />
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="3.1.0" />
<PackageReference Include="Microsoft.Extensions.Options.ConfigurationExtensions" Version="3.1.0" />
<PackageReference Include="Microsoft.Extensions.Options.ConfigurationExtensions" Version="3.1.1" />
<PackageReference Include="Polly" Version="7.2.0" />
</ItemGroup>
<ItemGroup>


+ 11
- 10
src/JT808.Gateway.Services/JT808.Gateway.Transmit/JT808TransmitExtensions.cs 查看文件

@@ -12,12 +12,12 @@ namespace JT808.Gateway.Transmit
public static class JT808TransmitExtensions
{
/// <summary>
/// 独享转发服务(不同的消费者实例)
/// 转发服务(不同的消费者实例)
/// </summary>
/// <param name="jT808ClientBuilder"></param>
/// <param name="configuration"></param>
/// <returns></returns>
public static IJT808ClientBuilder AddInprocJT808Transmit(this IJT808ClientBuilder jT808ClientBuilder,IConfiguration configuration)
public static IJT808ClientBuilder AddJT808InPlugTransmit(this IJT808ClientBuilder jT808ClientBuilder,IConfiguration configuration)
{
jT808ClientBuilder.JT808Builder.Services.Configure<RemoteServerOptions>(configuration.GetSection("RemoteServerOptions"));
jT808ClientBuilder.JT808Builder.Services.AddSingleton<JT808TransmitService>();
@@ -25,16 +25,17 @@ namespace JT808.Gateway.Transmit
return jT808ClientBuilder;
}
/// <summary>
/// 共享转发服务(消费者单实例)
/// 转发服务(消费者单实例)
/// </summary>
/// <param name="jT808ClientBuilder"></param>
/// <param name="jT808GatewayBuilder"></param>
/// <param name="configuration"></param>
/// <returns></returns>
public static IJT808ClientBuilder AddShareJT808Transmit(this IJT808ClientBuilder jT808ClientBuilder, IConfiguration configuration)
{
jT808ClientBuilder.JT808Builder.Services.Configure<RemoteServerOptions>(configuration.GetSection("RemoteServerOptions"));
jT808ClientBuilder.JT808Builder.Services.AddSingleton<JT808TransmitService>();
return jT808ClientBuilder;
}
//public static IJT808GatewayBuilder AddJT808InMemoryTransmit(this IJT808GatewayBuilder jT808GatewayBuilder, IConfiguration configuration)
//{
// jT808GatewayBuilder.JT808Builder.Services.Configure<RemoteServerOptions>(configuration.GetSection("RemoteServerOptions"));
// jT808GatewayBuilder.JT808Builder.Services.AddSingleton<JT808TransmitService>();
// jT808GatewayBuilder.JT808Builder.Services.AddHostedService<JT808TransmitHostedService>();
// return jT808GatewayBuilder;
//}
}
}

+ 2
- 2
src/JT808.Gateway.Test/JT808.Gateway.Test.csproj 查看文件

@@ -7,7 +7,7 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Microsoft.Extensions.Logging.Debug" Version="3.1.0" />
<PackageReference Include="Microsoft.Extensions.Logging.Debug" Version="3.1.1" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="16.4.0" />
<PackageReference Include="System.IO.Pipelines" Version="4.7.0" />
<PackageReference Include="xunit" Version="2.4.1" />
@@ -15,7 +15,7 @@
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageReference>
<PackageReference Include="coverlet.collector" Version="1.1.0">
<PackageReference Include="coverlet.collector" Version="1.2.0">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageReference>


+ 1
- 1
src/JT808.Gateway.TestHosting/Configs/nlog.Win32NT.config 查看文件

@@ -31,6 +31,6 @@
</target>
</targets>
<rules>
<logger name="*" minlevel="Debug" maxlevel="Fatal" writeTo="Gateway,console"/>
<logger name="*" minlevel="Trace" maxlevel="Fatal" writeTo="Gateway,console"/>
</rules>
</nlog>

+ 4
- 2
src/JT808.Gateway.TestHosting/JT808.Gateway.TestHosting.csproj 查看文件

@@ -7,14 +7,16 @@
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="3.1.0" />
<PackageReference Include="Microsoft.Extensions.Hosting" Version="3.1.0" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="3.1.1" />
<PackageReference Include="Microsoft.Extensions.Hosting" Version="3.1.1" />
<PackageReference Include="Microsoft.VisualStudio.Azure.Containers.Tools.Targets" Version="1.9.10" />
<PackageReference Include="NLog.Extensions.Logging" Version="1.6.1" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\JT808.Gateway.InMemoryMQ\JT808.Gateway.InMemoryMQ.csproj" />
<ProjectReference Include="..\JT808.Gateway.Kafka\JT808.Gateway.Kafka.csproj" />
<ProjectReference Include="..\JT808.Gateway.Services\JT808.Gateway.ReplyMessage\JT808.Gateway.ReplyMessage.csproj" />
<ProjectReference Include="..\JT808.Gateway\JT808.Gateway.csproj" />
</ItemGroup>



+ 9
- 8
src/JT808.Gateway.TestHosting/Program.cs 查看文件

@@ -7,8 +7,9 @@ using JT808.Protocol;
using Microsoft.Extensions.Configuration;
using NLog.Extensions.Logging;
using JT808.Gateway.TestHosting.Jobs;
using JT808.Gateway.Enums;
using JT808.Gateway.Kafka;
using JT808.Gateway.InMemoryMQ;
using JT808.Gateway.ReplyMessage;

namespace JT808.Gateway.TestHosting
{
@@ -35,18 +36,18 @@ namespace JT808.Gateway.TestHosting
services.AddSingleton<ILoggerFactory, LoggerFactory>();
services.AddSingleton(typeof(ILogger<>), typeof(Logger<>));
services.AddJT808Configure()
//用于测试网关
.AddJT808DevelopmentGateway()
//用于生产环境
//.AddJT808Gateway(options =>
//{
// options.TcpPort=8086;
// options.UdpPort=8086;
// options.MessageQueueType = JT808MessageQueueType.InPlug;
//})
// options.TcpPort = 808;
// options.UdpPort = 808;
//})
.AddJT808Gateway(hostContext.Configuration)
.AddTcp()
.AddUdp()
.AddGrpc()
//InMemoryMQ
.AddJT808ServerInMemoryMQ()
.AddJT808InMemoryReplyMessage()
//kafka插件
//.AddJT808ServerKafkaMsgProducer(hostContext.Configuration)
//.AddJT808ServerKafkaMsgReplyConsumer(hostContext.Configuration)


+ 3
- 6
src/JT808.Gateway.TestHosting/appsettings.json 查看文件

@@ -1,9 +1,6 @@
{
"Logging": {
"LogLevel": {
"Default": "Debug",
"Microsoft": "Warning",
"Microsoft.Hosting.Lifetime": "Warning"
}
"JT808Configuration": {
"TcpPort": 808,
"UdpPort": 808
}
}

+ 6
- 0
src/JT808.Gateway.sln 查看文件

@@ -27,6 +27,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JT808.Gateway.Transmit", "J
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JT808.Gateway.Traffic", "JT808.Gateway.Services\JT808.Gateway.Traffic\JT808.Gateway.Traffic.csproj", "{8FCC6D65-8A49-4AE7-8B19-F255100849D6}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JT808.Gateway.InMemoryMQ", "JT808.Gateway.InMemoryMQ\JT808.Gateway.InMemoryMQ.csproj", "{F7460E8F-B23E-4407-8802-375DE37BED00}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
@@ -77,6 +79,10 @@ Global
{8FCC6D65-8A49-4AE7-8B19-F255100849D6}.Debug|Any CPU.Build.0 = Debug|Any CPU
{8FCC6D65-8A49-4AE7-8B19-F255100849D6}.Release|Any CPU.ActiveCfg = Release|Any CPU
{8FCC6D65-8A49-4AE7-8B19-F255100849D6}.Release|Any CPU.Build.0 = Release|Any CPU
{F7460E8F-B23E-4407-8802-375DE37BED00}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{F7460E8F-B23E-4407-8802-375DE37BED00}.Debug|Any CPU.Build.0 = Debug|Any CPU
{F7460E8F-B23E-4407-8802-375DE37BED00}.Release|Any CPU.ActiveCfg = Release|Any CPU
{F7460E8F-B23E-4407-8802-375DE37BED00}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE


+ 1
- 7
src/JT808.Gateway/Configurations/JT808Configuration.cs 查看文件

@@ -1,5 +1,4 @@
using JT808.Gateway.Enums;
using System;
using System;
using System.Collections.Generic;
using System.Text;

@@ -34,10 +33,5 @@ namespace JT808.Gateway.Configurations
/// Udp 60s检查一次
/// </summary>
public int UdpReceiveTimeoutCheckTimeSeconds { get; set; } = 60;
/// <summary>
/// 队列类型
/// 默认内存队列
/// </summary>
public JT808MessageQueueType MessageQueueType { get; set; } = JT808MessageQueueType.InMemory;
}
}

+ 0
- 18
src/JT808.Gateway/Enums/JT808MessageQueueType.cs 查看文件

@@ -1,18 +0,0 @@
using System;
using System.Collections.Generic;
using System.Text;

namespace JT808.Gateway.Enums
{
public enum JT808MessageQueueType:byte
{
/// <summary>
/// 使用内存队列
/// </summary>
InMemory=1,
/// <summary>
/// 使用第三方队列
/// </summary>
InPlug=2
}
}

+ 0
- 204
src/JT808.Gateway/Internal/JT808MsgReplyConsumerDefault.cs 查看文件

@@ -1,204 +0,0 @@
using JT808.Gateway.Abstractions;
using JT808.Protocol;
using JT808.Protocol.Enums;
using JT808.Protocol.Extensions;
using JT808.Protocol.MessageBody;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace JT808.Gateway.Internal
{
internal class JT808MsgReplyConsumerDefault : IJT808MsgReplyConsumer
{
private readonly JT808MsgService JT808MsgService;

private readonly JT808Serializer JT808Serializer;

private delegate byte[] MethodDelegate(JT808HeaderPackage headerPackage);

private Dictionary<ushort, MethodDelegate> HandlerDict;
public JT808MsgReplyConsumerDefault(
IJT808Config jT808Config,
JT808MsgService jT808MsgService)
{
JT808MsgService = jT808MsgService;
this.JT808Serializer = jT808Config.GetSerializer();
HandlerDict = new Dictionary<ushort, MethodDelegate> {
{JT808MsgId.终端通用应答.ToUInt16Value(), Msg0x0001},
{JT808MsgId.终端鉴权.ToUInt16Value(), Msg0x0102},
{JT808MsgId.终端心跳.ToUInt16Value(), Msg0x0002},
{JT808MsgId.终端注销.ToUInt16Value(), Msg0x0003},
{JT808MsgId.终端注册.ToUInt16Value(), Msg0x0100},
{JT808MsgId.位置信息汇报.ToUInt16Value(),Msg0x0200 },
{JT808MsgId.定位数据批量上传.ToUInt16Value(),Msg0x0704 },
{JT808MsgId.数据上行透传.ToUInt16Value(),Msg0x0900 }
};
}
public CancellationTokenSource Cts =>new CancellationTokenSource();

public string TopicName => JT808GatewayConstants.MsgReplyTopic;

public void Dispose()
{
Cts.Dispose();
}

public void OnMessage(Action<(string TerminalNo, byte[] Data)> callback)
{
Task.Run(() =>
{
while (!Cts.IsCancellationRequested)
{
try
{
if(JT808MsgService.TryRead(out var item))
{
JT808HeaderPackage package = JT808Serializer.HeaderDeserialize(item.Data);
if (HandlerDict.TryGetValue(package.Header.MsgId, out var func))
{
var buffer = func(package);
if (buffer != null)
{
callback((item.TerminalNo, buffer));
}
}
}
}
catch
{

}
}
}, Cts.Token);
}

public void Subscribe()
{
}

public void Unsubscribe()
{
Cts.Cancel();
}

/// <summary>
/// 终端通用应答
/// 平台无需回复
/// 实现自己的业务
/// </summary>
/// <param name="request"></param>
/// <returns></returns>
public byte[] Msg0x0001(JT808HeaderPackage request)
{
return null;
}

/// <summary>
/// 终端心跳
/// </summary>
/// <param name="request"></param>
/// <returns></returns>
public byte[] Msg0x0002(JT808HeaderPackage request)
{
return JT808Serializer.Serialize(JT808MsgId.平台通用应答.Create(request.Header.TerminalPhoneNo, new JT808_0x8001()
{
AckMsgId = request.Header.MsgId,
JT808PlatformResult = JT808PlatformResult.成功,
MsgNum = request.Header.MsgNum
}));
}

/// <summary>
/// 终端注销
/// </summary>
/// <param name="request"></param>
/// <returns></returns>
public byte[] Msg0x0003(JT808HeaderPackage request)
{
return JT808Serializer.Serialize(JT808MsgId.平台通用应答.Create(request.Header.TerminalPhoneNo, new JT808_0x8001()
{
AckMsgId = request.Header.MsgId,
JT808PlatformResult = JT808PlatformResult.成功,
MsgNum = request.Header.MsgNum
}));
}

/// <summary>
/// 终端注册
/// </summary>
/// <param name="request"></param>
/// <returns></returns>
public byte[] Msg0x0100(JT808HeaderPackage request)
{
return JT808Serializer.Serialize(JT808MsgId.终端注册应答.Create(request.Header.TerminalPhoneNo, new JT808_0x8100()
{
Code = "J" + request.Header.TerminalPhoneNo,
JT808TerminalRegisterResult = JT808TerminalRegisterResult.成功,
AckMsgNum = request.Header.MsgNum
}));
}
/// <summary>
/// 终端鉴权
/// </summary>
/// <param name="request"></param>
/// <returns></returns>
public byte[] Msg0x0102(JT808HeaderPackage request)
{
return JT808Serializer.Serialize(JT808MsgId.平台通用应答.Create(request.Header.TerminalPhoneNo, new JT808_0x8001()
{
AckMsgId = request.Header.MsgId,
JT808PlatformResult = JT808PlatformResult.成功,
MsgNum = request.Header.MsgNum
}));
}

/// <summary>
/// 位置信息汇报
/// </summary>
/// <param name="request"></param>
/// <returns></returns>
public byte[] Msg0x0200(JT808HeaderPackage request)
{
return JT808Serializer.Serialize(JT808MsgId.平台通用应答.Create(request.Header.TerminalPhoneNo, new JT808_0x8001()
{
AckMsgId = request.Header.MsgId,
JT808PlatformResult = JT808PlatformResult.成功,
MsgNum = request.Header.MsgNum
}));
}

/// <summary>
/// 定位数据批量上传
/// </summary>
/// <param name="request"></param>
/// <returns></returns>
public byte[] Msg0x0704(JT808HeaderPackage request)
{
return JT808Serializer.Serialize(JT808MsgId.平台通用应答.Create(request.Header.TerminalPhoneNo, new JT808_0x8001()
{
AckMsgId = request.Header.MsgId,
JT808PlatformResult = JT808PlatformResult.成功,
MsgNum = request.Header.MsgNum
}));
}

/// <summary>
/// 数据上行透传
/// </summary>
/// <param name="request"></param>
/// <returns></returns>
public byte[] Msg0x0900(JT808HeaderPackage request)
{
return JT808Serializer.Serialize(JT808MsgId.平台通用应答.Create(request.Header.TerminalPhoneNo, new JT808_0x8001()
{
AckMsgId = request.Header.MsgId,
JT808PlatformResult = JT808PlatformResult.成功,
MsgNum = request.Header.MsgNum
}));
}
}
}

+ 2
- 3
src/JT808.Gateway/JT808.Gateway.csproj 查看文件

@@ -21,10 +21,9 @@
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Microsoft.Extensions.Options" Version="3.1.0" />
<PackageReference Include="Microsoft.Extensions.Options.ConfigurationExtensions" Version="3.1.0" />
<PackageReference Include="Microsoft.Extensions.Options" Version="3.1.1" />
<PackageReference Include="Microsoft.Extensions.Options.ConfigurationExtensions" Version="3.1.1" />
<PackageReference Include="System.IO.Pipelines" Version="4.7.0" />
<PackageReference Include="System.Threading.Channels" Version="4.7.0" />
</ItemGroup>

<ItemGroup>


+ 0
- 11
src/JT808.Gateway/JT808GatewayExtensions.cs 查看文件

@@ -16,14 +16,6 @@ namespace JT808.Gateway
{
public static partial class JT808GatewayExtensions
{
public static IJT808GatewayBuilder AddJT808DevelopmentGateway(this IJT808Builder jt808Builder)
{
IJT808GatewayBuilder server = new JT808GatewayBuilderDefault(jt808Builder);
server.JT808Builder.Services.TryAddSingleton<JT808Configuration>();
server.AddJT808Core();
return server;
}

public static IJT808GatewayBuilder AddJT808Gateway(this IJT808Builder jt808Builder,Action<JT808Configuration> config)
{
IJT808GatewayBuilder server = new JT808GatewayBuilderDefault(jt808Builder);
@@ -63,10 +55,7 @@ namespace JT808.Gateway
private static IJT808GatewayBuilder AddJT808Core(this IJT808GatewayBuilder config)
{
config.JT808Builder.Services.TryAddSingleton<JT808AtomicCounterServiceFactory>();
config.JT808Builder.Services.TryAddSingleton<IJT808MsgProducer, JT808MsgProducerDefault>();
config.JT808Builder.Services.TryAddSingleton<IJT808MsgReplyConsumer, JT808MsgReplyConsumerDefault>();
config.JT808Builder.Services.TryAddSingleton<JT808SessionManager>();
config.JT808Builder.Services.TryAddSingleton<JT808MsgService>();
config.JT808Builder.Services.AddHostedService<JT808MsgReplyHostedService>();
return config;
}


+ 3
- 2
src/JT808.Gateway/JT808GrpcServer.cs 查看文件

@@ -7,6 +7,7 @@ using JT808.Gateway.GrpcService;
using JT808.Gateway.Services;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;

namespace JT808.Gateway
{
@@ -18,11 +19,11 @@ namespace JT808.Gateway
private Server server;
public JT808GrpcServer(
IServiceProvider serviceProvider,
JT808Configuration jT808Configuration,
IOptions<JT808Configuration> jT808ConfigurationAccessor,
ILoggerFactory loggerFactory)
{
Logger = loggerFactory.CreateLogger("JT808GrpcServer");
Configuration = jT808Configuration;
Configuration = jT808ConfigurationAccessor.Value;
ServiceProvider = serviceProvider;
}



+ 11
- 14
src/JT808.Gateway/JT808TcpServer.cs 查看文件

@@ -10,7 +10,6 @@ using System.Threading.Tasks;
using JT808.Gateway.Abstractions;
using JT808.Gateway.Abstractions.Enums;
using JT808.Gateway.Configurations;
using JT808.Gateway.Enums;
using JT808.Gateway.Services;
using JT808.Gateway.Session;
using JT808.Protocol;
@@ -18,6 +17,7 @@ using JT808.Protocol.Exceptions;
using JT808.Protocol.Extensions;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;

namespace JT808.Gateway
{
@@ -38,7 +38,7 @@ namespace JT808.Gateway
private readonly JT808Configuration Configuration;

public JT808TcpServer(
JT808Configuration jT808Configuration,
IOptions<JT808Configuration> jT808ConfigurationAccessor,
IJT808Config jT808Config,
ILoggerFactory loggerFactory,
JT808SessionManager jT808SessionManager,
@@ -50,14 +50,14 @@ namespace JT808.Gateway
Serializer = jT808Config.GetSerializer();
MsgProducer = jT808MsgProducer;
AtomicCounterService = jT808AtomicCounterServiceFactory.Create(JT808TransportProtocolType.tcp);
Configuration = jT808Configuration;
var IPEndPoint = new System.Net.IPEndPoint(IPAddress.Any, jT808Configuration.TcpPort);
Configuration = jT808ConfigurationAccessor.Value;
var IPEndPoint = new System.Net.IPEndPoint(IPAddress.Any, Configuration.TcpPort);
server = new Socket(IPEndPoint.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
server.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.NoDelay, true);
server.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.KeepAlive, true);
server.LingerState = new LingerOption(false, 0);
server.Bind(IPEndPoint);
server.Listen(jT808Configuration.SoBacklog);
server.Listen(Configuration.SoBacklog);
}

public Task StartAsync(CancellationToken cancellationToken)
@@ -106,6 +106,11 @@ namespace JT808.Gateway
Logger.LogError($"[Receive Timeout]:{session.Client.RemoteEndPoint}");
break;
}
catch (System.Net.Sockets.SocketException ex)
{
Logger.LogError($"[{ex.SocketErrorCode.ToString()},{ex.Message}]:{session.Client.RemoteEndPoint}");
break;
}
catch (Exception ex)
{
Logger.LogError(ex, $"[Receive Error]:{session.Client.RemoteEndPoint}");
@@ -174,16 +179,8 @@ namespace JT808.Gateway
AtomicCounterService.MsgSuccessIncrement();
if (Logger.IsEnabled(LogLevel.Debug)) Logger.LogDebug($"[Atomic Success Counter]:{AtomicCounterService.MsgSuccessCount}");
if (Logger.IsEnabled(LogLevel.Trace)) Logger.LogTrace($"[Accept Hex {session.Client.RemoteEndPoint}]:{package.OriginalData.ToArray().ToHexString()}");
//设直连模式和转发模式的会话如何处理
SessionManager.TryLink(package.Header.TerminalPhoneNo, session);
if(Configuration.MessageQueueType == JT808MessageQueueType.InMemory)
{
MsgProducer.ProduceAsync(session.SessionID, package.OriginalData.ToArray());
}
else
{
MsgProducer.ProduceAsync(package.Header.TerminalPhoneNo, package.OriginalData.ToArray());
}
MsgProducer.ProduceAsync(package.Header.TerminalPhoneNo, package.OriginalData.ToArray());
}
catch (JT808Exception ex)
{


+ 5
- 13
src/JT808.Gateway/JT808UdpServer.cs 查看文件

@@ -10,7 +10,6 @@ using System.Threading.Tasks;
using JT808.Gateway.Abstractions;
using JT808.Gateway.Abstractions.Enums;
using JT808.Gateway.Configurations;
using JT808.Gateway.Enums;
using JT808.Gateway.Services;
using JT808.Gateway.Session;
using JT808.Protocol;
@@ -18,6 +17,7 @@ using JT808.Protocol.Exceptions;
using JT808.Protocol.Extensions;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;

namespace JT808.Gateway
{
@@ -40,7 +40,7 @@ namespace JT808.Gateway
private IPEndPoint LocalIPEndPoint;

public JT808UdpServer(
JT808Configuration jT808Configuration,
IOptions<JT808Configuration> jT808ConfigurationAccessor,
IJT808Config jT808Config,
ILoggerFactory loggerFactory,
JT808SessionManager jT808SessionManager,
@@ -52,8 +52,8 @@ namespace JT808.Gateway
Serializer = jT808Config.GetSerializer();
MsgProducer = jT808MsgProducer;
AtomicCounterService = jT808AtomicCounterServiceFactory.Create(JT808TransportProtocolType.udp);
Configuration = jT808Configuration;
LocalIPEndPoint = new System.Net.IPEndPoint(IPAddress.Any, jT808Configuration.UdpPort);
Configuration = jT808ConfigurationAccessor.Value;
LocalIPEndPoint = new System.Net.IPEndPoint(IPAddress.Any, Configuration.UdpPort);
server = new Socket(LocalIPEndPoint.AddressFamily, SocketType.Dgram, ProtocolType.Udp);
server.Bind(LocalIPEndPoint);
}
@@ -95,20 +95,12 @@ namespace JT808.Gateway
AtomicCounterService.MsgSuccessIncrement();
if (Logger.IsEnabled(LogLevel.Debug)) Logger.LogDebug($"[Atomic Success Counter]:{AtomicCounterService.MsgSuccessCount}");
if (Logger.IsEnabled(LogLevel.Trace)) Logger.LogTrace($"[Accept Hex {receiveMessageFromResult.RemoteEndPoint}]:{package.OriginalData.ToArray().ToHexString()}");
//设直连模式和转发模式的会话如何处理
string sessionId= SessionManager.TryLink(package.Header.TerminalPhoneNo, socket, receiveMessageFromResult.RemoteEndPoint);
if (Logger.IsEnabled(LogLevel.Information))
{
Logger.LogInformation($"[Connected]:{receiveMessageFromResult.RemoteEndPoint}");
}
if (Configuration.MessageQueueType == JT808MessageQueueType.InMemory)
{
MsgProducer.ProduceAsync(sessionId, package.OriginalData.ToArray());
}
else
{
MsgProducer.ProduceAsync(package.Header.TerminalPhoneNo, package.OriginalData.ToArray());
}
MsgProducer.ProduceAsync(package.Header.TerminalPhoneNo, package.OriginalData.ToArray());
}
catch (JT808Exception ex)
{


+ 5
- 18
src/JT808.Gateway/Services/JT808MsgReplyHostedService.cs 查看文件

@@ -1,11 +1,7 @@
using JT808.Gateway.Abstractions;
using JT808.Gateway.Configurations;
using JT808.Gateway.Enums;
using JT808.Gateway.Session;
using Microsoft.Extensions.Hosting;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

@@ -16,36 +12,27 @@ namespace JT808.Gateway.Services
private readonly JT808SessionManager JT808SessionManager;

private readonly IJT808MsgReplyConsumer JT808MsgReplyConsumer;
private readonly JT808Configuration Configuration;
public JT808MsgReplyHostedService(
JT808Configuration jT808Configuration,
IJT808MsgReplyConsumer jT808MsgReplyConsumer,
JT808SessionManager jT808SessionManager)
{
JT808MsgReplyConsumer = jT808MsgReplyConsumer;
JT808SessionManager = jT808SessionManager;
Configuration = jT808Configuration;
}

public Task StartAsync(CancellationToken cancellationToken)
{
if(Configuration.MessageQueueType== JT808MessageQueueType.InMemory)
JT808MsgReplyConsumer.OnMessage(item =>
{
JT808MsgReplyConsumer.OnMessage(item =>
{
JT808SessionManager.TrySendBySessionId(item.TerminalNo, item.Data);
});
JT808MsgReplyConsumer.Subscribe();
}
JT808SessionManager.TrySendByTerminalPhoneNo(item.TerminalNo, item.Data);
});
JT808MsgReplyConsumer.Subscribe();
return Task.CompletedTask;
}

public Task StopAsync(CancellationToken cancellationToken)
{
if (Configuration.MessageQueueType == JT808MessageQueueType.InMemory)
{
JT808MsgReplyConsumer.Unsubscribe();
}
JT808MsgReplyConsumer.Unsubscribe();
return Task.CompletedTask;
}
}


+ 3
- 2
src/JT808.Gateway/Services/JT808TcpReceiveTimeoutHostedService.cs 查看文件

@@ -2,6 +2,7 @@
using JT808.Gateway.Session;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using System;
using System.Collections.Generic;
using System.Text;
@@ -18,14 +19,14 @@ namespace JT808.Gateway.Services

private readonly JT808Configuration Configuration;
public JT808TcpReceiveTimeoutHostedService(
JT808Configuration jT808Configuration,
IOptions<JT808Configuration> jT808ConfigurationAccessor,
ILoggerFactory loggerFactory,
JT808SessionManager jT808SessionManager
)
{
SessionManager = jT808SessionManager;
Logger = loggerFactory.CreateLogger("JT808TcpReceiveTimeout");
Configuration = jT808Configuration;
Configuration = jT808ConfigurationAccessor.Value;
}

protected override async Task ExecuteAsync(CancellationToken stoppingToken)


+ 3
- 2
src/JT808.Gateway/Services/JT808UdpReceiveTimeoutHostedService.cs 查看文件

@@ -2,6 +2,7 @@
using JT808.Gateway.Session;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using System;
using System.Collections.Generic;
using System.Text;
@@ -18,14 +19,14 @@ namespace JT808.Gateway.Services

private readonly JT808Configuration Configuration;
public JT808UdpReceiveTimeoutHostedService(
JT808Configuration jT808Configuration,
IOptions<JT808Configuration> jT808ConfigurationAccessor,
ILoggerFactory loggerFactory,
JT808SessionManager jT808SessionManager
)
{
SessionManager = jT808SessionManager;
Logger = loggerFactory.CreateLogger("JT808UdpReceiveTimeout");
Configuration = jT808Configuration;
Configuration = jT808ConfigurationAccessor.Value;
}

protected override async Task ExecuteAsync(CancellationToken stoppingToken)


Loading…
取消
儲存