diff --git a/src/JT808.Gateway.Kafka/Configs/JT808ConsumerConfig.cs b/src/JT808.Gateway.Kafka/Configs/JT808ConsumerConfig.cs new file mode 100644 index 0000000..9d2aa55 --- /dev/null +++ b/src/JT808.Gateway.Kafka/Configs/JT808ConsumerConfig.cs @@ -0,0 +1,15 @@ +using Confluent.Kafka; +using Microsoft.Extensions.Options; +using System; +using System.Collections.Generic; +using System.Text; + +namespace JT808.Gateway.Configs.Kafka +{ + public class JT808ConsumerConfig: ConsumerConfig, IOptions + { + public string TopicName { get; set; } + + public JT808ConsumerConfig Value => this; + } +} diff --git a/src/JT808.Gateway.Kafka/Configs/JT808MsgConsumerConfig.cs b/src/JT808.Gateway.Kafka/Configs/JT808MsgConsumerConfig.cs new file mode 100644 index 0000000..53746e2 --- /dev/null +++ b/src/JT808.Gateway.Kafka/Configs/JT808MsgConsumerConfig.cs @@ -0,0 +1,13 @@ +using Confluent.Kafka; +using Microsoft.Extensions.Options; +using System; +using System.Collections.Generic; +using System.Text; + +namespace JT808.Gateway.Configs.Kafka +{ + public class JT808MsgConsumerConfig : JT808ConsumerConfig, IOptions + { + JT808MsgConsumerConfig IOptions.Value => this; + } +} diff --git a/src/JT808.Gateway.Kafka/Configs/JT808MsgProducerConfig.cs b/src/JT808.Gateway.Kafka/Configs/JT808MsgProducerConfig.cs new file mode 100644 index 0000000..d55b2ce --- /dev/null +++ b/src/JT808.Gateway.Kafka/Configs/JT808MsgProducerConfig.cs @@ -0,0 +1,13 @@ +using Confluent.Kafka; +using Microsoft.Extensions.Options; +using System; +using System.Collections.Generic; +using System.Text; + +namespace JT808.Gateway.Configs.Kafka +{ + public class JT808MsgProducerConfig : JT808ProducerConfig, IOptions + { + JT808MsgProducerConfig IOptions.Value => this; + } +} diff --git a/src/JT808.Gateway.Kafka/Configs/JT808MsgReplyConsumerConfig.cs b/src/JT808.Gateway.Kafka/Configs/JT808MsgReplyConsumerConfig.cs new file mode 100644 index 0000000..79f4c3c --- /dev/null +++ b/src/JT808.Gateway.Kafka/Configs/JT808MsgReplyConsumerConfig.cs @@ -0,0 +1,13 @@ +using Confluent.Kafka; +using Microsoft.Extensions.Options; +using System; +using System.Collections.Generic; +using System.Text; + +namespace JT808.Gateway.Configs.Kafka +{ + public class JT808MsgReplyConsumerConfig : JT808ConsumerConfig, IOptions + { + JT808MsgReplyConsumerConfig IOptions.Value => this; + } +} diff --git a/src/JT808.Gateway.Kafka/Configs/JT808MsgReplyProducerConfig.cs b/src/JT808.Gateway.Kafka/Configs/JT808MsgReplyProducerConfig.cs new file mode 100644 index 0000000..9b62e6e --- /dev/null +++ b/src/JT808.Gateway.Kafka/Configs/JT808MsgReplyProducerConfig.cs @@ -0,0 +1,13 @@ +using Confluent.Kafka; +using Microsoft.Extensions.Options; +using System; +using System.Collections.Generic; +using System.Text; + +namespace JT808.Gateway.Configs.Kafka +{ + public class JT808MsgReplyProducerConfig : JT808ProducerConfig, IOptions + { + JT808MsgReplyProducerConfig IOptions.Value => this; + } +} diff --git a/src/JT808.Gateway.Kafka/Configs/JT808ProducerConfig.cs b/src/JT808.Gateway.Kafka/Configs/JT808ProducerConfig.cs new file mode 100644 index 0000000..fca47cd --- /dev/null +++ b/src/JT808.Gateway.Kafka/Configs/JT808ProducerConfig.cs @@ -0,0 +1,15 @@ +using Confluent.Kafka; +using Microsoft.Extensions.Options; +using System; +using System.Collections.Generic; +using System.Text; + +namespace JT808.Gateway.Configs.Kafka +{ + public class JT808ProducerConfig : ProducerConfig,IOptions + { + public string TopicName { get; set; } + + public JT808ProducerConfig Value => this; + } +} diff --git a/src/JT808.Gateway.Kafka/Configs/JT808SessionConsumerConfig.cs b/src/JT808.Gateway.Kafka/Configs/JT808SessionConsumerConfig.cs new file mode 100644 index 0000000..6b57a0a --- /dev/null +++ b/src/JT808.Gateway.Kafka/Configs/JT808SessionConsumerConfig.cs @@ -0,0 +1,13 @@ +using Confluent.Kafka; +using Microsoft.Extensions.Options; +using System; +using System.Collections.Generic; +using System.Text; + +namespace JT808.Gateway.Configs.Kafka +{ + public class JT808SessionConsumerConfig : JT808ConsumerConfig, IOptions + { + JT808SessionConsumerConfig IOptions.Value => this; + } +} diff --git a/src/JT808.Gateway.Kafka/Configs/JT808SessionProducerConfig.cs b/src/JT808.Gateway.Kafka/Configs/JT808SessionProducerConfig.cs new file mode 100644 index 0000000..bba6871 --- /dev/null +++ b/src/JT808.Gateway.Kafka/Configs/JT808SessionProducerConfig.cs @@ -0,0 +1,13 @@ +using Confluent.Kafka; +using Microsoft.Extensions.Options; +using System; +using System.Collections.Generic; +using System.Text; + +namespace JT808.Gateway.Configs.Kafka +{ + public class JT808SessionProducerConfig : JT808ProducerConfig, IOptions + { + JT808SessionProducerConfig IOptions.Value => this; + } +} diff --git a/src/JT808.Gateway.Kafka/JT808.Gateway.Kafka.csproj b/src/JT808.Gateway.Kafka/JT808.Gateway.Kafka.csproj new file mode 100644 index 0000000..f68e24b --- /dev/null +++ b/src/JT808.Gateway.Kafka/JT808.Gateway.Kafka.csproj @@ -0,0 +1,39 @@ + + + + netstandard2.0 + 8.0 + Copyright 2018. + SmallChi(Koike) + https://github.com/SmallChi/JT808DotNetty + https://github.com/SmallChi/JT808DotNetty + https://github.com/SmallChi/JT808DotNetty/blob/master/LICENSE + https://github.com/SmallChi/JT808DotNetty/blob/master/LICENSE + false + 1.0.0-preview1 + false + LICENSE + true + JT808.Gateway.Kafka + JT808.Gateway.Kafka + 基于Kafka的JT808消息发布与订阅 + 基于Kafka的JT808消息发布与订阅 + + + + + + + + + + + + + + + + + + + diff --git a/src/JT808.Gateway.Kafka/JT808ClientBuilderDefault.cs b/src/JT808.Gateway.Kafka/JT808ClientBuilderDefault.cs new file mode 100644 index 0000000..3279c64 --- /dev/null +++ b/src/JT808.Gateway.Kafka/JT808ClientBuilderDefault.cs @@ -0,0 +1,24 @@ +using JT808.Protocol; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.DependencyInjection.Extensions; +using System; +using System.Collections.Generic; +using System.Text; + +namespace JT808.Gateway.Kafka +{ + internal class JT808ClientBuilderDefault : IJT808ClientBuilder + { + public IJT808Builder JT808Builder { get; } + + public JT808ClientBuilderDefault(IJT808Builder builder) + { + JT808Builder = builder; + } + + public IJT808Builder Builder() + { + return JT808Builder; + } + } +} \ No newline at end of file diff --git a/src/JT808.Gateway.Kafka/JT808ClientKafkaExtensions.cs b/src/JT808.Gateway.Kafka/JT808ClientKafkaExtensions.cs new file mode 100644 index 0000000..b334097 --- /dev/null +++ b/src/JT808.Gateway.Kafka/JT808ClientKafkaExtensions.cs @@ -0,0 +1,66 @@ +using JJT808.Gateway.Kafka; +using JT808.Gateway.Configs.Kafka; +using JT808.Gateway.PubSub; +using JT808.Protocol; +using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.DependencyInjection.Extensions; + +namespace JT808.Gateway.Kafka +{ + public static class JT808ClientKafkaExtensions + { + public static IJT808ClientBuilder AddJT808ClientKafka(this IJT808Builder builder) + { + return new JT808ClientBuilderDefault(builder); + } + /// + /// + /// + /// + /// GetSection("JT808MsgConsumerConfig") + /// + public static IJT808ClientBuilder AddMsgConsumer(this IJT808ClientBuilder jT808ClientBuilder, IConfiguration configuration) + { + jT808ClientBuilder.JT808Builder.Services.Configure(configuration.GetSection("JT808MsgConsumerConfig")); + jT808ClientBuilder.JT808Builder.Services.TryAddSingleton(); + return jT808ClientBuilder; + } + /// + /// + /// + /// + /// GetSection("JT808MsgReplyProducerConfig") + /// + public static IJT808ClientBuilder AddMsgReplyProducer(this IJT808ClientBuilder jT808ClientBuilder, IConfiguration configuration) + { + jT808ClientBuilder.JT808Builder.Services.Configure(configuration.GetSection("JT808MsgReplyProducerConfig")); + jT808ClientBuilder.JT808Builder.Services.TryAddSingleton(); + return jT808ClientBuilder; + } + /// + /// + /// + /// + /// GetSection("JT808MsgReplyConsumerConfig") + /// + public static IJT808ClientBuilder AddMsgReplyConsumer(this IJT808ClientBuilder jT808ClientBuilder, IConfiguration configuration) + { + jT808ClientBuilder.JT808Builder.Services.Configure(configuration.GetSection("JT808MsgReplyConsumerConfig")); + jT808ClientBuilder.JT808Builder.Services.Replace(new ServiceDescriptor(typeof(IJT808MsgReplyConsumer), typeof(JT808MsgReplyConsumer), ServiceLifetime.Singleton)); + return jT808ClientBuilder; + } + /// + /// + /// + /// + /// GetSection("JT808SessionConsumerConfig") + /// + public static IJT808ClientBuilder AddSessionConsumer(this IJT808ClientBuilder jT808ClientBuilder, IConfiguration configuration) + { + jT808ClientBuilder.JT808Builder.Services.Configure(configuration.GetSection("JT808SessionConsumerConfig")); + jT808ClientBuilder.JT808Builder.Services.TryAddSingleton(); + return jT808ClientBuilder; + } + } +} \ No newline at end of file diff --git a/src/JT808.Gateway.Kafka/JT808MsgConsumer.cs b/src/JT808.Gateway.Kafka/JT808MsgConsumer.cs new file mode 100644 index 0000000..56818f2 --- /dev/null +++ b/src/JT808.Gateway.Kafka/JT808MsgConsumer.cs @@ -0,0 +1,82 @@ +using Confluent.Kafka; +using JT808.Gateway.Configs.Kafka; +using JT808.Gateway.PubSub; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading; +using System.Threading.Tasks; + +namespace JT808.Gateway.Kafka +{ + public class JT808MsgConsumer : IJT808MsgConsumer + { + public CancellationTokenSource Cts => new CancellationTokenSource(); + + private readonly IConsumer consumer; + + private readonly ILogger logger; + + public string TopicName { get; } + + public JT808MsgConsumer( + IOptions consumerConfigAccessor, + ILoggerFactory loggerFactory) + { + consumer = new ConsumerBuilder(consumerConfigAccessor.Value).Build(); + TopicName = consumerConfigAccessor.Value.TopicName; + logger = loggerFactory.CreateLogger("JT808MsgConsumer"); + } + + public void OnMessage(Action<(string TerminalNo, byte[] Data)> callback) + { + Task.Run(() => + { + while (!Cts.IsCancellationRequested) + { + try + { + //如果不指定分区,根据kafka的机制会从多个分区中拉取数据 + //如果指定分区,根据kafka的机制会从相应的分区中拉取数据 + var data = consumer.Consume(Cts.Token); + if (logger.IsEnabled(LogLevel.Debug)) + { + logger.LogDebug($"Topic: {data.Topic} Key: {data.Key} Partition: {data.Partition} Offset: {data.Offset} TopicPartitionOffset:{data.TopicPartitionOffset}"); + } + callback((data.Key, data.Value)); + } + catch (ConsumeException ex) + { + logger.LogError(ex, TopicName); + } + catch (OperationCanceledException ex) + { + logger.LogError(ex, TopicName); + } + catch (Exception ex) + { + logger.LogError(ex, TopicName); + } + } + }, Cts.Token); + } + + public void Subscribe() + { + consumer.Subscribe(TopicName); + } + + public void Unsubscribe() + { + consumer.Unsubscribe(); + } + + public void Dispose() + { + consumer.Close(); + consumer.Dispose(); + } + } +} diff --git a/src/JT808.Gateway.Kafka/JT808MsgProducer.cs b/src/JT808.Gateway.Kafka/JT808MsgProducer.cs new file mode 100644 index 0000000..67d6d1b --- /dev/null +++ b/src/JT808.Gateway.Kafka/JT808MsgProducer.cs @@ -0,0 +1,38 @@ +using Confluent.Kafka; +using JT808.Gateway.Configs.Kafka; +using JT808.Gateway.PubSub; +using Microsoft.Extensions.Options; +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading.Tasks; + +namespace JT808.Gateway.Kafka +{ + public class JT808MsgProducer : IJT808MsgProducer + { + public string TopicName { get; } + + private readonly IProducer producer; + public JT808MsgProducer( + IOptions producerConfigAccessor) + { + producer = new ProducerBuilder(producerConfigAccessor.Value).Build(); + TopicName = producerConfigAccessor.Value.TopicName; + } + + public void Dispose() + { + producer.Dispose(); + } + + public async Task ProduceAsync(string terminalNo, byte[] data) + { + await producer.ProduceAsync(TopicName, new Message + { + Key = terminalNo, + Value = data + }); + } + } +} diff --git a/src/JT808.Gateway.Kafka/JT808MsgReplyConsumer.cs b/src/JT808.Gateway.Kafka/JT808MsgReplyConsumer.cs new file mode 100644 index 0000000..004a391 --- /dev/null +++ b/src/JT808.Gateway.Kafka/JT808MsgReplyConsumer.cs @@ -0,0 +1,82 @@ +using Confluent.Kafka; +using JT808.Gateway.Configs.Kafka; +using JT808.Gateway.PubSub; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading; +using System.Threading.Tasks; + +namespace JT808.Gateway.Kafka +{ + public class JT808MsgReplyConsumer : IJT808MsgReplyConsumer + { + public CancellationTokenSource Cts => new CancellationTokenSource(); + + private readonly IConsumer consumer; + + private readonly ILogger logger; + + public string TopicName { get; } + + public JT808MsgReplyConsumer( + IOptions consumerConfigAccessor, + ILoggerFactory loggerFactory) + { + consumer = new ConsumerBuilder(consumerConfigAccessor.Value).Build(); + TopicName = consumerConfigAccessor.Value.TopicName; + logger = loggerFactory.CreateLogger("JT808MsgReplyConsumer"); + } + + public void OnMessage(Action<(string TerminalNo, byte[] Data)> callback) + { + Task.Run(() => + { + while (!Cts.IsCancellationRequested) + { + try + { + //如果不指定分区,根据kafka的机制会从多个分区中拉取数据 + //如果指定分区,根据kafka的机制会从相应的分区中拉取数据 + var data = consumer.Consume(Cts.Token); + if (logger.IsEnabled(LogLevel.Debug)) + { + logger.LogDebug($"Topic: {data.Topic} Key: {data.Key} Partition: {data.Partition} Offset: {data.Offset} TopicPartitionOffset:{data.TopicPartitionOffset}"); + } + callback((data.Key, data.Value)); + } + catch (ConsumeException ex) + { + logger.LogError(ex, TopicName); + } + catch (OperationCanceledException ex) + { + logger.LogError(ex, TopicName); + } + catch (Exception ex) + { + logger.LogError(ex, TopicName); + } + } + }, Cts.Token); + } + + public void Subscribe() + { + consumer.Subscribe(TopicName); + } + + public void Unsubscribe() + { + consumer.Unsubscribe(); + } + + public void Dispose() + { + consumer.Close(); + consumer.Dispose(); + } + } +} diff --git a/src/JT808.Gateway.Kafka/JT808MsgReplyProducer.cs b/src/JT808.Gateway.Kafka/JT808MsgReplyProducer.cs new file mode 100644 index 0000000..f29e9be --- /dev/null +++ b/src/JT808.Gateway.Kafka/JT808MsgReplyProducer.cs @@ -0,0 +1,38 @@ +using Confluent.Kafka; +using JT808.Gateway.Configs.Kafka; +using JT808.Gateway.PubSub; +using Microsoft.Extensions.Options; +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading.Tasks; + +namespace JJT808.Gateway.Kafka +{ + public class JT808MsgReplyProducer : IJT808MsgReplyProducer + { + public string TopicName { get;} + + private IProducer producer; + public JT808MsgReplyProducer( + IOptions producerConfigAccessor) + { + producer = new ProducerBuilder(producerConfigAccessor.Value).Build(); + TopicName = producerConfigAccessor.Value.TopicName; + } + + public void Dispose() + { + producer.Dispose(); + } + + public async Task ProduceAsync(string terminalNo, byte[] data) + { + await producer.ProduceAsync(TopicName, new Message + { + Key = terminalNo, + Value = data + }); + } + } +} diff --git a/src/JT808.Gateway.Kafka/JT808ServerKafkaExtensions.cs b/src/JT808.Gateway.Kafka/JT808ServerKafkaExtensions.cs new file mode 100644 index 0000000..e8e1dc1 --- /dev/null +++ b/src/JT808.Gateway.Kafka/JT808ServerKafkaExtensions.cs @@ -0,0 +1,48 @@ +using JT808.Gateway.Configs.Kafka; +using JT808.Gateway.PubSub; +using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.DependencyInjection.Extensions; + +namespace JT808.Gateway.Kafka +{ + public static class JT808ServerKafkaExtensions + { + /// + /// + /// + /// + /// GetSection("JT808MsgProducerConfig") + /// + public static IJT808GatewayBuilder AddJT808ServerKafkaMsgProducer(this IJT808GatewayBuilder jT808GatewayBuilder, IConfiguration configuration) + { + jT808GatewayBuilder.JT808Builder.Services.Configure(configuration.GetSection("JT808MsgProducerConfig")); + jT808GatewayBuilder.JT808Builder.Services.Replace(new ServiceDescriptor(typeof(IJT808MsgProducer), typeof(JT808MsgProducer), ServiceLifetime.Singleton)); + return jT808GatewayBuilder; + } + /// + /// + /// + /// + /// GetSection("JT808MsgReplyConsumerConfig") + /// + public static IJT808GatewayBuilder AddJT808ServerKafkaMsgReplyConsumer(this IJT808GatewayBuilder jT808GatewayBuilder, IConfiguration configuration) + { + jT808GatewayBuilder.JT808Builder.Services.Configure(configuration.GetSection("JT808MsgReplyConsumerConfig")); + jT808GatewayBuilder.JT808Builder.Services.Replace(new ServiceDescriptor(typeof(IJT808MsgReplyConsumer), typeof(JT808MsgReplyConsumer), ServiceLifetime.Singleton)); + return jT808GatewayBuilder; + } + /// + /// + /// + /// + /// GetSection("JT808SessionProducerConfig") + /// + public static IJT808GatewayBuilder AddJT808ServerKafkaSessionProducer(this IJT808GatewayBuilder jT808GatewayBuilder, IConfiguration configuration) + { + jT808GatewayBuilder.JT808Builder.Services.Configure(configuration.GetSection("JT808SessionProducerConfig")); + jT808GatewayBuilder.JT808Builder.Services.Replace(new ServiceDescriptor(typeof(IJT808SessionProducer), typeof(JT808SessionProducer), ServiceLifetime.Singleton)); + return jT808GatewayBuilder; + } + } +} \ No newline at end of file diff --git a/src/JT808.Gateway.Kafka/JT808SessionConsumer.cs b/src/JT808.Gateway.Kafka/JT808SessionConsumer.cs new file mode 100644 index 0000000..9ccf830 --- /dev/null +++ b/src/JT808.Gateway.Kafka/JT808SessionConsumer.cs @@ -0,0 +1,82 @@ +using Confluent.Kafka; +using JT808.Gateway.Configs.Kafka; +using JT808.Gateway.PubSub; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading; +using System.Threading.Tasks; + +namespace JT808.Gateway.Kafka +{ + public class JT808SessionConsumer : IJT808SessionConsumer + { + public CancellationTokenSource Cts => new CancellationTokenSource(); + + private readonly IConsumer consumer; + + private readonly ILogger logger; + + public string TopicName { get; } + + public JT808SessionConsumer( + IOptions consumerConfigAccessor, + ILoggerFactory loggerFactory) + { + consumer = new ConsumerBuilder(consumerConfigAccessor.Value).Build(); + TopicName = consumerConfigAccessor.Value.TopicName; + logger = loggerFactory.CreateLogger("JT808SessionConsumer"); + } + + public void OnMessage(Action<(string Notice, string TerminalNo)> callback) + { + Task.Run(() => + { + while (!Cts.IsCancellationRequested) + { + try + { + //如果不指定分区,根据kafka的机制会从多个分区中拉取数据 + //如果指定分区,根据kafka的机制会从相应的分区中拉取数据 + var data = consumer.Consume(Cts.Token); + if (logger.IsEnabled(LogLevel.Debug)) + { + logger.LogDebug($"Topic: {data.Topic} Key: {data.Key} Partition: {data.Partition} Offset: {data.Offset} TopicPartitionOffset:{data.TopicPartitionOffset}"); + } + callback((data.Key, data.Value)); + } + catch (ConsumeException ex) + { + logger.LogError(ex, TopicName); + } + catch (OperationCanceledException ex) + { + logger.LogError(ex, TopicName); + } + catch (Exception ex) + { + logger.LogError(ex, TopicName); + } + } + }, Cts.Token); + } + + public void Subscribe() + { + consumer.Subscribe(TopicName); + } + + public void Unsubscribe() + { + consumer.Unsubscribe(); + } + + public void Dispose() + { + consumer.Close(); + consumer.Dispose(); + } + } +} diff --git a/src/JT808.Gateway.Kafka/JT808SessionProducer.cs b/src/JT808.Gateway.Kafka/JT808SessionProducer.cs new file mode 100644 index 0000000..3b6494f --- /dev/null +++ b/src/JT808.Gateway.Kafka/JT808SessionProducer.cs @@ -0,0 +1,38 @@ +using Confluent.Kafka; +using JT808.Gateway.Configs.Kafka; +using JT808.Gateway.PubSub; +using Microsoft.Extensions.Options; +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading.Tasks; + +namespace JT808.Gateway.Kafka +{ + public class JT808SessionProducer : IJT808SessionProducer + { + public string TopicName { get; } + + private readonly IProducer producer; + public JT808SessionProducer( + IOptions producerConfigAccessor) + { + producer = new ProducerBuilder(producerConfigAccessor.Value).Build(); + TopicName = producerConfigAccessor.Value.TopicName; + } + + public void Dispose() + { + producer.Dispose(); + } + + public async Task ProduceAsync(string notice,string terminalNo) + { + await producer.ProduceAsync(TopicName, new Message + { + Key = notice, + Value = terminalNo + }); + } + } +}