diff --git a/simples/JT808.DotNetty.SimpleClient/JT808.DotNetty.SimpleClient.csproj b/simples/JT808.DotNetty.SimpleClient/JT808.DotNetty.SimpleClient.csproj
index a7f106d..f548503 100644
--- a/simples/JT808.DotNetty.SimpleClient/JT808.DotNetty.SimpleClient.csproj
+++ b/simples/JT808.DotNetty.SimpleClient/JT808.DotNetty.SimpleClient.csproj
@@ -7,10 +7,10 @@
-
-
-
-
+
+
+
+
diff --git a/simples/JT808.DotNetty.SimpleServer/JT808.DotNetty.SimpleServer.csproj b/simples/JT808.DotNetty.SimpleServer/JT808.DotNetty.SimpleServer.csproj
index 7f2f663..bb6a15e 100644
--- a/simples/JT808.DotNetty.SimpleServer/JT808.DotNetty.SimpleServer.csproj
+++ b/simples/JT808.DotNetty.SimpleServer/JT808.DotNetty.SimpleServer.csproj
@@ -7,10 +7,10 @@
-
-
-
-
+
+
+
+
diff --git a/simples/JT808.Gateway.SimpleClient/JT808.Gateway.SimpleClient.csproj b/simples/JT808.Gateway.SimpleClient/JT808.Gateway.SimpleClient.csproj
index 856af8e..67163d6 100644
--- a/simples/JT808.Gateway.SimpleClient/JT808.Gateway.SimpleClient.csproj
+++ b/simples/JT808.Gateway.SimpleClient/JT808.Gateway.SimpleClient.csproj
@@ -8,10 +8,10 @@
-
-
-
-
+
+
+
+
diff --git a/simples/JT808.Gateway.SimpleQueueServer/JT808.Gateway.SimpleQueueServer.csproj b/simples/JT808.Gateway.SimpleQueueServer/JT808.Gateway.SimpleQueueServer.csproj
index dd5684e..c1849b6 100644
--- a/simples/JT808.Gateway.SimpleQueueServer/JT808.Gateway.SimpleQueueServer.csproj
+++ b/simples/JT808.Gateway.SimpleQueueServer/JT808.Gateway.SimpleQueueServer.csproj
@@ -8,10 +8,10 @@
-
-
-
-
+
+
+
+
diff --git a/simples/JT808.Gateway.SimpleQueueService/JT808.Gateway.SimpleQueueService.csproj b/simples/JT808.Gateway.SimpleQueueService/JT808.Gateway.SimpleQueueService.csproj
index 8392a19..ba7c35d 100644
--- a/simples/JT808.Gateway.SimpleQueueService/JT808.Gateway.SimpleQueueService.csproj
+++ b/simples/JT808.Gateway.SimpleQueueService/JT808.Gateway.SimpleQueueService.csproj
@@ -14,10 +14,10 @@
-
-
-
-
+
+
+
+
diff --git a/simples/JT808.Gateway.SimpleServer/JT808.Gateway.SimpleServer.csproj b/simples/JT808.Gateway.SimpleServer/JT808.Gateway.SimpleServer.csproj
index b926db2..99a292d 100644
--- a/simples/JT808.Gateway.SimpleServer/JT808.Gateway.SimpleServer.csproj
+++ b/simples/JT808.Gateway.SimpleServer/JT808.Gateway.SimpleServer.csproj
@@ -12,10 +12,10 @@
-
-
-
-
+
+
+
+
diff --git a/simples/JT808.Simples.sln b/simples/JT808.Simples.sln
index 0c9350b..fb23759 100644
--- a/simples/JT808.Simples.sln
+++ b/simples/JT808.Simples.sln
@@ -17,7 +17,7 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JT808.Gateway.SimpleQueueSe
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JT808.Gateway.SimpleQueueService", "JT808.Gateway.SimpleQueueService\JT808.Gateway.SimpleQueueService.csproj", "{E2D1CFEF-417A-4C44-BC2E-E5A160602485}"
EndProject
-Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "JT808.Gateway.SimpleQueueNotification", "JT808.Gateway.SimpleQueueNotification\JT808.Gateway.SimpleQueueNotification.csproj", "{163D2EE2-9A62-4E8A-B203-BF147909E89A}"
+Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JT808.Gateway.SimpleQueueNotification", "JT808.Gateway.SimpleQueueNotification\JT808.Gateway.SimpleQueueNotification.csproj", "{163D2EE2-9A62-4E8A-B203-BF147909E89A}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
diff --git a/src/JT808.DotNetty.Kafka/JT808MsgConsumer.cs b/src/JT808.DotNetty.Kafka/JT808MsgConsumer.cs
index 4dbce74..e144cb4 100644
--- a/src/JT808.DotNetty.Kafka/JT808MsgConsumer.cs
+++ b/src/JT808.DotNetty.Kafka/JT808MsgConsumer.cs
@@ -10,8 +10,9 @@ using System.Threading.Tasks;
namespace JT808.DotNetty.Kafka
{
- public class JT808MsgConsumer : IJT808MsgConsumer
+ public sealed class JT808MsgConsumer : IJT808MsgConsumer
{
+ private bool disposed = false;
public CancellationTokenSource Cts { get; private set; } = new CancellationTokenSource();
private readonly IConsumer consumer;
@@ -35,6 +36,7 @@ namespace JT808.DotNetty.Kafka
{
while (!Cts.IsCancellationRequested)
{
+ if (disposed) return;
try
{
//如果不指定分区,根据kafka的机制会从多个分区中拉取数据
@@ -42,9 +44,9 @@ namespace JT808.DotNetty.Kafka
var data = consumer.Consume(Cts.Token);
if (logger.IsEnabled(LogLevel.Debug))
{
- logger.LogDebug($"Topic: {data.Topic} Key: {data.Key} Partition: {data.Partition} Offset: {data.Offset} TopicPartitionOffset:{data.TopicPartitionOffset}");
+ logger.LogDebug($"Topic: {data.Topic} Key: {data.Message.Key} Partition: {data.Partition} Offset: {data.Offset} TopicPartitionOffset:{data.TopicPartitionOffset}");
}
- callback((data.Key, data.Value));
+ callback((data.Message.Key, data.Message.Value));
}
catch (ConsumeException ex)
{
@@ -61,21 +63,37 @@ namespace JT808.DotNetty.Kafka
}
}, Cts.Token);
}
-
public void Subscribe()
{
consumer.Subscribe(TopicName);
}
-
public void Unsubscribe()
{
+ if (disposed) return;
consumer.Unsubscribe();
+ Cts.Cancel();
+ }
+ private void Dispose(bool disposing)
+ {
+ if (disposed) return;
+ if (disposing)
+ {
+ consumer.Close();
+ consumer.Dispose();
+ Cts.Dispose();
+ }
+ disposed = true;
+ }
+ ~JT808MsgConsumer()
+ {
+ Dispose(false);
}
-
public void Dispose()
{
- consumer.Close();
- consumer.Dispose();
+ //必须为true
+ Dispose(true);
+ //通知垃圾回收机制不再调用终结器(析构器)
+ GC.SuppressFinalize(this);
}
}
}
diff --git a/src/JT808.DotNetty.Kafka/JT808MsgProducer.cs b/src/JT808.DotNetty.Kafka/JT808MsgProducer.cs
index 573822e..14a9d78 100644
--- a/src/JT808.DotNetty.Kafka/JT808MsgProducer.cs
+++ b/src/JT808.DotNetty.Kafka/JT808MsgProducer.cs
@@ -8,8 +8,9 @@ using System.Threading.Tasks;
namespace JT808.DotNetty.Kafka
{
- public class JT808MsgProducer : IJT808MsgProducer
+ public sealed class JT808MsgProducer : IJT808MsgProducer
{
+ private bool disposed = false;
public string TopicName { get; }
private readonly IProducer producer;
@@ -20,18 +21,34 @@ namespace JT808.DotNetty.Kafka
TopicName = producerConfigAccessor.Value.TopicName;
}
- public void Dispose()
- {
- producer.Dispose();
- }
-
public async Task ProduceAsync(string terminalNo, byte[] data)
{
+ if (disposed) return;
await producer.ProduceAsync(TopicName, new Message
{
Key = terminalNo,
Value = data
});
}
+ private void Dispose(bool disposing)
+ {
+ if (disposed) return;
+ if (disposing)
+ {
+ producer.Dispose();
+ }
+ disposed = true;
+ }
+ ~JT808MsgProducer()
+ {
+ Dispose(false);
+ }
+ public void Dispose()
+ {
+ //必须为true
+ Dispose(true);
+ //通知垃圾回收机制不再调用终结器(析构器)
+ GC.SuppressFinalize(this);
+ }
}
}
diff --git a/src/JT808.DotNetty.Kafka/JT808MsgReplyConsumer.cs b/src/JT808.DotNetty.Kafka/JT808MsgReplyConsumer.cs
index e3aea34..8fdefa3 100644
--- a/src/JT808.DotNetty.Kafka/JT808MsgReplyConsumer.cs
+++ b/src/JT808.DotNetty.Kafka/JT808MsgReplyConsumer.cs
@@ -10,8 +10,9 @@ using System.Threading.Tasks;
namespace JT808.DotNetty.Kafka
{
- public class JT808MsgReplyConsumer : IJT808MsgReplyConsumer
+ public sealed class JT808MsgReplyConsumer : IJT808MsgReplyConsumer
{
+ private bool disposed = false;
public CancellationTokenSource Cts { get; private set; } = new CancellationTokenSource();
private readonly IConsumer consumer;
@@ -35,6 +36,7 @@ namespace JT808.DotNetty.Kafka
{
while (!Cts.IsCancellationRequested)
{
+ if (disposed) return;
try
{
//如果不指定分区,根据kafka的机制会从多个分区中拉取数据
@@ -42,9 +44,9 @@ namespace JT808.DotNetty.Kafka
var data = consumer.Consume(Cts.Token);
if (logger.IsEnabled(LogLevel.Debug))
{
- logger.LogDebug($"Topic: {data.Topic} Key: {data.Key} Partition: {data.Partition} Offset: {data.Offset} TopicPartitionOffset:{data.TopicPartitionOffset}");
+ logger.LogDebug($"Topic: {data.Topic} Key: {data.Message.Key} Partition: {data.Partition} Offset: {data.Offset} TopicPartitionOffset:{data.TopicPartitionOffset}");
}
- callback((data.Key, data.Value));
+ callback((data.Message.Key, data.Message.Value));
}
catch (ConsumeException ex)
{
@@ -69,13 +71,32 @@ namespace JT808.DotNetty.Kafka
public void Unsubscribe()
{
+ if (disposed) return;
consumer.Unsubscribe();
+ Cts.Cancel();
}
+ private void Dispose(bool disposing)
+ {
+ if (disposed) return;
+ if (disposing)
+ {
+ consumer.Close();
+ consumer.Dispose();
+ Cts.Dispose();
+ }
+ disposed = true;
+ }
+ ~JT808MsgReplyConsumer()
+ {
+ Dispose(false);
+ }
public void Dispose()
{
- consumer.Close();
- consumer.Dispose();
+ //必须为true
+ Dispose(true);
+ //通知垃圾回收机制不再调用终结器(析构器)
+ GC.SuppressFinalize(this);
}
}
}
diff --git a/src/JT808.DotNetty.Kafka/JT808MsgReplyProducer.cs b/src/JT808.DotNetty.Kafka/JT808MsgReplyProducer.cs
index fab6275..bf938bc 100644
--- a/src/JT808.DotNetty.Kafka/JT808MsgReplyProducer.cs
+++ b/src/JT808.DotNetty.Kafka/JT808MsgReplyProducer.cs
@@ -8,8 +8,9 @@ using System.Threading.Tasks;
namespace JT808.DotNetty.Kafka
{
- public class JT808MsgReplyProducer : IJT808MsgReplyProducer
+ public sealed class JT808MsgReplyProducer : IJT808MsgReplyProducer
{
+ private bool disposed = false;
public string TopicName { get;}
private IProducer producer;
@@ -20,18 +21,35 @@ namespace JT808.DotNetty.Kafka
TopicName = producerConfigAccessor.Value.TopicName;
}
- public void Dispose()
- {
- producer.Dispose();
- }
-
public async Task ProduceAsync(string terminalNo, byte[] data)
{
+ if (disposed) return;
await producer.ProduceAsync(TopicName, new Message
{
Key = terminalNo,
Value = data
});
}
+
+ private void Dispose(bool disposing)
+ {
+ if (disposed) return;
+ if (disposing)
+ {
+ producer.Dispose();
+ }
+ disposed = true;
+ }
+ ~JT808MsgReplyProducer()
+ {
+ Dispose(false);
+ }
+ public void Dispose()
+ {
+ //必须为true
+ Dispose(true);
+ //通知垃圾回收机制不再调用终结器(析构器)
+ GC.SuppressFinalize(this);
+ }
}
}
diff --git a/src/JT808.DotNetty.Kafka/JT808SessionConsumer.cs b/src/JT808.DotNetty.Kafka/JT808SessionConsumer.cs
index 587976a..04fb585 100644
--- a/src/JT808.DotNetty.Kafka/JT808SessionConsumer.cs
+++ b/src/JT808.DotNetty.Kafka/JT808SessionConsumer.cs
@@ -10,8 +10,9 @@ using System.Threading.Tasks;
namespace JT808.DotNetty.Kafka
{
- public class JT808SessionConsumer : IJT808SessionConsumer
+ public sealed class JT808SessionConsumer : IJT808SessionConsumer
{
+ private bool disposed = false;
public CancellationTokenSource Cts { get; private set; } = new CancellationTokenSource();
private readonly IConsumer consumer;
@@ -35,6 +36,7 @@ namespace JT808.DotNetty.Kafka
{
while (!Cts.IsCancellationRequested)
{
+ if (disposed) return;
try
{
//如果不指定分区,根据kafka的机制会从多个分区中拉取数据
@@ -42,9 +44,9 @@ namespace JT808.DotNetty.Kafka
var data = consumer.Consume(Cts.Token);
if (logger.IsEnabled(LogLevel.Debug))
{
- logger.LogDebug($"Topic: {data.Topic} Key: {data.Key} Partition: {data.Partition} Offset: {data.Offset} TopicPartitionOffset:{data.TopicPartitionOffset}");
+ logger.LogDebug($"Topic: {data.Topic} Key: {data.Message.Key} Partition: {data.Partition} Offset: {data.Offset} TopicPartitionOffset:{data.TopicPartitionOffset}");
}
- callback((data.Key, data.Value));
+ callback((data.Message.Key, data.Message.Value));
}
catch (ConsumeException ex)
{
@@ -69,13 +71,32 @@ namespace JT808.DotNetty.Kafka
public void Unsubscribe()
{
+ if (disposed) return;
consumer.Unsubscribe();
+ Cts.Cancel();
}
+ private void Dispose(bool disposing)
+ {
+ if (disposed) return;
+ if (disposing)
+ {
+ consumer.Close();
+ consumer.Dispose();
+ Cts.Dispose();
+ }
+ disposed = true;
+ }
+ ~JT808SessionConsumer()
+ {
+ Dispose(false);
+ }
public void Dispose()
{
- consumer.Close();
- consumer.Dispose();
+ //必须为true
+ Dispose(true);
+ //通知垃圾回收机制不再调用终结器(析构器)
+ GC.SuppressFinalize(this);
}
}
}
diff --git a/src/JT808.DotNetty.Kafka/JT808SessionProducer.cs b/src/JT808.DotNetty.Kafka/JT808SessionProducer.cs
index dd781b6..1eba8f8 100644
--- a/src/JT808.DotNetty.Kafka/JT808SessionProducer.cs
+++ b/src/JT808.DotNetty.Kafka/JT808SessionProducer.cs
@@ -8,8 +8,9 @@ using System.Threading.Tasks;
namespace JT808.DotNetty.Kafka
{
- public class JT808SessionProducer : IJT808SessionProducer
+ public sealed class JT808SessionProducer : IJT808SessionProducer
{
+ private bool disposed = false;
public string TopicName { get; }
private readonly IProducer producer;
@@ -20,18 +21,35 @@ namespace JT808.DotNetty.Kafka
TopicName = producerConfigAccessor.Value.TopicName;
}
- public void Dispose()
- {
- producer.Dispose();
- }
-
public async Task ProduceAsync(string notice,string terminalNo)
{
+ if (disposed) return;
await producer.ProduceAsync(TopicName, new Message
{
Key = notice,
Value = terminalNo
});
}
+
+ private void Dispose(bool disposing)
+ {
+ if (disposed) return;
+ if (disposing)
+ {
+ producer.Dispose();
+ }
+ disposed = true;
+ }
+ ~JT808SessionProducer()
+ {
+ Dispose(false);
+ }
+ public void Dispose()
+ {
+ //必须为true
+ Dispose(true);
+ //通知垃圾回收机制不再调用终结器(析构器)
+ GC.SuppressFinalize(this);
+ }
}
}
diff --git a/src/JT808.Gateway.Abstractions/JT808.Gateway.Abstractions.csproj b/src/JT808.Gateway.Abstractions/JT808.Gateway.Abstractions.csproj
index 2de8f75..b5747f4 100644
--- a/src/JT808.Gateway.Abstractions/JT808.Gateway.Abstractions.csproj
+++ b/src/JT808.Gateway.Abstractions/JT808.Gateway.Abstractions.csproj
@@ -30,13 +30,13 @@
-
-
+
+
all
runtime; build; native; contentfiles; analyzers; buildtransitive
-
-
+
+
diff --git a/src/JT808.Gateway.Benchmark/JT808.Gateway.CleintBenchmark/JT808.Gateway.CleintBenchmark.csproj b/src/JT808.Gateway.Benchmark/JT808.Gateway.CleintBenchmark/JT808.Gateway.CleintBenchmark.csproj
index 062bbfd..953e77b 100644
--- a/src/JT808.Gateway.Benchmark/JT808.Gateway.CleintBenchmark/JT808.Gateway.CleintBenchmark.csproj
+++ b/src/JT808.Gateway.Benchmark/JT808.Gateway.CleintBenchmark/JT808.Gateway.CleintBenchmark.csproj
@@ -12,18 +12,18 @@
-
-
-
-
-
+
+
+
+
+
-
+
diff --git a/src/JT808.Gateway.Benchmark/JT808.Gateway.ServerBenchmark/JT808.Gateway.ServerBenchmark.csproj b/src/JT808.Gateway.Benchmark/JT808.Gateway.ServerBenchmark/JT808.Gateway.ServerBenchmark.csproj
index ede005b..184c555 100644
--- a/src/JT808.Gateway.Benchmark/JT808.Gateway.ServerBenchmark/JT808.Gateway.ServerBenchmark.csproj
+++ b/src/JT808.Gateway.Benchmark/JT808.Gateway.ServerBenchmark/JT808.Gateway.ServerBenchmark.csproj
@@ -6,10 +6,10 @@
-
-
-
-
+
+
+
+
diff --git a/src/JT808.Gateway.Client/JT808.Gateway.Client.csproj b/src/JT808.Gateway.Client/JT808.Gateway.Client.csproj
index ed71a38..a0b9abb 100644
--- a/src/JT808.Gateway.Client/JT808.Gateway.Client.csproj
+++ b/src/JT808.Gateway.Client/JT808.Gateway.Client.csproj
@@ -22,12 +22,12 @@
-
-
-
-
-
-
+
+
+
+
+
+
diff --git a/src/JT808.Gateway.Kafka/JT808.Gateway.Kafka.csproj b/src/JT808.Gateway.Kafka/JT808.Gateway.Kafka.csproj
index 1d8762d..bad5ee3 100644
--- a/src/JT808.Gateway.Kafka/JT808.Gateway.Kafka.csproj
+++ b/src/JT808.Gateway.Kafka/JT808.Gateway.Kafka.csproj
@@ -20,12 +20,12 @@
$(JT808GatewayPackageVersion)
-
-
-
-
-
-
+
+
+
+
+
+
diff --git a/src/JT808.Gateway.Kafka/JT808MsgConsumer.cs b/src/JT808.Gateway.Kafka/JT808MsgConsumer.cs
index 7fd4496..62a696f 100644
--- a/src/JT808.Gateway.Kafka/JT808MsgConsumer.cs
+++ b/src/JT808.Gateway.Kafka/JT808MsgConsumer.cs
@@ -11,8 +11,9 @@ using System.Threading.Tasks;
namespace JT808.Gateway.Kafka
{
- public class JT808MsgConsumer : IJT808MsgConsumer
+ public sealed class JT808MsgConsumer : IJT808MsgConsumer
{
+ private bool disposed = false;
public CancellationTokenSource Cts { get; private set; } = new CancellationTokenSource();
private readonly IConsumer consumer;
@@ -36,6 +37,7 @@ namespace JT808.Gateway.Kafka
{
while (!Cts.IsCancellationRequested)
{
+ if (disposed) return;
try
{
//如果不指定分区,根据kafka的机制会从多个分区中拉取数据
@@ -43,9 +45,9 @@ namespace JT808.Gateway.Kafka
var data = consumer.Consume(Cts.Token);
if (logger.IsEnabled(LogLevel.Debug))
{
- logger.LogDebug($"Topic: {data.Topic} Key: {data.Key} Partition: {data.Partition} Offset: {data.Offset} TopicPartitionOffset:{data.TopicPartitionOffset}");
+ logger.LogDebug($"Topic: {data.Topic} Key: {data.Message.Key} Partition: {data.Partition} Offset: {data.Offset} TopicPartitionOffset:{data.TopicPartitionOffset}");
}
- callback((data.Key, data.Value));
+ callback((data.Message.Key, data.Message.Value));
}
catch (ConsumeException ex)
{
@@ -62,21 +64,37 @@ namespace JT808.Gateway.Kafka
}
}, Cts.Token);
}
-
public void Subscribe()
{
consumer.Subscribe(TopicName);
}
-
public void Unsubscribe()
{
+ if (disposed) return;
consumer.Unsubscribe();
+ Cts.Cancel();
+ }
+ private void Dispose(bool disposing)
+ {
+ if (disposed) return;
+ if (disposing)
+ {
+ consumer.Close();
+ consumer.Dispose();
+ Cts.Dispose();
+ }
+ disposed = true;
+ }
+ ~JT808MsgConsumer()
+ {
+ Dispose(false);
}
-
public void Dispose()
{
- consumer.Close();
- consumer.Dispose();
+ //必须为true
+ Dispose(true);
+ //通知垃圾回收机制不再调用终结器(析构器)
+ GC.SuppressFinalize(this);
}
}
}
diff --git a/src/JT808.Gateway.Kafka/JT808MsgProducer.cs b/src/JT808.Gateway.Kafka/JT808MsgProducer.cs
index e644927..1e22bc2 100644
--- a/src/JT808.Gateway.Kafka/JT808MsgProducer.cs
+++ b/src/JT808.Gateway.Kafka/JT808MsgProducer.cs
@@ -9,8 +9,9 @@ using System.Threading.Tasks;
namespace JT808.Gateway.Kafka
{
- public class JT808MsgProducer : IJT808MsgProducer
+ public sealed class JT808MsgProducer : IJT808MsgProducer
{
+ private bool disposed = false;
public string TopicName { get; }
private readonly IProducer producer;
@@ -21,18 +22,34 @@ namespace JT808.Gateway.Kafka
TopicName = producerConfigAccessor.Value.TopicName;
}
- public void Dispose()
- {
- producer.Dispose();
- }
-
public async ValueTask ProduceAsync(string terminalNo, byte[] data)
{
+ if (disposed) return;
await producer.ProduceAsync(TopicName, new Message
{
Key = terminalNo,
Value = data
});
}
+ private void Dispose(bool disposing)
+ {
+ if (disposed) return;
+ if (disposing)
+ {
+ producer.Dispose();
+ }
+ disposed = true;
+ }
+ ~JT808MsgProducer()
+ {
+ Dispose(false);
+ }
+ public void Dispose()
+ {
+ //必须为true
+ Dispose(true);
+ //通知垃圾回收机制不再调用终结器(析构器)
+ GC.SuppressFinalize(this);
+ }
}
}
diff --git a/src/JT808.Gateway.Kafka/JT808MsgReplyConsumer.cs b/src/JT808.Gateway.Kafka/JT808MsgReplyConsumer.cs
index deb9958..7bb7a59 100644
--- a/src/JT808.Gateway.Kafka/JT808MsgReplyConsumer.cs
+++ b/src/JT808.Gateway.Kafka/JT808MsgReplyConsumer.cs
@@ -11,8 +11,9 @@ using System.Threading.Tasks;
namespace JT808.Gateway.Kafka
{
- public class JT808MsgReplyConsumer : IJT808MsgReplyConsumer
+ public sealed class JT808MsgReplyConsumer : IJT808MsgReplyConsumer
{
+ private bool disposed = false;
public CancellationTokenSource Cts { get; private set; } = new CancellationTokenSource();
private readonly IConsumer consumer;
@@ -36,6 +37,7 @@ namespace JT808.Gateway.Kafka
{
while (!Cts.IsCancellationRequested)
{
+ if (disposed) return;
try
{
//如果不指定分区,根据kafka的机制会从多个分区中拉取数据
@@ -43,9 +45,9 @@ namespace JT808.Gateway.Kafka
var data = consumer.Consume(Cts.Token);
if (logger.IsEnabled(LogLevel.Debug))
{
- logger.LogDebug($"Topic: {data.Topic} Key: {data.Key} Partition: {data.Partition} Offset: {data.Offset} TopicPartitionOffset:{data.TopicPartitionOffset}");
+ logger.LogDebug($"Topic: {data.Topic} Key: {data.Message.Key} Partition: {data.Partition} Offset: {data.Offset} TopicPartitionOffset:{data.TopicPartitionOffset}");
}
- callback((data.Key, data.Value));
+ callback((data.Message.Key, data.Message.Value));
}
catch (ConsumeException ex)
{
@@ -70,13 +72,32 @@ namespace JT808.Gateway.Kafka
public void Unsubscribe()
{
+ if (disposed) return;
consumer.Unsubscribe();
+ Cts.Cancel();
}
+ private void Dispose(bool disposing)
+ {
+ if (disposed) return;
+ if (disposing)
+ {
+ consumer.Close();
+ consumer.Dispose();
+ Cts.Dispose();
+ }
+ disposed = true;
+ }
+ ~JT808MsgReplyConsumer()
+ {
+ Dispose(false);
+ }
public void Dispose()
{
- consumer.Close();
- consumer.Dispose();
+ //必须为true
+ Dispose(true);
+ //通知垃圾回收机制不再调用终结器(析构器)
+ GC.SuppressFinalize(this);
}
}
}
diff --git a/src/JT808.Gateway.Kafka/JT808MsgReplyProducer.cs b/src/JT808.Gateway.Kafka/JT808MsgReplyProducer.cs
index a609273..adcacb3 100644
--- a/src/JT808.Gateway.Kafka/JT808MsgReplyProducer.cs
+++ b/src/JT808.Gateway.Kafka/JT808MsgReplyProducer.cs
@@ -9,8 +9,9 @@ using System.Threading.Tasks;
namespace JJT808.Gateway.Kafka
{
- public class JT808MsgReplyProducer : IJT808MsgReplyProducer
+ public sealed class JT808MsgReplyProducer : IJT808MsgReplyProducer
{
+ private bool disposed = false;
public string TopicName { get;}
private IProducer producer;
@@ -21,18 +22,35 @@ namespace JJT808.Gateway.Kafka
TopicName = producerConfigAccessor.Value.TopicName;
}
- public void Dispose()
- {
- producer.Dispose();
- }
-
public async ValueTask ProduceAsync(string terminalNo, byte[] data)
{
+ if (disposed) return;
await producer.ProduceAsync(TopicName, new Message
{
Key = terminalNo,
Value = data
});
}
+
+ private void Dispose(bool disposing)
+ {
+ if (disposed) return;
+ if (disposing)
+ {
+ producer.Dispose();
+ }
+ disposed = true;
+ }
+ ~JT808MsgReplyProducer()
+ {
+ Dispose(false);
+ }
+ public void Dispose()
+ {
+ //必须为true
+ Dispose(true);
+ //通知垃圾回收机制不再调用终结器(析构器)
+ GC.SuppressFinalize(this);
+ }
}
}
diff --git a/src/JT808.Gateway.Kafka/JT808SessionConsumer.cs b/src/JT808.Gateway.Kafka/JT808SessionConsumer.cs
index 1df01d3..92a4424 100644
--- a/src/JT808.Gateway.Kafka/JT808SessionConsumer.cs
+++ b/src/JT808.Gateway.Kafka/JT808SessionConsumer.cs
@@ -11,8 +11,9 @@ using System.Threading.Tasks;
namespace JT808.Gateway.Kafka
{
- public class JT808SessionConsumer : IJT808SessionConsumer
+ public sealed class JT808SessionConsumer : IJT808SessionConsumer
{
+ private bool disposed = false;
public CancellationTokenSource Cts { get; private set; } = new CancellationTokenSource();
private readonly IConsumer consumer;
@@ -36,6 +37,7 @@ namespace JT808.Gateway.Kafka
{
while (!Cts.IsCancellationRequested)
{
+ if (disposed) return;
try
{
//如果不指定分区,根据kafka的机制会从多个分区中拉取数据
@@ -43,9 +45,9 @@ namespace JT808.Gateway.Kafka
var data = consumer.Consume(Cts.Token);
if (logger.IsEnabled(LogLevel.Debug))
{
- logger.LogDebug($"Topic: {data.Topic} Key: {data.Key} Partition: {data.Partition} Offset: {data.Offset} TopicPartitionOffset:{data.TopicPartitionOffset}");
+ logger.LogDebug($"Topic: {data.Topic} Key: {data.Message.Key} Partition: {data.Partition} Offset: {data.Offset} TopicPartitionOffset:{data.TopicPartitionOffset}");
}
- callback((data.Key, data.Value));
+ callback((data.Message.Key, data.Message.Value));
}
catch (ConsumeException ex)
{
@@ -70,13 +72,32 @@ namespace JT808.Gateway.Kafka
public void Unsubscribe()
{
+ if (disposed) return;
consumer.Unsubscribe();
+ Cts.Cancel();
}
+ private void Dispose(bool disposing)
+ {
+ if (disposed) return;
+ if (disposing)
+ {
+ consumer.Close();
+ consumer.Dispose();
+ Cts.Dispose();
+ }
+ disposed = true;
+ }
+ ~JT808SessionConsumer()
+ {
+ Dispose(false);
+ }
public void Dispose()
{
- consumer.Close();
- consumer.Dispose();
+ //必须为true
+ Dispose(true);
+ //通知垃圾回收机制不再调用终结器(析构器)
+ GC.SuppressFinalize(this);
}
}
}
diff --git a/src/JT808.Gateway.Kafka/JT808SessionProducer.cs b/src/JT808.Gateway.Kafka/JT808SessionProducer.cs
index 9049a0d..1ce0675 100644
--- a/src/JT808.Gateway.Kafka/JT808SessionProducer.cs
+++ b/src/JT808.Gateway.Kafka/JT808SessionProducer.cs
@@ -9,8 +9,9 @@ using System.Threading.Tasks;
namespace JT808.Gateway.Kafka
{
- public class JT808SessionProducer : IJT808SessionProducer
+ public sealed class JT808SessionProducer : IJT808SessionProducer
{
+ private bool disposed = false;
public string TopicName { get; }
private readonly IProducer producer;
@@ -21,18 +22,35 @@ namespace JT808.Gateway.Kafka
TopicName = producerConfigAccessor.Value.TopicName;
}
- public void Dispose()
- {
- producer.Dispose();
- }
-
public async ValueTask ProduceAsync(string notice,string terminalNo)
{
+ if (disposed) return;
await producer.ProduceAsync(TopicName, new Message
{
Key = notice,
Value = terminalNo
});
}
+
+ private void Dispose(bool disposing)
+ {
+ if (disposed) return;
+ if (disposing)
+ {
+ producer.Dispose();
+ }
+ disposed = true;
+ }
+ ~JT808SessionProducer()
+ {
+ Dispose(false);
+ }
+ public void Dispose()
+ {
+ //必须为true
+ Dispose(true);
+ //通知垃圾回收机制不再调用终结器(析构器)
+ GC.SuppressFinalize(this);
+ }
}
}
diff --git a/src/JT808.Gateway.Services/JT808.Gateway.MsgLogging/JT808.Gateway.MsgLogging.csproj b/src/JT808.Gateway.Services/JT808.Gateway.MsgLogging/JT808.Gateway.MsgLogging.csproj
index 8bb463a..3ed4582 100644
--- a/src/JT808.Gateway.Services/JT808.Gateway.MsgLogging/JT808.Gateway.MsgLogging.csproj
+++ b/src/JT808.Gateway.Services/JT808.Gateway.MsgLogging/JT808.Gateway.MsgLogging.csproj
@@ -22,7 +22,7 @@
LICENSE
-
+
diff --git a/src/JT808.Gateway.Services/JT808.Gateway.ReplyMessage/JT808.Gateway.ReplyMessage.csproj b/src/JT808.Gateway.Services/JT808.Gateway.ReplyMessage/JT808.Gateway.ReplyMessage.csproj
index 97ebd8d..2997850 100644
--- a/src/JT808.Gateway.Services/JT808.Gateway.ReplyMessage/JT808.Gateway.ReplyMessage.csproj
+++ b/src/JT808.Gateway.Services/JT808.Gateway.ReplyMessage/JT808.Gateway.ReplyMessage.csproj
@@ -21,7 +21,7 @@
LICENSE
-
+
diff --git a/src/JT808.Gateway.Services/JT808.Gateway.SessionNotice/JT808.Gateway.SessionNotice.csproj b/src/JT808.Gateway.Services/JT808.Gateway.SessionNotice/JT808.Gateway.SessionNotice.csproj
index c85def7..bfe4066 100644
--- a/src/JT808.Gateway.Services/JT808.Gateway.SessionNotice/JT808.Gateway.SessionNotice.csproj
+++ b/src/JT808.Gateway.Services/JT808.Gateway.SessionNotice/JT808.Gateway.SessionNotice.csproj
@@ -22,7 +22,7 @@
LICENSE
-
+
diff --git a/src/JT808.Gateway.Services/JT808.Gateway.Transmit/JT808.Gateway.Transmit.csproj b/src/JT808.Gateway.Services/JT808.Gateway.Transmit/JT808.Gateway.Transmit.csproj
index ba8141a..e8334de 100644
--- a/src/JT808.Gateway.Services/JT808.Gateway.Transmit/JT808.Gateway.Transmit.csproj
+++ b/src/JT808.Gateway.Services/JT808.Gateway.Transmit/JT808.Gateway.Transmit.csproj
@@ -22,7 +22,7 @@
LICENSE
-
+
diff --git a/src/JT808.Gateway.Tests/JT808.Gateway.NormalHosting/JT808.Gateway.NormalHosting.csproj b/src/JT808.Gateway.Tests/JT808.Gateway.NormalHosting/JT808.Gateway.NormalHosting.csproj
index 5c73175..714c20d 100644
--- a/src/JT808.Gateway.Tests/JT808.Gateway.NormalHosting/JT808.Gateway.NormalHosting.csproj
+++ b/src/JT808.Gateway.Tests/JT808.Gateway.NormalHosting/JT808.Gateway.NormalHosting.csproj
@@ -7,10 +7,10 @@
-
-
-
-
+
+
+
+
diff --git a/src/JT808.Gateway.Tests/JT808.Gateway.QueueHosting/JT808.Gateway.QueueHosting.csproj b/src/JT808.Gateway.Tests/JT808.Gateway.QueueHosting/JT808.Gateway.QueueHosting.csproj
index 72f720f..73cec5e 100644
--- a/src/JT808.Gateway.Tests/JT808.Gateway.QueueHosting/JT808.Gateway.QueueHosting.csproj
+++ b/src/JT808.Gateway.Tests/JT808.Gateway.QueueHosting/JT808.Gateway.QueueHosting.csproj
@@ -6,10 +6,10 @@
-
-
-
-
+
+
+
+
diff --git a/src/JT808.Gateway.Tests/JT808.Gateway.Test/JT808.Gateway.Test.csproj b/src/JT808.Gateway.Tests/JT808.Gateway.Test/JT808.Gateway.Test.csproj
index ec4daa0..6f7658a 100644
--- a/src/JT808.Gateway.Tests/JT808.Gateway.Test/JT808.Gateway.Test.csproj
+++ b/src/JT808.Gateway.Tests/JT808.Gateway.Test/JT808.Gateway.Test.csproj
@@ -7,15 +7,15 @@
-
-
-
+
+
+
all
runtime; build; native; contentfiles; analyzers; buildtransitive
-
+
all
runtime; build; native; contentfiles; analyzers; buildtransitive
diff --git a/src/JT808.Gateway/JT808.Gateway.csproj b/src/JT808.Gateway/JT808.Gateway.csproj
index 67910d5..d3b4e08 100644
--- a/src/JT808.Gateway/JT808.Gateway.csproj
+++ b/src/JT808.Gateway/JT808.Gateway.csproj
@@ -21,9 +21,9 @@
-
-
-
+
+
+