Parcourir la source

Merge branch 'master' of https://github.com/SmallChi/JT808DotNetty

tags/pipeline-1.0.0
waterliu99 il y a 5 ans
Parent
révision
2d105fcec8
21 fichiers modifiés avec 660 ajouts et 0 suppressions
  1. +4
    -0
      src/JT808.Gateway.Abstractions/JT808.Gateway.Abstractions.csproj
  2. +15
    -0
      src/JT808.Gateway.Kafka/Configs/JT808ConsumerConfig.cs
  3. +13
    -0
      src/JT808.Gateway.Kafka/Configs/JT808MsgConsumerConfig.cs
  4. +13
    -0
      src/JT808.Gateway.Kafka/Configs/JT808MsgProducerConfig.cs
  5. +13
    -0
      src/JT808.Gateway.Kafka/Configs/JT808MsgReplyConsumerConfig.cs
  6. +13
    -0
      src/JT808.Gateway.Kafka/Configs/JT808MsgReplyProducerConfig.cs
  7. +15
    -0
      src/JT808.Gateway.Kafka/Configs/JT808ProducerConfig.cs
  8. +13
    -0
      src/JT808.Gateway.Kafka/Configs/JT808SessionConsumerConfig.cs
  9. +13
    -0
      src/JT808.Gateway.Kafka/Configs/JT808SessionProducerConfig.cs
  10. +39
    -0
      src/JT808.Gateway.Kafka/JT808.Gateway.Kafka.csproj
  11. +25
    -0
      src/JT808.Gateway.Kafka/JT808ClientBuilderDefault.cs
  12. +66
    -0
      src/JT808.Gateway.Kafka/JT808ClientKafkaExtensions.cs
  13. +82
    -0
      src/JT808.Gateway.Kafka/JT808MsgConsumer.cs
  14. +38
    -0
      src/JT808.Gateway.Kafka/JT808MsgProducer.cs
  15. +82
    -0
      src/JT808.Gateway.Kafka/JT808MsgReplyConsumer.cs
  16. +38
    -0
      src/JT808.Gateway.Kafka/JT808MsgReplyProducer.cs
  17. +48
    -0
      src/JT808.Gateway.Kafka/JT808ServerKafkaExtensions.cs
  18. +82
    -0
      src/JT808.Gateway.Kafka/JT808SessionConsumer.cs
  19. +38
    -0
      src/JT808.Gateway.Kafka/JT808SessionProducer.cs
  20. +6
    -0
      src/JT808.Gateway.sln
  21. +4
    -0
      src/JT808.Gateway/JT808.Gateway.csproj

+ 4
- 0
src/JT808.Gateway.Abstractions/JT808.Gateway.Abstractions.csproj Voir le fichier

@@ -6,6 +6,10 @@
<Authors>SmallChi(Koike)</Authors>
<GeneratePackageOnBuild>false</GeneratePackageOnBuild>
<SignAssembly>false</SignAssembly>
<RepositoryUrl>https://github.com/SmallChi/JT808Gateway</RepositoryUrl>
<PackageProjectUrl>https://github.com/SmallChi/JT808Gateway</PackageProjectUrl>
<licenseUrl>https://github.com/SmallChi/JT808Gateway/blob/master/LICENSE</licenseUrl>
<license>https://github.com/SmallChi/JT808Gateway/blob/master/LICENSE</license>
<PackageLicenseFile>LICENSE</PackageLicenseFile>
<PackageRequireLicenseAcceptance>true</PackageRequireLicenseAcceptance>
<Description>基于Pipeline实现的JT808Gateway的抽象库</Description>


+ 15
- 0
src/JT808.Gateway.Kafka/Configs/JT808ConsumerConfig.cs Voir le fichier

@@ -0,0 +1,15 @@
using Confluent.Kafka;
using Microsoft.Extensions.Options;
using System;
using System.Collections.Generic;
using System.Text;

namespace JT808.Gateway.Configs.Kafka
{
public class JT808ConsumerConfig: ConsumerConfig, IOptions<JT808ConsumerConfig>
{
public string TopicName { get; set; }

public JT808ConsumerConfig Value => this;
}
}

+ 13
- 0
src/JT808.Gateway.Kafka/Configs/JT808MsgConsumerConfig.cs Voir le fichier

@@ -0,0 +1,13 @@
using Confluent.Kafka;
using Microsoft.Extensions.Options;
using System;
using System.Collections.Generic;
using System.Text;

namespace JT808.Gateway.Configs.Kafka
{
public class JT808MsgConsumerConfig : JT808ConsumerConfig, IOptions<JT808MsgConsumerConfig>
{
JT808MsgConsumerConfig IOptions<JT808MsgConsumerConfig>.Value => this;
}
}

+ 13
- 0
src/JT808.Gateway.Kafka/Configs/JT808MsgProducerConfig.cs Voir le fichier

@@ -0,0 +1,13 @@
using Confluent.Kafka;
using Microsoft.Extensions.Options;
using System;
using System.Collections.Generic;
using System.Text;

namespace JT808.Gateway.Configs.Kafka
{
public class JT808MsgProducerConfig : JT808ProducerConfig, IOptions<JT808MsgProducerConfig>
{
JT808MsgProducerConfig IOptions<JT808MsgProducerConfig>.Value => this;
}
}

+ 13
- 0
src/JT808.Gateway.Kafka/Configs/JT808MsgReplyConsumerConfig.cs Voir le fichier

@@ -0,0 +1,13 @@
using Confluent.Kafka;
using Microsoft.Extensions.Options;
using System;
using System.Collections.Generic;
using System.Text;

namespace JT808.Gateway.Configs.Kafka
{
public class JT808MsgReplyConsumerConfig : JT808ConsumerConfig, IOptions<JT808MsgReplyConsumerConfig>
{
JT808MsgReplyConsumerConfig IOptions<JT808MsgReplyConsumerConfig>.Value => this;
}
}

+ 13
- 0
src/JT808.Gateway.Kafka/Configs/JT808MsgReplyProducerConfig.cs Voir le fichier

@@ -0,0 +1,13 @@
using Confluent.Kafka;
using Microsoft.Extensions.Options;
using System;
using System.Collections.Generic;
using System.Text;

namespace JT808.Gateway.Configs.Kafka
{
public class JT808MsgReplyProducerConfig : JT808ProducerConfig, IOptions<JT808MsgReplyProducerConfig>
{
JT808MsgReplyProducerConfig IOptions<JT808MsgReplyProducerConfig>.Value => this;
}
}

+ 15
- 0
src/JT808.Gateway.Kafka/Configs/JT808ProducerConfig.cs Voir le fichier

@@ -0,0 +1,15 @@
using Confluent.Kafka;
using Microsoft.Extensions.Options;
using System;
using System.Collections.Generic;
using System.Text;

namespace JT808.Gateway.Configs.Kafka
{
public class JT808ProducerConfig : ProducerConfig,IOptions<JT808ProducerConfig>
{
public string TopicName { get; set; }

public JT808ProducerConfig Value => this;
}
}

+ 13
- 0
src/JT808.Gateway.Kafka/Configs/JT808SessionConsumerConfig.cs Voir le fichier

@@ -0,0 +1,13 @@
using Confluent.Kafka;
using Microsoft.Extensions.Options;
using System;
using System.Collections.Generic;
using System.Text;

namespace JT808.Gateway.Configs.Kafka
{
public class JT808SessionConsumerConfig : JT808ConsumerConfig, IOptions<JT808SessionConsumerConfig>
{
JT808SessionConsumerConfig IOptions<JT808SessionConsumerConfig>.Value => this;
}
}

+ 13
- 0
src/JT808.Gateway.Kafka/Configs/JT808SessionProducerConfig.cs Voir le fichier

@@ -0,0 +1,13 @@
using Confluent.Kafka;
using Microsoft.Extensions.Options;
using System;
using System.Collections.Generic;
using System.Text;

namespace JT808.Gateway.Configs.Kafka
{
public class JT808SessionProducerConfig : JT808ProducerConfig, IOptions<JT808SessionProducerConfig>
{
JT808SessionProducerConfig IOptions<JT808SessionProducerConfig>.Value => this;
}
}

+ 39
- 0
src/JT808.Gateway.Kafka/JT808.Gateway.Kafka.csproj Voir le fichier

@@ -0,0 +1,39 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>netstandard2.1</TargetFramework>
<LangVersion>8.0</LangVersion>
<Copyright>Copyright 2019.</Copyright>
<Authors>SmallChi(Koike)</Authors>
<RepositoryUrl>https://github.com/SmallChi/JT808Gateway</RepositoryUrl>
<PackageProjectUrl>https://github.com/SmallChi/JT808Gateway</PackageProjectUrl>
<licenseUrl>https://github.com/SmallChi/JT808Gateway/blob/master/LICENSE</licenseUrl>
<license>https://github.com/SmallChi/JT808Gateway/blob/master/LICENSE</license>
<GeneratePackageOnBuild>false</GeneratePackageOnBuild>
<Version>1.0.0-preview2</Version>
<SignAssembly>false</SignAssembly>
<PackageLicenseFile>LICENSE</PackageLicenseFile>
<PackageRequireLicenseAcceptance>true</PackageRequireLicenseAcceptance>
<PackageId>JT808.Gateway.Kafka</PackageId>
<Product>JT808.Gateway.Kafka</Product>
<Description>基于Kafka的JT808消息发布与订阅</Description>
<PackageReleaseNotes>基于Kafka的JT808消息发布与订阅</PackageReleaseNotes>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Confluent.Kafka" Version="1.3.0" />
<PackageReference Include="Microsoft.Extensions.Configuration.Abstractions" Version="3.1.0" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="3.1.0" />
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="3.1.0" />
<PackageReference Include="Microsoft.Extensions.Options" Version="3.1.0" />
<PackageReference Include="Microsoft.Extensions.Options.ConfigurationExtensions" Version="3.1.0" />
</ItemGroup>

<ItemGroup>
<None Include="..\..\LICENSE" Pack="true" PackagePath="" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\JT808.Gateway.Abstractions\JT808.Gateway.Abstractions.csproj" />
</ItemGroup>

</Project>

+ 25
- 0
src/JT808.Gateway.Kafka/JT808ClientBuilderDefault.cs Voir le fichier

@@ -0,0 +1,25 @@
using JT808.Gateway.Abstractions;
using JT808.Protocol;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection.Extensions;
using System;
using System.Collections.Generic;
using System.Text;

namespace JT808.Gateway.Kafka
{
internal class JT808ClientBuilderDefault : IJT808ClientBuilder
{
public IJT808Builder JT808Builder { get; }

public JT808ClientBuilderDefault(IJT808Builder builder)
{
JT808Builder = builder;
}

public IJT808Builder Builder()
{
return JT808Builder;
}
}
}

+ 66
- 0
src/JT808.Gateway.Kafka/JT808ClientKafkaExtensions.cs Voir le fichier

@@ -0,0 +1,66 @@
using JJT808.Gateway.Kafka;
using JT808.Gateway.Configs.Kafka;
using JT808.Gateway.Abstractions;
using JT808.Protocol;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection.Extensions;

namespace JT808.Gateway.Kafka
{
public static class JT808ClientKafkaExtensions
{
public static IJT808ClientBuilder AddJT808ClientKafka(this IJT808Builder builder)
{
return new JT808ClientBuilderDefault(builder);
}
/// <summary>
///
/// </summary>
/// <param name="serviceDescriptors"></param>
/// <param name="configuration">GetSection("JT808MsgConsumerConfig")</param>
/// <returns></returns>
public static IJT808ClientBuilder AddMsgConsumer(this IJT808ClientBuilder jT808ClientBuilder, IConfiguration configuration)
{
jT808ClientBuilder.JT808Builder.Services.Configure<JT808MsgConsumerConfig>(configuration.GetSection("JT808MsgConsumerConfig"));
jT808ClientBuilder.JT808Builder.Services.TryAddSingleton<IJT808MsgConsumer, JT808MsgConsumer>();
return jT808ClientBuilder;
}
/// <summary>
///
/// </summary>
/// <param name="serviceDescriptors"></param>
/// <param name="configuration">GetSection("JT808MsgReplyProducerConfig")</param>
/// <returns></returns>
public static IJT808ClientBuilder AddMsgReplyProducer(this IJT808ClientBuilder jT808ClientBuilder, IConfiguration configuration)
{
jT808ClientBuilder.JT808Builder.Services.Configure<JT808MsgReplyProducerConfig>(configuration.GetSection("JT808MsgReplyProducerConfig"));
jT808ClientBuilder.JT808Builder.Services.TryAddSingleton<IJT808MsgReplyProducer, JT808MsgReplyProducer>();
return jT808ClientBuilder;
}
/// <summary>
///
/// </summary>
/// <param name="jT808NettyBuilder"></param>
/// <param name="configuration">GetSection("JT808MsgReplyConsumerConfig")</param>
/// <returns></returns>
public static IJT808ClientBuilder AddMsgReplyConsumer(this IJT808ClientBuilder jT808ClientBuilder, IConfiguration configuration)
{
jT808ClientBuilder.JT808Builder.Services.Configure<JT808MsgReplyConsumerConfig>(configuration.GetSection("JT808MsgReplyConsumerConfig"));
jT808ClientBuilder.JT808Builder.Services.Replace(new ServiceDescriptor(typeof(IJT808MsgReplyConsumer), typeof(JT808MsgReplyConsumer), ServiceLifetime.Singleton));
return jT808ClientBuilder;
}
/// <summary>
///
/// </summary>
/// <param name="serviceDescriptors"></param>
/// <param name="configuration">GetSection("JT808SessionConsumerConfig")</param>
/// <returns></returns>
public static IJT808ClientBuilder AddSessionConsumer(this IJT808ClientBuilder jT808ClientBuilder, IConfiguration configuration)
{
jT808ClientBuilder.JT808Builder.Services.Configure<JT808SessionConsumerConfig>(configuration.GetSection("JT808SessionConsumerConfig"));
jT808ClientBuilder.JT808Builder.Services.TryAddSingleton<IJT808SessionConsumer, JT808SessionConsumer>();
return jT808ClientBuilder;
}
}
}

+ 82
- 0
src/JT808.Gateway.Kafka/JT808MsgConsumer.cs Voir le fichier

@@ -0,0 +1,82 @@
using Confluent.Kafka;
using JT808.Gateway.Configs.Kafka;
using JT808.Gateway.Abstractions;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace JT808.Gateway.Kafka
{
public class JT808MsgConsumer : IJT808MsgConsumer
{
public CancellationTokenSource Cts => new CancellationTokenSource();

private readonly IConsumer<string, byte[]> consumer;

private readonly ILogger logger;

public string TopicName { get; }

public JT808MsgConsumer(
IOptions<JT808MsgConsumerConfig> consumerConfigAccessor,
ILoggerFactory loggerFactory)
{
consumer = new ConsumerBuilder<string, byte[]>(consumerConfigAccessor.Value).Build();
TopicName = consumerConfigAccessor.Value.TopicName;
logger = loggerFactory.CreateLogger("JT808MsgConsumer");
}

public void OnMessage(Action<(string TerminalNo, byte[] Data)> callback)
{
Task.Run(() =>
{
while (!Cts.IsCancellationRequested)
{
try
{
//如果不指定分区,根据kafka的机制会从多个分区中拉取数据
//如果指定分区,根据kafka的机制会从相应的分区中拉取数据
var data = consumer.Consume(Cts.Token);
if (logger.IsEnabled(LogLevel.Debug))
{
logger.LogDebug($"Topic: {data.Topic} Key: {data.Key} Partition: {data.Partition} Offset: {data.Offset} TopicPartitionOffset:{data.TopicPartitionOffset}");
}
callback((data.Key, data.Value));
}
catch (ConsumeException ex)
{
logger.LogError(ex, TopicName);
}
catch (OperationCanceledException ex)
{
logger.LogError(ex, TopicName);
}
catch (Exception ex)
{
logger.LogError(ex, TopicName);
}
}
}, Cts.Token);
}

public void Subscribe()
{
consumer.Subscribe(TopicName);
}

public void Unsubscribe()
{
consumer.Unsubscribe();
}

public void Dispose()
{
consumer.Close();
consumer.Dispose();
}
}
}

+ 38
- 0
src/JT808.Gateway.Kafka/JT808MsgProducer.cs Voir le fichier

@@ -0,0 +1,38 @@
using Confluent.Kafka;
using JT808.Gateway.Configs.Kafka;
using JT808.Gateway.Abstractions;
using Microsoft.Extensions.Options;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;

namespace JT808.Gateway.Kafka
{
public class JT808MsgProducer : IJT808MsgProducer
{
public string TopicName { get; }

private readonly IProducer<string, byte[]> producer;
public JT808MsgProducer(
IOptions<JT808MsgProducerConfig> producerConfigAccessor)
{
producer = new ProducerBuilder<string, byte[]>(producerConfigAccessor.Value).Build();
TopicName = producerConfigAccessor.Value.TopicName;
}

public void Dispose()
{
producer.Dispose();
}

public async ValueTask ProduceAsync(string terminalNo, byte[] data)
{
await producer.ProduceAsync(TopicName, new Message<string, byte[]>
{
Key = terminalNo,
Value = data
});
}
}
}

+ 82
- 0
src/JT808.Gateway.Kafka/JT808MsgReplyConsumer.cs Voir le fichier

@@ -0,0 +1,82 @@
using Confluent.Kafka;
using JT808.Gateway.Configs.Kafka;
using JT808.Gateway.Abstractions;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace JT808.Gateway.Kafka
{
public class JT808MsgReplyConsumer : IJT808MsgReplyConsumer
{
public CancellationTokenSource Cts => new CancellationTokenSource();

private readonly IConsumer<string, byte[]> consumer;

private readonly ILogger logger;

public string TopicName { get; }

public JT808MsgReplyConsumer(
IOptions<JT808MsgReplyConsumerConfig> consumerConfigAccessor,
ILoggerFactory loggerFactory)
{
consumer = new ConsumerBuilder<string, byte[]>(consumerConfigAccessor.Value).Build();
TopicName = consumerConfigAccessor.Value.TopicName;
logger = loggerFactory.CreateLogger("JT808MsgReplyConsumer");
}

public void OnMessage(Action<(string TerminalNo, byte[] Data)> callback)
{
Task.Run(() =>
{
while (!Cts.IsCancellationRequested)
{
try
{
//如果不指定分区,根据kafka的机制会从多个分区中拉取数据
//如果指定分区,根据kafka的机制会从相应的分区中拉取数据
var data = consumer.Consume(Cts.Token);
if (logger.IsEnabled(LogLevel.Debug))
{
logger.LogDebug($"Topic: {data.Topic} Key: {data.Key} Partition: {data.Partition} Offset: {data.Offset} TopicPartitionOffset:{data.TopicPartitionOffset}");
}
callback((data.Key, data.Value));
}
catch (ConsumeException ex)
{
logger.LogError(ex, TopicName);
}
catch (OperationCanceledException ex)
{
logger.LogError(ex, TopicName);
}
catch (Exception ex)
{
logger.LogError(ex, TopicName);
}
}
}, Cts.Token);
}

public void Subscribe()
{
consumer.Subscribe(TopicName);
}

public void Unsubscribe()
{
consumer.Unsubscribe();
}

public void Dispose()
{
consumer.Close();
consumer.Dispose();
}
}
}

+ 38
- 0
src/JT808.Gateway.Kafka/JT808MsgReplyProducer.cs Voir le fichier

@@ -0,0 +1,38 @@
using Confluent.Kafka;
using JT808.Gateway.Configs.Kafka;
using JT808.Gateway.Abstractions;
using Microsoft.Extensions.Options;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;

namespace JJT808.Gateway.Kafka
{
public class JT808MsgReplyProducer : IJT808MsgReplyProducer
{
public string TopicName { get;}

private IProducer<string, byte[]> producer;
public JT808MsgReplyProducer(
IOptions<JT808MsgReplyProducerConfig> producerConfigAccessor)
{
producer = new ProducerBuilder<string, byte[]>(producerConfigAccessor.Value).Build();
TopicName = producerConfigAccessor.Value.TopicName;
}

public void Dispose()
{
producer.Dispose();
}

public async ValueTask ProduceAsync(string terminalNo, byte[] data)
{
await producer.ProduceAsync(TopicName, new Message<string, byte[]>
{
Key = terminalNo,
Value = data
});
}
}
}

+ 48
- 0
src/JT808.Gateway.Kafka/JT808ServerKafkaExtensions.cs Voir le fichier

@@ -0,0 +1,48 @@
using JT808.Gateway.Configs.Kafka;
using JT808.Gateway.Abstractions;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection.Extensions;

namespace JT808.Gateway.Kafka
{
public static class JT808ServerKafkaExtensions
{
/// <summary>
///
/// </summary>
/// <param name="jT808NettyBuilder"></param>
/// <param name="configuration">GetSection("JT808MsgProducerConfig")</param>
/// <returns></returns>
public static IJT808GatewayBuilder AddJT808ServerKafkaMsgProducer(this IJT808GatewayBuilder jT808GatewayBuilder, IConfiguration configuration)
{
jT808GatewayBuilder.JT808Builder.Services.Configure<JT808MsgProducerConfig>(configuration.GetSection("JT808MsgProducerConfig"));
jT808GatewayBuilder.JT808Builder.Services.Replace(new ServiceDescriptor(typeof(IJT808MsgProducer), typeof(JT808MsgProducer), ServiceLifetime.Singleton));
return jT808GatewayBuilder;
}
/// <summary>
///
/// </summary>
/// <param name="jT808NettyBuilder"></param>
/// <param name="configuration">GetSection("JT808MsgReplyConsumerConfig")</param>
/// <returns></returns>
public static IJT808GatewayBuilder AddJT808ServerKafkaMsgReplyConsumer(this IJT808GatewayBuilder jT808GatewayBuilder, IConfiguration configuration)
{
jT808GatewayBuilder.JT808Builder.Services.Configure<JT808MsgReplyConsumerConfig>(configuration.GetSection("JT808MsgReplyConsumerConfig"));
jT808GatewayBuilder.JT808Builder.Services.Replace(new ServiceDescriptor(typeof(IJT808MsgReplyConsumer), typeof(JT808MsgReplyConsumer), ServiceLifetime.Singleton));
return jT808GatewayBuilder;
}
/// <summary>
///
/// </summary>
/// <param name="jT808NettyBuilder"></param>
/// <param name="configuration">GetSection("JT808SessionProducerConfig")</param>
/// <returns></returns>
public static IJT808GatewayBuilder AddJT808ServerKafkaSessionProducer(this IJT808GatewayBuilder jT808GatewayBuilder, IConfiguration configuration)
{
jT808GatewayBuilder.JT808Builder.Services.Configure<JT808SessionProducerConfig>(configuration.GetSection("JT808SessionProducerConfig"));
jT808GatewayBuilder.JT808Builder.Services.Replace(new ServiceDescriptor(typeof(IJT808SessionProducer), typeof(JT808SessionProducer), ServiceLifetime.Singleton));
return jT808GatewayBuilder;
}
}
}

+ 82
- 0
src/JT808.Gateway.Kafka/JT808SessionConsumer.cs Voir le fichier

@@ -0,0 +1,82 @@
using Confluent.Kafka;
using JT808.Gateway.Configs.Kafka;
using JT808.Gateway.Abstractions;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace JT808.Gateway.Kafka
{
public class JT808SessionConsumer : IJT808SessionConsumer
{
public CancellationTokenSource Cts => new CancellationTokenSource();

private readonly IConsumer<string, string> consumer;

private readonly ILogger logger;

public string TopicName { get; }

public JT808SessionConsumer(
IOptions<JT808SessionConsumerConfig> consumerConfigAccessor,
ILoggerFactory loggerFactory)
{
consumer = new ConsumerBuilder<string, string>(consumerConfigAccessor.Value).Build();
TopicName = consumerConfigAccessor.Value.TopicName;
logger = loggerFactory.CreateLogger("JT808SessionConsumer");
}

public void OnMessage(Action<(string Notice, string TerminalNo)> callback)
{
Task.Run(() =>
{
while (!Cts.IsCancellationRequested)
{
try
{
//如果不指定分区,根据kafka的机制会从多个分区中拉取数据
//如果指定分区,根据kafka的机制会从相应的分区中拉取数据
var data = consumer.Consume(Cts.Token);
if (logger.IsEnabled(LogLevel.Debug))
{
logger.LogDebug($"Topic: {data.Topic} Key: {data.Key} Partition: {data.Partition} Offset: {data.Offset} TopicPartitionOffset:{data.TopicPartitionOffset}");
}
callback((data.Key, data.Value));
}
catch (ConsumeException ex)
{
logger.LogError(ex, TopicName);
}
catch (OperationCanceledException ex)
{
logger.LogError(ex, TopicName);
}
catch (Exception ex)
{
logger.LogError(ex, TopicName);
}
}
}, Cts.Token);
}

public void Subscribe()
{
consumer.Subscribe(TopicName);
}

public void Unsubscribe()
{
consumer.Unsubscribe();
}

public void Dispose()
{
consumer.Close();
consumer.Dispose();
}
}
}

+ 38
- 0
src/JT808.Gateway.Kafka/JT808SessionProducer.cs Voir le fichier

@@ -0,0 +1,38 @@
using Confluent.Kafka;
using JT808.Gateway.Configs.Kafka;
using JT808.Gateway.Abstractions;
using Microsoft.Extensions.Options;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;

namespace JT808.Gateway.Kafka
{
public class JT808SessionProducer : IJT808SessionProducer
{
public string TopicName { get; }

private readonly IProducer<string, string> producer;
public JT808SessionProducer(
IOptions<JT808SessionProducerConfig> producerConfigAccessor)
{
producer = new ProducerBuilder<string, string>(producerConfigAccessor.Value).Build();
TopicName = producerConfigAccessor.Value.TopicName;
}

public void Dispose()
{
producer.Dispose();
}

public async ValueTask ProduceAsync(string notice,string terminalNo)
{
await producer.ProduceAsync(TopicName, new Message<string, string>
{
Key = notice,
Value = terminalNo
});
}
}
}

+ 6
- 0
src/JT808.Gateway.sln Voir le fichier

@@ -11,6 +11,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JT808.Gateway.Test", "JT808
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JT808.Gateway.Abstractions", "JT808.Gateway.Abstractions\JT808.Gateway.Abstractions.csproj", "{3AA17DF7-A1B3-449C-93C2-45B051C32933}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JT808.Gateway.Kafka", "JT808.Gateway.Kafka\JT808.Gateway.Kafka.csproj", "{274C048E-A8E3-4422-A578-A10A97DF36F2}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
@@ -33,6 +35,10 @@ Global
{3AA17DF7-A1B3-449C-93C2-45B051C32933}.Debug|Any CPU.Build.0 = Debug|Any CPU
{3AA17DF7-A1B3-449C-93C2-45B051C32933}.Release|Any CPU.ActiveCfg = Release|Any CPU
{3AA17DF7-A1B3-449C-93C2-45B051C32933}.Release|Any CPU.Build.0 = Release|Any CPU
{274C048E-A8E3-4422-A578-A10A97DF36F2}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{274C048E-A8E3-4422-A578-A10A97DF36F2}.Debug|Any CPU.Build.0 = Debug|Any CPU
{274C048E-A8E3-4422-A578-A10A97DF36F2}.Release|Any CPU.ActiveCfg = Release|Any CPU
{274C048E-A8E3-4422-A578-A10A97DF36F2}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE


+ 4
- 0
src/JT808.Gateway/JT808.Gateway.csproj Voir le fichier

@@ -7,6 +7,10 @@
<Authors>SmallChi(Koike)</Authors>
<GeneratePackageOnBuild>false</GeneratePackageOnBuild>
<SignAssembly>false</SignAssembly>
<RepositoryUrl>https://github.com/SmallChi/JT808Gateway</RepositoryUrl>
<PackageProjectUrl>https://github.com/SmallChi/JT808Gateway</PackageProjectUrl>
<licenseUrl>https://github.com/SmallChi/JT808Gateway/blob/master/LICENSE</licenseUrl>
<license>https://github.com/SmallChi/JT808Gateway/blob/master/LICENSE</license>
<PackageLicenseFile>LICENSE</PackageLicenseFile>
<PackageRequireLicenseAcceptance>true</PackageRequireLicenseAcceptance>
<Description>基于Pipeline实现的JT808Gateway的网络库</Description>


Chargement…
Annuler
Enregistrer