diff --git a/src/JT808.Gateway.Abstractions/JT808.Gateway.Abstractions.csproj b/src/JT808.Gateway.Abstractions/JT808.Gateway.Abstractions.csproj
index de3eb76..4774af9 100644
--- a/src/JT808.Gateway.Abstractions/JT808.Gateway.Abstractions.csproj
+++ b/src/JT808.Gateway.Abstractions/JT808.Gateway.Abstractions.csproj
@@ -6,6 +6,10 @@
SmallChi(Koike)
false
false
+ https://github.com/SmallChi/JT808Gateway
+ https://github.com/SmallChi/JT808Gateway
+ https://github.com/SmallChi/JT808Gateway/blob/master/LICENSE
+ https://github.com/SmallChi/JT808Gateway/blob/master/LICENSE
LICENSE
true
基于Pipeline实现的JT808Gateway的抽象库
diff --git a/src/JT808.Gateway.Kafka/Configs/JT808ConsumerConfig.cs b/src/JT808.Gateway.Kafka/Configs/JT808ConsumerConfig.cs
new file mode 100644
index 0000000..9d2aa55
--- /dev/null
+++ b/src/JT808.Gateway.Kafka/Configs/JT808ConsumerConfig.cs
@@ -0,0 +1,15 @@
+using Confluent.Kafka;
+using Microsoft.Extensions.Options;
+using System;
+using System.Collections.Generic;
+using System.Text;
+
+namespace JT808.Gateway.Configs.Kafka
+{
+ public class JT808ConsumerConfig: ConsumerConfig, IOptions
+ {
+ public string TopicName { get; set; }
+
+ public JT808ConsumerConfig Value => this;
+ }
+}
diff --git a/src/JT808.Gateway.Kafka/Configs/JT808MsgConsumerConfig.cs b/src/JT808.Gateway.Kafka/Configs/JT808MsgConsumerConfig.cs
new file mode 100644
index 0000000..53746e2
--- /dev/null
+++ b/src/JT808.Gateway.Kafka/Configs/JT808MsgConsumerConfig.cs
@@ -0,0 +1,13 @@
+using Confluent.Kafka;
+using Microsoft.Extensions.Options;
+using System;
+using System.Collections.Generic;
+using System.Text;
+
+namespace JT808.Gateway.Configs.Kafka
+{
+ public class JT808MsgConsumerConfig : JT808ConsumerConfig, IOptions
+ {
+ JT808MsgConsumerConfig IOptions.Value => this;
+ }
+}
diff --git a/src/JT808.Gateway.Kafka/Configs/JT808MsgProducerConfig.cs b/src/JT808.Gateway.Kafka/Configs/JT808MsgProducerConfig.cs
new file mode 100644
index 0000000..d55b2ce
--- /dev/null
+++ b/src/JT808.Gateway.Kafka/Configs/JT808MsgProducerConfig.cs
@@ -0,0 +1,13 @@
+using Confluent.Kafka;
+using Microsoft.Extensions.Options;
+using System;
+using System.Collections.Generic;
+using System.Text;
+
+namespace JT808.Gateway.Configs.Kafka
+{
+ public class JT808MsgProducerConfig : JT808ProducerConfig, IOptions
+ {
+ JT808MsgProducerConfig IOptions.Value => this;
+ }
+}
diff --git a/src/JT808.Gateway.Kafka/Configs/JT808MsgReplyConsumerConfig.cs b/src/JT808.Gateway.Kafka/Configs/JT808MsgReplyConsumerConfig.cs
new file mode 100644
index 0000000..79f4c3c
--- /dev/null
+++ b/src/JT808.Gateway.Kafka/Configs/JT808MsgReplyConsumerConfig.cs
@@ -0,0 +1,13 @@
+using Confluent.Kafka;
+using Microsoft.Extensions.Options;
+using System;
+using System.Collections.Generic;
+using System.Text;
+
+namespace JT808.Gateway.Configs.Kafka
+{
+ public class JT808MsgReplyConsumerConfig : JT808ConsumerConfig, IOptions
+ {
+ JT808MsgReplyConsumerConfig IOptions.Value => this;
+ }
+}
diff --git a/src/JT808.Gateway.Kafka/Configs/JT808MsgReplyProducerConfig.cs b/src/JT808.Gateway.Kafka/Configs/JT808MsgReplyProducerConfig.cs
new file mode 100644
index 0000000..9b62e6e
--- /dev/null
+++ b/src/JT808.Gateway.Kafka/Configs/JT808MsgReplyProducerConfig.cs
@@ -0,0 +1,13 @@
+using Confluent.Kafka;
+using Microsoft.Extensions.Options;
+using System;
+using System.Collections.Generic;
+using System.Text;
+
+namespace JT808.Gateway.Configs.Kafka
+{
+ public class JT808MsgReplyProducerConfig : JT808ProducerConfig, IOptions
+ {
+ JT808MsgReplyProducerConfig IOptions.Value => this;
+ }
+}
diff --git a/src/JT808.Gateway.Kafka/Configs/JT808ProducerConfig.cs b/src/JT808.Gateway.Kafka/Configs/JT808ProducerConfig.cs
new file mode 100644
index 0000000..fca47cd
--- /dev/null
+++ b/src/JT808.Gateway.Kafka/Configs/JT808ProducerConfig.cs
@@ -0,0 +1,15 @@
+using Confluent.Kafka;
+using Microsoft.Extensions.Options;
+using System;
+using System.Collections.Generic;
+using System.Text;
+
+namespace JT808.Gateway.Configs.Kafka
+{
+ public class JT808ProducerConfig : ProducerConfig,IOptions
+ {
+ public string TopicName { get; set; }
+
+ public JT808ProducerConfig Value => this;
+ }
+}
diff --git a/src/JT808.Gateway.Kafka/Configs/JT808SessionConsumerConfig.cs b/src/JT808.Gateway.Kafka/Configs/JT808SessionConsumerConfig.cs
new file mode 100644
index 0000000..6b57a0a
--- /dev/null
+++ b/src/JT808.Gateway.Kafka/Configs/JT808SessionConsumerConfig.cs
@@ -0,0 +1,13 @@
+using Confluent.Kafka;
+using Microsoft.Extensions.Options;
+using System;
+using System.Collections.Generic;
+using System.Text;
+
+namespace JT808.Gateway.Configs.Kafka
+{
+ public class JT808SessionConsumerConfig : JT808ConsumerConfig, IOptions
+ {
+ JT808SessionConsumerConfig IOptions.Value => this;
+ }
+}
diff --git a/src/JT808.Gateway.Kafka/Configs/JT808SessionProducerConfig.cs b/src/JT808.Gateway.Kafka/Configs/JT808SessionProducerConfig.cs
new file mode 100644
index 0000000..bba6871
--- /dev/null
+++ b/src/JT808.Gateway.Kafka/Configs/JT808SessionProducerConfig.cs
@@ -0,0 +1,13 @@
+using Confluent.Kafka;
+using Microsoft.Extensions.Options;
+using System;
+using System.Collections.Generic;
+using System.Text;
+
+namespace JT808.Gateway.Configs.Kafka
+{
+ public class JT808SessionProducerConfig : JT808ProducerConfig, IOptions
+ {
+ JT808SessionProducerConfig IOptions.Value => this;
+ }
+}
diff --git a/src/JT808.Gateway.Kafka/JT808.Gateway.Kafka.csproj b/src/JT808.Gateway.Kafka/JT808.Gateway.Kafka.csproj
new file mode 100644
index 0000000..da06104
--- /dev/null
+++ b/src/JT808.Gateway.Kafka/JT808.Gateway.Kafka.csproj
@@ -0,0 +1,39 @@
+
+
+
+ netstandard2.1
+ 8.0
+ Copyright 2019.
+ SmallChi(Koike)
+ https://github.com/SmallChi/JT808Gateway
+ https://github.com/SmallChi/JT808Gateway
+ https://github.com/SmallChi/JT808Gateway/blob/master/LICENSE
+ https://github.com/SmallChi/JT808Gateway/blob/master/LICENSE
+ false
+ 1.0.0-preview2
+ false
+ LICENSE
+ true
+ JT808.Gateway.Kafka
+ JT808.Gateway.Kafka
+ 基于Kafka的JT808消息发布与订阅
+ 基于Kafka的JT808消息发布与订阅
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/src/JT808.Gateway.Kafka/JT808ClientBuilderDefault.cs b/src/JT808.Gateway.Kafka/JT808ClientBuilderDefault.cs
new file mode 100644
index 0000000..d34e239
--- /dev/null
+++ b/src/JT808.Gateway.Kafka/JT808ClientBuilderDefault.cs
@@ -0,0 +1,25 @@
+using JT808.Gateway.Abstractions;
+using JT808.Protocol;
+using Microsoft.Extensions.DependencyInjection;
+using Microsoft.Extensions.DependencyInjection.Extensions;
+using System;
+using System.Collections.Generic;
+using System.Text;
+
+namespace JT808.Gateway.Kafka
+{
+ internal class JT808ClientBuilderDefault : IJT808ClientBuilder
+ {
+ public IJT808Builder JT808Builder { get; }
+
+ public JT808ClientBuilderDefault(IJT808Builder builder)
+ {
+ JT808Builder = builder;
+ }
+
+ public IJT808Builder Builder()
+ {
+ return JT808Builder;
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/JT808.Gateway.Kafka/JT808ClientKafkaExtensions.cs b/src/JT808.Gateway.Kafka/JT808ClientKafkaExtensions.cs
new file mode 100644
index 0000000..0875b36
--- /dev/null
+++ b/src/JT808.Gateway.Kafka/JT808ClientKafkaExtensions.cs
@@ -0,0 +1,66 @@
+using JJT808.Gateway.Kafka;
+using JT808.Gateway.Configs.Kafka;
+using JT808.Gateway.Abstractions;
+using JT808.Protocol;
+using Microsoft.Extensions.Configuration;
+using Microsoft.Extensions.DependencyInjection;
+using Microsoft.Extensions.DependencyInjection.Extensions;
+
+namespace JT808.Gateway.Kafka
+{
+ public static class JT808ClientKafkaExtensions
+ {
+ public static IJT808ClientBuilder AddJT808ClientKafka(this IJT808Builder builder)
+ {
+ return new JT808ClientBuilderDefault(builder);
+ }
+ ///
+ ///
+ ///
+ ///
+ /// GetSection("JT808MsgConsumerConfig")
+ ///
+ public static IJT808ClientBuilder AddMsgConsumer(this IJT808ClientBuilder jT808ClientBuilder, IConfiguration configuration)
+ {
+ jT808ClientBuilder.JT808Builder.Services.Configure(configuration.GetSection("JT808MsgConsumerConfig"));
+ jT808ClientBuilder.JT808Builder.Services.TryAddSingleton();
+ return jT808ClientBuilder;
+ }
+ ///
+ ///
+ ///
+ ///
+ /// GetSection("JT808MsgReplyProducerConfig")
+ ///
+ public static IJT808ClientBuilder AddMsgReplyProducer(this IJT808ClientBuilder jT808ClientBuilder, IConfiguration configuration)
+ {
+ jT808ClientBuilder.JT808Builder.Services.Configure(configuration.GetSection("JT808MsgReplyProducerConfig"));
+ jT808ClientBuilder.JT808Builder.Services.TryAddSingleton();
+ return jT808ClientBuilder;
+ }
+ ///
+ ///
+ ///
+ ///
+ /// GetSection("JT808MsgReplyConsumerConfig")
+ ///
+ public static IJT808ClientBuilder AddMsgReplyConsumer(this IJT808ClientBuilder jT808ClientBuilder, IConfiguration configuration)
+ {
+ jT808ClientBuilder.JT808Builder.Services.Configure(configuration.GetSection("JT808MsgReplyConsumerConfig"));
+ jT808ClientBuilder.JT808Builder.Services.Replace(new ServiceDescriptor(typeof(IJT808MsgReplyConsumer), typeof(JT808MsgReplyConsumer), ServiceLifetime.Singleton));
+ return jT808ClientBuilder;
+ }
+ ///
+ ///
+ ///
+ ///
+ /// GetSection("JT808SessionConsumerConfig")
+ ///
+ public static IJT808ClientBuilder AddSessionConsumer(this IJT808ClientBuilder jT808ClientBuilder, IConfiguration configuration)
+ {
+ jT808ClientBuilder.JT808Builder.Services.Configure(configuration.GetSection("JT808SessionConsumerConfig"));
+ jT808ClientBuilder.JT808Builder.Services.TryAddSingleton();
+ return jT808ClientBuilder;
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/JT808.Gateway.Kafka/JT808MsgConsumer.cs b/src/JT808.Gateway.Kafka/JT808MsgConsumer.cs
new file mode 100644
index 0000000..da9d71e
--- /dev/null
+++ b/src/JT808.Gateway.Kafka/JT808MsgConsumer.cs
@@ -0,0 +1,82 @@
+using Confluent.Kafka;
+using JT808.Gateway.Configs.Kafka;
+using JT808.Gateway.Abstractions;
+using Microsoft.Extensions.Logging;
+using Microsoft.Extensions.Options;
+using System;
+using System.Collections.Generic;
+using System.Text;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace JT808.Gateway.Kafka
+{
+ public class JT808MsgConsumer : IJT808MsgConsumer
+ {
+ public CancellationTokenSource Cts => new CancellationTokenSource();
+
+ private readonly IConsumer consumer;
+
+ private readonly ILogger logger;
+
+ public string TopicName { get; }
+
+ public JT808MsgConsumer(
+ IOptions consumerConfigAccessor,
+ ILoggerFactory loggerFactory)
+ {
+ consumer = new ConsumerBuilder(consumerConfigAccessor.Value).Build();
+ TopicName = consumerConfigAccessor.Value.TopicName;
+ logger = loggerFactory.CreateLogger("JT808MsgConsumer");
+ }
+
+ public void OnMessage(Action<(string TerminalNo, byte[] Data)> callback)
+ {
+ Task.Run(() =>
+ {
+ while (!Cts.IsCancellationRequested)
+ {
+ try
+ {
+ //如果不指定分区,根据kafka的机制会从多个分区中拉取数据
+ //如果指定分区,根据kafka的机制会从相应的分区中拉取数据
+ var data = consumer.Consume(Cts.Token);
+ if (logger.IsEnabled(LogLevel.Debug))
+ {
+ logger.LogDebug($"Topic: {data.Topic} Key: {data.Key} Partition: {data.Partition} Offset: {data.Offset} TopicPartitionOffset:{data.TopicPartitionOffset}");
+ }
+ callback((data.Key, data.Value));
+ }
+ catch (ConsumeException ex)
+ {
+ logger.LogError(ex, TopicName);
+ }
+ catch (OperationCanceledException ex)
+ {
+ logger.LogError(ex, TopicName);
+ }
+ catch (Exception ex)
+ {
+ logger.LogError(ex, TopicName);
+ }
+ }
+ }, Cts.Token);
+ }
+
+ public void Subscribe()
+ {
+ consumer.Subscribe(TopicName);
+ }
+
+ public void Unsubscribe()
+ {
+ consumer.Unsubscribe();
+ }
+
+ public void Dispose()
+ {
+ consumer.Close();
+ consumer.Dispose();
+ }
+ }
+}
diff --git a/src/JT808.Gateway.Kafka/JT808MsgProducer.cs b/src/JT808.Gateway.Kafka/JT808MsgProducer.cs
new file mode 100644
index 0000000..e644927
--- /dev/null
+++ b/src/JT808.Gateway.Kafka/JT808MsgProducer.cs
@@ -0,0 +1,38 @@
+using Confluent.Kafka;
+using JT808.Gateway.Configs.Kafka;
+using JT808.Gateway.Abstractions;
+using Microsoft.Extensions.Options;
+using System;
+using System.Collections.Generic;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace JT808.Gateway.Kafka
+{
+ public class JT808MsgProducer : IJT808MsgProducer
+ {
+ public string TopicName { get; }
+
+ private readonly IProducer producer;
+ public JT808MsgProducer(
+ IOptions producerConfigAccessor)
+ {
+ producer = new ProducerBuilder(producerConfigAccessor.Value).Build();
+ TopicName = producerConfigAccessor.Value.TopicName;
+ }
+
+ public void Dispose()
+ {
+ producer.Dispose();
+ }
+
+ public async ValueTask ProduceAsync(string terminalNo, byte[] data)
+ {
+ await producer.ProduceAsync(TopicName, new Message
+ {
+ Key = terminalNo,
+ Value = data
+ });
+ }
+ }
+}
diff --git a/src/JT808.Gateway.Kafka/JT808MsgReplyConsumer.cs b/src/JT808.Gateway.Kafka/JT808MsgReplyConsumer.cs
new file mode 100644
index 0000000..340fd30
--- /dev/null
+++ b/src/JT808.Gateway.Kafka/JT808MsgReplyConsumer.cs
@@ -0,0 +1,82 @@
+using Confluent.Kafka;
+using JT808.Gateway.Configs.Kafka;
+using JT808.Gateway.Abstractions;
+using Microsoft.Extensions.Logging;
+using Microsoft.Extensions.Options;
+using System;
+using System.Collections.Generic;
+using System.Text;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace JT808.Gateway.Kafka
+{
+ public class JT808MsgReplyConsumer : IJT808MsgReplyConsumer
+ {
+ public CancellationTokenSource Cts => new CancellationTokenSource();
+
+ private readonly IConsumer consumer;
+
+ private readonly ILogger logger;
+
+ public string TopicName { get; }
+
+ public JT808MsgReplyConsumer(
+ IOptions consumerConfigAccessor,
+ ILoggerFactory loggerFactory)
+ {
+ consumer = new ConsumerBuilder(consumerConfigAccessor.Value).Build();
+ TopicName = consumerConfigAccessor.Value.TopicName;
+ logger = loggerFactory.CreateLogger("JT808MsgReplyConsumer");
+ }
+
+ public void OnMessage(Action<(string TerminalNo, byte[] Data)> callback)
+ {
+ Task.Run(() =>
+ {
+ while (!Cts.IsCancellationRequested)
+ {
+ try
+ {
+ //如果不指定分区,根据kafka的机制会从多个分区中拉取数据
+ //如果指定分区,根据kafka的机制会从相应的分区中拉取数据
+ var data = consumer.Consume(Cts.Token);
+ if (logger.IsEnabled(LogLevel.Debug))
+ {
+ logger.LogDebug($"Topic: {data.Topic} Key: {data.Key} Partition: {data.Partition} Offset: {data.Offset} TopicPartitionOffset:{data.TopicPartitionOffset}");
+ }
+ callback((data.Key, data.Value));
+ }
+ catch (ConsumeException ex)
+ {
+ logger.LogError(ex, TopicName);
+ }
+ catch (OperationCanceledException ex)
+ {
+ logger.LogError(ex, TopicName);
+ }
+ catch (Exception ex)
+ {
+ logger.LogError(ex, TopicName);
+ }
+ }
+ }, Cts.Token);
+ }
+
+ public void Subscribe()
+ {
+ consumer.Subscribe(TopicName);
+ }
+
+ public void Unsubscribe()
+ {
+ consumer.Unsubscribe();
+ }
+
+ public void Dispose()
+ {
+ consumer.Close();
+ consumer.Dispose();
+ }
+ }
+}
diff --git a/src/JT808.Gateway.Kafka/JT808MsgReplyProducer.cs b/src/JT808.Gateway.Kafka/JT808MsgReplyProducer.cs
new file mode 100644
index 0000000..a609273
--- /dev/null
+++ b/src/JT808.Gateway.Kafka/JT808MsgReplyProducer.cs
@@ -0,0 +1,38 @@
+using Confluent.Kafka;
+using JT808.Gateway.Configs.Kafka;
+using JT808.Gateway.Abstractions;
+using Microsoft.Extensions.Options;
+using System;
+using System.Collections.Generic;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace JJT808.Gateway.Kafka
+{
+ public class JT808MsgReplyProducer : IJT808MsgReplyProducer
+ {
+ public string TopicName { get;}
+
+ private IProducer producer;
+ public JT808MsgReplyProducer(
+ IOptions producerConfigAccessor)
+ {
+ producer = new ProducerBuilder(producerConfigAccessor.Value).Build();
+ TopicName = producerConfigAccessor.Value.TopicName;
+ }
+
+ public void Dispose()
+ {
+ producer.Dispose();
+ }
+
+ public async ValueTask ProduceAsync(string terminalNo, byte[] data)
+ {
+ await producer.ProduceAsync(TopicName, new Message
+ {
+ Key = terminalNo,
+ Value = data
+ });
+ }
+ }
+}
diff --git a/src/JT808.Gateway.Kafka/JT808ServerKafkaExtensions.cs b/src/JT808.Gateway.Kafka/JT808ServerKafkaExtensions.cs
new file mode 100644
index 0000000..e4e493b
--- /dev/null
+++ b/src/JT808.Gateway.Kafka/JT808ServerKafkaExtensions.cs
@@ -0,0 +1,48 @@
+using JT808.Gateway.Configs.Kafka;
+using JT808.Gateway.Abstractions;
+using Microsoft.Extensions.Configuration;
+using Microsoft.Extensions.DependencyInjection;
+using Microsoft.Extensions.DependencyInjection.Extensions;
+
+namespace JT808.Gateway.Kafka
+{
+ public static class JT808ServerKafkaExtensions
+ {
+ ///
+ ///
+ ///
+ ///
+ /// GetSection("JT808MsgProducerConfig")
+ ///
+ public static IJT808GatewayBuilder AddJT808ServerKafkaMsgProducer(this IJT808GatewayBuilder jT808GatewayBuilder, IConfiguration configuration)
+ {
+ jT808GatewayBuilder.JT808Builder.Services.Configure(configuration.GetSection("JT808MsgProducerConfig"));
+ jT808GatewayBuilder.JT808Builder.Services.Replace(new ServiceDescriptor(typeof(IJT808MsgProducer), typeof(JT808MsgProducer), ServiceLifetime.Singleton));
+ return jT808GatewayBuilder;
+ }
+ ///
+ ///
+ ///
+ ///
+ /// GetSection("JT808MsgReplyConsumerConfig")
+ ///
+ public static IJT808GatewayBuilder AddJT808ServerKafkaMsgReplyConsumer(this IJT808GatewayBuilder jT808GatewayBuilder, IConfiguration configuration)
+ {
+ jT808GatewayBuilder.JT808Builder.Services.Configure(configuration.GetSection("JT808MsgReplyConsumerConfig"));
+ jT808GatewayBuilder.JT808Builder.Services.Replace(new ServiceDescriptor(typeof(IJT808MsgReplyConsumer), typeof(JT808MsgReplyConsumer), ServiceLifetime.Singleton));
+ return jT808GatewayBuilder;
+ }
+ ///
+ ///
+ ///
+ ///
+ /// GetSection("JT808SessionProducerConfig")
+ ///
+ public static IJT808GatewayBuilder AddJT808ServerKafkaSessionProducer(this IJT808GatewayBuilder jT808GatewayBuilder, IConfiguration configuration)
+ {
+ jT808GatewayBuilder.JT808Builder.Services.Configure(configuration.GetSection("JT808SessionProducerConfig"));
+ jT808GatewayBuilder.JT808Builder.Services.Replace(new ServiceDescriptor(typeof(IJT808SessionProducer), typeof(JT808SessionProducer), ServiceLifetime.Singleton));
+ return jT808GatewayBuilder;
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/JT808.Gateway.Kafka/JT808SessionConsumer.cs b/src/JT808.Gateway.Kafka/JT808SessionConsumer.cs
new file mode 100644
index 0000000..83b4d31
--- /dev/null
+++ b/src/JT808.Gateway.Kafka/JT808SessionConsumer.cs
@@ -0,0 +1,82 @@
+using Confluent.Kafka;
+using JT808.Gateway.Configs.Kafka;
+using JT808.Gateway.Abstractions;
+using Microsoft.Extensions.Logging;
+using Microsoft.Extensions.Options;
+using System;
+using System.Collections.Generic;
+using System.Text;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace JT808.Gateway.Kafka
+{
+ public class JT808SessionConsumer : IJT808SessionConsumer
+ {
+ public CancellationTokenSource Cts => new CancellationTokenSource();
+
+ private readonly IConsumer consumer;
+
+ private readonly ILogger logger;
+
+ public string TopicName { get; }
+
+ public JT808SessionConsumer(
+ IOptions consumerConfigAccessor,
+ ILoggerFactory loggerFactory)
+ {
+ consumer = new ConsumerBuilder(consumerConfigAccessor.Value).Build();
+ TopicName = consumerConfigAccessor.Value.TopicName;
+ logger = loggerFactory.CreateLogger("JT808SessionConsumer");
+ }
+
+ public void OnMessage(Action<(string Notice, string TerminalNo)> callback)
+ {
+ Task.Run(() =>
+ {
+ while (!Cts.IsCancellationRequested)
+ {
+ try
+ {
+ //如果不指定分区,根据kafka的机制会从多个分区中拉取数据
+ //如果指定分区,根据kafka的机制会从相应的分区中拉取数据
+ var data = consumer.Consume(Cts.Token);
+ if (logger.IsEnabled(LogLevel.Debug))
+ {
+ logger.LogDebug($"Topic: {data.Topic} Key: {data.Key} Partition: {data.Partition} Offset: {data.Offset} TopicPartitionOffset:{data.TopicPartitionOffset}");
+ }
+ callback((data.Key, data.Value));
+ }
+ catch (ConsumeException ex)
+ {
+ logger.LogError(ex, TopicName);
+ }
+ catch (OperationCanceledException ex)
+ {
+ logger.LogError(ex, TopicName);
+ }
+ catch (Exception ex)
+ {
+ logger.LogError(ex, TopicName);
+ }
+ }
+ }, Cts.Token);
+ }
+
+ public void Subscribe()
+ {
+ consumer.Subscribe(TopicName);
+ }
+
+ public void Unsubscribe()
+ {
+ consumer.Unsubscribe();
+ }
+
+ public void Dispose()
+ {
+ consumer.Close();
+ consumer.Dispose();
+ }
+ }
+}
diff --git a/src/JT808.Gateway.Kafka/JT808SessionProducer.cs b/src/JT808.Gateway.Kafka/JT808SessionProducer.cs
new file mode 100644
index 0000000..9049a0d
--- /dev/null
+++ b/src/JT808.Gateway.Kafka/JT808SessionProducer.cs
@@ -0,0 +1,38 @@
+using Confluent.Kafka;
+using JT808.Gateway.Configs.Kafka;
+using JT808.Gateway.Abstractions;
+using Microsoft.Extensions.Options;
+using System;
+using System.Collections.Generic;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace JT808.Gateway.Kafka
+{
+ public class JT808SessionProducer : IJT808SessionProducer
+ {
+ public string TopicName { get; }
+
+ private readonly IProducer producer;
+ public JT808SessionProducer(
+ IOptions producerConfigAccessor)
+ {
+ producer = new ProducerBuilder(producerConfigAccessor.Value).Build();
+ TopicName = producerConfigAccessor.Value.TopicName;
+ }
+
+ public void Dispose()
+ {
+ producer.Dispose();
+ }
+
+ public async ValueTask ProduceAsync(string notice,string terminalNo)
+ {
+ await producer.ProduceAsync(TopicName, new Message
+ {
+ Key = notice,
+ Value = terminalNo
+ });
+ }
+ }
+}
diff --git a/src/JT808.Gateway.sln b/src/JT808.Gateway.sln
index 3719e74..e171f8b 100644
--- a/src/JT808.Gateway.sln
+++ b/src/JT808.Gateway.sln
@@ -11,6 +11,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JT808.Gateway.Test", "JT808
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JT808.Gateway.Abstractions", "JT808.Gateway.Abstractions\JT808.Gateway.Abstractions.csproj", "{3AA17DF7-A1B3-449C-93C2-45B051C32933}"
EndProject
+Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JT808.Gateway.Kafka", "JT808.Gateway.Kafka\JT808.Gateway.Kafka.csproj", "{274C048E-A8E3-4422-A578-A10A97DF36F2}"
+EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
@@ -33,6 +35,10 @@ Global
{3AA17DF7-A1B3-449C-93C2-45B051C32933}.Debug|Any CPU.Build.0 = Debug|Any CPU
{3AA17DF7-A1B3-449C-93C2-45B051C32933}.Release|Any CPU.ActiveCfg = Release|Any CPU
{3AA17DF7-A1B3-449C-93C2-45B051C32933}.Release|Any CPU.Build.0 = Release|Any CPU
+ {274C048E-A8E3-4422-A578-A10A97DF36F2}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {274C048E-A8E3-4422-A578-A10A97DF36F2}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {274C048E-A8E3-4422-A578-A10A97DF36F2}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {274C048E-A8E3-4422-A578-A10A97DF36F2}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
diff --git a/src/JT808.Gateway/JT808.Gateway.csproj b/src/JT808.Gateway/JT808.Gateway.csproj
index fdbfd6e..ff072a0 100644
--- a/src/JT808.Gateway/JT808.Gateway.csproj
+++ b/src/JT808.Gateway/JT808.Gateway.csproj
@@ -7,6 +7,10 @@
SmallChi(Koike)
false
false
+ https://github.com/SmallChi/JT808Gateway
+ https://github.com/SmallChi/JT808Gateway
+ https://github.com/SmallChi/JT808Gateway/blob/master/LICENSE
+ https://github.com/SmallChi/JT808Gateway/blob/master/LICENSE
LICENSE
true
基于Pipeline实现的JT808Gateway的网络库