Ver código fonte

1.修改http下发数据

2.修改tcp解析还存在问题
master
SmallChi(Koike) 4 anos atrás
pai
commit
c45dc2b1f4
15 arquivos alterados com 3586 adições e 87 exclusões
  1. +1
    -1
      src/JT1078.Gateway.Abstractions/JT1078.Gateway.Abstractions.csproj
  2. +156
    -0
      src/JT1078.Gateway.Tests/JT1078.Gateway.Test/PipeTest.cs
  3. +3106
    -0
      src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Configs/NLog.xsd
  4. +37
    -0
      src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Configs/nlog.Unix.config
  5. +44
    -0
      src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Configs/nlog.Win32NT.config
  6. +12
    -2
      src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/JT1078.Gateway.TestNormalHosting.csproj
  7. +4
    -1
      src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Program.cs
  8. +65
    -11
      src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Services/JT1078FlvNormalMsgHostedService.cs
  9. +3
    -0
      src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/startup.ini
  10. +7
    -0
      src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/wwwroot/flv.min.js
  11. +41
    -0
      src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/wwwroot/index.html
  12. +12
    -9
      src/JT1078.Gateway/JT1078HttpServer.cs
  13. +61
    -22
      src/JT1078.Gateway/JT1078TcpServer.cs
  14. +3
    -0
      src/JT1078.Gateway/Metadata/JT1078HttpContext.cs
  15. +34
    -41
      src/JT1078.Gateway/Sessions/JT1078HttpSessionManager.cs

+ 1
- 1
src/JT1078.Gateway.Abstractions/JT1078.Gateway.Abstractions.csproj Ver arquivo

@@ -25,7 +25,7 @@
</ItemGroup> </ItemGroup>


<ItemGroup> <ItemGroup>
<PackageReference Include="JT1078" Version="1.0.3" />
<PackageReference Include="JT1078" Version="1.0.4-preview1" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="3.1.7" /> <PackageReference Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="3.1.7" />
<PackageReference Include="Microsoft.Extensions.Hosting.Abstractions" Version="3.1.7" /> <PackageReference Include="Microsoft.Extensions.Hosting.Abstractions" Version="3.1.7" />
<PackageReference Include="Microsoft.Extensions.Options.ConfigurationExtensions" Version="3.1.7" /> <PackageReference Include="Microsoft.Extensions.Options.ConfigurationExtensions" Version="3.1.7" />


+ 156
- 0
src/JT1078.Gateway.Tests/JT1078.Gateway.Test/PipeTest.cs
Diferenças do arquivo suprimidas por serem muito extensas
Ver arquivo


+ 3106
- 0
src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Configs/NLog.xsd
Diferenças do arquivo suprimidas por serem muito extensas
Ver arquivo


+ 37
- 0
src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Configs/nlog.Unix.config Ver arquivo

@@ -0,0 +1,37 @@
<?xml version="1.0" encoding="utf-8" ?>
<nlog xmlns="http://www.nlog-project.org/schemas/NLog.xsd" xsi:schemaLocation="NLog NLog.xsd"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
autoReload="true"
internalLogFile="/data/serviceslogs/JT1078.Gateway.TestNormalHosting/internalLog.txt"
internalLogLevel="Debug">
<variable name="Directory" value="/data/serviceslogs/JT1078.Gateway.TestNormalHosting"/>
<targets>
<target name="all" xsi:type="File"
fileName="${Directory}/all.${shortdate}.log"
layout="${date:format=yyyyMMddHHmmss} ${callsite} ${level}:${message} ${onexception:${exception:format=tostring} ${newline} ${stacktrace} ${newline}"/>
<target name="JT1078FlvNormalMsgHostedService" xsi:type="File"
fileName="${Directory}/JT1078FlvNormalMsgHostedService.${shortdate}.log"
layout="${date:format=yyyyMMddHHmmss} ${callsite} ${level}:${message} ${onexception:${exception:format=tostring} ${newline} ${stacktrace} ${newline}"/>
<target name="JT1078TcpServer" xsi:type="File"
fileName="${Directory}/JT1078TcpServer.${shortdate}.log"
layout="${date:format=yyyyMMddHHmmss} ${callsite} ${level}:${message} ${onexception:${exception:format=tostring} ${newline} ${stacktrace} ${newline}"/>
<target name="JT1078Logging" xsi:type="File"
fileName="${Directory}/JT1078Logging.${shortdate}.log"
layout="${message}"/>
<target name="console" xsi:type="ColoredConsole"
useDefaultRowHighlightingRules="false"
layout="${date:format=yyyyMMddHHmmss} ${callsite} ${level} ${message} ${onexception:${exception:format=tostring} ${newline} ${stacktrace} ${newline}">
<highlight-row condition="level == LogLevel.Debug" foregroundColor="DarkGray" />
<highlight-row condition="level == LogLevel.Info" foregroundColor="Gray" />
<highlight-row condition="level == LogLevel.Warn" foregroundColor="Yellow" />
<highlight-row condition="level == LogLevel.Error" foregroundColor="Red" />
<highlight-row condition="level == LogLevel.Fatal" foregroundColor="Red" backgroundColor="White" />
</target>
</targets>
<rules>
<logger name="*" minlevel="Trace" writeTo="all"/>
<logger name="JT1078.Gateway.TestNormalHosting.Services.JT1078FlvNormalMsgHostedService" minlevel="Trace" writeTo="JT1078FlvNormalMsgHostedService"/>
<logger name="JT1078.Gateway.JT1078TcpServer" minlevel="Trace" writeTo="JT1078TcpServer"/>
<logger name="JT1078Logging" minlevel="Trace" writeTo="JT1078Logging"/>
</rules>
</nlog>

+ 44
- 0
src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Configs/nlog.Win32NT.config Ver arquivo

@@ -0,0 +1,44 @@
<?xml version="1.0" encoding="utf-8" ?>
<!--
参考:http://www.cnblogs.com/fuchongjundream/p/3936431.html
autoReload:自动再配置
internalLogFile:可以让NLog把内部的调试和异常信息都写入指定文件里程序没问题了,日志却出了问题。这个该怎么办,到底是哪里不正确了?假如日志本身除了bug该如何解决?这就需要日志排错。把日志的错误信息写入日志。
<nlog throwExceptions="true" />
<nlog internalLogFile="file.txt" />- 设置internalLogFile属性可以让NLog把内部的调试和异常信息都写入指定文件里。
<nlog internalLogLevel="Trace|Debug|Info|Warn|Error|Fatal" /> - 决定内部日志的级别,级别越高,输出的日志信息越简洁。
<nlog internalLogToConsole="false|true" /> - 是否把内部日志输出到标准控制台。
<nlog internalLogToConsoleError="false|true" /> - 是否把内部日志输出到标准错误控制台 (stderr)。
设置throwExceptions属性为“true”可以让NLog不再阻挡这类异常,而是把它们抛给调用者。在部署是这样做可以帮我们快速定位问题。一旦应用程序已经正确配置了,我们建议把throwExceptions的值设为“false”,这样由于日志引发的问题不至于导致应用程序的崩溃。
-->
<nlog xmlns="http://www.nlog-project.org/schemas/NLog.xsd" xsi:schemaLocation="NLog NLog.xsd"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
autoReload="true"
internalLogFile="/wwwroot/logs/JT1078.Gateway.TestNormalHosting/internalLog.txt"
internalLogLevel="Debug" >
<variable name="Directory" value="/wwwroot/logs/JT1078.Gateway.TestNormalHosting"/>
<targets>
<target name="all" xsi:type="File"
fileName="${Directory}/all.${shortdate}.log"
layout="${date:format=yyyyMMddHHmmss} ${callsite} ${level}:${message} ${onexception:${exception:format=tostring} ${newline} ${stacktrace} ${newline}"/>
<target name="JT1078TcpServer" xsi:type="File"
fileName="${Directory}/JT1078TcpServer.${shortdate}.log"
layout="${date:format=yyyyMMddHHmmss} ${callsite} ${level}:${message} ${onexception:${exception:format=tostring} ${newline} ${stacktrace} ${newline}"/>
<target name="JT1078Logging" xsi:type="File"
fileName="${Directory}/JT1078Logging.${shortdate}.log"
layout="${message}"/>
<target name="console" xsi:type="ColoredConsole"
useDefaultRowHighlightingRules="false"
layout="${date:format=yyyyMMddHHmmss} ${callsite} ${level} ${message} ${onexception:${exception:format=tostring} ${newline} ${stacktrace} ${newline}">
<highlight-row condition="level == LogLevel.Debug" foregroundColor="DarkGray" />
<highlight-row condition="level == LogLevel.Info" foregroundColor="Gray" />
<highlight-row condition="level == LogLevel.Warn" foregroundColor="Yellow" />
<highlight-row condition="level == LogLevel.Error" foregroundColor="Red" />
<highlight-row condition="level == LogLevel.Fatal" foregroundColor="Red" backgroundColor="White" />
</target>
</targets>
<rules>
<logger name="*" minlevel="Trace" writeTo="all,console"/>
<logger name="JT1078.Gateway.JT1078TcpServer" minlevel="Trace" writeTo="JT1078TcpServer,console"/>
<logger name="JT1078Logging" minlevel="Trace" writeTo="JT1078Logging,console"/>
</rules>
</nlog>

+ 12
- 2
src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/JT1078.Gateway.TestNormalHosting.csproj Ver arquivo

@@ -1,4 +1,4 @@
<Project Sdk="Microsoft.NET.Sdk">
<Project Sdk="Microsoft.NET.Sdk">


<PropertyGroup> <PropertyGroup>
<OutputType>Exe</OutputType> <OutputType>Exe</OutputType>
@@ -6,12 +6,13 @@
</PropertyGroup> </PropertyGroup>
<ItemGroup> <ItemGroup>
<PackageReference Include="JT1078.Flv" Version="1.0.0-preview8" />
<PackageReference Include="JT1078.Flv" Version="1.0.0-preview11" />
<PackageReference Include="JT1078.Hls" Version="1.0.0-preview2" /> <PackageReference Include="JT1078.Hls" Version="1.0.0-preview2" />
<PackageReference Include="Microsoft.Extensions.Caching.Memory" Version="3.1.7" /> <PackageReference Include="Microsoft.Extensions.Caching.Memory" Version="3.1.7" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="3.1.7" /> <PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="3.1.7" />
<PackageReference Include="Microsoft.Extensions.Hosting" Version="3.1.7" /> <PackageReference Include="Microsoft.Extensions.Hosting" Version="3.1.7" />
<PackageReference Include="Microsoft.Extensions.Logging.Console" Version="3.1.7" /> <PackageReference Include="Microsoft.Extensions.Logging.Console" Version="3.1.7" />
<PackageReference Include="NLog.Extensions.Logging" Version="1.6.4" />
</ItemGroup> </ItemGroup>


<ItemGroup> <ItemGroup>
@@ -23,6 +24,15 @@
<None Update="appsettings.json"> <None Update="appsettings.json">
<CopyToOutputDirectory>Always</CopyToOutputDirectory> <CopyToOutputDirectory>Always</CopyToOutputDirectory>
</None> </None>
<None Update="Configs\nlog.Unix.config">
<CopyToOutputDirectory>Always</CopyToOutputDirectory>
</None>
<None Update="Configs\nlog.Win32NT.config">
<CopyToOutputDirectory>Always</CopyToOutputDirectory>
</None>
<None Update="Configs\NLog.xsd">
<CopyToOutputDirectory>Always</CopyToOutputDirectory>
</None>
<None Update="wwwroot\demo\demo.m3u8"> <None Update="wwwroot\demo\demo.m3u8">
<CopyToOutputDirectory>Always</CopyToOutputDirectory> <CopyToOutputDirectory>Always</CopyToOutputDirectory>
</None> </None>


+ 4
- 1
src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Program.cs Ver arquivo

@@ -7,6 +7,7 @@ using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging; using Microsoft.Extensions.Logging;
using NLog.Extensions.Logging;
using System; using System;
using System.Threading.Tasks; using System.Threading.Tasks;


@@ -24,8 +25,10 @@ namespace JT1078.Gateway.TestNormalHosting
}) })
.ConfigureLogging((context, logging) => .ConfigureLogging((context, logging) =>
{ {
logging.AddConsole();
logging.SetMinimumLevel(LogLevel.Trace); logging.SetMinimumLevel(LogLevel.Trace);
Console.WriteLine($"Environment.OSVersion.Platform:{Environment.OSVersion.Platform.ToString()}");
NLog.LogManager.LoadConfiguration($"Configs/nlog.{Environment.OSVersion.Platform.ToString()}.config");
logging.AddNLog(new NLogProviderOptions { CaptureMessageTemplates = true, CaptureMessageProperties = true });
}) })
.ConfigureServices((hostContext, services) => .ConfigureServices((hostContext, services) =>
{ {


+ 65
- 11
src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/Services/JT1078FlvNormalMsgHostedService.cs Ver arquivo

@@ -8,6 +8,12 @@ using System.Text;
using System.Threading; using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
using System.Linq; using System.Linq;
using Microsoft.Extensions.Logging;
using JT1078.Flv.Extensions;
using Microsoft.Extensions.Caching.Memory;
using JT1078.Protocol;
using System.Text.Json;
using System.Text.Json.Serialization;


namespace JT1078.Gateway.TestNormalHosting.Services namespace JT1078.Gateway.TestNormalHosting.Services
{ {
@@ -16,36 +22,84 @@ namespace JT1078.Gateway.TestNormalHosting.Services
private IJT1078PackageConsumer PackageConsumer; private IJT1078PackageConsumer PackageConsumer;
private JT1078HttpSessionManager HttpSessionManager; private JT1078HttpSessionManager HttpSessionManager;
private FlvEncoder FlvEncoder; private FlvEncoder FlvEncoder;
private ILogger Logger;
private IMemoryCache memoryCache;
private const string ikey = "IKEY";

public JT1078FlvNormalMsgHostedService( public JT1078FlvNormalMsgHostedService(
IMemoryCache memoryCache,
ILoggerFactory loggerFactory,
FlvEncoder flvEncoder, FlvEncoder flvEncoder,
JT1078HttpSessionManager httpSessionManager, JT1078HttpSessionManager httpSessionManager,
IJT1078PackageConsumer packageConsumer) IJT1078PackageConsumer packageConsumer)
{ {
Logger = loggerFactory.CreateLogger<JT1078FlvNormalMsgHostedService>();
PackageConsumer = packageConsumer; PackageConsumer = packageConsumer;
HttpSessionManager = httpSessionManager; HttpSessionManager = httpSessionManager;
FlvEncoder = flvEncoder; FlvEncoder = flvEncoder;
this.memoryCache = memoryCache;
} }
protected override Task ExecuteAsync(CancellationToken stoppingToken) protected override Task ExecuteAsync(CancellationToken stoppingToken)
{ {
PackageConsumer.OnMessage((Message) => PackageConsumer.OnMessage((Message) =>
{ {
var merge = JT1078.Protocol.JT1078Serializer.Merge(Message.Data);
if (merge != null)
try
{ {
var httpSessions = HttpSessionManager.GetAllBySimAndChannelNo(Message.Data.SIM, Message.Data.LogicChannelNumber);
var firstHttpSessions = httpSessions.Where(w => !w.FirstSend).ToList();
if (firstHttpSessions.Count > 0)
if (Logger.IsEnabled(LogLevel.Debug))
{ {
var flvVideoBuffer = FlvEncoder.EncoderVideoTag(merge, true);
HttpSessionManager.SendAVData(firstHttpSessions, flvVideoBuffer, true);
Logger.LogDebug(JsonSerializer.Serialize(HttpSessionManager.GetAll()));
Logger.LogDebug($"{Message.Data.SIM},{Message.Data.SN},{Message.Data.LogicChannelNumber},{Message.Data.Label3.DataType.ToString()},{Message.Data.Label3.SubpackageType.ToString()},{Message.Data.Bodies.ToHexString()}");
} }
var otherHttpSessions = httpSessions.Where(w => w.FirstSend).ToList();
if (otherHttpSessions.Count > 0)
var merge = JT1078.Protocol.JT1078Serializer.Merge(Message.Data);
string key = $"{Message.Data.GetKey()}_{ikey}";
if (merge != null)
{ {
var flvVideoBuffer = FlvEncoder.EncoderVideoTag(merge, false);
HttpSessionManager.SendAVData(otherHttpSessions, flvVideoBuffer, false);
if (merge.Label3.DataType == Protocol.Enums.JT1078DataType.视频I帧)
{
memoryCache.Set(key, merge);
}
var httpSessions = HttpSessionManager.GetAllBySimAndChannelNo(Message.Data.SIM.TrimStart('0'), Message.Data.LogicChannelNumber);
var firstHttpSessions = httpSessions.Where(w => !w.FirstSend).ToList();
if (firstHttpSessions.Count > 0)
{
if (memoryCache.TryGetValue(key, out JT1078Package idata))
{
try
{
var flvVideoBuffer = FlvEncoder.EncoderVideoTag(idata, true);
foreach (var session in firstHttpSessions)
{
HttpSessionManager.SendAVData(session, flvVideoBuffer, true);
}
}
catch (Exception ex)
{
Logger.LogError(ex, $"{Message.Data.SIM},{true},{Message.Data.SN},{Message.Data.LogicChannelNumber},{Message.Data.Label3.DataType.ToString()},{Message.Data.Label3.SubpackageType.ToString()},{Message.Data.Bodies.ToHexString()}");
}
}
}
var otherHttpSessions = httpSessions.Where(w => w.FirstSend).ToList();
if (otherHttpSessions.Count > 0)
{
try
{
var flvVideoBuffer = FlvEncoder.EncoderVideoTag(merge, false);
foreach (var session in otherHttpSessions)
{
HttpSessionManager.SendAVData(session, flvVideoBuffer, false);
}
}
catch (Exception ex)
{
Logger.LogError(ex, $"{Message.Data.SIM},{false},{Message.Data.SN},{Message.Data.LogicChannelNumber},{Message.Data.Label3.DataType.ToString()},{Message.Data.Label3.SubpackageType.ToString()},{Message.Data.Bodies.ToHexString()}");
}
}
} }
} }
catch (Exception ex)
{
Logger.LogError(ex, $"{Message.Data.SIM},{Message.Data.SN},{Message.Data.LogicChannelNumber},{Message.Data.Label3.DataType.ToString()},{Message.Data.Label3.SubpackageType.ToString()},{Message.Data.Bodies.ToHexString()}");
}
}); });
return Task.CompletedTask; return Task.CompletedTask;
} }


+ 3
- 0
src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/startup.ini Ver arquivo

@@ -0,0 +1,3 @@
pm2 start "dotnet JT1078.Gateway.TestNormalHosting.dll ASPNETCORE_ENVIRONMENT=Production" --max-restarts=1 -n "JT1078.Gateway.TestNormalHosting" -o "/data/pm2Logs/JT1078.Gateway.TestNormalHosting/out.log" -e "/data/pm2Logs/JT1078.Gateway.TestNormalHosting/error.log"

pm2 start "dotnet JT1078.Gateway.TestNormalHosting.dll ASPNETCORE_ENVIRONMENT=Development" --max-restarts=1 -n "JT1078.Gateway.TestNormalHosting.Dev" -o "/data/pm2Logs/JT1078.Gateway.TestNormalHosting.Dev/out.log" -e "/data/pm2Logs/JT1078.Gateway.TestNormalHosting.Dev/error.log"

+ 7
- 0
src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/wwwroot/flv.min.js
Diferenças do arquivo suprimidas por serem muito extensas
Ver arquivo


+ 41
- 0
src/JT1078.Gateway.Tests/JT1078.Gateway.TestNormalHosting/wwwroot/index.html Ver arquivo

@@ -0,0 +1,41 @@
<!DOCTYPE html>

<html lang="en" xmlns="http://www.w3.org/1999/xhtml">
<head>
<meta charset="utf-8" />
<title></title>
<script src="flv.min.js"></script>
</head>
<body>
<video muted="muted" webkit-playsinline="true" autoplay="true" id="player"></video>
<script>
if (flvjs.isSupported()) {
var player = document.getElementById('player');
var flvPlayer = flvjs.createPlayer({
type: 'flv',
isLive: true,
url: "ws://127.0.0.1:15555/?sim=11234567810&channel=1&token=123456"
});
flvPlayer.attachMediaElement(player);
flvPlayer.load();
flvPlayer.play();
function componentDidMount() {
this.cleanBuff = setInterval(function () {
let buffered = player.buffered
console.log("start...")
if (buffered.length > 0) {
let end = buffered.end(0)
if (end - player.currentTime > 0.15) {
player.currentTime = end - 0.1;
console.log("exe... start")
}
}
console.log("end...")
}, 3 * 10 * 1000)
};
componentDidMount();
}
</script>
</body>
</html>

+ 12
- 9
src/JT1078.Gateway/JT1078HttpServer.cs Ver arquivo

@@ -27,6 +27,7 @@ namespace JT1078.Gateway
private readonly JT1078Configuration Configuration; private readonly JT1078Configuration Configuration;


private readonly IJT1078Authorization authorization; private readonly IJT1078Authorization authorization;

private IMemoryCache memoryCache; private IMemoryCache memoryCache;


private HttpListener listener; private HttpListener listener;
@@ -102,15 +103,16 @@ namespace JT1078.Gateway
context.Http404(); context.Http404();
return; return;
} }
var uri = new Uri(context.Request.RawUrl);
string url = uri.AbsolutePath;
var queryParams = uri.Query.Substring(1, uri.Query.Length - 1).Split('&');
if (queryParams.Length < 2) {
context.Http404();
return;
}
if (url.EndsWith(".m3u8") || url.EndsWith(".ts"))
if (context.Request.RawUrl.EndsWith(".m3u8") || context.Request.RawUrl.EndsWith(".ts"))
{ {
var uri = new Uri(context.Request.RawUrl);
string url = uri.AbsolutePath;
var queryParams = uri.Query.Substring(1, uri.Query.Length - 1).Split('&');
if (queryParams.Length < 2)
{
context.Http404();
return;
}
string key = $"{queryParams[0].Split('=')[1]}_{queryParams[1].Split('=')[1]}";//默认queryParams第一个参数是终端号,第二个参数是通道号 string key = $"{queryParams[0].Split('=')[1]}_{queryParams[1].Split('=')[1]}";//默认queryParams第一个参数是终端号,第二个参数是通道号
memoryCache.GetOrCreate(key, (cacheEntry) => { memoryCache.GetOrCreate(key, (cacheEntry) => {
cacheEntry.SetSlidingExpiration(TimeSpan.FromSeconds(20)); cacheEntry.SetSlidingExpiration(TimeSpan.FromSeconds(20));
@@ -175,7 +177,8 @@ namespace JT1078.Gateway
jT1078HttpContext.Sim = sim; jT1078HttpContext.Sim = sim;
jT1078HttpContext.ChannelNo = channelNo; jT1078HttpContext.ChannelNo = channelNo;
SessionManager.TryAdd(jT1078HttpContext); SessionManager.TryAdd(jT1078HttpContext);
await jT1078HttpContext.WebSocketSendHelloAsync();
//这个发送出去,flv.js就报错了
//await jT1078HttpContext.WebSocketSendHelloAsync();
await Task.Factory.StartNew(async(state) => await Task.Factory.StartNew(async(state) =>
{ {
//https://www.bejson.com/httputil/websocket/ //https://www.bejson.com/httputil/websocket/


+ 61
- 22
src/JT1078.Gateway/JT1078TcpServer.cs Ver arquivo

@@ -3,6 +3,7 @@ using System.Buffers;
using System.Buffers.Binary; using System.Buffers.Binary;
using System.Collections.Generic; using System.Collections.Generic;
using System.IO.Pipelines; using System.IO.Pipelines;
using System.Linq;
using System.Net; using System.Net;
using System.Net.Sockets; using System.Net.Sockets;
using System.Text; using System.Text;
@@ -27,13 +28,15 @@ namespace JT1078.Gateway


private readonly ILogger Logger; private readonly ILogger Logger;


private readonly ILogger LogLogger;

private readonly JT1078Configuration Configuration; private readonly JT1078Configuration Configuration;


private readonly JT1078SessionManager SessionManager; private readonly JT1078SessionManager SessionManager;


private readonly IJT1078PackageProducer jT1078PackageProducer;
private readonly IJT1078PackageProducer jT1078PackageProducer;


private readonly IJT1078MsgProducer jT1078MsgProducer;
private readonly IJT1078MsgProducer jT1078MsgProducer;


private readonly JT1078UseType jT1078UseType; private readonly JT1078UseType jT1078UseType;


@@ -53,6 +56,7 @@ namespace JT1078.Gateway
SessionManager = jT1078SessionManager; SessionManager = jT1078SessionManager;
jT1078UseType = JT1078UseType.Normal; jT1078UseType = JT1078UseType.Normal;
Logger = loggerFactory.CreateLogger<JT1078TcpServer>(); Logger = loggerFactory.CreateLogger<JT1078TcpServer>();
LogLogger = loggerFactory.CreateLogger("JT1078Logging");
Configuration = jT1078ConfigurationAccessor.Value; Configuration = jT1078ConfigurationAccessor.Value;
this.jT1078PackageProducer = jT1078PackageProducer; this.jT1078PackageProducer = jT1078PackageProducer;
InitServer(); InitServer();
@@ -74,6 +78,7 @@ namespace JT1078.Gateway
SessionManager = jT1078SessionManager; SessionManager = jT1078SessionManager;
jT1078UseType = JT1078UseType.Queue; jT1078UseType = JT1078UseType.Queue;
Logger = loggerFactory.CreateLogger<JT1078TcpServer>(); Logger = loggerFactory.CreateLogger<JT1078TcpServer>();
LogLogger = loggerFactory.CreateLogger("JT1078Logging");
Configuration = jT1078ConfigurationAccessor.Value; Configuration = jT1078ConfigurationAccessor.Value;
this.jT1078MsgProducer = jT1078MsgProducer; this.jT1078MsgProducer = jT1078MsgProducer;
InitServer(); InitServer();
@@ -199,29 +204,30 @@ namespace JT1078.Gateway
} }
private void ReaderBuffer(ref ReadOnlySequence<byte> buffer, FixedHeaderInfo fixedHeaderInfo, JT1078TcpSession session, out SequencePosition consumed, out SequencePosition examined) private void ReaderBuffer(ref ReadOnlySequence<byte> buffer, FixedHeaderInfo fixedHeaderInfo, JT1078TcpSession session, out SequencePosition consumed, out SequencePosition examined)
{ {
consumed = buffer.Start;
examined = buffer.End;
SequenceReader<byte> seqReader = new SequenceReader<byte>(buffer); SequenceReader<byte> seqReader = new SequenceReader<byte>(buffer);
long totalConsumed = 0; long totalConsumed = 0;
while (!seqReader.End) while (!seqReader.End)
{ {
if (seqReader.Remaining < 30)
{
fixedHeaderInfo.Reset();
break;
}
if (!fixedHeaderInfo.FoundHeader) if (!fixedHeaderInfo.FoundHeader)
{ {
if (seqReader.Length < 4)
throw new ArgumentException("not JT1078 package");
var header = seqReader.Sequence.Slice(seqReader.Consumed, 4);
var header = seqReader.Sequence.Slice(0, 4);
var headerValue = BinaryPrimitives.ReadUInt32BigEndian(header.FirstSpan); var headerValue = BinaryPrimitives.ReadUInt32BigEndian(header.FirstSpan);
if (JT1078Package.FH == headerValue) if (JT1078Package.FH == headerValue)
{ {
//sim //sim
fixedHeaderInfo.SIM = ReadBCD(seqReader.Sequence.Slice(seqReader.Consumed + 8, 6).FirstSpan, 12);
fixedHeaderInfo.SIM = ReadBCD(seqReader.Sequence.Slice(8, 6).FirstSpan, 12);
if (string.IsNullOrEmpty(fixedHeaderInfo.SIM)) if (string.IsNullOrEmpty(fixedHeaderInfo.SIM))
{ {
fixedHeaderInfo.SIM = session.SessionID; fixedHeaderInfo.SIM = session.SessionID;
} }
//根据数据类型处理对应的数据长度 //根据数据类型处理对应的数据长度
fixedHeaderInfo.TotalSize += 15; fixedHeaderInfo.TotalSize += 15;
var dataType = seqReader.Sequence.Slice(seqReader.Consumed+fixedHeaderInfo.TotalSize, 1).FirstSpan[0];
var dataType = seqReader.Sequence.Slice(fixedHeaderInfo.TotalSize, 1).FirstSpan[0];
fixedHeaderInfo.TotalSize += 1; fixedHeaderInfo.TotalSize += 1;
JT1078Label3 label3 = new JT1078Label3(dataType); JT1078Label3 label3 = new JT1078Label3(dataType);
int bodyLength = 0; int bodyLength = 0;
@@ -240,33 +246,43 @@ namespace JT1078.Gateway
bodyLength += 4; bodyLength += 4;
} }
fixedHeaderInfo.TotalSize += bodyLength; fixedHeaderInfo.TotalSize += bodyLength;
var bodyLengthFirstSpan = seqReader.Sequence.Slice(seqReader.Consumed + fixedHeaderInfo.TotalSize, 2).FirstSpan;
var bodyLengthFirstSpan = seqReader.Sequence.Slice(fixedHeaderInfo.TotalSize, 2).FirstSpan;
//数据体长度 //数据体长度
fixedHeaderInfo.TotalSize += 2; fixedHeaderInfo.TotalSize += 2;
bodyLength = BinaryPrimitives.ReadUInt16BigEndian(bodyLengthFirstSpan); bodyLength = BinaryPrimitives.ReadUInt16BigEndian(bodyLengthFirstSpan);
if (bodyLength < 0)
{
fixedHeaderInfo.Reset();
throw new ArgumentException("jt1078 package body length Error.");
}
if (bodyLength == 0)//数据体长度为0 if (bodyLength == 0)//数据体长度为0
{ {
seqReader.Advance(fixedHeaderInfo.TotalSize); seqReader.Advance(fixedHeaderInfo.TotalSize);
var package1 = seqReader.Sequence.Slice(totalConsumed, seqReader.Consumed - totalConsumed);
var package1 = seqReader.Sequence.Slice(0, fixedHeaderInfo.TotalSize).ToArray();
if (LogLogger.IsEnabled(LogLevel.Trace))
{
LogLogger.LogTrace($"{package1.ToHexString()}");
}
try try
{ {
SessionManager.TryLink(fixedHeaderInfo.SIM, session); SessionManager.TryLink(fixedHeaderInfo.SIM, session);
if (jT1078UseType == JT1078UseType.Queue) if (jT1078UseType == JT1078UseType.Queue)
{ {
jT1078MsgProducer.ProduceAsync(fixedHeaderInfo.SIM, package1.FirstSpan.ToArray());
jT1078MsgProducer.ProduceAsync(fixedHeaderInfo.SIM, package1.ToArray());
} }
else else
{ {
jT1078PackageProducer.ProduceAsync(fixedHeaderInfo.SIM, JT1078Serializer.Deserialize(package1.FirstSpan));
jT1078PackageProducer.ProduceAsync(fixedHeaderInfo.SIM, JT1078Serializer.Deserialize(package1));
} }
} }
catch (Exception ex) catch (Exception ex)
{ {
Logger.LogError(ex, $"[Parse]:{package1.ToArray().ToHexString()}");
LogLogger.LogError($"[Error Parse 1]:{package1.ToHexString()}");
Logger.LogError(ex, $"[Error Parse 1]:{package1.ToHexString()}");
} }
finally finally
{ {
totalConsumed += (seqReader.Consumed - totalConsumed);
totalConsumed += seqReader.Consumed;
fixedHeaderInfo.Reset(); fixedHeaderInfo.Reset();
} }
continue; continue;
@@ -275,44 +291,67 @@ namespace JT1078.Gateway
fixedHeaderInfo.TotalSize += bodyLength; fixedHeaderInfo.TotalSize += bodyLength;
fixedHeaderInfo.FoundHeader = true; fixedHeaderInfo.FoundHeader = true;
} }
else
{
fixedHeaderInfo.Reset();
throw new ArgumentException("not JT1078 package.");
}
} }
if (seqReader.Length < fixedHeaderInfo.TotalSize) break;
if ((seqReader.Remaining - fixedHeaderInfo.TotalSize) < 0) break;
seqReader.Advance(fixedHeaderInfo.TotalSize); seqReader.Advance(fixedHeaderInfo.TotalSize);
var package = seqReader.Sequence.Slice(totalConsumed, seqReader.Consumed - totalConsumed);
var package = seqReader.Sequence.Slice(0, fixedHeaderInfo.TotalSize).ToArray();
if (LogLogger.IsEnabled(LogLevel.Trace))
{
LogLogger.LogTrace($"{package.ToHexString()}");
}
try try
{ {
SessionManager.TryLink(fixedHeaderInfo.SIM, session); SessionManager.TryLink(fixedHeaderInfo.SIM, session);
if (jT1078UseType == JT1078UseType.Queue) if (jT1078UseType == JT1078UseType.Queue)
{ {
jT1078MsgProducer.ProduceAsync(fixedHeaderInfo.SIM, package.ToArray());
jT1078MsgProducer.ProduceAsync(fixedHeaderInfo.SIM, package);
} }
else else
{ {
jT1078PackageProducer.ProduceAsync(fixedHeaderInfo.SIM, JT1078Serializer.Deserialize(package.FirstSpan));
jT1078PackageProducer.ProduceAsync(fixedHeaderInfo.SIM, JT1078Serializer.Deserialize(package));
} }
} }
catch (Exception ex) catch (Exception ex)
{ {
Logger.LogError(ex, $"[Parse]:{package.ToArray().ToHexString()}");
LogLogger.LogError($"[Error Parse 2]:{package.ToHexString()}");
Logger.LogError(ex, $"[Error Parse 2]:{package.ToHexString()}");
} }
finally finally
{ {
totalConsumed += (seqReader.Consumed - totalConsumed);
totalConsumed += seqReader.Consumed;
fixedHeaderInfo.Reset(); fixedHeaderInfo.Reset();
seqReader = new SequenceReader<byte>(seqReader.Sequence.Slice(seqReader.Consumed));
} }
} }
if (seqReader.Length == totalConsumed)
if (seqReader.End)
{ {
examined = consumed = buffer.End; examined = consumed = buffer.End;
} }
else else
{ {
consumed = buffer.GetPosition(totalConsumed); consumed = buffer.GetPosition(totalConsumed);
examined = buffer.End;
} }
} }
public Task StopAsync(CancellationToken cancellationToken) public Task StopAsync(CancellationToken cancellationToken)
{ {
Logger.LogInformation("JT1078 Tcp Server Stop"); Logger.LogInformation("JT1078 Tcp Server Stop");
SessionManager.GetTcpAll().ForEach(session =>
{
try
{
session.Close();
}
catch (Exception ex)
{

}
});
if (server?.Connected ?? false) if (server?.Connected ?? false)
server.Shutdown(SocketShutdown.Both); server.Shutdown(SocketShutdown.Both);
server?.Close(); server?.Close();


+ 3
- 0
src/JT1078.Gateway/Metadata/JT1078HttpContext.cs Ver arquivo

@@ -4,13 +4,16 @@ using System.Net;
using System.Net.WebSockets; using System.Net.WebSockets;
using System.Security.Principal; using System.Security.Principal;
using System.Text; using System.Text;
using System.Text.Json.Serialization;


namespace JT1078.Gateway.Metadata namespace JT1078.Gateway.Metadata
{ {
public class JT1078HttpContext public class JT1078HttpContext
{ {
public string SessionId { get; } public string SessionId { get; }
[JsonIgnore]
public HttpListenerContext Context { get; } public HttpListenerContext Context { get; }
[JsonIgnore]
public HttpListenerWebSocketContext WebSocketContext { get; } public HttpListenerWebSocketContext WebSocketContext { get; }
public IPrincipal User { get; } public IPrincipal User { get; }
public string Sim { get; set; } public string Sim { get; set; }


+ 34
- 41
src/JT1078.Gateway/Sessions/JT1078HttpSessionManager.cs Ver arquivo

@@ -88,72 +88,65 @@ namespace JT1078.Gateway.Sessions
/// <summary> /// <summary>
/// 发送音视频数据 /// 发送音视频数据
/// </summary> /// </summary>
/// <param name="httpContexts"></param>
/// <param name="httpContext"></param>
/// <param name="data"></param> /// <param name="data"></param>
/// <param name="firstSend"></param> /// <param name="firstSend"></param>
public void SendAVData(List<JT1078HttpContext> httpContexts, byte[] data, bool firstSend)
public async void SendAVData(JT1078HttpContext httpContext, byte[] data, bool firstSend)
{ {
ParallelLoopResult parallelLoopResult = Parallel.ForEach(httpContexts, async (context) =>
if (httpContext.IsWebSocket)
{ {
if (context.IsWebSocket)
if (firstSend)
{ {
if (firstSend)
httpContext.FirstSend = firstSend;
Sessions.TryUpdate(httpContext.SessionId, httpContext, httpContext);
}
try
{
await httpContext.WebSocketSendBinaryAsync(data);
}
catch (Exception ex)
{
if (Logger.IsEnabled(LogLevel.Information))
{ {
context.FirstSend = firstSend;
Sessions.TryUpdate(context.SessionId, context, context);
Logger.LogInformation($"[ws close]:{httpContext.SessionId}-{httpContext.Sim}-{httpContext.ChannelNo}-{httpContext.StartTime:yyyyMMddhhmmss}");
} }
remove(httpContext.SessionId);
}
}
else
{
if (firstSend)
{
httpContext.FirstSend = firstSend;
Sessions.TryUpdate(httpContext.SessionId, httpContext, httpContext);
try try
{ {
await context.WebSocketSendBinaryAsync(data);
await httpContext.HttpSendFirstChunked(data);
} }
catch (Exception ex) catch (Exception ex)
{ {
if (Logger.IsEnabled(LogLevel.Information)) if (Logger.IsEnabled(LogLevel.Information))
{ {
Logger.LogInformation($"[ws close]:{context.SessionId}-{context.Sim}-{context.ChannelNo}-{context.StartTime:yyyyMMddhhmmss}");
Logger.LogInformation($"[http close]:{httpContext.SessionId}-{httpContext.Sim}-{httpContext.ChannelNo}-{httpContext.StartTime:yyyyMMddhhmmss}");
} }
remove(context.SessionId);
}
remove(httpContext.SessionId);
}
} }
else else
{ {
if (firstSend)
try
{ {
context.FirstSend = firstSend;
Sessions.TryUpdate(context.SessionId, context, context);
try
{
await context.HttpSendFirstChunked(data);
}
catch (Exception ex)
{
if (Logger.IsEnabled(LogLevel.Information))
{
Logger.LogInformation($"[http close]:{context.SessionId}-{context.Sim}-{context.ChannelNo}-{context.StartTime:yyyyMMddhhmmss}");
}
remove(context.SessionId);
}
await httpContext.HttpSendChunked(data);
} }
else
catch (Exception ex)
{ {
try
{
await context.HttpSendChunked(data);
}
catch (Exception ex)
if (Logger.IsEnabled(LogLevel.Information))
{ {
if (Logger.IsEnabled(LogLevel.Information))
{
Logger.LogInformation($"[http close]:{context.SessionId}-{context.Sim}-{context.ChannelNo}-{context.StartTime:yyyyMMddhhmmss}");
}
remove(context.SessionId);
Logger.LogInformation($"[http close]:{httpContext.SessionId}-{httpContext.Sim}-{httpContext.ChannelNo}-{httpContext.StartTime:yyyyMMddhhmmss}");
} }
remove(httpContext.SessionId);
} }
} }
});
if (parallelLoopResult.IsCompleted)
{

} }
} }




Carregando…
Cancelar
Salvar