From 545d87f60acd71407338bd10d5a0fd4e8a074eec Mon Sep 17 00:00:00 2001 From: SmallChi <564952747@qq.com> Date: Mon, 24 Dec 2018 23:40:45 +0800 Subject: [PATCH] =?UTF-8?q?1.=E5=A2=9E=E5=8A=A0redis=E5=8F=91=E5=B8=83?= =?UTF-8?q?=E9=80=9A=E7=9F=A5=E5=AE=9E=E7=8E=B0=E5=8F=8A=E6=B5=8B=E8=AF=95?= =?UTF-8?q?=E7=94=A8=E4=BE=8B=202.=E5=A2=9E=E5=8A=A0=E5=B8=B8=E9=87=8F?= =?UTF-8?q?=E9=85=8D=E7=BD=AE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 21 +++-- .../JT808SessionPublishingRedisImplTest.cs | 78 +++++++++++++++++++ src/JT808.DotNetty.Test/SeedSession.cs | 5 +- src/JT808.DotNetty.Test/appsettings.json | 3 +- .../JT808SessionPublishingRedisImpl.cs | 55 ++++++++++--- src/JT808.DotNetty/JT808Constants.cs | 13 ++++ src/JT808.DotNetty/JT808DotnettyExtensions.cs | 1 + src/JT808.DotNetty/JT808SessionManager.cs | 16 +++- 8 files changed, 172 insertions(+), 20 deletions(-) create mode 100644 src/JT808.DotNetty.Test/Internal/JT808SessionPublishingRedisImplTest.cs create mode 100644 src/JT808.DotNetty/JT808Constants.cs diff --git a/README.md b/README.md index 87c262e..2b7b0a1 100644 --- a/README.md +++ b/README.md @@ -33,7 +33,18 @@ [WebApi接口服务](https://github.com/SmallChi/JT808DotNetty/blob/master/api/README.md) -### 3.集成业务消息处理程序 +### 3.集成会话通知(在线/离线) + +| 功能 | 说明 | +|:-------:|:-------:|:-------:| +| JT808SessionPublishingRedisImpl | 基于Redis的发布通知 | +| JT808SessionPublishingEmptyImpl | 默认空实现 | + +使用场景:有些超长待机的设备,不会实时保持连接,那么通过平台下发的命令是无法到达的,这时候就需要设备一上线,就即时通知服务去处理,然后在即时的下发消息到设备。 + +> 只要实现IJT808SessionPublishing接口的任意一款MQ都能实现该功能。 + +### 4.集成业务消息处理程序 | 功能 | 说明 | 使用场景 | |:-------:|:-------:|:-------:| @@ -41,7 +52,7 @@ ### 举个栗子1 -#### 3.1.实现业务消息处理程序JT808MsgIdHandlerBase +#### 4.1.实现业务消息处理程序JT808MsgIdHandlerBase ```business Imp public class JT808MsgIdCustomHandler : JT808MsgIdHandlerBase @@ -63,19 +74,19 @@ public class JT808MsgIdCustomHandler : JT808MsgIdHandlerBase ``` -#### 3.2.自定义业务消息处理程序替换默认实现 +#### 4.2.自定义业务消息处理程序替换默认实现 ``` handler services.Replace(new ServiceDescriptor(typeof(JT808MsgIdHandlerBase), typeof(JT808MsgIdCustomHandler), ServiceLifetime.Singleton)); ``` -#### 3.3.使用JT808 Host +#### 4.3.使用JT808 Host ``` host UseJT808Host() ``` -#### 3.4.完整示例 +#### 4.4.完整示例 ``` demo // 默认网关端口:808 diff --git a/src/JT808.DotNetty.Test/Internal/JT808SessionPublishingRedisImplTest.cs b/src/JT808.DotNetty.Test/Internal/JT808SessionPublishingRedisImplTest.cs new file mode 100644 index 0000000..5d0d737 --- /dev/null +++ b/src/JT808.DotNetty.Test/Internal/JT808SessionPublishingRedisImplTest.cs @@ -0,0 +1,78 @@ +using JT808.DotNetty.Internal; +using System; +using System.Collections.Generic; +using System.Text; +using Microsoft.Extensions.DependencyInjection; +using JT808.DotNetty.Configurations; +using Microsoft.Extensions.Options; +using Xunit; +using System.Threading.Tasks; +using System.Threading; +using StackExchange.Redis; + +namespace JT808.DotNetty.Test.Internal +{ + public class JT808SessionPublishingRedisImplTest: TestBase + { + JT808SessionPublishingRedisImpl jT808SessionPublishingRedisImpl; + + public JT808SessionPublishingRedisImplTest() + { + jT808SessionPublishingRedisImpl = new JT808SessionPublishingRedisImpl( + ServiceProvider.GetRequiredService>()); + } + + [Fact] + public void Test1() + { + int i = 10000; + Task.Run(() => { + while (i > 0) + { + jT808SessionPublishingRedisImpl.PublishAsync(JT808Constants.SessionOnline, null, Guid.NewGuid().ToString("N")); + jT808SessionPublishingRedisImpl.PublishAsync(JT808Constants.SessionOffline, null, Guid.NewGuid().ToString("N")); + i--; + Thread.Sleep(1000); + } + }); + Thread.Sleep(1000); + List SessionOnlines = new List(); + ChannelMessageQueue channelMessageQueue= jT808SessionPublishingRedisImpl.Subscriber.Subscribe(JT808Constants.SessionOnline); + channelMessageQueue.OnMessage((msg) => { + SessionOnlines.Add(msg.Message); + }); + List SessionOfflines = new List(); + ChannelMessageQueue channelMessageQueue1 = jT808SessionPublishingRedisImpl.Subscriber.Subscribe(JT808Constants.SessionOffline); + channelMessageQueue1.OnMessage((msg) => { + SessionOfflines.Add(msg.Message); + }); + Thread.Sleep(3000); + } + + [Fact] + public void Test2() + { + int i = 100000; + Task.Run(() => { + while (i > 0) + { + jT808SessionPublishingRedisImpl.PublishAsync(JT808Constants.SessionOnline, null, Guid.NewGuid().ToString("N")); + jT808SessionPublishingRedisImpl.PublishAsync(JT808Constants.SessionOffline, null, Guid.NewGuid().ToString("N")); + i--; + Thread.Sleep(1000); + } + }); + Thread.Sleep(1000); + List SessionOnlines = new List(); + List SessionOfflines = new List(); + ChannelMessageQueue channelMessageQueue = jT808SessionPublishingRedisImpl.Subscriber.Subscribe(JT808Constants.SessionOnline); + channelMessageQueue.OnMessage((msg) => { + SessionOnlines.Add(msg.Message); + }); + ChannelMessageQueue channelMessageQueue1 = jT808SessionPublishingRedisImpl.Subscriber.Subscribe(JT808Constants.SessionOffline); + channelMessageQueue1.OnMessage((msg) => { + SessionOfflines.Add(msg.Message); + }); + } + } +} diff --git a/src/JT808.DotNetty.Test/SeedSession.cs b/src/JT808.DotNetty.Test/SeedSession.cs index 8d6aadb..4a9124d 100644 --- a/src/JT808.DotNetty.Test/SeedSession.cs +++ b/src/JT808.DotNetty.Test/SeedSession.cs @@ -1,4 +1,5 @@ using DotNetty.Transport.Channels.Embedded; +using JT808.DotNetty.Internal; using Microsoft.Extensions.Logging; using System; using System.Collections.Generic; @@ -9,7 +10,9 @@ namespace JT808.DotNetty.Test { public class SeedSession { - public JT808SessionManager jT808SessionManager = new JT808SessionManager(new LoggerFactory()); + public JT808SessionManager jT808SessionManager = new JT808SessionManager( + new JT808SessionPublishingEmptyImpl(), + new LoggerFactory()); public SeedSession() { diff --git a/src/JT808.DotNetty.Test/appsettings.json b/src/JT808.DotNetty.Test/appsettings.json index 7d0edb3..28e73d8 100644 --- a/src/JT808.DotNetty.Test/appsettings.json +++ b/src/JT808.DotNetty.Test/appsettings.json @@ -33,6 +33,7 @@ "Host": "127.0.0.1", "Port": 6562 } - ] + ], + "RedisHost": "127.0.0.1:6379" } } diff --git a/src/JT808.DotNetty/Internal/JT808SessionPublishingRedisImpl.cs b/src/JT808.DotNetty/Internal/JT808SessionPublishingRedisImpl.cs index 93cbf99..8f492a5 100644 --- a/src/JT808.DotNetty/Internal/JT808SessionPublishingRedisImpl.cs +++ b/src/JT808.DotNetty/Internal/JT808SessionPublishingRedisImpl.cs @@ -1,40 +1,75 @@ using JT808.DotNetty.Interfaces; using System; -using System.Collections.Generic; -using System.Text; using System.Threading.Tasks; using StackExchange.Redis; using Microsoft.Extensions.Options; using JT808.DotNetty.Configurations; -using Microsoft.Extensions.Logging; namespace JT808.DotNetty.Internal { - internal class JT808SessionPublishingRedisImpl : IJT808SessionPublishing + internal class JT808SessionPublishingRedisImpl : IJT808SessionPublishing,IDisposable { private IConnectionMultiplexer connectionMultiplexer; private IOptionsMonitor optionsMonitor; - private ILogger logger; + private string redisHost; - private JT808SessionPublishingRedisImpl( - ILoggerFactory loggerFactory, + private IDisposable optionsMonitorDisposable; + + public JT808SessionPublishingRedisImpl( IOptionsMonitor optionsMonitor ) { this.optionsMonitor = optionsMonitor; - logger = loggerFactory.CreateLogger(); - connectionMultiplexer = ConnectionMultiplexer.Connect(optionsMonitor.CurrentValue.RedisHost); + redisHost = optionsMonitor.CurrentValue.RedisHost; + try + { + connectionMultiplexer = ConnectionMultiplexer.Connect(redisHost); + } + catch + { + + } + optionsMonitorDisposable= this.optionsMonitor.OnChange((config,str) => + { + if(config.RedisHost!= redisHost) + { + redisHost = config.RedisHost; + connectionMultiplexer.Close(); + try + { + connectionMultiplexer = ConnectionMultiplexer.Connect(redisHost); + } + catch + { + + } + } + }); } public Task PublishAsync(string topicName, string key, string value) { if (connectionMultiplexer.IsConnected) { - + Subscriber?.PublishAsync(topicName, value); } return Task.CompletedTask; } + + internal ISubscriber Subscriber + { + get + { + return connectionMultiplexer.GetSubscriber(); + } + } + + public void Dispose() + { + connectionMultiplexer.Close(); + optionsMonitorDisposable.Dispose(); + } } } diff --git a/src/JT808.DotNetty/JT808Constants.cs b/src/JT808.DotNetty/JT808Constants.cs new file mode 100644 index 0000000..309041c --- /dev/null +++ b/src/JT808.DotNetty/JT808Constants.cs @@ -0,0 +1,13 @@ +using System; +using System.Collections.Generic; +using System.Text; + +namespace JT808.DotNetty +{ + public static class JT808Constants + { + public const string SessionOnline= "JT808SessionOnline"; + + public const string SessionOffline = "JT808SessionOffline"; + } +} diff --git a/src/JT808.DotNetty/JT808DotnettyExtensions.cs b/src/JT808.DotNetty/JT808DotnettyExtensions.cs index 6bec490..82fa6e7 100644 --- a/src/JT808.DotNetty/JT808DotnettyExtensions.cs +++ b/src/JT808.DotNetty/JT808DotnettyExtensions.cs @@ -35,6 +35,7 @@ namespace JT808.DotNetty return builder.ConfigureServices((hostContext, services) => { services.Configure(hostContext.Configuration.GetSection("JT808Configuration")); + services.TryAddSingleton(); services.TryAddSingleton(); services.TryAddSingleton(); services.TryAddSingleton(); diff --git a/src/JT808.DotNetty/JT808SessionManager.cs b/src/JT808.DotNetty/JT808SessionManager.cs index f545dc7..4dcf996 100644 --- a/src/JT808.DotNetty/JT808SessionManager.cs +++ b/src/JT808.DotNetty/JT808SessionManager.cs @@ -5,6 +5,7 @@ using System.Collections.Generic; using System.Linq; using JT808.DotNetty.Metadata; using DotNetty.Transport.Channels; +using JT808.DotNetty.Interfaces; namespace JT808.DotNetty { @@ -15,9 +16,13 @@ namespace JT808.DotNetty { private readonly ILogger logger; + private readonly IJT808SessionPublishing jT808SessionPublishing; + public JT808SessionManager( + IJT808SessionPublishing jT808SessionPublishing, ILoggerFactory loggerFactory) { + this.jT808SessionPublishing = jT808SessionPublishing; logger = loggerFactory.CreateLogger(); } @@ -69,7 +74,7 @@ namespace JT808.DotNetty //部标的超长待机设备,不会像正常的设备一样一直连着,可能10几分钟连上了,然后发完就关闭连接, //这时候想下发数据需要知道设备什么时候上线,在这边做通知最好不过了。 //todo: 有设备关联上来可以进行通知 例如:使用Redis发布订阅 - + jT808SessionPublishing.PublishAsync(JT808Constants.SessionOnline,null, appSession.TerminalPhoneNo); } } @@ -93,7 +98,9 @@ namespace JT808.DotNetty { SessionIdDict.TryRemove(key, out JT808Session jT808SessionRemove); } - logger.LogInformation($">>>{terminalPhoneNo}-{string.Join(",", terminalPhoneNos)} 1-n Session Remove."); + string nos = string.Join(",", terminalPhoneNos); + logger.LogInformation($">>>{terminalPhoneNo}-{nos} 1-n Session Remove."); + jT808SessionPublishing.PublishAsync(JT808Constants.SessionOffline, null, nos); return jT808Session; } else @@ -101,6 +108,7 @@ namespace JT808.DotNetty if (SessionIdDict.TryRemove(terminalPhoneNo, out JT808Session jT808SessionRemove)) { logger.LogInformation($">>>{terminalPhoneNo} Session Remove."); + jT808SessionPublishing.PublishAsync(JT808Constants.SessionOffline, null, terminalPhoneNo); return jT808SessionRemove; } else @@ -119,7 +127,9 @@ namespace JT808.DotNetty { SessionIdDict.TryRemove(key, out JT808Session jT808SessionRemove); } - logger.LogInformation($">>>{string.Join(",", terminalPhoneNos)} Channel Remove."); + string nos = string.Join(",", terminalPhoneNos); + logger.LogInformation($">>>{nos} Channel Remove."); + jT808SessionPublishing.PublishAsync(JT808Constants.SessionOffline, null, nos); } public IEnumerable GetAll()