diff --git a/README.md b/README.md
index a0bf4c8..df0dc98 100644
--- a/README.md
+++ b/README.md
@@ -4,7 +4,7 @@
## 作为上级平台(企业对企业)
-目前只需要的是实时上传车辆定位信息、车辆定位信息自动补报。
+目前只需要的是实时上传车辆定位信息。
> 注意:有些企业协议按照国标,但是链路没有遵循,所以企业对企业对接数据需要兼容不需要从链路的情况。
diff --git a/doc/img/superior_dataflow.png b/doc/img/superior_dataflow.png
new file mode 100644
index 0000000..0cce8b9
Binary files /dev/null and b/doc/img/superior_dataflow.png differ
diff --git a/doc/img/superior_demo.png b/doc/img/superior_demo.png
new file mode 100644
index 0000000..6812186
Binary files /dev/null and b/doc/img/superior_demo.png differ
diff --git a/src/JT809.DotNetty.Core/Handlers/JT809MainServerHandler.cs b/src/JT809.DotNetty.Core/Handlers/JT809MainServerHandler.cs
index 2044d14..1a8802f 100644
--- a/src/JT809.DotNetty.Core/Handlers/JT809MainServerHandler.cs
+++ b/src/JT809.DotNetty.Core/Handlers/JT809MainServerHandler.cs
@@ -40,7 +40,7 @@ namespace JT809.DotNetty.Core.Handlers
}
- protected override void ChannelRead0(IChannelHandlerContext ctx, byte[] msg)
+ protected override async void ChannelRead0(IChannelHandlerContext ctx, byte[] msg)
{
try
{
@@ -58,7 +58,7 @@ namespace JT809.DotNetty.Core.Handlers
if (jT808Response != null)
{
var sendData = JT809Serializer.Serialize(jT808Response.Package, jT808Response.MinBufferSize);
- ctx.WriteAndFlushAsync(Unpooled.WrappedBuffer(sendData));
+ await ctx.WriteAndFlushAsync(Unpooled.WrappedBuffer(sendData));
}
}
}
diff --git a/src/JT809.DotNetty.Core/JT809CoreDotnettyExtensions.cs b/src/JT809.DotNetty.Core/JT809CoreDotnettyExtensions.cs
index 42b33fa..81444b4 100644
--- a/src/JT809.DotNetty.Core/JT809CoreDotnettyExtensions.cs
+++ b/src/JT809.DotNetty.Core/JT809CoreDotnettyExtensions.cs
@@ -92,39 +92,6 @@ namespace JT809.DotNetty.Core
return serviceDescriptors;
}
- ///
- /// 上级平台
- /// 主链路为服务端
- /// 从链路为客户端
- ///
- ///
- ///
- public static IServiceCollection AddJT809SuperiorPlatform(this IServiceCollection serviceDescriptors,Action options)
- {
- serviceDescriptors.Configure(options);
- serviceDescriptors.TryAddSingleton();
- //主从链路客户端和服务端连接处理器
- serviceDescriptors.TryAddScoped();
- serviceDescriptors.TryAddScoped();
- //主链路服务端会话管理
- serviceDescriptors.TryAddSingleton();
- //主从链路接收消息默认业务处理器
- serviceDescriptors.TryAddSingleton();
- //主从链路消息接收处理器
- serviceDescriptors.TryAddScoped();
- serviceDescriptors.TryAddScoped();
- serviceDescriptors.TryAddSingleton();
- serviceDescriptors.TryAddSingleton();
- //从链路客户端
- serviceDescriptors.TryAddSingleton();
- //主链路服务端
- serviceDescriptors.AddHostedService();
- //上级平台webapi
- serviceDescriptors.TryAddSingleton();
- serviceDescriptors.TryAddScoped();
- serviceDescriptors.AddHostedService();
- return serviceDescriptors;
- }
///
/// 上级平台
@@ -133,9 +100,16 @@ namespace JT809.DotNetty.Core
///
///
///
- public static IServiceCollection AddJT809SuperiorPlatform(this IServiceCollection serviceDescriptors, IConfiguration superiorPlatformConfiguration)
+ public static IServiceCollection AddJT809SuperiorPlatform(this IServiceCollection serviceDescriptors, IConfiguration superiorPlatformConfiguration=null, Action options=null)
{
- serviceDescriptors.Configure(superiorPlatformConfiguration.GetSection("JT809SuperiorPlatformConfiguration"));
+ if (superiorPlatformConfiguration != null)
+ {
+ serviceDescriptors.Configure(superiorPlatformConfiguration.GetSection("JT809SuperiorPlatformConfiguration"));
+ }
+ if (options != null)
+ {
+ serviceDescriptors.Configure(options);
+ }
serviceDescriptors.TryAddSingleton();
//主从链路客户端和服务端连接处理器
serviceDescriptors.TryAddScoped();
diff --git a/src/JT809.DotNetty.Simples/Superior/JT809.GpsConsumer/GpsConsumerService.cs b/src/JT809.DotNetty.Simples/Superior/JT809.GpsConsumer/GpsConsumerService.cs
new file mode 100644
index 0000000..7f32ab2
--- /dev/null
+++ b/src/JT809.DotNetty.Simples/Superior/JT809.GpsConsumer/GpsConsumerService.cs
@@ -0,0 +1,49 @@
+using JT809.GrpcProtos;
+using JT809.PubSub.Abstractions;
+using Microsoft.Extensions.Hosting;
+using Microsoft.Extensions.Logging;
+using Newtonsoft.Json;
+using System;
+using System.Collections.Generic;
+using System.Text;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace JT809.GpsConsumer
+{
+ public class GpsConsumerService : IHostedService
+ {
+ private readonly IJT808ConsumerOfT jT808Consumer;
+
+ private readonly ILogger logger;
+
+ public GpsConsumerService(
+ ILoggerFactory loggerFactory,
+ IJT808ConsumerOfT jT808Consumer)
+ {
+ this.jT808Consumer = jT808Consumer;
+ logger = loggerFactory.CreateLogger();
+ }
+ public Task StartAsync(CancellationToken cancellationToken)
+ {
+ logger.LogDebug("StartAsync ...");
+ jT808Consumer.OnMessage((Message) =>
+ {
+ //处理数据来源FromChannel
+ //处理入库
+ //处理缓存
+ //...
+ logger.LogDebug($"Receive MsgId:{Message.MsgId}");
+ logger.LogDebug($"Receive Data:{JsonConvert.SerializeObject(Message.Data)}");
+ });
+ return Task.CompletedTask;
+ }
+
+ public Task StopAsync(CancellationToken cancellationToken)
+ {
+ jT808Consumer.Unsubscribe();
+ logger.LogDebug("StopAsync ...");
+ return Task.CompletedTask;
+ }
+ }
+}
diff --git a/src/JT809.DotNetty.Simples/Superior/JT809.GpsConsumer/JT809.GpsConsumer.csproj b/src/JT809.DotNetty.Simples/Superior/JT809.GpsConsumer/JT809.GpsConsumer.csproj
new file mode 100644
index 0000000..41116a8
--- /dev/null
+++ b/src/JT809.DotNetty.Simples/Superior/JT809.GpsConsumer/JT809.GpsConsumer.csproj
@@ -0,0 +1,26 @@
+
+
+
+ Exe
+ netcoreapp2.2
+
+
+
+
+ Always
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/src/JT809.DotNetty.Simples/Superior/JT809.GpsConsumer/Program.cs b/src/JT809.DotNetty.Simples/Superior/JT809.GpsConsumer/Program.cs
new file mode 100644
index 0000000..3f35374
--- /dev/null
+++ b/src/JT809.DotNetty.Simples/Superior/JT809.GpsConsumer/Program.cs
@@ -0,0 +1,37 @@
+using Microsoft.Extensions.Configuration;
+using Microsoft.Extensions.DependencyInjection;
+using Microsoft.Extensions.Hosting;
+using Microsoft.Extensions.Logging;
+using System;
+using System.Threading.Tasks;
+using JT809.KafkaService;
+
+namespace JT809.GpsConsumer
+{
+ class Program
+ {
+ static async Task Main(string[] args)
+ {
+ var serverHostBuilder = new HostBuilder()
+ .ConfigureAppConfiguration((hostingContext, config) =>
+ {
+ config.SetBasePath(AppDomain.CurrentDomain.BaseDirectory);
+ config.AddJsonFile("appsettings.json", optional: false, reloadOnChange: true);
+ })
+ .ConfigureLogging((context, logging) =>
+ {
+ logging.AddConsole();
+ logging.SetMinimumLevel(LogLevel.Trace);
+ })
+ .ConfigureServices((hostContext, services) =>
+ {
+ services.AddSingleton();
+ services.AddSingleton(typeof(ILogger<>), typeof(Logger<>));
+ services.AddJT809KafkaConsumerPartitionsService(hostContext.Configuration, options => options.Partition = 10);
+ services.AddHostedService();
+ });
+
+ await serverHostBuilder.RunConsoleAsync();
+ }
+ }
+}
diff --git a/src/JT809.DotNetty.Simples/Superior/JT809.GpsConsumer/appsettings.json b/src/JT809.DotNetty.Simples/Superior/JT809.GpsConsumer/appsettings.json
new file mode 100644
index 0000000..f19b98a
--- /dev/null
+++ b/src/JT809.DotNetty.Simples/Superior/JT809.GpsConsumer/appsettings.json
@@ -0,0 +1,19 @@
+{
+ "Logging": {
+ "IncludeScopes": false,
+ "Debug": {
+ "LogLevel": {
+ "Default": "Trace"
+ }
+ },
+ "Console": {
+ "LogLevel": {
+ "Default": "Trace"
+ }
+ }
+ },
+ "KafkaConsumerConfig": {
+ "BootstrapServers": "127.0.0.1:9092",
+ "EnableAutoCommit": true
+ }
+}
diff --git a/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809KafkaServiceExtensions.cs b/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809KafkaServiceExtensions.cs
index 2b45769..17d1da7 100644
--- a/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809KafkaServiceExtensions.cs
+++ b/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809KafkaServiceExtensions.cs
@@ -17,11 +17,11 @@ namespace JT809.KafkaService
public static IServiceCollection AddJT809KafkaProducerService(this IServiceCollection serviceDescriptors, IConfiguration configuration)
{
serviceDescriptors.Configure(configuration.GetSection("KafkaProducerConfig"));
- serviceDescriptors.AddSingleton(typeof(JT809Producer), (service) => {
+ serviceDescriptors.AddSingleton(typeof(IJT809ProducerOfT), (service) => {
var producerConfig = service.GetRequiredService>();
return new JT809_Same_Producer(new JT809TopicOptions { TopicName = "jt809.same" }, producerConfig);
});
- serviceDescriptors.AddSingleton(typeof(JT809Producer), (service) => {
+ serviceDescriptors.AddSingleton(typeof(IJT809ProducerOfT), (service) => {
var producerConfig = service.GetRequiredService>();
return new JT809_GpsPositio_Producer(new JT809TopicOptions { TopicName = "jt809.gps" }, producerConfig);
});
@@ -33,13 +33,13 @@ namespace JT809.KafkaService
serviceDescriptors.Configure(configuration.GetSection("KafkaProducerConfig"));
serviceDescriptors.Configure(action);
serviceDescriptors.AddSingleton();
- serviceDescriptors.AddSingleton(typeof(JT809PartitionProducer), (service) => {
+ serviceDescriptors.AddSingleton(typeof(IJT809ProducerOfT), (service) => {
var producerConfig = service.GetRequiredService>();
var producerPartitionFactory = service.GetRequiredService();
var partitionOptions = service.GetRequiredService>();
return new JT809_Same_Partition_Producer(new JT809TopicOptions { TopicName = "jt809.partition.same" }, producerConfig, producerPartitionFactory, partitionOptions);
});
- serviceDescriptors.AddSingleton(typeof(JT809PartitionProducer), (service) => {
+ serviceDescriptors.AddSingleton(typeof(IJT809ProducerOfT), (service) => {
var producerConfig = service.GetRequiredService>();
var producerPartitionFactory = service.GetRequiredService();
var partitionOptions = service.GetRequiredService>();
@@ -51,13 +51,13 @@ namespace JT809.KafkaService
public static IServiceCollection AddJT809KafkaConsumerService(this IServiceCollection serviceDescriptors, IConfiguration configuration)
{
serviceDescriptors.Configure(configuration.GetSection("KafkaConsumerConfig"));
- serviceDescriptors.AddSingleton(typeof(JT809Consumer), (service)=> {
+ serviceDescriptors.AddSingleton(typeof(IJT808ConsumerOfT), (service)=> {
var loggerFactory = service.GetRequiredService();
var consumerConfig = service.GetRequiredService>();
consumerConfig.Value.GroupId = "JT809.same.Test";
return new JT809_Same_Consumer(new JT809TopicOptions { TopicName = "jt809.same" }, consumerConfig, loggerFactory);
});
- serviceDescriptors.AddSingleton(typeof(JT809Consumer), (service) => {
+ serviceDescriptors.AddSingleton(typeof(IJT808ConsumerOfT), (service) => {
var loggerFactory = service.GetRequiredService();
var consumerConfig = service.GetRequiredService>();
consumerConfig.Value.GroupId = "JT809.gps.Test";
@@ -70,14 +70,14 @@ namespace JT809.KafkaService
{
serviceDescriptors.Configure(configuration.GetSection("KafkaConsumerConfig"));
serviceDescriptors.Configure(action);
- serviceDescriptors.AddSingleton(typeof(JT809PartitionConsumer), (service) => {
+ serviceDescriptors.AddSingleton(typeof(IJT808ConsumerOfT), (service) => {
var loggerFactory = service.GetRequiredService();
var consumerConfig = service.GetRequiredService>();
var partitionOptions = service.GetRequiredService>();
consumerConfig.Value.GroupId = "JT809.partition.same.Test";
return new JT809_Same_Partition_Consumer(consumerConfig, partitionOptions,new JT809TopicOptions { TopicName = "jt809.partition.same" } , loggerFactory);
});
- serviceDescriptors.AddSingleton(typeof(JT809PartitionConsumer), (service) => {
+ serviceDescriptors.AddSingleton(typeof(IJT808ConsumerOfT), (service) => {
var loggerFactory = service.GetRequiredService();
var consumerConfig = service.GetRequiredService>();
var partitionOptions = service.GetRequiredService>();
diff --git a/src/JT809.DotNetty.Simples/Superior/JT809.KafkaServiceTest/ConsumerTestPartitionService.cs b/src/JT809.DotNetty.Simples/Superior/JT809.KafkaServiceTest/ConsumerTestPartitionService.cs
index 04b5ef6..8e386af 100644
--- a/src/JT809.DotNetty.Simples/Superior/JT809.KafkaServiceTest/ConsumerTestPartitionService.cs
+++ b/src/JT809.DotNetty.Simples/Superior/JT809.KafkaServiceTest/ConsumerTestPartitionService.cs
@@ -1,5 +1,6 @@
using JT809.GrpcProtos;
using JT809.KafkaService;
+using JT809.PubSub.Abstractions;
using System;
using System.Collections.Generic;
using System.Text;
@@ -8,9 +9,9 @@ namespace JT809.KafkaServiceTest
{
public class ConsumerTestPartitionService
{
- public JT809PartitionConsumer SameConsumer { get; }
- public JT809PartitionConsumer GpsConsumer { get; }
- public ConsumerTestPartitionService(JT809PartitionConsumer sameConsumer, JT809PartitionConsumer gpsConsumer)
+ public IJT808ConsumerOfT SameConsumer { get; }
+ public IJT808ConsumerOfT GpsConsumer { get; }
+ public ConsumerTestPartitionService(IJT808ConsumerOfT sameConsumer, IJT808ConsumerOfT gpsConsumer)
{
SameConsumer = sameConsumer;
GpsConsumer = gpsConsumer;
diff --git a/src/JT809.DotNetty.Simples/Superior/JT809.KafkaServiceTest/ConsumerTestService.cs b/src/JT809.DotNetty.Simples/Superior/JT809.KafkaServiceTest/ConsumerTestService.cs
index 968e877..f657756 100644
--- a/src/JT809.DotNetty.Simples/Superior/JT809.KafkaServiceTest/ConsumerTestService.cs
+++ b/src/JT809.DotNetty.Simples/Superior/JT809.KafkaServiceTest/ConsumerTestService.cs
@@ -1,5 +1,6 @@
using JT809.GrpcProtos;
using JT809.KafkaService;
+using JT809.PubSub.Abstractions;
using System;
using System.Collections.Generic;
using System.Text;
@@ -8,9 +9,9 @@ namespace JT809.KafkaServiceTest
{
public class ConsumerTestService
{
- public JT809Consumer SameConsumer { get; }
- public JT809Consumer GpsConsumer { get; }
- public ConsumerTestService(JT809Consumer sameConsumer, JT809Consumer gpsConsumer)
+ public IJT808ConsumerOfT SameConsumer { get; }
+ public IJT808ConsumerOfT GpsConsumer { get; }
+ public ConsumerTestService(IJT808ConsumerOfT sameConsumer, IJT808ConsumerOfT gpsConsumer)
{
SameConsumer = sameConsumer;
GpsConsumer = gpsConsumer;
diff --git a/src/JT809.DotNetty.Simples/Superior/JT809.KafkaServiceTest/ProducerTestPartitionService.cs b/src/JT809.DotNetty.Simples/Superior/JT809.KafkaServiceTest/ProducerTestPartitionService.cs
index 90e9759..c148ed0 100644
--- a/src/JT809.DotNetty.Simples/Superior/JT809.KafkaServiceTest/ProducerTestPartitionService.cs
+++ b/src/JT809.DotNetty.Simples/Superior/JT809.KafkaServiceTest/ProducerTestPartitionService.cs
@@ -1,5 +1,6 @@
using JT809.GrpcProtos;
using JT809.KafkaService;
+using JT809.PubSub.Abstractions;
using System;
using System.Collections.Generic;
using System.Text;
@@ -8,9 +9,9 @@ namespace JT809.KafkaServiceTest
{
public class ProducerTestPartitionService
{
- public JT809PartitionProducer SameProducer { get; }
- public JT809PartitionProducer GpsProducer { get; }
- public ProducerTestPartitionService(JT809PartitionProducer sameProducer, JT809PartitionProducer gpsProducer)
+ public IJT809ProducerOfT SameProducer { get; }
+ public IJT809ProducerOfT GpsProducer { get; }
+ public ProducerTestPartitionService(IJT809ProducerOfT sameProducer, IJT809ProducerOfT gpsProducer)
{
SameProducer = sameProducer;
GpsProducer = gpsProducer;
diff --git a/src/JT809.DotNetty.Simples/Superior/JT809.KafkaServiceTest/ProducerTestService.cs b/src/JT809.DotNetty.Simples/Superior/JT809.KafkaServiceTest/ProducerTestService.cs
index 7e9a479..ff04f26 100644
--- a/src/JT809.DotNetty.Simples/Superior/JT809.KafkaServiceTest/ProducerTestService.cs
+++ b/src/JT809.DotNetty.Simples/Superior/JT809.KafkaServiceTest/ProducerTestService.cs
@@ -1,5 +1,6 @@
using JT809.GrpcProtos;
using JT809.KafkaService;
+using JT809.PubSub.Abstractions;
using System;
using System.Collections.Generic;
using System.Text;
@@ -8,9 +9,9 @@ namespace JT809.KafkaServiceTest
{
public class ProducerTestService
{
- public JT809Producer SameProducer { get; }
- public JT809Producer GpsProducer { get; }
- public ProducerTestService(JT809Producer sameProducer, JT809Producer gpsProducer)
+ public IJT809ProducerOfT SameProducer { get; }
+ public IJT809ProducerOfT GpsProducer { get; }
+ public ProducerTestService(IJT809ProducerOfT sameProducer, IJT809ProducerOfT gpsProducer)
{
SameProducer = sameProducer;
GpsProducer = gpsProducer;
diff --git a/src/JT809.DotNetty.Simples/Superior/JT809.Superior.Server/Configs/JT809GpsOptions.cs b/src/JT809.DotNetty.Simples/Superior/JT809.Superior.Server/Configs/JT809GpsOptions.cs
new file mode 100644
index 0000000..d3e94fc
--- /dev/null
+++ b/src/JT809.DotNetty.Simples/Superior/JT809.Superior.Server/Configs/JT809GpsOptions.cs
@@ -0,0 +1,13 @@
+using Microsoft.Extensions.Options;
+using System;
+using System.Collections.Generic;
+using System.Text;
+
+namespace JT809.Superior.Server.Configs
+{
+ public class JT809GpsOptions : IOptions
+ {
+ public string FromChannel { get; set; }
+ public JT809GpsOptions Value =>this;
+ }
+}
diff --git a/src/JT809.DotNetty.Simples/Superior/JT809.Superior.Server/JT809.Superior.Server.csproj b/src/JT809.DotNetty.Simples/Superior/JT809.Superior.Server/JT809.Superior.Server.csproj
new file mode 100644
index 0000000..d81d19b
--- /dev/null
+++ b/src/JT809.DotNetty.Simples/Superior/JT809.Superior.Server/JT809.Superior.Server.csproj
@@ -0,0 +1,24 @@
+
+
+
+ Exe
+ netcoreapp2.2
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+ Always
+
+
+
+
diff --git a/src/JT809.DotNetty.Simples/Superior/JT809.Superior.Server/JT809SuperiorMsgIdReceiveHandler.cs b/src/JT809.DotNetty.Simples/Superior/JT809.Superior.Server/JT809SuperiorMsgIdReceiveHandler.cs
new file mode 100644
index 0000000..4ef3f99
--- /dev/null
+++ b/src/JT809.DotNetty.Simples/Superior/JT809.Superior.Server/JT809SuperiorMsgIdReceiveHandler.cs
@@ -0,0 +1,62 @@
+using JT809.DotNetty.Core.Handlers;
+using JT809.DotNetty.Core.Interfaces;
+using JT809.DotNetty.Core.Metadata;
+using JT809.GrpcProtos;
+using JT809.Protocol;
+using JT809.Protocol.SubMessageBody;
+using JT809.PubSub.Abstractions;
+using JT809.Superior.Server.Configs;
+using Microsoft.Extensions.Logging;
+using Microsoft.Extensions.Options;
+using System;
+using System.Collections.Generic;
+using System.Text;
+
+namespace JT809.Superior.Server
+{
+ public sealed class JT809SuperiorMsgIdReceiveHandler : JT809SuperiorMsgIdReceiveHandlerBase
+ {
+ private readonly IJT809ProducerOfT producer;
+ private readonly JT809GpsOptions gpsOptions;
+ public JT809SuperiorMsgIdReceiveHandler(
+ IOptionsjt809GpsAccessor,
+ IJT809ProducerOfT producer,
+ ILoggerFactory loggerFactory,
+ IJT809SubordinateLoginService jT809SubordinateLoginService,
+ IJT809VerifyCodeGenerator verifyCodeGenerator)
+ : base(loggerFactory, jT809SubordinateLoginService, verifyCodeGenerator)
+ {
+ this.producer = producer;
+ this.gpsOptions = jt809GpsAccessor.Value;
+ }
+
+ public override JT809Response Msg0x1200_0x1202(JT809Request request)
+ {
+ var exchangeMessageBodies = request.Package.Bodies as JT809ExchangeMessageBodies;
+ var gpsBodies = exchangeMessageBodies.SubBodies as JT809_0x1200_0x1202;
+ JT809GpsPosition gpsPosition = new JT809GpsPosition();
+ gpsPosition.Vno = exchangeMessageBodies.VehicleNo;
+ gpsPosition.VColor = (byte)exchangeMessageBodies.VehicleColor;
+ gpsPosition.Alarm = (int)gpsBodies.VehiclePosition.Alarm;
+ gpsPosition.Altitude = gpsBodies.VehiclePosition.Altitude;
+ gpsPosition.Direction = gpsBodies.VehiclePosition.Direction;
+ gpsPosition.Encrypt = (byte)gpsBodies.VehiclePosition.Encrypt;
+ gpsPosition.State = (int)gpsBodies.VehiclePosition.State;
+ gpsPosition.Lat = gpsBodies.VehiclePosition.Lat;
+ gpsPosition.Lon= gpsBodies.VehiclePosition.Lon;
+ gpsPosition.Vec1 = gpsBodies.VehiclePosition.Vec1;
+ gpsPosition.Vec2 = gpsBodies.VehiclePosition.Vec2;
+ gpsPosition.Vec3 =(int)gpsBodies.VehiclePosition.Vec3;
+ gpsPosition.GpsTime = (new DateTime(
+ gpsBodies.VehiclePosition.Year,
+ gpsBodies.VehiclePosition.Month,
+ gpsBodies.VehiclePosition.Day,
+ gpsBodies.VehiclePosition.Hour,
+ gpsBodies.VehiclePosition.Minute,
+ gpsBodies.VehiclePosition.Second).ToUniversalTime().Ticks - 621355968000000000) / 10000000;
+ gpsPosition.FromChannel = gpsOptions.FromChannel;
+ producer.ProduceAsync($"{0x1202}", $"{exchangeMessageBodies.VehicleNo}{exchangeMessageBodies.VehicleColor}", gpsPosition);
+ return base.Msg0x1200_0x1202(request);
+ }
+ }
+}
diff --git a/src/JT809.DotNetty.Simples/Superior/JT809.Superior.Server/Program.cs b/src/JT809.DotNetty.Simples/Superior/JT809.Superior.Server/Program.cs
new file mode 100644
index 0000000..be81401
--- /dev/null
+++ b/src/JT809.DotNetty.Simples/Superior/JT809.Superior.Server/Program.cs
@@ -0,0 +1,50 @@
+using JT809.DotNetty.Core;
+using JT809.DotNetty.Core.Configurations;
+using JT809.DotNetty.Core.Handlers;
+using JT809.Protocol.Configs;
+using JT809.Superior.Server.Configs;
+using Microsoft.Extensions.Configuration;
+using Microsoft.Extensions.DependencyInjection;
+using Microsoft.Extensions.DependencyInjection.Extensions;
+using Microsoft.Extensions.Hosting;
+using Microsoft.Extensions.Logging;
+using System;
+using System.Threading.Tasks;
+using JT809.KafkaService;
+
+namespace JT809.Superior.Server
+{
+ class Program
+ {
+ static async Task Main(string[] args)
+ {
+
+ //5B0000005A02000000A81200013414CE010000000000270FD4C14131323334353600000000000000000000000002120200000024000E0407E316130F07EF4D80017018400032003300000096002D002D0000000300000000E08F5D
+ var serverHostBuilder = new HostBuilder()
+ .ConfigureAppConfiguration((hostingContext, config) =>
+ {
+ config.SetBasePath(AppDomain.CurrentDomain.BaseDirectory);
+ config.AddJsonFile("appsettings.json", optional: false, reloadOnChange: true);
+ })
+ .ConfigureLogging((context, logging) =>
+ {
+ logging.AddConsole();
+ logging.SetMinimumLevel(LogLevel.Trace);
+ })
+ .ConfigureServices((hostContext, services) =>
+ {
+ services.AddSingleton();
+ services.AddSingleton(typeof(ILogger<>), typeof(Logger<>));
+ services.AddJT809Core(hostContext.Configuration)
+ .AddJT809SuperiorPlatform(options:options => {
+ options.TcpPort = 808;
+ });
+ services.Configure(hostContext.Configuration.GetSection("JT809GpsOptions"));
+ services.AddJT809KafkaProducerPartitionsService(hostContext.Configuration,options=> options.Partition=10);
+ services.Replace(new ServiceDescriptor(typeof(JT809SuperiorMsgIdReceiveHandlerBase), typeof(JT809SuperiorMsgIdReceiveHandler), ServiceLifetime.Singleton));
+ });
+
+ await serverHostBuilder.RunConsoleAsync();
+ }
+ }
+}
diff --git a/src/JT809.DotNetty.Simples/Superior/JT809.Superior.Server/appsettings.json b/src/JT809.DotNetty.Simples/Superior/JT809.Superior.Server/appsettings.json
new file mode 100644
index 0000000..b548f32
--- /dev/null
+++ b/src/JT809.DotNetty.Simples/Superior/JT809.Superior.Server/appsettings.json
@@ -0,0 +1,24 @@
+{
+ "Logging": {
+ "IncludeScopes": false,
+ "Debug": {
+ "LogLevel": {
+ "Default": "Trace"
+ }
+ },
+ "Console": {
+ "LogLevel": {
+ "Default": "Trace"
+ }
+ }
+ },
+ "JT809Configuration": {
+ "SubordinateClientEnable": false
+ },
+ "KafkaProducerConfig": {
+ "BootstrapServers": "127.0.0.1:9092"
+ },
+ "JT809GpsOptions": {
+ "FromChannel": "test"
+ }
+}
diff --git a/src/JT809.DotNetty.Tests/JT809.DotNetty.Host.Test/Program.cs b/src/JT809.DotNetty.Tests/JT809.DotNetty.Host.Test/Program.cs
index d401b7d..e82bdf9 100644
--- a/src/JT809.DotNetty.Tests/JT809.DotNetty.Host.Test/Program.cs
+++ b/src/JT809.DotNetty.Tests/JT809.DotNetty.Host.Test/Program.cs
@@ -45,7 +45,7 @@ namespace JT809.DotNetty.Host.Test
services.AddSingleton();
services.AddSingleton(typeof(ILogger<>), typeof(Logger<>));
services.AddJT809Core(hostContext.Configuration)
- .AddJT809SuperiorPlatform(options=> {
+ .AddJT809SuperiorPlatform(options:options=> {
options.TcpPort = 839;
});
});
diff --git a/src/JT809.DotNetty.sln b/src/JT809.DotNetty.sln
index 93eb1fa..b7c4c2b 100644
--- a/src/JT809.DotNetty.sln
+++ b/src/JT809.DotNetty.sln
@@ -25,7 +25,11 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JT809.GrpcProtos", "JT809.D
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JT809.KafkaService", "JT809.DotNetty.Simples\Superior\JT809.KafkaService\JT809.KafkaService.csproj", "{8119D905-241F-4EFF-B300-1FB474B8C665}"
EndProject
-Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "JT809.KafkaServiceTest", "JT809.DotNetty.Simples\Superior\JT809.KafkaServiceTest\JT809.KafkaServiceTest.csproj", "{22F008D5-61F8-4889-80DB-91B37591322F}"
+Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JT809.KafkaServiceTest", "JT809.DotNetty.Simples\Superior\JT809.KafkaServiceTest\JT809.KafkaServiceTest.csproj", "{22F008D5-61F8-4889-80DB-91B37591322F}"
+EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "JT809.Superior.Server", "JT809.DotNetty.Simples\Superior\JT809.Superior.Server\JT809.Superior.Server.csproj", "{8620735D-FBD5-4832-882F-A2F607DC6861}"
+EndProject
+Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "JT809.GpsConsumer", "JT809.DotNetty.Simples\Superior\JT809.GpsConsumer\JT809.GpsConsumer.csproj", "{FBC06008-6F18-4CC3-B7C2-5B476317F92D}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
@@ -65,6 +69,14 @@ Global
{22F008D5-61F8-4889-80DB-91B37591322F}.Debug|Any CPU.Build.0 = Debug|Any CPU
{22F008D5-61F8-4889-80DB-91B37591322F}.Release|Any CPU.ActiveCfg = Release|Any CPU
{22F008D5-61F8-4889-80DB-91B37591322F}.Release|Any CPU.Build.0 = Release|Any CPU
+ {8620735D-FBD5-4832-882F-A2F607DC6861}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {8620735D-FBD5-4832-882F-A2F607DC6861}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {8620735D-FBD5-4832-882F-A2F607DC6861}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {8620735D-FBD5-4832-882F-A2F607DC6861}.Release|Any CPU.Build.0 = Release|Any CPU
+ {FBC06008-6F18-4CC3-B7C2-5B476317F92D}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {FBC06008-6F18-4CC3-B7C2-5B476317F92D}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {FBC06008-6F18-4CC3-B7C2-5B476317F92D}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {FBC06008-6F18-4CC3-B7C2-5B476317F92D}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
@@ -77,6 +89,8 @@ Global
{D64F2F77-DC0C-4120-80DA-45012A794CDF} = {E9DC871D-EFCE-4D53-A5B5-8A88D2D52EA4}
{8119D905-241F-4EFF-B300-1FB474B8C665} = {E9DC871D-EFCE-4D53-A5B5-8A88D2D52EA4}
{22F008D5-61F8-4889-80DB-91B37591322F} = {E9DC871D-EFCE-4D53-A5B5-8A88D2D52EA4}
+ {8620735D-FBD5-4832-882F-A2F607DC6861} = {E9DC871D-EFCE-4D53-A5B5-8A88D2D52EA4}
+ {FBC06008-6F18-4CC3-B7C2-5B476317F92D} = {E9DC871D-EFCE-4D53-A5B5-8A88D2D52EA4}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {0FC2A52E-3B7A-4485-9C3B-9080C825419D}