Kaynağa Gözat

1.增加redis发布通知实现及测试用例

2.增加常量配置
tags/v1.0.0
SmallChi 6 yıl önce
ebeveyn
işleme
545d87f60a
8 değiştirilmiş dosya ile 172 ekleme ve 20 silme
  1. +16
    -5
      README.md
  2. +78
    -0
      src/JT808.DotNetty.Test/Internal/JT808SessionPublishingRedisImplTest.cs
  3. +4
    -1
      src/JT808.DotNetty.Test/SeedSession.cs
  4. +2
    -1
      src/JT808.DotNetty.Test/appsettings.json
  5. +45
    -10
      src/JT808.DotNetty/Internal/JT808SessionPublishingRedisImpl.cs
  6. +13
    -0
      src/JT808.DotNetty/JT808Constants.cs
  7. +1
    -0
      src/JT808.DotNetty/JT808DotnettyExtensions.cs
  8. +13
    -3
      src/JT808.DotNetty/JT808SessionManager.cs

+ 16
- 5
README.md Dosyayı Görüntüle

@@ -33,7 +33,18 @@

[WebApi接口服务](https://github.com/SmallChi/JT808DotNetty/blob/master/api/README.md)

### 3.集成业务消息处理程序
### 3.集成会话通知(在线/离线)

| 功能 | 说明 |
|:-------:|:-------:|:-------:|
| JT808SessionPublishingRedisImpl | 基于Redis的发布通知 |
| JT808SessionPublishingEmptyImpl | 默认空实现 |

使用场景:有些超长待机的设备,不会实时保持连接,那么通过平台下发的命令是无法到达的,这时候就需要设备一上线,就即时通知服务去处理,然后在即时的下发消息到设备。

> 只要实现IJT808SessionPublishing接口的任意一款MQ都能实现该功能。

### 4.集成业务消息处理程序

| 功能 | 说明 | 使用场景 |
|:-------:|:-------:|:-------:|
@@ -41,7 +52,7 @@

### 举个栗子1

#### 3.1.实现业务消息处理程序JT808MsgIdHandlerBase
#### 4.1.实现业务消息处理程序JT808MsgIdHandlerBase

```business Imp
public class JT808MsgIdCustomHandler : JT808MsgIdHandlerBase
@@ -63,19 +74,19 @@ public class JT808MsgIdCustomHandler : JT808MsgIdHandlerBase

```

#### 3.2.自定义业务消息处理程序替换默认实现
#### 4.2.自定义业务消息处理程序替换默认实现

``` handler
services.Replace(new ServiceDescriptor(typeof(JT808MsgIdHandlerBase), typeof(JT808MsgIdCustomHandler), ServiceLifetime.Singleton));
```

#### 3.3.使用JT808 Host
#### 4.3.使用JT808 Host

``` host
UseJT808Host()
```

#### 3.4.完整示例
#### 4.4.完整示例

``` demo
// 默认网关端口:808


+ 78
- 0
src/JT808.DotNetty.Test/Internal/JT808SessionPublishingRedisImplTest.cs Dosyayı Görüntüle

@@ -0,0 +1,78 @@
using JT808.DotNetty.Internal;
using System;
using System.Collections.Generic;
using System.Text;
using Microsoft.Extensions.DependencyInjection;
using JT808.DotNetty.Configurations;
using Microsoft.Extensions.Options;
using Xunit;
using System.Threading.Tasks;
using System.Threading;
using StackExchange.Redis;

namespace JT808.DotNetty.Test.Internal
{
public class JT808SessionPublishingRedisImplTest: TestBase
{
JT808SessionPublishingRedisImpl jT808SessionPublishingRedisImpl;

public JT808SessionPublishingRedisImplTest()
{
jT808SessionPublishingRedisImpl = new JT808SessionPublishingRedisImpl(
ServiceProvider.GetRequiredService<IOptionsMonitor<JT808Configuration>>());
}

[Fact]
public void Test1()
{
int i = 10000;
Task.Run(() => {
while (i > 0)
{
jT808SessionPublishingRedisImpl.PublishAsync(JT808Constants.SessionOnline, null, Guid.NewGuid().ToString("N"));
jT808SessionPublishingRedisImpl.PublishAsync(JT808Constants.SessionOffline, null, Guid.NewGuid().ToString("N"));
i--;
Thread.Sleep(1000);
}
});
Thread.Sleep(1000);
List<string> SessionOnlines = new List<string>();
ChannelMessageQueue channelMessageQueue= jT808SessionPublishingRedisImpl.Subscriber.Subscribe(JT808Constants.SessionOnline);
channelMessageQueue.OnMessage((msg) => {
SessionOnlines.Add(msg.Message);
});
List<string> SessionOfflines = new List<string>();
ChannelMessageQueue channelMessageQueue1 = jT808SessionPublishingRedisImpl.Subscriber.Subscribe(JT808Constants.SessionOffline);
channelMessageQueue1.OnMessage((msg) => {
SessionOfflines.Add(msg.Message);
});
Thread.Sleep(3000);
}

[Fact]
public void Test2()
{
int i = 100000;
Task.Run(() => {
while (i > 0)
{
jT808SessionPublishingRedisImpl.PublishAsync(JT808Constants.SessionOnline, null, Guid.NewGuid().ToString("N"));
jT808SessionPublishingRedisImpl.PublishAsync(JT808Constants.SessionOffline, null, Guid.NewGuid().ToString("N"));
i--;
Thread.Sleep(1000);
}
});
Thread.Sleep(1000);
List<string> SessionOnlines = new List<string>();
List<string> SessionOfflines = new List<string>();
ChannelMessageQueue channelMessageQueue = jT808SessionPublishingRedisImpl.Subscriber.Subscribe(JT808Constants.SessionOnline);
channelMessageQueue.OnMessage((msg) => {
SessionOnlines.Add(msg.Message);
});
ChannelMessageQueue channelMessageQueue1 = jT808SessionPublishingRedisImpl.Subscriber.Subscribe(JT808Constants.SessionOffline);
channelMessageQueue1.OnMessage((msg) => {
SessionOfflines.Add(msg.Message);
});
}
}
}

+ 4
- 1
src/JT808.DotNetty.Test/SeedSession.cs Dosyayı Görüntüle

@@ -1,4 +1,5 @@
using DotNetty.Transport.Channels.Embedded;
using JT808.DotNetty.Internal;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
@@ -9,7 +10,9 @@ namespace JT808.DotNetty.Test
{
public class SeedSession
{
public JT808SessionManager jT808SessionManager = new JT808SessionManager(new LoggerFactory());
public JT808SessionManager jT808SessionManager = new JT808SessionManager(
new JT808SessionPublishingEmptyImpl(),
new LoggerFactory());

public SeedSession()
{


+ 2
- 1
src/JT808.DotNetty.Test/appsettings.json Dosyayı Görüntüle

@@ -33,6 +33,7 @@
"Host": "127.0.0.1",
"Port": 6562
}
]
],
"RedisHost": "127.0.0.1:6379"
}
}

+ 45
- 10
src/JT808.DotNetty/Internal/JT808SessionPublishingRedisImpl.cs Dosyayı Görüntüle

@@ -1,40 +1,75 @@
using JT808.DotNetty.Interfaces;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;
using StackExchange.Redis;
using Microsoft.Extensions.Options;
using JT808.DotNetty.Configurations;
using Microsoft.Extensions.Logging;

namespace JT808.DotNetty.Internal
{
internal class JT808SessionPublishingRedisImpl : IJT808SessionPublishing
internal class JT808SessionPublishingRedisImpl : IJT808SessionPublishing,IDisposable
{
private IConnectionMultiplexer connectionMultiplexer;

private IOptionsMonitor<JT808Configuration> optionsMonitor;

private ILogger<JT808SessionPublishingRedisImpl> logger;
private string redisHost;

private JT808SessionPublishingRedisImpl(
ILoggerFactory loggerFactory,
private IDisposable optionsMonitorDisposable;

public JT808SessionPublishingRedisImpl(
IOptionsMonitor<JT808Configuration> optionsMonitor
)
{
this.optionsMonitor = optionsMonitor;
logger = loggerFactory.CreateLogger<JT808SessionPublishingRedisImpl>();
connectionMultiplexer = ConnectionMultiplexer.Connect(optionsMonitor.CurrentValue.RedisHost);
redisHost = optionsMonitor.CurrentValue.RedisHost;
try
{
connectionMultiplexer = ConnectionMultiplexer.Connect(redisHost);
}
catch
{

}
optionsMonitorDisposable= this.optionsMonitor.OnChange((config,str) =>
{
if(config.RedisHost!= redisHost)
{
redisHost = config.RedisHost;
connectionMultiplexer.Close();
try
{
connectionMultiplexer = ConnectionMultiplexer.Connect(redisHost);
}
catch
{
}
}
});
}

public Task PublishAsync(string topicName, string key, string value)
{
if (connectionMultiplexer.IsConnected)
{

Subscriber?.PublishAsync(topicName, value);
}
return Task.CompletedTask;
}

internal ISubscriber Subscriber
{
get
{
return connectionMultiplexer.GetSubscriber();
}
}

public void Dispose()
{
connectionMultiplexer.Close();
optionsMonitorDisposable.Dispose();
}
}
}

+ 13
- 0
src/JT808.DotNetty/JT808Constants.cs Dosyayı Görüntüle

@@ -0,0 +1,13 @@
using System;
using System.Collections.Generic;
using System.Text;

namespace JT808.DotNetty
{
public static class JT808Constants
{
public const string SessionOnline= "JT808SessionOnline";

public const string SessionOffline = "JT808SessionOffline";
}
}

+ 1
- 0
src/JT808.DotNetty/JT808DotnettyExtensions.cs Dosyayı Görüntüle

@@ -35,6 +35,7 @@ namespace JT808.DotNetty
return builder.ConfigureServices((hostContext, services) =>
{
services.Configure<JT808Configuration>(hostContext.Configuration.GetSection("JT808Configuration"));
services.TryAddSingleton<IJT808SessionPublishing, JT808SessionPublishingEmptyImpl>();
services.TryAddSingleton<JT808SessionManager>();
services.TryAddSingleton<JT808AtomicCounterService>();
services.TryAddSingleton<JT808TransmitAddressFilterService>();


+ 13
- 3
src/JT808.DotNetty/JT808SessionManager.cs Dosyayı Görüntüle

@@ -5,6 +5,7 @@ using System.Collections.Generic;
using System.Linq;
using JT808.DotNetty.Metadata;
using DotNetty.Transport.Channels;
using JT808.DotNetty.Interfaces;

namespace JT808.DotNetty
{
@@ -15,9 +16,13 @@ namespace JT808.DotNetty
{
private readonly ILogger<JT808SessionManager> logger;

private readonly IJT808SessionPublishing jT808SessionPublishing;

public JT808SessionManager(
IJT808SessionPublishing jT808SessionPublishing,
ILoggerFactory loggerFactory)
{
this.jT808SessionPublishing = jT808SessionPublishing;
logger = loggerFactory.CreateLogger<JT808SessionManager>();
}

@@ -69,7 +74,7 @@ namespace JT808.DotNetty
//部标的超长待机设备,不会像正常的设备一样一直连着,可能10几分钟连上了,然后发完就关闭连接,
//这时候想下发数据需要知道设备什么时候上线,在这边做通知最好不过了。
//todo: 有设备关联上来可以进行通知 例如:使用Redis发布订阅
jT808SessionPublishing.PublishAsync(JT808Constants.SessionOnline,null, appSession.TerminalPhoneNo);
}
}

@@ -93,7 +98,9 @@ namespace JT808.DotNetty
{
SessionIdDict.TryRemove(key, out JT808Session jT808SessionRemove);
}
logger.LogInformation($">>>{terminalPhoneNo}-{string.Join(",", terminalPhoneNos)} 1-n Session Remove.");
string nos = string.Join(",", terminalPhoneNos);
logger.LogInformation($">>>{terminalPhoneNo}-{nos} 1-n Session Remove.");
jT808SessionPublishing.PublishAsync(JT808Constants.SessionOffline, null, nos);
return jT808Session;
}
else
@@ -101,6 +108,7 @@ namespace JT808.DotNetty
if (SessionIdDict.TryRemove(terminalPhoneNo, out JT808Session jT808SessionRemove))
{
logger.LogInformation($">>>{terminalPhoneNo} Session Remove.");
jT808SessionPublishing.PublishAsync(JT808Constants.SessionOffline, null, terminalPhoneNo);
return jT808SessionRemove;
}
else
@@ -119,7 +127,9 @@ namespace JT808.DotNetty
{
SessionIdDict.TryRemove(key, out JT808Session jT808SessionRemove);
}
logger.LogInformation($">>>{string.Join(",", terminalPhoneNos)} Channel Remove.");
string nos = string.Join(",", terminalPhoneNos);
logger.LogInformation($">>>{nos} Channel Remove.");
jT808SessionPublishing.PublishAsync(JT808Constants.SessionOffline, null, nos);
}

public IEnumerable<JT808Session> GetAll()


Yükleniyor…
İptal
Kaydet