Browse Source

1.添加tcp服务

2.添加tcp会话超时服务
3.添加内存队列的实现库
4.添加内存队列的测试服务
master
SmallChi(Koike) 4 years ago
parent
commit
7a91b0b35d
21 changed files with 533 additions and 54 deletions
  1. +1
    -1
      README.md
  2. +4
    -3
      src/JT1078.Gateway.Abstractions/JT1078.Gateway.Abstractions.csproj
  3. +8
    -0
      src/JT1078.Gateway.Abstractions/JT1078GatewayConstants.cs
  4. +31
    -0
      src/JT1078.Gateway.InMemoryMQ/JT1078.Gateway.InMemoryMQ.csproj
  5. +26
    -0
      src/JT1078.Gateway.InMemoryMQ/JT1078InMemoryMQExtensions.cs
  6. +17
    -0
      src/JT1078.Gateway.InMemoryMQ/JT1078MsgChannel.cs
  7. +60
    -0
      src/JT1078.Gateway.InMemoryMQ/JT1078PackageConsumer.cs
  8. +32
    -0
      src/JT1078.Gateway.InMemoryMQ/JT1078PackageProducer.cs
  9. +24
    -0
      src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/JT1078.Gateway.TestNormalHosting.csproj
  10. +43
    -0
      src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Program.cs
  11. +27
    -0
      src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Services/JT1078NormalMsgHostedService.cs
  12. +15
    -0
      src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/appsettings.json
  13. +15
    -2
      src/JT1078.Gateway.sln
  14. +3
    -1
      src/JT1078.Gateway/JT1078.Gateway.csproj
  15. +42
    -3
      src/JT1078.Gateway/JT1078GatewayExtensions.cs
  16. +2
    -2
      src/JT1078.Gateway/JT1078TcpServer.cs
  17. +47
    -0
      src/JT1078.Gateway/JT1078UdpServer.cs
  18. +46
    -0
      src/JT1078.Gateway/Jobs/JT1078SessionNoticeJob.cs
  19. +62
    -0
      src/JT1078.Gateway/Jobs/JT1078TcpReceiveTimeoutJob.cs
  20. +16
    -0
      src/JT1078.Gateway/Services/JT1078SessionNoticeService.cs
  21. +12
    -42
      src/JT1078.Gateway/Sessions/JT1078SessionManager.cs

+ 1
- 1
README.md View File

@@ -1,4 +1,4 @@
# JT1078.Gateway
# JT1078Gateway

## 前提条件



+ 4
- 3
src/JT1078.Gateway.Abstractions/JT1078.Gateway.Abstractions.csproj View File

@@ -7,8 +7,8 @@
<Authors>SmallChi(Koike)</Authors>
<PackageId>JT1078.Gateway.Abstractions</PackageId>
<Product>JT1078.Gateway.Abstractions</Product>
<Description>基于Pipeline实现的JT1078Gateway的抽象库</Description>
<PackageReleaseNotes>基于Pipeline实现的JT1078Gateway的抽象库</PackageReleaseNotes>
<Description>基于JT1078Gateway的抽象库</Description>
<PackageReleaseNotes>基于JT1078Gateway的抽象库</PackageReleaseNotes>
<RepositoryUrl>https://github.com/SmallChi/JT1078Gateway</RepositoryUrl>
<PackageProjectUrl>https://github.com/SmallChi/JT1078Gateway</PackageProjectUrl>
<licenseUrl>https://github.com/SmallChi/JT1078Gateway/blob/master/LICENSE</licenseUrl>
@@ -17,7 +17,7 @@
<SignAssembly>false</SignAssembly>
<PackageLicenseFile>LICENSE</PackageLicenseFile>
<PackageRequireLicenseAcceptance>true</PackageRequireLicenseAcceptance>
<Version>1.0.0-preview1</Version>
<Version>1.0.0-preview2</Version>
</PropertyGroup>
<ItemGroup>
@@ -30,5 +30,6 @@
<PackageReference Include="Microsoft.Extensions.Hosting.Abstractions" Version="3.1.5" />
<PackageReference Include="Microsoft.Extensions.Options.ConfigurationExtensions" Version="3.1.5" />
<PackageReference Include="Microsoft.Extensions.Options" Version="3.1.5" />
<PackageReference Include="System.Text.Json" Version="4.7.2" />
</ItemGroup>
</Project>

+ 8
- 0
src/JT1078.Gateway.Abstractions/JT1078GatewayConstants.cs View File

@@ -0,0 +1,8 @@
namespace JT1078.Gateway.Abstractions
{
public static class JT1078GatewayConstants
{
public const string SessionOnline= "JT1078SessionOnline";
public const string SessionOffline = "JT1078SessionOffline";
}
}

+ 31
- 0
src/JT1078.Gateway.InMemoryMQ/JT1078.Gateway.InMemoryMQ.csproj View File

@@ -0,0 +1,31 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>netstandard2.1</TargetFramework>
<LangVersion>8.0</LangVersion>
<Copyright>Copyright 2019.</Copyright>
<Authors>SmallChi(Koike)</Authors>
<PackageId>JT1078.Gateway.InMemoryMQ</PackageId>
<Product>JT1078.Gateway.InMemoryMQ</Product>
<Description>基于JT1078Gateway实现的内存队列</Description>
<PackageReleaseNotes>基于JT1078Gateway实现的内存队列</PackageReleaseNotes>
<RepositoryUrl>https://github.com/SmallChi/JT1078Gateway</RepositoryUrl>
<PackageProjectUrl>https://github.com/SmallChi/JT1078Gateway</PackageProjectUrl>
<licenseUrl>https://github.com/SmallChi/JT1078Gateway/blob/master/LICENSE</licenseUrl>
<license>https://github.com/SmallChi/JT1078Gateway/blob/master/LICENSE</license>
<GeneratePackageOnBuild>false</GeneratePackageOnBuild>
<SignAssembly>false</SignAssembly>
<PackageLicenseFile>LICENSE</PackageLicenseFile>
<PackageRequireLicenseAcceptance>true</PackageRequireLicenseAcceptance>
<Version>1.0.0-preview2</Version>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="System.Threading.Channels" Version="4.7.1" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\JT1078.Gateway\JT1078.Gateway.csproj" />
</ItemGroup>

</Project>

+ 26
- 0
src/JT1078.Gateway.InMemoryMQ/JT1078InMemoryMQExtensions.cs View File

@@ -0,0 +1,26 @@
using JT1078.Gateway.Abstractions;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection.Extensions;
using System;
using System.Collections.Generic;
using System.Text;

namespace JT1078.Gateway.InMemoryMQ
{
public static class JT1078InMemoryMQExtensions
{
public static IJT1078NormalGatewayBuilder AddMsgProducer(this IJT1078NormalGatewayBuilder builder)
{
builder.JT1078Builder.Services.TryAddSingleton<JT1078MsgChannel>();
builder.JT1078Builder.Services.AddSingleton<IJT1078PackageProducer, JT1078PackageProducer>();
return builder;
}

public static IJT1078NormalGatewayBuilder AddMsgConsumer(this IJT1078NormalGatewayBuilder builder)
{
builder.JT1078Builder.Services.TryAddSingleton<JT1078MsgChannel>();
builder.JT1078Builder.Services.AddSingleton<IJT1078PackageConsumer, JT1078PackageConsumer>();
return builder;
}
}
}

+ 17
- 0
src/JT1078.Gateway.InMemoryMQ/JT1078MsgChannel.cs View File

@@ -0,0 +1,17 @@
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Channels;

namespace JT1078.Gateway.InMemoryMQ
{
public class JT1078MsgChannel
{
public Channel<(string, JT1078.Protocol.JT1078Package)> Channel { get;}

public JT1078MsgChannel()
{
Channel = System.Threading.Channels.Channel.CreateUnbounded<(string, JT1078.Protocol.JT1078Package)>();
}
}
}

+ 60
- 0
src/JT1078.Gateway.InMemoryMQ/JT1078PackageConsumer.cs View File

@@ -0,0 +1,60 @@
using JT1078.Gateway.Abstractions;
using JT1078.Protocol;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.Text;
using System.Text.Json;
using System.Text.Json.Serialization;
using System.Threading;
using System.Threading.Tasks;

namespace JT1078.Gateway.InMemoryMQ
{
public class JT1078PackageConsumer: IJT1078PackageConsumer
{
private JT1078MsgChannel Channel;
private readonly ILogger logger;

public JT1078PackageConsumer(ILoggerFactory loggerFactory,JT1078MsgChannel channel)
{
Channel = channel;
logger = loggerFactory.CreateLogger<JT1078PackageConsumer>();
}

public CancellationTokenSource Cts { get; } = new CancellationTokenSource();

public string TopicName { get; } = "JT1078Package";

public void Dispose()
{

}

public void OnMessage(Action<(string TerminalNo, JT1078Package Data)> callback)
{
Task.Run(async() =>
{
while (!Cts.IsCancellationRequested)
{
var reader = await Channel.Channel.Reader.ReadAsync(Cts.Token);
if (logger.IsEnabled(LogLevel.Trace))
{
logger.LogTrace(JsonSerializer.Serialize(reader.Item2));
}
callback(reader);
}
}, Cts.Token);
}

public void Subscribe()
{
}

public void Unsubscribe()
{

}
}
}

+ 32
- 0
src/JT1078.Gateway.InMemoryMQ/JT1078PackageProducer.cs View File

@@ -0,0 +1,32 @@
using JT1078.Gateway.Abstractions;
using JT1078.Protocol;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Channels;
using System.Threading.Tasks;

namespace JT1078.Gateway.InMemoryMQ
{
public class JT1078PackageProducer : IJT1078PackageProducer
{
public string TopicName { get; }= "JT1078Package";

private JT1078MsgChannel Channel;

public JT1078PackageProducer(JT1078MsgChannel channel)
{
Channel = channel;
}

public void Dispose()
{
}

public async ValueTask ProduceAsync(string terminalNo, JT1078Package data)
{
await Channel.Channel.Writer.WriteAsync((terminalNo, data));
}
}
}

+ 24
- 0
src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/JT1078.Gateway.TestNormalHosting.csproj View File

@@ -0,0 +1,24 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>netcoreapp3.1</TargetFramework>
</PropertyGroup>
<ItemGroup>
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="3.1.5" />
<PackageReference Include="Microsoft.Extensions.Hosting" Version="3.1.5" />
<PackageReference Include="Microsoft.Extensions.Logging.Console" Version="3.1.5" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\..\JT1078.Gateway.InMemoryMQ\JT1078.Gateway.InMemoryMQ.csproj" />
<ProjectReference Include="..\..\JT1078.Gateway\JT1078.Gateway.csproj" />
</ItemGroup>

<ItemGroup>
<None Update="appsettings.json">
<CopyToOutputDirectory>Always</CopyToOutputDirectory>
</None>
</ItemGroup>
</Project>

+ 43
- 0
src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Program.cs View File

@@ -0,0 +1,43 @@
using JT1078.Gateway.InMemoryMQ;
using JT1078.Gateway.TestNormalHosting.Services;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using System;
using System.Threading.Tasks;

namespace JT1078.Gateway.TestNormalHosting
{
class Program
{
static async Task Main(string[] args)
{
var serverHostBuilder = new HostBuilder()
.ConfigureAppConfiguration((hostingContext, config) =>
{
config.SetBasePath(AppDomain.CurrentDomain.BaseDirectory);
config.AddJsonFile("appsettings.json", optional: false, reloadOnChange: true);
})
.ConfigureLogging((context, logging) =>
{
logging.AddConsole();
logging.SetMinimumLevel(LogLevel.Trace);
})
.ConfigureServices((hostContext, services) =>
{
services.AddSingleton<ILoggerFactory, LoggerFactory>();
services.AddSingleton(typeof(ILogger<>), typeof(Logger<>));
//使用内存队列实现会话通知
services.AddJT1078Gateway(hostContext.Configuration)
.AddTcp()
.AddNormal()
.AddMsgProducer()
.AddMsgConsumer();
services.AddHostedService<JT1078NormalMsgHostedService>();
});

await serverHostBuilder.RunConsoleAsync();
}
}
}

+ 27
- 0
src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Services/JT1078NormalMsgHostedService.cs View File

@@ -0,0 +1,27 @@
using JT1078.Gateway.Abstractions;
using Microsoft.Extensions.Hosting;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace JT1078.Gateway.TestNormalHosting.Services
{
public class JT1078NormalMsgHostedService : BackgroundService
{
private IJT1078PackageConsumer PackageConsumer;
public JT1078NormalMsgHostedService(IJT1078PackageConsumer packageConsumer)
{
PackageConsumer = packageConsumer;
}
protected override Task ExecuteAsync(CancellationToken stoppingToken)
{
PackageConsumer.OnMessage((Message) =>
{
});
return Task.CompletedTask;
}
}
}

+ 15
- 0
src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/appsettings.json View File

@@ -0,0 +1,15 @@
{
"Logging": {
"IncludeScopes": false,
"Debug": {
"LogLevel": {
"Default": "Trace"
}
},
"Console": {
"LogLevel": {
"Default": "Trace"
}
}
}
}

+ 15
- 2
src/JT1078.Gateway.sln View File

@@ -5,11 +5,15 @@ VisualStudioVersion = 16.0.29418.71
MinimumVisualStudioVersion = 10.0.40219.1
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JT1078.Gateway", "JT1078.Gateway\JT1078.Gateway.csproj", "{39A90F1C-FC46-4905-81F1-3AEE44D6D86D}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "JT1078.Gateway.Test", "JT1078.Gateway.Tests\JT1078.Gateway.Test\JT1078.Gateway.Test.csproj", "{DAC80A8E-4172-451F-910D-9032BF8640F9}"
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JT1078.Gateway.Test", "JT1078.Gateway.Tests\JT1078.Gateway.Test\JT1078.Gateway.Test.csproj", "{DAC80A8E-4172-451F-910D-9032BF8640F9}"
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Tests", "Tests", "{04C0F72A-5BC4-4CEE-B1E9-CCFAA72E373E}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "JT1078.Gateway.Abstractions", "JT1078.Gateway.Abstractions\JT1078.Gateway.Abstractions.csproj", "{EE50F2A6-5F28-4640-BC20-44A8BED8F311}"
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JT1078.Gateway.Abstractions", "JT1078.Gateway.Abstractions\JT1078.Gateway.Abstractions.csproj", "{EE50F2A6-5F28-4640-BC20-44A8BED8F311}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "JT1078.Gateway.InMemoryMQ", "JT1078.Gateway.InMemoryMQ\JT1078.Gateway.InMemoryMQ.csproj", "{C7554790-0B3B-490F-B2F1-436C1FD0B4DD}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "JT1078.Gateway.TestNormalHosting", "JT1078.Gateway.Tests\JT1078.Gateway.TestNormalHosting\JT1078.Gateway.TestNormalHosting.csproj", "{6E2DAA64-E2A1-4459-BE61-E807B4EF2CCE}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
@@ -29,12 +33,21 @@ Global
{EE50F2A6-5F28-4640-BC20-44A8BED8F311}.Debug|Any CPU.Build.0 = Debug|Any CPU
{EE50F2A6-5F28-4640-BC20-44A8BED8F311}.Release|Any CPU.ActiveCfg = Release|Any CPU
{EE50F2A6-5F28-4640-BC20-44A8BED8F311}.Release|Any CPU.Build.0 = Release|Any CPU
{C7554790-0B3B-490F-B2F1-436C1FD0B4DD}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{C7554790-0B3B-490F-B2F1-436C1FD0B4DD}.Debug|Any CPU.Build.0 = Debug|Any CPU
{C7554790-0B3B-490F-B2F1-436C1FD0B4DD}.Release|Any CPU.ActiveCfg = Release|Any CPU
{C7554790-0B3B-490F-B2F1-436C1FD0B4DD}.Release|Any CPU.Build.0 = Release|Any CPU
{6E2DAA64-E2A1-4459-BE61-E807B4EF2CCE}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{6E2DAA64-E2A1-4459-BE61-E807B4EF2CCE}.Debug|Any CPU.Build.0 = Debug|Any CPU
{6E2DAA64-E2A1-4459-BE61-E807B4EF2CCE}.Release|Any CPU.ActiveCfg = Release|Any CPU
{6E2DAA64-E2A1-4459-BE61-E807B4EF2CCE}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
EndGlobalSection
GlobalSection(NestedProjects) = preSolution
{DAC80A8E-4172-451F-910D-9032BF8640F9} = {04C0F72A-5BC4-4CEE-B1E9-CCFAA72E373E}
{6E2DAA64-E2A1-4459-BE61-E807B4EF2CCE} = {04C0F72A-5BC4-4CEE-B1E9-CCFAA72E373E}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {9172690A-1D5A-491A-ACDD-3AF893980E0B}


+ 3
- 1
src/JT1078.Gateway/JT1078.Gateway.csproj View File

@@ -16,7 +16,7 @@
<SignAssembly>false</SignAssembly>
<PackageLicenseFile>LICENSE</PackageLicenseFile>
<PackageRequireLicenseAcceptance>true</PackageRequireLicenseAcceptance>
<Version>1.0.0-preview1</Version>
<Version>1.0.0-preview2</Version>
</PropertyGroup>
<ItemGroup>
<Compile Remove="Codecs\**" />
@@ -40,6 +40,8 @@
</ItemGroup>
<ItemGroup>
<Compile Remove="Metadata\JT1078HttpSession.cs" />
<Compile Remove="Metadata\JT1078Request.cs" />
<Compile Remove="Metadata\JT1078Response.cs" />
</ItemGroup>

<ItemGroup>


+ 42
- 3
src/JT1078.Gateway/JT1078GatewayExtensions.cs View File

@@ -1,6 +1,9 @@
using JT1078.Gateway.Abstractions;
using JT1078.Gateway.Configurations;
using JT1078.Gateway.Impl;
using JT1078.Gateway.Jobs;
using JT1078.Gateway.Services;
using JT1078.Gateway.Sessions;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection.Extensions;
@@ -15,17 +18,53 @@ namespace JT1078.Gateway
public static class JT1078GatewayExtensions
{

public static IJT1078Builder AddJT1078Gateway(this IServiceCollection serviceDescriptors, IConfiguration configuration)
public static IJT1078GatewayBuilder AddJT1078Gateway(this IServiceCollection serviceDescriptors, IConfiguration configuration)
{
IJT1078Builder builder = new JT1078BuilderDefault(serviceDescriptors);
builder.Services.Configure<JT1078Configuration>(configuration.GetSection("JT1078Configuration"));
return builder;
IJT1078GatewayBuilder jT1078GatewayBuilderDefault = new JT1078GatewayBuilderDefault(builder);
jT1078GatewayBuilderDefault.AddJT1078Core();
return jT1078GatewayBuilderDefault;
}

public static IJT1078Builder AddJT1078Gateway(this IServiceCollection serviceDescriptors, Action<JT1078Configuration> jt1078Options)
public static IJT1078GatewayBuilder AddJT1078Gateway(this IServiceCollection serviceDescriptors, Action<JT1078Configuration> jt1078Options)
{
IJT1078Builder builder = new JT1078BuilderDefault(serviceDescriptors);
builder.Services.Configure(jt1078Options);
IJT1078GatewayBuilder jT1078GatewayBuilderDefault = new JT1078GatewayBuilderDefault(builder);
jT1078GatewayBuilderDefault.AddJT1078Core();
return jT1078GatewayBuilderDefault;
}

public static IJT1078GatewayBuilder AddTcp(this IJT1078GatewayBuilder builder)
{
builder.JT1078Builder.Services.AddHostedService<JT1078TcpReceiveTimeoutJob>();
builder.JT1078Builder.Services.AddHostedService<JT1078TcpServer>();
return builder;
}

public static IJT1078GatewayBuilder AddUdp(this IJT1078GatewayBuilder builder)
{
builder.JT1078Builder.Services.AddHostedService<JT1078UdpServer>();
return builder;
}

public static IJT1078NormalGatewayBuilder AddNormal(this IJT1078GatewayBuilder builder)
{
return new JT1078NormalGatewayBuilderDefault(builder.JT1078Builder);
}

public static IJT1078QueueGatewayBuilder AddQueue(this IJT1078GatewayBuilder builder)
{
builder.JT1078Builder.Services.AddHostedService<JT1078TcpReceiveTimeoutJob>();
return new JT1078QueueGatewayBuilderDefault(builder.JT1078Builder);
}

internal static IJT1078GatewayBuilder AddJT1078Core(this IJT1078GatewayBuilder builder)
{
builder.JT1078Builder.Services.AddSingleton<JT1078SessionNoticeService>();
builder.JT1078Builder.Services.AddSingleton<JT1078SessionManager>();
builder.JT1078Builder.Services.AddHostedService<JT1078SessionNoticeJob>();
return builder;
}
}

+ 2
- 2
src/JT1078.Gateway/JT1078TcpServer.cs View File

@@ -52,7 +52,7 @@ namespace JT1078.Gateway
{
SessionManager = jT1078SessionManager;
jT1078UseType = JT1078UseType.Normal;
Logger = loggerFactory.CreateLogger("JT1078TcpServer");
Logger = loggerFactory.CreateLogger<JT1078TcpServer>();
Configuration = jT1078ConfigurationAccessor.Value;
this.jT1078PackageProducer = jT1078PackageProducer;
InitServer();
@@ -73,7 +73,7 @@ namespace JT1078.Gateway
{
SessionManager = jT1078SessionManager;
jT1078UseType = JT1078UseType.Queue;
Logger = loggerFactory.CreateLogger("JT1078TcpServer");
Logger = loggerFactory.CreateLogger<JT1078TcpServer>();
Configuration = jT1078ConfigurationAccessor.Value;
this.jT1078MsgProducer = jT1078MsgProducer;
InitServer();


+ 47
- 0
src/JT1078.Gateway/JT1078UdpServer.cs View File

@@ -0,0 +1,47 @@
using System;
using System.Buffers;
using System.Buffers.Binary;
using System.Collections.Generic;
using System.IO.Pipelines;
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using JT1078.Gateway.Abstractions;
using JT1078.Gateway.Abstractions.Enums;
using JT1078.Gateway.Configurations;
using JT1078.Gateway.Sessions;
using JT1078.Protocol;
using JT1078.Protocol.Enums;
using JT1078.Protocol.Extensions;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;

namespace JT1078.Gateway
{
public class JT1078UdpServer : IHostedService
{
public Task StartAsync(CancellationToken cancellationToken)
{
return Task.CompletedTask;
}

public Task StopAsync(CancellationToken cancellationToken)
{
return Task.CompletedTask;
}

string ReadBCD(ReadOnlySpan<byte> readOnlySpan, int len)
{
int count = len / 2;
StringBuilder bcdSb = new StringBuilder(count);
for (int i = 0; i < count; i++)
{
bcdSb.Append(readOnlySpan[i].ToString("X2"));
}
return bcdSb.ToString().TrimStart('0');
}
}
}

+ 46
- 0
src/JT1078.Gateway/Jobs/JT1078SessionNoticeJob.cs View File

@@ -0,0 +1,46 @@
using JT1078.Gateway.Services;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.Text;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;


namespace JT1078.Gateway.Jobs
{
public class JT1078SessionNoticeJob : BackgroundService
{
private readonly ILogger logger;
private JT1078SessionNoticeService SessionNoticeService;
public JT1078SessionNoticeJob(
JT1078SessionNoticeService sessionNoticeService,
ILoggerFactory loggerFactory)
{
logger = loggerFactory.CreateLogger<JT1078SessionNoticeJob>();
SessionNoticeService = sessionNoticeService;
}

protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
await Task.Run(() => {
try
{
foreach (var notice in SessionNoticeService.SessionNoticeBlockingCollection.GetConsumingEnumerable(stoppingToken))
{
if (logger.IsEnabled(LogLevel.Information))
{
logger.LogInformation($"[Notice]:{notice.TerminalPhoneNo}-{notice.ProtocolType}-{notice.SessionType}");
}
}
}
catch
{

}
}, stoppingToken);
}
}
}

+ 62
- 0
src/JT1078.Gateway/Jobs/JT1078TcpReceiveTimeoutJob.cs View File

@@ -0,0 +1,62 @@
using JT1078.Gateway.Configurations;
using JT1078.Gateway.Sessions;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace JT1078.Gateway.Jobs
{
internal class JT1078TcpReceiveTimeoutJob : BackgroundService
{
private readonly ILogger Logger;

private readonly JT1078SessionManager SessionManager;

private readonly IOptionsMonitor<JT1078Configuration> Configuration;
public JT1078TcpReceiveTimeoutJob(
IOptionsMonitor<JT1078Configuration> jT1078ConfigurationAccessor,
ILoggerFactory loggerFactory,
JT1078SessionManager jT1078SessionManager
)
{
SessionManager = jT1078SessionManager;
Logger = loggerFactory.CreateLogger<JT1078TcpReceiveTimeoutJob>();
Configuration = jT1078ConfigurationAccessor;
}

protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
while (!stoppingToken.IsCancellationRequested)
{
try
{
foreach (var item in SessionManager.GetTcpAll())
{
if (item.ActiveTime.AddSeconds(Configuration.CurrentValue.TcpReaderIdleTimeSeconds) < DateTime.Now)
{
item.ReceiveTimeout.Cancel();
}
}
if (Logger.IsEnabled(LogLevel.Information))
{
Logger.LogInformation($"[Check Receive Timeout]");
Logger.LogInformation($"[Session Online Count]:{SessionManager.TcpSessionCount}");
}
}
catch (Exception ex)
{
Logger.LogError(ex, $"[Receive Timeout]");
}
finally
{
await Task.Delay(TimeSpan.FromSeconds(Configuration.CurrentValue.TcpReceiveTimeoutCheckTimeSeconds), stoppingToken);
}
}
}
}
}

+ 16
- 0
src/JT1078.Gateway/Services/JT1078SessionNoticeService.cs View File

@@ -0,0 +1,16 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Text;

namespace JT1078.Gateway.Services
{
public class JT1078SessionNoticeService
{
public BlockingCollection<(string SessionType, string TerminalPhoneNo,string ProtocolType)> SessionNoticeBlockingCollection { get;internal set; }
public JT1078SessionNoticeService()
{
SessionNoticeBlockingCollection = new BlockingCollection<(string SessionType, string TerminalPhoneNo, string ProtocolType)>();
}
}
}

+ 12
- 42
src/JT1078.Gateway/Sessions/JT1078SessionManager.cs View File

@@ -1,5 +1,6 @@
using JT1078.Gateway.Abstractions;
using JT1078.Gateway.Abstractions.Enums;
using JT1078.Gateway.Services;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Concurrent;
@@ -20,11 +21,15 @@ namespace JT1078.Gateway.Sessions
private readonly ILogger logger;
public ConcurrentDictionary<string, IJT1078Session> Sessions { get; }
public ConcurrentDictionary<string, IJT1078Session> TerminalPhoneNoSessions { get; }
public JT1078SessionManager(ILoggerFactory loggerFactory)
private readonly JT1078SessionNoticeService SessionNoticeService;
public JT1078SessionManager(
JT1078SessionNoticeService jT1078SessionNoticeService,
ILoggerFactory loggerFactory)
{
Sessions = new ConcurrentDictionary<string, IJT1078Session>(StringComparer.OrdinalIgnoreCase);
TerminalPhoneNoSessions = new ConcurrentDictionary<string, IJT1078Session>(StringComparer.OrdinalIgnoreCase);
logger = loggerFactory.CreateLogger("JT1078SessionManager");
logger = loggerFactory.CreateLogger<JT1078SessionManager>();
this.SessionNoticeService = jT1078SessionNoticeService;
}

public int TotalSessionCount
@@ -62,8 +67,7 @@ namespace JT1078.Gateway.Sessions
session.ActiveTime = curretDatetime;
TerminalPhoneNoSessions.TryUpdate(terminalPhoneNo, session, cacheSession);
//会话通知
//todo: 会话通知

SessionNoticeService.SessionNoticeBlockingCollection.Add((JT1078GatewayConstants.SessionOnline, terminalPhoneNo, session.TransportProtocolType.ToString()));
}
else
{
@@ -77,40 +81,7 @@ namespace JT1078.Gateway.Sessions
if (TerminalPhoneNoSessions.TryAdd(terminalPhoneNo, session))
{
//会话通知
//todo: 会话通知

}
}
}

internal void TryLink(IJT1078Session session)
{
DateTime curretDatetime = DateTime.Now;
if (TerminalPhoneNoSessions.TryGetValue(session.SessionID, out IJT1078Session cacheSession))
{
if (session.SessionID != cacheSession.SessionID)
{
//从转发到直连的数据需要更新缓存
session.ActiveTime = curretDatetime;
TerminalPhoneNoSessions.TryUpdate(session.SessionID, session, cacheSession);
//会话通知
//todo: 会话通知

}
else
{
cacheSession.ActiveTime = curretDatetime;
TerminalPhoneNoSessions.TryUpdate(session.SessionID, cacheSession, cacheSession);
}
}
else
{
session.TerminalPhoneNo = session.SessionID;
if (TerminalPhoneNoSessions.TryAdd(session.SessionID, session))
{
//会话通知
//todo: 会话通知

SessionNoticeService.SessionNoticeBlockingCollection.Add((JT1078GatewayConstants.SessionOnline, terminalPhoneNo, session.TransportProtocolType.ToString()));
}
}
}
@@ -133,7 +104,7 @@ namespace JT1078.Gateway.Sessions
currentSession = session;
}
//会话通知
//todo: 会话通知
SessionNoticeService.SessionNoticeBlockingCollection.Add((JT1078GatewayConstants.SessionOnline, terminalPhoneNo, currentSession.TransportProtocolType.ToString()));
return currentSession;
}

@@ -206,8 +177,7 @@ namespace JT1078.Gateway.Sessions
if (logger.IsEnabled(LogLevel.Information))
logger.LogInformation($"[Session Remove]:{terminalPhoneNo}-{tmpTerminalPhoneNo}");
//会话通知
//todo: 会话通知 SessionOffline

SessionNoticeService.SessionNoticeBlockingCollection.Add((JT1078GatewayConstants.SessionOffline, terminalPhoneNo, removeTerminalPhoneNoSessions.TransportProtocolType.ToString()));
}
}
}
@@ -225,7 +195,7 @@ namespace JT1078.Gateway.Sessions
}
var tmpTerminalPhoneNo = string.Join(",", terminalPhoneNos);
//会话通知
//todo: 会话通知 SessionOffline tmpTerminalPhoneNo
SessionNoticeService.SessionNoticeBlockingCollection.Add((JT1078GatewayConstants.SessionOffline, tmpTerminalPhoneNo, removeSession.TransportProtocolType.ToString()));
if (logger.IsEnabled(LogLevel.Information))
logger.LogInformation($"[Session Remove]:{tmpTerminalPhoneNo}");
}


Loading…
Cancel
Save