@@ -4,7 +4,7 @@ | |||||
## 作为上级平台(企业对企业) | ## 作为上级平台(企业对企业) | ||||
目前只需要的是实时上传车辆定位信息、车辆定位信息自动补报。 | |||||
目前只需要的是实时上传车辆定位信息。 | |||||
> 注意:有些企业协议按照国标,但是链路没有遵循,所以企业对企业对接数据需要兼容不需要从链路的情况。 | > 注意:有些企业协议按照国标,但是链路没有遵循,所以企业对企业对接数据需要兼容不需要从链路的情况。 | ||||
@@ -40,7 +40,7 @@ namespace JT809.DotNetty.Core.Handlers | |||||
} | } | ||||
protected override void ChannelRead0(IChannelHandlerContext ctx, byte[] msg) | |||||
protected override async void ChannelRead0(IChannelHandlerContext ctx, byte[] msg) | |||||
{ | { | ||||
try | try | ||||
{ | { | ||||
@@ -58,7 +58,7 @@ namespace JT809.DotNetty.Core.Handlers | |||||
if (jT808Response != null) | if (jT808Response != null) | ||||
{ | { | ||||
var sendData = JT809Serializer.Serialize(jT808Response.Package, jT808Response.MinBufferSize); | var sendData = JT809Serializer.Serialize(jT808Response.Package, jT808Response.MinBufferSize); | ||||
ctx.WriteAndFlushAsync(Unpooled.WrappedBuffer(sendData)); | |||||
await ctx.WriteAndFlushAsync(Unpooled.WrappedBuffer(sendData)); | |||||
} | } | ||||
} | } | ||||
} | } | ||||
@@ -92,39 +92,6 @@ namespace JT809.DotNetty.Core | |||||
return serviceDescriptors; | return serviceDescriptors; | ||||
} | } | ||||
/// <summary> | |||||
/// 上级平台 | |||||
/// 主链路为服务端 | |||||
/// 从链路为客户端 | |||||
/// </summary> | |||||
/// <param name="serviceDescriptors"></param> | |||||
/// <returns></returns> | |||||
public static IServiceCollection AddJT809SuperiorPlatform(this IServiceCollection serviceDescriptors,Action<JT809SuperiorPlatformOptions> options) | |||||
{ | |||||
serviceDescriptors.Configure(options); | |||||
serviceDescriptors.TryAddSingleton<IJT809VerifyCodeGenerator, JT809VerifyCodeGeneratorDefaultImpl>(); | |||||
//主从链路客户端和服务端连接处理器 | |||||
serviceDescriptors.TryAddScoped<JT809MainServerConnectionHandler>(); | |||||
serviceDescriptors.TryAddScoped<JT809SubordinateClientConnectionHandler>(); | |||||
//主链路服务端会话管理 | |||||
serviceDescriptors.TryAddSingleton<JT809SuperiorMainSessionManager>(); | |||||
//主从链路接收消息默认业务处理器 | |||||
serviceDescriptors.TryAddSingleton<JT809SuperiorMsgIdReceiveHandlerBase, JT809SuperiorMsgIdReceiveDefaultHandler>(); | |||||
//主从链路消息接收处理器 | |||||
serviceDescriptors.TryAddScoped<JT809MainServerHandler>(); | |||||
serviceDescriptors.TryAddScoped<JT809SubordinateClientHandler>(); | |||||
serviceDescriptors.TryAddSingleton<IJT809SubordinateLoginService, JT809SubordinateLoginImplService>(); | |||||
serviceDescriptors.TryAddSingleton<IJT809SubordinateLinkNotifyService, JT809SubordinateLinkNotifyImplService>(); | |||||
//从链路客户端 | |||||
serviceDescriptors.TryAddSingleton<JT809SubordinateClient>(); | |||||
//主链路服务端 | |||||
serviceDescriptors.AddHostedService<JT809MainServerHost>(); | |||||
//上级平台webapi | |||||
serviceDescriptors.TryAddSingleton<JT809SuperiorWebAPIHandlerBase, JT809SuperiorWebAPIDefaultHandler>(); | |||||
serviceDescriptors.TryAddScoped<JT809SuperiorWebAPIServerHandler>(); | |||||
serviceDescriptors.AddHostedService<JT809MainWebAPIServerHost>(); | |||||
return serviceDescriptors; | |||||
} | |||||
/// <summary> | /// <summary> | ||||
/// 上级平台 | /// 上级平台 | ||||
@@ -133,9 +100,16 @@ namespace JT809.DotNetty.Core | |||||
/// </summary> | /// </summary> | ||||
/// <param name="serviceDescriptors"></param> | /// <param name="serviceDescriptors"></param> | ||||
/// <returns></returns> | /// <returns></returns> | ||||
public static IServiceCollection AddJT809SuperiorPlatform(this IServiceCollection serviceDescriptors, IConfiguration superiorPlatformConfiguration) | |||||
public static IServiceCollection AddJT809SuperiorPlatform(this IServiceCollection serviceDescriptors, IConfiguration superiorPlatformConfiguration=null, Action<JT809SuperiorPlatformOptions> options=null) | |||||
{ | { | ||||
serviceDescriptors.Configure<JT809SuperiorPlatformOptions>(superiorPlatformConfiguration.GetSection("JT809SuperiorPlatformConfiguration")); | |||||
if (superiorPlatformConfiguration != null) | |||||
{ | |||||
serviceDescriptors.Configure<JT809SuperiorPlatformOptions>(superiorPlatformConfiguration.GetSection("JT809SuperiorPlatformConfiguration")); | |||||
} | |||||
if (options != null) | |||||
{ | |||||
serviceDescriptors.Configure(options); | |||||
} | |||||
serviceDescriptors.TryAddSingleton<IJT809VerifyCodeGenerator, JT809VerifyCodeGeneratorDefaultImpl>(); | serviceDescriptors.TryAddSingleton<IJT809VerifyCodeGenerator, JT809VerifyCodeGeneratorDefaultImpl>(); | ||||
//主从链路客户端和服务端连接处理器 | //主从链路客户端和服务端连接处理器 | ||||
serviceDescriptors.TryAddScoped<JT809MainServerConnectionHandler>(); | serviceDescriptors.TryAddScoped<JT809MainServerConnectionHandler>(); | ||||
@@ -0,0 +1,49 @@ | |||||
using JT809.GrpcProtos; | |||||
using JT809.PubSub.Abstractions; | |||||
using Microsoft.Extensions.Hosting; | |||||
using Microsoft.Extensions.Logging; | |||||
using Newtonsoft.Json; | |||||
using System; | |||||
using System.Collections.Generic; | |||||
using System.Text; | |||||
using System.Threading; | |||||
using System.Threading.Tasks; | |||||
namespace JT809.GpsConsumer | |||||
{ | |||||
public class GpsConsumerService : IHostedService | |||||
{ | |||||
private readonly IJT808ConsumerOfT<JT809GpsPosition> jT808Consumer; | |||||
private readonly ILogger logger; | |||||
public GpsConsumerService( | |||||
ILoggerFactory loggerFactory, | |||||
IJT808ConsumerOfT<JT809GpsPosition> jT808Consumer) | |||||
{ | |||||
this.jT808Consumer = jT808Consumer; | |||||
logger = loggerFactory.CreateLogger<GpsConsumerService>(); | |||||
} | |||||
public Task StartAsync(CancellationToken cancellationToken) | |||||
{ | |||||
logger.LogDebug("StartAsync ..."); | |||||
jT808Consumer.OnMessage((Message) => | |||||
{ | |||||
//处理数据来源FromChannel | |||||
//处理入库 | |||||
//处理缓存 | |||||
//... | |||||
logger.LogDebug($"Receive MsgId:{Message.MsgId}"); | |||||
logger.LogDebug($"Receive Data:{JsonConvert.SerializeObject(Message.Data)}"); | |||||
}); | |||||
return Task.CompletedTask; | |||||
} | |||||
public Task StopAsync(CancellationToken cancellationToken) | |||||
{ | |||||
jT808Consumer.Unsubscribe(); | |||||
logger.LogDebug("StopAsync ..."); | |||||
return Task.CompletedTask; | |||||
} | |||||
} | |||||
} |
@@ -0,0 +1,26 @@ | |||||
<Project Sdk="Microsoft.NET.Sdk"> | |||||
<PropertyGroup> | |||||
<OutputType>Exe</OutputType> | |||||
<TargetFramework>netcoreapp2.2</TargetFramework> | |||||
</PropertyGroup> | |||||
<ItemGroup> | |||||
<None Update="appsettings.json"> | |||||
<CopyToOutputDirectory>Always</CopyToOutputDirectory> | |||||
</None> | |||||
</ItemGroup> | |||||
<ItemGroup> | |||||
<PackageReference Include="Microsoft.Extensions.Configuration.Json" Version="2.2.0" /> | |||||
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="2.2.0" /> | |||||
<PackageReference Include="Microsoft.Extensions.Hosting" Version="2.2.0" /> | |||||
<PackageReference Include="Microsoft.Extensions.Logging.Console" Version="2.2.0" /> | |||||
<PackageReference Include="Newtonsoft.Json" Version="12.0.1" /> | |||||
</ItemGroup> | |||||
<ItemGroup> | |||||
<ProjectReference Include="..\JT809.KafkaService\JT809.KafkaService.csproj" /> | |||||
</ItemGroup> | |||||
</Project> |
@@ -0,0 +1,37 @@ | |||||
using Microsoft.Extensions.Configuration; | |||||
using Microsoft.Extensions.DependencyInjection; | |||||
using Microsoft.Extensions.Hosting; | |||||
using Microsoft.Extensions.Logging; | |||||
using System; | |||||
using System.Threading.Tasks; | |||||
using JT809.KafkaService; | |||||
namespace JT809.GpsConsumer | |||||
{ | |||||
class Program | |||||
{ | |||||
static async Task Main(string[] args) | |||||
{ | |||||
var serverHostBuilder = new HostBuilder() | |||||
.ConfigureAppConfiguration((hostingContext, config) => | |||||
{ | |||||
config.SetBasePath(AppDomain.CurrentDomain.BaseDirectory); | |||||
config.AddJsonFile("appsettings.json", optional: false, reloadOnChange: true); | |||||
}) | |||||
.ConfigureLogging((context, logging) => | |||||
{ | |||||
logging.AddConsole(); | |||||
logging.SetMinimumLevel(LogLevel.Trace); | |||||
}) | |||||
.ConfigureServices((hostContext, services) => | |||||
{ | |||||
services.AddSingleton<ILoggerFactory, LoggerFactory>(); | |||||
services.AddSingleton(typeof(ILogger<>), typeof(Logger<>)); | |||||
services.AddJT809KafkaConsumerPartitionsService(hostContext.Configuration, options => options.Partition = 10); | |||||
services.AddHostedService<GpsConsumerService>(); | |||||
}); | |||||
await serverHostBuilder.RunConsoleAsync(); | |||||
} | |||||
} | |||||
} |
@@ -0,0 +1,19 @@ | |||||
{ | |||||
"Logging": { | |||||
"IncludeScopes": false, | |||||
"Debug": { | |||||
"LogLevel": { | |||||
"Default": "Trace" | |||||
} | |||||
}, | |||||
"Console": { | |||||
"LogLevel": { | |||||
"Default": "Trace" | |||||
} | |||||
} | |||||
}, | |||||
"KafkaConsumerConfig": { | |||||
"BootstrapServers": "127.0.0.1:9092", | |||||
"EnableAutoCommit": true | |||||
} | |||||
} |
@@ -17,11 +17,11 @@ namespace JT809.KafkaService | |||||
public static IServiceCollection AddJT809KafkaProducerService(this IServiceCollection serviceDescriptors, IConfiguration configuration) | public static IServiceCollection AddJT809KafkaProducerService(this IServiceCollection serviceDescriptors, IConfiguration configuration) | ||||
{ | { | ||||
serviceDescriptors.Configure<ProducerConfig>(configuration.GetSection("KafkaProducerConfig")); | serviceDescriptors.Configure<ProducerConfig>(configuration.GetSection("KafkaProducerConfig")); | ||||
serviceDescriptors.AddSingleton(typeof(JT809Producer<byte[]>), (service) => { | |||||
serviceDescriptors.AddSingleton(typeof(IJT809ProducerOfT<byte[]>), (service) => { | |||||
var producerConfig = service.GetRequiredService<IOptions<ProducerConfig>>(); | var producerConfig = service.GetRequiredService<IOptions<ProducerConfig>>(); | ||||
return new JT809_Same_Producer(new JT809TopicOptions { TopicName = "jt809.same" }, producerConfig); | return new JT809_Same_Producer(new JT809TopicOptions { TopicName = "jt809.same" }, producerConfig); | ||||
}); | }); | ||||
serviceDescriptors.AddSingleton(typeof(JT809Producer<JT809GpsPosition>), (service) => { | |||||
serviceDescriptors.AddSingleton(typeof(IJT809ProducerOfT<JT809GpsPosition>), (service) => { | |||||
var producerConfig = service.GetRequiredService<IOptions<ProducerConfig>>(); | var producerConfig = service.GetRequiredService<IOptions<ProducerConfig>>(); | ||||
return new JT809_GpsPositio_Producer(new JT809TopicOptions { TopicName = "jt809.gps" }, producerConfig); | return new JT809_GpsPositio_Producer(new JT809TopicOptions { TopicName = "jt809.gps" }, producerConfig); | ||||
}); | }); | ||||
@@ -33,13 +33,13 @@ namespace JT809.KafkaService | |||||
serviceDescriptors.Configure<ProducerConfig>(configuration.GetSection("KafkaProducerConfig")); | serviceDescriptors.Configure<ProducerConfig>(configuration.GetSection("KafkaProducerConfig")); | ||||
serviceDescriptors.Configure(action); | serviceDescriptors.Configure(action); | ||||
serviceDescriptors.AddSingleton<IJT809ProducerPartitionFactory, JT809GpsPositionProducerPartitionFactoryImpl>(); | serviceDescriptors.AddSingleton<IJT809ProducerPartitionFactory, JT809GpsPositionProducerPartitionFactoryImpl>(); | ||||
serviceDescriptors.AddSingleton(typeof(JT809PartitionProducer<byte[]>), (service) => { | |||||
serviceDescriptors.AddSingleton(typeof(IJT809ProducerOfT<byte[]>), (service) => { | |||||
var producerConfig = service.GetRequiredService<IOptions<ProducerConfig>>(); | var producerConfig = service.GetRequiredService<IOptions<ProducerConfig>>(); | ||||
var producerPartitionFactory = service.GetRequiredService<IJT809ProducerPartitionFactory>(); | var producerPartitionFactory = service.GetRequiredService<IJT809ProducerPartitionFactory>(); | ||||
var partitionOptions = service.GetRequiredService<IOptions<JT809PartitionOptions>>(); | var partitionOptions = service.GetRequiredService<IOptions<JT809PartitionOptions>>(); | ||||
return new JT809_Same_Partition_Producer(new JT809TopicOptions { TopicName = "jt809.partition.same" }, producerConfig, producerPartitionFactory, partitionOptions); | return new JT809_Same_Partition_Producer(new JT809TopicOptions { TopicName = "jt809.partition.same" }, producerConfig, producerPartitionFactory, partitionOptions); | ||||
}); | }); | ||||
serviceDescriptors.AddSingleton(typeof(JT809PartitionProducer<JT809GpsPosition>), (service) => { | |||||
serviceDescriptors.AddSingleton(typeof(IJT809ProducerOfT<JT809GpsPosition>), (service) => { | |||||
var producerConfig = service.GetRequiredService<IOptions<ProducerConfig>>(); | var producerConfig = service.GetRequiredService<IOptions<ProducerConfig>>(); | ||||
var producerPartitionFactory = service.GetRequiredService<IJT809ProducerPartitionFactory>(); | var producerPartitionFactory = service.GetRequiredService<IJT809ProducerPartitionFactory>(); | ||||
var partitionOptions = service.GetRequiredService<IOptions<JT809PartitionOptions>>(); | var partitionOptions = service.GetRequiredService<IOptions<JT809PartitionOptions>>(); | ||||
@@ -51,13 +51,13 @@ namespace JT809.KafkaService | |||||
public static IServiceCollection AddJT809KafkaConsumerService(this IServiceCollection serviceDescriptors, IConfiguration configuration) | public static IServiceCollection AddJT809KafkaConsumerService(this IServiceCollection serviceDescriptors, IConfiguration configuration) | ||||
{ | { | ||||
serviceDescriptors.Configure<ConsumerConfig>(configuration.GetSection("KafkaConsumerConfig")); | serviceDescriptors.Configure<ConsumerConfig>(configuration.GetSection("KafkaConsumerConfig")); | ||||
serviceDescriptors.AddSingleton(typeof(JT809Consumer<byte[]>), (service)=> { | |||||
serviceDescriptors.AddSingleton(typeof(IJT808ConsumerOfT<byte[]>), (service)=> { | |||||
var loggerFactory = service.GetRequiredService<ILoggerFactory>(); | var loggerFactory = service.GetRequiredService<ILoggerFactory>(); | ||||
var consumerConfig = service.GetRequiredService<IOptions<ConsumerConfig>>(); | var consumerConfig = service.GetRequiredService<IOptions<ConsumerConfig>>(); | ||||
consumerConfig.Value.GroupId = "JT809.same.Test"; | consumerConfig.Value.GroupId = "JT809.same.Test"; | ||||
return new JT809_Same_Consumer(new JT809TopicOptions { TopicName = "jt809.same" }, consumerConfig, loggerFactory); | return new JT809_Same_Consumer(new JT809TopicOptions { TopicName = "jt809.same" }, consumerConfig, loggerFactory); | ||||
}); | }); | ||||
serviceDescriptors.AddSingleton(typeof(JT809Consumer<JT809GpsPosition>), (service) => { | |||||
serviceDescriptors.AddSingleton(typeof(IJT808ConsumerOfT<JT809GpsPosition>), (service) => { | |||||
var loggerFactory = service.GetRequiredService<ILoggerFactory>(); | var loggerFactory = service.GetRequiredService<ILoggerFactory>(); | ||||
var consumerConfig = service.GetRequiredService<IOptions<ConsumerConfig>>(); | var consumerConfig = service.GetRequiredService<IOptions<ConsumerConfig>>(); | ||||
consumerConfig.Value.GroupId = "JT809.gps.Test"; | consumerConfig.Value.GroupId = "JT809.gps.Test"; | ||||
@@ -70,14 +70,14 @@ namespace JT809.KafkaService | |||||
{ | { | ||||
serviceDescriptors.Configure<ConsumerConfig>(configuration.GetSection("KafkaConsumerConfig")); | serviceDescriptors.Configure<ConsumerConfig>(configuration.GetSection("KafkaConsumerConfig")); | ||||
serviceDescriptors.Configure(action); | serviceDescriptors.Configure(action); | ||||
serviceDescriptors.AddSingleton(typeof(JT809PartitionConsumer<byte[]>), (service) => { | |||||
serviceDescriptors.AddSingleton(typeof(IJT808ConsumerOfT<byte[]>), (service) => { | |||||
var loggerFactory = service.GetRequiredService<ILoggerFactory>(); | var loggerFactory = service.GetRequiredService<ILoggerFactory>(); | ||||
var consumerConfig = service.GetRequiredService<IOptions<ConsumerConfig>>(); | var consumerConfig = service.GetRequiredService<IOptions<ConsumerConfig>>(); | ||||
var partitionOptions = service.GetRequiredService<IOptions<JT809PartitionOptions>>(); | var partitionOptions = service.GetRequiredService<IOptions<JT809PartitionOptions>>(); | ||||
consumerConfig.Value.GroupId = "JT809.partition.same.Test"; | consumerConfig.Value.GroupId = "JT809.partition.same.Test"; | ||||
return new JT809_Same_Partition_Consumer(consumerConfig, partitionOptions,new JT809TopicOptions { TopicName = "jt809.partition.same" } , loggerFactory); | return new JT809_Same_Partition_Consumer(consumerConfig, partitionOptions,new JT809TopicOptions { TopicName = "jt809.partition.same" } , loggerFactory); | ||||
}); | }); | ||||
serviceDescriptors.AddSingleton(typeof(JT809PartitionConsumer<JT809GpsPosition>), (service) => { | |||||
serviceDescriptors.AddSingleton(typeof(IJT808ConsumerOfT<JT809GpsPosition>), (service) => { | |||||
var loggerFactory = service.GetRequiredService<ILoggerFactory>(); | var loggerFactory = service.GetRequiredService<ILoggerFactory>(); | ||||
var consumerConfig = service.GetRequiredService<IOptions<ConsumerConfig>>(); | var consumerConfig = service.GetRequiredService<IOptions<ConsumerConfig>>(); | ||||
var partitionOptions = service.GetRequiredService<IOptions<JT809PartitionOptions>>(); | var partitionOptions = service.GetRequiredService<IOptions<JT809PartitionOptions>>(); | ||||
@@ -1,5 +1,6 @@ | |||||
using JT809.GrpcProtos; | using JT809.GrpcProtos; | ||||
using JT809.KafkaService; | using JT809.KafkaService; | ||||
using JT809.PubSub.Abstractions; | |||||
using System; | using System; | ||||
using System.Collections.Generic; | using System.Collections.Generic; | ||||
using System.Text; | using System.Text; | ||||
@@ -8,9 +9,9 @@ namespace JT809.KafkaServiceTest | |||||
{ | { | ||||
public class ConsumerTestPartitionService | public class ConsumerTestPartitionService | ||||
{ | { | ||||
public JT809PartitionConsumer<byte[]> SameConsumer { get; } | |||||
public JT809PartitionConsumer<JT809GpsPosition> GpsConsumer { get; } | |||||
public ConsumerTestPartitionService(JT809PartitionConsumer<byte[]> sameConsumer, JT809PartitionConsumer<JT809GpsPosition> gpsConsumer) | |||||
public IJT808ConsumerOfT<byte[]> SameConsumer { get; } | |||||
public IJT808ConsumerOfT<JT809GpsPosition> GpsConsumer { get; } | |||||
public ConsumerTestPartitionService(IJT808ConsumerOfT<byte[]> sameConsumer, IJT808ConsumerOfT<JT809GpsPosition> gpsConsumer) | |||||
{ | { | ||||
SameConsumer = sameConsumer; | SameConsumer = sameConsumer; | ||||
GpsConsumer = gpsConsumer; | GpsConsumer = gpsConsumer; | ||||
@@ -1,5 +1,6 @@ | |||||
using JT809.GrpcProtos; | using JT809.GrpcProtos; | ||||
using JT809.KafkaService; | using JT809.KafkaService; | ||||
using JT809.PubSub.Abstractions; | |||||
using System; | using System; | ||||
using System.Collections.Generic; | using System.Collections.Generic; | ||||
using System.Text; | using System.Text; | ||||
@@ -8,9 +9,9 @@ namespace JT809.KafkaServiceTest | |||||
{ | { | ||||
public class ConsumerTestService | public class ConsumerTestService | ||||
{ | { | ||||
public JT809Consumer<byte[]> SameConsumer { get; } | |||||
public JT809Consumer<JT809GpsPosition> GpsConsumer { get; } | |||||
public ConsumerTestService(JT809Consumer<byte[]> sameConsumer, JT809Consumer<JT809GpsPosition> gpsConsumer) | |||||
public IJT808ConsumerOfT<byte[]> SameConsumer { get; } | |||||
public IJT808ConsumerOfT<JT809GpsPosition> GpsConsumer { get; } | |||||
public ConsumerTestService(IJT808ConsumerOfT<byte[]> sameConsumer, IJT808ConsumerOfT<JT809GpsPosition> gpsConsumer) | |||||
{ | { | ||||
SameConsumer = sameConsumer; | SameConsumer = sameConsumer; | ||||
GpsConsumer = gpsConsumer; | GpsConsumer = gpsConsumer; | ||||
@@ -1,5 +1,6 @@ | |||||
using JT809.GrpcProtos; | using JT809.GrpcProtos; | ||||
using JT809.KafkaService; | using JT809.KafkaService; | ||||
using JT809.PubSub.Abstractions; | |||||
using System; | using System; | ||||
using System.Collections.Generic; | using System.Collections.Generic; | ||||
using System.Text; | using System.Text; | ||||
@@ -8,9 +9,9 @@ namespace JT809.KafkaServiceTest | |||||
{ | { | ||||
public class ProducerTestPartitionService | public class ProducerTestPartitionService | ||||
{ | { | ||||
public JT809PartitionProducer<byte[]> SameProducer { get; } | |||||
public JT809PartitionProducer<JT809GpsPosition> GpsProducer { get; } | |||||
public ProducerTestPartitionService(JT809PartitionProducer<byte[]> sameProducer, JT809PartitionProducer<JT809GpsPosition> gpsProducer) | |||||
public IJT809ProducerOfT<byte[]> SameProducer { get; } | |||||
public IJT809ProducerOfT<JT809GpsPosition> GpsProducer { get; } | |||||
public ProducerTestPartitionService(IJT809ProducerOfT<byte[]> sameProducer, IJT809ProducerOfT<JT809GpsPosition> gpsProducer) | |||||
{ | { | ||||
SameProducer = sameProducer; | SameProducer = sameProducer; | ||||
GpsProducer = gpsProducer; | GpsProducer = gpsProducer; | ||||
@@ -1,5 +1,6 @@ | |||||
using JT809.GrpcProtos; | using JT809.GrpcProtos; | ||||
using JT809.KafkaService; | using JT809.KafkaService; | ||||
using JT809.PubSub.Abstractions; | |||||
using System; | using System; | ||||
using System.Collections.Generic; | using System.Collections.Generic; | ||||
using System.Text; | using System.Text; | ||||
@@ -8,9 +9,9 @@ namespace JT809.KafkaServiceTest | |||||
{ | { | ||||
public class ProducerTestService | public class ProducerTestService | ||||
{ | { | ||||
public JT809Producer<byte[]> SameProducer { get; } | |||||
public JT809Producer<JT809GpsPosition> GpsProducer { get; } | |||||
public ProducerTestService(JT809Producer<byte[]> sameProducer, JT809Producer<JT809GpsPosition> gpsProducer) | |||||
public IJT809ProducerOfT<byte[]> SameProducer { get; } | |||||
public IJT809ProducerOfT<JT809GpsPosition> GpsProducer { get; } | |||||
public ProducerTestService(IJT809ProducerOfT<byte[]> sameProducer, IJT809ProducerOfT<JT809GpsPosition> gpsProducer) | |||||
{ | { | ||||
SameProducer = sameProducer; | SameProducer = sameProducer; | ||||
GpsProducer = gpsProducer; | GpsProducer = gpsProducer; | ||||
@@ -0,0 +1,13 @@ | |||||
using Microsoft.Extensions.Options; | |||||
using System; | |||||
using System.Collections.Generic; | |||||
using System.Text; | |||||
namespace JT809.Superior.Server.Configs | |||||
{ | |||||
public class JT809GpsOptions : IOptions<JT809GpsOptions> | |||||
{ | |||||
public string FromChannel { get; set; } | |||||
public JT809GpsOptions Value =>this; | |||||
} | |||||
} |
@@ -0,0 +1,24 @@ | |||||
<Project Sdk="Microsoft.NET.Sdk"> | |||||
<PropertyGroup> | |||||
<OutputType>Exe</OutputType> | |||||
<TargetFramework>netcoreapp2.2</TargetFramework> | |||||
</PropertyGroup> | |||||
<ItemGroup> | |||||
<ProjectReference Include="..\..\..\JT809.DotNetty.Core\JT809.DotNetty.Core.csproj" /> | |||||
<ProjectReference Include="..\JT809.KafkaService\JT809.KafkaService.csproj" /> | |||||
</ItemGroup> | |||||
<ItemGroup> | |||||
<PackageReference Include="Microsoft.Extensions.Configuration.Json" Version="2.2.0" /> | |||||
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="2.2.0" /> | |||||
<PackageReference Include="Microsoft.Extensions.Hosting" Version="2.2.0" /> | |||||
<PackageReference Include="Microsoft.Extensions.Logging.Console" Version="2.2.0" /> | |||||
</ItemGroup> | |||||
<ItemGroup> | |||||
<None Update="appsettings.json"> | |||||
<CopyToOutputDirectory>Always</CopyToOutputDirectory> | |||||
</None> | |||||
</ItemGroup> | |||||
</Project> |
@@ -0,0 +1,62 @@ | |||||
using JT809.DotNetty.Core.Handlers; | |||||
using JT809.DotNetty.Core.Interfaces; | |||||
using JT809.DotNetty.Core.Metadata; | |||||
using JT809.GrpcProtos; | |||||
using JT809.Protocol; | |||||
using JT809.Protocol.SubMessageBody; | |||||
using JT809.PubSub.Abstractions; | |||||
using JT809.Superior.Server.Configs; | |||||
using Microsoft.Extensions.Logging; | |||||
using Microsoft.Extensions.Options; | |||||
using System; | |||||
using System.Collections.Generic; | |||||
using System.Text; | |||||
namespace JT809.Superior.Server | |||||
{ | |||||
public sealed class JT809SuperiorMsgIdReceiveHandler : JT809SuperiorMsgIdReceiveHandlerBase | |||||
{ | |||||
private readonly IJT809ProducerOfT<JT809GpsPosition> producer; | |||||
private readonly JT809GpsOptions gpsOptions; | |||||
public JT809SuperiorMsgIdReceiveHandler( | |||||
IOptions<JT809GpsOptions>jt809GpsAccessor, | |||||
IJT809ProducerOfT<JT809GpsPosition> producer, | |||||
ILoggerFactory loggerFactory, | |||||
IJT809SubordinateLoginService jT809SubordinateLoginService, | |||||
IJT809VerifyCodeGenerator verifyCodeGenerator) | |||||
: base(loggerFactory, jT809SubordinateLoginService, verifyCodeGenerator) | |||||
{ | |||||
this.producer = producer; | |||||
this.gpsOptions = jt809GpsAccessor.Value; | |||||
} | |||||
public override JT809Response Msg0x1200_0x1202(JT809Request request) | |||||
{ | |||||
var exchangeMessageBodies = request.Package.Bodies as JT809ExchangeMessageBodies; | |||||
var gpsBodies = exchangeMessageBodies.SubBodies as JT809_0x1200_0x1202; | |||||
JT809GpsPosition gpsPosition = new JT809GpsPosition(); | |||||
gpsPosition.Vno = exchangeMessageBodies.VehicleNo; | |||||
gpsPosition.VColor = (byte)exchangeMessageBodies.VehicleColor; | |||||
gpsPosition.Alarm = (int)gpsBodies.VehiclePosition.Alarm; | |||||
gpsPosition.Altitude = gpsBodies.VehiclePosition.Altitude; | |||||
gpsPosition.Direction = gpsBodies.VehiclePosition.Direction; | |||||
gpsPosition.Encrypt = (byte)gpsBodies.VehiclePosition.Encrypt; | |||||
gpsPosition.State = (int)gpsBodies.VehiclePosition.State; | |||||
gpsPosition.Lat = gpsBodies.VehiclePosition.Lat; | |||||
gpsPosition.Lon= gpsBodies.VehiclePosition.Lon; | |||||
gpsPosition.Vec1 = gpsBodies.VehiclePosition.Vec1; | |||||
gpsPosition.Vec2 = gpsBodies.VehiclePosition.Vec2; | |||||
gpsPosition.Vec3 =(int)gpsBodies.VehiclePosition.Vec3; | |||||
gpsPosition.GpsTime = (new DateTime( | |||||
gpsBodies.VehiclePosition.Year, | |||||
gpsBodies.VehiclePosition.Month, | |||||
gpsBodies.VehiclePosition.Day, | |||||
gpsBodies.VehiclePosition.Hour, | |||||
gpsBodies.VehiclePosition.Minute, | |||||
gpsBodies.VehiclePosition.Second).ToUniversalTime().Ticks - 621355968000000000) / 10000000; | |||||
gpsPosition.FromChannel = gpsOptions.FromChannel; | |||||
producer.ProduceAsync($"{0x1202}", $"{exchangeMessageBodies.VehicleNo}{exchangeMessageBodies.VehicleColor}", gpsPosition); | |||||
return base.Msg0x1200_0x1202(request); | |||||
} | |||||
} | |||||
} |
@@ -0,0 +1,50 @@ | |||||
using JT809.DotNetty.Core; | |||||
using JT809.DotNetty.Core.Configurations; | |||||
using JT809.DotNetty.Core.Handlers; | |||||
using JT809.Protocol.Configs; | |||||
using JT809.Superior.Server.Configs; | |||||
using Microsoft.Extensions.Configuration; | |||||
using Microsoft.Extensions.DependencyInjection; | |||||
using Microsoft.Extensions.DependencyInjection.Extensions; | |||||
using Microsoft.Extensions.Hosting; | |||||
using Microsoft.Extensions.Logging; | |||||
using System; | |||||
using System.Threading.Tasks; | |||||
using JT809.KafkaService; | |||||
namespace JT809.Superior.Server | |||||
{ | |||||
class Program | |||||
{ | |||||
static async Task Main(string[] args) | |||||
{ | |||||
//5B0000005A02000000A81200013414CE010000000000270FD4C14131323334353600000000000000000000000002120200000024000E0407E316130F07EF4D80017018400032003300000096002D002D0000000300000000E08F5D | |||||
var serverHostBuilder = new HostBuilder() | |||||
.ConfigureAppConfiguration((hostingContext, config) => | |||||
{ | |||||
config.SetBasePath(AppDomain.CurrentDomain.BaseDirectory); | |||||
config.AddJsonFile("appsettings.json", optional: false, reloadOnChange: true); | |||||
}) | |||||
.ConfigureLogging((context, logging) => | |||||
{ | |||||
logging.AddConsole(); | |||||
logging.SetMinimumLevel(LogLevel.Trace); | |||||
}) | |||||
.ConfigureServices((hostContext, services) => | |||||
{ | |||||
services.AddSingleton<ILoggerFactory, LoggerFactory>(); | |||||
services.AddSingleton(typeof(ILogger<>), typeof(Logger<>)); | |||||
services.AddJT809Core(hostContext.Configuration) | |||||
.AddJT809SuperiorPlatform(options:options => { | |||||
options.TcpPort = 808; | |||||
}); | |||||
services.Configure<JT809GpsOptions>(hostContext.Configuration.GetSection("JT809GpsOptions")); | |||||
services.AddJT809KafkaProducerPartitionsService(hostContext.Configuration,options=> options.Partition=10); | |||||
services.Replace(new ServiceDescriptor(typeof(JT809SuperiorMsgIdReceiveHandlerBase), typeof(JT809SuperiorMsgIdReceiveHandler), ServiceLifetime.Singleton)); | |||||
}); | |||||
await serverHostBuilder.RunConsoleAsync(); | |||||
} | |||||
} | |||||
} |
@@ -0,0 +1,24 @@ | |||||
{ | |||||
"Logging": { | |||||
"IncludeScopes": false, | |||||
"Debug": { | |||||
"LogLevel": { | |||||
"Default": "Trace" | |||||
} | |||||
}, | |||||
"Console": { | |||||
"LogLevel": { | |||||
"Default": "Trace" | |||||
} | |||||
} | |||||
}, | |||||
"JT809Configuration": { | |||||
"SubordinateClientEnable": false | |||||
}, | |||||
"KafkaProducerConfig": { | |||||
"BootstrapServers": "127.0.0.1:9092" | |||||
}, | |||||
"JT809GpsOptions": { | |||||
"FromChannel": "test" | |||||
} | |||||
} |
@@ -45,7 +45,7 @@ namespace JT809.DotNetty.Host.Test | |||||
services.AddSingleton<ILoggerFactory, LoggerFactory>(); | services.AddSingleton<ILoggerFactory, LoggerFactory>(); | ||||
services.AddSingleton(typeof(ILogger<>), typeof(Logger<>)); | services.AddSingleton(typeof(ILogger<>), typeof(Logger<>)); | ||||
services.AddJT809Core(hostContext.Configuration) | services.AddJT809Core(hostContext.Configuration) | ||||
.AddJT809SuperiorPlatform(options=> { | |||||
.AddJT809SuperiorPlatform(options:options=> { | |||||
options.TcpPort = 839; | options.TcpPort = 839; | ||||
}); | }); | ||||
}); | }); | ||||
@@ -25,7 +25,11 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JT809.GrpcProtos", "JT809.D | |||||
EndProject | EndProject | ||||
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JT809.KafkaService", "JT809.DotNetty.Simples\Superior\JT809.KafkaService\JT809.KafkaService.csproj", "{8119D905-241F-4EFF-B300-1FB474B8C665}" | Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JT809.KafkaService", "JT809.DotNetty.Simples\Superior\JT809.KafkaService\JT809.KafkaService.csproj", "{8119D905-241F-4EFF-B300-1FB474B8C665}" | ||||
EndProject | EndProject | ||||
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "JT809.KafkaServiceTest", "JT809.DotNetty.Simples\Superior\JT809.KafkaServiceTest\JT809.KafkaServiceTest.csproj", "{22F008D5-61F8-4889-80DB-91B37591322F}" | |||||
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JT809.KafkaServiceTest", "JT809.DotNetty.Simples\Superior\JT809.KafkaServiceTest\JT809.KafkaServiceTest.csproj", "{22F008D5-61F8-4889-80DB-91B37591322F}" | |||||
EndProject | |||||
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "JT809.Superior.Server", "JT809.DotNetty.Simples\Superior\JT809.Superior.Server\JT809.Superior.Server.csproj", "{8620735D-FBD5-4832-882F-A2F607DC6861}" | |||||
EndProject | |||||
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "JT809.GpsConsumer", "JT809.DotNetty.Simples\Superior\JT809.GpsConsumer\JT809.GpsConsumer.csproj", "{FBC06008-6F18-4CC3-B7C2-5B476317F92D}" | |||||
EndProject | EndProject | ||||
Global | Global | ||||
GlobalSection(SolutionConfigurationPlatforms) = preSolution | GlobalSection(SolutionConfigurationPlatforms) = preSolution | ||||
@@ -65,6 +69,14 @@ Global | |||||
{22F008D5-61F8-4889-80DB-91B37591322F}.Debug|Any CPU.Build.0 = Debug|Any CPU | {22F008D5-61F8-4889-80DB-91B37591322F}.Debug|Any CPU.Build.0 = Debug|Any CPU | ||||
{22F008D5-61F8-4889-80DB-91B37591322F}.Release|Any CPU.ActiveCfg = Release|Any CPU | {22F008D5-61F8-4889-80DB-91B37591322F}.Release|Any CPU.ActiveCfg = Release|Any CPU | ||||
{22F008D5-61F8-4889-80DB-91B37591322F}.Release|Any CPU.Build.0 = Release|Any CPU | {22F008D5-61F8-4889-80DB-91B37591322F}.Release|Any CPU.Build.0 = Release|Any CPU | ||||
{8620735D-FBD5-4832-882F-A2F607DC6861}.Debug|Any CPU.ActiveCfg = Debug|Any CPU | |||||
{8620735D-FBD5-4832-882F-A2F607DC6861}.Debug|Any CPU.Build.0 = Debug|Any CPU | |||||
{8620735D-FBD5-4832-882F-A2F607DC6861}.Release|Any CPU.ActiveCfg = Release|Any CPU | |||||
{8620735D-FBD5-4832-882F-A2F607DC6861}.Release|Any CPU.Build.0 = Release|Any CPU | |||||
{FBC06008-6F18-4CC3-B7C2-5B476317F92D}.Debug|Any CPU.ActiveCfg = Debug|Any CPU | |||||
{FBC06008-6F18-4CC3-B7C2-5B476317F92D}.Debug|Any CPU.Build.0 = Debug|Any CPU | |||||
{FBC06008-6F18-4CC3-B7C2-5B476317F92D}.Release|Any CPU.ActiveCfg = Release|Any CPU | |||||
{FBC06008-6F18-4CC3-B7C2-5B476317F92D}.Release|Any CPU.Build.0 = Release|Any CPU | |||||
EndGlobalSection | EndGlobalSection | ||||
GlobalSection(SolutionProperties) = preSolution | GlobalSection(SolutionProperties) = preSolution | ||||
HideSolutionNode = FALSE | HideSolutionNode = FALSE | ||||
@@ -77,6 +89,8 @@ Global | |||||
{D64F2F77-DC0C-4120-80DA-45012A794CDF} = {E9DC871D-EFCE-4D53-A5B5-8A88D2D52EA4} | {D64F2F77-DC0C-4120-80DA-45012A794CDF} = {E9DC871D-EFCE-4D53-A5B5-8A88D2D52EA4} | ||||
{8119D905-241F-4EFF-B300-1FB474B8C665} = {E9DC871D-EFCE-4D53-A5B5-8A88D2D52EA4} | {8119D905-241F-4EFF-B300-1FB474B8C665} = {E9DC871D-EFCE-4D53-A5B5-8A88D2D52EA4} | ||||
{22F008D5-61F8-4889-80DB-91B37591322F} = {E9DC871D-EFCE-4D53-A5B5-8A88D2D52EA4} | {22F008D5-61F8-4889-80DB-91B37591322F} = {E9DC871D-EFCE-4D53-A5B5-8A88D2D52EA4} | ||||
{8620735D-FBD5-4832-882F-A2F607DC6861} = {E9DC871D-EFCE-4D53-A5B5-8A88D2D52EA4} | |||||
{FBC06008-6F18-4CC3-B7C2-5B476317F92D} = {E9DC871D-EFCE-4D53-A5B5-8A88D2D52EA4} | |||||
EndGlobalSection | EndGlobalSection | ||||
GlobalSection(ExtensibilityGlobals) = postSolution | GlobalSection(ExtensibilityGlobals) = postSolution | ||||
SolutionGuid = {0FC2A52E-3B7A-4485-9C3B-9080C825419D} | SolutionGuid = {0FC2A52E-3B7A-4485-9C3B-9080C825419D} | ||||