From ecb34e0b378c01ec752d2f71da2f1ab66004fe33 Mon Sep 17 00:00:00 2001 From: "SmallChi(Koike)" <564952747@qq.com> Date: Mon, 23 Dec 2019 11:23:11 +0800 Subject: [PATCH 1/2] =?UTF-8?q?=E8=BF=98=E5=8E=9Fkafka=E9=A1=B9=E7=9B=AE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../Configs/JT808ConsumerConfig.cs | 15 ++++ .../Configs/JT808MsgConsumerConfig.cs | 13 +++ .../Configs/JT808MsgProducerConfig.cs | 13 +++ .../Configs/JT808MsgReplyConsumerConfig.cs | 13 +++ .../Configs/JT808MsgReplyProducerConfig.cs | 13 +++ .../Configs/JT808ProducerConfig.cs | 15 ++++ .../Configs/JT808SessionConsumerConfig.cs | 13 +++ .../Configs/JT808SessionProducerConfig.cs | 13 +++ .../JT808.Gateway.Kafka.csproj | 39 +++++++++ .../JT808ClientBuilderDefault.cs | 24 ++++++ .../JT808ClientKafkaExtensions.cs | 66 +++++++++++++++ src/JT808.Gateway.Kafka/JT808MsgConsumer.cs | 82 +++++++++++++++++++ src/JT808.Gateway.Kafka/JT808MsgProducer.cs | 38 +++++++++ .../JT808MsgReplyConsumer.cs | 82 +++++++++++++++++++ .../JT808MsgReplyProducer.cs | 38 +++++++++ .../JT808ServerKafkaExtensions.cs | 48 +++++++++++ .../JT808SessionConsumer.cs | 82 +++++++++++++++++++ .../JT808SessionProducer.cs | 38 +++++++++ 18 files changed, 645 insertions(+) create mode 100644 src/JT808.Gateway.Kafka/Configs/JT808ConsumerConfig.cs create mode 100644 src/JT808.Gateway.Kafka/Configs/JT808MsgConsumerConfig.cs create mode 100644 src/JT808.Gateway.Kafka/Configs/JT808MsgProducerConfig.cs create mode 100644 src/JT808.Gateway.Kafka/Configs/JT808MsgReplyConsumerConfig.cs create mode 100644 src/JT808.Gateway.Kafka/Configs/JT808MsgReplyProducerConfig.cs create mode 100644 src/JT808.Gateway.Kafka/Configs/JT808ProducerConfig.cs create mode 100644 src/JT808.Gateway.Kafka/Configs/JT808SessionConsumerConfig.cs create mode 100644 src/JT808.Gateway.Kafka/Configs/JT808SessionProducerConfig.cs create mode 100644 src/JT808.Gateway.Kafka/JT808.Gateway.Kafka.csproj create mode 100644 src/JT808.Gateway.Kafka/JT808ClientBuilderDefault.cs create mode 100644 src/JT808.Gateway.Kafka/JT808ClientKafkaExtensions.cs create mode 100644 src/JT808.Gateway.Kafka/JT808MsgConsumer.cs create mode 100644 src/JT808.Gateway.Kafka/JT808MsgProducer.cs create mode 100644 src/JT808.Gateway.Kafka/JT808MsgReplyConsumer.cs create mode 100644 src/JT808.Gateway.Kafka/JT808MsgReplyProducer.cs create mode 100644 src/JT808.Gateway.Kafka/JT808ServerKafkaExtensions.cs create mode 100644 src/JT808.Gateway.Kafka/JT808SessionConsumer.cs create mode 100644 src/JT808.Gateway.Kafka/JT808SessionProducer.cs diff --git a/src/JT808.Gateway.Kafka/Configs/JT808ConsumerConfig.cs b/src/JT808.Gateway.Kafka/Configs/JT808ConsumerConfig.cs new file mode 100644 index 0000000..9d2aa55 --- /dev/null +++ b/src/JT808.Gateway.Kafka/Configs/JT808ConsumerConfig.cs @@ -0,0 +1,15 @@ +using Confluent.Kafka; +using Microsoft.Extensions.Options; +using System; +using System.Collections.Generic; +using System.Text; + +namespace JT808.Gateway.Configs.Kafka +{ + public class JT808ConsumerConfig: ConsumerConfig, IOptions + { + public string TopicName { get; set; } + + public JT808ConsumerConfig Value => this; + } +} diff --git a/src/JT808.Gateway.Kafka/Configs/JT808MsgConsumerConfig.cs b/src/JT808.Gateway.Kafka/Configs/JT808MsgConsumerConfig.cs new file mode 100644 index 0000000..53746e2 --- /dev/null +++ b/src/JT808.Gateway.Kafka/Configs/JT808MsgConsumerConfig.cs @@ -0,0 +1,13 @@ +using Confluent.Kafka; +using Microsoft.Extensions.Options; +using System; +using System.Collections.Generic; +using System.Text; + +namespace JT808.Gateway.Configs.Kafka +{ + public class JT808MsgConsumerConfig : JT808ConsumerConfig, IOptions + { + JT808MsgConsumerConfig IOptions.Value => this; + } +} diff --git a/src/JT808.Gateway.Kafka/Configs/JT808MsgProducerConfig.cs b/src/JT808.Gateway.Kafka/Configs/JT808MsgProducerConfig.cs new file mode 100644 index 0000000..d55b2ce --- /dev/null +++ b/src/JT808.Gateway.Kafka/Configs/JT808MsgProducerConfig.cs @@ -0,0 +1,13 @@ +using Confluent.Kafka; +using Microsoft.Extensions.Options; +using System; +using System.Collections.Generic; +using System.Text; + +namespace JT808.Gateway.Configs.Kafka +{ + public class JT808MsgProducerConfig : JT808ProducerConfig, IOptions + { + JT808MsgProducerConfig IOptions.Value => this; + } +} diff --git a/src/JT808.Gateway.Kafka/Configs/JT808MsgReplyConsumerConfig.cs b/src/JT808.Gateway.Kafka/Configs/JT808MsgReplyConsumerConfig.cs new file mode 100644 index 0000000..79f4c3c --- /dev/null +++ b/src/JT808.Gateway.Kafka/Configs/JT808MsgReplyConsumerConfig.cs @@ -0,0 +1,13 @@ +using Confluent.Kafka; +using Microsoft.Extensions.Options; +using System; +using System.Collections.Generic; +using System.Text; + +namespace JT808.Gateway.Configs.Kafka +{ + public class JT808MsgReplyConsumerConfig : JT808ConsumerConfig, IOptions + { + JT808MsgReplyConsumerConfig IOptions.Value => this; + } +} diff --git a/src/JT808.Gateway.Kafka/Configs/JT808MsgReplyProducerConfig.cs b/src/JT808.Gateway.Kafka/Configs/JT808MsgReplyProducerConfig.cs new file mode 100644 index 0000000..9b62e6e --- /dev/null +++ b/src/JT808.Gateway.Kafka/Configs/JT808MsgReplyProducerConfig.cs @@ -0,0 +1,13 @@ +using Confluent.Kafka; +using Microsoft.Extensions.Options; +using System; +using System.Collections.Generic; +using System.Text; + +namespace JT808.Gateway.Configs.Kafka +{ + public class JT808MsgReplyProducerConfig : JT808ProducerConfig, IOptions + { + JT808MsgReplyProducerConfig IOptions.Value => this; + } +} diff --git a/src/JT808.Gateway.Kafka/Configs/JT808ProducerConfig.cs b/src/JT808.Gateway.Kafka/Configs/JT808ProducerConfig.cs new file mode 100644 index 0000000..fca47cd --- /dev/null +++ b/src/JT808.Gateway.Kafka/Configs/JT808ProducerConfig.cs @@ -0,0 +1,15 @@ +using Confluent.Kafka; +using Microsoft.Extensions.Options; +using System; +using System.Collections.Generic; +using System.Text; + +namespace JT808.Gateway.Configs.Kafka +{ + public class JT808ProducerConfig : ProducerConfig,IOptions + { + public string TopicName { get; set; } + + public JT808ProducerConfig Value => this; + } +} diff --git a/src/JT808.Gateway.Kafka/Configs/JT808SessionConsumerConfig.cs b/src/JT808.Gateway.Kafka/Configs/JT808SessionConsumerConfig.cs new file mode 100644 index 0000000..6b57a0a --- /dev/null +++ b/src/JT808.Gateway.Kafka/Configs/JT808SessionConsumerConfig.cs @@ -0,0 +1,13 @@ +using Confluent.Kafka; +using Microsoft.Extensions.Options; +using System; +using System.Collections.Generic; +using System.Text; + +namespace JT808.Gateway.Configs.Kafka +{ + public class JT808SessionConsumerConfig : JT808ConsumerConfig, IOptions + { + JT808SessionConsumerConfig IOptions.Value => this; + } +} diff --git a/src/JT808.Gateway.Kafka/Configs/JT808SessionProducerConfig.cs b/src/JT808.Gateway.Kafka/Configs/JT808SessionProducerConfig.cs new file mode 100644 index 0000000..bba6871 --- /dev/null +++ b/src/JT808.Gateway.Kafka/Configs/JT808SessionProducerConfig.cs @@ -0,0 +1,13 @@ +using Confluent.Kafka; +using Microsoft.Extensions.Options; +using System; +using System.Collections.Generic; +using System.Text; + +namespace JT808.Gateway.Configs.Kafka +{ + public class JT808SessionProducerConfig : JT808ProducerConfig, IOptions + { + JT808SessionProducerConfig IOptions.Value => this; + } +} diff --git a/src/JT808.Gateway.Kafka/JT808.Gateway.Kafka.csproj b/src/JT808.Gateway.Kafka/JT808.Gateway.Kafka.csproj new file mode 100644 index 0000000..f68e24b --- /dev/null +++ b/src/JT808.Gateway.Kafka/JT808.Gateway.Kafka.csproj @@ -0,0 +1,39 @@ + + + + netstandard2.0 + 8.0 + Copyright 2018. + SmallChi(Koike) + https://github.com/SmallChi/JT808DotNetty + https://github.com/SmallChi/JT808DotNetty + https://github.com/SmallChi/JT808DotNetty/blob/master/LICENSE + https://github.com/SmallChi/JT808DotNetty/blob/master/LICENSE + false + 1.0.0-preview1 + false + LICENSE + true + JT808.Gateway.Kafka + JT808.Gateway.Kafka + 基于Kafka的JT808消息发布与订阅 + 基于Kafka的JT808消息发布与订阅 + + + + + + + + + + + + + + + + + + + diff --git a/src/JT808.Gateway.Kafka/JT808ClientBuilderDefault.cs b/src/JT808.Gateway.Kafka/JT808ClientBuilderDefault.cs new file mode 100644 index 0000000..3279c64 --- /dev/null +++ b/src/JT808.Gateway.Kafka/JT808ClientBuilderDefault.cs @@ -0,0 +1,24 @@ +using JT808.Protocol; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.DependencyInjection.Extensions; +using System; +using System.Collections.Generic; +using System.Text; + +namespace JT808.Gateway.Kafka +{ + internal class JT808ClientBuilderDefault : IJT808ClientBuilder + { + public IJT808Builder JT808Builder { get; } + + public JT808ClientBuilderDefault(IJT808Builder builder) + { + JT808Builder = builder; + } + + public IJT808Builder Builder() + { + return JT808Builder; + } + } +} \ No newline at end of file diff --git a/src/JT808.Gateway.Kafka/JT808ClientKafkaExtensions.cs b/src/JT808.Gateway.Kafka/JT808ClientKafkaExtensions.cs new file mode 100644 index 0000000..b334097 --- /dev/null +++ b/src/JT808.Gateway.Kafka/JT808ClientKafkaExtensions.cs @@ -0,0 +1,66 @@ +using JJT808.Gateway.Kafka; +using JT808.Gateway.Configs.Kafka; +using JT808.Gateway.PubSub; +using JT808.Protocol; +using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.DependencyInjection.Extensions; + +namespace JT808.Gateway.Kafka +{ + public static class JT808ClientKafkaExtensions + { + public static IJT808ClientBuilder AddJT808ClientKafka(this IJT808Builder builder) + { + return new JT808ClientBuilderDefault(builder); + } + /// + /// + /// + /// + /// GetSection("JT808MsgConsumerConfig") + /// + public static IJT808ClientBuilder AddMsgConsumer(this IJT808ClientBuilder jT808ClientBuilder, IConfiguration configuration) + { + jT808ClientBuilder.JT808Builder.Services.Configure(configuration.GetSection("JT808MsgConsumerConfig")); + jT808ClientBuilder.JT808Builder.Services.TryAddSingleton(); + return jT808ClientBuilder; + } + /// + /// + /// + /// + /// GetSection("JT808MsgReplyProducerConfig") + /// + public static IJT808ClientBuilder AddMsgReplyProducer(this IJT808ClientBuilder jT808ClientBuilder, IConfiguration configuration) + { + jT808ClientBuilder.JT808Builder.Services.Configure(configuration.GetSection("JT808MsgReplyProducerConfig")); + jT808ClientBuilder.JT808Builder.Services.TryAddSingleton(); + return jT808ClientBuilder; + } + /// + /// + /// + /// + /// GetSection("JT808MsgReplyConsumerConfig") + /// + public static IJT808ClientBuilder AddMsgReplyConsumer(this IJT808ClientBuilder jT808ClientBuilder, IConfiguration configuration) + { + jT808ClientBuilder.JT808Builder.Services.Configure(configuration.GetSection("JT808MsgReplyConsumerConfig")); + jT808ClientBuilder.JT808Builder.Services.Replace(new ServiceDescriptor(typeof(IJT808MsgReplyConsumer), typeof(JT808MsgReplyConsumer), ServiceLifetime.Singleton)); + return jT808ClientBuilder; + } + /// + /// + /// + /// + /// GetSection("JT808SessionConsumerConfig") + /// + public static IJT808ClientBuilder AddSessionConsumer(this IJT808ClientBuilder jT808ClientBuilder, IConfiguration configuration) + { + jT808ClientBuilder.JT808Builder.Services.Configure(configuration.GetSection("JT808SessionConsumerConfig")); + jT808ClientBuilder.JT808Builder.Services.TryAddSingleton(); + return jT808ClientBuilder; + } + } +} \ No newline at end of file diff --git a/src/JT808.Gateway.Kafka/JT808MsgConsumer.cs b/src/JT808.Gateway.Kafka/JT808MsgConsumer.cs new file mode 100644 index 0000000..56818f2 --- /dev/null +++ b/src/JT808.Gateway.Kafka/JT808MsgConsumer.cs @@ -0,0 +1,82 @@ +using Confluent.Kafka; +using JT808.Gateway.Configs.Kafka; +using JT808.Gateway.PubSub; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading; +using System.Threading.Tasks; + +namespace JT808.Gateway.Kafka +{ + public class JT808MsgConsumer : IJT808MsgConsumer + { + public CancellationTokenSource Cts => new CancellationTokenSource(); + + private readonly IConsumer consumer; + + private readonly ILogger logger; + + public string TopicName { get; } + + public JT808MsgConsumer( + IOptions consumerConfigAccessor, + ILoggerFactory loggerFactory) + { + consumer = new ConsumerBuilder(consumerConfigAccessor.Value).Build(); + TopicName = consumerConfigAccessor.Value.TopicName; + logger = loggerFactory.CreateLogger("JT808MsgConsumer"); + } + + public void OnMessage(Action<(string TerminalNo, byte[] Data)> callback) + { + Task.Run(() => + { + while (!Cts.IsCancellationRequested) + { + try + { + //如果不指定分区,根据kafka的机制会从多个分区中拉取数据 + //如果指定分区,根据kafka的机制会从相应的分区中拉取数据 + var data = consumer.Consume(Cts.Token); + if (logger.IsEnabled(LogLevel.Debug)) + { + logger.LogDebug($"Topic: {data.Topic} Key: {data.Key} Partition: {data.Partition} Offset: {data.Offset} TopicPartitionOffset:{data.TopicPartitionOffset}"); + } + callback((data.Key, data.Value)); + } + catch (ConsumeException ex) + { + logger.LogError(ex, TopicName); + } + catch (OperationCanceledException ex) + { + logger.LogError(ex, TopicName); + } + catch (Exception ex) + { + logger.LogError(ex, TopicName); + } + } + }, Cts.Token); + } + + public void Subscribe() + { + consumer.Subscribe(TopicName); + } + + public void Unsubscribe() + { + consumer.Unsubscribe(); + } + + public void Dispose() + { + consumer.Close(); + consumer.Dispose(); + } + } +} diff --git a/src/JT808.Gateway.Kafka/JT808MsgProducer.cs b/src/JT808.Gateway.Kafka/JT808MsgProducer.cs new file mode 100644 index 0000000..67d6d1b --- /dev/null +++ b/src/JT808.Gateway.Kafka/JT808MsgProducer.cs @@ -0,0 +1,38 @@ +using Confluent.Kafka; +using JT808.Gateway.Configs.Kafka; +using JT808.Gateway.PubSub; +using Microsoft.Extensions.Options; +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading.Tasks; + +namespace JT808.Gateway.Kafka +{ + public class JT808MsgProducer : IJT808MsgProducer + { + public string TopicName { get; } + + private readonly IProducer producer; + public JT808MsgProducer( + IOptions producerConfigAccessor) + { + producer = new ProducerBuilder(producerConfigAccessor.Value).Build(); + TopicName = producerConfigAccessor.Value.TopicName; + } + + public void Dispose() + { + producer.Dispose(); + } + + public async Task ProduceAsync(string terminalNo, byte[] data) + { + await producer.ProduceAsync(TopicName, new Message + { + Key = terminalNo, + Value = data + }); + } + } +} diff --git a/src/JT808.Gateway.Kafka/JT808MsgReplyConsumer.cs b/src/JT808.Gateway.Kafka/JT808MsgReplyConsumer.cs new file mode 100644 index 0000000..004a391 --- /dev/null +++ b/src/JT808.Gateway.Kafka/JT808MsgReplyConsumer.cs @@ -0,0 +1,82 @@ +using Confluent.Kafka; +using JT808.Gateway.Configs.Kafka; +using JT808.Gateway.PubSub; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading; +using System.Threading.Tasks; + +namespace JT808.Gateway.Kafka +{ + public class JT808MsgReplyConsumer : IJT808MsgReplyConsumer + { + public CancellationTokenSource Cts => new CancellationTokenSource(); + + private readonly IConsumer consumer; + + private readonly ILogger logger; + + public string TopicName { get; } + + public JT808MsgReplyConsumer( + IOptions consumerConfigAccessor, + ILoggerFactory loggerFactory) + { + consumer = new ConsumerBuilder(consumerConfigAccessor.Value).Build(); + TopicName = consumerConfigAccessor.Value.TopicName; + logger = loggerFactory.CreateLogger("JT808MsgReplyConsumer"); + } + + public void OnMessage(Action<(string TerminalNo, byte[] Data)> callback) + { + Task.Run(() => + { + while (!Cts.IsCancellationRequested) + { + try + { + //如果不指定分区,根据kafka的机制会从多个分区中拉取数据 + //如果指定分区,根据kafka的机制会从相应的分区中拉取数据 + var data = consumer.Consume(Cts.Token); + if (logger.IsEnabled(LogLevel.Debug)) + { + logger.LogDebug($"Topic: {data.Topic} Key: {data.Key} Partition: {data.Partition} Offset: {data.Offset} TopicPartitionOffset:{data.TopicPartitionOffset}"); + } + callback((data.Key, data.Value)); + } + catch (ConsumeException ex) + { + logger.LogError(ex, TopicName); + } + catch (OperationCanceledException ex) + { + logger.LogError(ex, TopicName); + } + catch (Exception ex) + { + logger.LogError(ex, TopicName); + } + } + }, Cts.Token); + } + + public void Subscribe() + { + consumer.Subscribe(TopicName); + } + + public void Unsubscribe() + { + consumer.Unsubscribe(); + } + + public void Dispose() + { + consumer.Close(); + consumer.Dispose(); + } + } +} diff --git a/src/JT808.Gateway.Kafka/JT808MsgReplyProducer.cs b/src/JT808.Gateway.Kafka/JT808MsgReplyProducer.cs new file mode 100644 index 0000000..f29e9be --- /dev/null +++ b/src/JT808.Gateway.Kafka/JT808MsgReplyProducer.cs @@ -0,0 +1,38 @@ +using Confluent.Kafka; +using JT808.Gateway.Configs.Kafka; +using JT808.Gateway.PubSub; +using Microsoft.Extensions.Options; +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading.Tasks; + +namespace JJT808.Gateway.Kafka +{ + public class JT808MsgReplyProducer : IJT808MsgReplyProducer + { + public string TopicName { get;} + + private IProducer producer; + public JT808MsgReplyProducer( + IOptions producerConfigAccessor) + { + producer = new ProducerBuilder(producerConfigAccessor.Value).Build(); + TopicName = producerConfigAccessor.Value.TopicName; + } + + public void Dispose() + { + producer.Dispose(); + } + + public async Task ProduceAsync(string terminalNo, byte[] data) + { + await producer.ProduceAsync(TopicName, new Message + { + Key = terminalNo, + Value = data + }); + } + } +} diff --git a/src/JT808.Gateway.Kafka/JT808ServerKafkaExtensions.cs b/src/JT808.Gateway.Kafka/JT808ServerKafkaExtensions.cs new file mode 100644 index 0000000..e8e1dc1 --- /dev/null +++ b/src/JT808.Gateway.Kafka/JT808ServerKafkaExtensions.cs @@ -0,0 +1,48 @@ +using JT808.Gateway.Configs.Kafka; +using JT808.Gateway.PubSub; +using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.DependencyInjection.Extensions; + +namespace JT808.Gateway.Kafka +{ + public static class JT808ServerKafkaExtensions + { + /// + /// + /// + /// + /// GetSection("JT808MsgProducerConfig") + /// + public static IJT808GatewayBuilder AddJT808ServerKafkaMsgProducer(this IJT808GatewayBuilder jT808GatewayBuilder, IConfiguration configuration) + { + jT808GatewayBuilder.JT808Builder.Services.Configure(configuration.GetSection("JT808MsgProducerConfig")); + jT808GatewayBuilder.JT808Builder.Services.Replace(new ServiceDescriptor(typeof(IJT808MsgProducer), typeof(JT808MsgProducer), ServiceLifetime.Singleton)); + return jT808GatewayBuilder; + } + /// + /// + /// + /// + /// GetSection("JT808MsgReplyConsumerConfig") + /// + public static IJT808GatewayBuilder AddJT808ServerKafkaMsgReplyConsumer(this IJT808GatewayBuilder jT808GatewayBuilder, IConfiguration configuration) + { + jT808GatewayBuilder.JT808Builder.Services.Configure(configuration.GetSection("JT808MsgReplyConsumerConfig")); + jT808GatewayBuilder.JT808Builder.Services.Replace(new ServiceDescriptor(typeof(IJT808MsgReplyConsumer), typeof(JT808MsgReplyConsumer), ServiceLifetime.Singleton)); + return jT808GatewayBuilder; + } + /// + /// + /// + /// + /// GetSection("JT808SessionProducerConfig") + /// + public static IJT808GatewayBuilder AddJT808ServerKafkaSessionProducer(this IJT808GatewayBuilder jT808GatewayBuilder, IConfiguration configuration) + { + jT808GatewayBuilder.JT808Builder.Services.Configure(configuration.GetSection("JT808SessionProducerConfig")); + jT808GatewayBuilder.JT808Builder.Services.Replace(new ServiceDescriptor(typeof(IJT808SessionProducer), typeof(JT808SessionProducer), ServiceLifetime.Singleton)); + return jT808GatewayBuilder; + } + } +} \ No newline at end of file diff --git a/src/JT808.Gateway.Kafka/JT808SessionConsumer.cs b/src/JT808.Gateway.Kafka/JT808SessionConsumer.cs new file mode 100644 index 0000000..9ccf830 --- /dev/null +++ b/src/JT808.Gateway.Kafka/JT808SessionConsumer.cs @@ -0,0 +1,82 @@ +using Confluent.Kafka; +using JT808.Gateway.Configs.Kafka; +using JT808.Gateway.PubSub; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading; +using System.Threading.Tasks; + +namespace JT808.Gateway.Kafka +{ + public class JT808SessionConsumer : IJT808SessionConsumer + { + public CancellationTokenSource Cts => new CancellationTokenSource(); + + private readonly IConsumer consumer; + + private readonly ILogger logger; + + public string TopicName { get; } + + public JT808SessionConsumer( + IOptions consumerConfigAccessor, + ILoggerFactory loggerFactory) + { + consumer = new ConsumerBuilder(consumerConfigAccessor.Value).Build(); + TopicName = consumerConfigAccessor.Value.TopicName; + logger = loggerFactory.CreateLogger("JT808SessionConsumer"); + } + + public void OnMessage(Action<(string Notice, string TerminalNo)> callback) + { + Task.Run(() => + { + while (!Cts.IsCancellationRequested) + { + try + { + //如果不指定分区,根据kafka的机制会从多个分区中拉取数据 + //如果指定分区,根据kafka的机制会从相应的分区中拉取数据 + var data = consumer.Consume(Cts.Token); + if (logger.IsEnabled(LogLevel.Debug)) + { + logger.LogDebug($"Topic: {data.Topic} Key: {data.Key} Partition: {data.Partition} Offset: {data.Offset} TopicPartitionOffset:{data.TopicPartitionOffset}"); + } + callback((data.Key, data.Value)); + } + catch (ConsumeException ex) + { + logger.LogError(ex, TopicName); + } + catch (OperationCanceledException ex) + { + logger.LogError(ex, TopicName); + } + catch (Exception ex) + { + logger.LogError(ex, TopicName); + } + } + }, Cts.Token); + } + + public void Subscribe() + { + consumer.Subscribe(TopicName); + } + + public void Unsubscribe() + { + consumer.Unsubscribe(); + } + + public void Dispose() + { + consumer.Close(); + consumer.Dispose(); + } + } +} diff --git a/src/JT808.Gateway.Kafka/JT808SessionProducer.cs b/src/JT808.Gateway.Kafka/JT808SessionProducer.cs new file mode 100644 index 0000000..3b6494f --- /dev/null +++ b/src/JT808.Gateway.Kafka/JT808SessionProducer.cs @@ -0,0 +1,38 @@ +using Confluent.Kafka; +using JT808.Gateway.Configs.Kafka; +using JT808.Gateway.PubSub; +using Microsoft.Extensions.Options; +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading.Tasks; + +namespace JT808.Gateway.Kafka +{ + public class JT808SessionProducer : IJT808SessionProducer + { + public string TopicName { get; } + + private readonly IProducer producer; + public JT808SessionProducer( + IOptions producerConfigAccessor) + { + producer = new ProducerBuilder(producerConfigAccessor.Value).Build(); + TopicName = producerConfigAccessor.Value.TopicName; + } + + public void Dispose() + { + producer.Dispose(); + } + + public async Task ProduceAsync(string notice,string terminalNo) + { + await producer.ProduceAsync(TopicName, new Message + { + Key = notice, + Value = terminalNo + }); + } + } +} From 88ad8fba0b69c4b2f728f2dfb5ba5a286ba8d351 Mon Sep 17 00:00:00 2001 From: "SmallChi(Koike)" <564952747@qq.com> Date: Mon, 23 Dec 2019 11:23:32 +0800 Subject: [PATCH 2/2] =?UTF-8?q?=E5=A2=9E=E5=8A=A0kafka=E9=A1=B9=E7=9B=AE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../JT808.Gateway.Abstractions.csproj | 4 +++ .../JT808.Gateway.Kafka.csproj | 28 +++++++++---------- .../JT808ClientBuilderDefault.cs | 3 +- .../JT808ClientKafkaExtensions.cs | 2 +- src/JT808.Gateway.Kafka/JT808MsgConsumer.cs | 2 +- src/JT808.Gateway.Kafka/JT808MsgProducer.cs | 4 +-- .../JT808MsgReplyConsumer.cs | 2 +- .../JT808MsgReplyProducer.cs | 4 +-- .../JT808ServerKafkaExtensions.cs | 2 +- .../JT808SessionConsumer.cs | 2 +- .../JT808SessionProducer.cs | 4 +-- src/JT808.Gateway.sln | 6 ++++ src/JT808.Gateway/JT808.Gateway.csproj | 4 +++ 13 files changed, 41 insertions(+), 26 deletions(-) diff --git a/src/JT808.Gateway.Abstractions/JT808.Gateway.Abstractions.csproj b/src/JT808.Gateway.Abstractions/JT808.Gateway.Abstractions.csproj index de3eb76..4774af9 100644 --- a/src/JT808.Gateway.Abstractions/JT808.Gateway.Abstractions.csproj +++ b/src/JT808.Gateway.Abstractions/JT808.Gateway.Abstractions.csproj @@ -6,6 +6,10 @@ SmallChi(Koike) false false + https://github.com/SmallChi/JT808Gateway + https://github.com/SmallChi/JT808Gateway + https://github.com/SmallChi/JT808Gateway/blob/master/LICENSE + https://github.com/SmallChi/JT808Gateway/blob/master/LICENSE LICENSE true 基于Pipeline实现的JT808Gateway的抽象库 diff --git a/src/JT808.Gateway.Kafka/JT808.Gateway.Kafka.csproj b/src/JT808.Gateway.Kafka/JT808.Gateway.Kafka.csproj index f68e24b..da06104 100644 --- a/src/JT808.Gateway.Kafka/JT808.Gateway.Kafka.csproj +++ b/src/JT808.Gateway.Kafka/JT808.Gateway.Kafka.csproj @@ -1,16 +1,16 @@  - netstandard2.0 + netstandard2.1 8.0 - Copyright 2018. + Copyright 2019. SmallChi(Koike) - https://github.com/SmallChi/JT808DotNetty - https://github.com/SmallChi/JT808DotNetty - https://github.com/SmallChi/JT808DotNetty/blob/master/LICENSE - https://github.com/SmallChi/JT808DotNetty/blob/master/LICENSE + https://github.com/SmallChi/JT808Gateway + https://github.com/SmallChi/JT808Gateway + https://github.com/SmallChi/JT808Gateway/blob/master/LICENSE + https://github.com/SmallChi/JT808Gateway/blob/master/LICENSE false - 1.0.0-preview1 + 1.0.0-preview2 false LICENSE true @@ -20,12 +20,12 @@ 基于Kafka的JT808消息发布与订阅 - - - - - - + + + + + + @@ -33,7 +33,7 @@ - + diff --git a/src/JT808.Gateway.Kafka/JT808ClientBuilderDefault.cs b/src/JT808.Gateway.Kafka/JT808ClientBuilderDefault.cs index 3279c64..d34e239 100644 --- a/src/JT808.Gateway.Kafka/JT808ClientBuilderDefault.cs +++ b/src/JT808.Gateway.Kafka/JT808ClientBuilderDefault.cs @@ -1,4 +1,5 @@ -using JT808.Protocol; +using JT808.Gateway.Abstractions; +using JT808.Protocol; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection.Extensions; using System; diff --git a/src/JT808.Gateway.Kafka/JT808ClientKafkaExtensions.cs b/src/JT808.Gateway.Kafka/JT808ClientKafkaExtensions.cs index b334097..0875b36 100644 --- a/src/JT808.Gateway.Kafka/JT808ClientKafkaExtensions.cs +++ b/src/JT808.Gateway.Kafka/JT808ClientKafkaExtensions.cs @@ -1,6 +1,6 @@ using JJT808.Gateway.Kafka; using JT808.Gateway.Configs.Kafka; -using JT808.Gateway.PubSub; +using JT808.Gateway.Abstractions; using JT808.Protocol; using Microsoft.Extensions.Configuration; using Microsoft.Extensions.DependencyInjection; diff --git a/src/JT808.Gateway.Kafka/JT808MsgConsumer.cs b/src/JT808.Gateway.Kafka/JT808MsgConsumer.cs index 56818f2..da9d71e 100644 --- a/src/JT808.Gateway.Kafka/JT808MsgConsumer.cs +++ b/src/JT808.Gateway.Kafka/JT808MsgConsumer.cs @@ -1,6 +1,6 @@ using Confluent.Kafka; using JT808.Gateway.Configs.Kafka; -using JT808.Gateway.PubSub; +using JT808.Gateway.Abstractions; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; using System; diff --git a/src/JT808.Gateway.Kafka/JT808MsgProducer.cs b/src/JT808.Gateway.Kafka/JT808MsgProducer.cs index 67d6d1b..e644927 100644 --- a/src/JT808.Gateway.Kafka/JT808MsgProducer.cs +++ b/src/JT808.Gateway.Kafka/JT808MsgProducer.cs @@ -1,6 +1,6 @@ using Confluent.Kafka; using JT808.Gateway.Configs.Kafka; -using JT808.Gateway.PubSub; +using JT808.Gateway.Abstractions; using Microsoft.Extensions.Options; using System; using System.Collections.Generic; @@ -26,7 +26,7 @@ namespace JT808.Gateway.Kafka producer.Dispose(); } - public async Task ProduceAsync(string terminalNo, byte[] data) + public async ValueTask ProduceAsync(string terminalNo, byte[] data) { await producer.ProduceAsync(TopicName, new Message { diff --git a/src/JT808.Gateway.Kafka/JT808MsgReplyConsumer.cs b/src/JT808.Gateway.Kafka/JT808MsgReplyConsumer.cs index 004a391..340fd30 100644 --- a/src/JT808.Gateway.Kafka/JT808MsgReplyConsumer.cs +++ b/src/JT808.Gateway.Kafka/JT808MsgReplyConsumer.cs @@ -1,6 +1,6 @@ using Confluent.Kafka; using JT808.Gateway.Configs.Kafka; -using JT808.Gateway.PubSub; +using JT808.Gateway.Abstractions; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; using System; diff --git a/src/JT808.Gateway.Kafka/JT808MsgReplyProducer.cs b/src/JT808.Gateway.Kafka/JT808MsgReplyProducer.cs index f29e9be..a609273 100644 --- a/src/JT808.Gateway.Kafka/JT808MsgReplyProducer.cs +++ b/src/JT808.Gateway.Kafka/JT808MsgReplyProducer.cs @@ -1,6 +1,6 @@ using Confluent.Kafka; using JT808.Gateway.Configs.Kafka; -using JT808.Gateway.PubSub; +using JT808.Gateway.Abstractions; using Microsoft.Extensions.Options; using System; using System.Collections.Generic; @@ -26,7 +26,7 @@ namespace JJT808.Gateway.Kafka producer.Dispose(); } - public async Task ProduceAsync(string terminalNo, byte[] data) + public async ValueTask ProduceAsync(string terminalNo, byte[] data) { await producer.ProduceAsync(TopicName, new Message { diff --git a/src/JT808.Gateway.Kafka/JT808ServerKafkaExtensions.cs b/src/JT808.Gateway.Kafka/JT808ServerKafkaExtensions.cs index e8e1dc1..e4e493b 100644 --- a/src/JT808.Gateway.Kafka/JT808ServerKafkaExtensions.cs +++ b/src/JT808.Gateway.Kafka/JT808ServerKafkaExtensions.cs @@ -1,5 +1,5 @@ using JT808.Gateway.Configs.Kafka; -using JT808.Gateway.PubSub; +using JT808.Gateway.Abstractions; using Microsoft.Extensions.Configuration; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection.Extensions; diff --git a/src/JT808.Gateway.Kafka/JT808SessionConsumer.cs b/src/JT808.Gateway.Kafka/JT808SessionConsumer.cs index 9ccf830..83b4d31 100644 --- a/src/JT808.Gateway.Kafka/JT808SessionConsumer.cs +++ b/src/JT808.Gateway.Kafka/JT808SessionConsumer.cs @@ -1,6 +1,6 @@ using Confluent.Kafka; using JT808.Gateway.Configs.Kafka; -using JT808.Gateway.PubSub; +using JT808.Gateway.Abstractions; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; using System; diff --git a/src/JT808.Gateway.Kafka/JT808SessionProducer.cs b/src/JT808.Gateway.Kafka/JT808SessionProducer.cs index 3b6494f..9049a0d 100644 --- a/src/JT808.Gateway.Kafka/JT808SessionProducer.cs +++ b/src/JT808.Gateway.Kafka/JT808SessionProducer.cs @@ -1,6 +1,6 @@ using Confluent.Kafka; using JT808.Gateway.Configs.Kafka; -using JT808.Gateway.PubSub; +using JT808.Gateway.Abstractions; using Microsoft.Extensions.Options; using System; using System.Collections.Generic; @@ -26,7 +26,7 @@ namespace JT808.Gateway.Kafka producer.Dispose(); } - public async Task ProduceAsync(string notice,string terminalNo) + public async ValueTask ProduceAsync(string notice,string terminalNo) { await producer.ProduceAsync(TopicName, new Message { diff --git a/src/JT808.Gateway.sln b/src/JT808.Gateway.sln index 3719e74..e171f8b 100644 --- a/src/JT808.Gateway.sln +++ b/src/JT808.Gateway.sln @@ -11,6 +11,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JT808.Gateway.Test", "JT808 EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JT808.Gateway.Abstractions", "JT808.Gateway.Abstractions\JT808.Gateway.Abstractions.csproj", "{3AA17DF7-A1B3-449C-93C2-45B051C32933}" EndProject +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JT808.Gateway.Kafka", "JT808.Gateway.Kafka\JT808.Gateway.Kafka.csproj", "{274C048E-A8E3-4422-A578-A10A97DF36F2}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -33,6 +35,10 @@ Global {3AA17DF7-A1B3-449C-93C2-45B051C32933}.Debug|Any CPU.Build.0 = Debug|Any CPU {3AA17DF7-A1B3-449C-93C2-45B051C32933}.Release|Any CPU.ActiveCfg = Release|Any CPU {3AA17DF7-A1B3-449C-93C2-45B051C32933}.Release|Any CPU.Build.0 = Release|Any CPU + {274C048E-A8E3-4422-A578-A10A97DF36F2}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {274C048E-A8E3-4422-A578-A10A97DF36F2}.Debug|Any CPU.Build.0 = Debug|Any CPU + {274C048E-A8E3-4422-A578-A10A97DF36F2}.Release|Any CPU.ActiveCfg = Release|Any CPU + {274C048E-A8E3-4422-A578-A10A97DF36F2}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE diff --git a/src/JT808.Gateway/JT808.Gateway.csproj b/src/JT808.Gateway/JT808.Gateway.csproj index fdbfd6e..ff072a0 100644 --- a/src/JT808.Gateway/JT808.Gateway.csproj +++ b/src/JT808.Gateway/JT808.Gateway.csproj @@ -7,6 +7,10 @@ SmallChi(Koike) false false + https://github.com/SmallChi/JT808Gateway + https://github.com/SmallChi/JT808Gateway + https://github.com/SmallChi/JT808Gateway/blob/master/LICENSE + https://github.com/SmallChi/JT808Gateway/blob/master/LICENSE LICENSE true 基于Pipeline实现的JT808Gateway的网络库