From cc50b7d0cce71ef57848115f650cb4f021538711 Mon Sep 17 00:00:00 2001 From: smallchi <564952747@qq.com> Date: Mon, 2 Sep 2019 16:57:18 +0800 Subject: [PATCH] =?UTF-8?q?1.=E5=B0=86Builder=E7=A7=BB=E5=85=A5=E6=8A=BD?= =?UTF-8?q?=E8=B1=A1=E7=B1=BB=E4=B8=AD=202.=E4=BF=AE=E6=94=B9=E4=BC=9A?= =?UTF-8?q?=E8=AF=9D=E9=80=9A=E7=9F=A5=E4=B8=BA=E7=94=9F=E4=BA=A7=E6=B6=88?= =?UTF-8?q?=E8=B4=B9=E6=A8=A1=E5=BC=8F=203.=E5=AE=8C=E5=96=84808=E7=9A=84?= =?UTF-8?q?=E7=94=9F=E4=BA=A7=E6=B6=88=E8=B4=B9=E6=B5=8B=E8=AF=95=204.?= =?UTF-8?q?=E4=BF=AE=E6=94=B9=E6=96=87=E6=A1=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 32 +++---- .../IJT808NettyBuilder.cs | 4 +- .../IJT808SessionConsumer.cs | 18 ++++ .../IJT808SessionProducer.cs | 13 +++ .../IJT808SessionPublishing.cs | 12 --- .../JT808.DotNetty.Abstractions.csproj | 4 +- .../JT808NettyConstants.cs | 2 +- .../Impls/JT808NettyBuilderDefault.cs | 22 ++--- .../Impls/JT808SessionProducerDefaultImpl.cs | 31 +++++++ .../Impls/JT808SessionPublishingEmptyImpl.cs | 23 ----- .../Interfaces/IJT808WebApiNettyBuilder.cs | 3 +- .../JT808.DotNetty.Core.csproj | 1 - .../JT808CoreDotnettyExtensions.cs | 4 +- .../Session/JT808SessionManager.cs | 20 ++--- .../{ => Configs}/JT808ConsumerConfig.cs | 0 .../Configs/JT808MsgConsumerConfig.cs | 13 +++ .../Configs/JT808MsgProducerConfig.cs | 13 +++ .../Configs/JT808MsgReplyConsumerConfig.cs | 13 +++ .../Configs/JT808MsgReplyProducerConfig.cs | 13 +++ .../{ => Configs}/JT808ProducerConfig.cs | 0 .../Configs/JT808SessionConsumerConfig.cs | 13 +++ .../Configs/JT808SessionProducerConfig.cs | 13 +++ .../JT808.DotNetty.Kafka.csproj | 3 + .../JT808ClientKafkaExtensions.cs | 47 +++++++++++ src/JT808.DotNetty.Kafka/JT808MsgConsumer.cs | 2 +- src/JT808.DotNetty.Kafka/JT808MsgProducer.cs | 2 +- .../JT808MsgReplyConsumer.cs | 4 +- .../JT808MsgReplyProducer.cs | 4 +- .../JT808ServerKafkaExtensions.cs | 47 +++++++++++ .../JT808SessionConsumer.cs | 84 +++++++++++++++++++ .../JT808SessionProducer.cs | 37 ++++++++ .../JT808TcpDotnettyExtensions.cs | 1 + .../SeedTcpSession.cs | 2 +- .../JT808.DotNetty.Hosting.csproj | 1 + .../JT808.DotNetty.Hosting/Program.cs | 8 ++ .../JT808.DotNetty.Kafka.Test.csproj | 10 +++ .../JT808BaseTest.cs | 13 +++ .../JT808ConfigTest.cs | 54 ++++++++++++ .../JT808MsgConsumerTest.cs | 12 +-- .../JT808MsgProducerTest.cs | 13 +-- .../JT808MsgReplyConsumerTest.cs | 36 ++++++++ .../JT808MsgReplyProducerTest.cs | 59 +++++++++++++ .../JT808SessionConsumerTest.cs | 36 ++++++++ .../JT808SessionProducerTest.cs | 49 +++++++++++ .../JT808UdpDotnettyExtensions.cs | 3 +- .../JT808WebApiBuilderDefault.cs | 3 +- .../JT808WebApiDotnettyExtensions.cs | 3 +- 47 files changed, 691 insertions(+), 109 deletions(-) rename src/{JT808.DotNetty.Core/Interfaces => JT808.DotNetty.Abstractions}/IJT808NettyBuilder.cs (59%) create mode 100644 src/JT808.DotNetty.Abstractions/IJT808SessionConsumer.cs create mode 100644 src/JT808.DotNetty.Abstractions/IJT808SessionProducer.cs delete mode 100644 src/JT808.DotNetty.Abstractions/IJT808SessionPublishing.cs create mode 100644 src/JT808.DotNetty.Core/Impls/JT808SessionProducerDefaultImpl.cs delete mode 100644 src/JT808.DotNetty.Core/Impls/JT808SessionPublishingEmptyImpl.cs rename src/JT808.DotNetty.Kafka/{ => Configs}/JT808ConsumerConfig.cs (100%) create mode 100644 src/JT808.DotNetty.Kafka/Configs/JT808MsgConsumerConfig.cs create mode 100644 src/JT808.DotNetty.Kafka/Configs/JT808MsgProducerConfig.cs create mode 100644 src/JT808.DotNetty.Kafka/Configs/JT808MsgReplyConsumerConfig.cs create mode 100644 src/JT808.DotNetty.Kafka/Configs/JT808MsgReplyProducerConfig.cs rename src/JT808.DotNetty.Kafka/{ => Configs}/JT808ProducerConfig.cs (100%) create mode 100644 src/JT808.DotNetty.Kafka/Configs/JT808SessionConsumerConfig.cs create mode 100644 src/JT808.DotNetty.Kafka/Configs/JT808SessionProducerConfig.cs create mode 100644 src/JT808.DotNetty.Kafka/JT808ClientKafkaExtensions.cs create mode 100644 src/JT808.DotNetty.Kafka/JT808ServerKafkaExtensions.cs create mode 100644 src/JT808.DotNetty.Kafka/JT808SessionConsumer.cs create mode 100644 src/JT808.DotNetty.Kafka/JT808SessionProducer.cs create mode 100644 src/JT808.DotNetty.Tests/JT808.DotNetty.Kafka.Test/JT808BaseTest.cs create mode 100644 src/JT808.DotNetty.Tests/JT808.DotNetty.Kafka.Test/JT808ConfigTest.cs create mode 100644 src/JT808.DotNetty.Tests/JT808.DotNetty.Kafka.Test/JT808MsgReplyConsumerTest.cs create mode 100644 src/JT808.DotNetty.Tests/JT808.DotNetty.Kafka.Test/JT808MsgReplyProducerTest.cs create mode 100644 src/JT808.DotNetty.Tests/JT808.DotNetty.Kafka.Test/JT808SessionConsumerTest.cs create mode 100644 src/JT808.DotNetty.Tests/JT808.DotNetty.Kafka.Test/JT808SessionProducerTest.cs diff --git a/README.md b/README.md index 9e7f42e..081f3d3 100644 --- a/README.md +++ b/README.md @@ -35,14 +35,13 @@ |接口名称|接口说明|使用场景| |:------:|:------|:------| -| IJT808SessionPublishing| 会话通知(在线/离线)| 有些超长待机的设备,不会实时保持连接,那么通过平台下发的命令是无法到达的,这时候就需要设备一上线,就即时通知服务去处理,然后在即时的下发消息到设备。| +| IJT808SessionProducer| 会话通知(在线/离线)数据生产接口| 有些超长待机的设备,不会实时保持连接,那么通过平台下发的命令是无法到达的,这时候就需要设备一上线,就即时通知服务去处理,然后在即时的下发消息到设备。| +| IJT808SessionConsumer| 会话通知(在线/离线)数据消费接口| -| | IJT808MsgProducer| 数据生产接口| 网关将接收到的数据发送到队列| | IJT808MsgConsumer| 数据消费接口| 将数据进行对应的消息业务处理(例:设备流量统计、第三方平台数据转发、消息日志等) | | IJT808MsgReplyProducer| 应答数据生产接口|将生产的数据解析为对应的消息Id应答发送到队列 | | IJT808MsgReplyConsumer| 应答数据消费接口| 将接收到的应答数据下发给设备| -> 只要实现IJT808SessionPublishing接口的任意一款MQ都能实现该功能。 - > 使用物联网卡通过udp下发指令时,存储的那个socket地址端口,有效期非常短,不速度快点下发,那个socket地址端口就可能映射到别的对应卡去了,所以此处采用跟随设备消息下发指令。 ## NuGet安装 @@ -81,21 +80,26 @@ static async Task Main(string[] args) services.AddSingleton(typeof(ILogger<>), typeof(Logger<>)); services.AddJT808Configure() .AddJT808NettyCore(hostContext.Configuration) - //自定义会话通知(在线/离线)使用异步方式 - //.ReplaceSessionPublishing() .AddJT808TcpNettyHost() .AddJT808UdpNettyHost() .AddJT808WebApiNettyHost() + //扩展webapi JT808MsgIdHttpHandlerBase + //.ReplaceMsgIdHandler() .Builder(); - //webapi客户端调用 - services.AddHttpApi().ConfigureHttpApiConfig((c, p) => - { - c.HttpHost = new Uri("http://localhost:828/jt808api/"); - c.FormatOptions.DateTimeFormat = "yyyy-MM-dd HH:mm:ss.fff"; - c.LoggerFactory = p.GetRequiredService(); - }); - var client = services.BuildServiceProvider().GetRequiredService(); - var result = client.GetTcpAtomicCounter().InvokeAsync().Result; + //添加kafka插件 + //.AddJT808ServerKafkaMsgProducer(hostContext.Configuration) + //.AddJT808ServerKafkaMsgReplyConsumer(hostContext.Configuration) + //.AddJT808ServerKafkaSessionProducer(hostContext.Configuration) + //.Builder(); + //webapi客户端调用 + //services.AddHttpApi().ConfigureHttpApiConfig((c, p) => + //{ + // c.HttpHost = new Uri("http://localhost:828/jt808api/"); + // c.FormatOptions.DateTimeFormat = "yyyy-MM-dd HH:mm:ss.fff"; + // c.LoggerFactory = p.GetRequiredService(); + //}); + //var client = services.BuildServiceProvider().GetRequiredService(); + //var result = client.GetTcpAtomicCounter().InvokeAsync().Result; }); await serverHostBuilder.RunConsoleAsync(); diff --git a/src/JT808.DotNetty.Core/Interfaces/IJT808NettyBuilder.cs b/src/JT808.DotNetty.Abstractions/IJT808NettyBuilder.cs similarity index 59% rename from src/JT808.DotNetty.Core/Interfaces/IJT808NettyBuilder.cs rename to src/JT808.DotNetty.Abstractions/IJT808NettyBuilder.cs index d495cae..40e6723 100644 --- a/src/JT808.DotNetty.Core/Interfaces/IJT808NettyBuilder.cs +++ b/src/JT808.DotNetty.Abstractions/IJT808NettyBuilder.cs @@ -5,13 +5,11 @@ using System; using System.Collections.Generic; using System.Text; -namespace JT808.DotNetty.Core.Interfaces +namespace JT808.DotNetty.Abstractions { public interface IJT808NettyBuilder { IJT808Builder JT808Builder { get; } - IJT808NettyBuilder ReplaceSessionPublishing() where T : IJT808SessionPublishing; - IJT808NettyBuilder ReplaceMsgProducer() where T : IJT808MsgProducer; IJT808Builder Builder(); } } diff --git a/src/JT808.DotNetty.Abstractions/IJT808SessionConsumer.cs b/src/JT808.DotNetty.Abstractions/IJT808SessionConsumer.cs new file mode 100644 index 0000000..3525c6c --- /dev/null +++ b/src/JT808.DotNetty.Abstractions/IJT808SessionConsumer.cs @@ -0,0 +1,18 @@ +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading; + +namespace JT808.DotNetty.Abstractions +{ + /// + /// 会话通知(在线/离线) + /// + public interface IJT808SessionConsumer : IJT808PubSub, IDisposable + { + void OnMessage(Action<(string Notice, string TerminalNo)> callback); + CancellationTokenSource Cts { get; } + void Subscribe(); + void Unsubscribe(); + } +} diff --git a/src/JT808.DotNetty.Abstractions/IJT808SessionProducer.cs b/src/JT808.DotNetty.Abstractions/IJT808SessionProducer.cs new file mode 100644 index 0000000..04c452b --- /dev/null +++ b/src/JT808.DotNetty.Abstractions/IJT808SessionProducer.cs @@ -0,0 +1,13 @@ +using System; +using System.Threading.Tasks; + +namespace JT808.DotNetty.Abstractions +{ + /// + /// 会话通知(在线/离线) + /// + public interface IJT808SessionProducer : IJT808PubSub, IDisposable + { + Task ProduceAsync(string notice,string terminalNo); + } +} diff --git a/src/JT808.DotNetty.Abstractions/IJT808SessionPublishing.cs b/src/JT808.DotNetty.Abstractions/IJT808SessionPublishing.cs deleted file mode 100644 index 3a8e56c..0000000 --- a/src/JT808.DotNetty.Abstractions/IJT808SessionPublishing.cs +++ /dev/null @@ -1,12 +0,0 @@ -using System.Threading.Tasks; - -namespace JT808.DotNetty.Abstractions -{ - /// - /// 会话通知(在线/离线) - /// - public interface IJT808SessionPublishing - { - Task PublishAsync(string topicName, string value); - } -} diff --git a/src/JT808.DotNetty.Abstractions/JT808.DotNetty.Abstractions.csproj b/src/JT808.DotNetty.Abstractions/JT808.DotNetty.Abstractions.csproj index 87166d8..3971abb 100644 --- a/src/JT808.DotNetty.Abstractions/JT808.DotNetty.Abstractions.csproj +++ b/src/JT808.DotNetty.Abstractions/JT808.DotNetty.Abstractions.csproj @@ -6,5 +6,7 @@ 基于DotNetty实现的JT808DotNetty的抽象库 基于DotNetty实现的JT808DotNetty的抽象库 - + + + diff --git a/src/JT808.DotNetty.Abstractions/JT808NettyConstants.cs b/src/JT808.DotNetty.Abstractions/JT808NettyConstants.cs index cb98506..93d1546 100644 --- a/src/JT808.DotNetty.Abstractions/JT808NettyConstants.cs +++ b/src/JT808.DotNetty.Abstractions/JT808NettyConstants.cs @@ -5,7 +5,7 @@ public const string SessionOnline= "JT808SessionOnline"; public const string SessionOffline = "JT808SessionOffline"; - + public const string SessionTopic = "jt808session"; public const string MsgTopic = "jt808msgdefault"; public const string MsgReplyTopic = "jt808msgreplydefault"; diff --git a/src/JT808.DotNetty.Core/Impls/JT808NettyBuilderDefault.cs b/src/JT808.DotNetty.Core/Impls/JT808NettyBuilderDefault.cs index 50341fc..378b3a0 100644 --- a/src/JT808.DotNetty.Core/Impls/JT808NettyBuilderDefault.cs +++ b/src/JT808.DotNetty.Core/Impls/JT808NettyBuilderDefault.cs @@ -23,22 +23,10 @@ namespace JT808.DotNetty.Core.Impls return JT808Builder; } - public IJT808NettyBuilder ReplaceSessionPublishing() where T : IJT808SessionPublishing - { - JT808Builder.Services.Replace(new ServiceDescriptor(typeof(IJT808SessionPublishing), typeof(T), ServiceLifetime.Singleton)); - return this; - } - - public IJT808NettyBuilder ReplaceMsgProducer() where T : IJT808MsgProducer - { - JT808Builder.Services.Replace(new ServiceDescriptor(typeof(IJT808MsgProducer), typeof(T), ServiceLifetime.Singleton)); - return this; - } - - public IJT808NettyBuilder ReplaceMsgReplyConsumer() where T : IJT808MsgReplyConsumer - { - JT808Builder.Services.Replace(new ServiceDescriptor(typeof(IJT808MsgReplyConsumer), typeof(T), ServiceLifetime.Singleton)); - return this; - } + //public IJT808NettyBuilder ReplaceSessionPublishing() where T : IJT808SessionPublishing + //{ + // JT808Builder.Services.Replace(new ServiceDescriptor(typeof(IJT808SessionPublishing), typeof(T), ServiceLifetime.Singleton)); + // return this; + //} } } \ No newline at end of file diff --git a/src/JT808.DotNetty.Core/Impls/JT808SessionProducerDefaultImpl.cs b/src/JT808.DotNetty.Core/Impls/JT808SessionProducerDefaultImpl.cs new file mode 100644 index 0000000..b5d6d11 --- /dev/null +++ b/src/JT808.DotNetty.Core/Impls/JT808SessionProducerDefaultImpl.cs @@ -0,0 +1,31 @@ +using JT808.DotNetty.Abstractions; +using Microsoft.Extensions.Logging; +using System.Threading.Tasks; + +namespace JT808.DotNetty.Core +{ + internal class JT808SessionProducerDefaultImpl : IJT808SessionProducer + { + private readonly ILogger logger; + public JT808SessionProducerDefaultImpl(ILoggerFactory loggerFactory) + { + logger = loggerFactory.CreateLogger(); + } + + public string TopicName => JT808NettyConstants.SessionTopic; + + public void Dispose() + { + + } + + public Task ProduceAsync(string terminalNo, string notice) + { + if (logger.IsEnabled(LogLevel.Debug)) + { + logger.LogDebug($"{terminalNo}-{notice}"); + } + return Task.CompletedTask; + } + } +} diff --git a/src/JT808.DotNetty.Core/Impls/JT808SessionPublishingEmptyImpl.cs b/src/JT808.DotNetty.Core/Impls/JT808SessionPublishingEmptyImpl.cs deleted file mode 100644 index 859273b..0000000 --- a/src/JT808.DotNetty.Core/Impls/JT808SessionPublishingEmptyImpl.cs +++ /dev/null @@ -1,23 +0,0 @@ -using JT808.DotNetty.Abstractions; -using Microsoft.Extensions.Logging; -using System.Threading.Tasks; - -namespace JT808.DotNetty.Core -{ - internal class JT808SessionPublishingEmptyImpl : IJT808SessionPublishing - { - private readonly ILogger logger; - public JT808SessionPublishingEmptyImpl(ILoggerFactory loggerFactory) - { - logger = loggerFactory.CreateLogger(); - } - public Task PublishAsync(string topicName, string value) - { - if (logger.IsEnabled(LogLevel.Debug)) - { - logger.LogDebug($"{topicName}-{value}"); - } - return Task.CompletedTask; - } - } -} diff --git a/src/JT808.DotNetty.Core/Interfaces/IJT808WebApiNettyBuilder.cs b/src/JT808.DotNetty.Core/Interfaces/IJT808WebApiNettyBuilder.cs index 09bf34a..02dae2b 100644 --- a/src/JT808.DotNetty.Core/Interfaces/IJT808WebApiNettyBuilder.cs +++ b/src/JT808.DotNetty.Core/Interfaces/IJT808WebApiNettyBuilder.cs @@ -1,4 +1,5 @@ -using JT808.DotNetty.Core.Handlers; +using JT808.DotNetty.Abstractions; +using JT808.DotNetty.Core.Handlers; using System; using System.Collections.Generic; using System.Text; diff --git a/src/JT808.DotNetty.Core/JT808.DotNetty.Core.csproj b/src/JT808.DotNetty.Core/JT808.DotNetty.Core.csproj index 6efc62c..73c4ca0 100644 --- a/src/JT808.DotNetty.Core/JT808.DotNetty.Core.csproj +++ b/src/JT808.DotNetty.Core/JT808.DotNetty.Core.csproj @@ -11,7 +11,6 @@ - diff --git a/src/JT808.DotNetty.Core/JT808CoreDotnettyExtensions.cs b/src/JT808.DotNetty.Core/JT808CoreDotnettyExtensions.cs index 54228cb..f568f14 100644 --- a/src/JT808.DotNetty.Core/JT808CoreDotnettyExtensions.cs +++ b/src/JT808.DotNetty.Core/JT808CoreDotnettyExtensions.cs @@ -65,7 +65,7 @@ namespace JT808.DotNetty.Core nettyBuilder.JT808Builder.Services.TryAddSingleton(); nettyBuilder.JT808Builder.Services.TryAddSingleton(); nettyBuilder.JT808Builder.Services.TryAddSingleton(); - nettyBuilder.JT808Builder.Services.TryAddSingleton(); + nettyBuilder.JT808Builder.Services.TryAddSingleton(); nettyBuilder.JT808Builder.Services.TryAddSingleton(); nettyBuilder.JT808Builder.Services.AddHostedService(); return nettyBuilder; @@ -93,7 +93,7 @@ namespace JT808.DotNetty.Core nettyBuilder.JT808Builder.Services.TryAddSingleton(); nettyBuilder.JT808Builder.Services.TryAddSingleton(); nettyBuilder.JT808Builder.Services.TryAddSingleton(); - nettyBuilder.JT808Builder.Services.TryAddSingleton(); + nettyBuilder.JT808Builder.Services.TryAddSingleton(); nettyBuilder.JT808Builder.Services.AddHostedService(); return nettyBuilder; } diff --git a/src/JT808.DotNetty.Core/Session/JT808SessionManager.cs b/src/JT808.DotNetty.Core/Session/JT808SessionManager.cs index 729c4a8..f59f2e5 100644 --- a/src/JT808.DotNetty.Core/Session/JT808SessionManager.cs +++ b/src/JT808.DotNetty.Core/Session/JT808SessionManager.cs @@ -18,27 +18,27 @@ namespace JT808.DotNetty.Core.Session private readonly ILogger logger; private readonly IJT808DatagramPacket jT808DatagramPacket; - public IJT808SessionPublishing JT808SessionPublishing { get; } + public IJT808SessionProducer JT808SessionProducer { get; } public ConcurrentDictionary Sessions { get; } public JT808SessionManager( - IJT808SessionPublishing jT808SessionPublishing, + IJT808SessionProducer jT808SessionProducer, ILoggerFactory loggerFactory ) { Sessions = new ConcurrentDictionary(StringComparer.OrdinalIgnoreCase); - JT808SessionPublishing = jT808SessionPublishing; + JT808SessionProducer = jT808SessionProducer; logger = loggerFactory.CreateLogger(); } public JT808SessionManager( - IJT808SessionPublishing jT808SessionPublishing, + IJT808SessionProducer jT808SessionProducer, ILoggerFactory loggerFactory, IJT808DatagramPacket jT808DatagramPacket) { Sessions = new ConcurrentDictionary(StringComparer.OrdinalIgnoreCase); - JT808SessionPublishing = jT808SessionPublishing; + JT808SessionProducer = jT808SessionProducer; logger = loggerFactory.CreateLogger(); this.jT808DatagramPacket = jT808DatagramPacket; } @@ -200,7 +200,7 @@ namespace JT808.DotNetty.Core.Session //部标的超长待机设备,不会像正常的设备一样一直连着,可能10几分钟连上了,然后发完就关闭连接, //这时候想下发数据需要知道设备什么时候上线,在这边做通知最好不过了。 //有设备关联上来可以进行通知 例如:使用Redis发布订阅 - JT808SessionPublishing.PublishAsync(JT808NettyConstants.SessionOnline, jT808TcpSession.TerminalPhoneNo); + JT808SessionProducer.ProduceAsync(JT808NettyConstants.SessionOnline,jT808TcpSession.TerminalPhoneNo); } } } @@ -230,7 +230,7 @@ namespace JT808.DotNetty.Core.Session //移动很多卡,存储的那个socket地址端口,有效期非常短 //不速度快点下发,那个socket地址端口就可能映射到别的对应卡去了 //所以此处采用跟随设备消息下发指令 - JT808SessionPublishing.PublishAsync(JT808NettyConstants.SessionOnline, terminalPhoneNo); + JT808SessionProducer.ProduceAsync(JT808NettyConstants.SessionOnline,terminalPhoneNo); } public IJT808Session RemoveSession(string terminalPhoneNo) { @@ -254,7 +254,7 @@ namespace JT808.DotNetty.Core.Session } string nos = string.Join(",", terminalPhoneNos); logger.LogInformation($">>>{terminalPhoneNo}-{nos} 1-n Session Remove."); - JT808SessionPublishing.PublishAsync(JT808NettyConstants.SessionOffline, nos); + JT808SessionProducer.ProduceAsync(JT808NettyConstants.SessionOffline, nos); return jT808Session; } else @@ -262,7 +262,7 @@ namespace JT808.DotNetty.Core.Session if (Sessions.TryRemove(terminalPhoneNo, out IJT808Session jT808SessionRemove)) { logger.LogInformation($">>>{terminalPhoneNo} Session Remove."); - JT808SessionPublishing.PublishAsync(JT808NettyConstants.SessionOffline, terminalPhoneNo); + JT808SessionProducer.ProduceAsync(JT808NettyConstants.SessionOffline, terminalPhoneNo); return jT808SessionRemove; } else @@ -284,7 +284,7 @@ namespace JT808.DotNetty.Core.Session } string nos = string.Join(",", terminalPhoneNos); logger.LogInformation($">>>{nos} Channel Remove."); - JT808SessionPublishing.PublishAsync(JT808NettyConstants.SessionOffline, nos); + JT808SessionProducer.ProduceAsync(JT808NettyConstants.SessionOffline, nos); } } public IEnumerable GetAll() diff --git a/src/JT808.DotNetty.Kafka/JT808ConsumerConfig.cs b/src/JT808.DotNetty.Kafka/Configs/JT808ConsumerConfig.cs similarity index 100% rename from src/JT808.DotNetty.Kafka/JT808ConsumerConfig.cs rename to src/JT808.DotNetty.Kafka/Configs/JT808ConsumerConfig.cs diff --git a/src/JT808.DotNetty.Kafka/Configs/JT808MsgConsumerConfig.cs b/src/JT808.DotNetty.Kafka/Configs/JT808MsgConsumerConfig.cs new file mode 100644 index 0000000..c37439f --- /dev/null +++ b/src/JT808.DotNetty.Kafka/Configs/JT808MsgConsumerConfig.cs @@ -0,0 +1,13 @@ +using Confluent.Kafka; +using Microsoft.Extensions.Options; +using System; +using System.Collections.Generic; +using System.Text; + +namespace JT808.DotNetty.Kafka +{ + public class JT808MsgConsumerConfig : JT808ConsumerConfig, IOptions + { + JT808MsgConsumerConfig IOptions.Value => this; + } +} diff --git a/src/JT808.DotNetty.Kafka/Configs/JT808MsgProducerConfig.cs b/src/JT808.DotNetty.Kafka/Configs/JT808MsgProducerConfig.cs new file mode 100644 index 0000000..659789c --- /dev/null +++ b/src/JT808.DotNetty.Kafka/Configs/JT808MsgProducerConfig.cs @@ -0,0 +1,13 @@ +using Confluent.Kafka; +using Microsoft.Extensions.Options; +using System; +using System.Collections.Generic; +using System.Text; + +namespace JT808.DotNetty.Kafka +{ + public class JT808MsgProducerConfig : JT808ProducerConfig, IOptions + { + JT808MsgProducerConfig IOptions.Value => this; + } +} diff --git a/src/JT808.DotNetty.Kafka/Configs/JT808MsgReplyConsumerConfig.cs b/src/JT808.DotNetty.Kafka/Configs/JT808MsgReplyConsumerConfig.cs new file mode 100644 index 0000000..d7db165 --- /dev/null +++ b/src/JT808.DotNetty.Kafka/Configs/JT808MsgReplyConsumerConfig.cs @@ -0,0 +1,13 @@ +using Confluent.Kafka; +using Microsoft.Extensions.Options; +using System; +using System.Collections.Generic; +using System.Text; + +namespace JT808.DotNetty.Kafka +{ + public class JT808MsgReplyConsumerConfig : JT808ConsumerConfig, IOptions + { + JT808MsgReplyConsumerConfig IOptions.Value => this; + } +} diff --git a/src/JT808.DotNetty.Kafka/Configs/JT808MsgReplyProducerConfig.cs b/src/JT808.DotNetty.Kafka/Configs/JT808MsgReplyProducerConfig.cs new file mode 100644 index 0000000..5b13af2 --- /dev/null +++ b/src/JT808.DotNetty.Kafka/Configs/JT808MsgReplyProducerConfig.cs @@ -0,0 +1,13 @@ +using Confluent.Kafka; +using Microsoft.Extensions.Options; +using System; +using System.Collections.Generic; +using System.Text; + +namespace JT808.DotNetty.Kafka +{ + public class JT808MsgReplyProducerConfig : JT808ProducerConfig, IOptions + { + JT808MsgReplyProducerConfig IOptions.Value => this; + } +} diff --git a/src/JT808.DotNetty.Kafka/JT808ProducerConfig.cs b/src/JT808.DotNetty.Kafka/Configs/JT808ProducerConfig.cs similarity index 100% rename from src/JT808.DotNetty.Kafka/JT808ProducerConfig.cs rename to src/JT808.DotNetty.Kafka/Configs/JT808ProducerConfig.cs diff --git a/src/JT808.DotNetty.Kafka/Configs/JT808SessionConsumerConfig.cs b/src/JT808.DotNetty.Kafka/Configs/JT808SessionConsumerConfig.cs new file mode 100644 index 0000000..2aceeb4 --- /dev/null +++ b/src/JT808.DotNetty.Kafka/Configs/JT808SessionConsumerConfig.cs @@ -0,0 +1,13 @@ +using Confluent.Kafka; +using Microsoft.Extensions.Options; +using System; +using System.Collections.Generic; +using System.Text; + +namespace JT808.DotNetty.Kafka +{ + public class JT808SessionConsumerConfig : JT808ConsumerConfig, IOptions + { + JT808SessionConsumerConfig IOptions.Value => this; + } +} diff --git a/src/JT808.DotNetty.Kafka/Configs/JT808SessionProducerConfig.cs b/src/JT808.DotNetty.Kafka/Configs/JT808SessionProducerConfig.cs new file mode 100644 index 0000000..37c0a06 --- /dev/null +++ b/src/JT808.DotNetty.Kafka/Configs/JT808SessionProducerConfig.cs @@ -0,0 +1,13 @@ +using Confluent.Kafka; +using Microsoft.Extensions.Options; +using System; +using System.Collections.Generic; +using System.Text; + +namespace JT808.DotNetty.Kafka +{ + public class JT808SessionProducerConfig : JT808ProducerConfig, IOptions + { + JT808SessionProducerConfig IOptions.Value => this; + } +} diff --git a/src/JT808.DotNetty.Kafka/JT808.DotNetty.Kafka.csproj b/src/JT808.DotNetty.Kafka/JT808.DotNetty.Kafka.csproj index 3eb35c4..e8c00a4 100644 --- a/src/JT808.DotNetty.Kafka/JT808.DotNetty.Kafka.csproj +++ b/src/JT808.DotNetty.Kafka/JT808.DotNetty.Kafka.csproj @@ -8,8 +8,11 @@ + + + diff --git a/src/JT808.DotNetty.Kafka/JT808ClientKafkaExtensions.cs b/src/JT808.DotNetty.Kafka/JT808ClientKafkaExtensions.cs new file mode 100644 index 0000000..b4d2faa --- /dev/null +++ b/src/JT808.DotNetty.Kafka/JT808ClientKafkaExtensions.cs @@ -0,0 +1,47 @@ +using JT808.DotNetty.Abstractions; +using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.DependencyInjection.Extensions; + +namespace JT808.DotNetty.Kafka +{ + public static class JT808ClientKafkaExtensions + { + /// + /// + /// + /// + /// GetSection("JT808MsgConsumerConfig") + /// + public static IServiceCollection AddJT808ClientKafkaMsgConsumer(this IServiceCollection serviceDescriptors, IConfiguration configuration) + { + serviceDescriptors.Configure(configuration.GetSection("JT808MsgConsumerConfig")); + serviceDescriptors.TryAddSingleton(); + return serviceDescriptors; + } + /// + /// + /// + /// + /// GetSection("JT808MsgReplyProducerConfig") + /// + public static IServiceCollection AddJT808ClientKafkaMsgReplyProducer(this IServiceCollection serviceDescriptors, IConfiguration configuration) + { + serviceDescriptors.Configure(configuration.GetSection("JT808MsgReplyProducerConfig")); + serviceDescriptors.TryAddSingleton(); + return serviceDescriptors; + } + /// + /// + /// + /// + /// GetSection("JT808SessionConsumerConfig") + /// + public static IServiceCollection AddJT808ClientKafkaSessionConsumer(this IServiceCollection serviceDescriptors, IConfiguration configuration) + { + serviceDescriptors.Configure(configuration.GetSection("JT808SessionConsumerConfig")); + serviceDescriptors.TryAddSingleton(); + return serviceDescriptors; + } + } +} \ No newline at end of file diff --git a/src/JT808.DotNetty.Kafka/JT808MsgConsumer.cs b/src/JT808.DotNetty.Kafka/JT808MsgConsumer.cs index 5893384..7fea180 100644 --- a/src/JT808.DotNetty.Kafka/JT808MsgConsumer.cs +++ b/src/JT808.DotNetty.Kafka/JT808MsgConsumer.cs @@ -21,7 +21,7 @@ namespace JT808.DotNetty.Kafka public string TopicName { get; } public JT808MsgConsumer( - IOptions consumerConfigAccessor, + IOptions consumerConfigAccessor, ILoggerFactory loggerFactory) { consumer = new ConsumerBuilder(consumerConfigAccessor.Value).Build(); diff --git a/src/JT808.DotNetty.Kafka/JT808MsgProducer.cs b/src/JT808.DotNetty.Kafka/JT808MsgProducer.cs index 3dfa14b..573822e 100644 --- a/src/JT808.DotNetty.Kafka/JT808MsgProducer.cs +++ b/src/JT808.DotNetty.Kafka/JT808MsgProducer.cs @@ -14,7 +14,7 @@ namespace JT808.DotNetty.Kafka private readonly IProducer producer; public JT808MsgProducer( - IOptions producerConfigAccessor) + IOptions producerConfigAccessor) { producer = new ProducerBuilder(producerConfigAccessor.Value).Build(); TopicName = producerConfigAccessor.Value.TopicName; diff --git a/src/JT808.DotNetty.Kafka/JT808MsgReplyConsumer.cs b/src/JT808.DotNetty.Kafka/JT808MsgReplyConsumer.cs index 0aafa19..09c459c 100644 --- a/src/JT808.DotNetty.Kafka/JT808MsgReplyConsumer.cs +++ b/src/JT808.DotNetty.Kafka/JT808MsgReplyConsumer.cs @@ -10,7 +10,7 @@ using System.Threading.Tasks; namespace JT808.DotNetty.Kafka { - public class JT808MsgReplyConsumer : IJT808MsgConsumer + public class JT808MsgReplyConsumer : IJT808MsgReplyConsumer { public CancellationTokenSource Cts => new CancellationTokenSource(); @@ -21,7 +21,7 @@ namespace JT808.DotNetty.Kafka public string TopicName { get; } public JT808MsgReplyConsumer( - IOptions consumerConfigAccessor, + IOptions consumerConfigAccessor, ILoggerFactory loggerFactory) { consumer = new ConsumerBuilder(consumerConfigAccessor.Value).Build(); diff --git a/src/JT808.DotNetty.Kafka/JT808MsgReplyProducer.cs b/src/JT808.DotNetty.Kafka/JT808MsgReplyProducer.cs index 138a0b5..fab6275 100644 --- a/src/JT808.DotNetty.Kafka/JT808MsgReplyProducer.cs +++ b/src/JT808.DotNetty.Kafka/JT808MsgReplyProducer.cs @@ -8,13 +8,13 @@ using System.Threading.Tasks; namespace JT808.DotNetty.Kafka { - public class JT808MsgReplyProducer : IJT808MsgProducer + public class JT808MsgReplyProducer : IJT808MsgReplyProducer { public string TopicName { get;} private IProducer producer; public JT808MsgReplyProducer( - IOptions producerConfigAccessor) + IOptions producerConfigAccessor) { producer = new ProducerBuilder(producerConfigAccessor.Value).Build(); TopicName = producerConfigAccessor.Value.TopicName; diff --git a/src/JT808.DotNetty.Kafka/JT808ServerKafkaExtensions.cs b/src/JT808.DotNetty.Kafka/JT808ServerKafkaExtensions.cs new file mode 100644 index 0000000..d2494ce --- /dev/null +++ b/src/JT808.DotNetty.Kafka/JT808ServerKafkaExtensions.cs @@ -0,0 +1,47 @@ +using JT808.DotNetty.Abstractions; +using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.DependencyInjection.Extensions; + +namespace JT808.DotNetty.Kafka +{ + public static class JT808ServerKafkaExtensions + { + /// + /// + /// + /// + /// GetSection("JT808MsgProducerConfig") + /// + public static IJT808NettyBuilder AddJT808ServerKafkaMsgProducer(this IJT808NettyBuilder jT808NettyBuilder, IConfiguration configuration) + { + jT808NettyBuilder.JT808Builder.Services.Configure(configuration.GetSection("JT808MsgProducerConfig")); + jT808NettyBuilder.JT808Builder.Services.Replace(new ServiceDescriptor(typeof(IJT808MsgProducer), typeof(JT808MsgProducer), ServiceLifetime.Singleton)); + return jT808NettyBuilder; + } + /// + /// + /// + /// + /// GetSection("JT808MsgReplyConsumerConfig") + /// + public static IJT808NettyBuilder AddJT808ServerKafkaMsgReplyConsumer(this IJT808NettyBuilder jT808NettyBuilder, IConfiguration configuration) + { + jT808NettyBuilder.JT808Builder.Services.Configure(configuration.GetSection("JT808MsgReplyConsumerConfig")); + jT808NettyBuilder.JT808Builder.Services.Replace(new ServiceDescriptor(typeof(IJT808MsgReplyConsumer), typeof(JT808MsgReplyConsumer), ServiceLifetime.Singleton)); + return jT808NettyBuilder; + } + /// + /// + /// + /// + /// GetSection("JT808SessionProducerConfig") + /// + public static IJT808NettyBuilder AddJT808ServerKafkaSessionProducer(this IJT808NettyBuilder jT808NettyBuilder, IConfiguration configuration) + { + jT808NettyBuilder.JT808Builder.Services.Configure(configuration.GetSection("JT808SessionProducerConfig")); + jT808NettyBuilder.JT808Builder.Services.TryAddSingleton(); + return jT808NettyBuilder; + } + } +} \ No newline at end of file diff --git a/src/JT808.DotNetty.Kafka/JT808SessionConsumer.cs b/src/JT808.DotNetty.Kafka/JT808SessionConsumer.cs new file mode 100644 index 0000000..6ddb83d --- /dev/null +++ b/src/JT808.DotNetty.Kafka/JT808SessionConsumer.cs @@ -0,0 +1,84 @@ +using Confluent.Kafka; +using JT808.DotNetty.Abstractions; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading; +using System.Threading.Tasks; + +namespace JT808.DotNetty.Kafka +{ + public class JT808SessionConsumer : IJT808SessionConsumer + { + public CancellationTokenSource Cts => new CancellationTokenSource(); + + private readonly IConsumer consumer; + + private readonly ILogger logger; + + public string TopicName { get; } + + public JT808SessionConsumer( + IOptions consumerConfigAccessor, + ILoggerFactory loggerFactory) + { + consumer = new ConsumerBuilder(consumerConfigAccessor.Value).Build(); + TopicName = consumerConfigAccessor.Value.TopicName; + logger = loggerFactory.CreateLogger("JT808SessionConsumer"); + } + + public void OnMessage(Action<(string Notice, string TerminalNo)> callback) + { + Task.Run(() => + { + while (!Cts.IsCancellationRequested) + { + try + { + //如果不指定分区,根据kafka的机制会从多个分区中拉取数据 + //如果指定分区,根据kafka的机制会从相应的分区中拉取数据 + var data = consumer.Consume(Cts.Token); + if (logger.IsEnabled(LogLevel.Debug)) + { + logger.LogDebug($"Topic: {data.Topic} Key: {data.Key} Partition: {data.Partition} Offset: {data.Offset} TopicPartitionOffset:{data.TopicPartitionOffset}"); + } + callback((data.Key, data.Value)); + } + catch (ConsumeException ex) + { + logger.LogError(ex, TopicName); + Thread.Sleep(1000); + } + catch (OperationCanceledException ex) + { + logger.LogError(ex, TopicName); + Thread.Sleep(1000); + } + catch (Exception ex) + { + logger.LogError(ex, TopicName); + Thread.Sleep(1000); + } + } + }, Cts.Token); + } + + public void Subscribe() + { + consumer.Subscribe(TopicName); + } + + public void Unsubscribe() + { + consumer.Unsubscribe(); + } + + public void Dispose() + { + consumer.Close(); + consumer.Dispose(); + } + } +} diff --git a/src/JT808.DotNetty.Kafka/JT808SessionProducer.cs b/src/JT808.DotNetty.Kafka/JT808SessionProducer.cs new file mode 100644 index 0000000..dd781b6 --- /dev/null +++ b/src/JT808.DotNetty.Kafka/JT808SessionProducer.cs @@ -0,0 +1,37 @@ +using Confluent.Kafka; +using JT808.DotNetty.Abstractions; +using Microsoft.Extensions.Options; +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading.Tasks; + +namespace JT808.DotNetty.Kafka +{ + public class JT808SessionProducer : IJT808SessionProducer + { + public string TopicName { get; } + + private readonly IProducer producer; + public JT808SessionProducer( + IOptions producerConfigAccessor) + { + producer = new ProducerBuilder(producerConfigAccessor.Value).Build(); + TopicName = producerConfigAccessor.Value.TopicName; + } + + public void Dispose() + { + producer.Dispose(); + } + + public async Task ProduceAsync(string notice,string terminalNo) + { + await producer.ProduceAsync(TopicName, new Message + { + Key = notice, + Value = terminalNo + }); + } + } +} diff --git a/src/JT808.DotNetty.Tcp/JT808TcpDotnettyExtensions.cs b/src/JT808.DotNetty.Tcp/JT808TcpDotnettyExtensions.cs index 203075d..9047563 100644 --- a/src/JT808.DotNetty.Tcp/JT808TcpDotnettyExtensions.cs +++ b/src/JT808.DotNetty.Tcp/JT808TcpDotnettyExtensions.cs @@ -4,6 +4,7 @@ using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection.Extensions; using System.Runtime.CompilerServices; using JT808.DotNetty.Core.Interfaces; +using JT808.DotNetty.Abstractions; [assembly: InternalsVisibleTo("JT808.DotNetty.Tcp.Test")] diff --git a/src/JT808.DotNetty.Tests/JT808.DotNetty.Core.Test/SeedTcpSession.cs b/src/JT808.DotNetty.Tests/JT808.DotNetty.Core.Test/SeedTcpSession.cs index cf98510..21b090a 100644 --- a/src/JT808.DotNetty.Tests/JT808.DotNetty.Core.Test/SeedTcpSession.cs +++ b/src/JT808.DotNetty.Tests/JT808.DotNetty.Core.Test/SeedTcpSession.cs @@ -13,7 +13,7 @@ namespace JT808.DotNetty.Core.Test public class SeedTcpSession { public JT808SessionManager jT80TcpSessionManager = new JT808SessionManager( - new JT808SessionPublishingEmptyImpl(new LoggerFactory()), + new JT808SessionProducerDefaultImpl(new LoggerFactory()), new LoggerFactory()); public SeedTcpSession() diff --git a/src/JT808.DotNetty.Tests/JT808.DotNetty.Hosting/JT808.DotNetty.Hosting.csproj b/src/JT808.DotNetty.Tests/JT808.DotNetty.Hosting/JT808.DotNetty.Hosting.csproj index ee41510..3c5227a 100644 --- a/src/JT808.DotNetty.Tests/JT808.DotNetty.Hosting/JT808.DotNetty.Hosting.csproj +++ b/src/JT808.DotNetty.Tests/JT808.DotNetty.Hosting/JT808.DotNetty.Hosting.csproj @@ -18,6 +18,7 @@ + diff --git a/src/JT808.DotNetty.Tests/JT808.DotNetty.Hosting/Program.cs b/src/JT808.DotNetty.Tests/JT808.DotNetty.Hosting/Program.cs index 056a26a..1625ff3 100644 --- a/src/JT808.DotNetty.Tests/JT808.DotNetty.Hosting/Program.cs +++ b/src/JT808.DotNetty.Tests/JT808.DotNetty.Hosting/Program.cs @@ -19,6 +19,7 @@ using System.Collections.Generic; using System.Diagnostics; using System.Threading.Tasks; using WebApiClient.Extensions.DependencyInjection; +using JT808.DotNetty.Kafka; namespace JT808.DotNetty.Hosting { @@ -55,7 +56,14 @@ namespace JT808.DotNetty.Hosting .AddJT808TcpNettyHost() .AddJT808UdpNettyHost() .AddJT808WebApiNettyHost() + //扩展webapi JT808MsgIdHttpHandlerBase + //.ReplaceMsgIdHandler() .Builder(); + //添加kafka插件 + //.AddJT808ServerKafkaMsgProducer(hostContext.Configuration) + //.AddJT808ServerKafkaMsgReplyConsumer(hostContext.Configuration) + //.AddJT808ServerKafkaSessionProducer(hostContext.Configuration) + //.Builder(); //webapi客户端调用 //services.AddHttpApi().ConfigureHttpApiConfig((c, p) => //{ diff --git a/src/JT808.DotNetty.Tests/JT808.DotNetty.Kafka.Test/JT808.DotNetty.Kafka.Test.csproj b/src/JT808.DotNetty.Tests/JT808.DotNetty.Kafka.Test/JT808.DotNetty.Kafka.Test.csproj index 916f895..3922235 100644 --- a/src/JT808.DotNetty.Tests/JT808.DotNetty.Kafka.Test/JT808.DotNetty.Kafka.Test.csproj +++ b/src/JT808.DotNetty.Tests/JT808.DotNetty.Kafka.Test/JT808.DotNetty.Kafka.Test.csproj @@ -7,7 +7,11 @@ + + + + all @@ -19,4 +23,10 @@ + + + Always + + + diff --git a/src/JT808.DotNetty.Tests/JT808.DotNetty.Kafka.Test/JT808BaseTest.cs b/src/JT808.DotNetty.Tests/JT808.DotNetty.Kafka.Test/JT808BaseTest.cs new file mode 100644 index 0000000..c71537e --- /dev/null +++ b/src/JT808.DotNetty.Tests/JT808.DotNetty.Kafka.Test/JT808BaseTest.cs @@ -0,0 +1,13 @@ +using System; +using System.Collections.Generic; +using System.Text; + +namespace JT808.DotNetty.Kafka.Test +{ + public class JT808BaseTest + { + public const string BootstrapServers = "172.16.19.120:9092"; + + //public const string BootstrapServers = "192.168.3.11:9092"; + } +} diff --git a/src/JT808.DotNetty.Tests/JT808.DotNetty.Kafka.Test/JT808ConfigTest.cs b/src/JT808.DotNetty.Tests/JT808.DotNetty.Kafka.Test/JT808ConfigTest.cs new file mode 100644 index 0000000..b76a092 --- /dev/null +++ b/src/JT808.DotNetty.Tests/JT808.DotNetty.Kafka.Test/JT808ConfigTest.cs @@ -0,0 +1,54 @@ +using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.DependencyInjection.Extensions; +using System; +using System.Collections.Generic; +using System.Text; +using Xunit; +using Microsoft.Extensions.FileProviders; +using Microsoft.Extensions.Options; +using System.IO; + +namespace JT808.DotNetty.Kafka.Test +{ + public class JT808ConfigTest + { + [Fact] + public void Test1() + { + var configurationBuilder = new ConfigurationBuilder(); + configurationBuilder.SetBasePath(Path.Combine(AppDomain.CurrentDomain.BaseDirectory)); + configurationBuilder.AddJsonFile("JT808Config.json"); + IConfigurationRoot configurationRoot = configurationBuilder.Build(); + IServiceCollection serviceDescriptors = new ServiceCollection(); + serviceDescriptors.Configure(configurationRoot.GetSection("JT808MsgProducerConfig")); + serviceDescriptors.Configure(configurationRoot.GetSection("JT808MsgConsumerConfig")); + serviceDescriptors.Configure(configurationRoot.GetSection("JT808MsgReplyProducerConfig")); + serviceDescriptors.Configure(configurationRoot.GetSection("JT808MsgReplyConsumerConfig")); + serviceDescriptors.Configure(configurationRoot.GetSection("JT808SessionProducerConfig")); + serviceDescriptors.Configure(configurationRoot.GetSection("JT808SessionConsumerConfig")); + var serviceProvider = serviceDescriptors.BuildServiceProvider(); + var jT808MsgProducerConfigAccessor = serviceProvider.GetRequiredService>(); + Assert.Equal("JT808Msg", jT808MsgProducerConfigAccessor.Value.TopicName); + Assert.Equal("127.0.0.1:9092", jT808MsgProducerConfigAccessor.Value.BootstrapServers); + var jT808MsgConsumerConfigAccessor = serviceProvider.GetRequiredService>(); + Assert.Equal("JT808Msg", jT808MsgConsumerConfigAccessor.Value.TopicName); + Assert.Equal("127.0.0.1:9092", jT808MsgConsumerConfigAccessor.Value.BootstrapServers); + Assert.Equal("msg-group", jT808MsgConsumerConfigAccessor.Value.GroupId); + var jT808MsgReplyProducerConfigAccessor = serviceProvider.GetRequiredService>(); + Assert.Equal("JT808MsgReply", jT808MsgReplyProducerConfigAccessor.Value.TopicName); + Assert.Equal("127.0.0.1:9093", jT808MsgReplyProducerConfigAccessor.Value.BootstrapServers); + var jT808MsgReplyConsumerConfigAccessor = serviceProvider.GetRequiredService>(); + Assert.Equal("JT808MsgReply", jT808MsgReplyConsumerConfigAccessor.Value.TopicName); + Assert.Equal("127.0.0.1:9093", jT808MsgReplyConsumerConfigAccessor.Value.BootstrapServers); + Assert.Equal("msgreply-group", jT808MsgReplyConsumerConfigAccessor.Value.GroupId); + var jT808SessionProducerConfigAccessor = serviceProvider.GetRequiredService>(); + Assert.Equal("JT808Session", jT808SessionProducerConfigAccessor.Value.TopicName); + Assert.Equal("127.0.0.1:9094", jT808SessionProducerConfigAccessor.Value.BootstrapServers); + var jT808SessionConsumerConfigAccessor = serviceProvider.GetRequiredService>(); + Assert.Equal("JT808Session", jT808SessionConsumerConfigAccessor.Value.TopicName); + Assert.Equal("127.0.0.1:9094", jT808SessionConsumerConfigAccessor.Value.BootstrapServers); + Assert.Equal("session-group", jT808SessionConsumerConfigAccessor.Value.GroupId); + } + } +} diff --git a/src/JT808.DotNetty.Tests/JT808.DotNetty.Kafka.Test/JT808MsgConsumerTest.cs b/src/JT808.DotNetty.Tests/JT808.DotNetty.Kafka.Test/JT808MsgConsumerTest.cs index c0fe439..b2256a5 100644 --- a/src/JT808.DotNetty.Tests/JT808.DotNetty.Kafka.Test/JT808MsgConsumerTest.cs +++ b/src/JT808.DotNetty.Tests/JT808.DotNetty.Kafka.Test/JT808MsgConsumerTest.cs @@ -9,16 +9,12 @@ using Xunit; namespace JT808.DotNetty.Kafka.Test { - public class JT808MsgConsumerTest + public class JT808MsgConsumerTest: JT808BaseTest { - public const string BootstrapServers = "172.16.19.120:9092"; - - //public const string BootstrapServers = "192.168.3.11:9092"; - - public JT808ConsumerConfig JT808ConsumerConfig = new JT808ConsumerConfig + public JT808MsgConsumerConfig JT808ConsumerConfig = new JT808MsgConsumerConfig { - GroupId="jt808.gps.test", - TopicName = "jt808test", + GroupId= "JT808Msg.test", + TopicName = "JT808Msg", BootstrapServers = BootstrapServers }; [Fact] diff --git a/src/JT808.DotNetty.Tests/JT808.DotNetty.Kafka.Test/JT808MsgProducerTest.cs b/src/JT808.DotNetty.Tests/JT808.DotNetty.Kafka.Test/JT808MsgProducerTest.cs index 4392568..37d0e12 100644 --- a/src/JT808.DotNetty.Tests/JT808.DotNetty.Kafka.Test/JT808MsgProducerTest.cs +++ b/src/JT808.DotNetty.Tests/JT808.DotNetty.Kafka.Test/JT808MsgProducerTest.cs @@ -9,15 +9,12 @@ using Xunit; namespace JT808.DotNetty.Kafka.Test { - public class JT808MsgProducerTest + public class JT808MsgProducerTest: JT808BaseTest { - public const string BootstrapServers = "172.16.19.120:9092"; - //public const string BootstrapServers = "192.168.3.11:9092"; - - public JT808ProducerConfig JT808ProducerConfig = new JT808ProducerConfig + public JT808MsgProducerConfig JT808ProducerConfig = new JT808MsgProducerConfig { - TopicName = "jt808test", + TopicName = "JT808Msg", BootstrapServers = BootstrapServers }; @@ -29,6 +26,10 @@ namespace JT808.DotNetty.Kafka.Test { adminClient.DeleteTopicsAsync(new List() { JT808ProducerConfig.TopicName }).Wait(); } + catch (AggregateException e) + { + //Debug.WriteLine($"An error occured creating topic {e.Results[0].Topic}: {e.Results[0].Error.Reason}"); + } catch (CreateTopicsException e) { Debug.WriteLine($"An error occured creating topic {e.Results[0].Topic}: {e.Results[0].Error.Reason}"); diff --git a/src/JT808.DotNetty.Tests/JT808.DotNetty.Kafka.Test/JT808MsgReplyConsumerTest.cs b/src/JT808.DotNetty.Tests/JT808.DotNetty.Kafka.Test/JT808MsgReplyConsumerTest.cs new file mode 100644 index 0000000..f896957 --- /dev/null +++ b/src/JT808.DotNetty.Tests/JT808.DotNetty.Kafka.Test/JT808MsgReplyConsumerTest.cs @@ -0,0 +1,36 @@ +using JT808.DotNetty.Abstractions; +using Microsoft.Extensions.Logging; +using System; +using System.Collections.Generic; +using System.Diagnostics; +using System.Text; +using System.Threading; +using Xunit; + +namespace JT808.DotNetty.Kafka.Test +{ + public class JT808MsgReplyConsumerTest: JT808BaseTest + { + + public JT808MsgReplyConsumerConfig JT808ConsumerConfig = new JT808MsgReplyConsumerConfig + { + GroupId= "jt808.MsgReply.test", + TopicName = "JT808MsgReply", + BootstrapServers = BootstrapServers + }; + [Fact] + public void Test1() + { + using (IJT808MsgReplyConsumer JT808MsgConsumer = new JT808MsgReplyConsumer(JT808ConsumerConfig, new LoggerFactory())) + { + JT808MsgConsumer.Subscribe(); + JT808MsgConsumer.OnMessage(item => + { + Debug.WriteLine($"{item.TerminalNo}-{item.Data.Length}"); + }); + Thread.Sleep(30000); + JT808MsgConsumer.Unsubscribe(); + } + } + } +} diff --git a/src/JT808.DotNetty.Tests/JT808.DotNetty.Kafka.Test/JT808MsgReplyProducerTest.cs b/src/JT808.DotNetty.Tests/JT808.DotNetty.Kafka.Test/JT808MsgReplyProducerTest.cs new file mode 100644 index 0000000..a6ae040 --- /dev/null +++ b/src/JT808.DotNetty.Tests/JT808.DotNetty.Kafka.Test/JT808MsgReplyProducerTest.cs @@ -0,0 +1,59 @@ +using Confluent.Kafka; +using Confluent.Kafka.Admin; +using JT808.DotNetty.Abstractions; +using System; +using System.Collections.Generic; +using System.Diagnostics; +using System.Text; +using Xunit; + +namespace JT808.DotNetty.Kafka.Test +{ + public class JT808MsgReplyProducerTest: JT808BaseTest + { + + public JT808MsgReplyProducerConfig JT808ProducerConfig = new JT808MsgReplyProducerConfig + { + TopicName = "JT808MsgReply", + BootstrapServers = BootstrapServers + }; + + public JT808MsgReplyProducerTest() + { + using (var adminClient = new AdminClientBuilder(new AdminClientConfig { BootstrapServers = BootstrapServers }).Build()) + { + try + { + adminClient.DeleteTopicsAsync(new List() { JT808ProducerConfig.TopicName }).Wait(); + } + catch(AggregateException e) + { + //Debug.WriteLine($"An error occured creating topic {e.Results[0].Topic}: {e.Results[0].Error.Reason}"); + } + catch (CreateTopicsException e) + { + Debug.WriteLine($"An error occured creating topic {e.Results[0].Topic}: {e.Results[0].Error.Reason}"); + } + } + } + + [Fact] + public void Test1() + { + using (IJT808MsgReplyProducer jT808MsgProducer = new JT808MsgReplyProducer(JT808ProducerConfig)) + { + jT808MsgProducer.ProduceAsync("123456", new byte[] { 0x7E, 0, 0x7E }).Wait(); + } + } + + [Fact] + public void Test2() + { + using (IJT808MsgReplyProducer jT808MsgProducer = new JT808MsgReplyProducer(JT808ProducerConfig)) + { + jT808MsgProducer.ProduceAsync("123457", new byte[] { 0x7E, 0, 0x7E }).Wait(); + jT808MsgProducer.ProduceAsync("123456", new byte[] { 0x7E, 0, 0x7E }).Wait(); + } + } + } +} diff --git a/src/JT808.DotNetty.Tests/JT808.DotNetty.Kafka.Test/JT808SessionConsumerTest.cs b/src/JT808.DotNetty.Tests/JT808.DotNetty.Kafka.Test/JT808SessionConsumerTest.cs new file mode 100644 index 0000000..4b98857 --- /dev/null +++ b/src/JT808.DotNetty.Tests/JT808.DotNetty.Kafka.Test/JT808SessionConsumerTest.cs @@ -0,0 +1,36 @@ +using JT808.DotNetty.Abstractions; +using Microsoft.Extensions.Logging; +using System; +using System.Collections.Generic; +using System.Diagnostics; +using System.Text; +using System.Threading; +using Xunit; + +namespace JT808.DotNetty.Kafka.Test +{ + public class JT808SessionConsumerTest : JT808BaseTest + { + + public JT808SessionConsumerConfig JT808ConsumerConfig = new JT808SessionConsumerConfig + { + GroupId= "JT808Session.test", + TopicName = "JT808Session", + BootstrapServers = BootstrapServers + }; + [Fact] + public void Test1() + { + using (IJT808SessionConsumer JT808MsgConsumer = new JT808SessionConsumer(JT808ConsumerConfig, new LoggerFactory())) + { + JT808MsgConsumer.Subscribe(); + JT808MsgConsumer.OnMessage(item => + { + Debug.WriteLine($"{item.TerminalNo}-{item.Notice}"); + }); + Thread.Sleep(30000); + JT808MsgConsumer.Unsubscribe(); + } + } + } +} diff --git a/src/JT808.DotNetty.Tests/JT808.DotNetty.Kafka.Test/JT808SessionProducerTest.cs b/src/JT808.DotNetty.Tests/JT808.DotNetty.Kafka.Test/JT808SessionProducerTest.cs new file mode 100644 index 0000000..2c9bea9 --- /dev/null +++ b/src/JT808.DotNetty.Tests/JT808.DotNetty.Kafka.Test/JT808SessionProducerTest.cs @@ -0,0 +1,49 @@ +using Confluent.Kafka; +using Confluent.Kafka.Admin; +using JT808.DotNetty.Abstractions; +using System; +using System.Collections.Generic; +using System.Diagnostics; +using System.Text; +using Xunit; + +namespace JT808.DotNetty.Kafka.Test +{ + public class JT808SessionProducerTest: JT808BaseTest + { + public JT808SessionProducerConfig JT808ProducerConfig = new JT808SessionProducerConfig + { + TopicName = "JT808Session", + BootstrapServers = BootstrapServers + }; + + public JT808SessionProducerTest() + { + using (var adminClient = new AdminClientBuilder(new AdminClientConfig { BootstrapServers = BootstrapServers }).Build()) + { + try + { + adminClient.DeleteTopicsAsync(new List() { JT808ProducerConfig.TopicName }).Wait(); + } + catch (AggregateException e) + { + //Debug.WriteLine($"An error occured creating topic {e.Results[0].Topic}: {e.Results[0].Error.Reason}"); + } + catch (CreateTopicsException e) + { + Debug.WriteLine($"An error occured creating topic {e.Results[0].Topic}: {e.Results[0].Error.Reason}"); + } + } + } + + [Fact] + public void Test1() + { + using (IJT808SessionProducer jT808MsgProducer = new JT808SessionProducer(JT808ProducerConfig)) + { + jT808MsgProducer.ProduceAsync("online","123456").Wait(); + jT808MsgProducer.ProduceAsync("offline", "123457").Wait(); + } + } + } +} diff --git a/src/JT808.DotNetty.Udp/JT808UdpDotnettyExtensions.cs b/src/JT808.DotNetty.Udp/JT808UdpDotnettyExtensions.cs index ce55477..d08d817 100644 --- a/src/JT808.DotNetty.Udp/JT808UdpDotnettyExtensions.cs +++ b/src/JT808.DotNetty.Udp/JT808UdpDotnettyExtensions.cs @@ -1,4 +1,5 @@ -using JT808.DotNetty.Core.Codecs; +using JT808.DotNetty.Abstractions; +using JT808.DotNetty.Core.Codecs; using JT808.DotNetty.Core.Impls; using JT808.DotNetty.Core.Interfaces; using JT808.DotNetty.Udp.Handlers; diff --git a/src/JT808.DotNetty.WebApi/JT808WebApiBuilderDefault.cs b/src/JT808.DotNetty.WebApi/JT808WebApiBuilderDefault.cs index 53cda12..b64c68f 100644 --- a/src/JT808.DotNetty.WebApi/JT808WebApiBuilderDefault.cs +++ b/src/JT808.DotNetty.WebApi/JT808WebApiBuilderDefault.cs @@ -1,4 +1,5 @@ -using JT808.DotNetty.Core.Handlers; +using JT808.DotNetty.Abstractions; +using JT808.DotNetty.Core.Handlers; using JT808.DotNetty.Core.Interfaces; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection.Extensions; diff --git a/src/JT808.DotNetty.WebApi/JT808WebApiDotnettyExtensions.cs b/src/JT808.DotNetty.WebApi/JT808WebApiDotnettyExtensions.cs index f3b30fe..1d4fec3 100644 --- a/src/JT808.DotNetty.WebApi/JT808WebApiDotnettyExtensions.cs +++ b/src/JT808.DotNetty.WebApi/JT808WebApiDotnettyExtensions.cs @@ -1,4 +1,5 @@ -using JT808.DotNetty.Core.Handlers; +using JT808.DotNetty.Abstractions; +using JT808.DotNetty.Core.Handlers; using JT808.DotNetty.Core.Interfaces; using JT808.DotNetty.WebApi.Handlers; using Microsoft.Extensions.DependencyInjection;