From 950345adddab289a123542bd3d1bc01abdda773a Mon Sep 17 00:00:00 2001 From: "SmallChi(Koike)" <564952747@qq.com> Date: Mon, 25 May 2020 23:23:50 +0800 Subject: [PATCH] =?UTF-8?q?=E6=B7=BB=E5=8A=A0=E7=94=9F=E4=BA=A7=E6=B6=88?= =?UTF-8?q?=E8=B4=B9=E9=85=8D=E7=BD=AE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../JT809SubordinateLinkNotifyImplService.cs | 7 +++-- .../JT809.DotNetty.Core.csproj | 2 +- .../JT809.Inferior.Client.csproj | 2 +- .../Configs/JT809ConsumerConfig.cs | 15 +++++++++++ .../Configs/JT809ProducerConfig.cs | 15 +++++++++++ .../JT809.KafkaService/JT809Consumer.cs | 26 ++++++++----------- .../JT809.KafkaService/JT809ConsumerBase.cs | 9 ++++--- .../JT809.KafkaService/JT809Producer.cs | 6 ++--- .../JT809.KafkaService/JT809ProducerBase.cs | 8 +++--- .../JT809_GpsPositio_Producer.cs | 3 ++- .../JT809_GpsPosition_Consumer.cs | 3 ++- .../JT809.KafkaService/JT809_Same_Consumer.cs | 3 ++- .../JT809.KafkaService/JT809_Same_Producer.cs | 3 ++- .../JT809.DotNetty.Host.Test.csproj | 2 +- 14 files changed, 68 insertions(+), 36 deletions(-) create mode 100644 src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/Configs/JT809ConsumerConfig.cs create mode 100644 src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/Configs/JT809ProducerConfig.cs diff --git a/src/JT809.DotNetty.Core/Internal/JT809SubordinateLinkNotifyImplService.cs b/src/JT809.DotNetty.Core/Internal/JT809SubordinateLinkNotifyImplService.cs index 646f179..d98ac9e 100644 --- a/src/JT809.DotNetty.Core/Internal/JT809SubordinateLinkNotifyImplService.cs +++ b/src/JT809.DotNetty.Core/Internal/JT809SubordinateLinkNotifyImplService.cs @@ -44,18 +44,17 @@ namespace JT809.DotNetty.Core.Internal { if (configuration.SubordinateClientEnable) { -#warning JT809GlobalConfig +#warning jT809SuperiorMainSessionManager //var session = jT809SuperiorMainSessionManager.GetSession(JT809GlobalConfig.Instance.HeaderOptions.MsgGNSSCENTERID); //if (session != null) //{ // //发送从链路注销请求 // var package = JT809BusinessType.从链路断开通知消息.Create(new JT809_0x9007() // { -#warning JT809_0x9007_ReasonCode??? - // ErrorCode = JT809_0x1007_ErrorCode.主链路断开 + // ReasonCode = reasonCode // }); // JT809Response jT809Response = new JT809Response(package, 100); - // if(logger.IsEnabled(LogLevel.Information)) + // if (logger.IsEnabled(LogLevel.Information)) // logger.LogInformation($"从链路断开通知消息>>>{JT809Serializer.Serialize(package, 100).ToHexString()}"); // session.Channel.WriteAndFlushAsync(jT809Response); //} diff --git a/src/JT809.DotNetty.Core/JT809.DotNetty.Core.csproj b/src/JT809.DotNetty.Core/JT809.DotNetty.Core.csproj index 1084653..fa53237 100644 --- a/src/JT809.DotNetty.Core/JT809.DotNetty.Core.csproj +++ b/src/JT809.DotNetty.Core/JT809.DotNetty.Core.csproj @@ -26,7 +26,7 @@ - + diff --git a/src/JT809.DotNetty.Simples/Inferior/JT809.Inferior.Client/JT809.Inferior.Client.csproj b/src/JT809.DotNetty.Simples/Inferior/JT809.Inferior.Client/JT809.Inferior.Client.csproj index 37eda4e..00ecc9d 100644 --- a/src/JT809.DotNetty.Simples/Inferior/JT809.Inferior.Client/JT809.Inferior.Client.csproj +++ b/src/JT809.DotNetty.Simples/Inferior/JT809.Inferior.Client/JT809.Inferior.Client.csproj @@ -6,7 +6,7 @@ 7.3 - + diff --git a/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/Configs/JT809ConsumerConfig.cs b/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/Configs/JT809ConsumerConfig.cs new file mode 100644 index 0000000..524556a --- /dev/null +++ b/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/Configs/JT809ConsumerConfig.cs @@ -0,0 +1,15 @@ +using Confluent.Kafka; +using Microsoft.Extensions.Options; +using System; +using System.Collections.Generic; +using System.Text; + +namespace JT809.KafkaService.Configs +{ + public class JT809ConsumerConfig: ConsumerConfig, IOptions + { + public string TopicName { get; set; } + + public JT809ConsumerConfig Value => this; + } +} diff --git a/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/Configs/JT809ProducerConfig.cs b/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/Configs/JT809ProducerConfig.cs new file mode 100644 index 0000000..0d994d5 --- /dev/null +++ b/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/Configs/JT809ProducerConfig.cs @@ -0,0 +1,15 @@ +using Confluent.Kafka; +using Microsoft.Extensions.Options; +using System; +using System.Collections.Generic; +using System.Text; + +namespace JT809.KafkaService.Configs +{ + public class JT809ProducerConfig : ProducerConfig,IOptions + { + public string TopicName { get; set; } + + public JT809ProducerConfig Value => this; + } +} diff --git a/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809Consumer.cs b/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809Consumer.cs index 8672977..236a5c8 100644 --- a/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809Consumer.cs +++ b/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809Consumer.cs @@ -1,5 +1,5 @@ using Confluent.Kafka; -using JT809.PubSub.Abstractions; +using JT809.KafkaService.Configs; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; using System; @@ -16,21 +16,20 @@ namespace JT809.KafkaService protected ILogger logger { get; } - protected override IList> Consumers { get; } + protected override IConsumer Consumer { get; } protected JT809Consumer( - IOptions consumerConfigAccessor, + IOptions consumerConfigAccessor, ILoggerFactory loggerFactory) : base(consumerConfigAccessor.Value) { logger = loggerFactory.CreateLogger("JT809Consumer"); - Consumers = new List>(); ConsumerBuilder consumerBuilder = new ConsumerBuilder(ConsumerConfig); consumerBuilder.SetErrorHandler((consumer, error) => { logger.LogError(error.Reason); }); - Consumers.Add(consumerBuilder.Build()); + Consumer = consumerBuilder.Build(); } public override void OnMessage(Action<(string MsgId, T Data)> callback) @@ -44,7 +43,7 @@ namespace JT809.KafkaService //如果不指定分区,根据kafka的机制会从多个分区中拉取数据 //如果指定分区,根据kafka的机制会从相应的分区中拉取数据 //consumers[n].Assign(topicPartitionList[n]); - var data = Consumers[0].Consume(Cts.Token); + var data = Consumer.Consume(Cts.Token); if (logger.IsEnabled(LogLevel.Debug)) { logger.LogDebug($"Topic: {data.Topic} Key: {data.Message.Key} Partition: {data.Partition} Offset: {data.Offset} Data:{string.Join("", data.Message.Value)} TopicPartitionOffset:{data.TopicPartitionOffset}"); @@ -53,13 +52,11 @@ namespace JT809.KafkaService } catch (ConsumeException ex) { -#warning topicname - logger.LogError(ex, ""); + logger.LogError(ex, ConsumerConfig.TopicName); } catch (Exception ex) { -#warning topicname - logger.LogError(ex, ""); + logger.LogError(ex, ConsumerConfig.TopicName); } } }, Cts.Token); @@ -69,14 +66,13 @@ namespace JT809.KafkaService { if (_disposed) return; //仅有一个分区才需要订阅 -#warning topicname - Consumers[0].Subscribe(""); + Consumer.Subscribe(ConsumerConfig.TopicName); } public override void Unsubscribe() { if (_disposed) return; - Consumers[0].Unsubscribe(); + Consumer.Unsubscribe(); } public override void Dispose() @@ -97,8 +93,8 @@ namespace JT809.KafkaService if (disposing) { Cts.Cancel(); - Consumers[0].Close(); - Consumers[0].Dispose(); + Consumer.Close(); + Consumer.Dispose(); Cts.Dispose(); } _disposed = true; diff --git a/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809ConsumerBase.cs b/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809ConsumerBase.cs index 4dc81e3..5e64cea 100644 --- a/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809ConsumerBase.cs +++ b/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809ConsumerBase.cs @@ -1,4 +1,5 @@ using Confluent.Kafka; +using JT809.KafkaService.Configs; using JT809.PubSub.Abstractions; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; @@ -11,15 +12,15 @@ namespace JT809.KafkaService { public abstract class JT809ConsumerBase : IJT808ConsumerOfT { - public ConsumerConfig ConsumerConfig { get; } + public JT809ConsumerConfig ConsumerConfig { get; } - protected JT809ConsumerBase( ConsumerConfig config) + protected JT809ConsumerBase(IOptions config) { - ConsumerConfig = config; + ConsumerConfig = config.Value; } public abstract CancellationTokenSource Cts { get; } - protected abstract IList> Consumers { get; } + protected abstract IConsumer Consumer { get; } public abstract void Dispose(); public abstract void OnMessage(Action<(string MsgId, T Data)> callback); diff --git a/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809Producer.cs b/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809Producer.cs index 068f104..7268102 100644 --- a/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809Producer.cs +++ b/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809Producer.cs @@ -1,5 +1,6 @@ using Confluent.Kafka; using Confluent.Kafka.Admin; +using JT809.KafkaService.Configs; using JT809.PubSub.Abstractions; using Microsoft.Extensions.Options; using System; @@ -25,7 +26,7 @@ namespace JT809.KafkaService protected override IProducer Producer { get; } protected JT809Producer( - IOptions producerConfigAccessor) + IOptions producerConfigAccessor) : base(producerConfigAccessor.Value) { Producer = CreateProducer(); @@ -51,8 +52,7 @@ namespace JT809.KafkaService public override async void ProduceAsync(string msgId, string vno_color, T data) { if (_disposed) return; -#warning topicname - await Producer.ProduceAsync("", new Message + await Producer.ProduceAsync(ProducerConfig.TopicName, new Message { Key = msgId, Value = data diff --git a/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809ProducerBase.cs b/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809ProducerBase.cs index 01f32ba..afe9bba 100644 --- a/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809ProducerBase.cs +++ b/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809ProducerBase.cs @@ -1,5 +1,7 @@ using Confluent.Kafka; +using JT809.KafkaService.Configs; using JT809.PubSub.Abstractions; +using Microsoft.Extensions.Options; using System; using System.Collections.Generic; using System.Text; @@ -8,12 +10,12 @@ namespace JT809.KafkaService { public abstract class JT809ProducerBase : IJT809ProducerOfT { - protected JT809ProducerBase(ProducerConfig config) + protected JT809ProducerBase(IOptions config) { - ProducerConfig = config; + ProducerConfig = config.Value; } - public ProducerConfig ProducerConfig { get;} + public JT809ProducerConfig ProducerConfig { get;} protected abstract IProducer Producer { get;} public abstract void Dispose(); public abstract void ProduceAsync(string msgId, string vno_color, T data); diff --git a/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809_GpsPositio_Producer.cs b/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809_GpsPositio_Producer.cs index 59e9e2c..ca3d6b7 100644 --- a/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809_GpsPositio_Producer.cs +++ b/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809_GpsPositio_Producer.cs @@ -1,6 +1,7 @@ using Confluent.Kafka; using Google.Protobuf; using JT809.GrpcProtos; +using JT809.KafkaService.Configs; using JT809.PubSub.Abstractions; using Microsoft.Extensions.Options; @@ -8,7 +9,7 @@ namespace JT809.KafkaService { public sealed class JT809_GpsPositio_Producer : JT809Producer { - public JT809_GpsPositio_Producer(IOptions producerConfigAccessor) : base( producerConfigAccessor) + public JT809_GpsPositio_Producer(IOptions producerConfigAccessor) : base( producerConfigAccessor) { } } diff --git a/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809_GpsPosition_Consumer.cs b/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809_GpsPosition_Consumer.cs index f01e274..1e55c76 100644 --- a/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809_GpsPosition_Consumer.cs +++ b/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809_GpsPosition_Consumer.cs @@ -1,6 +1,7 @@ using Confluent.Kafka; using Google.Protobuf; using JT809.GrpcProtos; +using JT809.KafkaService.Configs; using JT809.PubSub.Abstractions; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; @@ -13,7 +14,7 @@ namespace JT809.KafkaService { public sealed class JT809_GpsPosition_Consumer : JT809Consumer { - public JT809_GpsPosition_Consumer(IOptions consumerConfigAccessor, ILoggerFactory loggerFactory) : base( consumerConfigAccessor, loggerFactory) + public JT809_GpsPosition_Consumer(IOptions consumerConfigAccessor, ILoggerFactory loggerFactory) : base( consumerConfigAccessor, loggerFactory) { } } diff --git a/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809_Same_Consumer.cs b/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809_Same_Consumer.cs index 83ba2ad..956eb73 100644 --- a/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809_Same_Consumer.cs +++ b/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809_Same_Consumer.cs @@ -1,6 +1,7 @@ using Confluent.Kafka; using Google.Protobuf; using JT809.GrpcProtos; +using JT809.KafkaService.Configs; using JT809.PubSub.Abstractions; using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; @@ -13,7 +14,7 @@ namespace JT809.KafkaService { public sealed class JT809_Same_Consumer : JT809Consumer { - public JT809_Same_Consumer(IOptions consumerConfigAccessor, ILoggerFactory loggerFactory) + public JT809_Same_Consumer(IOptions consumerConfigAccessor, ILoggerFactory loggerFactory) : base(consumerConfigAccessor, loggerFactory) { } diff --git a/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809_Same_Producer.cs b/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809_Same_Producer.cs index 38ccc4e..50a58d2 100644 --- a/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809_Same_Producer.cs +++ b/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809_Same_Producer.cs @@ -2,6 +2,7 @@ using System.Collections.Generic; using System.Text; using Confluent.Kafka; +using JT809.KafkaService.Configs; using JT809.PubSub.Abstractions; using Microsoft.Extensions.Options; @@ -9,7 +10,7 @@ namespace JT809.KafkaService { public sealed class JT809_Same_Producer : JT809Producer { - public JT809_Same_Producer(IOptions producerConfigAccessor) + public JT809_Same_Producer(IOptions producerConfigAccessor) : base(producerConfigAccessor) { } diff --git a/src/JT809.DotNetty.Tests/JT809.DotNetty.Host.Test/JT809.DotNetty.Host.Test.csproj b/src/JT809.DotNetty.Tests/JT809.DotNetty.Host.Test/JT809.DotNetty.Host.Test.csproj index ba25158..c5c9629 100644 --- a/src/JT809.DotNetty.Tests/JT809.DotNetty.Host.Test/JT809.DotNetty.Host.Test.csproj +++ b/src/JT809.DotNetty.Tests/JT809.DotNetty.Host.Test/JT809.DotNetty.Host.Test.csproj @@ -6,7 +6,7 @@ 7.3 - +