Преглед изворни кода

初始化JT808服务端

tags/v1.0.0
SmallChi пре 6 година
родитељ
комит
057840d33c
15 измењених фајлова са 893 додато и 0 уклоњено
  1. +15
    -0
      src/JT808.Abstractions/ISourcePackageDispatcher.cs
  2. +7
    -0
      src/JT808.Abstractions/JT808.Abstractions.csproj
  3. +25
    -0
      src/JT808.DotNetty.sln
  4. +68
    -0
      src/JT808.DotNetty/Codecs/JT808Decoder.cs
  5. +34
    -0
      src/JT808.DotNetty/Configurations/JT808Configuration.cs
  6. +27
    -0
      src/JT808.DotNetty/DotnettyExtensions.cs
  7. +96
    -0
      src/JT808.DotNetty/Handlers/JT808ConnectionHandler.cs
  8. +39
    -0
      src/JT808.DotNetty/Handlers/JT808ServerHandler.cs
  9. +39
    -0
      src/JT808.DotNetty/Internal/AtomicCounter.cs
  10. +16
    -0
      src/JT808.DotNetty/Internal/JT808MsgIdDefaultHandler.cs
  11. +19
    -0
      src/JT808.DotNetty/JT808.DotNetty.csproj
  12. +162
    -0
      src/JT808.DotNetty/JT808MsgIdHandlerBase.cs
  13. +90
    -0
      src/JT808.DotNetty/JT808ServerHost.cs
  14. +32
    -0
      src/JT808.DotNetty/JT808Session.cs
  15. +224
    -0
      src/JT808.DotNetty/SessionManager.cs

+ 15
- 0
src/JT808.Abstractions/ISourcePackageDispatcher.cs Прегледај датотеку

@@ -0,0 +1,15 @@
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;

namespace JT808.Abstractions
{
/// <summary>
/// 源包分发器
/// </summary>
public interface ISourcePackageDispatcher
{
Task SendAsync(byte[] data);
}
}

+ 7
- 0
src/JT808.Abstractions/JT808.Abstractions.csproj Прегледај датотеку

@@ -0,0 +1,7 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>netstandard2.0</TargetFramework>
</PropertyGroup>

</Project>

+ 25
- 0
src/JT808.DotNetty.sln Прегледај датотеку

@@ -0,0 +1,25 @@

Microsoft Visual Studio Solution File, Format Version 12.00
# Visual Studio 15
VisualStudioVersion = 15.0.28010.2016
MinimumVisualStudioVersion = 10.0.40219.1
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JT808.DotNetty", "JT808.DotNetty\JT808.DotNetty.csproj", "{80C7F67E-6B7C-4178-8726-ADD3695622DD}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Release|Any CPU = Release|Any CPU
EndGlobalSection
GlobalSection(ProjectConfigurationPlatforms) = postSolution
{80C7F67E-6B7C-4178-8726-ADD3695622DD}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{80C7F67E-6B7C-4178-8726-ADD3695622DD}.Debug|Any CPU.Build.0 = Debug|Any CPU
{80C7F67E-6B7C-4178-8726-ADD3695622DD}.Release|Any CPU.ActiveCfg = Release|Any CPU
{80C7F67E-6B7C-4178-8726-ADD3695622DD}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {FC0FFCEA-E1EF-4C97-A1C5-F89418B6834B}
EndGlobalSection
EndGlobal

+ 68
- 0
src/JT808.DotNetty/Codecs/JT808Decoder.cs Прегледај датотеку

@@ -0,0 +1,68 @@
using DotNetty.Buffers;
using DotNetty.Codecs;
using DotNetty.Transport.Channels;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.Text;
using JT808.Protocol;
using JT808.DotNetty.Internal;

namespace JT808.DotNetty.Codecs
{
/// <summary>
/// JT808解码
/// </summary>
internal class JT808Decoder : ByteToMessageDecoder
{
private readonly ILogger<JT808Decoder> logger;

public JT808Decoder(ILogger<JT808Decoder> logger)
{
this.logger = logger;
}

public JT808Decoder(ILoggerFactory loggerFactory)
{
this.logger = loggerFactory.CreateLogger<JT808Decoder>();
}

private static readonly AtomicCounter MsgSuccessCounter = new AtomicCounter();

private static readonly AtomicCounter MsgFailCounter = new AtomicCounter();

protected override void Decode(IChannelHandlerContext context, IByteBuffer input, List<object> output)
{
byte[] buffer = new byte[input.Capacity + 2];
try
{
input.ReadBytes(buffer, 1, input.Capacity);
buffer[0] = JT808Package.BeginFlag;
buffer[input.Capacity + 1] = JT808Package.EndFlag;
JT808Package jT808Package = JT808Serializer.Deserialize<JT808Package>(buffer);
output.Add(jT808Package);
MsgSuccessCounter.Increment();
if (logger.IsEnabled(LogLevel.Debug))
logger.LogDebug("accept package success count<<<" + MsgSuccessCounter.Count.ToString());
}
catch (JT808.Protocol.Exceptions.JT808Exception ex)
{
MsgFailCounter.Increment();
if (logger.IsEnabled(LogLevel.Error))
{
logger.LogError("accept package fail count<<<" + MsgFailCounter.Count.ToString());
logger.LogError(ex, "accept msg<<<" + buffer);
}
}
catch (Exception ex)
{
MsgFailCounter.Increment();
if (logger.IsEnabled(LogLevel.Error))
{
logger.LogError("accept package fail count<<<" + MsgFailCounter.Count.ToString());
logger.LogError(ex, "accept msg<<<" + buffer);
}
}
}
}
}

+ 34
- 0
src/JT808.DotNetty/Configurations/JT808Configuration.cs Прегледај датотеку

@@ -0,0 +1,34 @@
using System;
using System.Collections.Generic;
using System.Text;

namespace JT808.DotNetty.Configurations
{
public class JT808Configuration
{
public int Port { get; set; } = 808;

public int QuietPeriodSeconds { get; set; } = 1;

public TimeSpan QuietPeriodTimeSpan => TimeSpan.FromSeconds(QuietPeriodSeconds);

public int ShutdownTimeoutSeconds { get; set; } = 3;

public TimeSpan ShutdownTimeoutTimeSpan => TimeSpan.FromSeconds(ShutdownTimeoutSeconds);

public int SoBacklog { get; set; } = 8192;

public int EventLoopCount { get; set; } = Environment.ProcessorCount;

public int ReaderIdleTimeSeconds { get; set; } = 3600;

public int WriterIdleTimeSeconds { get; set; } = 3600;

public int AllIdleTimeSeconds { get; set; } = 3600;
/// <summary>
/// 会话报时
/// 默认5分钟
/// </summary>
public int SessionReportTime { get; set; } = 30000;
}
}

+ 27
- 0
src/JT808.DotNetty/DotnettyExtensions.cs Прегледај датотеку

@@ -0,0 +1,27 @@
using JT808.DotNetty.Configurations;
using JT808.DotNetty.Handlers;
using JT808.DotNetty.Internal;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection.Extensions;
using Microsoft.Extensions.Hosting;
using System;
using System.Reflection;

namespace JT808.DotNetty
{
public static class DotnettyExtensions
{
public static IHostBuilder UseJT808Host(this IHostBuilder builder)
{
return builder.ConfigureServices((hostContext, services) =>
{
services.Configure<JT808Configuration>(hostContext.Configuration.GetSection("JT808Configuration"));
services.TryAddSingleton<SessionManager>();
services.TryAddSingleton<JT808MsgIdDefaultHandler>();
services.TryAddScoped<JT808ConnectionHandler>();
services.TryAddScoped<JT808ServerHandler>();
services.AddHostedService<JT808ServerHost>();
});
}
}
}

+ 96
- 0
src/JT808.DotNetty/Handlers/JT808ConnectionHandler.cs Прегледај датотеку

@@ -0,0 +1,96 @@
using DotNetty.Handlers.Timeout;
using DotNetty.Transport.Channels;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.Net;
using System.Text;
using System.Threading.Tasks;

namespace JT808.DotNetty.Handlers
{
internal class JT808ConnectionHandler : ChannelHandlerAdapter
{
private readonly ILogger<JT808ConnectionHandler> logger;

public JT808ConnectionHandler(
ILoggerFactory loggerFactory)
{
logger = loggerFactory.CreateLogger<JT808ConnectionHandler>();
}

public JT808ConnectionHandler(
ILogger<JT808ConnectionHandler> logger)
{
this.logger = logger;
}

/// <summary>
/// 通道激活
/// </summary>
/// <param name="context"></param>
public override void ChannelActive(IChannelHandlerContext context)
{
string channelId = context.Channel.Id.AsShortText();
if (logger.IsEnabled(LogLevel.Debug))
logger.LogDebug($"<<<{ channelId } Successful client connection to server.");
base.ChannelActive(context);
}

/// <summary>
/// 设备主动断开
/// </summary>
/// <param name="context"></param>
public override void ChannelInactive(IChannelHandlerContext context)
{
string channelId = context.Channel.Id.AsShortText();
if (logger.IsEnabled(LogLevel.Debug))
logger.LogDebug($">>>{ channelId } The client disconnects from the server.");
base.ChannelInactive(context);
}

/// <summary>
/// 服务器主动断开
/// </summary>
/// <param name="context"></param>
/// <returns></returns>
public override Task CloseAsync(IChannelHandlerContext context)
{
string channelId = context.Channel.Id.AsShortText();
if (logger.IsEnabled(LogLevel.Debug))
logger.LogDebug($"<<<{ channelId } The server disconnects from the client.");
return base.CloseAsync(context);
}

public override void ChannelReadComplete(IChannelHandlerContext context)
{
context.Flush();
}

/// <summary>
/// 超时策略
/// </summary>
/// <param name="context"></param>
/// <param name="evt"></param>
public override void UserEventTriggered(IChannelHandlerContext context, object evt)
{
IdleStateEvent idleStateEvent = evt as IdleStateEvent;
if (idleStateEvent != null)
{
string channelId = context.Channel.Id.AsShortText();
logger.LogInformation($"{idleStateEvent.State.ToString()}>>>{channelId}");
// 由于808是设备发心跳,如果很久没有上报数据,那么就由服务器主动关闭连接。
context.CloseAsync();
}
base.UserEventTriggered(context, evt);
}

public override void ExceptionCaught(IChannelHandlerContext context, Exception exception)
{
string channelId = context.Channel.Id.AsShortText();
logger.LogError(exception,$"{channelId} {exception.Message}" );
context.CloseAsync();
}
}
}


+ 39
- 0
src/JT808.DotNetty/Handlers/JT808ServerHandler.cs Прегледај датотеку

@@ -0,0 +1,39 @@
using DotNetty.Buffers;
using DotNetty.Transport.Channels;
using JT808.Protocol;
using System;
using System.Collections.Generic;
using System.Text;

namespace JT808.DotNetty.Handlers
{
internal class JT808ServerHandler : SimpleChannelInboundHandler<JT808.Protocol.JT808Package>
{
private readonly JT808MsgIdHandlerBase handler;

public JT808ServerHandler(JT808MsgIdHandlerBase handler)
{
this.handler = handler;
}

protected override void ChannelRead0(IChannelHandlerContext ctx, JT808Package msg)
{
try
{
Func<JT808Package, IChannelHandlerContext, JT808Package> handlerFunc;
if (handler.HandlerDict.TryGetValue(msg.Header.MsgId, out handlerFunc))
{
JT808Package jT808Package = handlerFunc(msg, ctx);
if (jT808Package != null)
{
ctx.WriteAndFlushAsync(Unpooled.WrappedBuffer(JT808Serializer.Serialize(jT808Package)));
}
}
}
catch
{
}
}
}
}

+ 39
- 0
src/JT808.DotNetty/Internal/AtomicCounter.cs Прегледај датотеку

@@ -0,0 +1,39 @@
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;

namespace JT808.DotNetty.Internal
{
/// <summary>
///
/// <see cref="Grpc.Core.Internal"/>
/// </summary>
internal class AtomicCounter
{
long counter = 0;

public AtomicCounter(long initialCount = 0)
{
this.counter = initialCount;
}

public long Increment()
{
return Interlocked.Increment(ref counter);
}

public long Decrement()
{
return Interlocked.Decrement(ref counter);
}

public long Count
{
get
{
return Interlocked.Read(ref counter);
}
}
}
}

+ 16
- 0
src/JT808.DotNetty/Internal/JT808MsgIdDefaultHandler.cs Прегледај датотеку

@@ -0,0 +1,16 @@
using System;
using System.Collections.Generic;
using System.Text;

namespace JT808.DotNetty.Internal
{
/// <summary>
/// 默认消息处理业务实现
/// </summary>
internal class JT808MsgIdDefaultHandler : JT808MsgIdHandlerBase
{
public JT808MsgIdDefaultHandler(SessionManager sessionManager) : base(sessionManager)
{
}
}
}

+ 19
- 0
src/JT808.DotNetty/JT808.DotNetty.csproj Прегледај датотеку

@@ -0,0 +1,19 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>netstandard2.0</TargetFramework>
<LangVersion>latest</LangVersion>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="DotNetty.Codecs" Version="0.6.0" />
<PackageReference Include="DotNetty.Handlers" Version="0.6.0" />
<PackageReference Include="DotNetty.Transport.Libuv" Version="0.6.0" />
<PackageReference Include="JT808" Version="1.0.0" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="2.1.1" />
<PackageReference Include="Microsoft.Extensions.Hosting.Abstractions" Version="2.1.1" />
<PackageReference Include="Microsoft.Extensions.Options" Version="2.1.1" />
<PackageReference Include="Microsoft.Extensions.Options.ConfigurationExtensions" Version="2.1.1" />
</ItemGroup>

</Project>

+ 162
- 0
src/JT808.DotNetty/JT808MsgIdHandlerBase.cs Прегледај датотеку

@@ -0,0 +1,162 @@
using System;
using System.Collections.Generic;
using System.Text;
using DotNetty.Transport.Channels;
using JT808.Protocol;
using JT808.Protocol.Enums;
using JT808.Protocol.Extensions;
using JT808.Protocol.MessageBody;

namespace JT808.DotNetty
{
/// <summary>
/// 抽象消息处理业务
/// </summary>
public abstract class JT808MsgIdHandlerBase
{
protected SessionManager sessionManager { get; }
/// <summary>
/// 初始化消息处理业务
/// </summary>
protected JT808MsgIdHandlerBase(SessionManager sessionManager)
{
this.sessionManager = sessionManager;
HandlerDict = new Dictionary<JT808MsgId, Func<JT808Package, IChannelHandlerContext, JT808Package>>
{
{JT808MsgId.终端通用应答, Msg0x0001},
{JT808MsgId.终端鉴权, Msg0x0102},
{JT808MsgId.终端心跳, Msg0x0002},
{JT808MsgId.终端注销, Msg0x0003},
{JT808MsgId.终端注册, Msg0x0100},
{JT808MsgId.位置信息汇报,Msg0x0200 },
{JT808MsgId.定位数据批量上传,Msg0x0704 },
{JT808MsgId.数据上行透传,Msg0x0900 }
};
}

public Dictionary<JT808MsgId, Func<JT808Package, IChannelHandlerContext, JT808Package>> HandlerDict { get; }
/// <summary>
/// 终端通用应答
/// </summary>
/// <param name="reqJT808Package"></param>
/// <param name="ctx"></param>
/// <returns></returns>
public virtual JT808Package Msg0x0001(JT808Package reqJT808Package, IChannelHandlerContext ctx)
{
return JT808MsgId.平台通用应答.Create(reqJT808Package.Header.TerminalPhoneNo, new JT808_0x8001()
{
MsgId = reqJT808Package.Header.MsgId,
JT808PlatformResult = JT808PlatformResult.Success,
MsgNum = reqJT808Package.Header.MsgNum
});
}
/// <summary>
/// 终端心跳
/// </summary>
/// <param name="reqJT808Package"></param>
/// <param name="ctx"></param>
/// <returns></returns>
public virtual JT808Package Msg0x0002(JT808Package reqJT808Package, IChannelHandlerContext ctx)
{
sessionManager.Heartbeat(reqJT808Package.Header.TerminalPhoneNo);
return JT808MsgId.平台通用应答.Create(reqJT808Package.Header.TerminalPhoneNo, new JT808_0x8001()
{
MsgId = reqJT808Package.Header.MsgId,
JT808PlatformResult = JT808PlatformResult.Success,
MsgNum = reqJT808Package.Header.MsgNum
});
}
/// <summary>
/// 终端注销
/// </summary>
/// <param name="reqJT808Package"></param>
/// <param name="ctx"></param>
/// <returns></returns>
public virtual JT808Package Msg0x0003(JT808Package reqJT808Package, IChannelHandlerContext ctx)
{
sessionManager.RemoveSessionByTerminalPhoneNo(reqJT808Package.Header.TerminalPhoneNo);
return JT808MsgId.平台通用应答.Create(reqJT808Package.Header.TerminalPhoneNo, new JT808_0x8001()
{
MsgId = reqJT808Package.Header.MsgId,
JT808PlatformResult = JT808PlatformResult.Success,
MsgNum = reqJT808Package.Header.MsgNum
});
}
/// <summary>
/// 终端注册
/// </summary>
/// <param name="reqJT808Package"></param>
/// <param name="ctx"></param>
/// <returns></returns>
public virtual JT808Package Msg0x0100(JT808Package reqJT808Package, IChannelHandlerContext ctx)
{
return JT808MsgId.终端注册应答.Create(reqJT808Package.Header.TerminalPhoneNo, new JT808_0x8100()
{
Code = "J" + reqJT808Package.Header.TerminalPhoneNo,
JT808TerminalRegisterResult = JT808TerminalRegisterResult.成功,
MsgNum = reqJT808Package.Header.MsgNum
});
}
/// <summary>
/// 终端鉴权
/// </summary>
/// <param name="reqJT808Package"></param>
/// <param name="ctx"></param>
/// <returns></returns>
public virtual JT808Package Msg0x0102(JT808Package reqJT808Package, IChannelHandlerContext ctx)
{
sessionManager.RegisterSession(new JT808Session(ctx.Channel, reqJT808Package.Header.TerminalPhoneNo));
return JT808MsgId.平台通用应答.Create(reqJT808Package.Header.TerminalPhoneNo, new JT808_0x8001()
{
MsgId = reqJT808Package.Header.MsgId,
JT808PlatformResult = JT808PlatformResult.Success,
MsgNum = reqJT808Package.Header.MsgNum
});
}
/// <summary>
/// 位置信息汇报
/// </summary>
/// <param name="reqJT808Package"></param>
/// <param name="ctx"></param>
/// <returns></returns>
public virtual JT808Package Msg0x0200(JT808Package reqJT808Package, IChannelHandlerContext ctx)
{
return JT808MsgId.平台通用应答.Create(reqJT808Package.Header.TerminalPhoneNo, new JT808_0x8001()
{
MsgId = reqJT808Package.Header.MsgId,
JT808PlatformResult = JT808PlatformResult.Success,
MsgNum = reqJT808Package.Header.MsgNum
});
}
/// <summary>
/// 定位数据批量上传
/// </summary>
/// <param name="reqJT808Package"></param>
/// <param name="ctx"></param>
/// <returns></returns>
public virtual JT808Package Msg0x0704(JT808Package reqJT808Package, IChannelHandlerContext ctx)
{
return JT808MsgId.平台通用应答.Create(reqJT808Package.Header.TerminalPhoneNo, new JT808_0x8001()
{
MsgId = reqJT808Package.Header.MsgId,
JT808PlatformResult = JT808PlatformResult.Success,
MsgNum = reqJT808Package.Header.MsgNum
});
}
/// <summary>
/// 数据上行透传
/// </summary>
/// <param name="reqJT808Package"></param>
/// <param name="ctx"></param>
/// <returns></returns>
public virtual JT808Package Msg0x0900(JT808Package reqJT808Package, IChannelHandlerContext ctx)
{
return JT808MsgId.平台通用应答.Create(reqJT808Package.Header.TerminalPhoneNo, new JT808_0x8001()
{
MsgId = reqJT808Package.Header.MsgId,
JT808PlatformResult = JT808PlatformResult.Success,
MsgNum = reqJT808Package.Header.MsgNum
});
}
}
}

+ 90
- 0
src/JT808.DotNetty/JT808ServerHost.cs Прегледај датотеку

@@ -0,0 +1,90 @@
using DotNetty.Buffers;
using DotNetty.Codecs;
using DotNetty.Handlers.Timeout;
using DotNetty.Transport.Bootstrapping;
using DotNetty.Transport.Channels;
using DotNetty.Transport.Libuv;
using JT808.DotNetty.Codecs;
using JT808.DotNetty.Configurations;
using JT808.DotNetty.Handlers;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using System;
using System.Collections.Generic;
using System.Net;
using System.Runtime.InteropServices;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace JT808.DotNetty
{
public class JT808ServerHost : IHostedService
{
public IServiceProvider Provider { get; }
private readonly JT808Configuration configuration;
private readonly ILogger<JT808ServerHost> logger;
private DispatcherEventLoopGroup bossGroup;
private WorkerEventLoopGroup workerGroup;
private IChannel bootstrapChannel;

public JT808ServerHost(
IServiceProvider provider,
ILoggerFactory loggerFactory,
IOptions<JT808Configuration> jT808ConfigurationAccessor)
{
Provider = provider;
configuration = jT808ConfigurationAccessor.Value;
logger=loggerFactory.CreateLogger<JT808ServerHost>();
}

public Task StartAsync(CancellationToken cancellationToken)
{
bossGroup = new DispatcherEventLoopGroup();
workerGroup = new WorkerEventLoopGroup(bossGroup, configuration.EventLoopCount);
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.Group(bossGroup, workerGroup);
bootstrap.Channel<TcpServerChannel>();
if (RuntimeInformation.IsOSPlatform(OSPlatform.Linux)
|| RuntimeInformation.IsOSPlatform(OSPlatform.OSX))
{
bootstrap
.Option(ChannelOption.SoReuseport, true)
.ChildOption(ChannelOption.SoReuseaddr, true);
}
bootstrap
.Option(ChannelOption.SoBacklog, configuration.SoBacklog)
.ChildHandler(new ActionChannelInitializer<IChannel>(channel =>
{
IChannelPipeline pipeline = channel.Pipeline;
using(var scope= Provider.CreateScope())
{
channel.Pipeline.AddLast("systemIdleState", new IdleStateHandler(
configuration.ReaderIdleTimeSeconds,
configuration.WriterIdleTimeSeconds,
configuration.AllIdleTimeSeconds));
channel.Pipeline.AddLast("jt808Connection", scope.ServiceProvider.GetRequiredService<JT808ConnectionHandler>());
channel.Pipeline.AddLast("jt808Buffer", new DelimiterBasedFrameDecoder(int.MaxValue,
Unpooled.CopiedBuffer(new byte[] { JT808.Protocol.JT808Package.BeginFlag }),
Unpooled.CopiedBuffer(new byte[] { JT808.Protocol.JT808Package.EndFlag })));
channel.Pipeline.AddLast("jt808Decode", scope.ServiceProvider.GetRequiredService<JT808Decoder>());
channel.Pipeline.AddLast("jt808Service", scope.ServiceProvider.GetRequiredService<JT808ServerHandler>());
}
}));
logger.LogInformation($"Server start at {IPAddress.Any}:{configuration.Port}.");
return bootstrap.BindAsync(configuration.Port)
.ContinueWith(i => bootstrapChannel = i.Result);
}

public async Task StopAsync(CancellationToken cancellationToken)
{
await bootstrapChannel.CloseAsync();
var quietPeriod = configuration.QuietPeriodTimeSpan;
var shutdownTimeout = configuration.ShutdownTimeoutTimeSpan;
await workerGroup.ShutdownGracefullyAsync(quietPeriod, shutdownTimeout);
await bossGroup.ShutdownGracefullyAsync(quietPeriod, shutdownTimeout);
}
}
}

+ 32
- 0
src/JT808.DotNetty/JT808Session.cs Прегледај датотеку

@@ -0,0 +1,32 @@
using DotNetty.Transport.Channels;
using System;
using System.Collections.Generic;
using System.Text;

namespace JT808.DotNetty
{
public class JT808Session
{
public JT808Session(IChannel channel, string terminalPhoneNo)
{
Channel = channel;
TerminalPhoneNo = terminalPhoneNo;
StartTime = DateTime.Now;
LastActiveTime = DateTime.Now;
SessionID = Channel.Id.AsShortText();
}

/// <summary>
/// 终端手机号
/// </summary>
public string TerminalPhoneNo { get; set; }

public string SessionID { get; }

public IChannel Channel { get; }

public DateTime LastActiveTime { get; set; }

public DateTime StartTime { get; }
}
}

+ 224
- 0
src/JT808.DotNetty/SessionManager.cs Прегледај датотеку

@@ -0,0 +1,224 @@
using JT808.DotNetty.Configurations;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace JT808.DotNetty
{
public class SessionManager: IDisposable
{
private readonly ILogger<SessionManager> logger;
private readonly JT808Configuration configuration;
private readonly CancellationTokenSource cancellationTokenSource;
public SessionManager(
IOptions<JT808Configuration> jT808ConfigurationAccessor,
ILoggerFactory loggerFactory)
{
logger = loggerFactory.CreateLogger<SessionManager>();
configuration = jT808ConfigurationAccessor.Value;
cancellationTokenSource = new CancellationTokenSource();
Task.Run(() =>
{
while (!cancellationTokenSource.IsCancellationRequested)
{
logger.LogInformation($"Online Count>>>{SessionCount}");
if (SessionCount > 0)
{
logger.LogInformation($"SessionIds>>>{string.Join(",", SessionIdDict.Select(s => s.Key))}");
logger.LogInformation($"TerminalPhoneNos>>>{string.Join(",", TerminalPhoneNo_SessionId_Dict.Select(s => $"{s.Key}-{s.Value}"))}");
}
Thread.Sleep(configuration.SessionReportTime);
}
}, cancellationTokenSource.Token);
}

/// <summary>
/// Netty生成的sessionID和Session的对应关系
/// key = seession id
/// value = Session
/// </summary>
private ConcurrentDictionary<string, JT808Session> SessionIdDict = new ConcurrentDictionary<string, JT808Session>(StringComparer.OrdinalIgnoreCase);
/// <summary>
/// 终端手机号和netty生成的sessionID的对应关系
/// key = 终端手机号
/// value = seession id
/// </summary>
private ConcurrentDictionary<string, string> TerminalPhoneNo_SessionId_Dict = new ConcurrentDictionary<string, string>(StringComparer.OrdinalIgnoreCase);

public int SessionCount
{
get
{
return SessionIdDict.Count;
}
}

public void RegisterSession(JT808Session appSession)
{
if (TerminalPhoneNo_SessionId_Dict.ContainsKey(appSession.TerminalPhoneNo))
{
return;
}
if (SessionIdDict.TryAdd(appSession.SessionID, appSession) &&
TerminalPhoneNo_SessionId_Dict.TryAdd(appSession.TerminalPhoneNo, appSession.SessionID))
{
return;
}
}

public JT808Session GetSessionByID(string sessionID)
{
if (string.IsNullOrEmpty(sessionID))
return default;
JT808Session targetSession;
SessionIdDict.TryGetValue(sessionID, out targetSession);
return targetSession;
}

public JT808Session GetSessionByTerminalPhoneNo(string terminalPhoneNo)
{
try
{
if (string.IsNullOrEmpty(terminalPhoneNo))
return default;
if (TerminalPhoneNo_SessionId_Dict.TryGetValue(terminalPhoneNo, out string sessionId))
{
if (SessionIdDict.TryGetValue(sessionId, out JT808Session targetSession))
{
return targetSession;
}
else
{
return default;
}
}
else
{
return default;
}
}
catch (Exception ex)
{
logger.LogError(ex, terminalPhoneNo);
return default;
}
}

public void Heartbeat(string terminalPhoneNo)
{
try
{
if (TerminalPhoneNo_SessionId_Dict.TryGetValue(terminalPhoneNo, out string sessionId))
{
if (SessionIdDict.TryGetValue(sessionId, out JT808Session oldjT808Session))
{
if (oldjT808Session.Channel.Active)
{
oldjT808Session.LastActiveTime = DateTime.Now;
if (SessionIdDict.TryUpdate(sessionId, oldjT808Session, oldjT808Session))
{

}
}
}
}
}
catch (Exception ex)
{
logger.LogError(ex, terminalPhoneNo);
}
}

/// <summary>
/// 通过通道Id和设备终端号进行关联
/// </summary>
/// <param name="sessionID"></param>
/// <param name="terminalPhoneNo"></param>
public void UpdateSessionByID(string sessionID, string terminalPhoneNo)
{
try
{
if (SessionIdDict.TryGetValue(sessionID, out JT808Session oldjT808Session))
{
oldjT808Session.TerminalPhoneNo = terminalPhoneNo;
if (SessionIdDict.TryUpdate(sessionID, oldjT808Session, oldjT808Session))
{
TerminalPhoneNo_SessionId_Dict.AddOrUpdate(terminalPhoneNo, sessionID, (tpn, sid) =>
{
return sessionID;
});
}
}
}
catch (Exception ex)
{
logger.LogError(ex, $"{sessionID},{terminalPhoneNo}");
}
}

public void RemoveSessionByID(string sessionID)
{
if (sessionID == null) return;
try
{
if (SessionIdDict.TryRemove(sessionID, out JT808Session session))
{
if (session.TerminalPhoneNo != null)
{
if (TerminalPhoneNo_SessionId_Dict.TryRemove(session.TerminalPhoneNo, out string sessionid))
{
logger.LogInformation($">>>{sessionID}-{session.TerminalPhoneNo} Session Remove.");
}
}
else
{
logger.LogInformation($">>>{sessionID} Session Remove.");
}
// call GPS.JT808NettyServer.Handlers.JT808ConnectionHandler.CloseAsync
session.Channel.CloseAsync();
}
}
catch (Exception ex)
{
logger.LogError(ex, $">>>{sessionID} Session Remove Exception");
}
}

public void RemoveSessionByTerminalPhoneNo(string terminalPhoneNo)
{
if (terminalPhoneNo == null) return;
try
{
if (TerminalPhoneNo_SessionId_Dict.TryRemove(terminalPhoneNo, out string sessionid))
{
if (SessionIdDict.TryRemove(sessionid, out JT808Session session))
{
logger.LogInformation($">>>{terminalPhoneNo}-{sessionid} TerminalPhoneNo Remove.");
}
else
{
logger.LogInformation($">>>{terminalPhoneNo} TerminalPhoneNo Remove.");
}
}
}
catch (Exception ex)
{
logger.LogError(ex, $">>>{terminalPhoneNo} TerminalPhoneNo Remove Exception.");
}
}

public void Dispose()
{
cancellationTokenSource.Cancel();
cancellationTokenSource.Dispose();
}
}
}


Loading…
Откажи
Сачувај