diff --git a/src/JT808.DotNetty.Kafka/JT808MsgConsumer.cs b/src/JT808.DotNetty.Kafka/JT808MsgConsumer.cs index 76d84c2..5893384 100644 --- a/src/JT808.DotNetty.Kafka/JT808MsgConsumer.cs +++ b/src/JT808.DotNetty.Kafka/JT808MsgConsumer.cs @@ -25,6 +25,7 @@ namespace JT808.DotNetty.Kafka ILoggerFactory loggerFactory) { consumer = new ConsumerBuilder(consumerConfigAccessor.Value).Build(); + TopicName = consumerConfigAccessor.Value.TopicName; logger = loggerFactory.CreateLogger("JT808MsgConsumer"); } diff --git a/src/JT808.DotNetty.Kafka/JT808MsgProducer.cs b/src/JT808.DotNetty.Kafka/JT808MsgProducer.cs index cae0138..3dfa14b 100644 --- a/src/JT808.DotNetty.Kafka/JT808MsgProducer.cs +++ b/src/JT808.DotNetty.Kafka/JT808MsgProducer.cs @@ -25,14 +25,13 @@ namespace JT808.DotNetty.Kafka producer.Dispose(); } - public Task ProduceAsync(string terminalNo, byte[] data) + public async Task ProduceAsync(string terminalNo, byte[] data) { - producer.ProduceAsync(TopicName, new Message + await producer.ProduceAsync(TopicName, new Message { Key = terminalNo, Value = data }); - return Task.CompletedTask; } } } diff --git a/src/JT808.DotNetty.Kafka/JT808MsgReplyConsumer.cs b/src/JT808.DotNetty.Kafka/JT808MsgReplyConsumer.cs index 3903af7..0aafa19 100644 --- a/src/JT808.DotNetty.Kafka/JT808MsgReplyConsumer.cs +++ b/src/JT808.DotNetty.Kafka/JT808MsgReplyConsumer.cs @@ -25,6 +25,7 @@ namespace JT808.DotNetty.Kafka ILoggerFactory loggerFactory) { consumer = new ConsumerBuilder(consumerConfigAccessor.Value).Build(); + TopicName = consumerConfigAccessor.Value.TopicName; logger = loggerFactory.CreateLogger("JT808MsgReplyConsumer"); } diff --git a/src/JT808.DotNetty.Kafka/JT808MsgReplyProducer.cs b/src/JT808.DotNetty.Kafka/JT808MsgReplyProducer.cs index bc49e93..138a0b5 100644 --- a/src/JT808.DotNetty.Kafka/JT808MsgReplyProducer.cs +++ b/src/JT808.DotNetty.Kafka/JT808MsgReplyProducer.cs @@ -25,14 +25,13 @@ namespace JT808.DotNetty.Kafka producer.Dispose(); } - public Task ProduceAsync(string terminalNo, byte[] data) + public async Task ProduceAsync(string terminalNo, byte[] data) { - producer.ProduceAsync(TopicName, new Message + await producer.ProduceAsync(TopicName, new Message { Key = terminalNo, Value = data }); - return Task.CompletedTask; } } } diff --git a/src/JT808.DotNetty.Tests/JT808.DotNetty.Kafka.Test/JT808.DotNetty.Kafka.Test.csproj b/src/JT808.DotNetty.Tests/JT808.DotNetty.Kafka.Test/JT808.DotNetty.Kafka.Test.csproj index db1c6cf..916f895 100644 --- a/src/JT808.DotNetty.Tests/JT808.DotNetty.Kafka.Test/JT808.DotNetty.Kafka.Test.csproj +++ b/src/JT808.DotNetty.Tests/JT808.DotNetty.Kafka.Test/JT808.DotNetty.Kafka.Test.csproj @@ -7,6 +7,7 @@ + all diff --git a/src/JT808.DotNetty.Tests/JT808.DotNetty.Kafka.Test/JT808MsgConsumerTest.cs b/src/JT808.DotNetty.Tests/JT808.DotNetty.Kafka.Test/JT808MsgConsumerTest.cs new file mode 100644 index 0000000..c0fe439 --- /dev/null +++ b/src/JT808.DotNetty.Tests/JT808.DotNetty.Kafka.Test/JT808MsgConsumerTest.cs @@ -0,0 +1,39 @@ +using JT808.DotNetty.Abstractions; +using Microsoft.Extensions.Logging; +using System; +using System.Collections.Generic; +using System.Diagnostics; +using System.Text; +using System.Threading; +using Xunit; + +namespace JT808.DotNetty.Kafka.Test +{ + public class JT808MsgConsumerTest + { + public const string BootstrapServers = "172.16.19.120:9092"; + + //public const string BootstrapServers = "192.168.3.11:9092"; + + public JT808ConsumerConfig JT808ConsumerConfig = new JT808ConsumerConfig + { + GroupId="jt808.gps.test", + TopicName = "jt808test", + BootstrapServers = BootstrapServers + }; + [Fact] + public void Test1() + { + using (IJT808MsgConsumer JT808MsgConsumer = new JT808MsgConsumer(JT808ConsumerConfig, new LoggerFactory())) + { + JT808MsgConsumer.Subscribe(); + JT808MsgConsumer.OnMessage(item => + { + Debug.WriteLine($"{item.TerminalNo}-{item.Data.Length}"); + }); + Thread.Sleep(30000); + JT808MsgConsumer.Unsubscribe(); + } + } + } +} diff --git a/src/JT808.DotNetty.Tests/JT808.DotNetty.Kafka.Test/JT808MsgProducerTest.cs b/src/JT808.DotNetty.Tests/JT808.DotNetty.Kafka.Test/JT808MsgProducerTest.cs index bb8d49f..4392568 100644 --- a/src/JT808.DotNetty.Tests/JT808.DotNetty.Kafka.Test/JT808MsgProducerTest.cs +++ b/src/JT808.DotNetty.Tests/JT808.DotNetty.Kafka.Test/JT808MsgProducerTest.cs @@ -1,6 +1,9 @@ -using JT808.DotNetty.Abstractions; +using Confluent.Kafka; +using Confluent.Kafka.Admin; +using JT808.DotNetty.Abstractions; using System; using System.Collections.Generic; +using System.Diagnostics; using System.Text; using Xunit; @@ -8,12 +11,47 @@ namespace JT808.DotNetty.Kafka.Test { public class JT808MsgProducerTest { + public const string BootstrapServers = "172.16.19.120:9092"; + + //public const string BootstrapServers = "192.168.3.11:9092"; + + public JT808ProducerConfig JT808ProducerConfig = new JT808ProducerConfig + { + TopicName = "jt808test", + BootstrapServers = BootstrapServers + }; + + public JT808MsgProducerTest() + { + using (var adminClient = new AdminClientBuilder(new AdminClientConfig { BootstrapServers = BootstrapServers }).Build()) + { + try + { + adminClient.DeleteTopicsAsync(new List() { JT808ProducerConfig.TopicName }).Wait(); + } + catch (CreateTopicsException e) + { + Debug.WriteLine($"An error occured creating topic {e.Results[0].Topic}: {e.Results[0].Error.Reason}"); + } + } + } + [Fact] public void Test1() { - using (IJT808MsgProducer jT808MsgProducer = new JT808MsgProducer(new JT808ProducerConfig { BootstrapServers = "192.168.3.11:9092" })) + using (IJT808MsgProducer jT808MsgProducer = new JT808MsgProducer(JT808ProducerConfig)) + { + jT808MsgProducer.ProduceAsync("123456", new byte[] { 0x7E, 0, 0x7E }).Wait(); + } + } + + [Fact] + public void Test2() + { + using (IJT808MsgProducer jT808MsgProducer = new JT808MsgProducer(JT808ProducerConfig)) { - jT808MsgProducer.ProduceAsync("123456", new byte[] { 0x7E, 0, 0x7E }); + jT808MsgProducer.ProduceAsync("123457", new byte[] { 0x7E, 0, 0x7E }).Wait(); + jT808MsgProducer.ProduceAsync("123456", new byte[] { 0x7E, 0, 0x7E }).Wait(); } } } diff --git a/src/JT808.DotNetty.sln b/src/JT808.DotNetty.sln index 7d3576f..c50bbdc 100644 --- a/src/JT808.DotNetty.sln +++ b/src/JT808.DotNetty.sln @@ -41,9 +41,7 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JT808.DotNetty.Kafka", "JT8 EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JT808.DotNetty.Kafka.Test", "JT808.DotNetty.Tests\JT808.DotNetty.Kafka.Test\JT808.DotNetty.Kafka.Test.csproj", "{50A94BD5-5CDF-4777-AE4C-80BA769AEDAB}" EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "JT808.DotNetty.RabbitMQ", "JT808.DotNetty.RabbitMQ\JT808.DotNetty.RabbitMQ.csproj", "{BE8F9C65-2F03-4E8A-88D2-0F846D871473}" -EndProject -Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "JT808.DotNetty.RabbitMQ.Test", "JT808.DotNetty.Tests\JT808.DotNetty.RabbitMQ.Test\JT808.DotNetty.RabbitMQ.Test.csproj", "{D3CA0D73-1CCF-41BA-88D8-5BE50515CA64}" +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JT808.DotNetty.RabbitMQ.Test", "JT808.DotNetty.Tests\JT808.DotNetty.RabbitMQ.Test\JT808.DotNetty.RabbitMQ.Test.csproj", "{D3CA0D73-1CCF-41BA-88D8-5BE50515CA64}" EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution @@ -119,10 +117,6 @@ Global {50A94BD5-5CDF-4777-AE4C-80BA769AEDAB}.Debug|Any CPU.Build.0 = Debug|Any CPU {50A94BD5-5CDF-4777-AE4C-80BA769AEDAB}.Release|Any CPU.ActiveCfg = Release|Any CPU {50A94BD5-5CDF-4777-AE4C-80BA769AEDAB}.Release|Any CPU.Build.0 = Release|Any CPU - {BE8F9C65-2F03-4E8A-88D2-0F846D871473}.Debug|Any CPU.ActiveCfg = Debug|Any CPU - {BE8F9C65-2F03-4E8A-88D2-0F846D871473}.Debug|Any CPU.Build.0 = Debug|Any CPU - {BE8F9C65-2F03-4E8A-88D2-0F846D871473}.Release|Any CPU.ActiveCfg = Release|Any CPU - {BE8F9C65-2F03-4E8A-88D2-0F846D871473}.Release|Any CPU.Build.0 = Release|Any CPU {D3CA0D73-1CCF-41BA-88D8-5BE50515CA64}.Debug|Any CPU.ActiveCfg = Debug|Any CPU {D3CA0D73-1CCF-41BA-88D8-5BE50515CA64}.Debug|Any CPU.Build.0 = Debug|Any CPU {D3CA0D73-1CCF-41BA-88D8-5BE50515CA64}.Release|Any CPU.ActiveCfg = Release|Any CPU