@@ -25,6 +25,7 @@ namespace JT808.DotNetty.Kafka | |||||
ILoggerFactory loggerFactory) | ILoggerFactory loggerFactory) | ||||
{ | { | ||||
consumer = new ConsumerBuilder<string, byte[]>(consumerConfigAccessor.Value).Build(); | consumer = new ConsumerBuilder<string, byte[]>(consumerConfigAccessor.Value).Build(); | ||||
TopicName = consumerConfigAccessor.Value.TopicName; | |||||
logger = loggerFactory.CreateLogger("JT808MsgConsumer"); | logger = loggerFactory.CreateLogger("JT808MsgConsumer"); | ||||
} | } | ||||
@@ -25,14 +25,13 @@ namespace JT808.DotNetty.Kafka | |||||
producer.Dispose(); | producer.Dispose(); | ||||
} | } | ||||
public Task ProduceAsync(string terminalNo, byte[] data) | |||||
public async Task ProduceAsync(string terminalNo, byte[] data) | |||||
{ | { | ||||
producer.ProduceAsync(TopicName, new Message<string, byte[]> | |||||
await producer.ProduceAsync(TopicName, new Message<string, byte[]> | |||||
{ | { | ||||
Key = terminalNo, | Key = terminalNo, | ||||
Value = data | Value = data | ||||
}); | }); | ||||
return Task.CompletedTask; | |||||
} | } | ||||
} | } | ||||
} | } |
@@ -25,6 +25,7 @@ namespace JT808.DotNetty.Kafka | |||||
ILoggerFactory loggerFactory) | ILoggerFactory loggerFactory) | ||||
{ | { | ||||
consumer = new ConsumerBuilder<string, byte[]>(consumerConfigAccessor.Value).Build(); | consumer = new ConsumerBuilder<string, byte[]>(consumerConfigAccessor.Value).Build(); | ||||
TopicName = consumerConfigAccessor.Value.TopicName; | |||||
logger = loggerFactory.CreateLogger("JT808MsgReplyConsumer"); | logger = loggerFactory.CreateLogger("JT808MsgReplyConsumer"); | ||||
} | } | ||||
@@ -25,14 +25,13 @@ namespace JT808.DotNetty.Kafka | |||||
producer.Dispose(); | producer.Dispose(); | ||||
} | } | ||||
public Task ProduceAsync(string terminalNo, byte[] data) | |||||
public async Task ProduceAsync(string terminalNo, byte[] data) | |||||
{ | { | ||||
producer.ProduceAsync(TopicName, new Message<string, byte[]> | |||||
await producer.ProduceAsync(TopicName, new Message<string, byte[]> | |||||
{ | { | ||||
Key = terminalNo, | Key = terminalNo, | ||||
Value = data | Value = data | ||||
}); | }); | ||||
return Task.CompletedTask; | |||||
} | } | ||||
} | } | ||||
} | } |
@@ -7,6 +7,7 @@ | |||||
</PropertyGroup> | </PropertyGroup> | ||||
<ItemGroup> | <ItemGroup> | ||||
<PackageReference Include="Microsoft.Extensions.Logging" Version="2.2.0" /> | |||||
<PackageReference Include="xunit" Version="2.4.1" /> | <PackageReference Include="xunit" Version="2.4.1" /> | ||||
<PackageReference Include="xunit.runner.visualstudio" Version="2.4.1"> | <PackageReference Include="xunit.runner.visualstudio" Version="2.4.1"> | ||||
<PrivateAssets>all</PrivateAssets> | <PrivateAssets>all</PrivateAssets> | ||||
@@ -0,0 +1,39 @@ | |||||
using JT808.DotNetty.Abstractions; | |||||
using Microsoft.Extensions.Logging; | |||||
using System; | |||||
using System.Collections.Generic; | |||||
using System.Diagnostics; | |||||
using System.Text; | |||||
using System.Threading; | |||||
using Xunit; | |||||
namespace JT808.DotNetty.Kafka.Test | |||||
{ | |||||
public class JT808MsgConsumerTest | |||||
{ | |||||
public const string BootstrapServers = "172.16.19.120:9092"; | |||||
//public const string BootstrapServers = "192.168.3.11:9092"; | |||||
public JT808ConsumerConfig JT808ConsumerConfig = new JT808ConsumerConfig | |||||
{ | |||||
GroupId="jt808.gps.test", | |||||
TopicName = "jt808test", | |||||
BootstrapServers = BootstrapServers | |||||
}; | |||||
[Fact] | |||||
public void Test1() | |||||
{ | |||||
using (IJT808MsgConsumer JT808MsgConsumer = new JT808MsgConsumer(JT808ConsumerConfig, new LoggerFactory())) | |||||
{ | |||||
JT808MsgConsumer.Subscribe(); | |||||
JT808MsgConsumer.OnMessage(item => | |||||
{ | |||||
Debug.WriteLine($"{item.TerminalNo}-{item.Data.Length}"); | |||||
}); | |||||
Thread.Sleep(30000); | |||||
JT808MsgConsumer.Unsubscribe(); | |||||
} | |||||
} | |||||
} | |||||
} |
@@ -1,6 +1,9 @@ | |||||
using JT808.DotNetty.Abstractions; | |||||
using Confluent.Kafka; | |||||
using Confluent.Kafka.Admin; | |||||
using JT808.DotNetty.Abstractions; | |||||
using System; | using System; | ||||
using System.Collections.Generic; | using System.Collections.Generic; | ||||
using System.Diagnostics; | |||||
using System.Text; | using System.Text; | ||||
using Xunit; | using Xunit; | ||||
@@ -8,12 +11,47 @@ namespace JT808.DotNetty.Kafka.Test | |||||
{ | { | ||||
public class JT808MsgProducerTest | public class JT808MsgProducerTest | ||||
{ | { | ||||
public const string BootstrapServers = "172.16.19.120:9092"; | |||||
//public const string BootstrapServers = "192.168.3.11:9092"; | |||||
public JT808ProducerConfig JT808ProducerConfig = new JT808ProducerConfig | |||||
{ | |||||
TopicName = "jt808test", | |||||
BootstrapServers = BootstrapServers | |||||
}; | |||||
public JT808MsgProducerTest() | |||||
{ | |||||
using (var adminClient = new AdminClientBuilder(new AdminClientConfig { BootstrapServers = BootstrapServers }).Build()) | |||||
{ | |||||
try | |||||
{ | |||||
adminClient.DeleteTopicsAsync(new List<string>() { JT808ProducerConfig.TopicName }).Wait(); | |||||
} | |||||
catch (CreateTopicsException e) | |||||
{ | |||||
Debug.WriteLine($"An error occured creating topic {e.Results[0].Topic}: {e.Results[0].Error.Reason}"); | |||||
} | |||||
} | |||||
} | |||||
[Fact] | [Fact] | ||||
public void Test1() | public void Test1() | ||||
{ | { | ||||
using (IJT808MsgProducer jT808MsgProducer = new JT808MsgProducer(new JT808ProducerConfig { BootstrapServers = "192.168.3.11:9092" })) | |||||
using (IJT808MsgProducer jT808MsgProducer = new JT808MsgProducer(JT808ProducerConfig)) | |||||
{ | |||||
jT808MsgProducer.ProduceAsync("123456", new byte[] { 0x7E, 0, 0x7E }).Wait(); | |||||
} | |||||
} | |||||
[Fact] | |||||
public void Test2() | |||||
{ | |||||
using (IJT808MsgProducer jT808MsgProducer = new JT808MsgProducer(JT808ProducerConfig)) | |||||
{ | { | ||||
jT808MsgProducer.ProduceAsync("123456", new byte[] { 0x7E, 0, 0x7E }); | |||||
jT808MsgProducer.ProduceAsync("123457", new byte[] { 0x7E, 0, 0x7E }).Wait(); | |||||
jT808MsgProducer.ProduceAsync("123456", new byte[] { 0x7E, 0, 0x7E }).Wait(); | |||||
} | } | ||||
} | } | ||||
} | } | ||||
@@ -41,9 +41,7 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JT808.DotNetty.Kafka", "JT8 | |||||
EndProject | EndProject | ||||
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JT808.DotNetty.Kafka.Test", "JT808.DotNetty.Tests\JT808.DotNetty.Kafka.Test\JT808.DotNetty.Kafka.Test.csproj", "{50A94BD5-5CDF-4777-AE4C-80BA769AEDAB}" | Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JT808.DotNetty.Kafka.Test", "JT808.DotNetty.Tests\JT808.DotNetty.Kafka.Test\JT808.DotNetty.Kafka.Test.csproj", "{50A94BD5-5CDF-4777-AE4C-80BA769AEDAB}" | ||||
EndProject | EndProject | ||||
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "JT808.DotNetty.RabbitMQ", "JT808.DotNetty.RabbitMQ\JT808.DotNetty.RabbitMQ.csproj", "{BE8F9C65-2F03-4E8A-88D2-0F846D871473}" | |||||
EndProject | |||||
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "JT808.DotNetty.RabbitMQ.Test", "JT808.DotNetty.Tests\JT808.DotNetty.RabbitMQ.Test\JT808.DotNetty.RabbitMQ.Test.csproj", "{D3CA0D73-1CCF-41BA-88D8-5BE50515CA64}" | |||||
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JT808.DotNetty.RabbitMQ.Test", "JT808.DotNetty.Tests\JT808.DotNetty.RabbitMQ.Test\JT808.DotNetty.RabbitMQ.Test.csproj", "{D3CA0D73-1CCF-41BA-88D8-5BE50515CA64}" | |||||
EndProject | EndProject | ||||
Global | Global | ||||
GlobalSection(SolutionConfigurationPlatforms) = preSolution | GlobalSection(SolutionConfigurationPlatforms) = preSolution | ||||
@@ -119,10 +117,6 @@ Global | |||||
{50A94BD5-5CDF-4777-AE4C-80BA769AEDAB}.Debug|Any CPU.Build.0 = Debug|Any CPU | {50A94BD5-5CDF-4777-AE4C-80BA769AEDAB}.Debug|Any CPU.Build.0 = Debug|Any CPU | ||||
{50A94BD5-5CDF-4777-AE4C-80BA769AEDAB}.Release|Any CPU.ActiveCfg = Release|Any CPU | {50A94BD5-5CDF-4777-AE4C-80BA769AEDAB}.Release|Any CPU.ActiveCfg = Release|Any CPU | ||||
{50A94BD5-5CDF-4777-AE4C-80BA769AEDAB}.Release|Any CPU.Build.0 = Release|Any CPU | {50A94BD5-5CDF-4777-AE4C-80BA769AEDAB}.Release|Any CPU.Build.0 = Release|Any CPU | ||||
{BE8F9C65-2F03-4E8A-88D2-0F846D871473}.Debug|Any CPU.ActiveCfg = Debug|Any CPU | |||||
{BE8F9C65-2F03-4E8A-88D2-0F846D871473}.Debug|Any CPU.Build.0 = Debug|Any CPU | |||||
{BE8F9C65-2F03-4E8A-88D2-0F846D871473}.Release|Any CPU.ActiveCfg = Release|Any CPU | |||||
{BE8F9C65-2F03-4E8A-88D2-0F846D871473}.Release|Any CPU.Build.0 = Release|Any CPU | |||||
{D3CA0D73-1CCF-41BA-88D8-5BE50515CA64}.Debug|Any CPU.ActiveCfg = Debug|Any CPU | {D3CA0D73-1CCF-41BA-88D8-5BE50515CA64}.Debug|Any CPU.ActiveCfg = Debug|Any CPU | ||||
{D3CA0D73-1CCF-41BA-88D8-5BE50515CA64}.Debug|Any CPU.Build.0 = Debug|Any CPU | {D3CA0D73-1CCF-41BA-88D8-5BE50515CA64}.Debug|Any CPU.Build.0 = Debug|Any CPU | ||||
{D3CA0D73-1CCF-41BA-88D8-5BE50515CA64}.Release|Any CPU.ActiveCfg = Release|Any CPU | {D3CA0D73-1CCF-41BA-88D8-5BE50515CA64}.Release|Any CPU.ActiveCfg = Release|Any CPU | ||||