From 72acdaedf986e40ea1bc79432676c585e70bf4d7 Mon Sep 17 00:00:00 2001
From: "SmallChi(Koike)" <564952747@qq.com>
Date: Tue, 21 Apr 2020 22:40:12 +0800
Subject: [PATCH] =?UTF-8?q?1.=E4=BF=AE=E5=A4=8Dnetty=E5=92=8Cpipeline?=
=?UTF-8?q?=E6=9C=8D=E5=8A=A1=E9=87=8D=E6=96=B0=E5=90=AF=E5=8A=A8=E6=97=B6?=
=?UTF-8?q?=E5=80=99=E6=B2=A1=E6=9C=89=E4=BC=98=E9=9B=85=E7=9A=84=E9=87=8A?=
=?UTF-8?q?=E6=94=BE=E5=AF=BC=E8=87=B4=E6=97=A5=E5=BF=97=E6=98=BE=E7=A4=BA?=
=?UTF-8?q?=E6=8A=A5=E9=94=99=E9=97=AE=E9=A2=98=202.=E5=8D=87=E7=BA=A7?=
=?UTF-8?q?=E7=9B=B8=E5=BA=94=E5=BC=95=E7=94=A8=E5=BA=93?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
.../JT808.DotNetty.SimpleClient.csproj | 8 ++---
.../JT808.DotNetty.SimpleServer.csproj | 8 ++---
.../JT808.Gateway.SimpleClient.csproj | 8 ++---
.../JT808.Gateway.SimpleQueueServer.csproj | 8 ++---
.../JT808.Gateway.SimpleQueueService.csproj | 8 ++---
.../JT808.Gateway.SimpleServer.csproj | 8 ++---
simples/JT808.Simples.sln | 2 +-
src/JT808.DotNetty.Kafka/JT808MsgConsumer.cs | 34 ++++++++++++++-----
src/JT808.DotNetty.Kafka/JT808MsgProducer.cs | 29 ++++++++++++----
.../JT808MsgReplyConsumer.cs | 31 ++++++++++++++---
.../JT808MsgReplyProducer.cs | 30 ++++++++++++----
.../JT808SessionConsumer.cs | 31 ++++++++++++++---
.../JT808SessionProducer.cs | 30 ++++++++++++----
.../JT808.Gateway.Abstractions.csproj | 8 ++---
.../JT808.Gateway.CleintBenchmark.csproj | 12 +++----
.../JT808.Gateway.ServerBenchmark.csproj | 8 ++---
.../JT808.Gateway.Client.csproj | 12 +++----
.../JT808.Gateway.Kafka.csproj | 12 +++----
src/JT808.Gateway.Kafka/JT808MsgConsumer.cs | 34 ++++++++++++++-----
src/JT808.Gateway.Kafka/JT808MsgProducer.cs | 29 ++++++++++++----
.../JT808MsgReplyConsumer.cs | 31 ++++++++++++++---
.../JT808MsgReplyProducer.cs | 30 ++++++++++++----
.../JT808SessionConsumer.cs | 31 ++++++++++++++---
.../JT808SessionProducer.cs | 30 ++++++++++++----
.../JT808.Gateway.MsgLogging.csproj | 2 +-
.../JT808.Gateway.ReplyMessage.csproj | 2 +-
.../JT808.Gateway.SessionNotice.csproj | 2 +-
.../JT808.Gateway.Transmit.csproj | 2 +-
.../JT808.Gateway.NormalHosting.csproj | 8 ++---
.../JT808.Gateway.QueueHosting.csproj | 8 ++---
.../JT808.Gateway.Test.csproj | 8 ++---
src/JT808.Gateway/JT808.Gateway.csproj | 6 ++--
32 files changed, 368 insertions(+), 142 deletions(-)
diff --git a/simples/JT808.DotNetty.SimpleClient/JT808.DotNetty.SimpleClient.csproj b/simples/JT808.DotNetty.SimpleClient/JT808.DotNetty.SimpleClient.csproj
index a7f106d..f548503 100644
--- a/simples/JT808.DotNetty.SimpleClient/JT808.DotNetty.SimpleClient.csproj
+++ b/simples/JT808.DotNetty.SimpleClient/JT808.DotNetty.SimpleClient.csproj
@@ -7,10 +7,10 @@
-
-
-
-
+
+
+
+
diff --git a/simples/JT808.DotNetty.SimpleServer/JT808.DotNetty.SimpleServer.csproj b/simples/JT808.DotNetty.SimpleServer/JT808.DotNetty.SimpleServer.csproj
index 7f2f663..bb6a15e 100644
--- a/simples/JT808.DotNetty.SimpleServer/JT808.DotNetty.SimpleServer.csproj
+++ b/simples/JT808.DotNetty.SimpleServer/JT808.DotNetty.SimpleServer.csproj
@@ -7,10 +7,10 @@
-
-
-
-
+
+
+
+
diff --git a/simples/JT808.Gateway.SimpleClient/JT808.Gateway.SimpleClient.csproj b/simples/JT808.Gateway.SimpleClient/JT808.Gateway.SimpleClient.csproj
index 856af8e..67163d6 100644
--- a/simples/JT808.Gateway.SimpleClient/JT808.Gateway.SimpleClient.csproj
+++ b/simples/JT808.Gateway.SimpleClient/JT808.Gateway.SimpleClient.csproj
@@ -8,10 +8,10 @@
-
-
-
-
+
+
+
+
diff --git a/simples/JT808.Gateway.SimpleQueueServer/JT808.Gateway.SimpleQueueServer.csproj b/simples/JT808.Gateway.SimpleQueueServer/JT808.Gateway.SimpleQueueServer.csproj
index dd5684e..c1849b6 100644
--- a/simples/JT808.Gateway.SimpleQueueServer/JT808.Gateway.SimpleQueueServer.csproj
+++ b/simples/JT808.Gateway.SimpleQueueServer/JT808.Gateway.SimpleQueueServer.csproj
@@ -8,10 +8,10 @@
-
-
-
-
+
+
+
+
diff --git a/simples/JT808.Gateway.SimpleQueueService/JT808.Gateway.SimpleQueueService.csproj b/simples/JT808.Gateway.SimpleQueueService/JT808.Gateway.SimpleQueueService.csproj
index 8392a19..ba7c35d 100644
--- a/simples/JT808.Gateway.SimpleQueueService/JT808.Gateway.SimpleQueueService.csproj
+++ b/simples/JT808.Gateway.SimpleQueueService/JT808.Gateway.SimpleQueueService.csproj
@@ -14,10 +14,10 @@
-
-
-
-
+
+
+
+
diff --git a/simples/JT808.Gateway.SimpleServer/JT808.Gateway.SimpleServer.csproj b/simples/JT808.Gateway.SimpleServer/JT808.Gateway.SimpleServer.csproj
index b926db2..99a292d 100644
--- a/simples/JT808.Gateway.SimpleServer/JT808.Gateway.SimpleServer.csproj
+++ b/simples/JT808.Gateway.SimpleServer/JT808.Gateway.SimpleServer.csproj
@@ -12,10 +12,10 @@
-
-
-
-
+
+
+
+
diff --git a/simples/JT808.Simples.sln b/simples/JT808.Simples.sln
index 0c9350b..fb23759 100644
--- a/simples/JT808.Simples.sln
+++ b/simples/JT808.Simples.sln
@@ -17,7 +17,7 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JT808.Gateway.SimpleQueueSe
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JT808.Gateway.SimpleQueueService", "JT808.Gateway.SimpleQueueService\JT808.Gateway.SimpleQueueService.csproj", "{E2D1CFEF-417A-4C44-BC2E-E5A160602485}"
EndProject
-Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "JT808.Gateway.SimpleQueueNotification", "JT808.Gateway.SimpleQueueNotification\JT808.Gateway.SimpleQueueNotification.csproj", "{163D2EE2-9A62-4E8A-B203-BF147909E89A}"
+Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JT808.Gateway.SimpleQueueNotification", "JT808.Gateway.SimpleQueueNotification\JT808.Gateway.SimpleQueueNotification.csproj", "{163D2EE2-9A62-4E8A-B203-BF147909E89A}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
diff --git a/src/JT808.DotNetty.Kafka/JT808MsgConsumer.cs b/src/JT808.DotNetty.Kafka/JT808MsgConsumer.cs
index 4dbce74..e144cb4 100644
--- a/src/JT808.DotNetty.Kafka/JT808MsgConsumer.cs
+++ b/src/JT808.DotNetty.Kafka/JT808MsgConsumer.cs
@@ -10,8 +10,9 @@ using System.Threading.Tasks;
namespace JT808.DotNetty.Kafka
{
- public class JT808MsgConsumer : IJT808MsgConsumer
+ public sealed class JT808MsgConsumer : IJT808MsgConsumer
{
+ private bool disposed = false;
public CancellationTokenSource Cts { get; private set; } = new CancellationTokenSource();
private readonly IConsumer consumer;
@@ -35,6 +36,7 @@ namespace JT808.DotNetty.Kafka
{
while (!Cts.IsCancellationRequested)
{
+ if (disposed) return;
try
{
//如果不指定分区,根据kafka的机制会从多个分区中拉取数据
@@ -42,9 +44,9 @@ namespace JT808.DotNetty.Kafka
var data = consumer.Consume(Cts.Token);
if (logger.IsEnabled(LogLevel.Debug))
{
- logger.LogDebug($"Topic: {data.Topic} Key: {data.Key} Partition: {data.Partition} Offset: {data.Offset} TopicPartitionOffset:{data.TopicPartitionOffset}");
+ logger.LogDebug($"Topic: {data.Topic} Key: {data.Message.Key} Partition: {data.Partition} Offset: {data.Offset} TopicPartitionOffset:{data.TopicPartitionOffset}");
}
- callback((data.Key, data.Value));
+ callback((data.Message.Key, data.Message.Value));
}
catch (ConsumeException ex)
{
@@ -61,21 +63,37 @@ namespace JT808.DotNetty.Kafka
}
}, Cts.Token);
}
-
public void Subscribe()
{
consumer.Subscribe(TopicName);
}
-
public void Unsubscribe()
{
+ if (disposed) return;
consumer.Unsubscribe();
+ Cts.Cancel();
+ }
+ private void Dispose(bool disposing)
+ {
+ if (disposed) return;
+ if (disposing)
+ {
+ consumer.Close();
+ consumer.Dispose();
+ Cts.Dispose();
+ }
+ disposed = true;
+ }
+ ~JT808MsgConsumer()
+ {
+ Dispose(false);
}
-
public void Dispose()
{
- consumer.Close();
- consumer.Dispose();
+ //必须为true
+ Dispose(true);
+ //通知垃圾回收机制不再调用终结器(析构器)
+ GC.SuppressFinalize(this);
}
}
}
diff --git a/src/JT808.DotNetty.Kafka/JT808MsgProducer.cs b/src/JT808.DotNetty.Kafka/JT808MsgProducer.cs
index 573822e..14a9d78 100644
--- a/src/JT808.DotNetty.Kafka/JT808MsgProducer.cs
+++ b/src/JT808.DotNetty.Kafka/JT808MsgProducer.cs
@@ -8,8 +8,9 @@ using System.Threading.Tasks;
namespace JT808.DotNetty.Kafka
{
- public class JT808MsgProducer : IJT808MsgProducer
+ public sealed class JT808MsgProducer : IJT808MsgProducer
{
+ private bool disposed = false;
public string TopicName { get; }
private readonly IProducer producer;
@@ -20,18 +21,34 @@ namespace JT808.DotNetty.Kafka
TopicName = producerConfigAccessor.Value.TopicName;
}
- public void Dispose()
- {
- producer.Dispose();
- }
-
public async Task ProduceAsync(string terminalNo, byte[] data)
{
+ if (disposed) return;
await producer.ProduceAsync(TopicName, new Message
{
Key = terminalNo,
Value = data
});
}
+ private void Dispose(bool disposing)
+ {
+ if (disposed) return;
+ if (disposing)
+ {
+ producer.Dispose();
+ }
+ disposed = true;
+ }
+ ~JT808MsgProducer()
+ {
+ Dispose(false);
+ }
+ public void Dispose()
+ {
+ //必须为true
+ Dispose(true);
+ //通知垃圾回收机制不再调用终结器(析构器)
+ GC.SuppressFinalize(this);
+ }
}
}
diff --git a/src/JT808.DotNetty.Kafka/JT808MsgReplyConsumer.cs b/src/JT808.DotNetty.Kafka/JT808MsgReplyConsumer.cs
index e3aea34..8fdefa3 100644
--- a/src/JT808.DotNetty.Kafka/JT808MsgReplyConsumer.cs
+++ b/src/JT808.DotNetty.Kafka/JT808MsgReplyConsumer.cs
@@ -10,8 +10,9 @@ using System.Threading.Tasks;
namespace JT808.DotNetty.Kafka
{
- public class JT808MsgReplyConsumer : IJT808MsgReplyConsumer
+ public sealed class JT808MsgReplyConsumer : IJT808MsgReplyConsumer
{
+ private bool disposed = false;
public CancellationTokenSource Cts { get; private set; } = new CancellationTokenSource();
private readonly IConsumer consumer;
@@ -35,6 +36,7 @@ namespace JT808.DotNetty.Kafka
{
while (!Cts.IsCancellationRequested)
{
+ if (disposed) return;
try
{
//如果不指定分区,根据kafka的机制会从多个分区中拉取数据
@@ -42,9 +44,9 @@ namespace JT808.DotNetty.Kafka
var data = consumer.Consume(Cts.Token);
if (logger.IsEnabled(LogLevel.Debug))
{
- logger.LogDebug($"Topic: {data.Topic} Key: {data.Key} Partition: {data.Partition} Offset: {data.Offset} TopicPartitionOffset:{data.TopicPartitionOffset}");
+ logger.LogDebug($"Topic: {data.Topic} Key: {data.Message.Key} Partition: {data.Partition} Offset: {data.Offset} TopicPartitionOffset:{data.TopicPartitionOffset}");
}
- callback((data.Key, data.Value));
+ callback((data.Message.Key, data.Message.Value));
}
catch (ConsumeException ex)
{
@@ -69,13 +71,32 @@ namespace JT808.DotNetty.Kafka
public void Unsubscribe()
{
+ if (disposed) return;
consumer.Unsubscribe();
+ Cts.Cancel();
}
+ private void Dispose(bool disposing)
+ {
+ if (disposed) return;
+ if (disposing)
+ {
+ consumer.Close();
+ consumer.Dispose();
+ Cts.Dispose();
+ }
+ disposed = true;
+ }
+ ~JT808MsgReplyConsumer()
+ {
+ Dispose(false);
+ }
public void Dispose()
{
- consumer.Close();
- consumer.Dispose();
+ //必须为true
+ Dispose(true);
+ //通知垃圾回收机制不再调用终结器(析构器)
+ GC.SuppressFinalize(this);
}
}
}
diff --git a/src/JT808.DotNetty.Kafka/JT808MsgReplyProducer.cs b/src/JT808.DotNetty.Kafka/JT808MsgReplyProducer.cs
index fab6275..bf938bc 100644
--- a/src/JT808.DotNetty.Kafka/JT808MsgReplyProducer.cs
+++ b/src/JT808.DotNetty.Kafka/JT808MsgReplyProducer.cs
@@ -8,8 +8,9 @@ using System.Threading.Tasks;
namespace JT808.DotNetty.Kafka
{
- public class JT808MsgReplyProducer : IJT808MsgReplyProducer
+ public sealed class JT808MsgReplyProducer : IJT808MsgReplyProducer
{
+ private bool disposed = false;
public string TopicName { get;}
private IProducer producer;
@@ -20,18 +21,35 @@ namespace JT808.DotNetty.Kafka
TopicName = producerConfigAccessor.Value.TopicName;
}
- public void Dispose()
- {
- producer.Dispose();
- }
-
public async Task ProduceAsync(string terminalNo, byte[] data)
{
+ if (disposed) return;
await producer.ProduceAsync(TopicName, new Message
{
Key = terminalNo,
Value = data
});
}
+
+ private void Dispose(bool disposing)
+ {
+ if (disposed) return;
+ if (disposing)
+ {
+ producer.Dispose();
+ }
+ disposed = true;
+ }
+ ~JT808MsgReplyProducer()
+ {
+ Dispose(false);
+ }
+ public void Dispose()
+ {
+ //必须为true
+ Dispose(true);
+ //通知垃圾回收机制不再调用终结器(析构器)
+ GC.SuppressFinalize(this);
+ }
}
}
diff --git a/src/JT808.DotNetty.Kafka/JT808SessionConsumer.cs b/src/JT808.DotNetty.Kafka/JT808SessionConsumer.cs
index 587976a..04fb585 100644
--- a/src/JT808.DotNetty.Kafka/JT808SessionConsumer.cs
+++ b/src/JT808.DotNetty.Kafka/JT808SessionConsumer.cs
@@ -10,8 +10,9 @@ using System.Threading.Tasks;
namespace JT808.DotNetty.Kafka
{
- public class JT808SessionConsumer : IJT808SessionConsumer
+ public sealed class JT808SessionConsumer : IJT808SessionConsumer
{
+ private bool disposed = false;
public CancellationTokenSource Cts { get; private set; } = new CancellationTokenSource();
private readonly IConsumer consumer;
@@ -35,6 +36,7 @@ namespace JT808.DotNetty.Kafka
{
while (!Cts.IsCancellationRequested)
{
+ if (disposed) return;
try
{
//如果不指定分区,根据kafka的机制会从多个分区中拉取数据
@@ -42,9 +44,9 @@ namespace JT808.DotNetty.Kafka
var data = consumer.Consume(Cts.Token);
if (logger.IsEnabled(LogLevel.Debug))
{
- logger.LogDebug($"Topic: {data.Topic} Key: {data.Key} Partition: {data.Partition} Offset: {data.Offset} TopicPartitionOffset:{data.TopicPartitionOffset}");
+ logger.LogDebug($"Topic: {data.Topic} Key: {data.Message.Key} Partition: {data.Partition} Offset: {data.Offset} TopicPartitionOffset:{data.TopicPartitionOffset}");
}
- callback((data.Key, data.Value));
+ callback((data.Message.Key, data.Message.Value));
}
catch (ConsumeException ex)
{
@@ -69,13 +71,32 @@ namespace JT808.DotNetty.Kafka
public void Unsubscribe()
{
+ if (disposed) return;
consumer.Unsubscribe();
+ Cts.Cancel();
}
+ private void Dispose(bool disposing)
+ {
+ if (disposed) return;
+ if (disposing)
+ {
+ consumer.Close();
+ consumer.Dispose();
+ Cts.Dispose();
+ }
+ disposed = true;
+ }
+ ~JT808SessionConsumer()
+ {
+ Dispose(false);
+ }
public void Dispose()
{
- consumer.Close();
- consumer.Dispose();
+ //必须为true
+ Dispose(true);
+ //通知垃圾回收机制不再调用终结器(析构器)
+ GC.SuppressFinalize(this);
}
}
}
diff --git a/src/JT808.DotNetty.Kafka/JT808SessionProducer.cs b/src/JT808.DotNetty.Kafka/JT808SessionProducer.cs
index dd781b6..1eba8f8 100644
--- a/src/JT808.DotNetty.Kafka/JT808SessionProducer.cs
+++ b/src/JT808.DotNetty.Kafka/JT808SessionProducer.cs
@@ -8,8 +8,9 @@ using System.Threading.Tasks;
namespace JT808.DotNetty.Kafka
{
- public class JT808SessionProducer : IJT808SessionProducer
+ public sealed class JT808SessionProducer : IJT808SessionProducer
{
+ private bool disposed = false;
public string TopicName { get; }
private readonly IProducer producer;
@@ -20,18 +21,35 @@ namespace JT808.DotNetty.Kafka
TopicName = producerConfigAccessor.Value.TopicName;
}
- public void Dispose()
- {
- producer.Dispose();
- }
-
public async Task ProduceAsync(string notice,string terminalNo)
{
+ if (disposed) return;
await producer.ProduceAsync(TopicName, new Message
{
Key = notice,
Value = terminalNo
});
}
+
+ private void Dispose(bool disposing)
+ {
+ if (disposed) return;
+ if (disposing)
+ {
+ producer.Dispose();
+ }
+ disposed = true;
+ }
+ ~JT808SessionProducer()
+ {
+ Dispose(false);
+ }
+ public void Dispose()
+ {
+ //必须为true
+ Dispose(true);
+ //通知垃圾回收机制不再调用终结器(析构器)
+ GC.SuppressFinalize(this);
+ }
}
}
diff --git a/src/JT808.Gateway.Abstractions/JT808.Gateway.Abstractions.csproj b/src/JT808.Gateway.Abstractions/JT808.Gateway.Abstractions.csproj
index 2de8f75..b5747f4 100644
--- a/src/JT808.Gateway.Abstractions/JT808.Gateway.Abstractions.csproj
+++ b/src/JT808.Gateway.Abstractions/JT808.Gateway.Abstractions.csproj
@@ -30,13 +30,13 @@
-
-
+
+
all
runtime; build; native; contentfiles; analyzers; buildtransitive
-
-
+
+
diff --git a/src/JT808.Gateway.Benchmark/JT808.Gateway.CleintBenchmark/JT808.Gateway.CleintBenchmark.csproj b/src/JT808.Gateway.Benchmark/JT808.Gateway.CleintBenchmark/JT808.Gateway.CleintBenchmark.csproj
index 062bbfd..953e77b 100644
--- a/src/JT808.Gateway.Benchmark/JT808.Gateway.CleintBenchmark/JT808.Gateway.CleintBenchmark.csproj
+++ b/src/JT808.Gateway.Benchmark/JT808.Gateway.CleintBenchmark/JT808.Gateway.CleintBenchmark.csproj
@@ -12,18 +12,18 @@
-
-
-
-
-
+
+
+
+
+
-
+
diff --git a/src/JT808.Gateway.Benchmark/JT808.Gateway.ServerBenchmark/JT808.Gateway.ServerBenchmark.csproj b/src/JT808.Gateway.Benchmark/JT808.Gateway.ServerBenchmark/JT808.Gateway.ServerBenchmark.csproj
index ede005b..184c555 100644
--- a/src/JT808.Gateway.Benchmark/JT808.Gateway.ServerBenchmark/JT808.Gateway.ServerBenchmark.csproj
+++ b/src/JT808.Gateway.Benchmark/JT808.Gateway.ServerBenchmark/JT808.Gateway.ServerBenchmark.csproj
@@ -6,10 +6,10 @@
-
-
-
-
+
+
+
+
diff --git a/src/JT808.Gateway.Client/JT808.Gateway.Client.csproj b/src/JT808.Gateway.Client/JT808.Gateway.Client.csproj
index ed71a38..a0b9abb 100644
--- a/src/JT808.Gateway.Client/JT808.Gateway.Client.csproj
+++ b/src/JT808.Gateway.Client/JT808.Gateway.Client.csproj
@@ -22,12 +22,12 @@
-
-
-
-
-
-
+
+
+
+
+
+
diff --git a/src/JT808.Gateway.Kafka/JT808.Gateway.Kafka.csproj b/src/JT808.Gateway.Kafka/JT808.Gateway.Kafka.csproj
index 1d8762d..bad5ee3 100644
--- a/src/JT808.Gateway.Kafka/JT808.Gateway.Kafka.csproj
+++ b/src/JT808.Gateway.Kafka/JT808.Gateway.Kafka.csproj
@@ -20,12 +20,12 @@
$(JT808GatewayPackageVersion)
-
-
-
-
-
-
+
+
+
+
+
+
diff --git a/src/JT808.Gateway.Kafka/JT808MsgConsumer.cs b/src/JT808.Gateway.Kafka/JT808MsgConsumer.cs
index 7fd4496..62a696f 100644
--- a/src/JT808.Gateway.Kafka/JT808MsgConsumer.cs
+++ b/src/JT808.Gateway.Kafka/JT808MsgConsumer.cs
@@ -11,8 +11,9 @@ using System.Threading.Tasks;
namespace JT808.Gateway.Kafka
{
- public class JT808MsgConsumer : IJT808MsgConsumer
+ public sealed class JT808MsgConsumer : IJT808MsgConsumer
{
+ private bool disposed = false;
public CancellationTokenSource Cts { get; private set; } = new CancellationTokenSource();
private readonly IConsumer consumer;
@@ -36,6 +37,7 @@ namespace JT808.Gateway.Kafka
{
while (!Cts.IsCancellationRequested)
{
+ if (disposed) return;
try
{
//如果不指定分区,根据kafka的机制会从多个分区中拉取数据
@@ -43,9 +45,9 @@ namespace JT808.Gateway.Kafka
var data = consumer.Consume(Cts.Token);
if (logger.IsEnabled(LogLevel.Debug))
{
- logger.LogDebug($"Topic: {data.Topic} Key: {data.Key} Partition: {data.Partition} Offset: {data.Offset} TopicPartitionOffset:{data.TopicPartitionOffset}");
+ logger.LogDebug($"Topic: {data.Topic} Key: {data.Message.Key} Partition: {data.Partition} Offset: {data.Offset} TopicPartitionOffset:{data.TopicPartitionOffset}");
}
- callback((data.Key, data.Value));
+ callback((data.Message.Key, data.Message.Value));
}
catch (ConsumeException ex)
{
@@ -62,21 +64,37 @@ namespace JT808.Gateway.Kafka
}
}, Cts.Token);
}
-
public void Subscribe()
{
consumer.Subscribe(TopicName);
}
-
public void Unsubscribe()
{
+ if (disposed) return;
consumer.Unsubscribe();
+ Cts.Cancel();
+ }
+ private void Dispose(bool disposing)
+ {
+ if (disposed) return;
+ if (disposing)
+ {
+ consumer.Close();
+ consumer.Dispose();
+ Cts.Dispose();
+ }
+ disposed = true;
+ }
+ ~JT808MsgConsumer()
+ {
+ Dispose(false);
}
-
public void Dispose()
{
- consumer.Close();
- consumer.Dispose();
+ //必须为true
+ Dispose(true);
+ //通知垃圾回收机制不再调用终结器(析构器)
+ GC.SuppressFinalize(this);
}
}
}
diff --git a/src/JT808.Gateway.Kafka/JT808MsgProducer.cs b/src/JT808.Gateway.Kafka/JT808MsgProducer.cs
index e644927..1e22bc2 100644
--- a/src/JT808.Gateway.Kafka/JT808MsgProducer.cs
+++ b/src/JT808.Gateway.Kafka/JT808MsgProducer.cs
@@ -9,8 +9,9 @@ using System.Threading.Tasks;
namespace JT808.Gateway.Kafka
{
- public class JT808MsgProducer : IJT808MsgProducer
+ public sealed class JT808MsgProducer : IJT808MsgProducer
{
+ private bool disposed = false;
public string TopicName { get; }
private readonly IProducer producer;
@@ -21,18 +22,34 @@ namespace JT808.Gateway.Kafka
TopicName = producerConfigAccessor.Value.TopicName;
}
- public void Dispose()
- {
- producer.Dispose();
- }
-
public async ValueTask ProduceAsync(string terminalNo, byte[] data)
{
+ if (disposed) return;
await producer.ProduceAsync(TopicName, new Message
{
Key = terminalNo,
Value = data
});
}
+ private void Dispose(bool disposing)
+ {
+ if (disposed) return;
+ if (disposing)
+ {
+ producer.Dispose();
+ }
+ disposed = true;
+ }
+ ~JT808MsgProducer()
+ {
+ Dispose(false);
+ }
+ public void Dispose()
+ {
+ //必须为true
+ Dispose(true);
+ //通知垃圾回收机制不再调用终结器(析构器)
+ GC.SuppressFinalize(this);
+ }
}
}
diff --git a/src/JT808.Gateway.Kafka/JT808MsgReplyConsumer.cs b/src/JT808.Gateway.Kafka/JT808MsgReplyConsumer.cs
index deb9958..7bb7a59 100644
--- a/src/JT808.Gateway.Kafka/JT808MsgReplyConsumer.cs
+++ b/src/JT808.Gateway.Kafka/JT808MsgReplyConsumer.cs
@@ -11,8 +11,9 @@ using System.Threading.Tasks;
namespace JT808.Gateway.Kafka
{
- public class JT808MsgReplyConsumer : IJT808MsgReplyConsumer
+ public sealed class JT808MsgReplyConsumer : IJT808MsgReplyConsumer
{
+ private bool disposed = false;
public CancellationTokenSource Cts { get; private set; } = new CancellationTokenSource();
private readonly IConsumer consumer;
@@ -36,6 +37,7 @@ namespace JT808.Gateway.Kafka
{
while (!Cts.IsCancellationRequested)
{
+ if (disposed) return;
try
{
//如果不指定分区,根据kafka的机制会从多个分区中拉取数据
@@ -43,9 +45,9 @@ namespace JT808.Gateway.Kafka
var data = consumer.Consume(Cts.Token);
if (logger.IsEnabled(LogLevel.Debug))
{
- logger.LogDebug($"Topic: {data.Topic} Key: {data.Key} Partition: {data.Partition} Offset: {data.Offset} TopicPartitionOffset:{data.TopicPartitionOffset}");
+ logger.LogDebug($"Topic: {data.Topic} Key: {data.Message.Key} Partition: {data.Partition} Offset: {data.Offset} TopicPartitionOffset:{data.TopicPartitionOffset}");
}
- callback((data.Key, data.Value));
+ callback((data.Message.Key, data.Message.Value));
}
catch (ConsumeException ex)
{
@@ -70,13 +72,32 @@ namespace JT808.Gateway.Kafka
public void Unsubscribe()
{
+ if (disposed) return;
consumer.Unsubscribe();
+ Cts.Cancel();
}
+ private void Dispose(bool disposing)
+ {
+ if (disposed) return;
+ if (disposing)
+ {
+ consumer.Close();
+ consumer.Dispose();
+ Cts.Dispose();
+ }
+ disposed = true;
+ }
+ ~JT808MsgReplyConsumer()
+ {
+ Dispose(false);
+ }
public void Dispose()
{
- consumer.Close();
- consumer.Dispose();
+ //必须为true
+ Dispose(true);
+ //通知垃圾回收机制不再调用终结器(析构器)
+ GC.SuppressFinalize(this);
}
}
}
diff --git a/src/JT808.Gateway.Kafka/JT808MsgReplyProducer.cs b/src/JT808.Gateway.Kafka/JT808MsgReplyProducer.cs
index a609273..adcacb3 100644
--- a/src/JT808.Gateway.Kafka/JT808MsgReplyProducer.cs
+++ b/src/JT808.Gateway.Kafka/JT808MsgReplyProducer.cs
@@ -9,8 +9,9 @@ using System.Threading.Tasks;
namespace JJT808.Gateway.Kafka
{
- public class JT808MsgReplyProducer : IJT808MsgReplyProducer
+ public sealed class JT808MsgReplyProducer : IJT808MsgReplyProducer
{
+ private bool disposed = false;
public string TopicName { get;}
private IProducer producer;
@@ -21,18 +22,35 @@ namespace JJT808.Gateway.Kafka
TopicName = producerConfigAccessor.Value.TopicName;
}
- public void Dispose()
- {
- producer.Dispose();
- }
-
public async ValueTask ProduceAsync(string terminalNo, byte[] data)
{
+ if (disposed) return;
await producer.ProduceAsync(TopicName, new Message
{
Key = terminalNo,
Value = data
});
}
+
+ private void Dispose(bool disposing)
+ {
+ if (disposed) return;
+ if (disposing)
+ {
+ producer.Dispose();
+ }
+ disposed = true;
+ }
+ ~JT808MsgReplyProducer()
+ {
+ Dispose(false);
+ }
+ public void Dispose()
+ {
+ //必须为true
+ Dispose(true);
+ //通知垃圾回收机制不再调用终结器(析构器)
+ GC.SuppressFinalize(this);
+ }
}
}
diff --git a/src/JT808.Gateway.Kafka/JT808SessionConsumer.cs b/src/JT808.Gateway.Kafka/JT808SessionConsumer.cs
index 1df01d3..92a4424 100644
--- a/src/JT808.Gateway.Kafka/JT808SessionConsumer.cs
+++ b/src/JT808.Gateway.Kafka/JT808SessionConsumer.cs
@@ -11,8 +11,9 @@ using System.Threading.Tasks;
namespace JT808.Gateway.Kafka
{
- public class JT808SessionConsumer : IJT808SessionConsumer
+ public sealed class JT808SessionConsumer : IJT808SessionConsumer
{
+ private bool disposed = false;
public CancellationTokenSource Cts { get; private set; } = new CancellationTokenSource();
private readonly IConsumer consumer;
@@ -36,6 +37,7 @@ namespace JT808.Gateway.Kafka
{
while (!Cts.IsCancellationRequested)
{
+ if (disposed) return;
try
{
//如果不指定分区,根据kafka的机制会从多个分区中拉取数据
@@ -43,9 +45,9 @@ namespace JT808.Gateway.Kafka
var data = consumer.Consume(Cts.Token);
if (logger.IsEnabled(LogLevel.Debug))
{
- logger.LogDebug($"Topic: {data.Topic} Key: {data.Key} Partition: {data.Partition} Offset: {data.Offset} TopicPartitionOffset:{data.TopicPartitionOffset}");
+ logger.LogDebug($"Topic: {data.Topic} Key: {data.Message.Key} Partition: {data.Partition} Offset: {data.Offset} TopicPartitionOffset:{data.TopicPartitionOffset}");
}
- callback((data.Key, data.Value));
+ callback((data.Message.Key, data.Message.Value));
}
catch (ConsumeException ex)
{
@@ -70,13 +72,32 @@ namespace JT808.Gateway.Kafka
public void Unsubscribe()
{
+ if (disposed) return;
consumer.Unsubscribe();
+ Cts.Cancel();
}
+ private void Dispose(bool disposing)
+ {
+ if (disposed) return;
+ if (disposing)
+ {
+ consumer.Close();
+ consumer.Dispose();
+ Cts.Dispose();
+ }
+ disposed = true;
+ }
+ ~JT808SessionConsumer()
+ {
+ Dispose(false);
+ }
public void Dispose()
{
- consumer.Close();
- consumer.Dispose();
+ //必须为true
+ Dispose(true);
+ //通知垃圾回收机制不再调用终结器(析构器)
+ GC.SuppressFinalize(this);
}
}
}
diff --git a/src/JT808.Gateway.Kafka/JT808SessionProducer.cs b/src/JT808.Gateway.Kafka/JT808SessionProducer.cs
index 9049a0d..1ce0675 100644
--- a/src/JT808.Gateway.Kafka/JT808SessionProducer.cs
+++ b/src/JT808.Gateway.Kafka/JT808SessionProducer.cs
@@ -9,8 +9,9 @@ using System.Threading.Tasks;
namespace JT808.Gateway.Kafka
{
- public class JT808SessionProducer : IJT808SessionProducer
+ public sealed class JT808SessionProducer : IJT808SessionProducer
{
+ private bool disposed = false;
public string TopicName { get; }
private readonly IProducer producer;
@@ -21,18 +22,35 @@ namespace JT808.Gateway.Kafka
TopicName = producerConfigAccessor.Value.TopicName;
}
- public void Dispose()
- {
- producer.Dispose();
- }
-
public async ValueTask ProduceAsync(string notice,string terminalNo)
{
+ if (disposed) return;
await producer.ProduceAsync(TopicName, new Message
{
Key = notice,
Value = terminalNo
});
}
+
+ private void Dispose(bool disposing)
+ {
+ if (disposed) return;
+ if (disposing)
+ {
+ producer.Dispose();
+ }
+ disposed = true;
+ }
+ ~JT808SessionProducer()
+ {
+ Dispose(false);
+ }
+ public void Dispose()
+ {
+ //必须为true
+ Dispose(true);
+ //通知垃圾回收机制不再调用终结器(析构器)
+ GC.SuppressFinalize(this);
+ }
}
}
diff --git a/src/JT808.Gateway.Services/JT808.Gateway.MsgLogging/JT808.Gateway.MsgLogging.csproj b/src/JT808.Gateway.Services/JT808.Gateway.MsgLogging/JT808.Gateway.MsgLogging.csproj
index 8bb463a..3ed4582 100644
--- a/src/JT808.Gateway.Services/JT808.Gateway.MsgLogging/JT808.Gateway.MsgLogging.csproj
+++ b/src/JT808.Gateway.Services/JT808.Gateway.MsgLogging/JT808.Gateway.MsgLogging.csproj
@@ -22,7 +22,7 @@
LICENSE
-
+
diff --git a/src/JT808.Gateway.Services/JT808.Gateway.ReplyMessage/JT808.Gateway.ReplyMessage.csproj b/src/JT808.Gateway.Services/JT808.Gateway.ReplyMessage/JT808.Gateway.ReplyMessage.csproj
index 97ebd8d..2997850 100644
--- a/src/JT808.Gateway.Services/JT808.Gateway.ReplyMessage/JT808.Gateway.ReplyMessage.csproj
+++ b/src/JT808.Gateway.Services/JT808.Gateway.ReplyMessage/JT808.Gateway.ReplyMessage.csproj
@@ -21,7 +21,7 @@
LICENSE
-
+
diff --git a/src/JT808.Gateway.Services/JT808.Gateway.SessionNotice/JT808.Gateway.SessionNotice.csproj b/src/JT808.Gateway.Services/JT808.Gateway.SessionNotice/JT808.Gateway.SessionNotice.csproj
index c85def7..bfe4066 100644
--- a/src/JT808.Gateway.Services/JT808.Gateway.SessionNotice/JT808.Gateway.SessionNotice.csproj
+++ b/src/JT808.Gateway.Services/JT808.Gateway.SessionNotice/JT808.Gateway.SessionNotice.csproj
@@ -22,7 +22,7 @@
LICENSE
-
+
diff --git a/src/JT808.Gateway.Services/JT808.Gateway.Transmit/JT808.Gateway.Transmit.csproj b/src/JT808.Gateway.Services/JT808.Gateway.Transmit/JT808.Gateway.Transmit.csproj
index ba8141a..e8334de 100644
--- a/src/JT808.Gateway.Services/JT808.Gateway.Transmit/JT808.Gateway.Transmit.csproj
+++ b/src/JT808.Gateway.Services/JT808.Gateway.Transmit/JT808.Gateway.Transmit.csproj
@@ -22,7 +22,7 @@
LICENSE
-
+
diff --git a/src/JT808.Gateway.Tests/JT808.Gateway.NormalHosting/JT808.Gateway.NormalHosting.csproj b/src/JT808.Gateway.Tests/JT808.Gateway.NormalHosting/JT808.Gateway.NormalHosting.csproj
index 5c73175..714c20d 100644
--- a/src/JT808.Gateway.Tests/JT808.Gateway.NormalHosting/JT808.Gateway.NormalHosting.csproj
+++ b/src/JT808.Gateway.Tests/JT808.Gateway.NormalHosting/JT808.Gateway.NormalHosting.csproj
@@ -7,10 +7,10 @@
-
-
-
-
+
+
+
+
diff --git a/src/JT808.Gateway.Tests/JT808.Gateway.QueueHosting/JT808.Gateway.QueueHosting.csproj b/src/JT808.Gateway.Tests/JT808.Gateway.QueueHosting/JT808.Gateway.QueueHosting.csproj
index 72f720f..73cec5e 100644
--- a/src/JT808.Gateway.Tests/JT808.Gateway.QueueHosting/JT808.Gateway.QueueHosting.csproj
+++ b/src/JT808.Gateway.Tests/JT808.Gateway.QueueHosting/JT808.Gateway.QueueHosting.csproj
@@ -6,10 +6,10 @@
-
-
-
-
+
+
+
+
diff --git a/src/JT808.Gateway.Tests/JT808.Gateway.Test/JT808.Gateway.Test.csproj b/src/JT808.Gateway.Tests/JT808.Gateway.Test/JT808.Gateway.Test.csproj
index ec4daa0..6f7658a 100644
--- a/src/JT808.Gateway.Tests/JT808.Gateway.Test/JT808.Gateway.Test.csproj
+++ b/src/JT808.Gateway.Tests/JT808.Gateway.Test/JT808.Gateway.Test.csproj
@@ -7,15 +7,15 @@
-
-
-
+
+
+
all
runtime; build; native; contentfiles; analyzers; buildtransitive
-
+
all
runtime; build; native; contentfiles; analyzers; buildtransitive
diff --git a/src/JT808.Gateway/JT808.Gateway.csproj b/src/JT808.Gateway/JT808.Gateway.csproj
index 67910d5..d3b4e08 100644
--- a/src/JT808.Gateway/JT808.Gateway.csproj
+++ b/src/JT808.Gateway/JT808.Gateway.csproj
@@ -21,9 +21,9 @@
-
-
-
+
+
+