Procházet zdrojové kódy

pipeline-preview3

1.完善队列模式的服务测试
2.修复应答服务的消息处理注册
tags/pipeline-1.1.0
SmallChi(Koike) před 4 roky
rodič
revize
9fbd8dfc2d
11 změnil soubory, kde provedl 174 přidání a 17 odebrání
  1. +1
    -1
      src/JT808.Gateway.Abstractions/Configurations/JT808Configuration.cs
  2. +1
    -1
      src/JT808.Gateway.Abstractions/JT808.Gateway.Abstractions.csproj
  3. +1
    -1
      src/JT808.Gateway.Client/JT808.Gateway.Client.csproj
  4. +3
    -3
      src/JT808.Gateway.Kafka/JT808.Gateway.Kafka.xml
  5. +3
    -3
      src/JT808.Gateway.Kafka/JT808ClientKafkaExtensions.cs
  6. +3
    -2
      src/JT808.Gateway.Services/JT808.Gateway.ReplyMessage/JT808ReplyMessageExtensions.cs
  7. +21
    -3
      src/JT808.Gateway.Services/JT808.Gateway.ReplyMessage/JT808ReplyMessageHostedService.cs
  8. +76
    -0
      src/JT808.Gateway.Tests/JT808.Gateway.QueueHosting/Impl/JT808CustomMessageHandlerImpl.cs
  9. +59
    -0
      src/JT808.Gateway.Tests/JT808.Gateway.QueueHosting/Impl/JT808ReplyMessageHandlerImpl.cs
  10. +5
    -2
      src/JT808.Gateway.Tests/JT808.Gateway.QueueHosting/Program.cs
  11. +1
    -1
      src/Version.props

+ 1
- 1
src/JT808.Gateway.Abstractions/Configurations/JT808Configuration.cs Zobrazit soubor

@@ -35,6 +35,6 @@ namespace JT808.Gateway.Abstractions.Configurations
/// <summary>
/// 网关不做消息业务处理,往队列发送
/// </summary>
public List<uint> FilterMsgIdHandlerForQueue { get; set; }
public List<uint> FilterMsgIdHandlerForQueue { get; set; } = new List<uint>();
}
}

+ 1
- 1
src/JT808.Gateway.Abstractions/JT808.Gateway.Abstractions.csproj Zobrazit soubor

@@ -32,7 +32,7 @@
<Compile Remove="JT808QueueReplyMessageHandler.cs" />
</ItemGroup>
<ItemGroup>
<PackageReference Include="JT808" Version="2.2.13" />
<PackageReference Include="JT808" Version="2.2.14" />
<PackageReference Include="Microsoft.Extensions.Hosting.Abstractions" Version="3.1.9" />
<PackageReference Include="Microsoft.Extensions.Options" Version="3.1.9" />
<PackageReference Include="Microsoft.Extensions.Options.ConfigurationExtensions" Version="3.1.9" />


+ 1
- 1
src/JT808.Gateway.Client/JT808.Gateway.Client.csproj Zobrazit soubor

@@ -22,7 +22,7 @@
</PropertyGroup>
<ItemGroup>
<PackageReference Include="JT808" Version="2.2.13" />
<PackageReference Include="JT808" Version="2.2.14" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="3.1.9" />
<PackageReference Include="Microsoft.Extensions.Hosting.Abstractions" Version="3.1.9" />
<PackageReference Include="System.IO.Pipelines" Version="4.7.3" />


+ 3
- 3
src/JT808.Gateway.Kafka/JT808.Gateway.Kafka.xml Zobrazit soubor

@@ -8,7 +8,7 @@
<summary>
</summary>
<param name="serviceDescriptors"></param>
<param name="jT808ClientBuilder"></param>
<param name="configuration">GetSection("JT808MsgConsumerConfig")</param>
<returns></returns>
</member>
@@ -16,7 +16,7 @@
<summary>
</summary>
<param name="serviceDescriptors"></param>
<param name="jT808ClientBuilder"></param>
<param name="configuration">GetSection("JT808MsgReplyProducerConfig")</param>
<returns></returns>
</member>
@@ -40,7 +40,7 @@
<summary>
</summary>
<param name="serviceDescriptors"></param>
<param name="jT808ClientBuilder"></param>
<param name="configuration">GetSection("JT808SessionConsumerConfig")</param>
<returns></returns>
</member>


+ 3
- 3
src/JT808.Gateway.Kafka/JT808ClientKafkaExtensions.cs Zobrazit soubor

@@ -17,7 +17,7 @@ namespace JT808.Gateway.Kafka
/// <summary>
///
/// </summary>
/// <param name="serviceDescriptors"></param>
/// <param name="jT808ClientBuilder"></param>
/// <param name="configuration">GetSection("JT808MsgConsumerConfig")</param>
/// <returns></returns>
public static IJT808ClientBuilder AddMsgConsumer(this IJT808ClientBuilder jT808ClientBuilder, IConfiguration configuration)
@@ -29,7 +29,7 @@ namespace JT808.Gateway.Kafka
/// <summary>
///
/// </summary>
/// <param name="serviceDescriptors"></param>
/// <param name="jT808ClientBuilder"></param>
/// <param name="configuration">GetSection("JT808MsgReplyProducerConfig")</param>
/// <returns></returns>
public static IJT808ClientBuilder AddMsgReplyProducer(this IJT808ClientBuilder jT808ClientBuilder, IConfiguration configuration)
@@ -65,7 +65,7 @@ namespace JT808.Gateway.Kafka
/// <summary>
///
/// </summary>
/// <param name="serviceDescriptors"></param>
/// <param name="jT808ClientBuilder"></param>
/// <param name="configuration">GetSection("JT808SessionConsumerConfig")</param>
/// <returns></returns>
public static IJT808ClientBuilder AddSessionConsumer(this IJT808ClientBuilder jT808ClientBuilder, IConfiguration configuration)


+ 3
- 2
src/JT808.Gateway.Services/JT808.Gateway.ReplyMessage/JT808ReplyMessageExtensions.cs Zobrazit soubor

@@ -1,6 +1,7 @@

using JT808.Gateway.Abstractions;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection.Extensions;
using System;
using System.Collections.Generic;
using System.Text;
@@ -10,14 +11,14 @@ namespace JT808.Gateway.ReplyMessage
public static class JT808ReplyMessageExtensions
{
/// <summary>
/// 消息应答服务(不同的消费者实例)
/// 消息应答服务
/// </summary>
/// <param name="jT808ClientBuilder"></param>
/// <returns></returns>
public static IJT808ClientBuilder AddReplyMessage<TJT808ReplyMessageHandler>(this IJT808ClientBuilder jT808ClientBuilder)
where TJT808ReplyMessageHandler : IJT808ReplyMessageHandler
{
jT808ClientBuilder.JT808Builder.Services.AddSingleton(new ServiceDescriptor(typeof(IJT808ReplyMessageHandler),typeof(TJT808ReplyMessageHandler), ServiceLifetime.Singleton));
jT808ClientBuilder.JT808Builder.Services.Add(new ServiceDescriptor(typeof(IJT808ReplyMessageHandler),typeof(TJT808ReplyMessageHandler), ServiceLifetime.Singleton));
jT808ClientBuilder.JT808Builder.Services.AddHostedService<JT808ReplyMessageHostedService>();
return jT808ClientBuilder;
}


+ 21
- 3
src/JT808.Gateway.Services/JT808.Gateway.ReplyMessage/JT808ReplyMessageHostedService.cs Zobrazit soubor

@@ -2,6 +2,7 @@
using Microsoft.Extensions.Hosting;
using System.Threading;
using JT808.Gateway.Abstractions;
using Microsoft.Extensions.Logging;

namespace JT808.Gateway.ReplyMessage
{
@@ -9,20 +10,37 @@ namespace JT808.Gateway.ReplyMessage
{
private IJT808MsgConsumer jT808MsgConsumer;
private IJT808ReplyMessageHandler jT808ReplyMessageHandler;

private IJT808MsgReplyProducer jT808MsgReplyProducer;
private ILogger logger;
public JT808ReplyMessageHostedService(
ILoggerFactory loggerFactory,
IJT808ReplyMessageHandler jT808ReplyMessageHandler,
IJT808MsgReplyProducer jT808MsgReplyProducer,
IJT808MsgConsumer jT808MsgConsumer)
{
this.jT808MsgConsumer = jT808MsgConsumer;
this.jT808MsgReplyProducer = jT808MsgReplyProducer;
this.jT808ReplyMessageHandler = jT808ReplyMessageHandler;
this.logger = loggerFactory.CreateLogger<JT808ReplyMessageHostedService>();
}

public Task StartAsync(CancellationToken cancellationToken)
{
jT808MsgConsumer.Subscribe();
jT808MsgConsumer.OnMessage((Message)=> {
jT808ReplyMessageHandler.Processor(Message.TerminalNo, Message.Data);
jT808MsgConsumer.OnMessage(async (Message) =>
{
try
{
var data = jT808ReplyMessageHandler.Processor(Message.TerminalNo, Message.Data);
if (data != null)
{
await jT808MsgReplyProducer.ProduceAsync(Message.TerminalNo, data);
}
}
catch (System.Exception ex)
{
logger.LogError(ex, "");
}
});
return Task.CompletedTask;
}


+ 76
- 0
src/JT808.Gateway.Tests/JT808.Gateway.QueueHosting/Impl/JT808CustomMessageHandlerImpl.cs Zobrazit soubor

@@ -0,0 +1,76 @@
using JT808.Gateway.Abstractions;
using JT808.Gateway.Abstractions.Configurations;
using JT808.Gateway.MsgLogging;
using JT808.Gateway.Transmit;
using JT808.Protocol;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using System;
using System.Collections.Generic;
using System.Text;

namespace JT808.Gateway.QueueHosting.Impl
{
public class JT808CustomMessageHandlerImpl : JT808MessageHandler
{
private readonly ILogger logger;
public JT808CustomMessageHandlerImpl(
ILoggerFactory loggerFactory,
IOptionsMonitor<JT808Configuration> jT808ConfigurationOptionsMonitor,
IJT808MsgProducer msgProducer,
IJT808MsgReplyLoggingProducer msgReplyLoggingProducer,
IJT808Config jT808Config) : base(jT808ConfigurationOptionsMonitor,
msgProducer,
msgReplyLoggingProducer,
jT808Config)
{
logger = loggerFactory.CreateLogger<JT808CustomMessageHandlerImpl>();
//过滤掉0x0200消息,通过服务服务进行下发应答,可以通过配置文件的方式进行增加修改(支持热更新)
jT808ConfigurationOptionsMonitor.CurrentValue.FilterMsgIdHandlerForQueue.Add(0x0200);
//添加自定义消息
HandlerDict.Add(0x9999, Msg0x9999);
}


/// <summary>
/// 重写消息处理器
/// </summary>
/// <param name="request"></param>
/// <param name="session"></param>
public override byte[] Processor(JT808HeaderPackage request, IJT808Session session)
{
try
{
var down = base.Processor(request, session);
return down;
}
catch (Exception)
{
return default;
}
}

/// <summary>
/// 重写自带的消息
/// </summary>
/// <param name="request"></param>
/// <param name="session"></param>
public override byte[] Msg0x0200(JT808HeaderPackage request, IJT808Session session)
{
logger.LogDebug("由于过滤了0x0200,网关是不会处理0x0200消息的应答");
var data = base.Msg0x0200(request, session);
return data;
}

/// <summary>
/// 自定义消息
/// </summary>
/// <param name="request"></param>
/// <returns></returns>
public byte[] Msg0x9999(JT808HeaderPackage request, IJT808Session session)
{
logger.LogDebug("自定义消息");
return default;
}
}
}

+ 59
- 0
src/JT808.Gateway.Tests/JT808.Gateway.QueueHosting/Impl/JT808ReplyMessageHandlerImpl.cs Zobrazit soubor

@@ -0,0 +1,59 @@
using JT808.Gateway.Abstractions;
using JT808.Protocol;
using JT808.Protocol.Enums;
using JT808.Protocol.Extensions;
using JT808.Protocol.MessageBody;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.Text;

namespace JT808.Gateway.QueueHosting.Impl
{
public class JT808ReplyMessageHandlerImpl : IJT808ReplyMessageHandler
{
private ILogger logger;
private JT808Serializer Serializer;

public JT808ReplyMessageHandlerImpl(
IJT808Config jT808Config,
ILoggerFactory loggerFactory)
{
logger = loggerFactory.CreateLogger<JT808ReplyMessageHandlerImpl>();
Serializer = jT808Config.GetSerializer();
}

public byte[] Processor(string TerminalNo, byte[] Data)
{
if (logger.IsEnabled(LogLevel.Debug))
{
logger.LogDebug($"实现消息应答处理,{TerminalNo},{Data.ToHexString()}");
}
var package = Serializer.Deserialize(Data);
if (package.Header.MsgId == 0x0200)
{
if (package.Version == JT808Version.JTT2019)
{
byte[] data = Serializer.Serialize(JT808MsgId.平台通用应答.Create_平台通用应答_2019(package.Header.TerminalPhoneNo, new JT808_0x8001()
{
AckMsgId = package.Header.MsgId,
JT808PlatformResult = JT808PlatformResult.成功,
MsgNum = package.Header.MsgNum
}));
return data;
}
else
{
byte[] data = Serializer.Serialize(JT808MsgId.平台通用应答.Create(package.Header.TerminalPhoneNo, new JT808_0x8001()
{
AckMsgId = package.Header.MsgId,
JT808PlatformResult = JT808PlatformResult.成功,
MsgNum = package.Header.MsgNum
}));
return data;
}
}
return default;
}
}
}

+ 5
- 2
src/JT808.Gateway.Tests/JT808.Gateway.QueueHosting/Program.cs Zobrazit soubor

@@ -15,6 +15,7 @@ using JT808.Gateway.Client;
using JT808.Gateway.QueueHosting.Jobs;
using JT808.Gateway.Kafka;
using JT808.Gateway.WebApiClientTool;
using JT808.Gateway.QueueHosting.Impl;

namespace JT808.Gateway.QueueHosting
{
@@ -48,12 +49,14 @@ namespace JT808.Gateway.QueueHosting
//添加客户端服务
.AddClientKafka()
.AddMsgConsumer(hostContext.Configuration)
//添加消息应答服务
//添加消息应答生产者
.AddMsgReplyProducer(hostContext.Configuration)
//添加消息应答服务并实现消息应答处理
.AddReplyMessage<JT808ReplyMessageHandlerImpl>()
.Builder()
//添加消息应答处理
//.AddReplyMessage();
.AddGateway(hostContext.Configuration)
.AddMessageHandler<JT808CustomMessageHandlerImpl>()
.AddServerKafkaMsgProducer(hostContext.Configuration)
.AddServerKafkaSessionProducer(hostContext.Configuration)
.AddServerKafkaMsgReplyConsumer(hostContext.Configuration)


+ 1
- 1
src/Version.props Zobrazit soubor

@@ -1,6 +1,6 @@
<Project>
<PropertyGroup>
<JT808DotNettyPackageVersion>2.3.2</JT808DotNettyPackageVersion>
<JT808GatewayPackageVersion>1.0.2-preview2</JT808GatewayPackageVersion>
<JT808GatewayPackageVersion>1.0.2-preview3</JT808GatewayPackageVersion>
</PropertyGroup>
</Project>

Načítá se…
Zrušit
Uložit