diff --git a/src/JT809.DotNetty.Core/Internal/JT809SubordinateLinkNotifyImplService.cs b/src/JT809.DotNetty.Core/Internal/JT809SubordinateLinkNotifyImplService.cs
index 646f179..d98ac9e 100644
--- a/src/JT809.DotNetty.Core/Internal/JT809SubordinateLinkNotifyImplService.cs
+++ b/src/JT809.DotNetty.Core/Internal/JT809SubordinateLinkNotifyImplService.cs
@@ -44,18 +44,17 @@ namespace JT809.DotNetty.Core.Internal
{
if (configuration.SubordinateClientEnable)
{
-#warning JT809GlobalConfig
+#warning jT809SuperiorMainSessionManager
//var session = jT809SuperiorMainSessionManager.GetSession(JT809GlobalConfig.Instance.HeaderOptions.MsgGNSSCENTERID);
//if (session != null)
//{
// //发送从链路注销请求
// var package = JT809BusinessType.从链路断开通知消息.Create(new JT809_0x9007()
// {
-#warning JT809_0x9007_ReasonCode???
- // ErrorCode = JT809_0x1007_ErrorCode.主链路断开
+ // ReasonCode = reasonCode
// });
// JT809Response jT809Response = new JT809Response(package, 100);
- // if(logger.IsEnabled(LogLevel.Information))
+ // if (logger.IsEnabled(LogLevel.Information))
// logger.LogInformation($"从链路断开通知消息>>>{JT809Serializer.Serialize(package, 100).ToHexString()}");
// session.Channel.WriteAndFlushAsync(jT809Response);
//}
diff --git a/src/JT809.DotNetty.Core/JT809.DotNetty.Core.csproj b/src/JT809.DotNetty.Core/JT809.DotNetty.Core.csproj
index 1084653..fa53237 100644
--- a/src/JT809.DotNetty.Core/JT809.DotNetty.Core.csproj
+++ b/src/JT809.DotNetty.Core/JT809.DotNetty.Core.csproj
@@ -26,7 +26,7 @@
-
+
diff --git a/src/JT809.DotNetty.Simples/Inferior/JT809.Inferior.Client/JT809.Inferior.Client.csproj b/src/JT809.DotNetty.Simples/Inferior/JT809.Inferior.Client/JT809.Inferior.Client.csproj
index 37eda4e..00ecc9d 100644
--- a/src/JT809.DotNetty.Simples/Inferior/JT809.Inferior.Client/JT809.Inferior.Client.csproj
+++ b/src/JT809.DotNetty.Simples/Inferior/JT809.Inferior.Client/JT809.Inferior.Client.csproj
@@ -6,7 +6,7 @@
7.3
-
+
diff --git a/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/Configs/JT809ConsumerConfig.cs b/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/Configs/JT809ConsumerConfig.cs
new file mode 100644
index 0000000..524556a
--- /dev/null
+++ b/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/Configs/JT809ConsumerConfig.cs
@@ -0,0 +1,15 @@
+using Confluent.Kafka;
+using Microsoft.Extensions.Options;
+using System;
+using System.Collections.Generic;
+using System.Text;
+
+namespace JT809.KafkaService.Configs
+{
+ public class JT809ConsumerConfig: ConsumerConfig, IOptions
+ {
+ public string TopicName { get; set; }
+
+ public JT809ConsumerConfig Value => this;
+ }
+}
diff --git a/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/Configs/JT809ProducerConfig.cs b/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/Configs/JT809ProducerConfig.cs
new file mode 100644
index 0000000..0d994d5
--- /dev/null
+++ b/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/Configs/JT809ProducerConfig.cs
@@ -0,0 +1,15 @@
+using Confluent.Kafka;
+using Microsoft.Extensions.Options;
+using System;
+using System.Collections.Generic;
+using System.Text;
+
+namespace JT809.KafkaService.Configs
+{
+ public class JT809ProducerConfig : ProducerConfig,IOptions
+ {
+ public string TopicName { get; set; }
+
+ public JT809ProducerConfig Value => this;
+ }
+}
diff --git a/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809Consumer.cs b/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809Consumer.cs
index 8672977..236a5c8 100644
--- a/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809Consumer.cs
+++ b/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809Consumer.cs
@@ -1,5 +1,5 @@
using Confluent.Kafka;
-using JT809.PubSub.Abstractions;
+using JT809.KafkaService.Configs;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using System;
@@ -16,21 +16,20 @@ namespace JT809.KafkaService
protected ILogger logger { get; }
- protected override IList> Consumers { get; }
+ protected override IConsumer Consumer { get; }
protected JT809Consumer(
- IOptions consumerConfigAccessor,
+ IOptions consumerConfigAccessor,
ILoggerFactory loggerFactory)
: base(consumerConfigAccessor.Value)
{
logger = loggerFactory.CreateLogger("JT809Consumer");
- Consumers = new List>();
ConsumerBuilder consumerBuilder = new ConsumerBuilder(ConsumerConfig);
consumerBuilder.SetErrorHandler((consumer, error) =>
{
logger.LogError(error.Reason);
});
- Consumers.Add(consumerBuilder.Build());
+ Consumer = consumerBuilder.Build();
}
public override void OnMessage(Action<(string MsgId, T Data)> callback)
@@ -44,7 +43,7 @@ namespace JT809.KafkaService
//如果不指定分区,根据kafka的机制会从多个分区中拉取数据
//如果指定分区,根据kafka的机制会从相应的分区中拉取数据
//consumers[n].Assign(topicPartitionList[n]);
- var data = Consumers[0].Consume(Cts.Token);
+ var data = Consumer.Consume(Cts.Token);
if (logger.IsEnabled(LogLevel.Debug))
{
logger.LogDebug($"Topic: {data.Topic} Key: {data.Message.Key} Partition: {data.Partition} Offset: {data.Offset} Data:{string.Join("", data.Message.Value)} TopicPartitionOffset:{data.TopicPartitionOffset}");
@@ -53,13 +52,11 @@ namespace JT809.KafkaService
}
catch (ConsumeException ex)
{
-#warning topicname
- logger.LogError(ex, "");
+ logger.LogError(ex, ConsumerConfig.TopicName);
}
catch (Exception ex)
{
-#warning topicname
- logger.LogError(ex, "");
+ logger.LogError(ex, ConsumerConfig.TopicName);
}
}
}, Cts.Token);
@@ -69,14 +66,13 @@ namespace JT809.KafkaService
{
if (_disposed) return;
//仅有一个分区才需要订阅
-#warning topicname
- Consumers[0].Subscribe("");
+ Consumer.Subscribe(ConsumerConfig.TopicName);
}
public override void Unsubscribe()
{
if (_disposed) return;
- Consumers[0].Unsubscribe();
+ Consumer.Unsubscribe();
}
public override void Dispose()
@@ -97,8 +93,8 @@ namespace JT809.KafkaService
if (disposing)
{
Cts.Cancel();
- Consumers[0].Close();
- Consumers[0].Dispose();
+ Consumer.Close();
+ Consumer.Dispose();
Cts.Dispose();
}
_disposed = true;
diff --git a/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809ConsumerBase.cs b/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809ConsumerBase.cs
index 4dc81e3..5e64cea 100644
--- a/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809ConsumerBase.cs
+++ b/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809ConsumerBase.cs
@@ -1,4 +1,5 @@
using Confluent.Kafka;
+using JT809.KafkaService.Configs;
using JT809.PubSub.Abstractions;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
@@ -11,15 +12,15 @@ namespace JT809.KafkaService
{
public abstract class JT809ConsumerBase : IJT808ConsumerOfT
{
- public ConsumerConfig ConsumerConfig { get; }
+ public JT809ConsumerConfig ConsumerConfig { get; }
- protected JT809ConsumerBase( ConsumerConfig config)
+ protected JT809ConsumerBase(IOptions config)
{
- ConsumerConfig = config;
+ ConsumerConfig = config.Value;
}
public abstract CancellationTokenSource Cts { get; }
- protected abstract IList> Consumers { get; }
+ protected abstract IConsumer Consumer { get; }
public abstract void Dispose();
public abstract void OnMessage(Action<(string MsgId, T Data)> callback);
diff --git a/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809Producer.cs b/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809Producer.cs
index 068f104..7268102 100644
--- a/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809Producer.cs
+++ b/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809Producer.cs
@@ -1,5 +1,6 @@
using Confluent.Kafka;
using Confluent.Kafka.Admin;
+using JT809.KafkaService.Configs;
using JT809.PubSub.Abstractions;
using Microsoft.Extensions.Options;
using System;
@@ -25,7 +26,7 @@ namespace JT809.KafkaService
protected override IProducer Producer { get; }
protected JT809Producer(
- IOptions producerConfigAccessor)
+ IOptions producerConfigAccessor)
: base(producerConfigAccessor.Value)
{
Producer = CreateProducer();
@@ -51,8 +52,7 @@ namespace JT809.KafkaService
public override async void ProduceAsync(string msgId, string vno_color, T data)
{
if (_disposed) return;
-#warning topicname
- await Producer.ProduceAsync("", new Message
+ await Producer.ProduceAsync(ProducerConfig.TopicName, new Message
{
Key = msgId,
Value = data
diff --git a/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809ProducerBase.cs b/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809ProducerBase.cs
index 01f32ba..afe9bba 100644
--- a/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809ProducerBase.cs
+++ b/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809ProducerBase.cs
@@ -1,5 +1,7 @@
using Confluent.Kafka;
+using JT809.KafkaService.Configs;
using JT809.PubSub.Abstractions;
+using Microsoft.Extensions.Options;
using System;
using System.Collections.Generic;
using System.Text;
@@ -8,12 +10,12 @@ namespace JT809.KafkaService
{
public abstract class JT809ProducerBase : IJT809ProducerOfT
{
- protected JT809ProducerBase(ProducerConfig config)
+ protected JT809ProducerBase(IOptions config)
{
- ProducerConfig = config;
+ ProducerConfig = config.Value;
}
- public ProducerConfig ProducerConfig { get;}
+ public JT809ProducerConfig ProducerConfig { get;}
protected abstract IProducer Producer { get;}
public abstract void Dispose();
public abstract void ProduceAsync(string msgId, string vno_color, T data);
diff --git a/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809_GpsPositio_Producer.cs b/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809_GpsPositio_Producer.cs
index 59e9e2c..ca3d6b7 100644
--- a/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809_GpsPositio_Producer.cs
+++ b/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809_GpsPositio_Producer.cs
@@ -1,6 +1,7 @@
using Confluent.Kafka;
using Google.Protobuf;
using JT809.GrpcProtos;
+using JT809.KafkaService.Configs;
using JT809.PubSub.Abstractions;
using Microsoft.Extensions.Options;
@@ -8,7 +9,7 @@ namespace JT809.KafkaService
{
public sealed class JT809_GpsPositio_Producer : JT809Producer
{
- public JT809_GpsPositio_Producer(IOptions producerConfigAccessor) : base( producerConfigAccessor)
+ public JT809_GpsPositio_Producer(IOptions producerConfigAccessor) : base( producerConfigAccessor)
{
}
}
diff --git a/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809_GpsPosition_Consumer.cs b/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809_GpsPosition_Consumer.cs
index f01e274..1e55c76 100644
--- a/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809_GpsPosition_Consumer.cs
+++ b/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809_GpsPosition_Consumer.cs
@@ -1,6 +1,7 @@
using Confluent.Kafka;
using Google.Protobuf;
using JT809.GrpcProtos;
+using JT809.KafkaService.Configs;
using JT809.PubSub.Abstractions;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
@@ -13,7 +14,7 @@ namespace JT809.KafkaService
{
public sealed class JT809_GpsPosition_Consumer : JT809Consumer
{
- public JT809_GpsPosition_Consumer(IOptions consumerConfigAccessor, ILoggerFactory loggerFactory) : base( consumerConfigAccessor, loggerFactory)
+ public JT809_GpsPosition_Consumer(IOptions consumerConfigAccessor, ILoggerFactory loggerFactory) : base( consumerConfigAccessor, loggerFactory)
{
}
}
diff --git a/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809_Same_Consumer.cs b/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809_Same_Consumer.cs
index 83ba2ad..956eb73 100644
--- a/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809_Same_Consumer.cs
+++ b/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809_Same_Consumer.cs
@@ -1,6 +1,7 @@
using Confluent.Kafka;
using Google.Protobuf;
using JT809.GrpcProtos;
+using JT809.KafkaService.Configs;
using JT809.PubSub.Abstractions;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
@@ -13,7 +14,7 @@ namespace JT809.KafkaService
{
public sealed class JT809_Same_Consumer : JT809Consumer
{
- public JT809_Same_Consumer(IOptions consumerConfigAccessor, ILoggerFactory loggerFactory)
+ public JT809_Same_Consumer(IOptions consumerConfigAccessor, ILoggerFactory loggerFactory)
: base(consumerConfigAccessor, loggerFactory)
{
}
diff --git a/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809_Same_Producer.cs b/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809_Same_Producer.cs
index 38ccc4e..50a58d2 100644
--- a/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809_Same_Producer.cs
+++ b/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809_Same_Producer.cs
@@ -2,6 +2,7 @@
using System.Collections.Generic;
using System.Text;
using Confluent.Kafka;
+using JT809.KafkaService.Configs;
using JT809.PubSub.Abstractions;
using Microsoft.Extensions.Options;
@@ -9,7 +10,7 @@ namespace JT809.KafkaService
{
public sealed class JT809_Same_Producer : JT809Producer
{
- public JT809_Same_Producer(IOptions producerConfigAccessor)
+ public JT809_Same_Producer(IOptions producerConfigAccessor)
: base(producerConfigAccessor)
{
}
diff --git a/src/JT809.DotNetty.Tests/JT809.DotNetty.Host.Test/JT809.DotNetty.Host.Test.csproj b/src/JT809.DotNetty.Tests/JT809.DotNetty.Host.Test/JT809.DotNetty.Host.Test.csproj
index ba25158..c5c9629 100644
--- a/src/JT809.DotNetty.Tests/JT809.DotNetty.Host.Test/JT809.DotNetty.Host.Test.csproj
+++ b/src/JT809.DotNetty.Tests/JT809.DotNetty.Host.Test/JT809.DotNetty.Host.Test.csproj
@@ -6,7 +6,7 @@
7.3
-
+