diff --git a/src/JT1078.Gateway.Abstractions/Enums/JT1078UseType.cs b/src/JT1078.Gateway.Abstractions/Enums/JT1078UseType.cs
deleted file mode 100644
index 8f6c35f..0000000
--- a/src/JT1078.Gateway.Abstractions/Enums/JT1078UseType.cs
+++ /dev/null
@@ -1,18 +0,0 @@
-using System;
-using System.Collections.Generic;
-using System.Text;
-
-namespace JT1078.Gateway.Abstractions.Enums
-{
- public enum JT1078UseType : byte
- {
- ///
- /// 使用正常方式
- ///
- Normal = 1,
- ///
- /// 使用队列方式
- ///
- Queue = 2
- }
-}
diff --git a/src/JT1078.Gateway.Abstractions/IJT1078MsgConsumer.cs b/src/JT1078.Gateway.Abstractions/IJT1078MsgConsumer.cs
index 53a391f..b6266ef 100644
--- a/src/JT1078.Gateway.Abstractions/IJT1078MsgConsumer.cs
+++ b/src/JT1078.Gateway.Abstractions/IJT1078MsgConsumer.cs
@@ -7,7 +7,7 @@ namespace JT1078.Gateway.Abstractions
{
public interface IJT1078MsgConsumer : IJT1078PubSub, IDisposable
{
- void OnMessage(Action<(string TerminalNo, byte[] Data)> callback);
+ void OnMessage(Action<(string SIM, byte[] Data)> callback);
CancellationTokenSource Cts { get; }
void Subscribe();
void Unsubscribe();
diff --git a/src/JT1078.Gateway.Abstractions/IJT1078MsgProducer.cs b/src/JT1078.Gateway.Abstractions/IJT1078MsgProducer.cs
index 30a557e..4abab68 100644
--- a/src/JT1078.Gateway.Abstractions/IJT1078MsgProducer.cs
+++ b/src/JT1078.Gateway.Abstractions/IJT1078MsgProducer.cs
@@ -10,8 +10,8 @@ namespace JT1078.Gateway.Abstractions
///
///
///
- /// 设备终端号
+ /// 设备sim终端号
/// jt1078 hex data
- ValueTask ProduceAsync(string terminalNo, byte[] data);
+ ValueTask ProduceAsync(string sim, byte[] data);
}
}
diff --git a/src/JT1078.Gateway.Abstractions/IJT1078NormalGatewayBuilder.cs b/src/JT1078.Gateway.Abstractions/IJT1078NormalGatewayBuilder.cs
deleted file mode 100644
index fd574d1..0000000
--- a/src/JT1078.Gateway.Abstractions/IJT1078NormalGatewayBuilder.cs
+++ /dev/null
@@ -1,12 +0,0 @@
-using Microsoft.Extensions.DependencyInjection;
-using System;
-using System.Collections.Generic;
-using System.Text;
-
-namespace JT1078.Gateway.Abstractions
-{
- public interface IJT1078NormalGatewayBuilder: IJT1078GatewayBuilder
- {
-
- }
-}
diff --git a/src/JT1078.Gateway.Abstractions/IJT1078PackageConsumer.cs b/src/JT1078.Gateway.Abstractions/IJT1078PackageConsumer.cs
deleted file mode 100644
index 7b50be2..0000000
--- a/src/JT1078.Gateway.Abstractions/IJT1078PackageConsumer.cs
+++ /dev/null
@@ -1,16 +0,0 @@
-using JT1078.Protocol;
-using System;
-using System.Collections.Generic;
-using System.Text;
-using System.Threading;
-
-namespace JT1078.Gateway.Abstractions
-{
- public interface IJT1078PackageConsumer : IJT1078PubSub, IDisposable
- {
- void OnMessage(Action<(string TerminalNo, JT1078Package Data)> callback);
- CancellationTokenSource Cts { get; }
- void Subscribe();
- void Unsubscribe();
- }
-}
diff --git a/src/JT1078.Gateway.Abstractions/IJT1078PackageProducer.cs b/src/JT1078.Gateway.Abstractions/IJT1078PackageProducer.cs
deleted file mode 100644
index 8f6e1d6..0000000
--- a/src/JT1078.Gateway.Abstractions/IJT1078PackageProducer.cs
+++ /dev/null
@@ -1,18 +0,0 @@
-using JT1078.Protocol;
-using System;
-using System.Collections.Generic;
-using System.Text;
-using System.Threading.Tasks;
-
-namespace JT1078.Gateway.Abstractions
-{
- public interface IJT1078PackageProducer : IJT1078PubSub, IDisposable
- {
- ///
- ///
- ///
- /// 设备终端号
- /// jt1078 package data
- ValueTask ProduceAsync(string terminalNo, JT1078Package data);
- }
-}
diff --git a/src/JT1078.Gateway.Abstractions/IJT1078QueueGatewayBuilder.cs b/src/JT1078.Gateway.Abstractions/IJT1078QueueGatewayBuilder.cs
deleted file mode 100644
index 84dc38e..0000000
--- a/src/JT1078.Gateway.Abstractions/IJT1078QueueGatewayBuilder.cs
+++ /dev/null
@@ -1,12 +0,0 @@
-using Microsoft.Extensions.DependencyInjection;
-using System;
-using System.Collections.Generic;
-using System.Text;
-
-namespace JT1078.Gateway.Abstractions
-{
- public interface IJT1078QueueGatewayBuilder: IJT1078GatewayBuilder
- {
-
- }
-}
diff --git a/src/JT1078.Gateway.Abstractions/JT1078.Gateway.Abstractions.csproj b/src/JT1078.Gateway.Abstractions/JT1078.Gateway.Abstractions.csproj
index 4f3d6b4..c5ad943 100644
--- a/src/JT1078.Gateway.Abstractions/JT1078.Gateway.Abstractions.csproj
+++ b/src/JT1078.Gateway.Abstractions/JT1078.Gateway.Abstractions.csproj
@@ -1,5 +1,5 @@
-
+
netstandard2.1
8.0
@@ -17,8 +17,15 @@
false
LICENSE
true
- 1.0.0-preview2
+ $(JT1078PackageVersion)
+
+
+
+
+
+
+
diff --git a/src/JT1078.Gateway.Coordinator/Controller/UserController.cs b/src/JT1078.Gateway.Coordinator/Controller/UserController.cs
index 7267601..2655b0e 100644
--- a/src/JT1078.Gateway.Coordinator/Controller/UserController.cs
+++ b/src/JT1078.Gateway.Coordinator/Controller/UserController.cs
@@ -16,8 +16,6 @@ namespace JT1078.Gateway.Coordinator.Controller
[EnableCors("any")]
public class UserController : ControllerBase
{
-
-
///
/// 登录
///
diff --git a/src/JT1078.Gateway.Coordinator/Dtos/ChannelCloseRequest.cs b/src/JT1078.Gateway.Coordinator/Dtos/ChannelCloseRequest.cs
index bbd0470..9364aae 100644
--- a/src/JT1078.Gateway.Coordinator/Dtos/ChannelCloseRequest.cs
+++ b/src/JT1078.Gateway.Coordinator/Dtos/ChannelCloseRequest.cs
@@ -10,7 +10,7 @@ namespace JT1078.Gateway.Coordinator.Dtos
///
/// 设备sim卡号
///
- public string TerminalPhoneNo { get; set; }
+ public string Sim { get; set; }
///
/// 通道号
///
diff --git a/src/JT1078.Gateway.InMemoryMQ/JT1078.Gateway.InMemoryMQ.csproj b/src/JT1078.Gateway.InMemoryMQ/JT1078.Gateway.InMemoryMQ.csproj
index 0577923..6c6386f 100644
--- a/src/JT1078.Gateway.InMemoryMQ/JT1078.Gateway.InMemoryMQ.csproj
+++ b/src/JT1078.Gateway.InMemoryMQ/JT1078.Gateway.InMemoryMQ.csproj
@@ -1,5 +1,5 @@
-
+
netstandard2.1
8.0
@@ -17,7 +17,7 @@
false
LICENSE
true
- 1.0.0-preview2
+ $(JT1078PackageVersion)
@@ -25,7 +25,7 @@
-
+
diff --git a/src/JT1078.Gateway.InMemoryMQ/JT1078InMemoryMQExtensions.cs b/src/JT1078.Gateway.InMemoryMQ/JT1078InMemoryMQExtensions.cs
index 4291b15..4828a6d 100644
--- a/src/JT1078.Gateway.InMemoryMQ/JT1078InMemoryMQExtensions.cs
+++ b/src/JT1078.Gateway.InMemoryMQ/JT1078InMemoryMQExtensions.cs
@@ -9,17 +9,17 @@ namespace JT1078.Gateway.InMemoryMQ
{
public static class JT1078InMemoryMQExtensions
{
- public static IJT1078NormalGatewayBuilder AddMsgProducer(this IJT1078NormalGatewayBuilder builder)
+ public static IJT1078GatewayBuilder AddMsgProducer(this IJT1078GatewayBuilder builder)
{
builder.JT1078Builder.Services.TryAddSingleton();
- builder.JT1078Builder.Services.AddSingleton();
+ builder.JT1078Builder.Services.AddSingleton();
return builder;
}
- public static IJT1078NormalGatewayBuilder AddMsgConsumer(this IJT1078NormalGatewayBuilder builder)
+ public static IJT1078GatewayBuilder AddMsgConsumer(this IJT1078GatewayBuilder builder)
{
builder.JT1078Builder.Services.TryAddSingleton();
- builder.JT1078Builder.Services.AddSingleton();
+ builder.JT1078Builder.Services.AddSingleton();
return builder;
}
}
diff --git a/src/JT1078.Gateway.InMemoryMQ/JT1078MsgChannel.cs b/src/JT1078.Gateway.InMemoryMQ/JT1078MsgChannel.cs
index ae3ff54..943e96e 100644
--- a/src/JT1078.Gateway.InMemoryMQ/JT1078MsgChannel.cs
+++ b/src/JT1078.Gateway.InMemoryMQ/JT1078MsgChannel.cs
@@ -7,11 +7,11 @@ namespace JT1078.Gateway.InMemoryMQ
{
public class JT1078MsgChannel
{
- public Channel<(string, JT1078.Protocol.JT1078Package)> Channel { get;}
+ public Channel<(string, byte[])> Channel { get;}
public JT1078MsgChannel()
{
- Channel = System.Threading.Channels.Channel.CreateUnbounded<(string, JT1078.Protocol.JT1078Package)>();
+ Channel = System.Threading.Channels.Channel.CreateUnbounded<(string, byte[])>();
}
}
}
diff --git a/src/JT1078.Gateway.InMemoryMQ/JT1078PackageConsumer.cs b/src/JT1078.Gateway.InMemoryMQ/JT1078MsgConsumer.cs
similarity index 56%
rename from src/JT1078.Gateway.InMemoryMQ/JT1078PackageConsumer.cs
rename to src/JT1078.Gateway.InMemoryMQ/JT1078MsgConsumer.cs
index 31ccaa0..febad0f 100644
--- a/src/JT1078.Gateway.InMemoryMQ/JT1078PackageConsumer.cs
+++ b/src/JT1078.Gateway.InMemoryMQ/JT1078MsgConsumer.cs
@@ -1,25 +1,18 @@
using JT1078.Gateway.Abstractions;
-using JT1078.Protocol;
-using Microsoft.Extensions.Logging;
using System;
-using System.Collections.Generic;
-using System.Text;
using System.Text.Json;
-using System.Text.Json.Serialization;
using System.Threading;
using System.Threading.Tasks;
namespace JT1078.Gateway.InMemoryMQ
{
- public class JT1078PackageConsumer: IJT1078PackageConsumer
+ public class JT1078MsgConsumer : IJT1078MsgConsumer
{
private JT1078MsgChannel Channel;
- private readonly ILogger logger;
- public JT1078PackageConsumer(ILoggerFactory loggerFactory,JT1078MsgChannel channel)
+ public JT1078MsgConsumer(JT1078MsgChannel channel)
{
Channel = channel;
- logger = loggerFactory.CreateLogger();
}
public CancellationTokenSource Cts { get; } = new CancellationTokenSource();
@@ -31,17 +24,13 @@ namespace JT1078.Gateway.InMemoryMQ
}
- public void OnMessage(Action<(string TerminalNo, JT1078Package Data)> callback)
+ public void OnMessage(Action<(string SIM, byte[] Data)> callback)
{
Task.Run(async() =>
{
while (!Cts.IsCancellationRequested)
{
var reader = await Channel.Channel.Reader.ReadAsync(Cts.Token);
- if (logger.IsEnabled(LogLevel.Trace))
- {
- logger.LogTrace(JsonSerializer.Serialize(reader.Item2));
- }
callback(reader);
}
}, Cts.Token);
diff --git a/src/JT1078.Gateway.InMemoryMQ/JT1078PackageProducer.cs b/src/JT1078.Gateway.InMemoryMQ/JT1078MsgProducer.cs
similarity index 63%
rename from src/JT1078.Gateway.InMemoryMQ/JT1078PackageProducer.cs
rename to src/JT1078.Gateway.InMemoryMQ/JT1078MsgProducer.cs
index f33134b..1b93f42 100644
--- a/src/JT1078.Gateway.InMemoryMQ/JT1078PackageProducer.cs
+++ b/src/JT1078.Gateway.InMemoryMQ/JT1078MsgProducer.cs
@@ -8,13 +8,13 @@ using System.Threading.Tasks;
namespace JT1078.Gateway.InMemoryMQ
{
- public class JT1078PackageProducer : IJT1078PackageProducer
+ public class JT1078MsgProducer : IJT1078MsgProducer
{
public string TopicName { get; }= "JT1078Package";
private JT1078MsgChannel Channel;
- public JT1078PackageProducer(JT1078MsgChannel channel)
+ public JT1078MsgProducer(JT1078MsgChannel channel)
{
Channel = channel;
}
@@ -24,9 +24,9 @@ namespace JT1078.Gateway.InMemoryMQ
}
- public async ValueTask ProduceAsync(string terminalNo, JT1078Package data)
+ public async ValueTask ProduceAsync(string sim, byte[] data)
{
- await Channel.Channel.Writer.WriteAsync((terminalNo, data));
+ await Channel.Channel.Writer.WriteAsync((sim, data));
}
}
}
diff --git a/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Configs/nlog.Unix.config b/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Configs/nlog.Unix.config
index 333e593..8c8388d 100644
--- a/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Configs/nlog.Unix.config
+++ b/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Configs/nlog.Unix.config
@@ -15,6 +15,9 @@
+
@@ -32,6 +35,7 @@
-
+
+
\ No newline at end of file
diff --git a/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Configs/nlog.Win32NT.config b/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Configs/nlog.Win32NT.config
index 07098c1..78f7b08 100644
--- a/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Configs/nlog.Win32NT.config
+++ b/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Configs/nlog.Win32NT.config
@@ -23,6 +23,9 @@
+
@@ -37,8 +40,9 @@
-
+
-
+
+
\ No newline at end of file
diff --git a/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/JT1078.Gateway.TestNormalHosting.csproj b/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/JT1078.Gateway.TestNormalHosting.csproj
index 4793456..5e000ab 100644
--- a/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/JT1078.Gateway.TestNormalHosting.csproj
+++ b/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/JT1078.Gateway.TestNormalHosting.csproj
@@ -14,7 +14,7 @@
-
+
diff --git a/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Program.cs b/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Program.cs
index 6f96a52..ff8fd8b 100644
--- a/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Program.cs
+++ b/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Program.cs
@@ -39,7 +39,7 @@ namespace JT1078.Gateway.TestNormalHosting
services.AddSingleton();
services.AddSingleton(new M3U8Option
{
-
+
});
services.AddSingleton();
//使用内存队列实现会话通知
@@ -48,9 +48,9 @@ namespace JT1078.Gateway.TestNormalHosting
.AddUdp()
.AddHttp()
//.AddCoordinatorHttpClient()
- .AddNormal()
.AddMsgProducer()
.AddMsgConsumer();
+ //内存队列没有做分发,可以自己实现。
services.AddHostedService();
//services.AddHostedService();
});
diff --git a/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Services/JT1078FlvNormalMsgHostedService.cs b/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Services/JT1078FlvNormalMsgHostedService.cs
index 129a8ff..27a9ce7 100644
--- a/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Services/JT1078FlvNormalMsgHostedService.cs
+++ b/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Services/JT1078FlvNormalMsgHostedService.cs
@@ -19,7 +19,7 @@ namespace JT1078.Gateway.TestNormalHosting.Services
{
public class JT1078FlvNormalMsgHostedService : BackgroundService
{
- private IJT1078PackageConsumer PackageConsumer;
+ private IJT1078MsgConsumer JT1078MsgConsumer;
private JT1078HttpSessionManager HttpSessionManager;
private FlvEncoder FlvEncoder;
private ILogger Logger;
@@ -31,74 +31,73 @@ namespace JT1078.Gateway.TestNormalHosting.Services
ILoggerFactory loggerFactory,
FlvEncoder flvEncoder,
JT1078HttpSessionManager httpSessionManager,
- IJT1078PackageConsumer packageConsumer)
+ IJT1078MsgConsumer msgConsumer)
{
Logger = loggerFactory.CreateLogger();
- PackageConsumer = packageConsumer;
+ JT1078MsgConsumer = msgConsumer;
HttpSessionManager = httpSessionManager;
FlvEncoder = flvEncoder;
this.memoryCache = memoryCache;
}
protected override Task ExecuteAsync(CancellationToken stoppingToken)
{
- PackageConsumer.OnMessage((Message) =>
+ JT1078MsgConsumer.OnMessage((Message) =>
{
+ JT1078Package package = JT1078Serializer.Deserialize(Message.Data);
+ if (Logger.IsEnabled(LogLevel.Debug))
+ {
+ Logger.LogDebug(JsonSerializer.Serialize(HttpSessionManager.GetAll()));
+ Logger.LogDebug($"{package.SIM},{package.SN},{package.LogicChannelNumber},{package.Label3.DataType.ToString()},{package.Label3.SubpackageType.ToString()},{package.Bodies.ToHexString()}");
+ }
try
{
- if (Logger.IsEnabled(LogLevel.Debug))
+ var merge = JT1078Serializer.Merge(package);
+ if (merge == null) return;
+ string key = $"{package.GetKey()}_{ikey}";
+ if (merge.Label3.DataType == Protocol.Enums.JT1078DataType.视频I帧)
{
- Logger.LogDebug(JsonSerializer.Serialize(HttpSessionManager.GetAll()));
- Logger.LogDebug($"{Message.Data.SIM},{Message.Data.SN},{Message.Data.LogicChannelNumber},{Message.Data.Label3.DataType.ToString()},{Message.Data.Label3.SubpackageType.ToString()},{Message.Data.Bodies.ToHexString()}");
+ memoryCache.Set(key, merge);
}
- var merge = JT1078.Protocol.JT1078Serializer.Merge(Message.Data);
- string key = $"{Message.Data.GetKey()}_{ikey}";
- if (merge != null)
+ var httpSessions = HttpSessionManager.GetAllBySimAndChannelNo(package.SIM.TrimStart('0'), package.LogicChannelNumber);
+ var firstHttpSessions = httpSessions.Where(w => !w.FirstSend).ToList();
+ if (firstHttpSessions.Count > 0)
{
- if (merge.Label3.DataType == Protocol.Enums.JT1078DataType.视频I帧)
- {
- memoryCache.Set(key, merge);
- }
- var httpSessions = HttpSessionManager.GetAllBySimAndChannelNo(Message.Data.SIM.TrimStart('0'), Message.Data.LogicChannelNumber);
- var firstHttpSessions = httpSessions.Where(w => !w.FirstSend).ToList();
- if (firstHttpSessions.Count > 0)
- {
- if (memoryCache.TryGetValue(key, out JT1078Package idata))
- {
- try
- {
- var flvVideoBuffer = FlvEncoder.EncoderVideoTag(idata, true);
- foreach (var session in firstHttpSessions)
- {
- HttpSessionManager.SendAVData(session, flvVideoBuffer, true);
- }
- }
- catch (Exception ex)
- {
- Logger.LogError(ex, $"{Message.Data.SIM},{true},{Message.Data.SN},{Message.Data.LogicChannelNumber},{Message.Data.Label3.DataType.ToString()},{Message.Data.Label3.SubpackageType.ToString()},{Message.Data.Bodies.ToHexString()}");
- }
- }
- }
- var otherHttpSessions = httpSessions.Where(w => w.FirstSend).ToList();
- if (otherHttpSessions.Count > 0)
+ if (memoryCache.TryGetValue(key, out JT1078Package idata))
{
try
{
- var flvVideoBuffer = FlvEncoder.EncoderVideoTag(merge, false);
- foreach (var session in otherHttpSessions)
+ var flvVideoBuffer = FlvEncoder.EncoderVideoTag(idata, true);
+ foreach (var session in firstHttpSessions)
{
- HttpSessionManager.SendAVData(session, flvVideoBuffer, false);
+ HttpSessionManager.SendAVData(session, flvVideoBuffer, true);
}
}
catch (Exception ex)
{
- Logger.LogError(ex, $"{Message.Data.SIM},{false},{Message.Data.SN},{Message.Data.LogicChannelNumber},{Message.Data.Label3.DataType.ToString()},{Message.Data.Label3.SubpackageType.ToString()},{Message.Data.Bodies.ToHexString()}");
+ Logger.LogError(ex, $"{package.SIM},{true},{package.SN},{package.LogicChannelNumber},{package.Label3.DataType.ToString()},{package.Label3.SubpackageType.ToString()},{package.Bodies.ToHexString()}");
+ }
+ }
+ }
+ var otherHttpSessions = httpSessions.Where(w => w.FirstSend).ToList();
+ if (otherHttpSessions.Count > 0)
+ {
+ try
+ {
+ var flvVideoBuffer = FlvEncoder.EncoderVideoTag(merge, false);
+ foreach (var session in otherHttpSessions)
+ {
+ HttpSessionManager.SendAVData(session, flvVideoBuffer, false);
}
}
+ catch (Exception ex)
+ {
+ Logger.LogError(ex, $"{package.SIM},{false},{package.SN},{package.LogicChannelNumber},{package.Label3.DataType.ToString()},{package.Label3.SubpackageType.ToString()},{package.Bodies.ToHexString()}");
+ }
}
}
catch (Exception ex)
{
- Logger.LogError(ex, $"{Message.Data.SIM},{Message.Data.SN},{Message.Data.LogicChannelNumber},{Message.Data.Label3.DataType.ToString()},{Message.Data.Label3.SubpackageType.ToString()},{Message.Data.Bodies.ToHexString()}");
+ Logger.LogError(ex, $"{package.SIM},{package.SN},{package.LogicChannelNumber},{package.Label3.DataType.ToString()},{package.Label3.SubpackageType.ToString()},{package.Bodies.ToHexString()}");
}
});
return Task.CompletedTask;
diff --git a/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Services/JT1078HlsNormalMsgHostedService.cs b/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Services/JT1078HlsNormalMsgHostedService.cs
index bae5a20..4767edd 100644
--- a/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Services/JT1078HlsNormalMsgHostedService.cs
+++ b/src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Services/JT1078HlsNormalMsgHostedService.cs
@@ -1,6 +1,7 @@
using JT1078.Gateway.Abstractions;
using JT1078.Gateway.Sessions;
using JT1078.Hls;
+using JT1078.Protocol;
using Microsoft.Extensions.Hosting;
using System;
using System.Collections.Generic;
@@ -13,23 +14,24 @@ namespace JT1078.Gateway.TestNormalHosting.Services
{
public class JT1078HlsNormalMsgHostedService : BackgroundService
{
- private IJT1078PackageConsumer PackageConsumer;
+ private IJT1078MsgConsumer MsgConsumer;
private JT1078HttpSessionManager HttpSessionManager;
private M3U8FileManage M3U8FileManage;
public JT1078HlsNormalMsgHostedService(
M3U8FileManage M3U8FileManage,
JT1078HttpSessionManager httpSessionManager,
- IJT1078PackageConsumer packageConsumer)
+ IJT1078MsgConsumer msgConsumer)
{
- PackageConsumer = packageConsumer;
+ MsgConsumer = msgConsumer;
HttpSessionManager = httpSessionManager;
this.M3U8FileManage = M3U8FileManage;
}
protected override Task ExecuteAsync(CancellationToken stoppingToken)
{
- PackageConsumer.OnMessage((Message) =>
+ MsgConsumer.OnMessage((Message) =>
{
- var merge = JT1078.Protocol.JT1078Serializer.Merge(Message.Data);
+ JT1078Package package = JT1078Serializer.Deserialize(Message.Data);
+ var merge = JT1078.Protocol.JT1078Serializer.Merge(package);
if (merge != null)
{
var hasHttpSessionn = HttpSessionManager.GetAllHttpContextBySimAndChannelNo(merge.SIM, merge.LogicChannelNumber);
diff --git a/src/JT1078.Gateway.sln b/src/JT1078.Gateway.sln
index ac29fd6..4cf5778 100644
--- a/src/JT1078.Gateway.sln
+++ b/src/JT1078.Gateway.sln
@@ -3,30 +3,26 @@ Microsoft Visual Studio Solution File, Format Version 12.00
# Visual Studio Version 16
VisualStudioVersion = 16.0.29418.71
MinimumVisualStudioVersion = 10.0.40219.1
-Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JT1078.Gateway", "JT1078.Gateway\JT1078.Gateway.csproj", "{39A90F1C-FC46-4905-81F1-3AEE44D6D86D}"
-EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JT1078.Gateway.Test", "JT1078.Gateway.Tests\JT1078.Gateway.Test\JT1078.Gateway.Test.csproj", "{DAC80A8E-4172-451F-910D-9032BF8640F9}"
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Tests", "Tests", "{04C0F72A-5BC4-4CEE-B1E9-CCFAA72E373E}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JT1078.Gateway.Abstractions", "JT1078.Gateway.Abstractions\JT1078.Gateway.Abstractions.csproj", "{EE50F2A6-5F28-4640-BC20-44A8BED8F311}"
EndProject
-Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JT1078.Gateway.InMemoryMQ", "JT1078.Gateway.InMemoryMQ\JT1078.Gateway.InMemoryMQ.csproj", "{C7554790-0B3B-490F-B2F1-436C1FD0B4DD}"
-EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JT1078.Gateway.TestNormalHosting", "JT1078.Gateway.Tests\JT1078.Gateway.TestNormalHosting\JT1078.Gateway.TestNormalHosting.csproj", "{6E2DAA64-E2A1-4459-BE61-E807B4EF2CCE}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JT1078.Gateway.Coordinator", "JT1078.Gateway.Coordinator\JT1078.Gateway.Coordinator.csproj", "{B0A24D3A-FDA3-4DFE-9C31-032C7C22F303}"
EndProject
+Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JT1078.Gateway", "JT1078.Gateway\JT1078.Gateway.csproj", "{9042CA92-E01A-46CF-9C82-D954325A69B8}"
+EndProject
+Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JT1078.Gateway.InMemoryMQ", "JT1078.Gateway.InMemoryMQ\JT1078.Gateway.InMemoryMQ.csproj", "{011934D6-BEE7-4CDA-9F67-4D6D7D672D6C}"
+EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Release|Any CPU = Release|Any CPU
EndGlobalSection
GlobalSection(ProjectConfigurationPlatforms) = postSolution
- {39A90F1C-FC46-4905-81F1-3AEE44D6D86D}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
- {39A90F1C-FC46-4905-81F1-3AEE44D6D86D}.Debug|Any CPU.Build.0 = Debug|Any CPU
- {39A90F1C-FC46-4905-81F1-3AEE44D6D86D}.Release|Any CPU.ActiveCfg = Release|Any CPU
- {39A90F1C-FC46-4905-81F1-3AEE44D6D86D}.Release|Any CPU.Build.0 = Release|Any CPU
{DAC80A8E-4172-451F-910D-9032BF8640F9}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{DAC80A8E-4172-451F-910D-9032BF8640F9}.Debug|Any CPU.Build.0 = Debug|Any CPU
{DAC80A8E-4172-451F-910D-9032BF8640F9}.Release|Any CPU.ActiveCfg = Release|Any CPU
@@ -35,10 +31,6 @@ Global
{EE50F2A6-5F28-4640-BC20-44A8BED8F311}.Debug|Any CPU.Build.0 = Debug|Any CPU
{EE50F2A6-5F28-4640-BC20-44A8BED8F311}.Release|Any CPU.ActiveCfg = Release|Any CPU
{EE50F2A6-5F28-4640-BC20-44A8BED8F311}.Release|Any CPU.Build.0 = Release|Any CPU
- {C7554790-0B3B-490F-B2F1-436C1FD0B4DD}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
- {C7554790-0B3B-490F-B2F1-436C1FD0B4DD}.Debug|Any CPU.Build.0 = Debug|Any CPU
- {C7554790-0B3B-490F-B2F1-436C1FD0B4DD}.Release|Any CPU.ActiveCfg = Release|Any CPU
- {C7554790-0B3B-490F-B2F1-436C1FD0B4DD}.Release|Any CPU.Build.0 = Release|Any CPU
{6E2DAA64-E2A1-4459-BE61-E807B4EF2CCE}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{6E2DAA64-E2A1-4459-BE61-E807B4EF2CCE}.Debug|Any CPU.Build.0 = Debug|Any CPU
{6E2DAA64-E2A1-4459-BE61-E807B4EF2CCE}.Release|Any CPU.ActiveCfg = Release|Any CPU
@@ -47,6 +39,14 @@ Global
{B0A24D3A-FDA3-4DFE-9C31-032C7C22F303}.Debug|Any CPU.Build.0 = Debug|Any CPU
{B0A24D3A-FDA3-4DFE-9C31-032C7C22F303}.Release|Any CPU.ActiveCfg = Release|Any CPU
{B0A24D3A-FDA3-4DFE-9C31-032C7C22F303}.Release|Any CPU.Build.0 = Release|Any CPU
+ {9042CA92-E01A-46CF-9C82-D954325A69B8}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {9042CA92-E01A-46CF-9C82-D954325A69B8}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {9042CA92-E01A-46CF-9C82-D954325A69B8}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {9042CA92-E01A-46CF-9C82-D954325A69B8}.Release|Any CPU.Build.0 = Release|Any CPU
+ {011934D6-BEE7-4CDA-9F67-4D6D7D672D6C}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {011934D6-BEE7-4CDA-9F67-4D6D7D672D6C}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {011934D6-BEE7-4CDA-9F67-4D6D7D672D6C}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {011934D6-BEE7-4CDA-9F67-4D6D7D672D6C}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
diff --git a/src/JT1078.Gateway/Impl/JT1078NormalGatewayBuilderDefault.cs b/src/JT1078.Gateway/Impl/JT1078NormalGatewayBuilderDefault.cs
deleted file mode 100644
index e99363a..0000000
--- a/src/JT1078.Gateway/Impl/JT1078NormalGatewayBuilderDefault.cs
+++ /dev/null
@@ -1,19 +0,0 @@
-using JT1078.Gateway.Abstractions;
-
-namespace JT1078.Gateway.Impl
-{
- public class JT1078NormalGatewayBuilderDefault : IJT1078NormalGatewayBuilder
- {
- public IJT1078Builder JT1078Builder { get; }
-
- public JT1078NormalGatewayBuilderDefault(IJT1078Builder builder)
- {
- JT1078Builder = builder;
- }
-
- public IJT1078Builder Builder()
- {
- return JT1078Builder;
- }
- }
-}
\ No newline at end of file
diff --git a/src/JT1078.Gateway/Impl/JT1078QueueGatewayBuilderDefault.cs b/src/JT1078.Gateway/Impl/JT1078QueueGatewayBuilderDefault.cs
deleted file mode 100644
index faf51c0..0000000
--- a/src/JT1078.Gateway/Impl/JT1078QueueGatewayBuilderDefault.cs
+++ /dev/null
@@ -1,19 +0,0 @@
-using JT1078.Gateway.Abstractions;
-
-namespace JT1078.Gateway.Impl
-{
- public class JT1078QueueGatewayBuilderDefault : IJT1078QueueGatewayBuilder
- {
- public IJT1078Builder JT1078Builder { get; }
-
- public JT1078QueueGatewayBuilderDefault(IJT1078Builder builder)
- {
- JT1078Builder = builder;
- }
-
- public IJT1078Builder Builder()
- {
- return JT1078Builder;
- }
- }
-}
\ No newline at end of file
diff --git a/src/JT1078.Gateway/JT1078.Gateway.csproj b/src/JT1078.Gateway/JT1078.Gateway.csproj
index 5c94106..39b649e 100644
--- a/src/JT1078.Gateway/JT1078.Gateway.csproj
+++ b/src/JT1078.Gateway/JT1078.Gateway.csproj
@@ -1,4 +1,5 @@
+
netstandard2.1
8.0
@@ -16,7 +17,7 @@
false
LICENSE
true
- 1.0.0-preview2
+ $(JT1078PackageVersion)
diff --git a/src/JT1078.Gateway/JT1078GatewayExtensions.cs b/src/JT1078.Gateway/JT1078GatewayExtensions.cs
index c66dc4f..f9551c7 100644
--- a/src/JT1078.Gateway/JT1078GatewayExtensions.cs
+++ b/src/JT1078.Gateway/JT1078GatewayExtensions.cs
@@ -66,7 +66,7 @@ namespace JT1078.Gateway
builder.JT1078Builder.Services.AddHostedService();
return builder;
}
-
+
public static IJT1078GatewayBuilder AddCoordinatorHttpClient(this IJT1078GatewayBuilder builder)
{
builder.JT1078Builder.Services.AddSingleton();
@@ -74,16 +74,6 @@ namespace JT1078.Gateway
return builder;
}
- public static IJT1078NormalGatewayBuilder AddNormal(this IJT1078GatewayBuilder builder)
- {
- return new JT1078NormalGatewayBuilderDefault(builder.JT1078Builder);
- }
-
- public static IJT1078QueueGatewayBuilder AddQueue(this IJT1078GatewayBuilder builder)
- {
- return new JT1078QueueGatewayBuilderDefault(builder.JT1078Builder);
- }
-
internal static IJT1078GatewayBuilder AddJT1078Core(this IJT1078GatewayBuilder builder)
{
builder.JT1078Builder.Services.AddSingleton();
diff --git a/src/JT1078.Gateway/JT1078TcpServer.cs b/src/JT1078.Gateway/JT1078TcpServer.cs
index 2997c87..f73f80a 100644
--- a/src/JT1078.Gateway/JT1078TcpServer.cs
+++ b/src/JT1078.Gateway/JT1078TcpServer.cs
@@ -1,7 +1,6 @@
using System;
using System.Buffers;
using System.Buffers.Binary;
-using System.Collections.Generic;
using System.IO.Pipelines;
using System.Linq;
using System.Net;
@@ -10,7 +9,6 @@ using System.Text;
using System.Threading;
using System.Threading.Tasks;
using JT1078.Gateway.Abstractions;
-using JT1078.Gateway.Abstractions.Enums;
using JT1078.Gateway.Configurations;
using JT1078.Gateway.Sessions;
using JT1078.Protocol;
@@ -34,34 +32,8 @@ namespace JT1078.Gateway
private readonly JT1078SessionManager SessionManager;
- private readonly IJT1078PackageProducer jT1078PackageProducer;
-
private readonly IJT1078MsgProducer jT1078MsgProducer;
- private readonly JT1078UseType jT1078UseType;
-
- ///
- /// 使用正常方式
- ///
- ///
- ///
- ///
- ///
- public JT1078TcpServer(
- IJT1078PackageProducer jT1078PackageProducer,
- IOptions jT1078ConfigurationAccessor,
- ILoggerFactory loggerFactory,
- JT1078SessionManager jT1078SessionManager)
- {
- SessionManager = jT1078SessionManager;
- jT1078UseType = JT1078UseType.Normal;
- Logger = loggerFactory.CreateLogger();
- LogLogger = loggerFactory.CreateLogger("JT1078Logging");
- Configuration = jT1078ConfigurationAccessor.Value;
- this.jT1078PackageProducer = jT1078PackageProducer;
- InitServer();
- }
-
///
/// 使用队列方式
///
@@ -70,15 +42,14 @@ namespace JT1078.Gateway
///
///
public JT1078TcpServer(
- IJT1078MsgProducer jT1078MsgProducer,
+ IJT1078MsgProducer jT1078MsgProducer,
IOptions jT1078ConfigurationAccessor,
ILoggerFactory loggerFactory,
JT1078SessionManager jT1078SessionManager)
{
SessionManager = jT1078SessionManager;
- jT1078UseType = JT1078UseType.Queue;
Logger = loggerFactory.CreateLogger();
- LogLogger = loggerFactory.CreateLogger("JT1078Logging");
+ LogLogger = loggerFactory.CreateLogger("JT1078.Gateway.JT1078Logging");
Configuration = jT1078ConfigurationAccessor.Value;
this.jT1078MsgProducer = jT1078MsgProducer;
InitServer();
@@ -266,14 +237,7 @@ namespace JT1078.Gateway
try
{
SessionManager.TryLink(fixedHeaderInfo.SIM, session);
- if (jT1078UseType == JT1078UseType.Queue)
- {
- jT1078MsgProducer.ProduceAsync(fixedHeaderInfo.SIM, package1.ToArray());
- }
- else
- {
- jT1078PackageProducer.ProduceAsync(fixedHeaderInfo.SIM, JT1078Serializer.Deserialize(package1));
- }
+ jT1078MsgProducer.ProduceAsync(fixedHeaderInfo.SIM, package1.ToArray());
}
catch (Exception ex)
{
@@ -307,14 +271,7 @@ namespace JT1078.Gateway
try
{
SessionManager.TryLink(fixedHeaderInfo.SIM, session);
- if (jT1078UseType == JT1078UseType.Queue)
- {
- jT1078MsgProducer.ProduceAsync(fixedHeaderInfo.SIM, package);
- }
- else
- {
- jT1078PackageProducer.ProduceAsync(fixedHeaderInfo.SIM, JT1078Serializer.Deserialize(package));
- }
+ jT1078MsgProducer.ProduceAsync(fixedHeaderInfo.SIM, package);
}
catch (Exception ex)
{
diff --git a/src/JT1078.Gateway/JT1078UdpServer.cs b/src/JT1078.Gateway/JT1078UdpServer.cs
index b4c13f6..de2bd71 100644
--- a/src/JT1078.Gateway/JT1078UdpServer.cs
+++ b/src/JT1078.Gateway/JT1078UdpServer.cs
@@ -1,20 +1,13 @@
using System;
using System.Buffers;
-using System.Buffers.Binary;
-using System.Collections.Generic;
-using System.IO.Pipelines;
-using System.Linq;
using System.Net;
using System.Net.Sockets;
-using System.Text;
using System.Threading;
using System.Threading.Tasks;
using JT1078.Gateway.Abstractions;
-using JT1078.Gateway.Abstractions.Enums;
using JT1078.Gateway.Configurations;
using JT1078.Gateway.Sessions;
using JT1078.Protocol;
-using JT1078.Protocol.Enums;
using JT1078.Protocol.Extensions;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
@@ -32,33 +25,8 @@ namespace JT1078.Gateway
private readonly JT1078SessionManager SessionManager;
- private readonly IJT1078PackageProducer jT1078PackageProducer;
-
private readonly IJT1078MsgProducer jT1078MsgProducer;
- private readonly JT1078UseType jT1078UseType;
-
- ///
- /// 使用正常方式
- ///
- ///
- ///
- ///
- ///
- public JT1078UdpServer(
- IJT1078PackageProducer jT1078PackageProducer,
- IOptions jT1078ConfigurationAccessor,
- ILoggerFactory loggerFactory,
- JT1078SessionManager jT1078SessionManager)
- {
- SessionManager = jT1078SessionManager;
- jT1078UseType = JT1078UseType.Normal;
- Logger = loggerFactory.CreateLogger();
- Configuration = jT1078ConfigurationAccessor.Value;
- this.jT1078PackageProducer = jT1078PackageProducer;
- InitServer();
- }
-
///
/// 使用队列方式
///
@@ -73,7 +41,6 @@ namespace JT1078.Gateway
JT1078SessionManager jT1078SessionManager)
{
SessionManager = jT1078SessionManager;
- jT1078UseType = JT1078UseType.Queue;
Logger = loggerFactory.CreateLogger();
Configuration = jT1078ConfigurationAccessor.Value;
this.jT1078MsgProducer = jT1078MsgProducer;
@@ -134,14 +101,7 @@ namespace JT1078.Gateway
{
Logger.LogInformation($"[Connected]:{receiveMessageFromResult.RemoteEndPoint}");
}
- if (jT1078UseType == JT1078UseType.Queue)
- {
- jT1078MsgProducer.ProduceAsync(package.SIM, buffer.ToArray());
- }
- else
- {
- jT1078PackageProducer.ProduceAsync(package.SIM, package);
- }
+ jT1078MsgProducer.ProduceAsync(package.SIM, buffer.ToArray());
}
catch (NotImplementedException ex)
{
diff --git a/src/JT1078.Gateway/Jobs/JT1078SessionNoticeJob.cs b/src/JT1078.Gateway/Jobs/JT1078SessionNoticeJob.cs
index 3de8190..b8ae525 100644
--- a/src/JT1078.Gateway/Jobs/JT1078SessionNoticeJob.cs
+++ b/src/JT1078.Gateway/Jobs/JT1078SessionNoticeJob.cs
@@ -38,14 +38,14 @@ namespace JT1078.Gateway.Jobs
{
if (logger.IsEnabled(LogLevel.Information))
{
- logger.LogInformation($"[Notice]:{notice.TerminalPhoneNo}-{notice.ProtocolType}-{notice.SessionType}");
+ logger.LogInformation($"[Notice]:{notice.SIM}-{notice.ProtocolType}-{notice.SessionType}");
}
if(JT1078GatewayConstants.SessionOffline== notice.SessionType)
{
if (HttpSessionManager != null)
{
//当1078设备主动断开的情况下,需要关闭所有再观看的连接
- HttpSessionManager.TryRemoveBySim(notice.TerminalPhoneNo);
+ HttpSessionManager.TryRemoveBySim(notice.SIM);
}
}
}
diff --git a/src/JT1078.Gateway/Services/JT1078SessionNoticeService.cs b/src/JT1078.Gateway/Services/JT1078SessionNoticeService.cs
index 3d1b70d..ecdb843 100644
--- a/src/JT1078.Gateway/Services/JT1078SessionNoticeService.cs
+++ b/src/JT1078.Gateway/Services/JT1078SessionNoticeService.cs
@@ -7,10 +7,10 @@ namespace JT1078.Gateway.Services
{
public class JT1078SessionNoticeService
{
- public BlockingCollection<(string SessionType, string TerminalPhoneNo,string ProtocolType)> SessionNoticeBlockingCollection { get;internal set; }
+ public BlockingCollection<(string SessionType, string SIM,string ProtocolType)> SessionNoticeBlockingCollection { get;internal set; }
public JT1078SessionNoticeService()
{
- SessionNoticeBlockingCollection = new BlockingCollection<(string SessionType, string TerminalPhoneNo, string ProtocolType)>();
+ SessionNoticeBlockingCollection = new BlockingCollection<(string SessionType, string SIM, string ProtocolType)>();
}
}
}
diff --git a/src/Version.props b/src/Version.props
new file mode 100644
index 0000000..19ae7ec
--- /dev/null
+++ b/src/Version.props
@@ -0,0 +1,5 @@
+
+
+ 1.0.0-preview3
+
+
\ No newline at end of file