diff --git a/.gitignore b/.gitignore index bb78718..e76584a 100644 --- a/.gitignore +++ b/.gitignore @@ -52,7 +52,6 @@ BenchmarkDotNet.Artifacts/ project.lock.json project.fragment.lock.json artifacts/ -**/Properties/launchSettings.json # StyleCop StyleCopReport.xml diff --git a/src/JT1078.Gateway.Coordinator/JT1078.Gateway.Coordinator.csproj b/src/JT1078.Gateway.Coordinator/JT1078.Gateway.Coordinator.csproj new file mode 100644 index 0000000..b8a8308 --- /dev/null +++ b/src/JT1078.Gateway.Coordinator/JT1078.Gateway.Coordinator.csproj @@ -0,0 +1,9 @@ + + + + netcoreapp3.1 + true + + + + diff --git a/src/JT1078.Gateway.Coordinator/Program.cs b/src/JT1078.Gateway.Coordinator/Program.cs new file mode 100644 index 0000000..cace568 --- /dev/null +++ b/src/JT1078.Gateway.Coordinator/Program.cs @@ -0,0 +1,65 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; +using Microsoft.AspNetCore.Builder; +using Microsoft.AspNetCore.Hosting; +using Microsoft.AspNetCore.HttpOverrides; +using Microsoft.Extensions.Configuration; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; + +namespace JT1078.Gateway.Coordinator +{ + public class Program + { + public static void Main(string[] args) + { + Host.CreateDefaultBuilder(args) + .UseEnvironment(args[0]) + .ConfigureWebHostDefaults(webBuilder => + { + webBuilder.Configure(app => + { + app.UseRouting(); + app.UseCors("any"); + app.UseEndpoints(endpoints => + { + endpoints.MapControllers(); + }); + app.UseForwardedHeaders(new ForwardedHeadersOptions + { + ForwardedHeaders = ForwardedHeaders.XForwardedFor | ForwardedHeaders.XForwardedProto | ForwardedHeaders.XForwardedHost + }); + }) + .ConfigureServices(services => + { + services.AddCors(options => + options.AddPolicy("any", builder => + builder.AllowAnyOrigin() + .AllowAnyMethod() + .AllowAnyHeader() + .AllowAnyOrigin() + .SetIsOriginAllowed(o=>true))); + services.AddMemoryCache(); + services.AddControllers(); + services.AddMvc(); + }); + }) + .ConfigureAppConfiguration((hostingContext, config) => + { + config.SetBasePath(AppDomain.CurrentDomain.BaseDirectory); + config.AddJsonFile("appsettings.json", optional: false, reloadOnChange: true) + .AddJsonFile($"appsettings.{ hostingContext.HostingEnvironment.EnvironmentName}.json", optional: true, reloadOnChange: true); + }) + .ConfigureServices((hostContext, services) => + { + services.AddSingleton(); + services.AddSingleton(typeof(ILogger<>), typeof(Logger<>)); + }) + .Build() + .Run(); + } + } +} diff --git a/src/JT1078.Gateway.Coordinator/Properties/launchSettings.json b/src/JT1078.Gateway.Coordinator/Properties/launchSettings.json new file mode 100644 index 0000000..b3247fb --- /dev/null +++ b/src/JT1078.Gateway.Coordinator/Properties/launchSettings.json @@ -0,0 +1,13 @@ +{ + "profiles": { + "JT1078.Gateway.Coordinator": { + "commandName": "Project", + "launchBrowser": true, + "commandLineArgs": "Development", + "applicationUrl": "http://localhost:1080", + "environmentVariables": { + "ASPNETCORE_ENVIRONMENT": "Development" + } + } + } +} diff --git a/src/JT1078.Gateway.Coordinator/appsettings.Development.json b/src/JT1078.Gateway.Coordinator/appsettings.Development.json new file mode 100644 index 0000000..8983e0f --- /dev/null +++ b/src/JT1078.Gateway.Coordinator/appsettings.Development.json @@ -0,0 +1,9 @@ +{ + "Logging": { + "LogLevel": { + "Default": "Information", + "Microsoft": "Warning", + "Microsoft.Hosting.Lifetime": "Information" + } + } +} diff --git a/src/JT1078.Gateway.Coordinator/appsettings.json b/src/JT1078.Gateway.Coordinator/appsettings.json new file mode 100644 index 0000000..d9d9a9b --- /dev/null +++ b/src/JT1078.Gateway.Coordinator/appsettings.json @@ -0,0 +1,10 @@ +{ + "Logging": { + "LogLevel": { + "Default": "Information", + "Microsoft": "Warning", + "Microsoft.Hosting.Lifetime": "Information" + } + }, + "AllowedHosts": "*" +} diff --git a/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Program.cs b/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Program.cs index a6cfc23..56e4035 100644 --- a/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Program.cs +++ b/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Program.cs @@ -30,12 +30,13 @@ namespace JT1078.Gateway.TestNormalHosting services.AddSingleton(typeof(ILogger<>), typeof(Logger<>)); //使用内存队列实现会话通知 services.AddJT1078Gateway(hostContext.Configuration) - .AddHttp() - .AddUdp() - .AddTcp() - .AddNormal() - .AddMsgProducer() - .AddMsgConsumer(); + .AddHttp() + .AddUdp() + .AddTcp() + //.AddCoordinatorHttpClient() + .AddNormal() + .AddMsgProducer() + .AddMsgConsumer(); services.AddHostedService(); }); diff --git a/src/JT1078.Gateway.sln b/src/JT1078.Gateway.sln index df3bbe5..ac29fd6 100644 --- a/src/JT1078.Gateway.sln +++ b/src/JT1078.Gateway.sln @@ -15,6 +15,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JT1078.Gateway.InMemoryMQ", EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JT1078.Gateway.TestNormalHosting", "JT1078.Gateway.Tests\JT1078.Gateway.TestNormalHosting\JT1078.Gateway.TestNormalHosting.csproj", "{6E2DAA64-E2A1-4459-BE61-E807B4EF2CCE}" EndProject +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JT1078.Gateway.Coordinator", "JT1078.Gateway.Coordinator\JT1078.Gateway.Coordinator.csproj", "{B0A24D3A-FDA3-4DFE-9C31-032C7C22F303}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -41,6 +43,10 @@ Global {6E2DAA64-E2A1-4459-BE61-E807B4EF2CCE}.Debug|Any CPU.Build.0 = Debug|Any CPU {6E2DAA64-E2A1-4459-BE61-E807B4EF2CCE}.Release|Any CPU.ActiveCfg = Release|Any CPU {6E2DAA64-E2A1-4459-BE61-E807B4EF2CCE}.Release|Any CPU.Build.0 = Release|Any CPU + {B0A24D3A-FDA3-4DFE-9C31-032C7C22F303}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {B0A24D3A-FDA3-4DFE-9C31-032C7C22F303}.Debug|Any CPU.Build.0 = Debug|Any CPU + {B0A24D3A-FDA3-4DFE-9C31-032C7C22F303}.Release|Any CPU.ActiveCfg = Release|Any CPU + {B0A24D3A-FDA3-4DFE-9C31-032C7C22F303}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE diff --git a/src/JT1078.Gateway/Configurations/JT1078Configuration.cs b/src/JT1078.Gateway/Configurations/JT1078Configuration.cs index 57f582e..37a76e0 100644 --- a/src/JT1078.Gateway/Configurations/JT1078Configuration.cs +++ b/src/JT1078.Gateway/Configurations/JT1078Configuration.cs @@ -33,7 +33,17 @@ namespace JT1078.Gateway.Configurations /// Hls根目录 /// public string HlsRootDirectory { get; set; } = "wwwroot"; - + /// + /// 协调器发送心跳时间 + /// 默认60s发送一次 + /// + public int CoordinatorHeartbeatTimeSeconds { get; set; } = 60; + /// + /// 协调器Coordinator主机 + /// http://localhost/ + /// http://127.0.0.1/ + /// + public string CoordinatorUri { get; set; } = "http://localhost:1080/"; public JT1078Configuration Value => this; } } diff --git a/src/JT1078.Gateway/JT1078CoordinatorHttpClient.cs b/src/JT1078.Gateway/JT1078CoordinatorHttpClient.cs index 93bd045..f983a62 100644 --- a/src/JT1078.Gateway/JT1078CoordinatorHttpClient.cs +++ b/src/JT1078.Gateway/JT1078CoordinatorHttpClient.cs @@ -1,7 +1,10 @@ -using System; +using JT1078.Gateway.Configurations; +using Microsoft.Extensions.Options; +using System; using System.Collections.Generic; using System.Net.Http; using System.Text; +using System.Threading.Tasks; namespace JT1078.Gateway { @@ -12,18 +15,33 @@ namespace JT1078.Gateway { private HttpClient httpClient; - public JT1078CoordinatorHttpClient(HttpClient httpClient) + private JT1078Configuration Configuration; + + private const string endpoint = "/JT1078WebApi"; + + public JT1078CoordinatorHttpClient(IOptions configurationAccessor) + { + Configuration = configurationAccessor.Value; + this.httpClient = new HttpClient(); + this.httpClient.BaseAddress = new Uri(Configuration.CoordinatorUri); + this.httpClient.Timeout = TimeSpan.FromSeconds(3); + } + + /// + /// 发送重制至协调器中 + /// + public async ValueTask Reset() { - this.httpClient = httpClient; + await httpClient.GetAsync($"{endpoint}/reset"); } /// /// 发送心跳至协调器中 /// /// - public async void Heartbeat(string content) + public async ValueTask Heartbeat(string content) { - await httpClient.PostAsync("/heartbeat", new StringContent(content)); + await httpClient.PostAsync($"{endpoint}/heartbeat", new StringContent(content)); } } } diff --git a/src/JT1078.Gateway/JT1078GatewayExtensions.cs b/src/JT1078.Gateway/JT1078GatewayExtensions.cs index ef9f8b5..c66dc4f 100644 --- a/src/JT1078.Gateway/JT1078GatewayExtensions.cs +++ b/src/JT1078.Gateway/JT1078GatewayExtensions.cs @@ -7,8 +7,8 @@ using JT1078.Gateway.Sessions; using Microsoft.Extensions.Configuration; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection.Extensions; -using Microsoft.Extensions.Options; using System; +using System.Net.Http; using System.Runtime.CompilerServices; [assembly: InternalsVisibleTo("JT1078.Gateway.Test")] @@ -58,6 +58,22 @@ namespace JT1078.Gateway return builder; } + public static IJT1078GatewayBuilder AddHttp(this IJT1078GatewayBuilder builder) + where TIJT1078Authorization: IJT1078Authorization + { + builder.JT1078Builder.Services.AddSingleton(typeof(IJT1078Authorization), typeof(TIJT1078Authorization)); + builder.JT1078Builder.Services.AddSingleton(); + builder.JT1078Builder.Services.AddHostedService(); + return builder; + } + + public static IJT1078GatewayBuilder AddCoordinatorHttpClient(this IJT1078GatewayBuilder builder) + { + builder.JT1078Builder.Services.AddSingleton(); + builder.JT1078Builder.Services.AddHostedService(); + return builder; + } + public static IJT1078NormalGatewayBuilder AddNormal(this IJT1078GatewayBuilder builder) { return new JT1078NormalGatewayBuilderDefault(builder.JT1078Builder); diff --git a/src/JT1078.Gateway/Jobs/JT1078HeartbeatJob.cs b/src/JT1078.Gateway/Jobs/JT1078HeartbeatJob.cs new file mode 100644 index 0000000..6ace38c --- /dev/null +++ b/src/JT1078.Gateway/Jobs/JT1078HeartbeatJob.cs @@ -0,0 +1,92 @@ +using JT1078.Gateway.Configurations; +using JT1078.Gateway.Sessions; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.Logging; +using Microsoft.Extensions.Options; +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading; +using System.Threading.Tasks; +using System.Text.Json; +using System.IO; + +namespace JT1078.Gateway.Jobs +{ + public class JT1078HeartbeatJob : BackgroundService + { + private readonly ILogger Logger; + + private readonly JT1078SessionManager SessionManager; + + private readonly JT1078HttpSessionManager HttpSessionManager; + + private readonly IOptionsMonitor Configuration; + + private readonly JT1078CoordinatorHttpClient CoordinatorHttpClient; + public JT1078HeartbeatJob( + JT1078CoordinatorHttpClient jT1078CoordinatorHttpClient, + JT1078HttpSessionManager jT1078HttpSessionManager, + IOptionsMonitor jT1078ConfigurationAccessor, + ILoggerFactory loggerFactory, + JT1078SessionManager jT1078SessionManager + ) + { + SessionManager = jT1078SessionManager; + HttpSessionManager = jT1078HttpSessionManager; + Logger = loggerFactory.CreateLogger(); + Configuration = jT1078ConfigurationAccessor; + CoordinatorHttpClient = jT1078CoordinatorHttpClient; + } + protected override async Task ExecuteAsync(CancellationToken stoppingToken) + { + try + { + await CoordinatorHttpClient.Reset(); + } + catch (Exception ex) + { + Logger.LogError(ex, $"[Coordinator Reset]"); + } + while (!stoppingToken.IsCancellationRequested) + { + try + { + string json = ""; + using (var stream = new MemoryStream()) + { + using (var writer = new Utf8JsonWriter(stream)) + { + writer.WriteStartObject(); + writer.WriteNumber(nameof(Configuration.CurrentValue.HttpPort), Configuration.CurrentValue.HttpPort); + writer.WriteNumber(nameof(Configuration.CurrentValue.TcpPort), Configuration.CurrentValue.TcpPort); + writer.WriteNumber(nameof(Configuration.CurrentValue.UdpPort), Configuration.CurrentValue.UdpPort); + writer.WriteNumber(nameof(SessionManager.TcpSessionCount), SessionManager.TcpSessionCount); + writer.WriteNumber(nameof(SessionManager.UdpSessionCount), SessionManager.UdpSessionCount); + writer.WriteNumber(nameof(HttpSessionManager.HttpSessionCount), HttpSessionManager.HttpSessionCount); + writer.WriteNumber(nameof(HttpSessionManager.WebSocketSessionCount), HttpSessionManager.WebSocketSessionCount); + writer.WriteEndObject(); + } + json = Encoding.UTF8.GetString(stream.ToArray()); + } + if (json != "") + { + if (Logger.IsEnabled(LogLevel.Information)) + { + Logger.LogInformation(json); + } + await CoordinatorHttpClient.Heartbeat(json); + } + } + catch (Exception ex) + { + Logger.LogError(ex, $"[Coordinator Heartbeat]"); + } + finally + { + await Task.Delay(TimeSpan.FromSeconds(Configuration.CurrentValue.CoordinatorHeartbeatTimeSeconds), stoppingToken); + } + } + } + } +} diff --git a/src/JT1078.Gateway/Sessions/JT1078HttpSessionManager.cs b/src/JT1078.Gateway/Sessions/JT1078HttpSessionManager.cs index 276799a..27ab7a4 100644 --- a/src/JT1078.Gateway/Sessions/JT1078HttpSessionManager.cs +++ b/src/JT1078.Gateway/Sessions/JT1078HttpSessionManager.cs @@ -222,6 +222,22 @@ namespace JT1078.Gateway.Sessions } } + public int HttpSessionCount + { + get + { + return Sessions.Count(c=>!c.Value.IsWebSocket); + } + } + + public int WebSocketSessionCount + { + get + { + return Sessions.Count(c => c.Value.IsWebSocket); + } + } + public List GetAll() { return Sessions.Select(s => s.Value).ToList();