Quellcode durchsuchen

1.修改demo

2.修复会话未关联sim卡号
3.修改文档
tags/pipeline-1.0.0
smallchi(Koike) vor 5 Jahren
Ursprung
Commit
91ec904e0d
13 geänderte Dateien mit 359 neuen und 25 gelöschten Zeilen
  1. +1
    -1
      README.md
  2. +0
    -1
      publish.gateway.bat
  3. +6
    -5
      simples/JT808.Gateway.SimpleClient/JT808.Gateway.SimpleClient.csproj
  4. +67
    -0
      simples/JT808.Gateway.SimpleClient/Jobs/CallGrpcClientJob.cs
  5. +2
    -0
      simples/JT808.Gateway.SimpleClient/Program.cs
  6. +86
    -0
      simples/JT808.Gateway.SimpleServer/Impl/JT808NormalReplyMessageHandlerImpl.cs
  7. +63
    -0
      simples/JT808.Gateway.SimpleServer/Impl/JT808SessionConsumer.cs
  8. +30
    -0
      simples/JT808.Gateway.SimpleServer/Impl/JT808SessionProducer.cs
  9. +7
    -7
      simples/JT808.Gateway.SimpleServer/JT808.Gateway.SimpleServer.csproj
  10. +48
    -0
      simples/JT808.Gateway.SimpleServer/Jobs/TrafficJob.cs
  11. +18
    -10
      simples/JT808.Gateway.SimpleServer/Program.cs
  12. +28
    -0
      simples/JT808.Gateway.SimpleServer/Services/JT808SessionService.cs
  13. +3
    -1
      src/JT808.Gateway/Session/JT808SessionManager.cs

+ 1
- 1
README.md Datei anzeigen

@@ -14,7 +14,7 @@


[玩一玩压力测试](https://github.com/SmallChi/JT808Gateway/blob/master/doc/README.md) [玩一玩压力测试](https://github.com/SmallChi/JT808Gateway/blob/master/doc/README.md)


[![MIT Licence](https://img.shields.io/github/license/mashape/apistatus.svg)](https://github.com/SmallChi/JT808Gateway/blob/master/LICENSE)![.NET Core](https://github.com/SmallChi/JT808Gateway/workflows/.NET%20Core/badge.svg)
[![MIT Licence](https://img.shields.io/github/license/mashape/apistatus.svg)](https://github.com/SmallChi/JT808Gateway/blob/master/LICENSE)![.NET Core](https://github.com/SmallChi/JT808Gateway/workflows/.NET%20Core/badge.svg?branch=master)


## 新网关的优势 ## 新网关的优势




+ 0
- 1
publish.gateway.bat Datei anzeigen

@@ -1,6 +1,5 @@
dotnet pack .\src\JT808.Gateway\JT808.Gateway.csproj -c Release --output nupkgs dotnet pack .\src\JT808.Gateway\JT808.Gateway.csproj -c Release --output nupkgs
dotnet pack .\src\JT808.Gateway.Kafka\JT808.Gateway.Kafka.csproj -c Release --output nupkgs dotnet pack .\src\JT808.Gateway.Kafka\JT808.Gateway.Kafka.csproj -c Release --output nupkgs
dotnet pack .\src\JT808.Gateway.InMemoryMQ\JT808.Gateway.InMemoryMQ.csproj -c Release --output nupkgs
dotnet pack .\src\JT808.Gateway.Abstractions\JT808.Gateway.Abstractions.csproj -c Release --output nupkgs dotnet pack .\src\JT808.Gateway.Abstractions\JT808.Gateway.Abstractions.csproj -c Release --output nupkgs
dotnet pack .\src\JT808.Gateway.Client\JT808.Gateway.Client.csproj -c Release --output nupkgs dotnet pack .\src\JT808.Gateway.Client\JT808.Gateway.Client.csproj -c Release --output nupkgs




+ 6
- 5
simples/JT808.Gateway.SimpleClient/JT808.Gateway.SimpleClient.csproj Datei anzeigen

@@ -1,4 +1,4 @@
<Project Sdk="Microsoft.NET.Sdk">
<Project Sdk="Microsoft.NET.Sdk">


<PropertyGroup> <PropertyGroup>
<OutputType>Exe</OutputType> <OutputType>Exe</OutputType>
@@ -6,11 +6,12 @@
</PropertyGroup> </PropertyGroup>


<ItemGroup> <ItemGroup>
<PackageReference Include="JT808.Gateway.Client" Version="1.0.0-preview5" />
<PackageReference Include="JT808.Gateway.Abstractions" Version="1.0.0-preview6" />
<PackageReference Include="JT808.Gateway.Client" Version="1.0.0-preview6" />
<PackageReference Include="Microsoft.Extensions.Configuration.FileExtensions" Version="3.1.1" /> <PackageReference Include="Microsoft.Extensions.Configuration.FileExtensions" Version="3.1.1" />
<PackageReference Include="Microsoft.Extensions.Configuration.Json" Version="3.1.0" />
<PackageReference Include="Microsoft.Extensions.Hosting" Version="3.1.0" />
<PackageReference Include="Microsoft.Extensions.Logging.Console" Version="3.1.0" />
<PackageReference Include="Microsoft.Extensions.Configuration.Json" Version="3.1.1" />
<PackageReference Include="Microsoft.Extensions.Hosting" Version="3.1.1" />
<PackageReference Include="Microsoft.Extensions.Logging.Console" Version="3.1.1" />
<PackageReference Include="WebApiClient.Extensions.DependencyInjection" Version="2.0.3" /> <PackageReference Include="WebApiClient.Extensions.DependencyInjection" Version="2.0.3" />
</ItemGroup> </ItemGroup>
</Project> </Project>

+ 67
- 0
simples/JT808.Gateway.SimpleClient/Jobs/CallGrpcClientJob.cs Datei anzeigen

@@ -0,0 +1,67 @@
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Grpc.Core;
using JT808.Gateway.GrpcService;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using System.Text.Json;

namespace JT808.Gateway.SimpleClient.Jobs
{
public class CallGrpcClientJob :IHostedService
{
private Channel channel;
private readonly ILogger Logger;
private Grpc.Core.Metadata AuthMetadata;
public CallGrpcClientJob(
ILoggerFactory loggerFactory)
{
Logger = loggerFactory.CreateLogger("CallGrpcClientJob");
channel = new Channel("localhost:828",
ChannelCredentials.Insecure);
AuthMetadata = new Grpc.Core.Metadata();
AuthMetadata.Add("token", "smallchi518");
}

public Task StartAsync(CancellationToken cancellationToken)
{
Task.Run(() =>
{
while (!cancellationToken.IsCancellationRequested)
{
JT808Gateway.JT808GatewayClient jT808GatewayClient = new JT808Gateway.JT808GatewayClient(channel);
try
{
var result1 = jT808GatewayClient.GetTcpAtomicCounter(new Empty(), AuthMetadata);
var result2 = jT808GatewayClient.GetTcpSessionAll(new Empty(), AuthMetadata);
Logger.LogInformation($"[GetTcpAtomicCounter]:{JsonSerializer.Serialize(result1)}");
Logger.LogInformation($"[GetTcpSessionAll]:{JsonSerializer.Serialize(result2)}");
}
catch (Exception ex)
{
Logger.LogError(ex, "Call Grpc Error");
}
try
{
var result1 = jT808GatewayClient.GetTcpAtomicCounter(new Empty());
}
catch (RpcException ex)
{
Logger.LogError($"{ex.StatusCode.ToString()}-{ex.Message}");
}
Thread.Sleep(3000);
}
}, cancellationToken);
return Task.CompletedTask;
}

public Task StopAsync(CancellationToken cancellationToken)
{
channel.ShutdownAsync();
return Task.CompletedTask;
}
}
}

+ 2
- 0
simples/JT808.Gateway.SimpleClient/Program.cs Datei anzeigen

@@ -9,6 +9,7 @@ using System.Threading;
using Microsoft.Extensions.Hosting; using Microsoft.Extensions.Hosting;
using JT808.Gateway.Client; using JT808.Gateway.Client;
using JT808.Gateway.SimpleClient.Services; using JT808.Gateway.SimpleClient.Services;
using JT808.Gateway.SimpleClient.Jobs;


namespace JT808.Gateway.SimpleClient namespace JT808.Gateway.SimpleClient
{ {
@@ -30,6 +31,7 @@ namespace JT808.Gateway.SimpleClient
.AddClient(); .AddClient();
services.AddHostedService<UpService>(); services.AddHostedService<UpService>();
services.AddHostedService<Up2019Service>(); services.AddHostedService<Up2019Service>();
services.AddHostedService<CallGrpcClientJob>();
}); });
await serverHostBuilder.RunConsoleAsync(); await serverHostBuilder.RunConsoleAsync();
} }


+ 86
- 0
simples/JT808.Gateway.SimpleServer/Impl/JT808NormalReplyMessageHandlerImpl.cs Datei anzeigen

@@ -0,0 +1,86 @@
using JT808.Gateway.Abstractions;
using JT808.Gateway.MsgLogging;
using JT808.Gateway.Traffic;
using JT808.Gateway.Transmit;
using JT808.Protocol;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.Text;

namespace JT808.Gateway.SimpleServer.Impl
{
public class JT808NormalReplyMessageHandlerImpl : JT808NormalReplyMessageHandler
{
private readonly ILogger logger;
private readonly IJT808Traffic jT808Traffic;
private readonly IJT808MsgLogging jT808MsgLogging;
private readonly JT808TransmitService jT808TransmitService;
public JT808NormalReplyMessageHandlerImpl(
JT808TransmitService jT808TransmitService,
IJT808MsgLogging jT808MsgLogging,
IJT808Traffic jT808Traffic,
ILoggerFactory loggerFactory,
IJT808Config jT808Config) : base(jT808Config)
{
this.jT808TransmitService = jT808TransmitService;
this.jT808Traffic = jT808Traffic;
this.jT808MsgLogging = jT808MsgLogging;
logger =loggerFactory.CreateLogger("JT808NormalReplyMessageHandlerImpl");
//添加自定义消息
HandlerDict.Add(0x9999, Msg0x9999);
}

/// <summary>
/// 重写消息处理器
/// </summary>
/// <param name="request"></param>
/// <param name="session"></param>
public override byte[] Processor(JT808HeaderPackage request, IJT808Session session)
{
//AOP 可以自定义添加一些东西:上下行日志、数据转发
logger.LogDebug("可以自定义添加一些东西:上下行日志、数据转发");
//流量
jT808Traffic.Increment(request.Header.TerminalPhoneNo, DateTime.Now.ToString("yyyyMMdd"), request.OriginalData.Length);
var parameter = (request.Header.TerminalPhoneNo, request.OriginalData.ToArray());
//转发数据(可同步也可以使用队列进行异步)
try
{
jT808TransmitService.Send(parameter);
}
catch (Exception ex)
{
logger.LogError(ex,"");
}
//上行日志(可同步也可以使用队列进行异步)
jT808MsgLogging.Processor(parameter, JT808MsgLoggingType.up);
//处理上行消息
var down= base.Processor(request, session);
//下行日志(可同步也可以使用队列进行异步)
jT808MsgLogging.Processor((request.Header.TerminalPhoneNo, down), JT808MsgLoggingType.down);
return down;
}

/// <summary>
/// 重写自带的消息
/// </summary>
/// <param name="request"></param>
/// <param name="session"></param>
public override byte[] Msg0x0200(JT808HeaderPackage request, IJT808Session session)
{
logger.LogDebug("重写自带Msg0x0200的消息");
return base.Msg0x0200(request, session);
}

/// <summary>
/// 自定义消息
/// </summary>
/// <param name="request"></param>
/// <returns></returns>
public byte[] Msg0x9999(JT808HeaderPackage request, IJT808Session session)
{
logger.LogDebug("自定义消息");
return default;
}
}
}

+ 63
- 0
simples/JT808.Gateway.SimpleServer/Impl/JT808SessionConsumer.cs Datei anzeigen

@@ -0,0 +1,63 @@
using JT808.Gateway.Abstractions;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using JT808.Gateway.SimpleServer.Services;

namespace JT808.Gateway.SimpleServer.Impl
{
public class JT808SessionConsumer : IJT808SessionConsumer
{
public CancellationTokenSource Cts => new CancellationTokenSource();

private readonly ILogger logger;

public string TopicName { get; } = JT808GatewayConstants.SessionTopic;

private readonly JT808SessionService JT808SessionService;
public JT808SessionConsumer(
JT808SessionService jT808SessionService,
ILoggerFactory loggerFactory)
{
logger = loggerFactory.CreateLogger("JT808SessionConsumer");
JT808SessionService = jT808SessionService;
}

public void OnMessage(Action<(string Notice, string TerminalNo)> callback)
{
Task.Run(async () =>
{
while (!Cts.IsCancellationRequested)
{
try
{
var item = await JT808SessionService.ReadAsync(Cts.Token);
callback(item);
}
catch (Exception ex)
{
logger.LogError(ex, "");
}
}
}, Cts.Token);
}

public void Unsubscribe()
{
Cts.Cancel();
}

public void Dispose()
{
Cts.Dispose();
}

public void Subscribe()
{

}
}
}

+ 30
- 0
simples/JT808.Gateway.SimpleServer/Impl/JT808SessionProducer.cs Datei anzeigen

@@ -0,0 +1,30 @@
using JT808.Gateway.Abstractions;
using JT808.Gateway.SimpleServer.Services;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;

namespace JT808.Gateway.SimpleServer.Impl
{
public class JT808SessionProducer : IJT808SessionProducer
{
public string TopicName { get; } = JT808GatewayConstants.SessionTopic;

private readonly JT808SessionService JT808SessionService;

public JT808SessionProducer(JT808SessionService jT808SessionService)
{
JT808SessionService = jT808SessionService;
}

public async ValueTask ProduceAsync(string notice,string terminalNo)
{
await JT808SessionService.WriteAsync(notice, terminalNo);
}

public void Dispose()
{
}
}
}

+ 7
- 7
simples/JT808.Gateway.SimpleServer/JT808.Gateway.SimpleServer.csproj Datei anzeigen

@@ -5,13 +5,13 @@
<TargetFramework>netcoreapp3.1</TargetFramework> <TargetFramework>netcoreapp3.1</TargetFramework>
</PropertyGroup> </PropertyGroup>
<ItemGroup> <ItemGroup>
<PackageReference Include="JT808.Gateway" Version="1.0.0-preview5" />
<PackageReference Include="JT808.Gateway.InMemoryMQ" Version="1.0.0-preview5" />
<PackageReference Include="JT808.Gateway.MsgIdHandler" Version="1.0.0-preview5" />
<PackageReference Include="JT808.Gateway.MsgLogging" Version="1.0.0-preview5" />
<PackageReference Include="JT808.Gateway.ReplyMessage" Version="1.0.0-preview5" />
<PackageReference Include="JT808.Gateway.SessionNotice" Version="1.0.0-preview5" />
<PackageReference Include="JT808.Gateway.Traffic" Version="1.0.0-preview5" />
<PackageReference Include="JT808.Gateway" Version="1.0.0-preview6" />
<PackageReference Include="JT808.Gateway.MsgIdHandler" Version="1.0.0-preview6" />
<PackageReference Include="JT808.Gateway.MsgLogging" Version="1.0.0-preview6" />
<PackageReference Include="JT808.Gateway.ReplyMessage" Version="1.0.0-preview6" />
<PackageReference Include="JT808.Gateway.SessionNotice" Version="1.0.0-preview6" />
<PackageReference Include="JT808.Gateway.Traffic" Version="1.0.0-preview6" />
<PackageReference Include="JT808.Gateway.Transmit" Version="1.0.0-preview6" />
<PackageReference Include="Microsoft.Extensions.Configuration.FileExtensions" Version="3.1.1" /> <PackageReference Include="Microsoft.Extensions.Configuration.FileExtensions" Version="3.1.1" />
<PackageReference Include="Microsoft.Extensions.Configuration.Json" Version="3.1.1" /> <PackageReference Include="Microsoft.Extensions.Configuration.Json" Version="3.1.1" />
<PackageReference Include="Microsoft.Extensions.Hosting" Version="3.1.1" /> <PackageReference Include="Microsoft.Extensions.Hosting" Version="3.1.1" />


+ 48
- 0
simples/JT808.Gateway.SimpleServer/Jobs/TrafficJob.cs Datei anzeigen

@@ -0,0 +1,48 @@
using JT808.Gateway.Traffic;
using JT808.Protocol.Enums;
using JT808.Protocol.Extensions;
using JT808.Protocol.MessageBody;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace JT808.Gateway.SimpleServer.Jobs
{
public class TrafficJob : IHostedService
{
private readonly IJT808Traffic jT808Traffic;
private readonly ILogger Logger;
public TrafficJob(
ILoggerFactory loggerFactory,
IJT808Traffic jT808Traffic)
{
Logger = loggerFactory.CreateLogger("TrafficJob");
this.jT808Traffic = jT808Traffic;
}

public Task StartAsync(CancellationToken cancellationToken)
{
Task.Run(async () =>
{
while (!cancellationToken.IsCancellationRequested)
{
await Task.Delay(2 * 1000);
foreach (var item in jT808Traffic.GetAll())
{
Logger.LogDebug($"{item.Item1}-{item.Item2}");
}
}
}, cancellationToken);
return Task.CompletedTask;
}

public Task StopAsync(CancellationToken cancellationToken)
{
return Task.CompletedTask;
}
}
}

+ 18
- 10
simples/JT808.Gateway.SimpleServer/Program.cs Datei anzeigen

@@ -1,5 +1,4 @@
using JT808.Gateway.Abstractions.Enums; using JT808.Gateway.Abstractions.Enums;
using JT808.Gateway.InMemoryMQ;
using JT808.Gateway.ReplyMessage; using JT808.Gateway.ReplyMessage;
using JT808.Gateway.MsgLogging; using JT808.Gateway.MsgLogging;
using JT808.Gateway.Traffic; using JT808.Gateway.Traffic;
@@ -14,6 +13,10 @@ using Microsoft.Extensions.Logging;
using System; using System;
using System.Threading.Tasks; using System.Threading.Tasks;
using JT808.Gateway.SimpleServer.Impl; using JT808.Gateway.SimpleServer.Impl;
using JT808.Gateway.SimpleServer.Services;
using JT808.Gateway.Abstractions;
using JT808.Gateway.Transmit;
using JT808.Gateway.SimpleServer.Jobs;


namespace JT808.Gateway.SimpleServer namespace JT808.Gateway.SimpleServer
{ {
@@ -36,18 +39,23 @@ namespace JT808.Gateway.SimpleServer
{ {
services.AddSingleton<ILoggerFactory, LoggerFactory>(); services.AddSingleton<ILoggerFactory, LoggerFactory>();
services.AddSingleton(typeof(ILogger<>), typeof(Logger<>)); services.AddSingleton(typeof(ILogger<>), typeof(Logger<>));
//使用内存队列实现会话通知
services.AddSingleton<JT808SessionService>();
services.AddSingleton<IJT808SessionProducer, JT808SessionProducer>();
services.AddSingleton<IJT808SessionConsumer, JT808SessionConsumer>();
services.AddJT808Configure() services.AddJT808Configure()
.AddGateway(hostContext.Configuration)
.AddNormalGateway(hostContext.Configuration)
.ReplaceNormalReplyMessageHandler<JT808NormalReplyMessageHandlerImpl>()
.AddMsgLogging<JT808MsgLogging>()
.AddTraffic()
.AddSessionNotice()
.AddTransmit(hostContext.Configuration)
.AddTcp() .AddTcp()
.AddServerInMemoryMQ(JT808ConsumerType.MsgIdHandlerConsumer|
JT808ConsumerType.ReplyMessageConsumer |
JT808ConsumerType.MsgLoggingConsumer |
JT808ConsumerType.ReplyMessageLoggingConsumer)
.AddInMemoryMsgIdHandler<JT808MsgIdHandler>()
.AddInMemoryReplyMessage()
.AddInMemoryMsgLogging<JT808MsgLogging>()
.AddInMemorySessionNotice()
.AddUdp()
.AddGrpc()
.Builder(); .Builder();
//流量统计
services.AddHostedService<TrafficJob>();
}); });


await serverHostBuilder.RunConsoleAsync(); await serverHostBuilder.RunConsoleAsync();


+ 28
- 0
simples/JT808.Gateway.SimpleServer/Services/JT808SessionService.cs Datei anzeigen

@@ -0,0 +1,28 @@
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

namespace JT808.Gateway.SimpleServer.Services
{
public class JT808SessionService
{
private readonly Channel<(string Notice, string TerminalNo)> _channel;

public JT808SessionService()
{
_channel = Channel.CreateUnbounded<(string Notice, string TerminalNo)>();
}

public async ValueTask WriteAsync(string notice, string terminalNo)
{
await _channel.Writer.WriteAsync((notice, terminalNo));
}
public async ValueTask<(string Notice, string TerminalNo)> ReadAsync(CancellationToken cancellationToken)
{
return await _channel.Reader.ReadAsync(cancellationToken);
}
}
}

+ 3
- 1
src/JT808.Gateway/Session/JT808SessionManager.cs Datei anzeigen

@@ -84,7 +84,8 @@ namespace JT808.Gateway.Session
} }
else else
{ {
if(TerminalPhoneNoSessions.TryAdd(terminalPhoneNo, session))
session.TerminalPhoneNo = terminalPhoneNo;
if (TerminalPhoneNoSessions.TryAdd(terminalPhoneNo, session))
{ {
//会话通知 //会话通知
JT808SessionProducer?.ProduceAsync(JT808GatewayConstants.SessionOnline, terminalPhoneNo); JT808SessionProducer?.ProduceAsync(JT808GatewayConstants.SessionOnline, terminalPhoneNo);
@@ -104,6 +105,7 @@ namespace JT808.Gateway.Session
else else
{ {
JT808UdpSession session = new JT808UdpSession(socket, remoteEndPoint); JT808UdpSession session = new JT808UdpSession(socket, remoteEndPoint);
session.TerminalPhoneNo = terminalPhoneNo;
Sessions.TryAdd(session.SessionID, session); Sessions.TryAdd(session.SessionID, session);
TerminalPhoneNoSessions.TryAdd(terminalPhoneNo, session); TerminalPhoneNoSessions.TryAdd(terminalPhoneNo, session);
currentSession = session; currentSession = session;


Laden…
Abbrechen
Speichern