@@ -0,0 +1,169 @@ | |||||
using Confluent.Kafka; | |||||
using JT809.PubSub.Abstractions; | |||||
using Microsoft.Extensions.Logging; | |||||
using System; | |||||
using System.Collections.Generic; | |||||
using System.Threading; | |||||
using System.Threading.Tasks; | |||||
namespace JT809.KafkaService | |||||
{ | |||||
public abstract class JT809Consumer<T> : IJT808ConsumerOfT<T> | |||||
{ | |||||
private bool _disposed = false; | |||||
public CancellationTokenSource Cts => new CancellationTokenSource(); | |||||
public virtual string TopicName => JT809Constants.JT809TopicName; | |||||
private readonly ILogger logger; | |||||
private List<TopicPartition> topicPartitionList; | |||||
private IList<IConsumer<string, T>> consumers; | |||||
protected abstract JT809PartitionOptions PartitionOptions { get; } | |||||
protected virtual Deserializer<T> Deserializer { get; } | |||||
protected JT809Consumer( | |||||
ConsumerConfig consumerConfig, | |||||
ILoggerFactory loggerFactory) | |||||
{ | |||||
logger = loggerFactory.CreateLogger("JT809Consumer"); | |||||
CreateTopicPartition(); | |||||
consumers = new List<IConsumer<string, T>>(); | |||||
foreach(var topicPartition in topicPartitionList) | |||||
{ | |||||
ConsumerBuilder<string, T> consumerBuilder = new ConsumerBuilder<string, T>(consumerConfig); | |||||
consumerBuilder.SetErrorHandler((consumer, error) => | |||||
{ | |||||
logger.LogError(error.Reason); | |||||
}); | |||||
if (Deserializer != null) | |||||
{ | |||||
consumerBuilder.SetValueDeserializer(Deserializer); | |||||
} | |||||
if (PartitionOptions.Partition > 1) | |||||
{ | |||||
consumerBuilder.SetPartitionsAssignedHandler((c, p) => { | |||||
p.Add(topicPartition); | |||||
}); | |||||
} | |||||
consumers.Add(consumerBuilder.Build()); | |||||
} | |||||
} | |||||
private void CreateTopicPartition() | |||||
{ | |||||
topicPartitionList = new List<TopicPartition>(); | |||||
if (PartitionOptions.Partition > 1) | |||||
{ | |||||
if(PartitionOptions.AssignPartitions!=null && PartitionOptions.AssignPartitions.Count>0) | |||||
{ | |||||
foreach(var p in PartitionOptions.AssignPartitions) | |||||
{ | |||||
topicPartitionList.Add(new TopicPartition(TopicName, new Partition(p))); | |||||
} | |||||
} | |||||
else | |||||
{ | |||||
for (int i = 0; i < PartitionOptions.Partition; i++) | |||||
{ | |||||
topicPartitionList.Add(new TopicPartition(TopicName, new Partition(i))); | |||||
} | |||||
} | |||||
} | |||||
else | |||||
{ | |||||
for (int i = 0; i < PartitionOptions.Partition; i++) | |||||
{ | |||||
topicPartitionList.Add(new TopicPartition(TopicName, new Partition(i))); | |||||
} | |||||
} | |||||
} | |||||
public void OnMessage(Action<(string MsgId, T data)> callback) | |||||
{ | |||||
logger.LogDebug($"consumers:{consumers.Count},topicPartitionList:{topicPartitionList.Count}"); | |||||
for (int i = 0; i < consumers.Count; i++) | |||||
{ | |||||
Task.Factory.StartNew((num) => | |||||
{ | |||||
int n = (int)num; | |||||
while (!Cts.IsCancellationRequested) | |||||
{ | |||||
try | |||||
{ | |||||
//如果不指定分区,根据kafka的机制会从多个分区中拉取数据 | |||||
//如果指定分区,根据kafka的机制会从相应的分区中拉取数据 | |||||
//consumers[n].Assign(topicPartitionList[n]); | |||||
var data = consumers[n].Consume(Cts.Token); | |||||
if (logger.IsEnabled(LogLevel.Debug)) | |||||
{ | |||||
logger.LogDebug($"Topic: {data.Topic} Key: {data.Key} Partition: {data.Partition} Offset: {data.Offset} Data:{string.Join("", data.Value)} TopicPartitionOffset:{data.TopicPartitionOffset}"); | |||||
} | |||||
callback((data.Key, data.Value)); | |||||
} | |||||
catch (ConsumeException ex) | |||||
{ | |||||
logger.LogError(ex, TopicName); | |||||
Thread.Sleep(1000); | |||||
} | |||||
catch (Exception ex) | |||||
{ | |||||
logger.LogError(ex, TopicName); | |||||
Thread.Sleep(1000); | |||||
} | |||||
} | |||||
}, i, Cts.Token); | |||||
} | |||||
} | |||||
public void Subscribe() | |||||
{ | |||||
if (_disposed) return; | |||||
//仅有一个分区才需要订阅 | |||||
if (topicPartitionList.Count == 1) | |||||
{ | |||||
consumers[0].Subscribe(TopicName); | |||||
} | |||||
} | |||||
public void Unsubscribe() | |||||
{ | |||||
if (_disposed) return; | |||||
foreach(var c in consumers) | |||||
{ | |||||
c.Unsubscribe(); | |||||
} | |||||
} | |||||
public void Dispose() | |||||
{ | |||||
Dispose(true); | |||||
GC.SuppressFinalize(true); | |||||
} | |||||
~JT809Consumer() | |||||
{ | |||||
Dispose(false); | |||||
} | |||||
protected virtual void Dispose(bool disposing) | |||||
{ | |||||
if (_disposed) return; | |||||
if (disposing) | |||||
{ | |||||
Cts.Cancel(); | |||||
foreach (var c in consumers) | |||||
{ | |||||
c.Close(); | |||||
c.Dispose(); | |||||
} | |||||
Cts.Dispose(); | |||||
} | |||||
_disposed = true; | |||||
} | |||||
} | |||||
} |
@@ -0,0 +1,59 @@ | |||||
using Confluent.Kafka; | |||||
using JT809.GrpcProtos; | |||||
using JT809.PubSub.Abstractions; | |||||
using Microsoft.Extensions.Configuration; | |||||
using Microsoft.Extensions.DependencyInjection; | |||||
using System; | |||||
using System.Collections.Generic; | |||||
using System.Text; | |||||
namespace JT809.KafkaService | |||||
{ | |||||
/// <summary> | |||||
/// | |||||
/// https://github.com/aspnet/Extensions/blob/master/src/DependencyInjection/DI.Specification.Tests/src/DependencyInjectionSpecificationTests.cs | |||||
/// https://github.com/aspnet/Extensions/pull/536 | |||||
/// </summary> | |||||
public static class JT809KafkaServiceExtensions | |||||
{ | |||||
public static IServiceCollection AddJT809KafkaProducerService(this IServiceCollection serviceDescriptors, IConfiguration configuration) | |||||
{ | |||||
serviceDescriptors.Configure<ProducerConfig>(configuration.GetSection("JT809ProducerConfig")); | |||||
serviceDescriptors.AddSingleton(typeof(JT809Producer<byte[]>), typeof(JT809_Same_Producer)); | |||||
serviceDescriptors.AddSingleton(typeof(JT809Producer<JT809GpsPosition>), typeof(JT809_GpsPositio_Producer)); | |||||
return serviceDescriptors; | |||||
} | |||||
public static IServiceCollection AddJT809KafkaProducerPartitionsService<TPartitionFactory>(this IServiceCollection serviceDescriptors,Action<JT809PartitionOptions> action) | |||||
where TPartitionFactory: IJT809ProducerPartitionFactory | |||||
{ | |||||
serviceDescriptors.Configure(action); | |||||
serviceDescriptors.AddSingleton(typeof(IJT809ProducerPartitionFactory), typeof(TPartitionFactory)); | |||||
return serviceDescriptors; | |||||
} | |||||
public static IServiceCollection AddJT809KafkaProducerPartitionsService<TPartitionFactory>(this IServiceCollection serviceDescriptors, IConfiguration configuration) | |||||
where TPartitionFactory : IJT809ProducerPartitionFactory | |||||
{ | |||||
serviceDescriptors.Configure<JT809PartitionOptions>(configuration.GetSection("JT809PartitionOptions")); | |||||
serviceDescriptors.AddSingleton(typeof(IJT809ProducerPartitionFactory), typeof(TPartitionFactory)); | |||||
return serviceDescriptors; | |||||
} | |||||
public static IServiceCollection AddJT809KafkaConsumerService(this IServiceCollection serviceDescriptors, IConfiguration configuration, Action<JT809PartitionOptions> action = null) | |||||
{ | |||||
serviceDescriptors.Configure<ConsumerConfig>(configuration.GetSection("JT809ConsumerConfig")); | |||||
if (configuration.GetSection("JT809PartitionOptions").Exists()) | |||||
{ | |||||
serviceDescriptors.Configure<JT809PartitionOptions>(configuration.GetSection("JT809PartitionOptions")); | |||||
} | |||||
if (action != null) | |||||
{ | |||||
serviceDescriptors.Configure<JT809PartitionOptions>(action); | |||||
} | |||||
serviceDescriptors.AddSingleton(typeof(JT809Consumer<byte[]>), typeof(JT809_Same_Consumer)); | |||||
serviceDescriptors.AddSingleton(typeof(JT809Consumer<JT809GpsPosition>), typeof(JT809_GpsPosition_Consumer)); | |||||
return serviceDescriptors; | |||||
} | |||||
} | |||||
} |
@@ -7,6 +7,10 @@ using System.Collections.Generic; | |||||
namespace JT809.KafkaService | namespace JT809.KafkaService | ||||
{ | { | ||||
/// <summary> | |||||
/// | |||||
/// </summary> | |||||
/// <typeparam name="T"></typeparam> | |||||
public abstract class JT809Producer<T> : IJT809ProducerOfT<T> | public abstract class JT809Producer<T> : IJT809ProducerOfT<T> | ||||
{ | { | ||||
private bool _disposed = false; | private bool _disposed = false; | ||||
@@ -17,15 +21,20 @@ namespace JT809.KafkaService | |||||
private IProducer<string, T> producer; | private IProducer<string, T> producer; | ||||
protected virtual Serializer<T> Serializer { get; } | |||||
protected virtual IJT809ProducerPartitionFactory ProducerPartitionFactory { get; } | protected virtual IJT809ProducerPartitionFactory ProducerPartitionFactory { get; } | ||||
protected virtual JT809PartitionOptions PartitionOptions { get; } | protected virtual JT809PartitionOptions PartitionOptions { get; } | ||||
protected abstract ProducerConfig ProducerConfig { get; } | |||||
protected JT809Producer() | |||||
protected JT809Producer(ProducerConfig producerConfig) | |||||
{ | { | ||||
CreateProducer(); | |||||
ProducerBuilder<string, T> producerBuilder= new ProducerBuilder<string, T>(producerConfig); | |||||
if (Serializer != null) | |||||
{ | |||||
producerBuilder.SetValueSerializer(Serializer); | |||||
} | |||||
producer = producerBuilder.Build(); | |||||
if (PartitionOptions != null) | if (PartitionOptions != null) | ||||
{ | { | ||||
TopicPartitionCache = new ConcurrentDictionary<string, TopicPartition>(); | TopicPartitionCache = new ConcurrentDictionary<string, TopicPartition>(); | ||||
@@ -74,8 +83,6 @@ namespace JT809.KafkaService | |||||
} | } | ||||
} | } | ||||
protected abstract IProducer<string, T> CreateProducer(); | |||||
public void Dispose() | public void Dispose() | ||||
{ | { | ||||
Dispose(true); | Dispose(true); | ||||
@@ -10,7 +10,7 @@ namespace JT809.KafkaService | |||||
{ | { | ||||
protected override IJT809ProducerPartitionFactory ProducerPartitionFactory { get; } | protected override IJT809ProducerPartitionFactory ProducerPartitionFactory { get; } | ||||
protected override ProducerConfig ProducerConfig { get; } | |||||
protected override Serializer<JT809GpsPosition> Serializer => (position) => position.ToByteArray(); | |||||
protected override JT809PartitionOptions PartitionOptions { get; } | protected override JT809PartitionOptions PartitionOptions { get; } | ||||
@@ -24,18 +24,10 @@ namespace JT809.KafkaService | |||||
IOptions<ProducerConfig> producerConfigAccessor, | IOptions<ProducerConfig> producerConfigAccessor, | ||||
IJT809ProducerPartitionFactory partitionFactory, | IJT809ProducerPartitionFactory partitionFactory, | ||||
IOptions<JT809PartitionOptions> partitionAccessor | IOptions<JT809PartitionOptions> partitionAccessor | ||||
) | |||||
):base(producerConfigAccessor.Value) | |||||
{ | { | ||||
ProducerPartitionFactory = partitionFactory; | ProducerPartitionFactory = partitionFactory; | ||||
ProducerConfig = producerConfigAccessor?.Value; | |||||
PartitionOptions = partitionAccessor?.Value; | PartitionOptions = partitionAccessor?.Value; | ||||
} | } | ||||
protected override IProducer<string, JT809GpsPosition> CreateProducer() | |||||
{ | |||||
return new ProducerBuilder<string, JT809GpsPosition>(ProducerConfig) | |||||
.SetValueSerializer((position) => position.ToByteArray()) | |||||
.Build(); | |||||
} | |||||
} | } | ||||
} | } |
@@ -11,110 +11,28 @@ using System.Threading.Tasks; | |||||
namespace JT809.KafkaService | namespace JT809.KafkaService | ||||
{ | { | ||||
public class JT809_GpsPosition_Consumer : IJT808ConsumerOfT<JT809GpsPosition> | |||||
public sealed class JT809_GpsPosition_Consumer : JT809Consumer<JT809GpsPosition> | |||||
{ | { | ||||
public CancellationTokenSource Cts => new CancellationTokenSource(); | |||||
protected override Deserializer<JT809GpsPosition> Deserializer => (data, isNull) => { | |||||
if (isNull) return default; | |||||
return new MessageParser<JT809GpsPosition>(() => new JT809GpsPosition()) | |||||
.ParseFrom(data.ToArray()); | |||||
}; | |||||
public string TopicName => JT809Constants.JT809TopicName; | |||||
private readonly ILogger<JT809_GpsPosition_Consumer> logger; | |||||
private readonly List<TopicPartition> topicPartitionList; | |||||
private readonly List<IConsumer<string, JT809GpsPosition>> consumers; | |||||
private readonly JT809PartitionOptions partition; | |||||
protected override JT809PartitionOptions PartitionOptions { get; } | |||||
public JT809_GpsPosition_Consumer( | public JT809_GpsPosition_Consumer( | ||||
IOptions<JT809PartitionOptions> partitionAccessor, | |||||
IOptions<ConsumerConfig> consumerConfigAccessor, | |||||
ILoggerFactory loggerFactory) | |||||
{ | |||||
partition = partitionAccessor.Value; | |||||
logger = loggerFactory.CreateLogger<JT809_GpsPosition_Consumer>(); | |||||
topicPartitionList = new List<TopicPartition>(); | |||||
consumers = new List<IConsumer<string, JT809GpsPosition>>(); | |||||
for (int i=0;i< partition.Partition; i++) | |||||
{ | |||||
topicPartitionList.Add(new TopicPartition(TopicName, new Partition(i))); | |||||
consumers.Add(new ConsumerBuilder<string, JT809GpsPosition>(consumerConfigAccessor.Value) | |||||
.SetErrorHandler((consumer, error) => { | |||||
logger.LogError(error.Reason); | |||||
}) | |||||
.SetValueDeserializer((data, isNull) => { | |||||
if (isNull) return default; | |||||
return new MessageParser<JT809GpsPosition>(() => new JT809GpsPosition()) | |||||
.ParseFrom(data.ToArray()); | |||||
}) | |||||
.Build()); | |||||
} | |||||
} | |||||
public JT809_GpsPosition_Consumer( | |||||
IOptions<ConsumerConfig> consumerConfigAccessor, | |||||
ILoggerFactory loggerFactory):this(new JT809PartitionOptions (), consumerConfigAccessor, loggerFactory) | |||||
{ | |||||
} | |||||
public void OnMessage(Action<(string MsgId, JT809GpsPosition data)> callback) | |||||
{ | |||||
logger.LogDebug($"consumers:{consumers.Count},topicPartitionList:{topicPartitionList.Count}"); | |||||
for (int i = 0; i < consumers.Count; i++) | |||||
{ | |||||
Task.Factory.StartNew((num) => | |||||
{ | |||||
int n = (int)num; | |||||
while (!Cts.IsCancellationRequested) | |||||
{ | |||||
try | |||||
{ | |||||
//如果不指定分区,根据kafka的机制会从多个分区中拉取数据 | |||||
//如果指定分区,根据kafka的机制会从相应的分区中拉取数据 | |||||
consumers[n].Assign(topicPartitionList[n]); | |||||
var data = consumers[n].Consume(Cts.Token); | |||||
if (logger.IsEnabled(LogLevel.Debug)) | |||||
{ | |||||
logger.LogDebug($"Topic: {data.Topic} Key: {data.Key} Partition: {data.Partition} Offset: {data.Offset} Data:{string.Join("", data.Value)} TopicPartitionOffset:{data.TopicPartitionOffset}"); | |||||
} | |||||
callback((data.Key, data.Value)); | |||||
} | |||||
catch (ConsumeException ex) | |||||
{ | |||||
logger.LogError(ex, TopicName); | |||||
Thread.Sleep(1000); | |||||
} | |||||
catch (Exception ex) | |||||
{ | |||||
logger.LogError(ex, TopicName); | |||||
Thread.Sleep(1000); | |||||
} | |||||
} | |||||
}, i, Cts.Token); | |||||
} | |||||
} | |||||
public void Subscribe() | |||||
{ | |||||
//仅有一个分区才需要订阅 | |||||
if (topicPartitionList.Count == 1) | |||||
{ | |||||
consumers[0].Subscribe(TopicName); | |||||
} | |||||
} | |||||
public void Unsubscribe() | |||||
IOptions<ConsumerConfig> consumerConfigAccessor, | |||||
ILoggerFactory loggerFactory, | |||||
IOptions<JT809PartitionOptions> partitionAccessor | |||||
) : base(consumerConfigAccessor.Value, loggerFactory) | |||||
{ | { | ||||
consumers.ForEach(consumer => consumer.Unsubscribe()); | |||||
PartitionOptions = partitionAccessor.Value; | |||||
} | } | ||||
public void Dispose() | |||||
public JT809_GpsPosition_Consumer(IOptions<ConsumerConfig> consumerConfigAccessor,ILoggerFactory loggerFactory) | |||||
: this(consumerConfigAccessor,loggerFactory, new JT809PartitionOptions()) | |||||
{ | { | ||||
Cts.Cancel(); | |||||
consumers.ForEach(consumer => { | |||||
consumer.Close(); | |||||
consumer.Dispose(); | |||||
}); | |||||
} | } | ||||
} | } | ||||
} | } |
@@ -0,0 +1,32 @@ | |||||
using Confluent.Kafka; | |||||
using Google.Protobuf; | |||||
using JT809.GrpcProtos; | |||||
using JT809.PubSub.Abstractions; | |||||
using Microsoft.Extensions.Logging; | |||||
using Microsoft.Extensions.Options; | |||||
using System; | |||||
using System.Collections.Generic; | |||||
using System.Threading; | |||||
using System.Threading.Tasks; | |||||
namespace JT809.KafkaService | |||||
{ | |||||
public sealed class JT809_Same_Consumer : JT809Consumer<byte[]> | |||||
{ | |||||
protected override JT809PartitionOptions PartitionOptions { get; } | |||||
public JT809_Same_Consumer( | |||||
IOptions<ConsumerConfig> consumerConfigAccessor, | |||||
ILoggerFactory loggerFactory, | |||||
IOptions<JT809PartitionOptions> partitionAccessor | |||||
) : base(consumerConfigAccessor.Value, loggerFactory) | |||||
{ | |||||
PartitionOptions = partitionAccessor.Value; | |||||
} | |||||
public JT809_Same_Consumer(IOptions<ConsumerConfig> consumerConfigAccessor,ILoggerFactory loggerFactory) | |||||
: this(consumerConfigAccessor,loggerFactory, new JT809PartitionOptions()) | |||||
{ | |||||
} | |||||
} | |||||
} |
@@ -0,0 +1,32 @@ | |||||
using System; | |||||
using System.Collections.Generic; | |||||
using System.Text; | |||||
using Confluent.Kafka; | |||||
using JT809.PubSub.Abstractions; | |||||
using Microsoft.Extensions.Options; | |||||
namespace JT809.KafkaService | |||||
{ | |||||
public sealed class JT809_Same_Producer : JT809Producer<byte[]> | |||||
{ | |||||
protected override IJT809ProducerPartitionFactory ProducerPartitionFactory { get; } | |||||
protected override JT809PartitionOptions PartitionOptions { get; } | |||||
public JT809_Same_Producer(IOptions<ProducerConfig> producerConfigAccessor) | |||||
: this(producerConfigAccessor, null, null) | |||||
{ | |||||
} | |||||
public JT809_Same_Producer( | |||||
IOptions<ProducerConfig> producerConfigAccessor, | |||||
IJT809ProducerPartitionFactory partitionFactory, | |||||
IOptions<JT809PartitionOptions> partitionAccessor | |||||
):base(producerConfigAccessor.Value) | |||||
{ | |||||
ProducerPartitionFactory = partitionFactory; | |||||
PartitionOptions = partitionAccessor?.Value; | |||||
} | |||||
} | |||||
} |
@@ -10,5 +10,7 @@ namespace JT809.PubSub.Abstractions | |||||
public int Partition { get; set; } = 1; | public int Partition { get; set; } = 1; | ||||
public JT809PartitionOptions Value => this; | public JT809PartitionOptions Value => this; | ||||
public List<int> AssignPartitions { get; set; } | |||||
} | } | ||||
} | } |