瀏覽代碼

下级平台主从链路开发

tags/old
SmallChi 6 年之前
父節點
當前提交
3eac81870a
共有 15 個檔案被更改,包括 928 行新增0 行删除
  1. +39
    -0
      src/JT809Netty.Core/AtomicCounter.cs
  2. +14
    -0
      src/JT809Netty.Core/Configs/JT809NettyOptions.cs
  3. +77
    -0
      src/JT809Netty.Core/Handlers/JT808ServiceHandler.cs
  4. +64
    -0
      src/JT809Netty.Core/Handlers/JT809DecodeHandler.cs
  5. +89
    -0
      src/JT809Netty.Core/Handlers/JT809DownMasterLinkConnectionHandler.cs
  6. +91
    -0
      src/JT809Netty.Core/Handlers/JT809DownSlaveLinkConnectionHandler.cs
  7. +19
    -0
      src/JT809Netty.Core/IAppSession.cs
  8. +101
    -0
      src/JT809Netty.Core/JT809DownMasterLinkNettyService.cs
  9. +104
    -0
      src/JT809Netty.Core/JT809DownSlaveLinkNettyService.cs
  10. +22
    -0
      src/JT809Netty.Core/JT809Netty.Core.csproj
  11. +39
    -0
      src/JT809Netty.Core/JT809Session.cs
  12. +220
    -0
      src/JT809Netty.Core/SessionManager.cs
  13. +25
    -0
      src/JT809Netty.sln
  14. +12
    -0
      src/JT809NettyServer/JT809NettyServer.csproj
  15. +12
    -0
      src/JT809NettyServer/Program.cs

+ 39
- 0
src/JT809Netty.Core/AtomicCounter.cs 查看文件

@@ -0,0 +1,39 @@
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;

namespace JT809Netty.Core
{
/// <summary>
///
/// ref:Grpc.Core.Internal
/// </summary>
public class AtomicCounter
{
long counter = 0;

public AtomicCounter(long initialCount = 0)
{
this.counter = initialCount;
}

public long Increment()
{
return Interlocked.Increment(ref counter);
}

public long Decrement()
{
return Interlocked.Decrement(ref counter);
}

public long Count
{
get
{
return Interlocked.Read(ref counter);
}
}
}
}

+ 14
- 0
src/JT809Netty.Core/Configs/JT809NettyOptions.cs 查看文件

@@ -0,0 +1,14 @@
using System;
using System.Collections.Generic;
using System.Text;

namespace JT809Netty.Core.Configs
{
public class JT809NettyOptions
{
public string Host { get; set; }
public int Port { get; set; }
public List<string> IpWhiteList { get; set; } = new List<string>();
public bool IpWhiteListDisabled { get; set; }
}
}

+ 77
- 0
src/JT809Netty.Core/Handlers/JT808ServiceHandler.cs 查看文件

@@ -0,0 +1,77 @@
using DotNetty.Buffers;
using DotNetty.Transport.Channels;
using System;
using System.Collections.Generic;
using System.IO;
using System.Text;
using JT808.Protocol;
using JT808.Protocol.Extensions;
using DotNetty.Common.Utilities;
using Microsoft.Extensions.Logging;
using Newtonsoft.Json;
using JT808.Protocol.Exceptions;
using System.Threading;

namespace GPS.JT808NettyServer.Handlers
{
public class JT808ServiceHandler : ChannelHandlerAdapter
{
private readonly ILogger<JT808ServiceHandler> logger;

private readonly JT808MsgIdHandler jT808MsgIdHandler;

public JT808ServiceHandler(
JT808MsgIdHandler jT808MsgIdHandler,
ILoggerFactory loggerFactory)
{
this.jT808MsgIdHandler = jT808MsgIdHandler;
logger = loggerFactory.CreateLogger<JT808ServiceHandler>();
}

public override void ChannelRead(IChannelHandlerContext context, object message)
{
var jT808RequestInfo = (JT808RequestInfo)message;
string receive = string.Empty;
try
{
if (logger.IsEnabled(LogLevel.Debug))
{
receive = jT808RequestInfo.OriginalBuffer.ToHexString();
}
Func<JT808RequestInfo, IChannelHandlerContext,IJT808Package> handlerFunc;
if (jT808RequestInfo.JT808Package != null)
{
if (jT808MsgIdHandler.HandlerDict.TryGetValue(jT808RequestInfo.JT808Package.Header.MsgId, out handlerFunc))
{
IJT808Package jT808PackageImpl = handlerFunc(jT808RequestInfo, context);
if (jT808PackageImpl != null)
{
if (logger.IsEnabled(LogLevel.Debug))
{
logger.LogDebug("send>>>" + jT808PackageImpl.JT808Package.Header.MsgId.ToString() + "-" + JT808Serializer.Serialize(jT808PackageImpl.JT808Package).ToHexString());
//logger.LogDebug("send>>>" + jT808PackageImpl.JT808Package.Header.MsgId.ToString() + "-" + JsonConvert.SerializeObject(jT808PackageImpl.JT808Package));
}
// 需要注意:
// 1.下发应答必须要在类中重写 ChannelReadComplete 不然客户端接收不到消息
// context.WriteAsync(Unpooled.WrappedBuffer(JT808Serializer.Serialize(jT808PackageImpl.JT808Package)));
// 2.直接发送
context.WriteAndFlushAsync(Unpooled.WrappedBuffer(JT808Serializer.Serialize(jT808PackageImpl.JT808Package)));
}
}
}
}
catch (JT808Exception ex)
{
if (logger.IsEnabled(LogLevel.Error))
logger.LogError(ex, "JT808Exception receive<<<" + receive);
}
catch (Exception ex)
{
if (logger.IsEnabled(LogLevel.Error))
logger.LogError(ex, "Exception receive<<<" + receive);
}
}

public override void ChannelReadComplete(IChannelHandlerContext context) => context.Flush();
}
}

+ 64
- 0
src/JT809Netty.Core/Handlers/JT809DecodeHandler.cs 查看文件

@@ -0,0 +1,64 @@
using DotNetty.Buffers;
using DotNetty.Codecs;
using DotNetty.Transport.Channels;
using JT809.Protocol;
using JT809.Protocol.JT809Exceptions;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;

namespace JT809Netty.Core.Handlers
{
/// <summary>
/// JT809解码
/// </summary>
public class JT809DecodeHandler : ByteToMessageDecoder
{
private readonly ILogger<JT809DecodeHandler> logger;

public JT809DecodeHandler(ILoggerFactory loggerFactory)
{
logger = loggerFactory.CreateLogger<JT809DecodeHandler>();
}

private static readonly AtomicCounter MsgSuccessCounter = new AtomicCounter();

private static readonly AtomicCounter MsgFailCounter = new AtomicCounter();

protected override void Decode(IChannelHandlerContext context, IByteBuffer input, List<object> output)
{
string msg = string.Empty;
byte[] buffer = null;
try
{
buffer = new byte[input.Capacity + 2];
input.ReadBytes(buffer,1, input.Capacity);
buffer[0] = JT809Package.BEGINFLAG;
buffer[input.Capacity + 1] = JT809Package.ENDFLAG;
output.Add(JT809Serializer.Deserialize(buffer));
MsgSuccessCounter.Increment();
if (logger.IsEnabled(LogLevel.Debug))
{
msg = ByteBufferUtil.HexDump(buffer);
logger.LogDebug("accept package success count<<<" + MsgSuccessCounter.Count.ToString());
}
}
catch (JT809Exception ex)
{
MsgFailCounter.Increment();
logger.LogError("accept package fail count<<<" + MsgFailCounter.Count.ToString());
logger.LogError(ex, $"{ex.ErrorCode.ToString()}accept msg<<<{msg}");
return;
}
catch (Exception ex)
{
MsgFailCounter.Increment();
logger.LogError("accept package fail count<<<" + MsgFailCounter.Count.ToString());
logger.LogError(ex, "accept msg<<<" + msg);
return;
}
}
}
}

+ 89
- 0
src/JT809Netty.Core/Handlers/JT809DownMasterLinkConnectionHandler.cs 查看文件

@@ -0,0 +1,89 @@
using DotNetty.Handlers.Timeout;
using DotNetty.Transport.Channels;
using JT809Netty.Core.Configs;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using System.Threading.Tasks;

namespace JT809Netty.Core.Handlers
{
/// <summary>
/// 下级平台主链路
/// </summary>
public class JT809DownMasterLinkConnectionHandler : ChannelHandlerAdapter
{
private readonly ILogger<JT809DownMasterLinkConnectionHandler> logger;

private IOptionsMonitor<JT809NettyOptions> optionsMonitor;

public JT809DownMasterLinkConnectionHandler(
IOptionsMonitor<JT809NettyOptions> optionsMonitor,
SessionManager sessionManager,
ILoggerFactory loggerFactory)
{
this.optionsMonitor = optionsMonitor;
logger = loggerFactory.CreateLogger<JT809DownMasterLinkConnectionHandler>();
}

public override void ChannelActive(IChannelHandlerContext context)
{
base.ChannelActive(context);
}

/// <summary>
/// 主动断开
/// </summary>
/// <param name="context"></param>
public override void ChannelInactive(IChannelHandlerContext context)
{
if (logger.IsEnabled(LogLevel.Debug))
logger.LogDebug(">>>The client disconnects from the server.");
base.ChannelInactive(context);
}

/// <summary>
/// 服务器主动断开
/// </summary>
/// <param name="context"></param>
/// <returns></returns>
public override Task CloseAsync(IChannelHandlerContext context)
{
if (logger.IsEnabled(LogLevel.Debug))
logger.LogDebug("<<<The server disconnects from the client.");
return base.CloseAsync(context);
}

/// <summary>
/// 主链路超时策略
/// 下级平台登录成功后,在与上级平台之间如果有应用业务数据包往来的情况下,不需要发送主链路保持数据包;
/// 否则,下级平台应每 1min 发送一个主链路保持清求数据包到上级平台以保持链路连接
/// </summary>
/// <param name="context"></param>
/// <param name="evt"></param>
public override void UserEventTriggered(IChannelHandlerContext context, object evt)
{
IdleStateEvent idleStateEvent = evt as IdleStateEvent;
if (idleStateEvent != null)
{
string channelId = context.Channel.Id.AsShortText();
logger.LogInformation($"{idleStateEvent.State.ToString()}>>>{channelId}");
switch (idleStateEvent.State)
{
//case IdleState.ReaderIdle:

// break;
case IdleState.WriterIdle:
#warning 发送心跳保持
break;
//case IdleState.AllIdle:

// break;
default:

break;
}
}
base.UserEventTriggered(context, evt);
}
}
}

+ 91
- 0
src/JT809Netty.Core/Handlers/JT809DownSlaveLinkConnectionHandler.cs 查看文件

@@ -0,0 +1,91 @@
using DotNetty.Handlers.Timeout;
using DotNetty.Transport.Channels;
using JT809Netty.Core.Configs;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using System.Threading.Tasks;

namespace JT809Netty.Core.Handlers
{
/// <summary>
/// 下级平台从链路
/// </summary>
public class JT809DownSlaveLinkConnectionHandler : ChannelHandlerAdapter
{
private readonly ILogger<JT809DownSlaveLinkConnectionHandler> logger;

private readonly SessionManager sessionManager;

private IOptionsMonitor<JT809NettyOptions> optionsMonitor;

public JT809DownSlaveLinkConnectionHandler(
IOptionsMonitor<JT809NettyOptions> optionsMonitor,
SessionManager sessionManager,
ILoggerFactory loggerFactory)
{
this.optionsMonitor = optionsMonitor;
this.sessionManager = sessionManager;
logger = loggerFactory.CreateLogger<JT809DownSlaveLinkConnectionHandler>();
}

public override void ChannelActive(IChannelHandlerContext context)
{
base.ChannelActive(context);
}

/// <summary>
/// 主动断开
/// </summary>
/// <param name="context"></param>
public override void ChannelInactive(IChannelHandlerContext context)
{
if (logger.IsEnabled(LogLevel.Debug))
logger.LogDebug(">>>The client disconnects from the server.");
sessionManager.RemoveSessionByID(context.Channel.Id.AsShortText());
base.ChannelInactive(context);
}
/// <summary>
/// 服务器主动断开
/// </summary>
/// <param name="context"></param>
/// <returns></returns>
public override Task CloseAsync(IChannelHandlerContext context)
{
if (logger.IsEnabled(LogLevel.Debug))
logger.LogDebug("<<<The server disconnects from the client.");
return base.CloseAsync(context);
}

/// <summary>
/// 从链路超时策略
/// </summary>
/// <param name="context"></param>
/// <param name="evt"></param>
public override void UserEventTriggered(IChannelHandlerContext context, object evt)
{
IdleStateEvent idleStateEvent = evt as IdleStateEvent;
if (idleStateEvent != null)
{
string channelId = context.Channel.Id.AsShortText();
logger.LogInformation($"{idleStateEvent.State.ToString()}>>>{channelId}");
switch (idleStateEvent.State)
{
case IdleState.ReaderIdle:
//下级平台连续 3min 未收到上级平台发送的从链路保持应答数据包,则认为上级平台的连接中断,将主动断开数据传输从链路。
context.CloseAsync();
break;
//case IdleState.WriterIdle:

// break;
//case IdleState.AllIdle:

// break;
default:

break;
}
}
base.UserEventTriggered(context, evt);
}
}
}

+ 19
- 0
src/JT809Netty.Core/IAppSession.cs 查看文件

@@ -0,0 +1,19 @@
using DotNetty.Transport.Channels;
using System;
using System.Collections.Generic;
using System.Net;
using System.Text;

namespace JT809Netty.Core
{
public interface IAppSession
{
string SessionID { get; }

IChannel Channel { get; }

DateTime LastActiveTime { get; set; }

DateTime StartTime { get; }
}
}

+ 101
- 0
src/JT809Netty.Core/JT809DownMasterLinkNettyService.cs 查看文件

@@ -0,0 +1,101 @@
using DotNetty.Buffers;
using DotNetty.Codecs;
using DotNetty.Handlers.Timeout;
using DotNetty.Transport.Bootstrapping;
using DotNetty.Transport.Channels;
using DotNetty.Transport.Channels.Sockets;
using DotNetty.Transport.Libuv;
using JT809Netty.Core.Configs;
using JT809Netty.Core.Handlers;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Options;
using System;
using System.Collections.Generic;
using System.Net;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace JT809Netty.Core
{
/// <summary>
/// 下级平台主链路
/// </summary
public class JT809DownMasterLinkNettyService : IHostedService
{
IEventLoopGroup workerGroup;

Bootstrap bootstrap;

readonly IServiceProvider serviceProvider;

readonly IOptionsMonitor<JT809NettyOptions> nettyOptions;

public JT809DownMasterLinkNettyService(
IOptionsMonitor<JT809NettyOptions> nettyOptionsAccessor,
IServiceProvider serviceProvider)
{
nettyOptions = nettyOptionsAccessor;
this.serviceProvider = serviceProvider;
}

public Task StartAsync(CancellationToken cancellationToken)
{
nettyOptions.OnChange(options =>
{
try
{
bootstrap.ConnectAsync(options.Host, options.Port);
}
catch (Exception ex)
{

}
});
try
{
workerGroup = new MultithreadEventLoopGroup();
bootstrap = new Bootstrap();
bootstrap.Group(workerGroup)
.Channel<TcpServerChannel>()
.Handler(new ActionChannelInitializer<IChannel>(channel =>
{
InitChannel(channel);
}))
.Option(ChannelOption.SoBacklog, 1048576);
bootstrap.ConnectAsync(nettyOptions.CurrentValue.Host, nettyOptions.CurrentValue.Port);
}
catch (Exception ex)
{

}
return Task.CompletedTask;
}

public Task StopAsync(CancellationToken cancellationToken)
{
try
{
Task.WhenAll(workerGroup.ShutdownGracefullyAsync(TimeSpan.FromMilliseconds(100), TimeSpan.FromSeconds(1)));
}
catch (Exception ex)
{

}
return Task.CompletedTask;
}

private void InitChannel(IChannel channel)
{
var scope = serviceProvider.CreateScope();
//下级平台应每 1min 发送一个主链路保持清求数据包到上级平台以保持链路连接
channel.Pipeline.AddLast("systemIdleState", new WriteTimeoutHandler(60));
channel.Pipeline.AddLast("jt809DownMasterLinkConnection", scope.ServiceProvider.GetRequiredService<JT809DownMasterLinkConnectionHandler>());
channel.Pipeline.AddLast("jt809Buffer", new DelimiterBasedFrameDecoder(int.MaxValue, Unpooled.CopiedBuffer(new byte[] { JT809.Protocol.JT809Package.BEGINFLAG }), Unpooled.CopiedBuffer(new byte[] { JT809.Protocol.JT809Package.ENDFLAG })));
channel.Pipeline.AddLast("jt809Decode", scope.ServiceProvider.GetRequiredService<JT809DecodeHandler>());
//channel.Pipeline.AddLast("jt809Service", scope.ServiceProvider.GetRequiredService<JT808ServiceHandler>());
scope.Dispose();
}
}
}

+ 104
- 0
src/JT809Netty.Core/JT809DownSlaveLinkNettyService.cs 查看文件

@@ -0,0 +1,104 @@
using DotNetty.Buffers;
using DotNetty.Codecs;
using DotNetty.Handlers.Timeout;
using DotNetty.Transport.Bootstrapping;
using DotNetty.Transport.Channels;
using DotNetty.Transport.Libuv;
using JT809Netty.Core.Configs;
using JT809Netty.Core.Handlers;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Options;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace JT809Netty.Core
{
/// <summary>
/// 下级平台从链路
/// </summary>
public class JT809DownSlaveLinkNettyService : IHostedService
{
IEventLoopGroup bossGroup;

IEventLoopGroup workerGroup;

IChannel boundChannel;

readonly IServiceProvider serviceProvider;

readonly JT809NettyOptions nettyOptions;

public JT809DownSlaveLinkNettyService(
IOptions<JT809NettyOptions> nettyOptionsAccessor,
IServiceProvider serviceProvider)
{
nettyOptions = nettyOptionsAccessor.Value;
this.serviceProvider = serviceProvider;
}

public Task StartAsync(CancellationToken cancellationToken)
{
try
{
var dispatcher = new DispatcherEventLoopGroup();
bossGroup = dispatcher;
workerGroup = new WorkerEventLoopGroup(dispatcher);
var bootstrap = new ServerBootstrap();
bootstrap.Group(bossGroup, workerGroup);
bootstrap.Channel<TcpServerChannel>();
bootstrap
//.Handler(new LoggingHandler("SRV-LSTN"))
.ChildHandler(new ActionChannelInitializer<IChannel>(channel =>
{
InitChannel(channel);
}))
.Option(ChannelOption.SoBacklog, 1048576);
if (nettyOptions.Host == "")
{
boundChannel = bootstrap.BindAsync(nettyOptions.Port).Result;
}
else
{
boundChannel = bootstrap.BindAsync(nettyOptions.Host, nettyOptions.Port).Result;
}
}
catch (Exception ex)
{

}
return Task.CompletedTask;
}

public Task StopAsync(CancellationToken cancellationToken)
{
try
{
Task.WhenAll(
bossGroup.ShutdownGracefullyAsync(TimeSpan.FromMilliseconds(100), TimeSpan.FromSeconds(1)),
workerGroup.ShutdownGracefullyAsync(TimeSpan.FromMilliseconds(100), TimeSpan.FromSeconds(1)),
boundChannel.CloseAsync());
}
catch (Exception ex)
{

}
return Task.CompletedTask;
}

private void InitChannel(IChannel channel)
{
var scope = serviceProvider.CreateScope();
//下级平台连续 3min 未收到上级平台发送的从链路保持应答数据包,则认为上级平台的连接中断,将主动断开数据传输从链路。
channel.Pipeline.AddLast("systemIdleState", new ReadTimeoutHandler(180));
channel.Pipeline.AddLast("jt809DownSlaveLinkConnection", scope.ServiceProvider.GetRequiredService<JT809DownSlaveLinkConnectionHandler>());
channel.Pipeline.AddLast("jt809Buffer", new DelimiterBasedFrameDecoder(int.MaxValue, Unpooled.CopiedBuffer(new byte[] { JT809.Protocol.JT809Package.BEGINFLAG }), Unpooled.CopiedBuffer(new byte[] { JT809.Protocol.JT809Package.ENDFLAG })));
channel.Pipeline.AddLast("jt809Decode", scope.ServiceProvider.GetRequiredService<JT809DecodeHandler>());
//channel.Pipeline.AddLast("jt809Service", scope.ServiceProvider.GetRequiredService<JT808ServiceHandler>());
scope.Dispose();
}
}
}

+ 22
- 0
src/JT809Netty.Core/JT809Netty.Core.csproj 查看文件

@@ -0,0 +1,22 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>netstandard2.0</TargetFramework>
<LangVersion>latest</LangVersion>
</PropertyGroup>

<ItemGroup>
<Compile Remove="Handlers\JT808ServiceHandler.cs" />
</ItemGroup>

<ItemGroup>
<PackageReference Include="DotNetty.Handlers" Version="0.5.0" />
<PackageReference Include="DotNetty.Transport.Libuv" Version="0.5.0" />
<PackageReference Include="JT809" Version="1.0.0" />
<PackageReference Include="Microsoft.Extensions.Hosting.Abstractions" Version="2.1.1" />
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="2.1.1" />
<PackageReference Include="Microsoft.Extensions.Options" Version="2.1.1" />
<PackageReference Include="Microsoft.Extensions.Options.ConfigurationExtensions" Version="2.1.1" />
</ItemGroup>

</Project>

+ 39
- 0
src/JT809Netty.Core/JT809Session.cs 查看文件

@@ -0,0 +1,39 @@
using DotNetty.Transport.Channels;
using System;
using JT809.Protocol.JT809Enums;

namespace JT809Netty.Core
{
public class JT809Session: IAppSession
{
public JT809Session(IChannel channel, string vehicleNo,JT809VehicleColorType vehicleColor)
{
Channel = channel;
VehicleNo = vehicleNo;
VehicleColor = vehicleColor;
StartTime = DateTime.Now;
LastActiveTime = DateTime.Now;
SessionID = Channel.Id.AsShortText();
Key = $"{VehicleNo}_{VehicleColor.ToString()}";
}

/// <summary>
/// 车牌号
/// </summary>
public string VehicleNo { get; set; }
/// <summary>
/// 车牌颜色
/// </summary>
public JT809VehicleColorType VehicleColor { get; set; }

public string Key { get; set; }

public string SessionID { get; }

public IChannel Channel { get;}

public DateTime LastActiveTime { get; set; }

public DateTime StartTime { get; }
}
}

+ 220
- 0
src/JT809Netty.Core/SessionManager.cs 查看文件

@@ -0,0 +1,220 @@
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Logging.Abstractions;
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace JT809Netty.Core
{
public class SessionManager:IDisposable
{
private readonly ILogger<SessionManager> logger;

private readonly CancellationTokenSource cancellationTokenSource;

#if DEBUG
private const int timeout = 1 * 1000 * 60;
#else
private const int timeout = 5 * 1000 * 60;
#endif
public SessionManager(ILoggerFactory loggerFactory)
{
logger = loggerFactory.CreateLogger<SessionManager>();
cancellationTokenSource = new CancellationTokenSource();
Task.Run(() =>
{
while (!cancellationTokenSource.IsCancellationRequested)
{
logger.LogInformation($"Online Count>>>{SessionCount}");
if (SessionCount > 0)
{
logger.LogInformation($"SessionIds>>>{string.Join(",", SessionIdDict.Select(s => s.Key))}");
logger.LogInformation($"TerminalPhoneNos>>>{string.Join(",", CustomKey_SessionId_Dict.Select(s => $"{s.Key}-{s.Value}"))}");
}
Thread.Sleep(timeout);
}
}, cancellationTokenSource.Token);
}

/// <summary>
/// Netty生成的sessionID和Session的对应关系
/// key = seession id
/// value = Session
/// </summary>
private ConcurrentDictionary<string, JT809Session> SessionIdDict = new ConcurrentDictionary<string, JT809Session>(StringComparer.OrdinalIgnoreCase);
/// <summary>
/// 自定义Key和netty生成的sessionID的对应关系
/// key = 终端手机号
/// value = seession id
/// </summary>
private ConcurrentDictionary<string, string> CustomKey_SessionId_Dict = new ConcurrentDictionary<string, string>(StringComparer.OrdinalIgnoreCase);

public int SessionCount
{
get
{
return SessionIdDict.Count;
}
}

public void RegisterSession(JT809Session appSession)
{
if (CustomKey_SessionId_Dict.ContainsKey(appSession.Key))
{
return;
}
if (SessionIdDict.TryAdd(appSession.SessionID, appSession) &&
CustomKey_SessionId_Dict.TryAdd(appSession.Key, appSession.SessionID))
{
return;
}
}

public JT809Session GetSessionByID(string sessionID)
{
if (string.IsNullOrEmpty(sessionID))
return default;
JT809Session targetSession;
SessionIdDict.TryGetValue(sessionID, out targetSession);
return targetSession;
}

public JT809Session GetSessionByTerminalPhoneNo(string key)
{
try
{
if (string.IsNullOrEmpty(key))
return default;
if (CustomKey_SessionId_Dict.TryGetValue(key, out string sessionId))
{
if (SessionIdDict.TryGetValue(sessionId, out JT809Session targetSession))
{
return targetSession;
}
else
{
return default;
}
}
else
{
return default;
}
}
catch (Exception ex)
{
logger.LogError(ex, key);
return default;
}
}

public void Heartbeat(string key)
{
try
{
if(CustomKey_SessionId_Dict.TryGetValue(key, out string sessionId))
{
if (SessionIdDict.TryGetValue(sessionId, out JT809Session oldjT808Session))
{
if (oldjT808Session.Channel.Active)
{
oldjT808Session.LastActiveTime = DateTime.Now;
if (SessionIdDict.TryUpdate(sessionId, oldjT808Session, oldjT808Session))
{

}
}
}
}
}
catch (Exception ex)
{
logger.LogError(ex, key);
}
}

/// <summary>
/// 通过通道Id和自定义key进行关联
/// </summary>
/// <param name="sessionID"></param>
/// <param name="key"></param>
public void UpdateSessionByID(string sessionID, string key)
{
try
{
if (SessionIdDict.TryGetValue(sessionID, out JT809Session oldjT808Session))
{
oldjT808Session.Key = key;
if (SessionIdDict.TryUpdate(sessionID, oldjT808Session, oldjT808Session))
{
CustomKey_SessionId_Dict.AddOrUpdate(key, sessionID, (tpn, sid) =>
{
return sessionID;
});
}
}
}
catch (Exception ex)
{
logger.LogError(ex, $"{sessionID},{key}");
}
}

public void RemoveSessionByID(string sessionID)
{
if (sessionID == null) return;
try
{
if (SessionIdDict.TryRemove(sessionID, out JT809Session session))
{
if (session.Key != null)
{
if(CustomKey_SessionId_Dict.TryRemove(session.Key, out string sessionid))
{
logger.LogInformation($">>>{sessionID}-{session.Key} Session Remove.");
}
}
else
{
logger.LogInformation($">>>{sessionID} Session Remove.");
}
session.Channel.CloseAsync();
}
}
catch (Exception ex)
{
logger.LogError(ex, $">>>{sessionID} Session Remove Exception");
}
}

public void RemoveSessionByKey(string key)
{
if (key == null) return;
try
{
if (CustomKey_SessionId_Dict.TryRemove(key, out string sessionid))
{
if (SessionIdDict.TryRemove(sessionid, out JT809Session session))
{
logger.LogInformation($">>>{key}-{sessionid} Key Remove.");
}
else
{
logger.LogInformation($">>>{key} Key Remove.");
}
}
}
catch (Exception ex)
{
logger.LogError(ex, $">>>{key} Key Remove Exception.");
}
}

public void Dispose()
{
cancellationTokenSource.Cancel();
}
}
}

+ 25
- 0
src/JT809Netty.sln 查看文件

@@ -0,0 +1,25 @@

Microsoft Visual Studio Solution File, Format Version 12.00
# Visual Studio 15
VisualStudioVersion = 15.0.28010.2016
MinimumVisualStudioVersion = 10.0.40219.1
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "JT809Netty.Core", "JT809Netty.Core\JT809Netty.Core.csproj", "{2054D7E6-53B6-412F-BE9D-C6DABD80A111}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Release|Any CPU = Release|Any CPU
EndGlobalSection
GlobalSection(ProjectConfigurationPlatforms) = postSolution
{2054D7E6-53B6-412F-BE9D-C6DABD80A111}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{2054D7E6-53B6-412F-BE9D-C6DABD80A111}.Debug|Any CPU.Build.0 = Debug|Any CPU
{2054D7E6-53B6-412F-BE9D-C6DABD80A111}.Release|Any CPU.ActiveCfg = Release|Any CPU
{2054D7E6-53B6-412F-BE9D-C6DABD80A111}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {0FC2A52E-3B7A-4485-9C3B-9080C825419D}
EndGlobalSection
EndGlobal

+ 12
- 0
src/JT809NettyServer/JT809NettyServer.csproj 查看文件

@@ -0,0 +1,12 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>netcoreapp2.1</TargetFramework>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="JT809" Version="1.0.0" />
</ItemGroup>

</Project>

+ 12
- 0
src/JT809NettyServer/Program.cs 查看文件

@@ -0,0 +1,12 @@
using System;

namespace JT809NettyServer
{
class Program
{
static void Main(string[] args)
{
Console.WriteLine("Hello World!");
}
}
}

Loading…
取消
儲存