Kaynağa Gözat

1.修改微软提供的发布订阅channel

2.修改开发环境扩展配置
3.修改文档
tags/pipeline-1.0.0
SmallChi(Koike) 5 yıl önce
ebeveyn
işleme
88fe3cbae2
8 değiştirilmiş dosya ile 74 ekleme ve 25 silme
  1. +23
    -10
      README.md
  2. +1
    -0
      src/JT808.Gateway.TestHosting/JT808.Gateway.TestHosting.csproj
  3. +17
    -2
      src/JT808.Gateway.TestHosting/Program.cs
  4. +2
    -3
      src/JT808.Gateway/Internal/JT808MsgProducerDefault.cs
  5. +10
    -7
      src/JT808.Gateway/Internal/JT808MsgReplyConsumerDefault.cs
  6. +19
    -1
      src/JT808.Gateway/Internal/JT808MsgService.cs
  7. +1
    -0
      src/JT808.Gateway/JT808.Gateway.csproj
  8. +1
    -2
      src/JT808.Gateway/JT808GatewayExtensions.cs

+ 23
- 10
README.md Dosyayı Görüntüle

@@ -27,7 +27,7 @@

![design_model](https://github.com/SmallChi/JT808DotNetty/blob/master/doc/img/design_model.png)

## 集成接口功能(JT808.DotNetty.Abstractions)
## 集成接口功能

|接口名称|接口说明|使用场景|
|:------:|:------|:------|
@@ -51,7 +51,7 @@
|Traffic|流量统计服务 |由于运营商sim卡查询流量滞后,通过流量统计服务可以实时准确的统计设备流量,可以最优配置设备的流量大小,以节省成本
|Transmit| 原包转发服务|该服务可以将设备上报原始数据转发到第三方,支持全部转发,指定终端号转发|

## 基于WebApi的消息业务处理程序(JT808.DotNetty.WebApi)
## 基于WebApi的消息业务处理程序

通过继承JT808.DotNetty.Core.Handlers.JT808MsgIdHttpHandlerBase去实现自定义的WebApi接口服务。

@@ -86,12 +86,12 @@
| Install-Package JT808.Gateway.Abstractions| ![JT808.Gateway.Abstractions](https://img.shields.io/nuget/v/JT808.Gateway.Abstractions.svg) | ![JT808.Gateway.Abstractions](https://img.shields.io/nuget/dt/JT808.Gateway.Abstractions.svg) |
| Install-Package JT808.Gateway | ![JT808.Gateway](https://img.shields.io/nuget/v/JT808.Gateway.svg) | ![JT808.Gateway](https://img.shields.io/nuget/dt/JT808.Gateway.svg) |
| Install-Package JT808.Gateway.Kafka| ![JT808.Gateway.Kafka](https://img.shields.io/nuget/v/JT808.Gateway.Kafka.svg) | ![JT808.Gateway.Kafka](https://img.shields.io/nuget/dt/JT808.Gateway.Kafka.svg) |
| Install-Package JT808.Gateway.Transmit | ![JT808](https://img.shields.io/nuget/v/JT808.Gateway.Transmit.svg) | ![JT808](https://img.shields.io/nuget/dt/JT808.Gateway.Transmit.svg) |
| Install-Package JT808.Gateway.Traffic | ![JT808](https://img.shields.io/nuget/v/JT808.Gateway.Traffic.svg) | ![JT808](https://img.shields.io/nuget/dt/JT808.Gateway.Traffic.svg)|
| Install-Package JT808.Gateway.SessionNotice | ![JT808](https://img.shields.io/nuget/v/JT808.Gateway.SessionNotice.svg) | ![JT808](https://img.shields.io/nuget/dt/JT808.Gateway.SessionNotice.svg)|
| Install-Package JT808.Gateway.ReplyMessage | ![JT808](https://img.shields.io/nuget/v/JT808.Gateway.ReplyMessage.svg) | ![JT808](https://img.shields.io/nuget/dt/JT808.Gateway.ReplyMessage.svg)|
| Install-Package JT808.Gateway.MsgLogging | ![JT808](https://img.shields.io/nuget/v/JT808.Gateway.MsgLogging.svg) | ![JT808](https://img.shields.io/nuget/dt/JT808.Gateway.MsgLogging.svg)|
| Install-Package JT808.Gateway.MsgIdHandler | ![JT808](https://img.shields.io/nuget/v/JT808.Gateway.MsgIdHandler.svg) | ![JT808](https://img.shields.io/nuget/dt/JT808.Gateway.MsgIdHandler.svg)|
| Install-Package JT808.Gateway.Transmit | ![JT808.Gateway.Transmit](https://img.shields.io/nuget/v/JT808.Gateway.Transmit.svg) | ![JT808.Gateway.Transmit](https://img.shields.io/nuget/dt/JT808.Gateway.Transmit.svg) |
| Install-Package JT808.Gateway.Traffic | ![JT808.Gateway.Traffic](https://img.shields.io/nuget/v/JT808.Gateway.Traffic.svg) | ![JT808.Gateway.Traffic](https://img.shields.io/nuget/dt/JT808.Gateway.Traffic.svg)|
| Install-Package JT808.Gateway.SessionNotice | ![JT808.Gateway.SessionNotice](https://img.shields.io/nuget/v/JT808.Gateway.SessionNotice.svg) | ![JT808.Gateway.SessionNotice](https://img.shields.io/nuget/dt/JT808.Gateway.SessionNotice.svg)|
| Install-Package JT808.Gateway.ReplyMessage | ![JT808.Gateway.ReplyMessage](https://img.shields.io/nuget/v/JT808.Gateway.ReplyMessage.svg) | ![JT808.Gateway.ReplyMessage](https://img.shields.io/nuget/dt/JT808.Gateway.ReplyMessage.svg)|
| Install-Package JT808.Gateway.MsgLogging | ![JT808.Gateway.MsgLogging](https://img.shields.io/nuget/v/JT808.Gateway.MsgLogging.svg) | ![JT808.Gateway.MsgLogging](https://img.shields.io/nuget/dt/JT808.Gateway.MsgLogging.svg)|
| Install-Package JT808.Gateway.MsgIdHandler | ![JT808.Gateway.MsgIdHandler](https://img.shields.io/nuget/v/JT808.Gateway.MsgIdHandler.svg) | ![JT808.Gateway.MsgIdHandler](https://img.shields.io/nuget/dt/JT808.Gateway.MsgIdHandler.svg)|

## 举个栗子1

@@ -168,10 +168,23 @@ static async Task Main(string[] args)
services.AddSingleton<ILoggerFactory, LoggerFactory>();
services.AddSingleton(typeof(ILogger<>), typeof(Logger<>));
services.AddJT808Configure()
.AddJT808Gateway()
//用于测试网关
.AddJT808DevelopmentGateway()
//用于生产环境
//.AddJT808Gateway(options =>
//{
// options.TcpPort=8086;
// options.UdpPort=8086;
// options.MessageQueueType = JT808MessageQueueType.InPlug;
//})
.AddTcp()
.AddUdp()
.AddGrpc();
.AddGrpc()
//kafka插件
//.AddJT808ServerKafkaMsgProducer(hostContext.Configuration)
//.AddJT808ServerKafkaMsgReplyConsumer(hostContext.Configuration)
//.AddJT808ServerKafkaSessionProducer(hostContext.Configuration)
;
//services.AddHostedService<CallGrpcClientJob>();
});



+ 1
- 0
src/JT808.Gateway.TestHosting/JT808.Gateway.TestHosting.csproj Dosyayı Görüntüle

@@ -14,6 +14,7 @@
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\JT808.Gateway.Kafka\JT808.Gateway.Kafka.csproj" />
<ProjectReference Include="..\JT808.Gateway\JT808.Gateway.csproj" />
</ItemGroup>



+ 17
- 2
src/JT808.Gateway.TestHosting/Program.cs Dosyayı Görüntüle

@@ -7,6 +7,8 @@ using JT808.Protocol;
using Microsoft.Extensions.Configuration;
using NLog.Extensions.Logging;
using JT808.Gateway.TestHosting.Jobs;
using JT808.Gateway.Enums;
using JT808.Gateway.Kafka;

namespace JT808.Gateway.TestHosting
{
@@ -33,10 +35,23 @@ namespace JT808.Gateway.TestHosting
services.AddSingleton<ILoggerFactory, LoggerFactory>();
services.AddSingleton(typeof(ILogger<>), typeof(Logger<>));
services.AddJT808Configure()
.AddJT808Gateway()
//用于测试网关
.AddJT808DevelopmentGateway()
//用于生产环境
//.AddJT808Gateway(options =>
//{
// options.TcpPort=8086;
// options.UdpPort=8086;
// options.MessageQueueType = JT808MessageQueueType.InPlug;
//})
.AddTcp()
.AddUdp()
.AddGrpc();
.AddGrpc()
//kafka插件
//.AddJT808ServerKafkaMsgProducer(hostContext.Configuration)
//.AddJT808ServerKafkaMsgReplyConsumer(hostContext.Configuration)
//.AddJT808ServerKafkaSessionProducer(hostContext.Configuration)
;
//services.AddHostedService<CallGrpcClientJob>();
});



+ 2
- 3
src/JT808.Gateway/Internal/JT808MsgProducerDefault.cs Dosyayı Görüntüle

@@ -19,10 +19,9 @@ namespace JT808.Gateway.Internal
{
}
public ValueTask ProduceAsync(string terminalNo, byte[] data)
public async ValueTask ProduceAsync(string terminalNo, byte[] data)
{
JT808MsgService.MsgQueue.Add((terminalNo, data));
return default;
await JT808MsgService.WriteAsync(terminalNo, data);
}
}
}

+ 10
- 7
src/JT808.Gateway/Internal/JT808MsgReplyConsumerDefault.cs Dosyayı Görüntüle

@@ -50,19 +50,22 @@ namespace JT808.Gateway.Internal
{
Task.Run(() =>
{
foreach(var item in JT808MsgService.MsgQueue.GetConsumingEnumerable())
while (!Cts.IsCancellationRequested)
{
try
{
var package = JT808Serializer.HeaderDeserialize(item.Data);
if (HandlerDict.TryGetValue(package.Header.MsgId, out var func))
if(JT808MsgService.TryRead(out var item))
{
var buffer = func(package);
if (buffer != null)
JT808HeaderPackage package = JT808Serializer.HeaderDeserialize(item.Data);
if (HandlerDict.TryGetValue(package.Header.MsgId, out var func))
{
callback((item.TerminalNo, buffer));
var buffer = func(package);
if (buffer != null)
{
callback((item.TerminalNo, buffer));
}
}
}
}
}
catch
{


+ 19
- 1
src/JT808.Gateway/Internal/JT808MsgService.cs Dosyayı Görüntüle

@@ -1,11 +1,29 @@
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

namespace JT808.Gateway.Internal
{
internal class JT808MsgService
{
public System.Collections.Concurrent.BlockingCollection<(string TerminalNo, byte[] Data)> MsgQueue { get; set; } = new System.Collections.Concurrent.BlockingCollection<(string TerminalNo, byte[] Data)>();
private readonly Channel<(string TerminalNo, byte[] Data)> _channel;

public JT808MsgService()
{
_channel = Channel.CreateUnbounded<(string TerminalNo, byte[] Data)>();
}

public async ValueTask WriteAsync(string terminalNo, byte[] data)
{
await _channel.Writer.WriteAsync((terminalNo, data));
}

public bool TryRead(out (string TerminalNo, byte[] Data) item)
{
return _channel.Reader.TryRead(out item);
}
}
}

+ 1
- 0
src/JT808.Gateway/JT808.Gateway.csproj Dosyayı Görüntüle

@@ -24,6 +24,7 @@
<PackageReference Include="Microsoft.Extensions.Options" Version="3.1.0" />
<PackageReference Include="Microsoft.Extensions.Options.ConfigurationExtensions" Version="3.1.0" />
<PackageReference Include="System.IO.Pipelines" Version="4.7.0" />
<PackageReference Include="System.Threading.Channels" Version="4.7.0" />
</ItemGroup>

<ItemGroup>


+ 1
- 2
src/JT808.Gateway/JT808GatewayExtensions.cs Dosyayı Görüntüle

@@ -16,7 +16,7 @@ namespace JT808.Gateway
{
public static partial class JT808GatewayExtensions
{
public static IJT808GatewayBuilder AddJT808Gateway(this IJT808Builder jt808Builder)
public static IJT808GatewayBuilder AddJT808DevelopmentGateway(this IJT808Builder jt808Builder)
{
IJT808GatewayBuilder server = new JT808GatewayBuilderDefault(jt808Builder);
server.JT808Builder.Services.TryAddSingleton<JT808Configuration>();
@@ -62,7 +62,6 @@ namespace JT808.Gateway

private static IJT808GatewayBuilder AddJT808Core(this IJT808GatewayBuilder config)
{
config.JT808Builder.Services.TryAddSingleton<JT808Configuration>();
config.JT808Builder.Services.TryAddSingleton<JT808AtomicCounterServiceFactory>();
config.JT808Builder.Services.TryAddSingleton<IJT808MsgProducer, JT808MsgProducerDefault>();
config.JT808Builder.Services.TryAddSingleton<IJT808MsgReplyConsumer, JT808MsgReplyConsumerDefault>();


Yükleniyor…
İptal
Kaydet