@@ -0,0 +1,15 @@ | |||||
using Confluent.Kafka; | |||||
using Microsoft.Extensions.Options; | |||||
using System; | |||||
using System.Collections.Generic; | |||||
using System.Text; | |||||
namespace JT808.Gateway.Configs.Kafka | |||||
{ | |||||
public class JT808ConsumerConfig: ConsumerConfig, IOptions<JT808ConsumerConfig> | |||||
{ | |||||
public string TopicName { get; set; } | |||||
public JT808ConsumerConfig Value => this; | |||||
} | |||||
} |
@@ -0,0 +1,13 @@ | |||||
using Confluent.Kafka; | |||||
using Microsoft.Extensions.Options; | |||||
using System; | |||||
using System.Collections.Generic; | |||||
using System.Text; | |||||
namespace JT808.Gateway.Configs.Kafka | |||||
{ | |||||
public class JT808MsgConsumerConfig : JT808ConsumerConfig, IOptions<JT808MsgConsumerConfig> | |||||
{ | |||||
JT808MsgConsumerConfig IOptions<JT808MsgConsumerConfig>.Value => this; | |||||
} | |||||
} |
@@ -0,0 +1,13 @@ | |||||
using Confluent.Kafka; | |||||
using Microsoft.Extensions.Options; | |||||
using System; | |||||
using System.Collections.Generic; | |||||
using System.Text; | |||||
namespace JT808.Gateway.Configs.Kafka | |||||
{ | |||||
public class JT808MsgProducerConfig : JT808ProducerConfig, IOptions<JT808MsgProducerConfig> | |||||
{ | |||||
JT808MsgProducerConfig IOptions<JT808MsgProducerConfig>.Value => this; | |||||
} | |||||
} |
@@ -0,0 +1,13 @@ | |||||
using Confluent.Kafka; | |||||
using Microsoft.Extensions.Options; | |||||
using System; | |||||
using System.Collections.Generic; | |||||
using System.Text; | |||||
namespace JT808.Gateway.Configs.Kafka | |||||
{ | |||||
public class JT808MsgReplyConsumerConfig : JT808ConsumerConfig, IOptions<JT808MsgReplyConsumerConfig> | |||||
{ | |||||
JT808MsgReplyConsumerConfig IOptions<JT808MsgReplyConsumerConfig>.Value => this; | |||||
} | |||||
} |
@@ -0,0 +1,13 @@ | |||||
using Confluent.Kafka; | |||||
using Microsoft.Extensions.Options; | |||||
using System; | |||||
using System.Collections.Generic; | |||||
using System.Text; | |||||
namespace JT808.Gateway.Configs.Kafka | |||||
{ | |||||
public class JT808MsgReplyProducerConfig : JT808ProducerConfig, IOptions<JT808MsgReplyProducerConfig> | |||||
{ | |||||
JT808MsgReplyProducerConfig IOptions<JT808MsgReplyProducerConfig>.Value => this; | |||||
} | |||||
} |
@@ -0,0 +1,15 @@ | |||||
using Confluent.Kafka; | |||||
using Microsoft.Extensions.Options; | |||||
using System; | |||||
using System.Collections.Generic; | |||||
using System.Text; | |||||
namespace JT808.Gateway.Configs.Kafka | |||||
{ | |||||
public class JT808ProducerConfig : ProducerConfig,IOptions<JT808ProducerConfig> | |||||
{ | |||||
public string TopicName { get; set; } | |||||
public JT808ProducerConfig Value => this; | |||||
} | |||||
} |
@@ -0,0 +1,13 @@ | |||||
using Confluent.Kafka; | |||||
using Microsoft.Extensions.Options; | |||||
using System; | |||||
using System.Collections.Generic; | |||||
using System.Text; | |||||
namespace JT808.Gateway.Configs.Kafka | |||||
{ | |||||
public class JT808SessionConsumerConfig : JT808ConsumerConfig, IOptions<JT808SessionConsumerConfig> | |||||
{ | |||||
JT808SessionConsumerConfig IOptions<JT808SessionConsumerConfig>.Value => this; | |||||
} | |||||
} |
@@ -0,0 +1,13 @@ | |||||
using Confluent.Kafka; | |||||
using Microsoft.Extensions.Options; | |||||
using System; | |||||
using System.Collections.Generic; | |||||
using System.Text; | |||||
namespace JT808.Gateway.Configs.Kafka | |||||
{ | |||||
public class JT808SessionProducerConfig : JT808ProducerConfig, IOptions<JT808SessionProducerConfig> | |||||
{ | |||||
JT808SessionProducerConfig IOptions<JT808SessionProducerConfig>.Value => this; | |||||
} | |||||
} |
@@ -0,0 +1,39 @@ | |||||
<Project Sdk="Microsoft.NET.Sdk"> | |||||
<PropertyGroup> | |||||
<TargetFramework>netstandard2.0</TargetFramework> | |||||
<LangVersion>8.0</LangVersion> | |||||
<Copyright>Copyright 2018.</Copyright> | |||||
<Authors>SmallChi(Koike)</Authors> | |||||
<RepositoryUrl>https://github.com/SmallChi/JT808DotNetty</RepositoryUrl> | |||||
<PackageProjectUrl>https://github.com/SmallChi/JT808DotNetty</PackageProjectUrl> | |||||
<licenseUrl>https://github.com/SmallChi/JT808DotNetty/blob/master/LICENSE</licenseUrl> | |||||
<license>https://github.com/SmallChi/JT808DotNetty/blob/master/LICENSE</license> | |||||
<GeneratePackageOnBuild>false</GeneratePackageOnBuild> | |||||
<Version>1.0.0-preview1</Version> | |||||
<SignAssembly>false</SignAssembly> | |||||
<PackageLicenseFile>LICENSE</PackageLicenseFile> | |||||
<PackageRequireLicenseAcceptance>true</PackageRequireLicenseAcceptance> | |||||
<PackageId>JT808.Gateway.Kafka</PackageId> | |||||
<Product>JT808.Gateway.Kafka</Product> | |||||
<Description>基于Kafka的JT808消息发布与订阅</Description> | |||||
<PackageReleaseNotes>基于Kafka的JT808消息发布与订阅</PackageReleaseNotes> | |||||
</PropertyGroup> | |||||
<ItemGroup> | |||||
<PackageReference Include="Confluent.Kafka" Version="1.2.1" /> | |||||
<PackageReference Include="Microsoft.Extensions.Configuration.Abstractions" Version="3.0.0" /> | |||||
<PackageReference Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="3.0.0" /> | |||||
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="3.0.0" /> | |||||
<PackageReference Include="Microsoft.Extensions.Options" Version="3.0.0" /> | |||||
<PackageReference Include="Microsoft.Extensions.Options.ConfigurationExtensions" Version="3.0.0" /> | |||||
</ItemGroup> | |||||
<ItemGroup> | |||||
<None Include="..\..\LICENSE" Pack="true" PackagePath="" /> | |||||
</ItemGroup> | |||||
<ItemGroup> | |||||
<ProjectReference Include="..\JT808.Gateway\JT808.Gateway.csproj" /> | |||||
</ItemGroup> | |||||
</Project> |
@@ -0,0 +1,24 @@ | |||||
using JT808.Protocol; | |||||
using Microsoft.Extensions.DependencyInjection; | |||||
using Microsoft.Extensions.DependencyInjection.Extensions; | |||||
using System; | |||||
using System.Collections.Generic; | |||||
using System.Text; | |||||
namespace JT808.Gateway.Kafka | |||||
{ | |||||
internal class JT808ClientBuilderDefault : IJT808ClientBuilder | |||||
{ | |||||
public IJT808Builder JT808Builder { get; } | |||||
public JT808ClientBuilderDefault(IJT808Builder builder) | |||||
{ | |||||
JT808Builder = builder; | |||||
} | |||||
public IJT808Builder Builder() | |||||
{ | |||||
return JT808Builder; | |||||
} | |||||
} | |||||
} |
@@ -0,0 +1,66 @@ | |||||
using JJT808.Gateway.Kafka; | |||||
using JT808.Gateway.Configs.Kafka; | |||||
using JT808.Gateway.PubSub; | |||||
using JT808.Protocol; | |||||
using Microsoft.Extensions.Configuration; | |||||
using Microsoft.Extensions.DependencyInjection; | |||||
using Microsoft.Extensions.DependencyInjection.Extensions; | |||||
namespace JT808.Gateway.Kafka | |||||
{ | |||||
public static class JT808ClientKafkaExtensions | |||||
{ | |||||
public static IJT808ClientBuilder AddJT808ClientKafka(this IJT808Builder builder) | |||||
{ | |||||
return new JT808ClientBuilderDefault(builder); | |||||
} | |||||
/// <summary> | |||||
/// | |||||
/// </summary> | |||||
/// <param name="serviceDescriptors"></param> | |||||
/// <param name="configuration">GetSection("JT808MsgConsumerConfig")</param> | |||||
/// <returns></returns> | |||||
public static IJT808ClientBuilder AddMsgConsumer(this IJT808ClientBuilder jT808ClientBuilder, IConfiguration configuration) | |||||
{ | |||||
jT808ClientBuilder.JT808Builder.Services.Configure<JT808MsgConsumerConfig>(configuration.GetSection("JT808MsgConsumerConfig")); | |||||
jT808ClientBuilder.JT808Builder.Services.TryAddSingleton<IJT808MsgConsumer, JT808MsgConsumer>(); | |||||
return jT808ClientBuilder; | |||||
} | |||||
/// <summary> | |||||
/// | |||||
/// </summary> | |||||
/// <param name="serviceDescriptors"></param> | |||||
/// <param name="configuration">GetSection("JT808MsgReplyProducerConfig")</param> | |||||
/// <returns></returns> | |||||
public static IJT808ClientBuilder AddMsgReplyProducer(this IJT808ClientBuilder jT808ClientBuilder, IConfiguration configuration) | |||||
{ | |||||
jT808ClientBuilder.JT808Builder.Services.Configure<JT808MsgReplyProducerConfig>(configuration.GetSection("JT808MsgReplyProducerConfig")); | |||||
jT808ClientBuilder.JT808Builder.Services.TryAddSingleton<IJT808MsgReplyProducer, JT808MsgReplyProducer>(); | |||||
return jT808ClientBuilder; | |||||
} | |||||
/// <summary> | |||||
/// | |||||
/// </summary> | |||||
/// <param name="jT808NettyBuilder"></param> | |||||
/// <param name="configuration">GetSection("JT808MsgReplyConsumerConfig")</param> | |||||
/// <returns></returns> | |||||
public static IJT808ClientBuilder AddMsgReplyConsumer(this IJT808ClientBuilder jT808ClientBuilder, IConfiguration configuration) | |||||
{ | |||||
jT808ClientBuilder.JT808Builder.Services.Configure<JT808MsgReplyConsumerConfig>(configuration.GetSection("JT808MsgReplyConsumerConfig")); | |||||
jT808ClientBuilder.JT808Builder.Services.Replace(new ServiceDescriptor(typeof(IJT808MsgReplyConsumer), typeof(JT808MsgReplyConsumer), ServiceLifetime.Singleton)); | |||||
return jT808ClientBuilder; | |||||
} | |||||
/// <summary> | |||||
/// | |||||
/// </summary> | |||||
/// <param name="serviceDescriptors"></param> | |||||
/// <param name="configuration">GetSection("JT808SessionConsumerConfig")</param> | |||||
/// <returns></returns> | |||||
public static IJT808ClientBuilder AddSessionConsumer(this IJT808ClientBuilder jT808ClientBuilder, IConfiguration configuration) | |||||
{ | |||||
jT808ClientBuilder.JT808Builder.Services.Configure<JT808SessionConsumerConfig>(configuration.GetSection("JT808SessionConsumerConfig")); | |||||
jT808ClientBuilder.JT808Builder.Services.TryAddSingleton<IJT808SessionConsumer, JT808SessionConsumer>(); | |||||
return jT808ClientBuilder; | |||||
} | |||||
} | |||||
} |
@@ -0,0 +1,82 @@ | |||||
using Confluent.Kafka; | |||||
using JT808.Gateway.Configs.Kafka; | |||||
using JT808.Gateway.PubSub; | |||||
using Microsoft.Extensions.Logging; | |||||
using Microsoft.Extensions.Options; | |||||
using System; | |||||
using System.Collections.Generic; | |||||
using System.Text; | |||||
using System.Threading; | |||||
using System.Threading.Tasks; | |||||
namespace JT808.Gateway.Kafka | |||||
{ | |||||
public class JT808MsgConsumer : IJT808MsgConsumer | |||||
{ | |||||
public CancellationTokenSource Cts => new CancellationTokenSource(); | |||||
private readonly IConsumer<string, byte[]> consumer; | |||||
private readonly ILogger logger; | |||||
public string TopicName { get; } | |||||
public JT808MsgConsumer( | |||||
IOptions<JT808MsgConsumerConfig> consumerConfigAccessor, | |||||
ILoggerFactory loggerFactory) | |||||
{ | |||||
consumer = new ConsumerBuilder<string, byte[]>(consumerConfigAccessor.Value).Build(); | |||||
TopicName = consumerConfigAccessor.Value.TopicName; | |||||
logger = loggerFactory.CreateLogger("JT808MsgConsumer"); | |||||
} | |||||
public void OnMessage(Action<(string TerminalNo, byte[] Data)> callback) | |||||
{ | |||||
Task.Run(() => | |||||
{ | |||||
while (!Cts.IsCancellationRequested) | |||||
{ | |||||
try | |||||
{ | |||||
//如果不指定分区,根据kafka的机制会从多个分区中拉取数据 | |||||
//如果指定分区,根据kafka的机制会从相应的分区中拉取数据 | |||||
var data = consumer.Consume(Cts.Token); | |||||
if (logger.IsEnabled(LogLevel.Debug)) | |||||
{ | |||||
logger.LogDebug($"Topic: {data.Topic} Key: {data.Key} Partition: {data.Partition} Offset: {data.Offset} TopicPartitionOffset:{data.TopicPartitionOffset}"); | |||||
} | |||||
callback((data.Key, data.Value)); | |||||
} | |||||
catch (ConsumeException ex) | |||||
{ | |||||
logger.LogError(ex, TopicName); | |||||
} | |||||
catch (OperationCanceledException ex) | |||||
{ | |||||
logger.LogError(ex, TopicName); | |||||
} | |||||
catch (Exception ex) | |||||
{ | |||||
logger.LogError(ex, TopicName); | |||||
} | |||||
} | |||||
}, Cts.Token); | |||||
} | |||||
public void Subscribe() | |||||
{ | |||||
consumer.Subscribe(TopicName); | |||||
} | |||||
public void Unsubscribe() | |||||
{ | |||||
consumer.Unsubscribe(); | |||||
} | |||||
public void Dispose() | |||||
{ | |||||
consumer.Close(); | |||||
consumer.Dispose(); | |||||
} | |||||
} | |||||
} |
@@ -0,0 +1,38 @@ | |||||
using Confluent.Kafka; | |||||
using JT808.Gateway.Configs.Kafka; | |||||
using JT808.Gateway.PubSub; | |||||
using Microsoft.Extensions.Options; | |||||
using System; | |||||
using System.Collections.Generic; | |||||
using System.Text; | |||||
using System.Threading.Tasks; | |||||
namespace JT808.Gateway.Kafka | |||||
{ | |||||
public class JT808MsgProducer : IJT808MsgProducer | |||||
{ | |||||
public string TopicName { get; } | |||||
private readonly IProducer<string, byte[]> producer; | |||||
public JT808MsgProducer( | |||||
IOptions<JT808MsgProducerConfig> producerConfigAccessor) | |||||
{ | |||||
producer = new ProducerBuilder<string, byte[]>(producerConfigAccessor.Value).Build(); | |||||
TopicName = producerConfigAccessor.Value.TopicName; | |||||
} | |||||
public void Dispose() | |||||
{ | |||||
producer.Dispose(); | |||||
} | |||||
public async Task ProduceAsync(string terminalNo, byte[] data) | |||||
{ | |||||
await producer.ProduceAsync(TopicName, new Message<string, byte[]> | |||||
{ | |||||
Key = terminalNo, | |||||
Value = data | |||||
}); | |||||
} | |||||
} | |||||
} |
@@ -0,0 +1,82 @@ | |||||
using Confluent.Kafka; | |||||
using JT808.Gateway.Configs.Kafka; | |||||
using JT808.Gateway.PubSub; | |||||
using Microsoft.Extensions.Logging; | |||||
using Microsoft.Extensions.Options; | |||||
using System; | |||||
using System.Collections.Generic; | |||||
using System.Text; | |||||
using System.Threading; | |||||
using System.Threading.Tasks; | |||||
namespace JT808.Gateway.Kafka | |||||
{ | |||||
public class JT808MsgReplyConsumer : IJT808MsgReplyConsumer | |||||
{ | |||||
public CancellationTokenSource Cts => new CancellationTokenSource(); | |||||
private readonly IConsumer<string, byte[]> consumer; | |||||
private readonly ILogger logger; | |||||
public string TopicName { get; } | |||||
public JT808MsgReplyConsumer( | |||||
IOptions<JT808MsgReplyConsumerConfig> consumerConfigAccessor, | |||||
ILoggerFactory loggerFactory) | |||||
{ | |||||
consumer = new ConsumerBuilder<string, byte[]>(consumerConfigAccessor.Value).Build(); | |||||
TopicName = consumerConfigAccessor.Value.TopicName; | |||||
logger = loggerFactory.CreateLogger("JT808MsgReplyConsumer"); | |||||
} | |||||
public void OnMessage(Action<(string TerminalNo, byte[] Data)> callback) | |||||
{ | |||||
Task.Run(() => | |||||
{ | |||||
while (!Cts.IsCancellationRequested) | |||||
{ | |||||
try | |||||
{ | |||||
//如果不指定分区,根据kafka的机制会从多个分区中拉取数据 | |||||
//如果指定分区,根据kafka的机制会从相应的分区中拉取数据 | |||||
var data = consumer.Consume(Cts.Token); | |||||
if (logger.IsEnabled(LogLevel.Debug)) | |||||
{ | |||||
logger.LogDebug($"Topic: {data.Topic} Key: {data.Key} Partition: {data.Partition} Offset: {data.Offset} TopicPartitionOffset:{data.TopicPartitionOffset}"); | |||||
} | |||||
callback((data.Key, data.Value)); | |||||
} | |||||
catch (ConsumeException ex) | |||||
{ | |||||
logger.LogError(ex, TopicName); | |||||
} | |||||
catch (OperationCanceledException ex) | |||||
{ | |||||
logger.LogError(ex, TopicName); | |||||
} | |||||
catch (Exception ex) | |||||
{ | |||||
logger.LogError(ex, TopicName); | |||||
} | |||||
} | |||||
}, Cts.Token); | |||||
} | |||||
public void Subscribe() | |||||
{ | |||||
consumer.Subscribe(TopicName); | |||||
} | |||||
public void Unsubscribe() | |||||
{ | |||||
consumer.Unsubscribe(); | |||||
} | |||||
public void Dispose() | |||||
{ | |||||
consumer.Close(); | |||||
consumer.Dispose(); | |||||
} | |||||
} | |||||
} |
@@ -0,0 +1,38 @@ | |||||
using Confluent.Kafka; | |||||
using JT808.Gateway.Configs.Kafka; | |||||
using JT808.Gateway.PubSub; | |||||
using Microsoft.Extensions.Options; | |||||
using System; | |||||
using System.Collections.Generic; | |||||
using System.Text; | |||||
using System.Threading.Tasks; | |||||
namespace JJT808.Gateway.Kafka | |||||
{ | |||||
public class JT808MsgReplyProducer : IJT808MsgReplyProducer | |||||
{ | |||||
public string TopicName { get;} | |||||
private IProducer<string, byte[]> producer; | |||||
public JT808MsgReplyProducer( | |||||
IOptions<JT808MsgReplyProducerConfig> producerConfigAccessor) | |||||
{ | |||||
producer = new ProducerBuilder<string, byte[]>(producerConfigAccessor.Value).Build(); | |||||
TopicName = producerConfigAccessor.Value.TopicName; | |||||
} | |||||
public void Dispose() | |||||
{ | |||||
producer.Dispose(); | |||||
} | |||||
public async Task ProduceAsync(string terminalNo, byte[] data) | |||||
{ | |||||
await producer.ProduceAsync(TopicName, new Message<string, byte[]> | |||||
{ | |||||
Key = terminalNo, | |||||
Value = data | |||||
}); | |||||
} | |||||
} | |||||
} |
@@ -0,0 +1,48 @@ | |||||
using JT808.Gateway.Configs.Kafka; | |||||
using JT808.Gateway.PubSub; | |||||
using Microsoft.Extensions.Configuration; | |||||
using Microsoft.Extensions.DependencyInjection; | |||||
using Microsoft.Extensions.DependencyInjection.Extensions; | |||||
namespace JT808.Gateway.Kafka | |||||
{ | |||||
public static class JT808ServerKafkaExtensions | |||||
{ | |||||
/// <summary> | |||||
/// | |||||
/// </summary> | |||||
/// <param name="jT808NettyBuilder"></param> | |||||
/// <param name="configuration">GetSection("JT808MsgProducerConfig")</param> | |||||
/// <returns></returns> | |||||
public static IJT808GatewayBuilder AddJT808ServerKafkaMsgProducer(this IJT808GatewayBuilder jT808GatewayBuilder, IConfiguration configuration) | |||||
{ | |||||
jT808GatewayBuilder.JT808Builder.Services.Configure<JT808MsgProducerConfig>(configuration.GetSection("JT808MsgProducerConfig")); | |||||
jT808GatewayBuilder.JT808Builder.Services.Replace(new ServiceDescriptor(typeof(IJT808MsgProducer), typeof(JT808MsgProducer), ServiceLifetime.Singleton)); | |||||
return jT808GatewayBuilder; | |||||
} | |||||
/// <summary> | |||||
/// | |||||
/// </summary> | |||||
/// <param name="jT808NettyBuilder"></param> | |||||
/// <param name="configuration">GetSection("JT808MsgReplyConsumerConfig")</param> | |||||
/// <returns></returns> | |||||
public static IJT808GatewayBuilder AddJT808ServerKafkaMsgReplyConsumer(this IJT808GatewayBuilder jT808GatewayBuilder, IConfiguration configuration) | |||||
{ | |||||
jT808GatewayBuilder.JT808Builder.Services.Configure<JT808MsgReplyConsumerConfig>(configuration.GetSection("JT808MsgReplyConsumerConfig")); | |||||
jT808GatewayBuilder.JT808Builder.Services.Replace(new ServiceDescriptor(typeof(IJT808MsgReplyConsumer), typeof(JT808MsgReplyConsumer), ServiceLifetime.Singleton)); | |||||
return jT808GatewayBuilder; | |||||
} | |||||
/// <summary> | |||||
/// | |||||
/// </summary> | |||||
/// <param name="jT808NettyBuilder"></param> | |||||
/// <param name="configuration">GetSection("JT808SessionProducerConfig")</param> | |||||
/// <returns></returns> | |||||
public static IJT808GatewayBuilder AddJT808ServerKafkaSessionProducer(this IJT808GatewayBuilder jT808GatewayBuilder, IConfiguration configuration) | |||||
{ | |||||
jT808GatewayBuilder.JT808Builder.Services.Configure<JT808SessionProducerConfig>(configuration.GetSection("JT808SessionProducerConfig")); | |||||
jT808GatewayBuilder.JT808Builder.Services.Replace(new ServiceDescriptor(typeof(IJT808SessionProducer), typeof(JT808SessionProducer), ServiceLifetime.Singleton)); | |||||
return jT808GatewayBuilder; | |||||
} | |||||
} | |||||
} |
@@ -0,0 +1,82 @@ | |||||
using Confluent.Kafka; | |||||
using JT808.Gateway.Configs.Kafka; | |||||
using JT808.Gateway.PubSub; | |||||
using Microsoft.Extensions.Logging; | |||||
using Microsoft.Extensions.Options; | |||||
using System; | |||||
using System.Collections.Generic; | |||||
using System.Text; | |||||
using System.Threading; | |||||
using System.Threading.Tasks; | |||||
namespace JT808.Gateway.Kafka | |||||
{ | |||||
public class JT808SessionConsumer : IJT808SessionConsumer | |||||
{ | |||||
public CancellationTokenSource Cts => new CancellationTokenSource(); | |||||
private readonly IConsumer<string, string> consumer; | |||||
private readonly ILogger logger; | |||||
public string TopicName { get; } | |||||
public JT808SessionConsumer( | |||||
IOptions<JT808SessionConsumerConfig> consumerConfigAccessor, | |||||
ILoggerFactory loggerFactory) | |||||
{ | |||||
consumer = new ConsumerBuilder<string, string>(consumerConfigAccessor.Value).Build(); | |||||
TopicName = consumerConfigAccessor.Value.TopicName; | |||||
logger = loggerFactory.CreateLogger("JT808SessionConsumer"); | |||||
} | |||||
public void OnMessage(Action<(string Notice, string TerminalNo)> callback) | |||||
{ | |||||
Task.Run(() => | |||||
{ | |||||
while (!Cts.IsCancellationRequested) | |||||
{ | |||||
try | |||||
{ | |||||
//如果不指定分区,根据kafka的机制会从多个分区中拉取数据 | |||||
//如果指定分区,根据kafka的机制会从相应的分区中拉取数据 | |||||
var data = consumer.Consume(Cts.Token); | |||||
if (logger.IsEnabled(LogLevel.Debug)) | |||||
{ | |||||
logger.LogDebug($"Topic: {data.Topic} Key: {data.Key} Partition: {data.Partition} Offset: {data.Offset} TopicPartitionOffset:{data.TopicPartitionOffset}"); | |||||
} | |||||
callback((data.Key, data.Value)); | |||||
} | |||||
catch (ConsumeException ex) | |||||
{ | |||||
logger.LogError(ex, TopicName); | |||||
} | |||||
catch (OperationCanceledException ex) | |||||
{ | |||||
logger.LogError(ex, TopicName); | |||||
} | |||||
catch (Exception ex) | |||||
{ | |||||
logger.LogError(ex, TopicName); | |||||
} | |||||
} | |||||
}, Cts.Token); | |||||
} | |||||
public void Subscribe() | |||||
{ | |||||
consumer.Subscribe(TopicName); | |||||
} | |||||
public void Unsubscribe() | |||||
{ | |||||
consumer.Unsubscribe(); | |||||
} | |||||
public void Dispose() | |||||
{ | |||||
consumer.Close(); | |||||
consumer.Dispose(); | |||||
} | |||||
} | |||||
} |
@@ -0,0 +1,38 @@ | |||||
using Confluent.Kafka; | |||||
using JT808.Gateway.Configs.Kafka; | |||||
using JT808.Gateway.PubSub; | |||||
using Microsoft.Extensions.Options; | |||||
using System; | |||||
using System.Collections.Generic; | |||||
using System.Text; | |||||
using System.Threading.Tasks; | |||||
namespace JT808.Gateway.Kafka | |||||
{ | |||||
public class JT808SessionProducer : IJT808SessionProducer | |||||
{ | |||||
public string TopicName { get; } | |||||
private readonly IProducer<string, string> producer; | |||||
public JT808SessionProducer( | |||||
IOptions<JT808SessionProducerConfig> producerConfigAccessor) | |||||
{ | |||||
producer = new ProducerBuilder<string, string>(producerConfigAccessor.Value).Build(); | |||||
TopicName = producerConfigAccessor.Value.TopicName; | |||||
} | |||||
public void Dispose() | |||||
{ | |||||
producer.Dispose(); | |||||
} | |||||
public async Task ProduceAsync(string notice,string terminalNo) | |||||
{ | |||||
await producer.ProduceAsync(TopicName, new Message<string, string> | |||||
{ | |||||
Key = notice, | |||||
Value = terminalNo | |||||
}); | |||||
} | |||||
} | |||||
} |