From 78d3958046f6f17e96ab2d3b8c0934c6416dc1af Mon Sep 17 00:00:00 2001 From: SmallChi <564952747@qq.com> Date: Thu, 11 Apr 2019 23:29:42 +0800 Subject: [PATCH] =?UTF-8?q?=E8=B0=83=E6=95=B4kafka=E7=9A=84=E7=94=9F?= =?UTF-8?q?=E4=BA=A7=E6=B6=88=E8=B4=B9=E5=BE=85=E6=B5=8B=E8=AF=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../JT809.KafkaService/JT809Consumer.cs | 169 ++++++++++++++++++ .../JT809KafkaServiceExtensions.cs | 59 ++++++ .../JT809.KafkaService/JT809Producer.cs | 19 +- .../JT809_GpsPositio_Producer.cs | 12 +- .../JT809_GpsPosition_Consumer.cs | 110 ++---------- .../JT809.KafkaService/JT809_Same_Consumer.cs | 32 ++++ .../JT809.KafkaService/JT809_Same_Producer.cs | 32 ++++ .../JT809PartitionOptions.cs | 2 + 8 files changed, 323 insertions(+), 112 deletions(-) create mode 100644 src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809Consumer.cs create mode 100644 src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809KafkaServiceExtensions.cs create mode 100644 src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809_Same_Consumer.cs create mode 100644 src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809_Same_Producer.cs diff --git a/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809Consumer.cs b/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809Consumer.cs new file mode 100644 index 0000000..31da532 --- /dev/null +++ b/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809Consumer.cs @@ -0,0 +1,169 @@ +using Confluent.Kafka; +using JT809.PubSub.Abstractions; +using Microsoft.Extensions.Logging; +using System; +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; + +namespace JT809.KafkaService +{ + public abstract class JT809Consumer : IJT808ConsumerOfT + { + private bool _disposed = false; + public CancellationTokenSource Cts => new CancellationTokenSource(); + + public virtual string TopicName => JT809Constants.JT809TopicName; + + private readonly ILogger logger; + + private List topicPartitionList; + + private IList> consumers; + + protected abstract JT809PartitionOptions PartitionOptions { get; } + + protected virtual Deserializer Deserializer { get; } + + protected JT809Consumer( + ConsumerConfig consumerConfig, + ILoggerFactory loggerFactory) + { + logger = loggerFactory.CreateLogger("JT809Consumer"); + CreateTopicPartition(); + consumers = new List>(); + foreach(var topicPartition in topicPartitionList) + { + ConsumerBuilder consumerBuilder = new ConsumerBuilder(consumerConfig); + consumerBuilder.SetErrorHandler((consumer, error) => + { + logger.LogError(error.Reason); + }); + if (Deserializer != null) + { + consumerBuilder.SetValueDeserializer(Deserializer); + } + if (PartitionOptions.Partition > 1) + { + consumerBuilder.SetPartitionsAssignedHandler((c, p) => { + p.Add(topicPartition); + }); + } + consumers.Add(consumerBuilder.Build()); + } + } + + private void CreateTopicPartition() + { + topicPartitionList = new List(); + if (PartitionOptions.Partition > 1) + { + if(PartitionOptions.AssignPartitions!=null && PartitionOptions.AssignPartitions.Count>0) + { + foreach(var p in PartitionOptions.AssignPartitions) + { + topicPartitionList.Add(new TopicPartition(TopicName, new Partition(p))); + } + } + else + { + for (int i = 0; i < PartitionOptions.Partition; i++) + { + topicPartitionList.Add(new TopicPartition(TopicName, new Partition(i))); + } + } + } + else + { + for (int i = 0; i < PartitionOptions.Partition; i++) + { + topicPartitionList.Add(new TopicPartition(TopicName, new Partition(i))); + } + } + } + + public void OnMessage(Action<(string MsgId, T data)> callback) + { + logger.LogDebug($"consumers:{consumers.Count},topicPartitionList:{topicPartitionList.Count}"); + for (int i = 0; i < consumers.Count; i++) + { + Task.Factory.StartNew((num) => + { + int n = (int)num; + while (!Cts.IsCancellationRequested) + { + try + { + //如果不指定分区,根据kafka的机制会从多个分区中拉取数据 + //如果指定分区,根据kafka的机制会从相应的分区中拉取数据 + //consumers[n].Assign(topicPartitionList[n]); + var data = consumers[n].Consume(Cts.Token); + if (logger.IsEnabled(LogLevel.Debug)) + { + logger.LogDebug($"Topic: {data.Topic} Key: {data.Key} Partition: {data.Partition} Offset: {data.Offset} Data:{string.Join("", data.Value)} TopicPartitionOffset:{data.TopicPartitionOffset}"); + } + callback((data.Key, data.Value)); + } + catch (ConsumeException ex) + { + logger.LogError(ex, TopicName); + Thread.Sleep(1000); + } + catch (Exception ex) + { + logger.LogError(ex, TopicName); + Thread.Sleep(1000); + } + } + }, i, Cts.Token); + } + } + + public void Subscribe() + { + if (_disposed) return; + //仅有一个分区才需要订阅 + if (topicPartitionList.Count == 1) + { + consumers[0].Subscribe(TopicName); + } + } + + public void Unsubscribe() + { + if (_disposed) return; + foreach(var c in consumers) + { + c.Unsubscribe(); + } + } + + public void Dispose() + { + Dispose(true); + GC.SuppressFinalize(true); + + } + + ~JT809Consumer() + { + Dispose(false); + } + + protected virtual void Dispose(bool disposing) + { + if (_disposed) return; + if (disposing) + { + Cts.Cancel(); + foreach (var c in consumers) + { + c.Close(); + c.Dispose(); + } + Cts.Dispose(); + } + _disposed = true; + } + } +} diff --git a/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809KafkaServiceExtensions.cs b/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809KafkaServiceExtensions.cs new file mode 100644 index 0000000..e48d4c5 --- /dev/null +++ b/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809KafkaServiceExtensions.cs @@ -0,0 +1,59 @@ +using Confluent.Kafka; +using JT809.GrpcProtos; +using JT809.PubSub.Abstractions; +using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.DependencyInjection; +using System; +using System.Collections.Generic; +using System.Text; + +namespace JT809.KafkaService +{ + /// + /// + /// https://github.com/aspnet/Extensions/blob/master/src/DependencyInjection/DI.Specification.Tests/src/DependencyInjectionSpecificationTests.cs + /// https://github.com/aspnet/Extensions/pull/536 + /// + public static class JT809KafkaServiceExtensions + { + public static IServiceCollection AddJT809KafkaProducerService(this IServiceCollection serviceDescriptors, IConfiguration configuration) + { + serviceDescriptors.Configure(configuration.GetSection("JT809ProducerConfig")); + serviceDescriptors.AddSingleton(typeof(JT809Producer), typeof(JT809_Same_Producer)); + serviceDescriptors.AddSingleton(typeof(JT809Producer), typeof(JT809_GpsPositio_Producer)); + return serviceDescriptors; + } + + public static IServiceCollection AddJT809KafkaProducerPartitionsService(this IServiceCollection serviceDescriptors,Action action) + where TPartitionFactory: IJT809ProducerPartitionFactory + { + serviceDescriptors.Configure(action); + serviceDescriptors.AddSingleton(typeof(IJT809ProducerPartitionFactory), typeof(TPartitionFactory)); + return serviceDescriptors; + } + + public static IServiceCollection AddJT809KafkaProducerPartitionsService(this IServiceCollection serviceDescriptors, IConfiguration configuration) + where TPartitionFactory : IJT809ProducerPartitionFactory + { + serviceDescriptors.Configure(configuration.GetSection("JT809PartitionOptions")); + serviceDescriptors.AddSingleton(typeof(IJT809ProducerPartitionFactory), typeof(TPartitionFactory)); + return serviceDescriptors; + } + + public static IServiceCollection AddJT809KafkaConsumerService(this IServiceCollection serviceDescriptors, IConfiguration configuration, Action action = null) + { + serviceDescriptors.Configure(configuration.GetSection("JT809ConsumerConfig")); + if (configuration.GetSection("JT809PartitionOptions").Exists()) + { + serviceDescriptors.Configure(configuration.GetSection("JT809PartitionOptions")); + } + if (action != null) + { + serviceDescriptors.Configure(action); + } + serviceDescriptors.AddSingleton(typeof(JT809Consumer), typeof(JT809_Same_Consumer)); + serviceDescriptors.AddSingleton(typeof(JT809Consumer), typeof(JT809_GpsPosition_Consumer)); + return serviceDescriptors; + } + } +} diff --git a/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809Producer.cs b/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809Producer.cs index 939abf2..cce646f 100644 --- a/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809Producer.cs +++ b/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809Producer.cs @@ -7,6 +7,10 @@ using System.Collections.Generic; namespace JT809.KafkaService { + /// + /// + /// + /// public abstract class JT809Producer : IJT809ProducerOfT { private bool _disposed = false; @@ -17,15 +21,20 @@ namespace JT809.KafkaService private IProducer producer; + protected virtual Serializer Serializer { get; } + protected virtual IJT809ProducerPartitionFactory ProducerPartitionFactory { get; } protected virtual JT809PartitionOptions PartitionOptions { get; } - protected abstract ProducerConfig ProducerConfig { get; } - - protected JT809Producer() + protected JT809Producer(ProducerConfig producerConfig) { - CreateProducer(); + ProducerBuilder producerBuilder= new ProducerBuilder(producerConfig); + if (Serializer != null) + { + producerBuilder.SetValueSerializer(Serializer); + } + producer = producerBuilder.Build(); if (PartitionOptions != null) { TopicPartitionCache = new ConcurrentDictionary(); @@ -74,8 +83,6 @@ namespace JT809.KafkaService } } - protected abstract IProducer CreateProducer(); - public void Dispose() { Dispose(true); diff --git a/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809_GpsPositio_Producer.cs b/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809_GpsPositio_Producer.cs index 256c6b2..3be99f5 100644 --- a/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809_GpsPositio_Producer.cs +++ b/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809_GpsPositio_Producer.cs @@ -10,7 +10,7 @@ namespace JT809.KafkaService { protected override IJT809ProducerPartitionFactory ProducerPartitionFactory { get; } - protected override ProducerConfig ProducerConfig { get; } + protected override Serializer Serializer => (position) => position.ToByteArray(); protected override JT809PartitionOptions PartitionOptions { get; } @@ -24,18 +24,10 @@ namespace JT809.KafkaService IOptions producerConfigAccessor, IJT809ProducerPartitionFactory partitionFactory, IOptions partitionAccessor - ) + ):base(producerConfigAccessor.Value) { ProducerPartitionFactory = partitionFactory; - ProducerConfig = producerConfigAccessor?.Value; PartitionOptions = partitionAccessor?.Value; } - - protected override IProducer CreateProducer() - { - return new ProducerBuilder(ProducerConfig) - .SetValueSerializer((position) => position.ToByteArray()) - .Build(); - } } } diff --git a/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809_GpsPosition_Consumer.cs b/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809_GpsPosition_Consumer.cs index 74e05d5..7b1e5e9 100644 --- a/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809_GpsPosition_Consumer.cs +++ b/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809_GpsPosition_Consumer.cs @@ -11,110 +11,28 @@ using System.Threading.Tasks; namespace JT809.KafkaService { - public class JT809_GpsPosition_Consumer : IJT808ConsumerOfT + public sealed class JT809_GpsPosition_Consumer : JT809Consumer { - public CancellationTokenSource Cts => new CancellationTokenSource(); + protected override Deserializer Deserializer => (data, isNull) => { + if (isNull) return default; + return new MessageParser(() => new JT809GpsPosition()) + .ParseFrom(data.ToArray()); + }; - public string TopicName => JT809Constants.JT809TopicName; - - private readonly ILogger logger; - - private readonly List topicPartitionList; - - private readonly List> consumers; - - private readonly JT809PartitionOptions partition; + protected override JT809PartitionOptions PartitionOptions { get; } public JT809_GpsPosition_Consumer( - IOptions partitionAccessor, - IOptions consumerConfigAccessor, - ILoggerFactory loggerFactory) - { - partition = partitionAccessor.Value; - logger = loggerFactory.CreateLogger(); - topicPartitionList = new List(); - consumers = new List>(); - for (int i=0;i< partition.Partition; i++) - { - topicPartitionList.Add(new TopicPartition(TopicName, new Partition(i))); - consumers.Add(new ConsumerBuilder(consumerConfigAccessor.Value) - .SetErrorHandler((consumer, error) => { - logger.LogError(error.Reason); - }) - .SetValueDeserializer((data, isNull) => { - if (isNull) return default; - return new MessageParser(() => new JT809GpsPosition()) - .ParseFrom(data.ToArray()); - }) - .Build()); - } - } - - public JT809_GpsPosition_Consumer( - IOptions consumerConfigAccessor, - ILoggerFactory loggerFactory):this(new JT809PartitionOptions (), consumerConfigAccessor, loggerFactory) - { - - } - - public void OnMessage(Action<(string MsgId, JT809GpsPosition data)> callback) - { - logger.LogDebug($"consumers:{consumers.Count},topicPartitionList:{topicPartitionList.Count}"); - for (int i = 0; i < consumers.Count; i++) - { - Task.Factory.StartNew((num) => - { - int n = (int)num; - while (!Cts.IsCancellationRequested) - { - try - { - //如果不指定分区,根据kafka的机制会从多个分区中拉取数据 - //如果指定分区,根据kafka的机制会从相应的分区中拉取数据 - consumers[n].Assign(topicPartitionList[n]); - var data = consumers[n].Consume(Cts.Token); - if (logger.IsEnabled(LogLevel.Debug)) - { - logger.LogDebug($"Topic: {data.Topic} Key: {data.Key} Partition: {data.Partition} Offset: {data.Offset} Data:{string.Join("", data.Value)} TopicPartitionOffset:{data.TopicPartitionOffset}"); - } - callback((data.Key, data.Value)); - } - catch (ConsumeException ex) - { - logger.LogError(ex, TopicName); - Thread.Sleep(1000); - } - catch (Exception ex) - { - logger.LogError(ex, TopicName); - Thread.Sleep(1000); - } - } - }, i, Cts.Token); - } - } - - public void Subscribe() - { - //仅有一个分区才需要订阅 - if (topicPartitionList.Count == 1) - { - consumers[0].Subscribe(TopicName); - } - } - - public void Unsubscribe() + IOptions consumerConfigAccessor, + ILoggerFactory loggerFactory, + IOptions partitionAccessor + ) : base(consumerConfigAccessor.Value, loggerFactory) { - consumers.ForEach(consumer => consumer.Unsubscribe()); + PartitionOptions = partitionAccessor.Value; } - public void Dispose() + public JT809_GpsPosition_Consumer(IOptions consumerConfigAccessor,ILoggerFactory loggerFactory) + : this(consumerConfigAccessor,loggerFactory, new JT809PartitionOptions()) { - Cts.Cancel(); - consumers.ForEach(consumer => { - consumer.Close(); - consumer.Dispose(); - }); } } } diff --git a/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809_Same_Consumer.cs b/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809_Same_Consumer.cs new file mode 100644 index 0000000..fae192d --- /dev/null +++ b/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809_Same_Consumer.cs @@ -0,0 +1,32 @@ +using Confluent.Kafka; +using Google.Protobuf; +using JT809.GrpcProtos; +using JT809.PubSub.Abstractions; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using System; +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; + +namespace JT809.KafkaService +{ + public sealed class JT809_Same_Consumer : JT809Consumer + { + protected override JT809PartitionOptions PartitionOptions { get; } + + public JT809_Same_Consumer( + IOptions consumerConfigAccessor, + ILoggerFactory loggerFactory, + IOptions partitionAccessor + ) : base(consumerConfigAccessor.Value, loggerFactory) + { + PartitionOptions = partitionAccessor.Value; + } + + public JT809_Same_Consumer(IOptions consumerConfigAccessor,ILoggerFactory loggerFactory) + : this(consumerConfigAccessor,loggerFactory, new JT809PartitionOptions()) + { + } + } +} diff --git a/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809_Same_Producer.cs b/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809_Same_Producer.cs new file mode 100644 index 0000000..71df7e2 --- /dev/null +++ b/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809_Same_Producer.cs @@ -0,0 +1,32 @@ +using System; +using System.Collections.Generic; +using System.Text; +using Confluent.Kafka; +using JT809.PubSub.Abstractions; +using Microsoft.Extensions.Options; + +namespace JT809.KafkaService +{ + public sealed class JT809_Same_Producer : JT809Producer + { + protected override IJT809ProducerPartitionFactory ProducerPartitionFactory { get; } + + protected override JT809PartitionOptions PartitionOptions { get; } + + public JT809_Same_Producer(IOptions producerConfigAccessor) + : this(producerConfigAccessor, null, null) + { + + } + + public JT809_Same_Producer( + IOptions producerConfigAccessor, + IJT809ProducerPartitionFactory partitionFactory, + IOptions partitionAccessor + ):base(producerConfigAccessor.Value) + { + ProducerPartitionFactory = partitionFactory; + PartitionOptions = partitionAccessor?.Value; + } + } +} diff --git a/src/JT809.DotNetty.Simples/Superior/JT809.PubSub.Abstractions/JT809PartitionOptions.cs b/src/JT809.DotNetty.Simples/Superior/JT809.PubSub.Abstractions/JT809PartitionOptions.cs index 5deb4a2..27e1046 100644 --- a/src/JT809.DotNetty.Simples/Superior/JT809.PubSub.Abstractions/JT809PartitionOptions.cs +++ b/src/JT809.DotNetty.Simples/Superior/JT809.PubSub.Abstractions/JT809PartitionOptions.cs @@ -10,5 +10,7 @@ namespace JT809.PubSub.Abstractions public int Partition { get; set; } = 1; public JT809PartitionOptions Value => this; + + public List AssignPartitions { get; set; } } }