|
|
@@ -1,5 +1,5 @@ |
|
|
|
using Confluent.Kafka; |
|
|
|
using JT809.PubSub.Abstractions; |
|
|
|
using JT809.KafkaService.Configs; |
|
|
|
using Microsoft.Extensions.Logging; |
|
|
|
using Microsoft.Extensions.Options; |
|
|
|
using System; |
|
|
@@ -16,21 +16,20 @@ namespace JT809.KafkaService |
|
|
|
|
|
|
|
protected ILogger logger { get; } |
|
|
|
|
|
|
|
protected override IList<IConsumer<string, T>> Consumers { get; } |
|
|
|
protected override IConsumer<string, T> Consumer { get; } |
|
|
|
|
|
|
|
protected JT809Consumer( |
|
|
|
IOptions<ConsumerConfig> consumerConfigAccessor, |
|
|
|
IOptions<JT809ConsumerConfig> consumerConfigAccessor, |
|
|
|
ILoggerFactory loggerFactory) |
|
|
|
: base(consumerConfigAccessor.Value) |
|
|
|
{ |
|
|
|
logger = loggerFactory.CreateLogger("JT809Consumer"); |
|
|
|
Consumers = new List<IConsumer<string, T>>(); |
|
|
|
ConsumerBuilder<string, T> consumerBuilder = new ConsumerBuilder<string, T>(ConsumerConfig); |
|
|
|
consumerBuilder.SetErrorHandler((consumer, error) => |
|
|
|
{ |
|
|
|
logger.LogError(error.Reason); |
|
|
|
}); |
|
|
|
Consumers.Add(consumerBuilder.Build()); |
|
|
|
Consumer = consumerBuilder.Build(); |
|
|
|
} |
|
|
|
|
|
|
|
public override void OnMessage(Action<(string MsgId, T Data)> callback) |
|
|
@@ -44,7 +43,7 @@ namespace JT809.KafkaService |
|
|
|
//如果不指定分区,根据kafka的机制会从多个分区中拉取数据 |
|
|
|
//如果指定分区,根据kafka的机制会从相应的分区中拉取数据 |
|
|
|
//consumers[n].Assign(topicPartitionList[n]); |
|
|
|
var data = Consumers[0].Consume(Cts.Token); |
|
|
|
var data = Consumer.Consume(Cts.Token); |
|
|
|
if (logger.IsEnabled(LogLevel.Debug)) |
|
|
|
{ |
|
|
|
logger.LogDebug($"Topic: {data.Topic} Key: {data.Message.Key} Partition: {data.Partition} Offset: {data.Offset} Data:{string.Join("", data.Message.Value)} TopicPartitionOffset:{data.TopicPartitionOffset}"); |
|
|
@@ -53,13 +52,11 @@ namespace JT809.KafkaService |
|
|
|
} |
|
|
|
catch (ConsumeException ex) |
|
|
|
{ |
|
|
|
#warning topicname |
|
|
|
logger.LogError(ex, ""); |
|
|
|
logger.LogError(ex, ConsumerConfig.TopicName); |
|
|
|
} |
|
|
|
catch (Exception ex) |
|
|
|
{ |
|
|
|
#warning topicname |
|
|
|
logger.LogError(ex, ""); |
|
|
|
logger.LogError(ex, ConsumerConfig.TopicName); |
|
|
|
} |
|
|
|
} |
|
|
|
}, Cts.Token); |
|
|
@@ -69,14 +66,13 @@ namespace JT809.KafkaService |
|
|
|
{ |
|
|
|
if (_disposed) return; |
|
|
|
//仅有一个分区才需要订阅 |
|
|
|
#warning topicname |
|
|
|
Consumers[0].Subscribe(""); |
|
|
|
Consumer.Subscribe(ConsumerConfig.TopicName); |
|
|
|
} |
|
|
|
|
|
|
|
public override void Unsubscribe() |
|
|
|
{ |
|
|
|
if (_disposed) return; |
|
|
|
Consumers[0].Unsubscribe(); |
|
|
|
Consumer.Unsubscribe(); |
|
|
|
} |
|
|
|
|
|
|
|
public override void Dispose() |
|
|
@@ -97,8 +93,8 @@ namespace JT809.KafkaService |
|
|
|
if (disposing) |
|
|
|
{ |
|
|
|
Cts.Cancel(); |
|
|
|
Consumers[0].Close(); |
|
|
|
Consumers[0].Dispose(); |
|
|
|
Consumer.Close(); |
|
|
|
Consumer.Dispose(); |
|
|
|
Cts.Dispose(); |
|
|
|
} |
|
|
|
_disposed = true; |
|
|
|