From ecb34e0b378c01ec752d2f71da2f1ab66004fe33 Mon Sep 17 00:00:00 2001
From: "SmallChi(Koike)" <564952747@qq.com>
Date: Mon, 23 Dec 2019 11:23:11 +0800
Subject: [PATCH] =?UTF-8?q?=E8=BF=98=E5=8E=9Fkafka=E9=A1=B9=E7=9B=AE?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 .../Configs/JT808ConsumerConfig.cs            | 15 ++++
 .../Configs/JT808MsgConsumerConfig.cs         | 13 +++
 .../Configs/JT808MsgProducerConfig.cs         | 13 +++
 .../Configs/JT808MsgReplyConsumerConfig.cs    | 13 +++
 .../Configs/JT808MsgReplyProducerConfig.cs    | 13 +++
 .../Configs/JT808ProducerConfig.cs            | 15 ++++
 .../Configs/JT808SessionConsumerConfig.cs     | 13 +++
 .../Configs/JT808SessionProducerConfig.cs     | 13 +++
 .../JT808.Gateway.Kafka.csproj                | 39 +++++++++
 .../JT808ClientBuilderDefault.cs              | 24 ++++++
 .../JT808ClientKafkaExtensions.cs             | 66 +++++++++++++++
 src/JT808.Gateway.Kafka/JT808MsgConsumer.cs   | 82 +++++++++++++++++++
 src/JT808.Gateway.Kafka/JT808MsgProducer.cs   | 38 +++++++++
 .../JT808MsgReplyConsumer.cs                  | 82 +++++++++++++++++++
 .../JT808MsgReplyProducer.cs                  | 38 +++++++++
 .../JT808ServerKafkaExtensions.cs             | 48 +++++++++++
 .../JT808SessionConsumer.cs                   | 82 +++++++++++++++++++
 .../JT808SessionProducer.cs                   | 38 +++++++++
 18 files changed, 645 insertions(+)
 create mode 100644 src/JT808.Gateway.Kafka/Configs/JT808ConsumerConfig.cs
 create mode 100644 src/JT808.Gateway.Kafka/Configs/JT808MsgConsumerConfig.cs
 create mode 100644 src/JT808.Gateway.Kafka/Configs/JT808MsgProducerConfig.cs
 create mode 100644 src/JT808.Gateway.Kafka/Configs/JT808MsgReplyConsumerConfig.cs
 create mode 100644 src/JT808.Gateway.Kafka/Configs/JT808MsgReplyProducerConfig.cs
 create mode 100644 src/JT808.Gateway.Kafka/Configs/JT808ProducerConfig.cs
 create mode 100644 src/JT808.Gateway.Kafka/Configs/JT808SessionConsumerConfig.cs
 create mode 100644 src/JT808.Gateway.Kafka/Configs/JT808SessionProducerConfig.cs
 create mode 100644 src/JT808.Gateway.Kafka/JT808.Gateway.Kafka.csproj
 create mode 100644 src/JT808.Gateway.Kafka/JT808ClientBuilderDefault.cs
 create mode 100644 src/JT808.Gateway.Kafka/JT808ClientKafkaExtensions.cs
 create mode 100644 src/JT808.Gateway.Kafka/JT808MsgConsumer.cs
 create mode 100644 src/JT808.Gateway.Kafka/JT808MsgProducer.cs
 create mode 100644 src/JT808.Gateway.Kafka/JT808MsgReplyConsumer.cs
 create mode 100644 src/JT808.Gateway.Kafka/JT808MsgReplyProducer.cs
 create mode 100644 src/JT808.Gateway.Kafka/JT808ServerKafkaExtensions.cs
 create mode 100644 src/JT808.Gateway.Kafka/JT808SessionConsumer.cs
 create mode 100644 src/JT808.Gateway.Kafka/JT808SessionProducer.cs

diff --git a/src/JT808.Gateway.Kafka/Configs/JT808ConsumerConfig.cs b/src/JT808.Gateway.Kafka/Configs/JT808ConsumerConfig.cs
new file mode 100644
index 0000000..9d2aa55
--- /dev/null
+++ b/src/JT808.Gateway.Kafka/Configs/JT808ConsumerConfig.cs
@@ -0,0 +1,15 @@
+using Confluent.Kafka;
+using Microsoft.Extensions.Options;
+using System;
+using System.Collections.Generic;
+using System.Text;
+
+namespace JT808.Gateway.Configs.Kafka
+{
+    public class JT808ConsumerConfig: ConsumerConfig, IOptions<JT808ConsumerConfig>
+    {
+        public string TopicName { get; set; }
+
+        public JT808ConsumerConfig Value => this;
+    }
+}
diff --git a/src/JT808.Gateway.Kafka/Configs/JT808MsgConsumerConfig.cs b/src/JT808.Gateway.Kafka/Configs/JT808MsgConsumerConfig.cs
new file mode 100644
index 0000000..53746e2
--- /dev/null
+++ b/src/JT808.Gateway.Kafka/Configs/JT808MsgConsumerConfig.cs
@@ -0,0 +1,13 @@
+using Confluent.Kafka;
+using Microsoft.Extensions.Options;
+using System;
+using System.Collections.Generic;
+using System.Text;
+
+namespace JT808.Gateway.Configs.Kafka
+{
+    public class JT808MsgConsumerConfig : JT808ConsumerConfig, IOptions<JT808MsgConsumerConfig>
+    {
+        JT808MsgConsumerConfig IOptions<JT808MsgConsumerConfig>.Value => this;
+    }
+}
diff --git a/src/JT808.Gateway.Kafka/Configs/JT808MsgProducerConfig.cs b/src/JT808.Gateway.Kafka/Configs/JT808MsgProducerConfig.cs
new file mode 100644
index 0000000..d55b2ce
--- /dev/null
+++ b/src/JT808.Gateway.Kafka/Configs/JT808MsgProducerConfig.cs
@@ -0,0 +1,13 @@
+using Confluent.Kafka;
+using Microsoft.Extensions.Options;
+using System;
+using System.Collections.Generic;
+using System.Text;
+
+namespace JT808.Gateway.Configs.Kafka
+{
+    public class JT808MsgProducerConfig : JT808ProducerConfig, IOptions<JT808MsgProducerConfig>
+    {
+        JT808MsgProducerConfig IOptions<JT808MsgProducerConfig>.Value => this;
+    }
+}
diff --git a/src/JT808.Gateway.Kafka/Configs/JT808MsgReplyConsumerConfig.cs b/src/JT808.Gateway.Kafka/Configs/JT808MsgReplyConsumerConfig.cs
new file mode 100644
index 0000000..79f4c3c
--- /dev/null
+++ b/src/JT808.Gateway.Kafka/Configs/JT808MsgReplyConsumerConfig.cs
@@ -0,0 +1,13 @@
+using Confluent.Kafka;
+using Microsoft.Extensions.Options;
+using System;
+using System.Collections.Generic;
+using System.Text;
+
+namespace JT808.Gateway.Configs.Kafka
+{
+    public class JT808MsgReplyConsumerConfig : JT808ConsumerConfig, IOptions<JT808MsgReplyConsumerConfig>
+    {
+        JT808MsgReplyConsumerConfig IOptions<JT808MsgReplyConsumerConfig>.Value => this;
+    }
+}
diff --git a/src/JT808.Gateway.Kafka/Configs/JT808MsgReplyProducerConfig.cs b/src/JT808.Gateway.Kafka/Configs/JT808MsgReplyProducerConfig.cs
new file mode 100644
index 0000000..9b62e6e
--- /dev/null
+++ b/src/JT808.Gateway.Kafka/Configs/JT808MsgReplyProducerConfig.cs
@@ -0,0 +1,13 @@
+using Confluent.Kafka;
+using Microsoft.Extensions.Options;
+using System;
+using System.Collections.Generic;
+using System.Text;
+
+namespace JT808.Gateway.Configs.Kafka
+{
+    public class JT808MsgReplyProducerConfig : JT808ProducerConfig, IOptions<JT808MsgReplyProducerConfig>
+    {
+        JT808MsgReplyProducerConfig IOptions<JT808MsgReplyProducerConfig>.Value => this;
+    }
+}
diff --git a/src/JT808.Gateway.Kafka/Configs/JT808ProducerConfig.cs b/src/JT808.Gateway.Kafka/Configs/JT808ProducerConfig.cs
new file mode 100644
index 0000000..fca47cd
--- /dev/null
+++ b/src/JT808.Gateway.Kafka/Configs/JT808ProducerConfig.cs
@@ -0,0 +1,15 @@
+using Confluent.Kafka;
+using Microsoft.Extensions.Options;
+using System;
+using System.Collections.Generic;
+using System.Text;
+
+namespace JT808.Gateway.Configs.Kafka
+{
+    public class JT808ProducerConfig : ProducerConfig,IOptions<JT808ProducerConfig>
+    {
+        public string TopicName { get; set; }
+
+        public JT808ProducerConfig Value => this;
+    }
+}
diff --git a/src/JT808.Gateway.Kafka/Configs/JT808SessionConsumerConfig.cs b/src/JT808.Gateway.Kafka/Configs/JT808SessionConsumerConfig.cs
new file mode 100644
index 0000000..6b57a0a
--- /dev/null
+++ b/src/JT808.Gateway.Kafka/Configs/JT808SessionConsumerConfig.cs
@@ -0,0 +1,13 @@
+using Confluent.Kafka;
+using Microsoft.Extensions.Options;
+using System;
+using System.Collections.Generic;
+using System.Text;
+
+namespace JT808.Gateway.Configs.Kafka
+{
+    public class JT808SessionConsumerConfig : JT808ConsumerConfig, IOptions<JT808SessionConsumerConfig>
+    {
+        JT808SessionConsumerConfig IOptions<JT808SessionConsumerConfig>.Value => this;
+    }
+}
diff --git a/src/JT808.Gateway.Kafka/Configs/JT808SessionProducerConfig.cs b/src/JT808.Gateway.Kafka/Configs/JT808SessionProducerConfig.cs
new file mode 100644
index 0000000..bba6871
--- /dev/null
+++ b/src/JT808.Gateway.Kafka/Configs/JT808SessionProducerConfig.cs
@@ -0,0 +1,13 @@
+using Confluent.Kafka;
+using Microsoft.Extensions.Options;
+using System;
+using System.Collections.Generic;
+using System.Text;
+
+namespace JT808.Gateway.Configs.Kafka
+{
+    public class JT808SessionProducerConfig : JT808ProducerConfig, IOptions<JT808SessionProducerConfig>
+    {
+        JT808SessionProducerConfig IOptions<JT808SessionProducerConfig>.Value => this;
+    }
+}
diff --git a/src/JT808.Gateway.Kafka/JT808.Gateway.Kafka.csproj b/src/JT808.Gateway.Kafka/JT808.Gateway.Kafka.csproj
new file mode 100644
index 0000000..f68e24b
--- /dev/null
+++ b/src/JT808.Gateway.Kafka/JT808.Gateway.Kafka.csproj
@@ -0,0 +1,39 @@
+<Project Sdk="Microsoft.NET.Sdk">
+
+  <PropertyGroup>
+    <TargetFramework>netstandard2.0</TargetFramework>
+    <LangVersion>8.0</LangVersion>
+    <Copyright>Copyright 2018.</Copyright>
+    <Authors>SmallChi(Koike)</Authors>
+    <RepositoryUrl>https://github.com/SmallChi/JT808DotNetty</RepositoryUrl>
+    <PackageProjectUrl>https://github.com/SmallChi/JT808DotNetty</PackageProjectUrl>
+    <licenseUrl>https://github.com/SmallChi/JT808DotNetty/blob/master/LICENSE</licenseUrl>
+    <license>https://github.com/SmallChi/JT808DotNetty/blob/master/LICENSE</license>
+    <GeneratePackageOnBuild>false</GeneratePackageOnBuild>
+    <Version>1.0.0-preview1</Version>
+    <SignAssembly>false</SignAssembly>
+    <PackageLicenseFile>LICENSE</PackageLicenseFile>
+    <PackageRequireLicenseAcceptance>true</PackageRequireLicenseAcceptance>
+    <PackageId>JT808.Gateway.Kafka</PackageId>
+    <Product>JT808.Gateway.Kafka</Product>
+    <Description>基于Kafka的JT808消息发布与订阅</Description>
+    <PackageReleaseNotes>基于Kafka的JT808消息发布与订阅</PackageReleaseNotes>
+  </PropertyGroup>
+  <ItemGroup>
+    <PackageReference Include="Confluent.Kafka" Version="1.2.1" />
+    <PackageReference Include="Microsoft.Extensions.Configuration.Abstractions" Version="3.0.0" />
+    <PackageReference Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="3.0.0" />
+    <PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="3.0.0" />
+    <PackageReference Include="Microsoft.Extensions.Options" Version="3.0.0" />
+    <PackageReference Include="Microsoft.Extensions.Options.ConfigurationExtensions" Version="3.0.0" />
+  </ItemGroup>
+
+  <ItemGroup>
+    <None Include="..\..\LICENSE" Pack="true" PackagePath="" />
+  </ItemGroup>
+
+  <ItemGroup>
+    <ProjectReference Include="..\JT808.Gateway\JT808.Gateway.csproj" />
+  </ItemGroup>
+
+</Project>
diff --git a/src/JT808.Gateway.Kafka/JT808ClientBuilderDefault.cs b/src/JT808.Gateway.Kafka/JT808ClientBuilderDefault.cs
new file mode 100644
index 0000000..3279c64
--- /dev/null
+++ b/src/JT808.Gateway.Kafka/JT808ClientBuilderDefault.cs
@@ -0,0 +1,24 @@
+using JT808.Protocol;
+using Microsoft.Extensions.DependencyInjection;
+using Microsoft.Extensions.DependencyInjection.Extensions;
+using System;
+using System.Collections.Generic;
+using System.Text;
+
+namespace JT808.Gateway.Kafka
+{
+    internal class JT808ClientBuilderDefault : IJT808ClientBuilder
+    {
+        public IJT808Builder JT808Builder { get; }
+
+        public JT808ClientBuilderDefault(IJT808Builder builder)
+        {
+            JT808Builder = builder;
+        }
+
+        public IJT808Builder Builder()
+        {
+            return JT808Builder;
+        }
+    }
+}
\ No newline at end of file
diff --git a/src/JT808.Gateway.Kafka/JT808ClientKafkaExtensions.cs b/src/JT808.Gateway.Kafka/JT808ClientKafkaExtensions.cs
new file mode 100644
index 0000000..b334097
--- /dev/null
+++ b/src/JT808.Gateway.Kafka/JT808ClientKafkaExtensions.cs
@@ -0,0 +1,66 @@
+using JJT808.Gateway.Kafka;
+using JT808.Gateway.Configs.Kafka;
+using JT808.Gateway.PubSub;
+using JT808.Protocol;
+using Microsoft.Extensions.Configuration;
+using Microsoft.Extensions.DependencyInjection;
+using Microsoft.Extensions.DependencyInjection.Extensions;
+
+namespace JT808.Gateway.Kafka
+{
+    public static class JT808ClientKafkaExtensions
+    {
+        public static IJT808ClientBuilder AddJT808ClientKafka(this IJT808Builder builder)
+        {
+            return new JT808ClientBuilderDefault(builder);
+        }
+        /// <summary>
+        /// 
+        /// </summary>
+        /// <param name="serviceDescriptors"></param>
+        /// <param name="configuration">GetSection("JT808MsgConsumerConfig")</param>
+        /// <returns></returns>
+        public static IJT808ClientBuilder AddMsgConsumer(this IJT808ClientBuilder jT808ClientBuilder, IConfiguration configuration)
+        {
+            jT808ClientBuilder.JT808Builder.Services.Configure<JT808MsgConsumerConfig>(configuration.GetSection("JT808MsgConsumerConfig"));
+            jT808ClientBuilder.JT808Builder.Services.TryAddSingleton<IJT808MsgConsumer, JT808MsgConsumer>();
+            return jT808ClientBuilder;
+        }
+        /// <summary>
+        /// 
+        /// </summary>
+        /// <param name="serviceDescriptors"></param>
+        /// <param name="configuration">GetSection("JT808MsgReplyProducerConfig")</param>
+        /// <returns></returns>
+        public static IJT808ClientBuilder AddMsgReplyProducer(this IJT808ClientBuilder jT808ClientBuilder, IConfiguration configuration)
+        {
+            jT808ClientBuilder.JT808Builder.Services.Configure<JT808MsgReplyProducerConfig>(configuration.GetSection("JT808MsgReplyProducerConfig"));
+            jT808ClientBuilder.JT808Builder.Services.TryAddSingleton<IJT808MsgReplyProducer, JT808MsgReplyProducer>();
+            return jT808ClientBuilder;
+        }
+        /// <summary>
+        /// 
+        /// </summary>
+        /// <param name="jT808NettyBuilder"></param>
+        /// <param name="configuration">GetSection("JT808MsgReplyConsumerConfig")</param>
+        /// <returns></returns>
+        public static IJT808ClientBuilder AddMsgReplyConsumer(this IJT808ClientBuilder jT808ClientBuilder, IConfiguration configuration)
+        {
+            jT808ClientBuilder.JT808Builder.Services.Configure<JT808MsgReplyConsumerConfig>(configuration.GetSection("JT808MsgReplyConsumerConfig"));
+            jT808ClientBuilder.JT808Builder.Services.Replace(new ServiceDescriptor(typeof(IJT808MsgReplyConsumer), typeof(JT808MsgReplyConsumer), ServiceLifetime.Singleton));
+            return jT808ClientBuilder;
+        }
+        /// <summary>
+        /// 
+        /// </summary>
+        /// <param name="serviceDescriptors"></param>
+        /// <param name="configuration">GetSection("JT808SessionConsumerConfig")</param>
+        /// <returns></returns>
+        public static IJT808ClientBuilder AddSessionConsumer(this IJT808ClientBuilder jT808ClientBuilder, IConfiguration configuration)
+        {
+            jT808ClientBuilder.JT808Builder.Services.Configure<JT808SessionConsumerConfig>(configuration.GetSection("JT808SessionConsumerConfig"));
+            jT808ClientBuilder.JT808Builder.Services.TryAddSingleton<IJT808SessionConsumer, JT808SessionConsumer>();
+            return jT808ClientBuilder;
+        }
+    }
+}
\ No newline at end of file
diff --git a/src/JT808.Gateway.Kafka/JT808MsgConsumer.cs b/src/JT808.Gateway.Kafka/JT808MsgConsumer.cs
new file mode 100644
index 0000000..56818f2
--- /dev/null
+++ b/src/JT808.Gateway.Kafka/JT808MsgConsumer.cs
@@ -0,0 +1,82 @@
+using Confluent.Kafka;
+using JT808.Gateway.Configs.Kafka;
+using JT808.Gateway.PubSub;
+using Microsoft.Extensions.Logging;
+using Microsoft.Extensions.Options;
+using System;
+using System.Collections.Generic;
+using System.Text;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace JT808.Gateway.Kafka
+{
+    public class JT808MsgConsumer : IJT808MsgConsumer
+    {
+        public CancellationTokenSource Cts => new CancellationTokenSource();
+
+        private readonly IConsumer<string, byte[]> consumer;
+
+        private readonly ILogger logger;
+
+        public string TopicName { get; }
+
+        public JT808MsgConsumer(
+            IOptions<JT808MsgConsumerConfig> consumerConfigAccessor,
+            ILoggerFactory loggerFactory)
+        {
+            consumer = new ConsumerBuilder<string, byte[]>(consumerConfigAccessor.Value).Build();
+            TopicName = consumerConfigAccessor.Value.TopicName;
+            logger = loggerFactory.CreateLogger("JT808MsgConsumer");
+        }
+
+        public void OnMessage(Action<(string TerminalNo, byte[] Data)> callback)
+        {
+            Task.Run(() =>
+            {
+                while (!Cts.IsCancellationRequested)
+                {
+                    try
+                    {
+                        //如果不指定分区,根据kafka的机制会从多个分区中拉取数据
+                        //如果指定分区,根据kafka的机制会从相应的分区中拉取数据
+                        var data = consumer.Consume(Cts.Token);
+                        if (logger.IsEnabled(LogLevel.Debug))
+                        {
+                            logger.LogDebug($"Topic: {data.Topic} Key: {data.Key} Partition: {data.Partition} Offset: {data.Offset} TopicPartitionOffset:{data.TopicPartitionOffset}");
+                        }
+                        callback((data.Key, data.Value));
+                    }
+                    catch (ConsumeException ex)
+                    {
+                        logger.LogError(ex, TopicName);
+                    }
+                    catch (OperationCanceledException ex)
+                    {
+                        logger.LogError(ex, TopicName);
+                    }
+                    catch (Exception ex)
+                    {
+                        logger.LogError(ex, TopicName);
+                    }
+                }
+            }, Cts.Token);
+        }
+
+        public void Subscribe()
+        {
+            consumer.Subscribe(TopicName);
+        }
+
+        public void Unsubscribe()
+        {
+            consumer.Unsubscribe();
+        }
+
+        public void Dispose()
+        {
+            consumer.Close();
+            consumer.Dispose();
+        }
+    }
+}
diff --git a/src/JT808.Gateway.Kafka/JT808MsgProducer.cs b/src/JT808.Gateway.Kafka/JT808MsgProducer.cs
new file mode 100644
index 0000000..67d6d1b
--- /dev/null
+++ b/src/JT808.Gateway.Kafka/JT808MsgProducer.cs
@@ -0,0 +1,38 @@
+using Confluent.Kafka;
+using JT808.Gateway.Configs.Kafka;
+using JT808.Gateway.PubSub;
+using Microsoft.Extensions.Options;
+using System;
+using System.Collections.Generic;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace JT808.Gateway.Kafka
+{
+    public class JT808MsgProducer : IJT808MsgProducer
+    {
+        public string TopicName { get; }
+
+        private readonly IProducer<string, byte[]> producer;
+        public JT808MsgProducer(
+          IOptions<JT808MsgProducerConfig> producerConfigAccessor)
+        {
+            producer = new ProducerBuilder<string, byte[]>(producerConfigAccessor.Value).Build();
+            TopicName = producerConfigAccessor.Value.TopicName;
+        }
+
+        public void Dispose()
+        {
+            producer.Dispose();
+        }
+
+        public async Task ProduceAsync(string terminalNo, byte[] data)
+        {
+            await producer.ProduceAsync(TopicName, new Message<string, byte[]>
+            {
+                Key = terminalNo,
+                Value = data
+            });
+        }
+    }
+}
diff --git a/src/JT808.Gateway.Kafka/JT808MsgReplyConsumer.cs b/src/JT808.Gateway.Kafka/JT808MsgReplyConsumer.cs
new file mode 100644
index 0000000..004a391
--- /dev/null
+++ b/src/JT808.Gateway.Kafka/JT808MsgReplyConsumer.cs
@@ -0,0 +1,82 @@
+using Confluent.Kafka;
+using JT808.Gateway.Configs.Kafka;
+using JT808.Gateway.PubSub;
+using Microsoft.Extensions.Logging;
+using Microsoft.Extensions.Options;
+using System;
+using System.Collections.Generic;
+using System.Text;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace JT808.Gateway.Kafka
+{
+    public class JT808MsgReplyConsumer : IJT808MsgReplyConsumer
+    {
+        public CancellationTokenSource Cts => new CancellationTokenSource();
+
+        private readonly IConsumer<string, byte[]> consumer;
+
+        private readonly ILogger logger;
+
+        public string TopicName { get; }
+
+        public JT808MsgReplyConsumer(
+            IOptions<JT808MsgReplyConsumerConfig> consumerConfigAccessor,
+            ILoggerFactory loggerFactory)
+        {
+            consumer = new ConsumerBuilder<string, byte[]>(consumerConfigAccessor.Value).Build();
+            TopicName = consumerConfigAccessor.Value.TopicName;
+            logger = loggerFactory.CreateLogger("JT808MsgReplyConsumer");
+        }
+
+        public void OnMessage(Action<(string TerminalNo, byte[] Data)> callback)
+        {
+            Task.Run(() =>
+            {
+                while (!Cts.IsCancellationRequested)
+                {
+                    try
+                    {
+                        //如果不指定分区,根据kafka的机制会从多个分区中拉取数据
+                        //如果指定分区,根据kafka的机制会从相应的分区中拉取数据
+                        var data = consumer.Consume(Cts.Token);
+                        if (logger.IsEnabled(LogLevel.Debug))
+                        {
+                            logger.LogDebug($"Topic: {data.Topic} Key: {data.Key} Partition: {data.Partition} Offset: {data.Offset} TopicPartitionOffset:{data.TopicPartitionOffset}");
+                        }
+                        callback((data.Key, data.Value));
+                    }
+                    catch (ConsumeException ex)
+                    {
+                        logger.LogError(ex, TopicName);
+                    }
+                    catch (OperationCanceledException ex)
+                    {
+                        logger.LogError(ex, TopicName);
+                    }
+                    catch (Exception ex)
+                    {
+                        logger.LogError(ex, TopicName);
+                    }
+                }
+            }, Cts.Token);
+        }
+
+        public void Subscribe()
+        {
+            consumer.Subscribe(TopicName);
+        }
+
+        public void Unsubscribe()
+        {
+            consumer.Unsubscribe();
+        }
+
+        public void Dispose()
+        {
+            consumer.Close();
+            consumer.Dispose();
+        }
+    }
+}
diff --git a/src/JT808.Gateway.Kafka/JT808MsgReplyProducer.cs b/src/JT808.Gateway.Kafka/JT808MsgReplyProducer.cs
new file mode 100644
index 0000000..f29e9be
--- /dev/null
+++ b/src/JT808.Gateway.Kafka/JT808MsgReplyProducer.cs
@@ -0,0 +1,38 @@
+using Confluent.Kafka;
+using JT808.Gateway.Configs.Kafka;
+using JT808.Gateway.PubSub;
+using Microsoft.Extensions.Options;
+using System;
+using System.Collections.Generic;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace JJT808.Gateway.Kafka
+{
+    public class JT808MsgReplyProducer : IJT808MsgReplyProducer
+    {
+        public string TopicName { get;}
+
+        private IProducer<string, byte[]> producer;
+        public JT808MsgReplyProducer(
+          IOptions<JT808MsgReplyProducerConfig> producerConfigAccessor)
+        {
+            producer = new ProducerBuilder<string, byte[]>(producerConfigAccessor.Value).Build();
+            TopicName = producerConfigAccessor.Value.TopicName;
+        }
+
+        public void Dispose()
+        {
+            producer.Dispose();
+        }
+
+        public async Task ProduceAsync(string terminalNo, byte[] data)
+        {
+            await producer.ProduceAsync(TopicName, new Message<string, byte[]>
+            {
+                Key = terminalNo,
+                Value = data
+            });
+        }
+    }
+}
diff --git a/src/JT808.Gateway.Kafka/JT808ServerKafkaExtensions.cs b/src/JT808.Gateway.Kafka/JT808ServerKafkaExtensions.cs
new file mode 100644
index 0000000..e8e1dc1
--- /dev/null
+++ b/src/JT808.Gateway.Kafka/JT808ServerKafkaExtensions.cs
@@ -0,0 +1,48 @@
+using JT808.Gateway.Configs.Kafka;
+using JT808.Gateway.PubSub;
+using Microsoft.Extensions.Configuration;
+using Microsoft.Extensions.DependencyInjection;
+using Microsoft.Extensions.DependencyInjection.Extensions;
+
+namespace JT808.Gateway.Kafka
+{
+    public static class JT808ServerKafkaExtensions
+    {
+        /// <summary>
+        /// 
+        /// </summary>
+        /// <param name="jT808NettyBuilder"></param>
+        /// <param name="configuration">GetSection("JT808MsgProducerConfig")</param>
+        /// <returns></returns>
+        public static IJT808GatewayBuilder AddJT808ServerKafkaMsgProducer(this IJT808GatewayBuilder jT808GatewayBuilder, IConfiguration configuration)
+        {
+            jT808GatewayBuilder.JT808Builder.Services.Configure<JT808MsgProducerConfig>(configuration.GetSection("JT808MsgProducerConfig"));
+            jT808GatewayBuilder.JT808Builder.Services.Replace(new ServiceDescriptor(typeof(IJT808MsgProducer), typeof(JT808MsgProducer), ServiceLifetime.Singleton));
+            return jT808GatewayBuilder;
+        }
+        /// <summary>
+        /// 
+        /// </summary>
+        /// <param name="jT808NettyBuilder"></param>
+        /// <param name="configuration">GetSection("JT808MsgReplyConsumerConfig")</param>
+        /// <returns></returns>
+        public static IJT808GatewayBuilder AddJT808ServerKafkaMsgReplyConsumer(this IJT808GatewayBuilder jT808GatewayBuilder, IConfiguration configuration)
+        {
+            jT808GatewayBuilder.JT808Builder.Services.Configure<JT808MsgReplyConsumerConfig>(configuration.GetSection("JT808MsgReplyConsumerConfig"));
+            jT808GatewayBuilder.JT808Builder.Services.Replace(new ServiceDescriptor(typeof(IJT808MsgReplyConsumer), typeof(JT808MsgReplyConsumer), ServiceLifetime.Singleton));
+            return jT808GatewayBuilder;
+        }
+        /// <summary>
+        /// 
+        /// </summary>
+        /// <param name="jT808NettyBuilder"></param>
+        /// <param name="configuration">GetSection("JT808SessionProducerConfig")</param>
+        /// <returns></returns>
+        public static IJT808GatewayBuilder AddJT808ServerKafkaSessionProducer(this IJT808GatewayBuilder jT808GatewayBuilder, IConfiguration configuration)
+        {
+            jT808GatewayBuilder.JT808Builder.Services.Configure<JT808SessionProducerConfig>(configuration.GetSection("JT808SessionProducerConfig"));
+            jT808GatewayBuilder.JT808Builder.Services.Replace(new ServiceDescriptor(typeof(IJT808SessionProducer), typeof(JT808SessionProducer), ServiceLifetime.Singleton));
+            return jT808GatewayBuilder;
+        }
+    }
+}
\ No newline at end of file
diff --git a/src/JT808.Gateway.Kafka/JT808SessionConsumer.cs b/src/JT808.Gateway.Kafka/JT808SessionConsumer.cs
new file mode 100644
index 0000000..9ccf830
--- /dev/null
+++ b/src/JT808.Gateway.Kafka/JT808SessionConsumer.cs
@@ -0,0 +1,82 @@
+using Confluent.Kafka;
+using JT808.Gateway.Configs.Kafka;
+using JT808.Gateway.PubSub;
+using Microsoft.Extensions.Logging;
+using Microsoft.Extensions.Options;
+using System;
+using System.Collections.Generic;
+using System.Text;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace JT808.Gateway.Kafka
+{
+    public class JT808SessionConsumer : IJT808SessionConsumer
+    {
+        public CancellationTokenSource Cts => new CancellationTokenSource();
+
+        private readonly IConsumer<string, string> consumer;
+
+        private readonly ILogger logger;
+
+        public string TopicName { get; }
+
+        public JT808SessionConsumer(
+            IOptions<JT808SessionConsumerConfig> consumerConfigAccessor,
+            ILoggerFactory loggerFactory)
+        {
+            consumer = new ConsumerBuilder<string, string>(consumerConfigAccessor.Value).Build();
+            TopicName = consumerConfigAccessor.Value.TopicName;
+            logger = loggerFactory.CreateLogger("JT808SessionConsumer");
+        }
+
+        public void OnMessage(Action<(string Notice, string TerminalNo)> callback)
+        {
+            Task.Run(() =>
+            {
+                while (!Cts.IsCancellationRequested)
+                {
+                    try
+                    {
+                        //如果不指定分区,根据kafka的机制会从多个分区中拉取数据
+                        //如果指定分区,根据kafka的机制会从相应的分区中拉取数据
+                        var data = consumer.Consume(Cts.Token);
+                        if (logger.IsEnabled(LogLevel.Debug))
+                        {
+                            logger.LogDebug($"Topic: {data.Topic} Key: {data.Key} Partition: {data.Partition} Offset: {data.Offset} TopicPartitionOffset:{data.TopicPartitionOffset}");
+                        }
+                        callback((data.Key, data.Value));
+                    }
+                    catch (ConsumeException ex)
+                    {
+                        logger.LogError(ex, TopicName);
+                    }
+                    catch (OperationCanceledException ex)
+                    {
+                        logger.LogError(ex, TopicName);
+                    }
+                    catch (Exception ex)
+                    {
+                        logger.LogError(ex, TopicName);
+                    }
+                }
+            }, Cts.Token);
+        }
+
+        public void Subscribe()
+        {
+            consumer.Subscribe(TopicName);
+        }
+
+        public void Unsubscribe()
+        {
+            consumer.Unsubscribe();
+        }
+
+        public void Dispose()
+        {
+            consumer.Close();
+            consumer.Dispose();
+        }
+    }
+}
diff --git a/src/JT808.Gateway.Kafka/JT808SessionProducer.cs b/src/JT808.Gateway.Kafka/JT808SessionProducer.cs
new file mode 100644
index 0000000..3b6494f
--- /dev/null
+++ b/src/JT808.Gateway.Kafka/JT808SessionProducer.cs
@@ -0,0 +1,38 @@
+using Confluent.Kafka;
+using JT808.Gateway.Configs.Kafka;
+using JT808.Gateway.PubSub;
+using Microsoft.Extensions.Options;
+using System;
+using System.Collections.Generic;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace JT808.Gateway.Kafka
+{
+    public class JT808SessionProducer : IJT808SessionProducer
+    {
+        public string TopicName { get; }
+
+        private readonly IProducer<string, string> producer;
+        public JT808SessionProducer(
+          IOptions<JT808SessionProducerConfig> producerConfigAccessor)
+        {
+            producer = new ProducerBuilder<string, string>(producerConfigAccessor.Value).Build();
+            TopicName = producerConfigAccessor.Value.TopicName;
+        }
+
+        public void Dispose()
+        {
+            producer.Dispose();
+        }
+
+        public async Task ProduceAsync(string notice,string terminalNo)
+        {
+            await producer.ProduceAsync(TopicName, new Message<string, string>
+            {
+                Key = notice,
+                Value = terminalNo
+            });
+        }
+    }
+}