diff --git a/src/JT808.DotNetty.Kafka/JT808ClientKafkaExtensions.cs b/src/JT808.DotNetty.Kafka/JT808ClientKafkaExtensions.cs
index c03bcd9..d53c421 100644
--- a/src/JT808.DotNetty.Kafka/JT808ClientKafkaExtensions.cs
+++ b/src/JT808.DotNetty.Kafka/JT808ClientKafkaExtensions.cs
@@ -39,6 +39,18 @@ namespace JT808.DotNetty.Kafka
///
///
///
+ ///
+ /// GetSection("JT808MsgReplyConsumerConfig")
+ ///
+ public static IJT808ClientBuilder AddMsgReplyConsumer(this IJT808ClientBuilder jT808ClientBuilder, IConfiguration configuration)
+ {
+ jT808ClientBuilder.JT808Builder.Services.Configure(configuration.GetSection("JT808MsgReplyConsumerConfig"));
+ jT808ClientBuilder.JT808Builder.Services.Replace(new ServiceDescriptor(typeof(IJT808MsgReplyConsumer), typeof(JT808MsgReplyConsumer), ServiceLifetime.Singleton));
+ return jT808ClientBuilder;
+ }
+ ///
+ ///
+ ///
///
/// GetSection("JT808SessionConsumerConfig")
///
diff --git a/src/JT808.DotNetty.Services.Tests/JT808.DotNetty.MsgIdHandler.Test/JT808.DotNetty.MsgIdHandler.Test.csproj b/src/JT808.DotNetty.Services.Tests/JT808.DotNetty.MsgIdHandler.Test/JT808.DotNetty.MsgIdHandler.Test.csproj
index 21dff5c..d2f9013 100644
--- a/src/JT808.DotNetty.Services.Tests/JT808.DotNetty.MsgIdHandler.Test/JT808.DotNetty.MsgIdHandler.Test.csproj
+++ b/src/JT808.DotNetty.Services.Tests/JT808.DotNetty.MsgIdHandler.Test/JT808.DotNetty.MsgIdHandler.Test.csproj
@@ -5,4 +5,20 @@
netcoreapp2.2
+
+
+
+
+
+
+
+
+
+
+
+
+ Always
+
+
+
diff --git a/src/JT808.DotNetty.Services.Tests/JT808.DotNetty.MsgIdHandler.Test/JT808DotNettyMsgIdHandlerDefaultImpl.cs b/src/JT808.DotNetty.Services.Tests/JT808.DotNetty.MsgIdHandler.Test/JT808DotNettyMsgIdHandlerDefaultImpl.cs
new file mode 100644
index 0000000..cfbf95e
--- /dev/null
+++ b/src/JT808.DotNetty.Services.Tests/JT808.DotNetty.MsgIdHandler.Test/JT808DotNettyMsgIdHandlerDefaultImpl.cs
@@ -0,0 +1,42 @@
+using JT808.DotNetty.Abstractions;
+using JT808.DotNetty.Kafka;
+using JT808.Protocol;
+using JT808.Protocol.Extensions;
+using Microsoft.Extensions.DependencyInjection;
+using Microsoft.Extensions.Logging;
+using System;
+using System.Collections.Generic;
+using System.Text;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace JT808.DotNetty.MsgIdHandler.Test
+{
+ public class JT808DotNettyMsgIdHandlerDefaultImpl : IJT808DotNettyMsgIdHandler
+ {
+ public readonly ILogger logger;
+ public JT808DotNettyMsgIdHandlerDefaultImpl(ILoggerFactory loggerFactory,
+ IServiceProvider serviceProvider) {
+ logger = loggerFactory.CreateLogger();
+ Task.Run(()=> {
+ while (true)
+ {
+ Thread.Sleep(5000);
+ using (IJT808MsgProducer jT808MsgProducer = new JT808MsgProducer(new JT808MsgProducerConfig
+ {
+ BootstrapServers = "127.0.0.1:9092",
+ TopicName = "JT808Msg"
+ }))
+ {
+ jT808MsgProducer.ProduceAsync("123456", new byte[] { 0x7E, 0, 0x7E }).Wait();
+ }
+ }
+ });
+ }
+
+ public void Processor((string TerminalNo, byte[] Data) parameter)
+ {
+ logger.LogDebug($"{parameter.TerminalNo}:{parameter.Data.ToHexString()}");
+ }
+ }
+}
diff --git a/src/JT808.DotNetty.Services.Tests/JT808.DotNetty.MsgIdHandler.Test/Program.cs b/src/JT808.DotNetty.Services.Tests/JT808.DotNetty.MsgIdHandler.Test/Program.cs
index fd94ecb..26587d8 100644
--- a/src/JT808.DotNetty.Services.Tests/JT808.DotNetty.MsgIdHandler.Test/Program.cs
+++ b/src/JT808.DotNetty.Services.Tests/JT808.DotNetty.MsgIdHandler.Test/Program.cs
@@ -1,12 +1,38 @@
-using System;
+using JT808.DotNetty.Kafka;
+using JT808.Protocol;
+using Microsoft.Extensions.Configuration;
+using Microsoft.Extensions.DependencyInjection;
+using Microsoft.Extensions.Hosting;
+using Microsoft.Extensions.Logging;
+using System;
+using System.Threading.Tasks;
namespace JT808.DotNetty.MsgIdHandler.Test
{
class Program
{
- static void Main(string[] args)
+ async static Task Main(string[] args)
{
- Console.WriteLine("Hello World!");
+ var serverHostBuilder = new HostBuilder()
+ .UseEnvironment(args[0].Split('=')[1])
+ .ConfigureAppConfiguration((hostingContext,config) => {
+ config.SetBasePath(AppDomain.CurrentDomain.BaseDirectory);
+ config.AddJsonFile("appsettings.json", optional: false, reloadOnChange: true)
+ .AddJsonFile($"appsettings.{ hostingContext.HostingEnvironment.EnvironmentName}.json", optional: true, reloadOnChange: true);
+ })
+ .ConfigureLogging(configLogging => {
+ configLogging.AddConsole();
+ configLogging.SetMinimumLevel(LogLevel.Trace);
+ })
+ .ConfigureServices((hostContext, services) => {
+ services.AddSingleton();
+ services.AddSingleton(typeof(ILogger<>), typeof(Logger<>));
+ services.AddJT808Configure()
+ .AddJT808ClientKafka()
+ .AddMsgConsumer(hostContext.Configuration)
+ .AddJT808MsgIdHandler();
+ });
+ await serverHostBuilder.RunConsoleAsync();
}
}
}
diff --git a/src/JT808.DotNetty.Services.Tests/JT808.DotNetty.MsgLogging.Test/JT808.DotNetty.MsgLogging.Test.csproj b/src/JT808.DotNetty.Services.Tests/JT808.DotNetty.MsgLogging.Test/JT808.DotNetty.MsgLogging.Test.csproj
index 21dff5c..69eb37f 100644
--- a/src/JT808.DotNetty.Services.Tests/JT808.DotNetty.MsgLogging.Test/JT808.DotNetty.MsgLogging.Test.csproj
+++ b/src/JT808.DotNetty.Services.Tests/JT808.DotNetty.MsgLogging.Test/JT808.DotNetty.MsgLogging.Test.csproj
@@ -5,4 +5,21 @@
netcoreapp2.2
+
+
+
+
+
+
+
+
+
+
+
+
+
+ Always
+
+
+
diff --git a/src/JT808.DotNetty.Services.Tests/JT808.DotNetty.MsgLogging.Test/JT808MsgLoggingImpl.cs b/src/JT808.DotNetty.Services.Tests/JT808.DotNetty.MsgLogging.Test/JT808MsgLoggingImpl.cs
new file mode 100644
index 0000000..6c88628
--- /dev/null
+++ b/src/JT808.DotNetty.Services.Tests/JT808.DotNetty.MsgLogging.Test/JT808MsgLoggingImpl.cs
@@ -0,0 +1,49 @@
+using JT808.DotNetty.Abstractions;
+using JT808.DotNetty.Kafka;
+using JT808.Protocol.Extensions;
+using Microsoft.Extensions.Logging;
+using System;
+using System.Collections.Generic;
+using System.Text;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace JT808.DotNetty.MsgLogging.Test
+{
+ public class JT808MsgLoggingImpl : IJT808MsgLogging
+ {
+ public readonly ILogger logger;
+ public JT808MsgLoggingImpl(ILoggerFactory loggerFactory) {
+ logger = loggerFactory.CreateLogger();
+ Task.Run(() => {
+ while (true)
+ {
+ Thread.Sleep(5000);
+ using (IJT808MsgProducer jT808MsgProducer = new JT808MsgProducer(new JT808MsgProducerConfig
+ {
+ BootstrapServers = "127.0.0.1:9092",
+ TopicName = "JT808Msg"
+ }))
+ {
+ jT808MsgProducer.ProduceAsync("123456", new byte[] { 0x7E, 0,0,0,0, 0x7E }).Wait();
+ }
+
+ JT808MsgReplyProducerConfig JT808MsgProducerConfig = new JT808MsgReplyProducerConfig
+ {
+ TopicName = "JT808MsgReply",
+ BootstrapServers = "127.0.0.1:9092",
+ };
+ using (IJT808MsgReplyProducer jT808MsgProducer = new JT808MsgReplyProducer(JT808MsgProducerConfig))
+ {
+ jT808MsgProducer.ProduceAsync("123456", new byte[] { 0x7E,1,1,1,1, 0x7E }).Wait();
+ }
+ }
+ });
+ }
+
+ public void Processor((string TerminalNo, byte[] Data) parameter, JT808MsgLoggingType jT808MsgLoggingType)
+ {
+ logger.LogDebug($"{parameter.TerminalNo}:{parameter.Data.ToHexString()},方向:{jT808MsgLoggingType.ToString()}");
+ }
+ }
+}
diff --git a/src/JT808.DotNetty.Services.Tests/JT808.DotNetty.MsgLogging.Test/Program.cs b/src/JT808.DotNetty.Services.Tests/JT808.DotNetty.MsgLogging.Test/Program.cs
index 7b71d69..c594bee 100644
--- a/src/JT808.DotNetty.Services.Tests/JT808.DotNetty.MsgLogging.Test/Program.cs
+++ b/src/JT808.DotNetty.Services.Tests/JT808.DotNetty.MsgLogging.Test/Program.cs
@@ -1,12 +1,42 @@
-using System;
+using JT808.DotNetty.Kafka;
+using JT808.Protocol;
+using Microsoft.Extensions.Configuration;
+using Microsoft.Extensions.DependencyInjection;
+using Microsoft.Extensions.Hosting;
+using Microsoft.Extensions.Logging;
+using Microsoft.Extensions.Logging.Console;
+using System;
+using System.Threading.Tasks;
namespace JT808.DotNetty.MsgLogging.Test
{
class Program
{
- static void Main(string[] args)
+ async static Task Main(string[] args)
{
- Console.WriteLine("Hello World!");
+ var hostBuilder = new HostBuilder()
+ .UseEnvironment(args[0].Split('=')[1])
+ .ConfigureAppConfiguration((hostContext,config)=> {
+ config.SetBasePath(AppDomain.CurrentDomain.BaseDirectory);
+ config.AddJsonFile("appsettings.json", optional: false, reloadOnChange: true)
+ .AddJsonFile($"appsettings.{hostContext.HostingEnvironment.EnvironmentName}.json", optional: true, reloadOnChange: true);
+ })
+ .ConfigureLogging((hostContext, configLogging) => {
+ configLogging.AddConsole();
+ configLogging.SetMinimumLevel(LogLevel.Trace);
+ })
+ .ConfigureServices((hostContext, services) => {
+ services.AddSingleton();
+ services.AddSingleton(typeof(ILogger<>), typeof(Logger<>));
+ services.AddJT808Configure()
+ .AddJT808ClientKafka()
+ .AddMsgConsumer(hostContext.Configuration)
+ .AddMsgReplyConsumer(hostContext.Configuration)
+ .AddJT808MsgLogging();
+ })
+ ;
+
+ await hostBuilder.RunConsoleAsync();
}
}
}
diff --git a/src/JT808.DotNetty.Services.Tests/JT808.DotNetty.ReplyMessage.Test/JT808.DotNetty.ReplyMessage.Test.csproj b/src/JT808.DotNetty.Services.Tests/JT808.DotNetty.ReplyMessage.Test/JT808.DotNetty.ReplyMessage.Test.csproj
index 21dff5c..8135ec6 100644
--- a/src/JT808.DotNetty.Services.Tests/JT808.DotNetty.ReplyMessage.Test/JT808.DotNetty.ReplyMessage.Test.csproj
+++ b/src/JT808.DotNetty.Services.Tests/JT808.DotNetty.ReplyMessage.Test/JT808.DotNetty.ReplyMessage.Test.csproj
@@ -5,4 +5,21 @@
netcoreapp2.2
+
+
+
+
+
+
+
+
+
+
+
+
+
+ Always
+
+
+
diff --git a/src/JT808.DotNetty.Services.Tests/JT808.DotNetty.ReplyMessage.Test/JT808DotNettyReplyMessageServiceInherited.cs b/src/JT808.DotNetty.Services.Tests/JT808.DotNetty.ReplyMessage.Test/JT808DotNettyReplyMessageServiceInherited.cs
new file mode 100644
index 0000000..0898fd4
--- /dev/null
+++ b/src/JT808.DotNetty.Services.Tests/JT808.DotNetty.ReplyMessage.Test/JT808DotNettyReplyMessageServiceInherited.cs
@@ -0,0 +1,46 @@
+using System;
+using System.Collections.Generic;
+using System.Text;
+using System.Threading;
+using System.Threading.Tasks;
+using JT808.DotNetty.Abstractions;
+using JT808.DotNetty.Kafka;
+using JT808.Protocol;
+using JT808.Protocol.Extensions;
+using Microsoft.Extensions.Logging;
+
+namespace JT808.DotNetty.ReplyMessage.Test
+{
+ public class JT808DotNettyReplyMessageServiceInherited : JT808DotNettyReplyMessageService
+ {
+ public readonly ILogger logger;
+
+ public JT808DotNettyReplyMessageServiceInherited(IJT808Config jT808Config,
+ IJT808MsgReplyProducer jT808MsgReplyProducer,
+ ILoggerFactory loggerFactory)
+ : base(jT808Config, jT808MsgReplyProducer)
+ {
+ logger = loggerFactory.CreateLogger();
+ Task.Run(() => {
+ while (true)
+ {
+ Thread.Sleep(5000);
+ using (IJT808MsgProducer jT808MsgProducer = new JT808MsgProducer(new JT808MsgProducerConfig
+ {
+ BootstrapServers = "127.0.0.1:9092",
+ TopicName = "JT808Msg"
+ }))
+ {
+ jT808MsgProducer.ProduceAsync("011111111111", "7E02000032011111111111012E00000000000C00000160E42506C30C82002C00000000180914142057010400001DC003020000250400000000300115310100977E".ToHexBytes()).Wait();
+ }
+ }
+ });
+ }
+
+ public override void Processor((string TerminalNo, byte[] Data) parameter)
+ {
+ logger.LogDebug($"{parameter.TerminalNo}:{parameter.Data.ToHexString()}");
+ base.Processor(parameter);
+ }
+ }
+}
diff --git a/src/JT808.DotNetty.Services.Tests/JT808.DotNetty.ReplyMessage.Test/Program.cs b/src/JT808.DotNetty.Services.Tests/JT808.DotNetty.ReplyMessage.Test/Program.cs
index 567bf0f..5b7510f 100644
--- a/src/JT808.DotNetty.Services.Tests/JT808.DotNetty.ReplyMessage.Test/Program.cs
+++ b/src/JT808.DotNetty.Services.Tests/JT808.DotNetty.ReplyMessage.Test/Program.cs
@@ -1,12 +1,41 @@
-using System;
+using JT808.DotNetty.Kafka;
+using JT808.Protocol;
+using Microsoft.Extensions.Configuration;
+using Microsoft.Extensions.DependencyInjection;
+using Microsoft.Extensions.Hosting;
+using Microsoft.Extensions.Logging;
+using System;
+using System.Threading.Tasks;
namespace JT808.DotNetty.ReplyMessage.Test
{
class Program
{
- static void Main(string[] args)
+ async static Task Main(string[] args)
{
- Console.WriteLine("Hello World!");
+ var hostBuilder = new HostBuilder()
+ .UseEnvironment(args[0].Split('=')[1])
+ .ConfigureAppConfiguration((hostContext, config) => {
+ config.SetBasePath(AppDomain.CurrentDomain.BaseDirectory);
+ config.AddJsonFile("appsettings.json", optional: false, reloadOnChange: true)
+ .AddJsonFile($"appsettings.{hostContext.HostingEnvironment.EnvironmentName}.json", optional: true, reloadOnChange: true);
+ })
+ .ConfigureLogging((hostContext, configLogging) => {
+ configLogging.AddConsole();
+ configLogging.SetMinimumLevel(LogLevel.Trace);
+ })
+ .ConfigureServices((hostContext, services) => {
+ services.AddSingleton();
+ services.AddSingleton(typeof(ILogger<>), typeof(Logger<>));
+ services.AddJT808Configure()
+ .AddJT808ClientKafka()
+ .AddMsgConsumer(hostContext.Configuration)
+ .AddMsgReplyProducer(hostContext.Configuration)
+ .AddInprocJT808ReplyMessage();
+ })
+ ;
+
+ await hostBuilder.RunConsoleAsync();
}
}
}
diff --git a/src/JT808.DotNetty.Services.Tests/JT808.DotNetty.SessionNotice.Test/JT808.DotNetty.SessionNotice.Test.csproj b/src/JT808.DotNetty.Services.Tests/JT808.DotNetty.SessionNotice.Test/JT808.DotNetty.SessionNotice.Test.csproj
index 21dff5c..cae489b 100644
--- a/src/JT808.DotNetty.Services.Tests/JT808.DotNetty.SessionNotice.Test/JT808.DotNetty.SessionNotice.Test.csproj
+++ b/src/JT808.DotNetty.Services.Tests/JT808.DotNetty.SessionNotice.Test/JT808.DotNetty.SessionNotice.Test.csproj
@@ -5,4 +5,21 @@
netcoreapp2.2
+
+
+
+
+
+
+
+
+
+
+
+
+
+ Always
+
+
+
diff --git a/src/JT808.DotNetty.Services.Tests/JT808.DotNetty.SessionNotice.Test/JT808DotNettySessionNoticeServiceInherited.cs b/src/JT808.DotNetty.Services.Tests/JT808.DotNetty.SessionNotice.Test/JT808DotNettySessionNoticeServiceInherited.cs
new file mode 100644
index 0000000..949afcc
--- /dev/null
+++ b/src/JT808.DotNetty.Services.Tests/JT808.DotNetty.SessionNotice.Test/JT808DotNettySessionNoticeServiceInherited.cs
@@ -0,0 +1,39 @@
+using System;
+using System.Collections.Generic;
+using System.Text;
+using System.Threading;
+using System.Threading.Tasks;
+using JT808.DotNetty.Abstractions;
+using JT808.DotNetty.Kafka;
+using Microsoft.Extensions.Logging;
+
+namespace JT808.DotNetty.SessionNotice.Test
+{
+ public class JT808DotNettySessionNoticeServiceInherited : JT808DotNettySessionNoticeService
+ {
+ public JT808DotNettySessionNoticeServiceInherited(ILoggerFactory loggerFactory) : base(loggerFactory)
+ {
+ Task.Run(()=> {
+ while (true)
+ {
+ Thread.Sleep(5000);
+ JT808SessionProducerConfig JT808ProducerConfig = new JT808SessionProducerConfig
+ {
+ TopicName = "JT808Session",
+ BootstrapServers = "127.0.0.1:9092"
+ };
+ using (IJT808SessionProducer jT808MsgProducer = new JT808SessionProducer(JT808ProducerConfig))
+ {
+ jT808MsgProducer.ProduceAsync("online", "123456").Wait();
+ jT808MsgProducer.ProduceAsync("offline", "123457").Wait();
+ }
+ }
+ });
+ }
+
+ public override void Processor((string Notice, string TerminalNo) parameter)
+ {
+ base.Processor(parameter);
+ }
+ }
+}
diff --git a/src/JT808.DotNetty.Services.Tests/JT808.DotNetty.SessionNotice.Test/Program.cs b/src/JT808.DotNetty.Services.Tests/JT808.DotNetty.SessionNotice.Test/Program.cs
index 60385e5..7aa840e 100644
--- a/src/JT808.DotNetty.Services.Tests/JT808.DotNetty.SessionNotice.Test/Program.cs
+++ b/src/JT808.DotNetty.Services.Tests/JT808.DotNetty.SessionNotice.Test/Program.cs
@@ -1,12 +1,40 @@
-using System;
+using JT808.DotNetty.Kafka;
+using JT808.Protocol;
+using Microsoft.Extensions.Configuration;
+using Microsoft.Extensions.DependencyInjection;
+using Microsoft.Extensions.Hosting;
+using Microsoft.Extensions.Logging;
+using System;
+using System.Threading.Tasks;
namespace JT808.DotNetty.SessionNotice.Test
{
class Program
{
- static void Main(string[] args)
+ async static Task Main(string[] args)
{
- Console.WriteLine("Hello World!");
+ var hostBuilder = new HostBuilder()
+ .UseEnvironment(args[0].Split('=')[1])
+ .ConfigureAppConfiguration((hostContext, config) => {
+ config.SetBasePath(AppDomain.CurrentDomain.BaseDirectory);
+ config.AddJsonFile("appsettings.json", optional: false, reloadOnChange: true)
+ .AddJsonFile($"appsettings.{hostContext.HostingEnvironment.EnvironmentName}.json", optional: true, reloadOnChange: true);
+ })
+ .ConfigureLogging((hostContext, configLogging) => {
+ configLogging.AddConsole();
+ configLogging.SetMinimumLevel(LogLevel.Trace);
+ })
+ .ConfigureServices((hostContext, services) => {
+ services.AddSingleton();
+ services.AddSingleton(typeof(ILogger<>), typeof(Logger<>));
+ services.AddJT808Configure()
+ .AddJT808ClientKafka()
+ .AddSessionConsumer(hostContext.Configuration)
+ .AddInprocJT808SessionNotice();
+ })
+ ;
+
+ await hostBuilder.RunConsoleAsync();
}
}
}
diff --git a/src/JT808.DotNetty.Services.Tests/JT808.DotNetty.Traffic.Test/JT808.DotNetty.Traffic.Test.csproj b/src/JT808.DotNetty.Services.Tests/JT808.DotNetty.Traffic.Test/JT808.DotNetty.Traffic.Test.csproj
index 21dff5c..648d005 100644
--- a/src/JT808.DotNetty.Services.Tests/JT808.DotNetty.Traffic.Test/JT808.DotNetty.Traffic.Test.csproj
+++ b/src/JT808.DotNetty.Services.Tests/JT808.DotNetty.Traffic.Test/JT808.DotNetty.Traffic.Test.csproj
@@ -5,4 +5,21 @@
netcoreapp2.2
+
+
+
+
+
+
+
+
+
+
+
+
+
+ Always
+
+
+
diff --git a/src/JT808.DotNetty.Services.Tests/JT808.DotNetty.Traffic.Test/JT808DotNettyTrafficServiceTest.cs b/src/JT808.DotNetty.Services.Tests/JT808.DotNetty.Traffic.Test/JT808DotNettyTrafficServiceTest.cs
new file mode 100644
index 0000000..e33f9e8
--- /dev/null
+++ b/src/JT808.DotNetty.Services.Tests/JT808.DotNetty.Traffic.Test/JT808DotNettyTrafficServiceTest.cs
@@ -0,0 +1,39 @@
+using JT808.DotNetty.Abstractions;
+using JT808.DotNetty.Kafka;
+using JT808.Protocol.Extensions;
+using Microsoft.Extensions.Logging;
+using System;
+using System.Collections.Generic;
+using System.Text;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace JT808.DotNetty.Traffic.Test
+{
+ public class JT808DotNettyTrafficServiceTest
+ {
+ private readonly CSRedis.CSRedisClient redisClien;
+ public readonly ILogger logger;
+ public JT808DotNettyTrafficServiceTest(ILoggerFactory loggerFactory) {
+ redisClien = new CSRedis.CSRedisClient("127.0.0.1:6379,password=smallchi");
+ RedisHelper.Initialization(redisClien);
+ logger = loggerFactory.CreateLogger();
+ Task.Run(() => {
+ while (true)
+ {
+ Thread.Sleep(5000);
+ using (IJT808MsgProducer jT808MsgProducer = new JT808MsgProducer(new JT808MsgProducerConfig
+ {
+ BootstrapServers = "127.0.0.1:9092",
+ TopicName = "JT808Msg"
+ }))
+ {
+ jT808MsgProducer.ProduceAsync("011111111111", "7E02000032011111111111012E00000000000C00000160E42506C30C82002C00000000180914142057010400001DC003020000250400000000300115310100977E".ToHexBytes()).Wait();
+ }
+ var length= RedisHelper.HGet("011111111111", DateTime.Now.ToString("yyyyMMdd"));
+ logger.LogDebug($"{011111111111}:{length}");
+ }
+ });
+ }
+ }
+}
diff --git a/src/JT808.DotNetty.Services.Tests/JT808.DotNetty.Traffic.Test/Program.cs b/src/JT808.DotNetty.Services.Tests/JT808.DotNetty.Traffic.Test/Program.cs
index 89805f0..b86a523 100644
--- a/src/JT808.DotNetty.Services.Tests/JT808.DotNetty.Traffic.Test/Program.cs
+++ b/src/JT808.DotNetty.Services.Tests/JT808.DotNetty.Traffic.Test/Program.cs
@@ -1,12 +1,43 @@
-using System;
+using JT808.DotNetty.Kafka;
+using JT808.Protocol;
+using Microsoft.Extensions.Configuration;
+using Microsoft.Extensions.DependencyInjection;
+using Microsoft.Extensions.Hosting;
+using Microsoft.Extensions.Logging;
+using System;
+using System.Threading.Tasks;
namespace JT808.DotNetty.Traffic.Test
{
class Program
{
- static void Main(string[] args)
+ async static Task Main(string[] args)
{
- Console.WriteLine("Hello World!");
+ var hostBuilder = new HostBuilder()
+ .UseEnvironment(args[0].Split('=')[1])
+ .ConfigureAppConfiguration((hostContext, config) => {
+ config.SetBasePath(AppDomain.CurrentDomain.BaseDirectory);
+ config.AddJsonFile("appsettings.json", optional: false, reloadOnChange: true)
+ .AddJsonFile($"appsettings.{hostContext.HostingEnvironment.EnvironmentName}.json", optional: true, reloadOnChange: true);
+ })
+ .ConfigureLogging((hostContext, configLogging) => {
+ configLogging.AddConsole();
+ configLogging.SetMinimumLevel(LogLevel.Trace);
+ })
+ .ConfigureServices((hostContext, services) => {
+ services.AddSingleton();
+ services.AddSingleton(typeof(ILogger<>), typeof(Logger<>));
+ services.AddSingleton();
+ services.AddJT808Configure()
+ .AddJT808ClientKafka()
+ .AddMsgConsumer(hostContext.Configuration)
+ .AddInprocJT808Traffic();
+
+ services.BuildServiceProvider().GetRequiredService();
+ })
+ ;
+
+ await hostBuilder.RunConsoleAsync();
}
}
}
diff --git a/src/JT808.DotNetty.Services.Tests/JT808.DotNetty.Transmit.Test/JT808.DotNetty.Transmit.Test.csproj b/src/JT808.DotNetty.Services.Tests/JT808.DotNetty.Transmit.Test/JT808.DotNetty.Transmit.Test.csproj
index 21dff5c..450f3dc 100644
--- a/src/JT808.DotNetty.Services.Tests/JT808.DotNetty.Transmit.Test/JT808.DotNetty.Transmit.Test.csproj
+++ b/src/JT808.DotNetty.Services.Tests/JT808.DotNetty.Transmit.Test/JT808.DotNetty.Transmit.Test.csproj
@@ -5,4 +5,21 @@
netcoreapp2.2
+
+
+
+
+
+
+
+
+
+
+
+
+
+ Always
+
+
+
diff --git a/src/JT808.DotNetty.Services.Tests/JT808.DotNetty.Transmit.Test/JT808DotNettyTransmitServiceTest.cs b/src/JT808.DotNetty.Services.Tests/JT808.DotNetty.Transmit.Test/JT808DotNettyTransmitServiceTest.cs
new file mode 100644
index 0000000..bee4288
--- /dev/null
+++ b/src/JT808.DotNetty.Services.Tests/JT808.DotNetty.Transmit.Test/JT808DotNettyTransmitServiceTest.cs
@@ -0,0 +1,34 @@
+using JT808.DotNetty.Abstractions;
+using JT808.DotNetty.Kafka;
+using JT808.Protocol.Extensions;
+using Microsoft.Extensions.Logging;
+using System;
+using System.Collections.Generic;
+using System.Text;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace JT808.DotNetty.Transmit.Test
+{
+ public class JT808DotNettyTransmitServiceTest
+ {
+ public readonly ILogger logger;
+ public JT808DotNettyTransmitServiceTest(ILoggerFactory loggerFactory) {
+ logger = loggerFactory.CreateLogger();
+ Task.Run(() => {
+ while (true)
+ {
+ Thread.Sleep(5000);
+ using (IJT808MsgProducer jT808MsgProducer = new JT808MsgProducer(new JT808MsgProducerConfig
+ {
+ BootstrapServers = "127.0.0.1:9092",
+ TopicName = "JT808Msg"
+ }))
+ {
+ jT808MsgProducer.ProduceAsync("011111111111", "7E02000032011111111111012E00000000000C00000160E42506C30C82002C00000000180914142057010400001DC003020000250400000000300115310100977E".ToHexBytes()).Wait();
+ }
+ }
+ });
+ }
+ }
+}
diff --git a/src/JT808.DotNetty.Services.Tests/JT808.DotNetty.Transmit.Test/Program.cs b/src/JT808.DotNetty.Services.Tests/JT808.DotNetty.Transmit.Test/Program.cs
index bc612c0..7f55500 100644
--- a/src/JT808.DotNetty.Services.Tests/JT808.DotNetty.Transmit.Test/Program.cs
+++ b/src/JT808.DotNetty.Services.Tests/JT808.DotNetty.Transmit.Test/Program.cs
@@ -1,12 +1,42 @@
-using System;
+using JT808.DotNetty.Kafka;
+using JT808.Protocol;
+using Microsoft.Extensions.Configuration;
+using Microsoft.Extensions.DependencyInjection;
+using Microsoft.Extensions.Hosting;
+using Microsoft.Extensions.Logging;
+using System;
+using System.Threading.Tasks;
namespace JT808.DotNetty.Transmit.Test
{
class Program
{
- static void Main(string[] args)
+ async static Task Main(string[] args)
{
- Console.WriteLine("Hello World!");
+ var hostBuilder = new HostBuilder()
+ .UseEnvironment(args[0].Split('=')[1])
+ .ConfigureAppConfiguration((hostContext, config) => {
+ config.SetBasePath(AppDomain.CurrentDomain.BaseDirectory);
+ config.AddJsonFile("appsettings.json", optional: false, reloadOnChange: true)
+ .AddJsonFile($"appsettings.{hostContext.HostingEnvironment.EnvironmentName}.json", optional: true, reloadOnChange: true);
+ })
+ .ConfigureLogging((hostContext, configLogging) => {
+ configLogging.AddConsole();
+ configLogging.SetMinimumLevel(LogLevel.Trace);
+ })
+ .ConfigureServices((hostContext, services) => {
+ services.AddSingleton();
+ services.AddSingleton(typeof(ILogger<>), typeof(Logger<>));
+ services.AddSingleton();
+ services.AddJT808Configure()
+ .AddJT808ClientKafka()
+ .AddMsgConsumer(hostContext.Configuration)
+ .AddInprocJT808Transmit(hostContext.Configuration);
+ services.BuildServiceProvider().GetRequiredService();
+ })
+ ;
+
+ await hostBuilder.RunConsoleAsync();
}
}
}
diff --git a/src/JT808.DotNetty.Services/JT808.DotNetty.MsgIdHandler/JT808.DotNetty.MsgIdHandler.csproj b/src/JT808.DotNetty.Services/JT808.DotNetty.MsgIdHandler/JT808.DotNetty.MsgIdHandler.csproj
index f1ec3f6..dbc9cf7 100644
--- a/src/JT808.DotNetty.Services/JT808.DotNetty.MsgIdHandler/JT808.DotNetty.MsgIdHandler.csproj
+++ b/src/JT808.DotNetty.Services/JT808.DotNetty.MsgIdHandler/JT808.DotNetty.MsgIdHandler.csproj
@@ -16,6 +16,7 @@
+
diff --git a/src/JT808.DotNetty.Services/JT808.DotNetty.MsgIdHandler/IJT808DotNettyMsgIdHandlerExtensions.cs b/src/JT808.DotNetty.Services/JT808.DotNetty.MsgIdHandler/JT808DotNettyMsgIdHandlerExtensions.cs
similarity index 92%
rename from src/JT808.DotNetty.Services/JT808.DotNetty.MsgIdHandler/IJT808DotNettyMsgIdHandlerExtensions.cs
rename to src/JT808.DotNetty.Services/JT808.DotNetty.MsgIdHandler/JT808DotNettyMsgIdHandlerExtensions.cs
index 2802021..74fb417 100644
--- a/src/JT808.DotNetty.Services/JT808.DotNetty.MsgIdHandler/IJT808DotNettyMsgIdHandlerExtensions.cs
+++ b/src/JT808.DotNetty.Services/JT808.DotNetty.MsgIdHandler/JT808DotNettyMsgIdHandlerExtensions.cs
@@ -6,7 +6,7 @@ using System.Text;
namespace JT808.DotNetty.MsgIdHandler
{
- public static class IJT808DotNettyMsgIdHandlerExtensions
+ public static class JT808DotNettyMsgIdHandlerExtensions
{
public static IJT808ClientBuilder AddJT808MsgIdHandler(this IJT808ClientBuilder jT808ClientBuilder)
where TJT808DotNettyMsgIdHandler: IJT808DotNettyMsgIdHandler
diff --git a/src/JT808.DotNetty.Services/JT808.DotNetty.ReplyMessage/JT808DotNettyReplyMessageExtensions.cs b/src/JT808.DotNetty.Services/JT808.DotNetty.ReplyMessage/JT808DotNettyReplyMessageExtensions.cs
index 92f7208..156d64a 100644
--- a/src/JT808.DotNetty.Services/JT808.DotNetty.ReplyMessage/JT808DotNettyReplyMessageExtensions.cs
+++ b/src/JT808.DotNetty.Services/JT808.DotNetty.ReplyMessage/JT808DotNettyReplyMessageExtensions.cs
@@ -28,7 +28,7 @@ namespace JT808.DotNetty.ReplyMessage
public static IJT808ClientBuilder AddInprocJT808ReplyMessage(this IJT808ClientBuilder jT808ClientBuilder)
where TReplyMessageService : JT808DotNettyReplyMessageService
{
- jT808ClientBuilder.JT808Builder.Services.AddSingleton();
+ jT808ClientBuilder.JT808Builder.Services.AddSingleton();
jT808ClientBuilder.JT808Builder.Services.AddHostedService();
return jT808ClientBuilder;
}
@@ -41,7 +41,7 @@ namespace JT808.DotNetty.ReplyMessage
public static IJT808ClientBuilder AddShareJT808ReplyMessage(this IJT808ClientBuilder jT808ClientBuilder)
where TReplyMessageService : JT808DotNettyReplyMessageService
{
- jT808ClientBuilder.JT808Builder.Services.AddSingleton();
+ jT808ClientBuilder.JT808Builder.Services.AddSingleton();
return jT808ClientBuilder;
}
///
diff --git a/src/JT808.DotNetty.Services/JT808.DotNetty.SessionNotice/JT808DotNettySessionNoticeExtensions.cs b/src/JT808.DotNetty.Services/JT808.DotNetty.SessionNotice/JT808DotNettySessionNoticeExtensions.cs
index 8f9ae0b..df7bce8 100644
--- a/src/JT808.DotNetty.Services/JT808.DotNetty.SessionNotice/JT808DotNettySessionNoticeExtensions.cs
+++ b/src/JT808.DotNetty.Services/JT808.DotNetty.SessionNotice/JT808DotNettySessionNoticeExtensions.cs
@@ -29,7 +29,7 @@ namespace JT808.DotNetty.SessionNotice
public static IJT808ClientBuilder AddInprocJT808SessionNotice(this IJT808ClientBuilder jT808ClientBuilder)
where TSessionNoticeService : JT808DotNettySessionNoticeService
{
- jT808ClientBuilder.JT808Builder.Services.AddSingleton();
+ jT808ClientBuilder.JT808Builder.Services.AddSingleton();
jT808ClientBuilder.JT808Builder.Services.AddHostedService();
return jT808ClientBuilder;
}
@@ -43,7 +43,7 @@ namespace JT808.DotNetty.SessionNotice
public static IJT808ClientBuilder AddShareJT808SessionNotice(this IJT808ClientBuilder jT808ClientBuilder)
where TSessionNoticeService : JT808DotNettySessionNoticeService
{
- jT808ClientBuilder.JT808Builder.Services.AddSingleton();
+ jT808ClientBuilder.JT808Builder.Services.AddSingleton();
return jT808ClientBuilder;
}
diff --git a/src/JT808.DotNetty.Services/JT808.DotNetty.Traffic/JT808DotNettyTrafficServiceHostedService.cs b/src/JT808.DotNetty.Services/JT808.DotNetty.Traffic/JT808DotNettyTrafficServiceHostedService.cs
index bb83417..9de189f 100644
--- a/src/JT808.DotNetty.Services/JT808.DotNetty.Traffic/JT808DotNettyTrafficServiceHostedService.cs
+++ b/src/JT808.DotNetty.Services/JT808.DotNetty.Traffic/JT808DotNettyTrafficServiceHostedService.cs
@@ -2,6 +2,7 @@
using JT808.DotNetty.Abstractions;
using Microsoft.Extensions.Hosting;
using System.Threading;
+using JT808.Protocol.Extensions;
namespace JT808.DotNetty.Traffic
{
@@ -22,6 +23,7 @@ namespace JT808.DotNetty.Traffic
{
jT808MsgConsumer.Subscribe();
jT808MsgConsumer.OnMessage((item)=> {
+ string str = item.Data.ToHexString();
jT808DotNettyTrafficService.Processor(item.TerminalNo, item.Data.Length);
});
return Task.CompletedTask;
diff --git a/src/JT808.DotNetty.Services/JT808.DotNetty.Transmit/JT808DotNettyTransmitHostedService.cs b/src/JT808.DotNetty.Services/JT808.DotNetty.Transmit/JT808DotNettyTransmitHostedService.cs
index b050198..5b0f299 100644
--- a/src/JT808.DotNetty.Services/JT808.DotNetty.Transmit/JT808DotNettyTransmitHostedService.cs
+++ b/src/JT808.DotNetty.Services/JT808.DotNetty.Transmit/JT808DotNettyTransmitHostedService.cs
@@ -1,22 +1,5 @@
-using DotNetty.Buffers;
-using DotNetty.Transport.Bootstrapping;
-using DotNetty.Transport.Channels;
-using DotNetty.Transport.Channels.Sockets;
-using System;
-using System.Collections.Generic;
-using System.Net;
-using System.Text;
-using System.Threading.Tasks;
-using DotNetty.Handlers.Logging;
-using Polly;
-using Microsoft.Extensions.Logging;
-using Microsoft.Extensions.Options;
-using JT808.DotNetty.Transmit.Configs;
-using System.Linq;
-using JT808.DotNetty.Transmit.Handlers;
+using System.Threading.Tasks;
using JT808.DotNetty.Abstractions;
-using JT808.Protocol;
-using JT808.Protocol.Interfaces;
using Microsoft.Extensions.Hosting;
using System.Threading;
@@ -37,7 +20,7 @@ namespace JT808.DotNetty.Transmit
public Task StartAsync(CancellationToken cancellationToken)
{
jT808MsgConsumer.Subscribe();
- jT808MsgConsumer.OnMessage(jT808DotNettyTransmitService.SendAsync);
+ jT808MsgConsumer.OnMessage(jT808DotNettyTransmitService.Send);
return Task.CompletedTask;
}
diff --git a/src/JT808.DotNetty.Services/JT808.DotNetty.Transmit/JT808DotNettyTransmitService.cs b/src/JT808.DotNetty.Services/JT808.DotNetty.Transmit/JT808DotNettyTransmitService.cs
index 260a7c2..e9f6ce1 100644
--- a/src/JT808.DotNetty.Services/JT808.DotNetty.Transmit/JT808DotNettyTransmitService.cs
+++ b/src/JT808.DotNetty.Services/JT808.DotNetty.Transmit/JT808DotNettyTransmitService.cs
@@ -28,7 +28,7 @@ namespace JT808.DotNetty.Transmit
this.optionsMonitor = optionsMonitor;
InitialDispatcherClient();
}
- public void SendAsync((string TerminalNo, byte[] Data) parameter)
+ public void Send((string TerminalNo, byte[] Data) parameter)
{
if (optionsMonitor.CurrentValue.DataTransfer != null)
{