Parcourir la source

增加netty的队列demo

tags/pipeline-1.1.0
SmallChi(Koike) il y a 4 ans
Parent
révision
46269c31d0
15 fichiers modifiés avec 244 ajouts et 19 suppressions
  1. +1
    -1
      simples/JT808.DotNetty.SimpleClient/Program.cs
  2. +29
    -1
      simples/JT808.DotNetty.SimpleClient/Services/UpService.cs
  3. +22
    -0
      simples/JT808.DotNetty.SimpleQueueServer/JT808.DotNetty.SimpleQueueServer.csproj
  4. +44
    -0
      simples/JT808.DotNetty.SimpleQueueServer/Program.cs
  5. +25
    -0
      simples/JT808.DotNetty.SimpleQueueServer/appsettings.json
  6. +24
    -0
      simples/JT808.DotNetty.SimpleQueueService/JT808.DotNetty.SimpleQueueService.csproj
  7. +39
    -0
      simples/JT808.DotNetty.SimpleQueueService/Program.cs
  8. +25
    -0
      simples/JT808.DotNetty.SimpleQueueService/appsettings.json
  9. +3
    -1
      simples/JT808.Gateway.SimpleServer/Impl/JT808NormalReplyMessageHandlerImpl.cs
  10. +14
    -0
      simples/JT808.Simples.sln
  11. +1
    -1
      src/JT808.DotNetty.Client/JT808TcpClient.cs
  12. +1
    -1
      src/JT808.DotNetty.Services.Tests/JT808.DotNetty.MsgIdHandler.Test/JT808DotNettyMsgIdHandlerDefaultImpl.cs
  13. +1
    -0
      src/JT808.DotNetty.Tcp/Handlers/JT808TcpConnectionHandler.cs
  14. +1
    -0
      src/JT808.DotNetty.Tcp/JT808TcpServerHost.cs
  15. +14
    -14
      src/JT808.DotNetty.Tests/JT808.DotNetty.Hosting/Program.cs

+ 1
- 1
simples/JT808.DotNetty.SimpleClient/Program.cs Voir le fichier

@@ -33,7 +33,7 @@ namespace JT808.DotNetty.SimpleClient
services.AddJT808Configure()
.AddJT808Client();
services.AddHostedService<UpService>();
services.AddHostedService<Up2019Service>();
//services.AddHostedService<Up2019Service>();
});
await serverHostBuilder.RunConsoleAsync();
}


+ 29
- 1
simples/JT808.DotNetty.SimpleClient/Services/UpService.cs Voir le fichier

@@ -22,8 +22,10 @@ namespace JT808.DotNetty.SimpleClient.Services

public Task StartAsync(CancellationToken cancellationToken)
{
string sim = "11111111111";
string sim = "11111111111";
JT808TcpClient client1 = jT808TcpClientFactory.Create(new JT808DeviceConfig(sim, "127.0.0.1", 808));
string sim2 = "33333333333";
JT808TcpClient client2 = jT808TcpClientFactory.Create(new JT808DeviceConfig(sim2, "127.0.0.1", 808));
Thread.Sleep(5000);
//1.终端注册
client1.Send(JT808MsgId.终端注册.Create(sim, new JT808_0x0100()
@@ -36,11 +38,26 @@ namespace JT808.DotNetty.SimpleClient.Services
TerminalId = "Koike001",
TerminalModel = "Koike001"
}));
client2.Send(JT808MsgId.终端注册.Create(sim2, new JT808_0x0100()
{
PlateNo = "粤A12345",
PlateColor = 2,
AreaID = 0,
CityOrCountyId = 0,
MakerId = "Koike001",
TerminalId = "Koike001",
TerminalModel = "Koike001"
}));
//2.终端鉴权
client1.Send(JT808MsgId.终端鉴权.Create(sim, new JT808_0x0102()
{
Code = "1234"
}));
//2.终端鉴权
client2.Send(JT808MsgId.终端鉴权.Create(sim2, new JT808_0x0102()
{
Code = "1234"
}));
Task.Run(() => {
while (true)
{
@@ -57,6 +74,17 @@ namespace JT808.DotNetty.SimpleClient.Services
Altitude = 50,
StatusFlag = 10
}));
client2.Send(JT808MsgId.位置信息汇报.Create(sim2, new JT808_0x0200()
{
Lat = 110000 + i,
Lng = 100000 + i,
GPSTime = DateTime.Now,
Speed = 50,
Direction = 30,
AlarmFlag = 5,
Altitude = 50,
StatusFlag = 10
}));
i++;
Thread.Sleep(5000);
}


+ 22
- 0
simples/JT808.DotNetty.SimpleQueueServer/JT808.DotNetty.SimpleQueueServer.csproj Voir le fichier

@@ -0,0 +1,22 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>netcoreapp3.1</TargetFramework>
</PropertyGroup>

<ItemGroup>
<None Update="appsettings.json">
<CopyToOutputDirectory>Always</CopyToOutputDirectory>
</None>
</ItemGroup>
<ItemGroup>
<PackageReference Include="JT808.DotNetty.Kafka" Version="2.3.2" />
<PackageReference Include="JT808.DotNetty.Tcp" Version="2.3.2" />
<PackageReference Include="Microsoft.Extensions.Configuration.FileExtensions" Version="3.1.4" />
<PackageReference Include="Microsoft.Extensions.Configuration.Json" Version="3.1.4" />
<PackageReference Include="Microsoft.Extensions.Hosting" Version="3.1.4" />
<PackageReference Include="Microsoft.Extensions.Logging.Console" Version="3.1.4" />
<PackageReference Include="WebApiClient.Extensions.DependencyInjection" Version="2.0.3" />
</ItemGroup>
</Project>

+ 44
- 0
simples/JT808.DotNetty.SimpleQueueServer/Program.cs Voir le fichier

@@ -0,0 +1,44 @@
using JT808.DotNetty.Core;
using JT808.DotNetty.Tcp;
using JT808.DotNetty.Kafka;
using JT808.Protocol;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using System;
using System.Threading.Tasks;

namespace JT808.DotNetty.SimpleQueueServer
{
class Program
{
static async Task Main(string[] args)
{
var serverHostBuilder = new HostBuilder()
.ConfigureAppConfiguration((hostingContext, config) =>
{
config.SetBasePath(AppDomain.CurrentDomain.BaseDirectory);
config.AddJsonFile("appsettings.json", optional: false, reloadOnChange: true);
})
.ConfigureLogging((context, logging) =>
{
logging.AddConsole();
logging.SetMinimumLevel(LogLevel.Trace);
})
.ConfigureServices((hostContext, services) =>
{
services.AddSingleton<ILoggerFactory, LoggerFactory>();
services.AddSingleton(typeof(ILogger<>), typeof(Logger<>));
services.AddJT808Configure()
.AddJT808NettyCore(hostContext.Configuration)
.AddJT808TcpNettyHost()
.AddJT808ServerKafkaMsgProducer(hostContext.Configuration)
.AddJT808ServerKafkaMsgReplyConsumer(hostContext.Configuration)
.Builder();
});

await serverHostBuilder.RunConsoleAsync();
}
}
}

+ 25
- 0
simples/JT808.DotNetty.SimpleQueueServer/appsettings.json Voir le fichier

@@ -0,0 +1,25 @@
{
"Logging": {
"IncludeScopes": false,
"Debug": {
"LogLevel": {
"Default": "Trace"
}
},
"Console": {
"LogLevel": {
"Default": "Trace"
}
}
},
"JT808MsgProducerConfig": {
"TopicName": "JT808NettyMsg",
"BootstrapServers": "127.0.0.1:9092"
},
"JT808MsgReplyConsumerConfig": {
"TopicName": "JT808NettyMsgReply",
"EnableAutoCommit": true,
"GroupId": "JT808.NettyMsgReply",
"BootstrapServers": "127.0.0.1:9092"
}
}

+ 24
- 0
simples/JT808.DotNetty.SimpleQueueService/JT808.DotNetty.SimpleQueueService.csproj Voir le fichier

@@ -0,0 +1,24 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>netcoreapp3.1</TargetFramework>
</PropertyGroup>

<ItemGroup>
<None Update="appsettings.json">
<CopyToOutputDirectory>Always</CopyToOutputDirectory>
</None>
</ItemGroup>
<ItemGroup>
<PackageReference Include="JT808.DotNetty.Kafka" Version="2.3.2" />
<PackageReference Include="JT808.DotNetty.MsgIdHandler" Version="2.3.2" />
<PackageReference Include="JT808.DotNetty.ReplyMessage" Version="2.3.2" />
<PackageReference Include="JT808.DotNetty.Tcp" Version="2.3.2" />
<PackageReference Include="Microsoft.Extensions.Configuration.FileExtensions" Version="3.1.4" />
<PackageReference Include="Microsoft.Extensions.Configuration.Json" Version="3.1.4" />
<PackageReference Include="Microsoft.Extensions.Hosting" Version="3.1.4" />
<PackageReference Include="Microsoft.Extensions.Logging.Console" Version="3.1.4" />
<PackageReference Include="WebApiClient.Extensions.DependencyInjection" Version="2.0.3" />
</ItemGroup>
</Project>

+ 39
- 0
simples/JT808.DotNetty.SimpleQueueService/Program.cs Voir le fichier

@@ -0,0 +1,39 @@
using JT808.DotNetty.Kafka;
using JT808.DotNetty.ReplyMessage;
using JT808.Protocol;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using System;
using System.Threading.Tasks;

namespace JT808.DotNetty.SimpleQueueService
{
class Program
{
static async Task Main(string[] args)
{
var hostBuilder = new HostBuilder()
.ConfigureAppConfiguration((hostContext, config) => {
config.SetBasePath(AppDomain.CurrentDomain.BaseDirectory);
config.AddJsonFile("appsettings.json", optional: false, reloadOnChange: true);
})
.ConfigureLogging((hostContext, configLogging) => {
configLogging.AddConsole();
configLogging.SetMinimumLevel(LogLevel.Trace);
})
.ConfigureServices((hostContext, services) => {
services.AddSingleton<ILoggerFactory, LoggerFactory>();
services.AddSingleton(typeof(ILogger<>), typeof(Logger<>));
services.AddJT808Configure()
.AddJT808ClientKafka()
.AddMsgReplyConsumer(hostContext.Configuration)
.AddMsgReplyConsumer(hostContext.Configuration)
;
});

await hostBuilder.RunConsoleAsync();
}
}
}

+ 25
- 0
simples/JT808.DotNetty.SimpleQueueService/appsettings.json Voir le fichier

@@ -0,0 +1,25 @@
{
"Logging": {
"IncludeScopes": false,
"Debug": {
"LogLevel": {
"Default": "Trace"
}
},
"Console": {
"LogLevel": {
"Default": "Trace"
}
}
},
"JT808MsgConsumerConfig": {
"TopicName": "JT808NettyMsg",
"EnableAutoCommit": true,
"GroupId": "JT808.NettyMsgHandler",
"BootstrapServers": "127.0.0.1:9092"
},
"JT808MsgReplyProducerConfig": {
"TopicName": "JT808NettyMsgReply",
"BootstrapServers": "127.0.0.1:9092"
}
}

+ 3
- 1
simples/JT808.Gateway.SimpleServer/Impl/JT808NormalReplyMessageHandlerImpl.cs Voir le fichier

@@ -3,6 +3,7 @@ using JT808.Gateway.MsgLogging;
using JT808.Gateway.Traffic;
using JT808.Gateway.Transmit;
using JT808.Protocol;
using JT808.Protocol.Extensions;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
@@ -68,7 +69,8 @@ namespace JT808.Gateway.SimpleServer.Impl
/// <param name="session"></param>
public override byte[] Msg0x0200(JT808HeaderPackage request, IJT808Session session)
{
logger.LogDebug("重写自带Msg0x0200的消息");
//logger.LogDebug("重写自带Msg0x0200的消息");
logger.LogDebug($"重写自带Msg0x0200的消息{request.Header.TerminalPhoneNo}-{request.OriginalData.ToArray().ToHexString()}");
return base.Msg0x0200(request, session);
}



+ 14
- 0
simples/JT808.Simples.sln Voir le fichier

@@ -19,6 +19,10 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JT808.Gateway.SimpleQueueSe
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JT808.Gateway.SimpleQueueNotification", "JT808.Gateway.SimpleQueueNotification\JT808.Gateway.SimpleQueueNotification.csproj", "{163D2EE2-9A62-4E8A-B203-BF147909E89A}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "JT808.DotNetty.SimpleQueueServer", "JT808.DotNetty.SimpleQueueServer\JT808.DotNetty.SimpleQueueServer.csproj", "{1DEAC7EA-D662-420B-A1A7-A6E840568F7B}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "JT808.DotNetty.SimpleQueueService", "JT808.DotNetty.SimpleQueueService\JT808.DotNetty.SimpleQueueService.csproj", "{90E1F1C9-A953-4341-9792-9E2AF4471B68}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
@@ -53,6 +57,14 @@ Global
{163D2EE2-9A62-4E8A-B203-BF147909E89A}.Debug|Any CPU.Build.0 = Debug|Any CPU
{163D2EE2-9A62-4E8A-B203-BF147909E89A}.Release|Any CPU.ActiveCfg = Release|Any CPU
{163D2EE2-9A62-4E8A-B203-BF147909E89A}.Release|Any CPU.Build.0 = Release|Any CPU
{1DEAC7EA-D662-420B-A1A7-A6E840568F7B}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{1DEAC7EA-D662-420B-A1A7-A6E840568F7B}.Debug|Any CPU.Build.0 = Debug|Any CPU
{1DEAC7EA-D662-420B-A1A7-A6E840568F7B}.Release|Any CPU.ActiveCfg = Release|Any CPU
{1DEAC7EA-D662-420B-A1A7-A6E840568F7B}.Release|Any CPU.Build.0 = Release|Any CPU
{90E1F1C9-A953-4341-9792-9E2AF4471B68}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{90E1F1C9-A953-4341-9792-9E2AF4471B68}.Debug|Any CPU.Build.0 = Debug|Any CPU
{90E1F1C9-A953-4341-9792-9E2AF4471B68}.Release|Any CPU.ActiveCfg = Release|Any CPU
{90E1F1C9-A953-4341-9792-9E2AF4471B68}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
@@ -65,6 +77,8 @@ Global
{8594AC7F-18B4-439D-B58B-1CEFF0833A1A} = {2459FB59-8A33-49A4-ADBC-A0B12C5886A6}
{E2D1CFEF-417A-4C44-BC2E-E5A160602485} = {2459FB59-8A33-49A4-ADBC-A0B12C5886A6}
{163D2EE2-9A62-4E8A-B203-BF147909E89A} = {2459FB59-8A33-49A4-ADBC-A0B12C5886A6}
{1DEAC7EA-D662-420B-A1A7-A6E840568F7B} = {2459FB59-8A33-49A4-ADBC-A0B12C5886A6}
{90E1F1C9-A953-4341-9792-9E2AF4471B68} = {2459FB59-8A33-49A4-ADBC-A0B12C5886A6}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {FC0FFCEA-E1EF-4C97-A1C5-F89418B6834B}


+ 1
- 1
src/JT808.DotNetty.Client/JT808TcpClient.cs Voir le fichier

@@ -42,7 +42,7 @@ namespace JT808.DotNetty.Client
bootstrap.Group(group);
bootstrap.Channel<TcpSocketChannel>();
bootstrap
.Option(ChannelOption.Allocator, new PooledByteBufferAllocator())
.Option(ChannelOption.Allocator, new UnpooledByteBufferAllocator())
.Handler(new ActionChannelInitializer<IChannel>(channel =>
{
channel.Pipeline.AddLast("jt808TcpBuffer", new DelimiterBasedFrameDecoder(65535,


+ 1
- 1
src/JT808.DotNetty.Services.Tests/JT808.DotNetty.MsgIdHandler.Test/JT808DotNettyMsgIdHandlerDefaultImpl.cs Voir le fichier

@@ -16,7 +16,7 @@ namespace JT808.DotNetty.MsgIdHandler.Test
{
public readonly ILogger<JT808DotNettyMsgIdHandlerDefaultImpl> logger;
public JT808DotNettyMsgIdHandlerDefaultImpl(ILoggerFactory loggerFactory,
IServiceProvider serviceProvider) {
IServiceProvider serviceProvider) {
logger = loggerFactory.CreateLogger<JT808DotNettyMsgIdHandlerDefaultImpl>();
Task.Run(()=> {
while (true)


+ 1
- 0
src/JT808.DotNetty.Tcp/Handlers/JT808TcpConnectionHandler.cs Voir le fichier

@@ -46,6 +46,7 @@ namespace JT808.DotNetty.Tcp.Handlers
string channelId = context.Channel.Id.AsShortText();
if (logger.IsEnabled(LogLevel.Debug))
logger.LogDebug($">>>{ channelId } The client disconnects from the server.");
base.ChannelInactive(context);
base.CloseAsync(context);
}



+ 1
- 0
src/JT808.DotNetty.Tcp/JT808TcpServerHost.cs Voir le fichier

@@ -47,6 +47,7 @@ namespace JT808.DotNetty.Tcp
bossGroup = new DispatcherEventLoopGroup();
workerGroup = new WorkerEventLoopGroup(bossGroup, configuration.EventLoopCount);
serverBufferAllocator = new PooledByteBufferAllocator();
//serverBufferAllocator = new UnpooledByteBufferAllocator();
ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.Group(bossGroup, workerGroup);
bootstrap.Channel<TcpServerChannel>();


+ 14
- 14
src/JT808.DotNetty.Tests/JT808.DotNetty.Hosting/Program.cs Voir le fichier

@@ -57,21 +57,21 @@ namespace JT808.DotNetty.Hosting
.AddJT808WebApiNettyHost()
//扩展webapi JT808MsgIdHttpHandlerBase
//.ReplaceMsgIdHandler<JT808MsgIdHttpCustomHandler>()
.Builder()
//添加kafka插件
.AddJT808ServerKafkaMsgProducer(hostContext.Configuration)
.AddJT808ServerKafkaMsgReplyConsumer(hostContext.Configuration)
.AddJT808ServerKafkaSessionProducer(hostContext.Configuration)
.Builder();
//添加kafka插件
//.AddJT808ServerKafkaMsgProducer(hostContext.Configuration)
//.AddJT808ServerKafkaMsgReplyConsumer(hostContext.Configuration)
//.AddJT808ServerKafkaSessionProducer(hostContext.Configuration)
//.Builder();
//使用微软自带的webapi客户端
//services.AddHttpClient("jt808webapi", c =>
//{
// c.BaseAddress = new Uri("http://localhost:828/");
// c.DefaultRequestHeaders.Add("token", "123456);
//})
//.AddTypedClient<JT808HttpClient>();
//var client = services.BuildServiceProvider().GetRequiredService<JT808HttpClient>();
//var result = client.GetTcpAtomicCounter();
//使用微软自带的webapi客户端
//services.AddHttpClient("jt808webapi", c =>
//{
// c.BaseAddress = new Uri("http://localhost:828/");
// c.DefaultRequestHeaders.Add("token", "123456);
//})
//.AddTypedClient<JT808HttpClient>();
//var client = services.BuildServiceProvider().GetRequiredService<JT808HttpClient>();
//var result = client.GetTcpAtomicCounter();
});

await serverHostBuilder.RunConsoleAsync();


Chargement…
Annuler
Enregistrer