From 74a70a3e8a7e2d74909f04d5f35609136749525f Mon Sep 17 00:00:00 2001 From: SmallChi <564952747@qq.com> Date: Fri, 12 Apr 2019 23:22:29 +0800 Subject: [PATCH] =?UTF-8?q?=E8=B0=83=E6=95=B4=E7=94=9F=E4=BA=A7=E6=B6=88?= =?UTF-8?q?=E8=B4=B9=E6=A8=A1=E5=BC=8F=E5=8F=8A=E5=A2=9E=E5=8A=A0=E5=8D=95?= =?UTF-8?q?=E5=85=83=E6=B5=8B=E8=AF=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../JT809.KafkaService/JT809Consumer.cs | 152 +++++------------ .../JT809.KafkaService/JT809ConsumerBase.cs | 34 ++++ .../JT809KafkaServiceExtensions.cs | 4 +- .../JT809PartitionConsumer.cs | 160 ++++++++++++++++++ .../JT809PartitionProducer.cs | 153 +++++++++++++++++ .../JT809.KafkaService/JT809Producer.cs | 116 +++---------- .../JT809.KafkaService/JT809ProducerBase.cs | 24 +++ .../JT809_GpsPositio_Producer.cs | 20 +-- .../JT809_GpsPosition_Consumer.cs | 20 +-- .../JT809.KafkaService/JT809_Same_Consumer.cs | 15 +- .../JT809.KafkaService/JT809_Same_Producer.cs | 19 +-- .../JT809.KafkaServiceTest/ConsumerTest.cs | 42 +++++ .../ConsumerTestService.cs | 19 +++ .../JT809.KafkaServiceTest.csproj | 37 ++++ .../ProducerPartitionTest.cs | 38 +++++ .../JT809.KafkaServiceTest/ProducerTest.cs | 31 ++++ .../ProducerTestService.cs | 20 +++ .../TestConsumerBase.cs | 37 ++++ .../TestProducerBase.cs | 37 ++++ .../TestProducerPartitionBase.cs | 37 ++++ .../appsettings.Partition.json | 23 +++ .../JT809.KafkaServiceTest/appsettings.json | 23 +++ .../IJT809Consumer.cs | 2 +- .../JT809TopicOptions.cs | 14 ++ src/JT809.DotNetty.sln | 7 + 25 files changed, 817 insertions(+), 267 deletions(-) create mode 100644 src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809ConsumerBase.cs create mode 100644 src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809PartitionConsumer.cs create mode 100644 src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809PartitionProducer.cs create mode 100644 src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809ProducerBase.cs create mode 100644 src/JT809.DotNetty.Simples/Superior/JT809.KafkaServiceTest/ConsumerTest.cs create mode 100644 src/JT809.DotNetty.Simples/Superior/JT809.KafkaServiceTest/ConsumerTestService.cs create mode 100644 src/JT809.DotNetty.Simples/Superior/JT809.KafkaServiceTest/JT809.KafkaServiceTest.csproj create mode 100644 src/JT809.DotNetty.Simples/Superior/JT809.KafkaServiceTest/ProducerPartitionTest.cs create mode 100644 src/JT809.DotNetty.Simples/Superior/JT809.KafkaServiceTest/ProducerTest.cs create mode 100644 src/JT809.DotNetty.Simples/Superior/JT809.KafkaServiceTest/ProducerTestService.cs create mode 100644 src/JT809.DotNetty.Simples/Superior/JT809.KafkaServiceTest/TestConsumerBase.cs create mode 100644 src/JT809.DotNetty.Simples/Superior/JT809.KafkaServiceTest/TestProducerBase.cs create mode 100644 src/JT809.DotNetty.Simples/Superior/JT809.KafkaServiceTest/TestProducerPartitionBase.cs create mode 100644 src/JT809.DotNetty.Simples/Superior/JT809.KafkaServiceTest/appsettings.Partition.json create mode 100644 src/JT809.DotNetty.Simples/Superior/JT809.KafkaServiceTest/appsettings.json create mode 100644 src/JT809.DotNetty.Simples/Superior/JT809.PubSub.Abstractions/JT809TopicOptions.cs diff --git a/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809Consumer.cs b/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809Consumer.cs index 31da532..8546c7e 100644 --- a/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809Consumer.cs +++ b/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809Consumer.cs @@ -1,6 +1,7 @@ using Confluent.Kafka; using JT809.PubSub.Abstractions; using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; using System; using System.Collections.Generic; using System.Threading; @@ -8,137 +9,81 @@ using System.Threading.Tasks; namespace JT809.KafkaService { - public abstract class JT809Consumer : IJT808ConsumerOfT + public abstract class JT809Consumer : JT809ConsumerBase { private bool _disposed = false; - public CancellationTokenSource Cts => new CancellationTokenSource(); + public override CancellationTokenSource Cts => new CancellationTokenSource(); - public virtual string TopicName => JT809Constants.JT809TopicName; + protected ILogger logger { get; } - private readonly ILogger logger; - - private List topicPartitionList; - - private IList> consumers; - - protected abstract JT809PartitionOptions PartitionOptions { get; } - - protected virtual Deserializer Deserializer { get; } + protected override IList> Consumers { get; } protected JT809Consumer( - ConsumerConfig consumerConfig, - ILoggerFactory loggerFactory) + IOptions topicOptionsAccessor, + IOptions consumerConfigAccessor, + ILoggerFactory loggerFactory) + : base(topicOptionsAccessor.Value.TopicName, consumerConfigAccessor.Value) { logger = loggerFactory.CreateLogger("JT809Consumer"); - CreateTopicPartition(); - consumers = new List>(); - foreach(var topicPartition in topicPartitionList) - { - ConsumerBuilder consumerBuilder = new ConsumerBuilder(consumerConfig); - consumerBuilder.SetErrorHandler((consumer, error) => - { - logger.LogError(error.Reason); - }); - if (Deserializer != null) - { - consumerBuilder.SetValueDeserializer(Deserializer); - } - if (PartitionOptions.Partition > 1) - { - consumerBuilder.SetPartitionsAssignedHandler((c, p) => { - p.Add(topicPartition); - }); - } - consumers.Add(consumerBuilder.Build()); - } - } - - private void CreateTopicPartition() - { - topicPartitionList = new List(); - if (PartitionOptions.Partition > 1) + Consumers = new List>(); + ConsumerBuilder consumerBuilder = new ConsumerBuilder(ConsumerConfig); + consumerBuilder.SetErrorHandler((consumer, error) => { - if(PartitionOptions.AssignPartitions!=null && PartitionOptions.AssignPartitions.Count>0) - { - foreach(var p in PartitionOptions.AssignPartitions) - { - topicPartitionList.Add(new TopicPartition(TopicName, new Partition(p))); - } - } - else - { - for (int i = 0; i < PartitionOptions.Partition; i++) - { - topicPartitionList.Add(new TopicPartition(TopicName, new Partition(i))); - } - } - } - else + logger.LogError(error.Reason); + }); + if (Deserializer != null) { - for (int i = 0; i < PartitionOptions.Partition; i++) - { - topicPartitionList.Add(new TopicPartition(TopicName, new Partition(i))); - } + consumerBuilder.SetValueDeserializer(Deserializer); } + Consumers.Add(consumerBuilder.Build()); } - public void OnMessage(Action<(string MsgId, T data)> callback) + public override void OnMessage(Action<(string MsgId, T Data)> callback) { - logger.LogDebug($"consumers:{consumers.Count},topicPartitionList:{topicPartitionList.Count}"); - for (int i = 0; i < consumers.Count; i++) + Task.Run(() => { - Task.Factory.StartNew((num) => + while (!Cts.IsCancellationRequested) { - int n = (int)num; - while (!Cts.IsCancellationRequested) + try { - try - { - //如果不指定分区,根据kafka的机制会从多个分区中拉取数据 - //如果指定分区,根据kafka的机制会从相应的分区中拉取数据 - //consumers[n].Assign(topicPartitionList[n]); - var data = consumers[n].Consume(Cts.Token); - if (logger.IsEnabled(LogLevel.Debug)) - { - logger.LogDebug($"Topic: {data.Topic} Key: {data.Key} Partition: {data.Partition} Offset: {data.Offset} Data:{string.Join("", data.Value)} TopicPartitionOffset:{data.TopicPartitionOffset}"); - } - callback((data.Key, data.Value)); - } - catch (ConsumeException ex) - { - logger.LogError(ex, TopicName); - Thread.Sleep(1000); - } - catch (Exception ex) + //如果不指定分区,根据kafka的机制会从多个分区中拉取数据 + //如果指定分区,根据kafka的机制会从相应的分区中拉取数据 + //consumers[n].Assign(topicPartitionList[n]); + var data = Consumers[0].Consume(Cts.Token); + if (logger.IsEnabled(LogLevel.Debug)) { - logger.LogError(ex, TopicName); - Thread.Sleep(1000); + logger.LogDebug($"Topic: {data.Topic} Key: {data.Key} Partition: {data.Partition} Offset: {data.Offset} Data:{string.Join("", data.Value)} TopicPartitionOffset:{data.TopicPartitionOffset}"); } + callback((data.Key, data.Value)); } - }, i, Cts.Token); - } + catch (ConsumeException ex) + { + logger.LogError(ex, TopicName); + Thread.Sleep(1000); + } + catch (Exception ex) + { + logger.LogError(ex, TopicName); + Thread.Sleep(1000); + } + } + }, Cts.Token); } - public void Subscribe() + public override void Subscribe() { if (_disposed) return; //仅有一个分区才需要订阅 - if (topicPartitionList.Count == 1) - { - consumers[0].Subscribe(TopicName); - } + Consumers[0].Subscribe(TopicName); } - public void Unsubscribe() + public override void Unsubscribe() { if (_disposed) return; - foreach(var c in consumers) - { - c.Unsubscribe(); - } + Consumers[0].Unsubscribe(); } - public void Dispose() + public override void Dispose() { Dispose(true); GC.SuppressFinalize(true); @@ -156,11 +101,8 @@ namespace JT809.KafkaService if (disposing) { Cts.Cancel(); - foreach (var c in consumers) - { - c.Close(); - c.Dispose(); - } + Consumers[0].Close(); + Consumers[0].Dispose(); Cts.Dispose(); } _disposed = true; diff --git a/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809ConsumerBase.cs b/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809ConsumerBase.cs new file mode 100644 index 0000000..efd6bb6 --- /dev/null +++ b/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809ConsumerBase.cs @@ -0,0 +1,34 @@ +using Confluent.Kafka; +using JT809.PubSub.Abstractions; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using System; +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; + +namespace JT809.KafkaService +{ + public abstract class JT809ConsumerBase : IJT808ConsumerOfT + { + public string TopicName { get; } + + public ConsumerConfig ConsumerConfig { get; } + + protected JT809ConsumerBase(string topicName, ConsumerConfig config) + { + ConsumerConfig = config; + TopicName = topicName; + } + + public abstract CancellationTokenSource Cts { get; } + protected abstract IList> Consumers { get; } + + protected virtual Deserializer Deserializer { get; set; } + + public abstract void Dispose(); + public abstract void OnMessage(Action<(string MsgId, T Data)> callback); + public abstract void Subscribe(); + public abstract void Unsubscribe(); + } +} diff --git a/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809KafkaServiceExtensions.cs b/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809KafkaServiceExtensions.cs index e48d4c5..8551578 100644 --- a/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809KafkaServiceExtensions.cs +++ b/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809KafkaServiceExtensions.cs @@ -18,7 +18,7 @@ namespace JT809.KafkaService { public static IServiceCollection AddJT809KafkaProducerService(this IServiceCollection serviceDescriptors, IConfiguration configuration) { - serviceDescriptors.Configure(configuration.GetSection("JT809ProducerConfig")); + serviceDescriptors.Configure(configuration.GetSection("KafkaProducerConfig")); serviceDescriptors.AddSingleton(typeof(JT809Producer), typeof(JT809_Same_Producer)); serviceDescriptors.AddSingleton(typeof(JT809Producer), typeof(JT809_GpsPositio_Producer)); return serviceDescriptors; @@ -42,7 +42,7 @@ namespace JT809.KafkaService public static IServiceCollection AddJT809KafkaConsumerService(this IServiceCollection serviceDescriptors, IConfiguration configuration, Action action = null) { - serviceDescriptors.Configure(configuration.GetSection("JT809ConsumerConfig")); + serviceDescriptors.Configure(configuration.GetSection("KafkaConsumerConfig")); if (configuration.GetSection("JT809PartitionOptions").Exists()) { serviceDescriptors.Configure(configuration.GetSection("JT809PartitionOptions")); diff --git a/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809PartitionConsumer.cs b/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809PartitionConsumer.cs new file mode 100644 index 0000000..c18c75c --- /dev/null +++ b/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809PartitionConsumer.cs @@ -0,0 +1,160 @@ +using Confluent.Kafka; +using JT809.PubSub.Abstractions; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using System; +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; + +namespace JT809.KafkaService +{ + public abstract class JT809PartitionConsumer : JT809ConsumerBase + { + private bool _disposed = false; + public override CancellationTokenSource Cts => new CancellationTokenSource(); + + protected ILogger logger { get; } + + private List topicPartitionList; + + private JT809PartitionOptions partitionOptions; + + protected override IList> Consumers { get; } + + protected JT809PartitionConsumer( + IOptions consumerConfigAccessor, + IOptions partitionOptionsAccessor, + IOptions topicOptionsAccessor, + ILoggerFactory loggerFactory) : base(topicOptionsAccessor.Value.TopicName, consumerConfigAccessor.Value) + { + logger = loggerFactory.CreateLogger("JT809PartitionConsumer"); + partitionOptions = partitionOptionsAccessor.Value; + topicPartitionList = CreateTopicPartition(); + Consumers = CreateConsumers(); + } + + protected virtual IList> CreateConsumers() + { + List> consumers = new List>(); + foreach (var topicPartition in topicPartitionList) + { + ConsumerBuilder consumerBuilder = new ConsumerBuilder(ConsumerConfig); + consumerBuilder.SetErrorHandler((consumer, error) => + { + logger.LogError(error.Reason); + }); + if (Deserializer != null) + { + consumerBuilder.SetValueDeserializer(Deserializer); + } + consumerBuilder.SetPartitionsAssignedHandler((c, p) => + { + p.Add(topicPartition); + }); + consumers.Add(consumerBuilder.Build()); + } + return consumers; + } + + protected virtual List CreateTopicPartition() + { + var topicPartitions = new List(); + if (partitionOptions.AssignPartitions != null && partitionOptions.AssignPartitions.Count > 0) + { + foreach (var p in partitionOptions.AssignPartitions) + { + topicPartitions.Add(new TopicPartition(TopicName, new Partition(p))); + } + } + else + { + for (int i = 0; i < partitionOptions.Partition; i++) + { + topicPartitions.Add(new TopicPartition(TopicName, new Partition(i))); + } + } + return topicPartitions; + } + + public override void OnMessage(Action<(string MsgId, T Data)> callback) + { + if(logger.IsEnabled( LogLevel.Debug)) + logger.LogDebug($"consumers:{Consumers.Count},topicPartitionList:{topicPartitionList.Count}"); + for (int i = 0; i < Consumers.Count; i++) + { + Task.Factory.StartNew((num) => + { + int n = (int)num; + while (!Cts.IsCancellationRequested) + { + try + { + //如果不指定分区,根据kafka的机制会从多个分区中拉取数据 + //如果指定分区,根据kafka的机制会从相应的分区中拉取数据 + //consumers[n].Assign(topicPartitionList[n]); + var data = Consumers[n].Consume(Cts.Token); + if (logger.IsEnabled(LogLevel.Debug)) + { + logger.LogDebug($"Topic: {data.Topic} Key: {data.Key} Partition: {data.Partition} Offset: {data.Offset} Data:{string.Join("", data.Value)} TopicPartitionOffset:{data.TopicPartitionOffset}"); + } + callback((data.Key, data.Value)); + } + catch (ConsumeException ex) + { + logger.LogError(ex, TopicName); + Thread.Sleep(1000); + } + catch (Exception ex) + { + logger.LogError(ex, TopicName); + Thread.Sleep(1000); + } + } + }, i, Cts.Token); + } + } + + public override void Subscribe() + { + if (_disposed) return; + } + + public override void Unsubscribe() + { + if (_disposed) return; + foreach (var c in Consumers) + { + c.Unsubscribe(); + } + } + + public override void Dispose() + { + Dispose(true); + GC.SuppressFinalize(true); + + } + + ~JT809PartitionConsumer() + { + Dispose(false); + } + + protected virtual void Dispose(bool disposing) + { + if (_disposed) return; + if (disposing) + { + Cts.Cancel(); + foreach (var c in Consumers) + { + c.Close(); + c.Dispose(); + } + Cts.Dispose(); + } + _disposed = true; + } + } +} diff --git a/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809PartitionProducer.cs b/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809PartitionProducer.cs new file mode 100644 index 0000000..79ad8e9 --- /dev/null +++ b/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809PartitionProducer.cs @@ -0,0 +1,153 @@ +using Confluent.Kafka; +using Confluent.Kafka.Admin; +using JT809.PubSub.Abstractions; +using Microsoft.Extensions.Options; +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; + +namespace JT809.KafkaService +{ + /// + /// + /// + /// + public abstract class JT809PartitionProducer : JT809ProducerBase + { + private bool _disposed = false; + + private ConcurrentDictionary TopicPartitionCache; + + private readonly IJT809ProducerPartitionFactory ProducerPartitionFactory; + + private readonly JT809PartitionOptions PartitionOptions; + + protected override IProducer Producer { get; } + + protected virtual IProducer CreateProducer() + { + ProducerBuilder producerBuilder = new ProducerBuilder(ProducerConfig); + if (Serializer != null) + { + producerBuilder.SetValueSerializer(Serializer); + } + if (PartitionOptions != null) + { + TopicPartitionCache = new ConcurrentDictionary(); + if (PartitionOptions.Partition > 1) + { + using (var adminClient = new AdminClient(Producer.Handle)) + { + try + { + adminClient.CreateTopicsAsync(new TopicSpecification[] { new TopicSpecification { Name = TopicName, NumPartitions = 1, ReplicationFactor = 1 } }).Wait(); + } + catch (AggregateException ex) + { + //{Confluent.Kafka.Admin.CreateTopicsException: An error occurred creating topics: [jt809]: [Topic 'jt809' already exists.].} + if (ex.InnerException is Confluent.Kafka.Admin.CreateTopicsException exception) + { + + } + else + { + //记录日志 + //throw ex.InnerException; + } + } + try + { + //topic IncreaseTo 只增不减 + adminClient.CreatePartitionsAsync( + new List + { + new PartitionsSpecification + { + IncreaseTo = PartitionOptions.Partition, + Topic=TopicName + } + } + ).Wait(); + } + catch (AggregateException ex) + { + //记录日志 + // throw ex.InnerException; + } + } + } + } + return producerBuilder.Build(); + } + + protected JT809PartitionProducer( + IOptions topicOptionAccessor, + ProducerConfig producerConfig, + IJT809ProducerPartitionFactory producerPartitionFactory, + IOptions partitionOptionsAccessor) + : base(topicOptionAccessor.Value.TopicName, producerConfig) + { + PartitionOptions = partitionOptionsAccessor.Value; + ProducerPartitionFactory = producerPartitionFactory; + Producer = CreateProducer(); + } + + public override void Dispose() + { + Dispose(true); + GC.SuppressFinalize(true); + } + + protected virtual void Dispose(bool disposing) + { + if (_disposed) return; + if (disposing) + { + Producer.Dispose(); + } + _disposed = true; + } + + public override async void ProduceAsync(string msgId, string vno_color, T data) + { + if (_disposed) return; + if (PartitionOptions != null) + { + if (PartitionOptions.Partition > 1) + { + if (!TopicPartitionCache.TryGetValue(vno_color, out TopicPartition topicPartition)) + { + topicPartition = new TopicPartition(TopicName, new Partition(ProducerPartitionFactory.CreatePartition(TopicName, msgId, vno_color))); + TopicPartitionCache.TryAdd(vno_color, topicPartition); + } + await Producer.ProduceAsync(topicPartition, new Message + { + Key = msgId, + Value = data + }); + } + else + { + await Producer.ProduceAsync(TopicName, new Message + { + Key = msgId, + Value = data + }); + } + } + else + { + await Producer.ProduceAsync(TopicName, new Message + { + Key = msgId, + Value = data + }); + } + } + + ~JT809PartitionProducer() + { + Dispose(false); + } + } +} diff --git a/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809Producer.cs b/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809Producer.cs index cce646f..780c7cb 100644 --- a/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809Producer.cs +++ b/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809Producer.cs @@ -1,6 +1,7 @@ using Confluent.Kafka; using Confluent.Kafka.Admin; using JT809.PubSub.Abstractions; +using Microsoft.Extensions.Options; using System; using System.Collections.Concurrent; using System.Collections.Generic; @@ -11,79 +12,31 @@ namespace JT809.KafkaService /// /// /// - public abstract class JT809Producer : IJT809ProducerOfT + public abstract class JT809Producer : JT809ProducerBase { private bool _disposed = false; - public virtual string TopicName => JT809Constants.JT809TopicName; - - private ConcurrentDictionary TopicPartitionCache; - - private IProducer producer; - - protected virtual Serializer Serializer { get; } - - protected virtual IJT809ProducerPartitionFactory ProducerPartitionFactory { get; } - - protected virtual JT809PartitionOptions PartitionOptions { get; } - - protected JT809Producer(ProducerConfig producerConfig) + protected virtual IProducer CreateProducer() { - ProducerBuilder producerBuilder= new ProducerBuilder(producerConfig); + ProducerBuilder producerBuilder = new ProducerBuilder(ProducerConfig); if (Serializer != null) { producerBuilder.SetValueSerializer(Serializer); } - producer = producerBuilder.Build(); - if (PartitionOptions != null) - { - TopicPartitionCache = new ConcurrentDictionary(); - if (PartitionOptions.Partition > 1) - { - using (var adminClient = new AdminClient(producer.Handle)) - { - try - { - adminClient.CreateTopicsAsync(new TopicSpecification[] { new TopicSpecification { Name = TopicName, NumPartitions = 1, ReplicationFactor = 1 } }).Wait(); - } - catch (AggregateException ex) - { - //{Confluent.Kafka.Admin.CreateTopicsException: An error occurred creating topics: [jt809]: [Topic 'jt809' already exists.].} - if (ex.InnerException is Confluent.Kafka.Admin.CreateTopicsException exception) - { + return producerBuilder.Build(); + } - } - else - { - //记录日志 - //throw ex.InnerException; - } - } - try - { - //topic IncreaseTo 只增不减 - adminClient.CreatePartitionsAsync( - new List - { - new PartitionsSpecification - { - IncreaseTo = PartitionOptions.Partition, - Topic=TopicName - } - } - ).Wait(); - } - catch (AggregateException ex) - { - //记录日志 - // throw ex.InnerException; - } - } - } - } + protected override IProducer Producer { get; } + + protected JT809Producer( + IOptions topicOptionAccessor, + IOptions producerConfigAccessor) + : base(topicOptionAccessor.Value.TopicName, producerConfigAccessor.Value) + { + Producer = CreateProducer(); } - public void Dispose() + public override void Dispose() { Dispose(true); GC.SuppressFinalize(true); @@ -95,46 +48,19 @@ namespace JT809.KafkaService if (_disposed) return; if (disposing) { - producer.Dispose(); + Producer.Dispose(); } _disposed = true; } - public void ProduceAsync(string msgId, string vno_color, T data) + public override async void ProduceAsync(string msgId, string vno_color, T data) { if (_disposed) return; - if (PartitionOptions != null) + await Producer.ProduceAsync(TopicName, new Message { - if (PartitionOptions.Partition > 1) - { - if (!TopicPartitionCache.TryGetValue(vno_color, out TopicPartition topicPartition)) - { - topicPartition = new TopicPartition(TopicName, new Partition(ProducerPartitionFactory.CreatePartition(TopicName, msgId, vno_color))); - TopicPartitionCache.TryAdd(vno_color, topicPartition); - } - producer.ProduceAsync(topicPartition, new Message - { - Key = msgId, - Value = data - }); - } - else - { - producer.ProduceAsync(TopicName, new Message - { - Key = msgId, - Value = data - }); - } - } - else - { - producer.ProduceAsync(TopicName, new Message - { - Key = msgId, - Value = data - }); - } + Key = msgId, + Value = data + }); } ~JT809Producer() diff --git a/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809ProducerBase.cs b/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809ProducerBase.cs new file mode 100644 index 0000000..f7f30d5 --- /dev/null +++ b/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809ProducerBase.cs @@ -0,0 +1,24 @@ +using Confluent.Kafka; +using JT809.PubSub.Abstractions; +using System; +using System.Collections.Generic; +using System.Text; + +namespace JT809.KafkaService +{ + public abstract class JT809ProducerBase : IJT809ProducerOfT + { + protected JT809ProducerBase(string topicName,ProducerConfig config) + { + ProducerConfig = config; + TopicName = topicName; + } + + public ProducerConfig ProducerConfig { get;} + public string TopicName { get; } + protected abstract IProducer Producer { get;} + protected virtual Serializer Serializer { get; set; } + public abstract void Dispose(); + public abstract void ProduceAsync(string msgId, string vno_color, T data); + } +} diff --git a/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809_GpsPositio_Producer.cs b/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809_GpsPositio_Producer.cs index 3be99f5..8b81035 100644 --- a/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809_GpsPositio_Producer.cs +++ b/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809_GpsPositio_Producer.cs @@ -8,26 +8,10 @@ namespace JT809.KafkaService { public sealed class JT809_GpsPositio_Producer : JT809Producer { - protected override IJT809ProducerPartitionFactory ProducerPartitionFactory { get; } - - protected override Serializer Serializer => (position) => position.ToByteArray(); - - protected override JT809PartitionOptions PartitionOptions { get; } - - public JT809_GpsPositio_Producer(IOptions producerConfigAccessor) - : this(producerConfigAccessor,null, null ) + public JT809_GpsPositio_Producer(IOptions topicOptionAccessor, IOptions producerConfigAccessor) : base(topicOptionAccessor, producerConfigAccessor) { - } - public JT809_GpsPositio_Producer( - IOptions producerConfigAccessor, - IJT809ProducerPartitionFactory partitionFactory, - IOptions partitionAccessor - ):base(producerConfigAccessor.Value) - { - ProducerPartitionFactory = partitionFactory; - PartitionOptions = partitionAccessor?.Value; - } + protected override Serializer Serializer => (position) => position.ToByteArray(); } } diff --git a/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809_GpsPosition_Consumer.cs b/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809_GpsPosition_Consumer.cs index 7b1e5e9..3700523 100644 --- a/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809_GpsPosition_Consumer.cs +++ b/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809_GpsPosition_Consumer.cs @@ -13,26 +13,14 @@ namespace JT809.KafkaService { public sealed class JT809_GpsPosition_Consumer : JT809Consumer { + public JT809_GpsPosition_Consumer(IOptions topicOptionsAccessor, IOptions consumerConfigAccessor, ILoggerFactory loggerFactory) : base(topicOptionsAccessor, consumerConfigAccessor, loggerFactory) + { + } + protected override Deserializer Deserializer => (data, isNull) => { if (isNull) return default; return new MessageParser(() => new JT809GpsPosition()) .ParseFrom(data.ToArray()); }; - - protected override JT809PartitionOptions PartitionOptions { get; } - - public JT809_GpsPosition_Consumer( - IOptions consumerConfigAccessor, - ILoggerFactory loggerFactory, - IOptions partitionAccessor - ) : base(consumerConfigAccessor.Value, loggerFactory) - { - PartitionOptions = partitionAccessor.Value; - } - - public JT809_GpsPosition_Consumer(IOptions consumerConfigAccessor,ILoggerFactory loggerFactory) - : this(consumerConfigAccessor,loggerFactory, new JT809PartitionOptions()) - { - } } } diff --git a/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809_Same_Consumer.cs b/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809_Same_Consumer.cs index fae192d..5637673 100644 --- a/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809_Same_Consumer.cs +++ b/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809_Same_Consumer.cs @@ -13,19 +13,8 @@ namespace JT809.KafkaService { public sealed class JT809_Same_Consumer : JT809Consumer { - protected override JT809PartitionOptions PartitionOptions { get; } - - public JT809_Same_Consumer( - IOptions consumerConfigAccessor, - ILoggerFactory loggerFactory, - IOptions partitionAccessor - ) : base(consumerConfigAccessor.Value, loggerFactory) - { - PartitionOptions = partitionAccessor.Value; - } - - public JT809_Same_Consumer(IOptions consumerConfigAccessor,ILoggerFactory loggerFactory) - : this(consumerConfigAccessor,loggerFactory, new JT809PartitionOptions()) + public JT809_Same_Consumer(IOptions topicOptionsAccessor, IOptions consumerConfigAccessor, ILoggerFactory loggerFactory) + : base(topicOptionsAccessor, consumerConfigAccessor, loggerFactory) { } } diff --git a/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809_Same_Producer.cs b/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809_Same_Producer.cs index 71df7e2..420a8f6 100644 --- a/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809_Same_Producer.cs +++ b/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809_Same_Producer.cs @@ -9,24 +9,9 @@ namespace JT809.KafkaService { public sealed class JT809_Same_Producer : JT809Producer { - protected override IJT809ProducerPartitionFactory ProducerPartitionFactory { get; } - - protected override JT809PartitionOptions PartitionOptions { get; } - - public JT809_Same_Producer(IOptions producerConfigAccessor) - : this(producerConfigAccessor, null, null) - { - - } - - public JT809_Same_Producer( - IOptions producerConfigAccessor, - IJT809ProducerPartitionFactory partitionFactory, - IOptions partitionAccessor - ):base(producerConfigAccessor.Value) + public JT809_Same_Producer(IOptions topicOptionAccessor, IOptions producerConfigAccessor) + : base(topicOptionAccessor, producerConfigAccessor) { - ProducerPartitionFactory = partitionFactory; - PartitionOptions = partitionAccessor?.Value; } } } diff --git a/src/JT809.DotNetty.Simples/Superior/JT809.KafkaServiceTest/ConsumerTest.cs b/src/JT809.DotNetty.Simples/Superior/JT809.KafkaServiceTest/ConsumerTest.cs new file mode 100644 index 0000000..a4f9a4f --- /dev/null +++ b/src/JT809.DotNetty.Simples/Superior/JT809.KafkaServiceTest/ConsumerTest.cs @@ -0,0 +1,42 @@ +using System; +using Xunit; +using Microsoft.Extensions.DependencyInjection; +using JT809.Protocol.Enums; +using JT809.Protocol.Extensions; +using System.Threading; + +namespace JT809.KafkaServiceTest +{ + public class ConsumerTest: TestConsumerBase + { + [Fact] + public void Test1() + { + ConsumerTestService producerTestService = ServiceProvider.GetRequiredService(); + producerTestService.GpsConsumer.OnMessage((Message)=> + { + Assert.Equal(JT809SubBusinessType.实时上传车辆定位信息.ToValueString(), Message.MsgId); + Assert.Equal("粤A23456", Message.Data.Vno); + Assert.Equal(2, Message.Data.VColor); + Assert.Equal("smallchi", Message.Data.FromChannel); + }); + producerTestService.GpsConsumer.Subscribe(); + + Thread.Sleep(100000); + } + + [Fact] + public void Test2() + { + ConsumerTestService producerTestService = ServiceProvider.GetRequiredService(); + producerTestService.SameConsumer.OnMessage((Message) => + { + Assert.Equal(JT809SubBusinessType.None.ToValueString(), Message.MsgId); + Assert.Equal(new byte[] { 0x01, 0x02, 0x03 }, Message.Data); + }); + producerTestService.SameConsumer.Subscribe(); + + Thread.Sleep(100000); + } + } +} diff --git a/src/JT809.DotNetty.Simples/Superior/JT809.KafkaServiceTest/ConsumerTestService.cs b/src/JT809.DotNetty.Simples/Superior/JT809.KafkaServiceTest/ConsumerTestService.cs new file mode 100644 index 0000000..968e877 --- /dev/null +++ b/src/JT809.DotNetty.Simples/Superior/JT809.KafkaServiceTest/ConsumerTestService.cs @@ -0,0 +1,19 @@ +using JT809.GrpcProtos; +using JT809.KafkaService; +using System; +using System.Collections.Generic; +using System.Text; + +namespace JT809.KafkaServiceTest +{ + public class ConsumerTestService + { + public JT809Consumer SameConsumer { get; } + public JT809Consumer GpsConsumer { get; } + public ConsumerTestService(JT809Consumer sameConsumer, JT809Consumer gpsConsumer) + { + SameConsumer = sameConsumer; + GpsConsumer = gpsConsumer; + } + } +} diff --git a/src/JT809.DotNetty.Simples/Superior/JT809.KafkaServiceTest/JT809.KafkaServiceTest.csproj b/src/JT809.DotNetty.Simples/Superior/JT809.KafkaServiceTest/JT809.KafkaServiceTest.csproj new file mode 100644 index 0000000..ddad5da --- /dev/null +++ b/src/JT809.DotNetty.Simples/Superior/JT809.KafkaServiceTest/JT809.KafkaServiceTest.csproj @@ -0,0 +1,37 @@ + + + + netcoreapp2.2 + + false + + + + + + + + + + + + + all + runtime; build; native; contentfiles; analyzers; buildtransitive + + + + + + + + + + Always + + + Always + + + + diff --git a/src/JT809.DotNetty.Simples/Superior/JT809.KafkaServiceTest/ProducerPartitionTest.cs b/src/JT809.DotNetty.Simples/Superior/JT809.KafkaServiceTest/ProducerPartitionTest.cs new file mode 100644 index 0000000..2726ec7 --- /dev/null +++ b/src/JT809.DotNetty.Simples/Superior/JT809.KafkaServiceTest/ProducerPartitionTest.cs @@ -0,0 +1,38 @@ +using System; +using Xunit; +using Microsoft.Extensions.DependencyInjection; +using JT809.Protocol.Enums; +using JT809.Protocol.Extensions; + +namespace JT809.KafkaServiceTest +{ + public class ProducerPartitionTest : TestProducerBase + { + [Fact] + public void Test1() + { + ProducerTestService producerTestService = ServiceProvider.GetRequiredService(); + producerTestService.GpsProducer.ProduceAsync(JT809SubBusinessType.ʵʱϴλϢ.ToValueString(), "A23456_2", new GrpcProtos.JT809GpsPosition + { + Vno= "A23456", + VColor=2, + GpsTime= (DateTime.Now.ToUniversalTime().Ticks - 621355968000000000) / 10000000, + FromChannel="smallchi" + }); + } + + [Fact] + public void Test2() + { + ProducerTestService producerTestService = ServiceProvider.GetRequiredService(); + producerTestService.SameProducer.ProduceAsync(JT809SubBusinessType.None.ToValueString(), "A23457_2", new byte[] { 0x01, 0x02, 0x03 }); + } + + [Fact] + public void Test3() + { + ProducerTestService producerTestService = ServiceProvider.GetRequiredService(); + producerTestService.SameProducer.ProduceAsync(JT809SubBusinessType.None.ToValueString(), "A23457_2", new byte[] { 0x01, 0x02, 0x03 }); + } + } +} diff --git a/src/JT809.DotNetty.Simples/Superior/JT809.KafkaServiceTest/ProducerTest.cs b/src/JT809.DotNetty.Simples/Superior/JT809.KafkaServiceTest/ProducerTest.cs new file mode 100644 index 0000000..efbcf61 --- /dev/null +++ b/src/JT809.DotNetty.Simples/Superior/JT809.KafkaServiceTest/ProducerTest.cs @@ -0,0 +1,31 @@ +using System; +using Xunit; +using Microsoft.Extensions.DependencyInjection; +using JT809.Protocol.Enums; +using JT809.Protocol.Extensions; + +namespace JT809.KafkaServiceTest +{ + public class ProducerTest: TestProducerBase + { + [Fact] + public void Test1() + { + ProducerTestService producerTestService = ServiceProvider.GetRequiredService(); + producerTestService.GpsProducer.ProduceAsync(JT809SubBusinessType.ʵʱϴλϢ.ToValueString(), "A23456_2", new GrpcProtos.JT809GpsPosition + { + Vno= "A23456", + VColor=2, + GpsTime= (DateTime.Now.ToUniversalTime().Ticks - 621355968000000000) / 10000000, + FromChannel="smallchi" + }); + } + + [Fact] + public void Test2() + { + ProducerTestService producerTestService = ServiceProvider.GetRequiredService(); + producerTestService.SameProducer.ProduceAsync(JT809SubBusinessType.None.ToValueString(), "A23457_2", new byte[] { 0x01, 0x02, 0x03 }); + } + } +} diff --git a/src/JT809.DotNetty.Simples/Superior/JT809.KafkaServiceTest/ProducerTestService.cs b/src/JT809.DotNetty.Simples/Superior/JT809.KafkaServiceTest/ProducerTestService.cs new file mode 100644 index 0000000..7e9a479 --- /dev/null +++ b/src/JT809.DotNetty.Simples/Superior/JT809.KafkaServiceTest/ProducerTestService.cs @@ -0,0 +1,20 @@ +using JT809.GrpcProtos; +using JT809.KafkaService; +using System; +using System.Collections.Generic; +using System.Text; + +namespace JT809.KafkaServiceTest +{ + public class ProducerTestService + { + public JT809Producer SameProducer { get; } + public JT809Producer GpsProducer { get; } + public ProducerTestService(JT809Producer sameProducer, JT809Producer gpsProducer) + { + SameProducer = sameProducer; + GpsProducer = gpsProducer; + } + + } +} diff --git a/src/JT809.DotNetty.Simples/Superior/JT809.KafkaServiceTest/TestConsumerBase.cs b/src/JT809.DotNetty.Simples/Superior/JT809.KafkaServiceTest/TestConsumerBase.cs new file mode 100644 index 0000000..075b0f0 --- /dev/null +++ b/src/JT809.DotNetty.Simples/Superior/JT809.KafkaServiceTest/TestConsumerBase.cs @@ -0,0 +1,37 @@ +using System; +using System.Collections.Generic; +using System.IO; +using System.Text; +using JT809.KafkaService; +using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging; + +namespace JT809.KafkaServiceTest +{ + public class TestConsumerBase + { + public IServiceProvider ServiceProvider { get; } + + public IConfigurationRoot ConfigurationRoot { get; } + public TestConsumerBase() + { + var builder = new ConfigurationBuilder(); + builder.SetBasePath(AppDomain.CurrentDomain.BaseDirectory); + builder.AddJsonFile("appsettings.json"); + ConfigurationRoot = builder.Build(); + + ServiceCollection serviceDescriptors = new ServiceCollection(); + serviceDescriptors.AddSingleton(); + serviceDescriptors.AddSingleton(typeof(ILogger<>), typeof(Logger<>)); + serviceDescriptors.AddLogging(configure => + { + configure.AddDebug(); + configure.SetMinimumLevel(LogLevel.Trace); + }); + serviceDescriptors.AddJT809KafkaConsumerService(ConfigurationRoot); + serviceDescriptors.AddSingleton(); + ServiceProvider = serviceDescriptors.BuildServiceProvider(); + } + } +} diff --git a/src/JT809.DotNetty.Simples/Superior/JT809.KafkaServiceTest/TestProducerBase.cs b/src/JT809.DotNetty.Simples/Superior/JT809.KafkaServiceTest/TestProducerBase.cs new file mode 100644 index 0000000..5a8f7f6 --- /dev/null +++ b/src/JT809.DotNetty.Simples/Superior/JT809.KafkaServiceTest/TestProducerBase.cs @@ -0,0 +1,37 @@ +using System; +using System.Collections.Generic; +using System.IO; +using System.Text; +using JT809.KafkaService; +using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging; + +namespace JT809.KafkaServiceTest +{ + public class TestProducerBase + { + public IServiceProvider ServiceProvider { get; } + + public IConfigurationRoot ConfigurationRoot { get; } + public TestProducerBase() + { + var builder = new ConfigurationBuilder(); + builder.SetBasePath(AppDomain.CurrentDomain.BaseDirectory); + builder.AddJsonFile("appsettings.json"); + ConfigurationRoot = builder.Build(); + + ServiceCollection serviceDescriptors = new ServiceCollection(); + serviceDescriptors.AddSingleton(); + serviceDescriptors.AddSingleton(typeof(ILogger<>), typeof(Logger<>)); + serviceDescriptors.AddLogging(configure => + { + configure.AddDebug(); + configure.SetMinimumLevel(LogLevel.Trace); + }); + serviceDescriptors.AddJT809KafkaProducerService(ConfigurationRoot); + serviceDescriptors.AddSingleton(); + ServiceProvider = serviceDescriptors.BuildServiceProvider(); + } + } +} diff --git a/src/JT809.DotNetty.Simples/Superior/JT809.KafkaServiceTest/TestProducerPartitionBase.cs b/src/JT809.DotNetty.Simples/Superior/JT809.KafkaServiceTest/TestProducerPartitionBase.cs new file mode 100644 index 0000000..4b9fd03 --- /dev/null +++ b/src/JT809.DotNetty.Simples/Superior/JT809.KafkaServiceTest/TestProducerPartitionBase.cs @@ -0,0 +1,37 @@ +using System; +using System.Collections.Generic; +using System.IO; +using System.Text; +using JT809.KafkaService; +using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Logging; + +namespace JT809.KafkaServiceTest +{ + public class TestProducerPartitionBase + { + public IServiceProvider ServiceProvider { get; } + + public IConfigurationRoot ConfigurationRoot { get; } + public TestProducerPartitionBase() + { + var builder = new ConfigurationBuilder(); + builder.SetBasePath(AppDomain.CurrentDomain.BaseDirectory); + builder.AddJsonFile("appsettings.Partition.json"); + ConfigurationRoot = builder.Build(); + + ServiceCollection serviceDescriptors = new ServiceCollection(); + serviceDescriptors.AddSingleton(); + serviceDescriptors.AddSingleton(typeof(ILogger<>), typeof(Logger<>)); + serviceDescriptors.AddLogging(configure => + { + configure.AddDebug(); + configure.SetMinimumLevel(LogLevel.Trace); + }); + serviceDescriptors.AddJT809KafkaProducerService(ConfigurationRoot); + serviceDescriptors.AddSingleton(); + ServiceProvider = serviceDescriptors.BuildServiceProvider(); + } + } +} diff --git a/src/JT809.DotNetty.Simples/Superior/JT809.KafkaServiceTest/appsettings.Partition.json b/src/JT809.DotNetty.Simples/Superior/JT809.KafkaServiceTest/appsettings.Partition.json new file mode 100644 index 0000000..3c94db0 --- /dev/null +++ b/src/JT809.DotNetty.Simples/Superior/JT809.KafkaServiceTest/appsettings.Partition.json @@ -0,0 +1,23 @@ +{ + "Logging": { + "IncludeScopes": false, + "Debug": { + "LogLevel": { + "Default": "Trace" + } + }, + "Console": { + "LogLevel": { + "Default": "Trace" + } + } + }, + "KafkaProducerConfig": { + "BootstrapServers": "127.0.0.1:9092" + }, + "KafkaConsumerConfig": { + "BootstrapServers": "127.0.0.1:9092", + "EnableAutoCommit": true, + "GroupId": "JT809.Gps.Test" + } +} diff --git a/src/JT809.DotNetty.Simples/Superior/JT809.KafkaServiceTest/appsettings.json b/src/JT809.DotNetty.Simples/Superior/JT809.KafkaServiceTest/appsettings.json new file mode 100644 index 0000000..3c94db0 --- /dev/null +++ b/src/JT809.DotNetty.Simples/Superior/JT809.KafkaServiceTest/appsettings.json @@ -0,0 +1,23 @@ +{ + "Logging": { + "IncludeScopes": false, + "Debug": { + "LogLevel": { + "Default": "Trace" + } + }, + "Console": { + "LogLevel": { + "Default": "Trace" + } + } + }, + "KafkaProducerConfig": { + "BootstrapServers": "127.0.0.1:9092" + }, + "KafkaConsumerConfig": { + "BootstrapServers": "127.0.0.1:9092", + "EnableAutoCommit": true, + "GroupId": "JT809.Gps.Test" + } +} diff --git a/src/JT809.DotNetty.Simples/Superior/JT809.PubSub.Abstractions/IJT809Consumer.cs b/src/JT809.DotNetty.Simples/Superior/JT809.PubSub.Abstractions/IJT809Consumer.cs index bd986b3..7d111e2 100644 --- a/src/JT809.DotNetty.Simples/Superior/JT809.PubSub.Abstractions/IJT809Consumer.cs +++ b/src/JT809.DotNetty.Simples/Superior/JT809.PubSub.Abstractions/IJT809Consumer.cs @@ -11,7 +11,7 @@ namespace JT809.PubSub.Abstractions } public interface IJT808ConsumerOfT :IDisposable { - void OnMessage(Action<(string MsgId, T data)> callback); + void OnMessage(Action<(string MsgId, T Data)> callback); CancellationTokenSource Cts { get; } void Subscribe(); void Unsubscribe(); diff --git a/src/JT809.DotNetty.Simples/Superior/JT809.PubSub.Abstractions/JT809TopicOptions.cs b/src/JT809.DotNetty.Simples/Superior/JT809.PubSub.Abstractions/JT809TopicOptions.cs new file mode 100644 index 0000000..aed8ded --- /dev/null +++ b/src/JT809.DotNetty.Simples/Superior/JT809.PubSub.Abstractions/JT809TopicOptions.cs @@ -0,0 +1,14 @@ +using Microsoft.Extensions.Options; +using System; +using System.Collections.Generic; +using System.Text; + +namespace JT809.PubSub.Abstractions +{ + public class JT809TopicOptions:IOptions + { + public string TopicName { get; set; } = JT809Constants.JT809TopicName; + + public JT809TopicOptions Value => this; + } +} diff --git a/src/JT809.DotNetty.sln b/src/JT809.DotNetty.sln index 23b40f6..93eb1fa 100644 --- a/src/JT809.DotNetty.sln +++ b/src/JT809.DotNetty.sln @@ -25,6 +25,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JT809.GrpcProtos", "JT809.D EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JT809.KafkaService", "JT809.DotNetty.Simples\Superior\JT809.KafkaService\JT809.KafkaService.csproj", "{8119D905-241F-4EFF-B300-1FB474B8C665}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "JT809.KafkaServiceTest", "JT809.DotNetty.Simples\Superior\JT809.KafkaServiceTest\JT809.KafkaServiceTest.csproj", "{22F008D5-61F8-4889-80DB-91B37591322F}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -59,6 +61,10 @@ Global {8119D905-241F-4EFF-B300-1FB474B8C665}.Debug|Any CPU.Build.0 = Debug|Any CPU {8119D905-241F-4EFF-B300-1FB474B8C665}.Release|Any CPU.ActiveCfg = Release|Any CPU {8119D905-241F-4EFF-B300-1FB474B8C665}.Release|Any CPU.Build.0 = Release|Any CPU + {22F008D5-61F8-4889-80DB-91B37591322F}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {22F008D5-61F8-4889-80DB-91B37591322F}.Debug|Any CPU.Build.0 = Debug|Any CPU + {22F008D5-61F8-4889-80DB-91B37591322F}.Release|Any CPU.ActiveCfg = Release|Any CPU + {22F008D5-61F8-4889-80DB-91B37591322F}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -70,6 +76,7 @@ Global {5F8AFD67-FDA8-40A6-A655-FD855E2CCF26} = {E9DC871D-EFCE-4D53-A5B5-8A88D2D52EA4} {D64F2F77-DC0C-4120-80DA-45012A794CDF} = {E9DC871D-EFCE-4D53-A5B5-8A88D2D52EA4} {8119D905-241F-4EFF-B300-1FB474B8C665} = {E9DC871D-EFCE-4D53-A5B5-8A88D2D52EA4} + {22F008D5-61F8-4889-80DB-91B37591322F} = {E9DC871D-EFCE-4D53-A5B5-8A88D2D52EA4} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {0FC2A52E-3B7A-4485-9C3B-9080C825419D}