@@ -1,4 +1,4 @@ | |||
<Project Sdk="Microsoft.NET.Sdk"> | |||
<Project Sdk="Microsoft.NET.Sdk"> | |||
<Import Project="..\SharedProperties.props" /> | |||
<PropertyGroup> | |||
<PackageId>JT808.DotNetty.Kafka</PackageId> | |||
@@ -6,5 +6,12 @@ | |||
<Description>基于Kafka的JT808消息发布与订阅</Description> | |||
<PackageReleaseNotes>基于Kafka的JT808消息发布与订阅</PackageReleaseNotes> | |||
</PropertyGroup> | |||
<ItemGroup> | |||
<PackageReference Include="Confluent.Kafka" Version="1.1.0" /> | |||
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="2.2.0" /> | |||
<PackageReference Include="Microsoft.Extensions.Options" Version="2.2.0" /> | |||
</ItemGroup> | |||
<ItemGroup> | |||
<ProjectReference Include="..\JT808.DotNetty.Abstractions\JT808.DotNetty.Abstractions.csproj" /> | |||
</ItemGroup> | |||
</Project> |
@@ -0,0 +1,15 @@ | |||
using Confluent.Kafka; | |||
using Microsoft.Extensions.Options; | |||
using System; | |||
using System.Collections.Generic; | |||
using System.Text; | |||
namespace JT808.DotNetty.Kafka | |||
{ | |||
public class JT808ConsumerConfig: ConsumerConfig, IOptions<JT808ConsumerConfig> | |||
{ | |||
public string TopicName { get; set; } | |||
public JT808ConsumerConfig Value => this; | |||
} | |||
} |
@@ -0,0 +1,83 @@ | |||
using Confluent.Kafka; | |||
using JT808.DotNetty.Abstractions; | |||
using Microsoft.Extensions.Logging; | |||
using Microsoft.Extensions.Options; | |||
using System; | |||
using System.Collections.Generic; | |||
using System.Text; | |||
using System.Threading; | |||
using System.Threading.Tasks; | |||
namespace JT808.DotNetty.Kafka | |||
{ | |||
public class JT808MsgConsumer : IJT808MsgConsumer | |||
{ | |||
public CancellationTokenSource Cts => new CancellationTokenSource(); | |||
private readonly IConsumer<string, byte[]> consumer; | |||
private readonly ILogger logger; | |||
public string TopicName { get; } | |||
public JT808MsgConsumer( | |||
IOptions<JT808ConsumerConfig> consumerConfigAccessor, | |||
ILoggerFactory loggerFactory) | |||
{ | |||
consumer = new ConsumerBuilder<string, byte[]>(consumerConfigAccessor.Value).Build(); | |||
logger = loggerFactory.CreateLogger("JT808MsgConsumer"); | |||
} | |||
public void OnMessage(Action<(string TerminalNo, byte[] Data)> callback) | |||
{ | |||
Task.Run(() => | |||
{ | |||
while (!Cts.IsCancellationRequested) | |||
{ | |||
try | |||
{ | |||
//如果不指定分区,根据kafka的机制会从多个分区中拉取数据 | |||
//如果指定分区,根据kafka的机制会从相应的分区中拉取数据 | |||
var data = consumer.Consume(Cts.Token); | |||
if (logger.IsEnabled(LogLevel.Debug)) | |||
{ | |||
logger.LogDebug($"Topic: {data.Topic} Key: {data.Key} Partition: {data.Partition} Offset: {data.Offset} TopicPartitionOffset:{data.TopicPartitionOffset}"); | |||
} | |||
callback((data.Key, data.Value)); | |||
} | |||
catch (ConsumeException ex) | |||
{ | |||
logger.LogError(ex, TopicName); | |||
Thread.Sleep(1000); | |||
} | |||
catch (OperationCanceledException ex) | |||
{ | |||
logger.LogError(ex, TopicName); | |||
Thread.Sleep(1000); | |||
} | |||
catch (Exception ex) | |||
{ | |||
logger.LogError(ex, TopicName); | |||
Thread.Sleep(1000); | |||
} | |||
} | |||
}, Cts.Token); | |||
} | |||
public void Subscribe() | |||
{ | |||
consumer.Subscribe(TopicName); | |||
} | |||
public void Unsubscribe() | |||
{ | |||
consumer.Unsubscribe(); | |||
} | |||
public void Dispose() | |||
{ | |||
consumer.Close(); | |||
consumer.Dispose(); | |||
} | |||
} | |||
} |
@@ -0,0 +1,38 @@ | |||
using Confluent.Kafka; | |||
using JT808.DotNetty.Abstractions; | |||
using Microsoft.Extensions.Options; | |||
using System; | |||
using System.Collections.Generic; | |||
using System.Text; | |||
using System.Threading.Tasks; | |||
namespace JT808.DotNetty.Kafka | |||
{ | |||
public class JT808MsgProducer : IJT808MsgProducer | |||
{ | |||
public string TopicName { get; } | |||
private readonly IProducer<string, byte[]> producer; | |||
public JT808MsgProducer( | |||
IOptions<JT808ProducerConfig> producerConfigAccessor) | |||
{ | |||
producer = new ProducerBuilder<string, byte[]>(producerConfigAccessor.Value).Build(); | |||
TopicName = producerConfigAccessor.Value.TopicName; | |||
} | |||
public void Dispose() | |||
{ | |||
producer.Dispose(); | |||
} | |||
public Task ProduceAsync(string terminalNo, byte[] data) | |||
{ | |||
producer.ProduceAsync(TopicName, new Message<string, byte[]> | |||
{ | |||
Key = terminalNo, | |||
Value = data | |||
}); | |||
return Task.CompletedTask; | |||
} | |||
} | |||
} |
@@ -0,0 +1,83 @@ | |||
using Confluent.Kafka; | |||
using JT808.DotNetty.Abstractions; | |||
using Microsoft.Extensions.Logging; | |||
using Microsoft.Extensions.Options; | |||
using System; | |||
using System.Collections.Generic; | |||
using System.Text; | |||
using System.Threading; | |||
using System.Threading.Tasks; | |||
namespace JT808.DotNetty.Kafka | |||
{ | |||
public class JT808MsgReplyConsumer : IJT808MsgConsumer | |||
{ | |||
public CancellationTokenSource Cts => new CancellationTokenSource(); | |||
private readonly IConsumer<string, byte[]> consumer; | |||
private readonly ILogger logger; | |||
public string TopicName { get; } | |||
public JT808MsgReplyConsumer( | |||
IOptions<JT808ConsumerConfig> consumerConfigAccessor, | |||
ILoggerFactory loggerFactory) | |||
{ | |||
consumer = new ConsumerBuilder<string, byte[]>(consumerConfigAccessor.Value).Build(); | |||
logger = loggerFactory.CreateLogger("JT808MsgReplyConsumer"); | |||
} | |||
public void OnMessage(Action<(string TerminalNo, byte[] Data)> callback) | |||
{ | |||
Task.Run(() => | |||
{ | |||
while (!Cts.IsCancellationRequested) | |||
{ | |||
try | |||
{ | |||
//如果不指定分区,根据kafka的机制会从多个分区中拉取数据 | |||
//如果指定分区,根据kafka的机制会从相应的分区中拉取数据 | |||
var data = consumer.Consume(Cts.Token); | |||
if (logger.IsEnabled(LogLevel.Debug)) | |||
{ | |||
logger.LogDebug($"Topic: {data.Topic} Key: {data.Key} Partition: {data.Partition} Offset: {data.Offset} TopicPartitionOffset:{data.TopicPartitionOffset}"); | |||
} | |||
callback((data.Key, data.Value)); | |||
} | |||
catch (ConsumeException ex) | |||
{ | |||
logger.LogError(ex, TopicName); | |||
Thread.Sleep(1000); | |||
} | |||
catch (OperationCanceledException ex) | |||
{ | |||
logger.LogError(ex, TopicName); | |||
Thread.Sleep(1000); | |||
} | |||
catch (Exception ex) | |||
{ | |||
logger.LogError(ex, TopicName); | |||
Thread.Sleep(1000); | |||
} | |||
} | |||
}, Cts.Token); | |||
} | |||
public void Subscribe() | |||
{ | |||
consumer.Subscribe(TopicName); | |||
} | |||
public void Unsubscribe() | |||
{ | |||
consumer.Unsubscribe(); | |||
} | |||
public void Dispose() | |||
{ | |||
consumer.Close(); | |||
consumer.Dispose(); | |||
} | |||
} | |||
} |
@@ -0,0 +1,38 @@ | |||
using Confluent.Kafka; | |||
using JT808.DotNetty.Abstractions; | |||
using Microsoft.Extensions.Options; | |||
using System; | |||
using System.Collections.Generic; | |||
using System.Text; | |||
using System.Threading.Tasks; | |||
namespace JT808.DotNetty.Kafka | |||
{ | |||
public class JT808MsgReplyProducer : IJT808MsgProducer | |||
{ | |||
public string TopicName { get;} | |||
private IProducer<string, byte[]> producer; | |||
public JT808MsgReplyProducer( | |||
IOptions<JT808ProducerConfig> producerConfigAccessor) | |||
{ | |||
producer = new ProducerBuilder<string, byte[]>(producerConfigAccessor.Value).Build(); | |||
TopicName = producerConfigAccessor.Value.TopicName; | |||
} | |||
public void Dispose() | |||
{ | |||
producer.Dispose(); | |||
} | |||
public Task ProduceAsync(string terminalNo, byte[] data) | |||
{ | |||
producer.ProduceAsync(TopicName, new Message<string, byte[]> | |||
{ | |||
Key = terminalNo, | |||
Value = data | |||
}); | |||
return Task.CompletedTask; | |||
} | |||
} | |||
} |
@@ -0,0 +1,15 @@ | |||
using Confluent.Kafka; | |||
using Microsoft.Extensions.Options; | |||
using System; | |||
using System.Collections.Generic; | |||
using System.Text; | |||
namespace JT808.DotNetty.Kafka | |||
{ | |||
public class JT808ProducerConfig : ProducerConfig,IOptions<JT808ProducerConfig> | |||
{ | |||
public string TopicName { get; set; } | |||
public JT808ProducerConfig Value => this; | |||
} | |||
} |
@@ -7,5 +7,16 @@ | |||
<Description>基于RabbitMQ的JT808消息发布与订阅</Description> | |||
<PackageReleaseNotes>基于RabbitMQ的JT808消息发布与订阅</PackageReleaseNotes> | |||
</PropertyGroup> | |||
<ItemGroup> | |||
<PackageReference Include="EasyNetQ" Version="3.6.0" /> | |||
<PackageReference Include="RabbitMQ.Client" Version="5.1.0" /> | |||
<PackageReference Include="Microsoft.Extensions.Logging.Abstractions" Version="2.2.0" /> | |||
<PackageReference Include="Microsoft.Extensions.Options" Version="2.2.0" /> | |||
</ItemGroup> | |||
<ItemGroup> | |||
<ProjectReference Include="..\JT808.DotNetty.Abstractions\JT808.DotNetty.Abstractions.csproj" /> | |||
</ItemGroup> | |||
</Project> |
@@ -0,0 +1,35 @@ | |||
using EasyNetQ; | |||
using EasyNetQ.Topology; | |||
using JT808.DotNetty.Abstractions; | |||
using Microsoft.Extensions.Options; | |||
using System; | |||
using System.Collections.Generic; | |||
using System.Text; | |||
using System.Threading.Tasks; | |||
namespace JT808.DotNetty.RabbitMQ | |||
{ | |||
public class JT808MsgProducer : IJT808MsgProducer | |||
{ | |||
public string TopicName { get; } | |||
private readonly IBus bus; | |||
public JT808MsgProducer(IOptions<JT808ProducerConfig> producerConfigAccessor) | |||
{ | |||
bus = RabbitHutch.CreateBus(producerConfigAccessor.Value.ConnectionString); | |||
TopicName = producerConfigAccessor.Value.TopicName; | |||
} | |||
public void Dispose() | |||
{ | |||
bus.Dispose(); | |||
} | |||
public Task ProduceAsync(string terminalNo, byte[] data) | |||
{ | |||
var exchange = bus.Advanced.ExchangeDeclare(TopicName, ExchangeType.Fanout); | |||
bus.Advanced.Publish(exchange, "", false, new Message<byte[]>(data)); | |||
return Task.CompletedTask; | |||
} | |||
} | |||
} |
@@ -0,0 +1,14 @@ | |||
using Microsoft.Extensions.Options; | |||
using System; | |||
using System.Collections.Generic; | |||
using System.Text; | |||
namespace JT808.DotNetty.RabbitMQ | |||
{ | |||
public class JT808ProducerConfig : IOptions<JT808ProducerConfig> | |||
{ | |||
public string TopicName { get; set; } | |||
public string ConnectionString { get; set; } | |||
public JT808ProducerConfig Value => this; | |||
} | |||
} |
@@ -37,7 +37,6 @@ namespace JT808.DotNetty.Tcp.Handlers | |||
this.JT808MsgProducer = jT808MsgProducer; | |||
this.jT808AtomicCounterService = jT808AtomicCounterServiceFactory.Create(JT808TransportProtocolType.tcp); | |||
this.JT808Serializer = jT808Config.GetSerializer(); | |||
jT808Config.SkipCRCCode = true; | |||
logger = loggerFactory.CreateLogger<JT808TcpServerHandler>(); | |||
} | |||
@@ -0,0 +1,21 @@ | |||
<Project Sdk="Microsoft.NET.Sdk"> | |||
<PropertyGroup> | |||
<TargetFramework>netcoreapp2.2</TargetFramework> | |||
<IsPackable>false</IsPackable> | |||
</PropertyGroup> | |||
<ItemGroup> | |||
<PackageReference Include="xunit" Version="2.4.1" /> | |||
<PackageReference Include="xunit.runner.visualstudio" Version="2.4.1"> | |||
<PrivateAssets>all</PrivateAssets> | |||
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets> | |||
</PackageReference> | |||
</ItemGroup> | |||
<ItemGroup> | |||
<ProjectReference Include="..\..\JT808.DotNetty.Kafka\JT808.DotNetty.Kafka.csproj" /> | |||
</ItemGroup> | |||
</Project> |
@@ -0,0 +1,20 @@ | |||
using JT808.DotNetty.Abstractions; | |||
using System; | |||
using System.Collections.Generic; | |||
using System.Text; | |||
using Xunit; | |||
namespace JT808.DotNetty.Kafka.Test | |||
{ | |||
public class JT808MsgProducerTest | |||
{ | |||
[Fact] | |||
public void Test1() | |||
{ | |||
using (IJT808MsgProducer jT808MsgProducer = new JT808MsgProducer(new JT808ProducerConfig { BootstrapServers = "192.168.3.11:9092" })) | |||
{ | |||
jT808MsgProducer.ProduceAsync("123456", new byte[] { 0x7E, 0, 0x7E }); | |||
} | |||
} | |||
} | |||
} |
@@ -0,0 +1,15 @@ | |||
<Project Sdk="Microsoft.NET.Sdk"> | |||
<PropertyGroup> | |||
<TargetFramework>netcoreapp2.2</TargetFramework> | |||
<IsPackable>false</IsPackable> | |||
</PropertyGroup> | |||
<ItemGroup> | |||
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="16.0.1" /> | |||
<PackageReference Include="MSTest.TestAdapter" Version="1.4.0" /> | |||
<PackageReference Include="MSTest.TestFramework" Version="1.4.0" /> | |||
</ItemGroup> | |||
</Project> |
@@ -0,0 +1,14 @@ | |||
using Microsoft.VisualStudio.TestTools.UnitTesting; | |||
namespace JT808.DotNetty.RabbitMQ.Test | |||
{ | |||
[TestClass] | |||
public class UnitTest1 | |||
{ | |||
//https://www.rabbitmq.com/getstarted.html | |||
[TestMethod] | |||
public void TestMethod1() | |||
{ | |||
} | |||
} | |||
} |
@@ -38,7 +38,6 @@ namespace JT808.DotNetty.Udp.Handlers | |||
this.jT808UdpSessionManager = jT808UdpSessionManager; | |||
logger = loggerFactory.CreateLogger<JT808UdpServerHandler>(); | |||
JT808Serializer = jT808Config.GetSerializer(); | |||
jT808Config.SkipCRCCode = true; | |||
} | |||
protected override void ChannelRead0(IChannelHandlerContext ctx, JT808UdpPackage msg) | |||
@@ -37,9 +37,13 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JT808.DotNetty.SimpleServer | |||
EndProject | |||
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JT808.DotNetty.CleintBenchmark", "JT808.DotNetty.CleintBenchmark\JT808.DotNetty.CleintBenchmark.csproj", "{C2B1A0F4-2C49-45DA-9F48-7A016FC6E9E1}" | |||
EndProject | |||
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "JT808.DotNetty.Kafka", "JT808.DotNetty.Kafka\JT808.DotNetty.Kafka.csproj", "{7050DC16-4CD8-406C-9F3B-F085407E94EB}" | |||
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JT808.DotNetty.Kafka", "JT808.DotNetty.Kafka\JT808.DotNetty.Kafka.csproj", "{576A8394-AA60-4DAE-864B-D4BBB67B8E75}" | |||
EndProject | |||
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "JT808.DotNetty.RabbitMQ", "JT808.DotNetty.RabbitMQ\JT808.DotNetty.RabbitMQ.csproj", "{48C53550-610E-4CE5-AFE4-E285280A8365}" | |||
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JT808.DotNetty.Kafka.Test", "JT808.DotNetty.Tests\JT808.DotNetty.Kafka.Test\JT808.DotNetty.Kafka.Test.csproj", "{50A94BD5-5CDF-4777-AE4C-80BA769AEDAB}" | |||
EndProject | |||
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "JT808.DotNetty.RabbitMQ", "JT808.DotNetty.RabbitMQ\JT808.DotNetty.RabbitMQ.csproj", "{BE8F9C65-2F03-4E8A-88D2-0F846D871473}" | |||
EndProject | |||
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "JT808.DotNetty.RabbitMQ.Test", "JT808.DotNetty.Tests\JT808.DotNetty.RabbitMQ.Test\JT808.DotNetty.RabbitMQ.Test.csproj", "{D3CA0D73-1CCF-41BA-88D8-5BE50515CA64}" | |||
EndProject | |||
Global | |||
GlobalSection(SolutionConfigurationPlatforms) = preSolution | |||
@@ -107,14 +111,22 @@ Global | |||
{C2B1A0F4-2C49-45DA-9F48-7A016FC6E9E1}.Debug|Any CPU.Build.0 = Debug|Any CPU | |||
{C2B1A0F4-2C49-45DA-9F48-7A016FC6E9E1}.Release|Any CPU.ActiveCfg = Release|Any CPU | |||
{C2B1A0F4-2C49-45DA-9F48-7A016FC6E9E1}.Release|Any CPU.Build.0 = Release|Any CPU | |||
{7050DC16-4CD8-406C-9F3B-F085407E94EB}.Debug|Any CPU.ActiveCfg = Debug|Any CPU | |||
{7050DC16-4CD8-406C-9F3B-F085407E94EB}.Debug|Any CPU.Build.0 = Debug|Any CPU | |||
{7050DC16-4CD8-406C-9F3B-F085407E94EB}.Release|Any CPU.ActiveCfg = Release|Any CPU | |||
{7050DC16-4CD8-406C-9F3B-F085407E94EB}.Release|Any CPU.Build.0 = Release|Any CPU | |||
{48C53550-610E-4CE5-AFE4-E285280A8365}.Debug|Any CPU.ActiveCfg = Debug|Any CPU | |||
{48C53550-610E-4CE5-AFE4-E285280A8365}.Debug|Any CPU.Build.0 = Debug|Any CPU | |||
{48C53550-610E-4CE5-AFE4-E285280A8365}.Release|Any CPU.ActiveCfg = Release|Any CPU | |||
{48C53550-610E-4CE5-AFE4-E285280A8365}.Release|Any CPU.Build.0 = Release|Any CPU | |||
{576A8394-AA60-4DAE-864B-D4BBB67B8E75}.Debug|Any CPU.ActiveCfg = Debug|Any CPU | |||
{576A8394-AA60-4DAE-864B-D4BBB67B8E75}.Debug|Any CPU.Build.0 = Debug|Any CPU | |||
{576A8394-AA60-4DAE-864B-D4BBB67B8E75}.Release|Any CPU.ActiveCfg = Release|Any CPU | |||
{576A8394-AA60-4DAE-864B-D4BBB67B8E75}.Release|Any CPU.Build.0 = Release|Any CPU | |||
{50A94BD5-5CDF-4777-AE4C-80BA769AEDAB}.Debug|Any CPU.ActiveCfg = Debug|Any CPU | |||
{50A94BD5-5CDF-4777-AE4C-80BA769AEDAB}.Debug|Any CPU.Build.0 = Debug|Any CPU | |||
{50A94BD5-5CDF-4777-AE4C-80BA769AEDAB}.Release|Any CPU.ActiveCfg = Release|Any CPU | |||
{50A94BD5-5CDF-4777-AE4C-80BA769AEDAB}.Release|Any CPU.Build.0 = Release|Any CPU | |||
{BE8F9C65-2F03-4E8A-88D2-0F846D871473}.Debug|Any CPU.ActiveCfg = Debug|Any CPU | |||
{BE8F9C65-2F03-4E8A-88D2-0F846D871473}.Debug|Any CPU.Build.0 = Debug|Any CPU | |||
{BE8F9C65-2F03-4E8A-88D2-0F846D871473}.Release|Any CPU.ActiveCfg = Release|Any CPU | |||
{BE8F9C65-2F03-4E8A-88D2-0F846D871473}.Release|Any CPU.Build.0 = Release|Any CPU | |||
{D3CA0D73-1CCF-41BA-88D8-5BE50515CA64}.Debug|Any CPU.ActiveCfg = Debug|Any CPU | |||
{D3CA0D73-1CCF-41BA-88D8-5BE50515CA64}.Debug|Any CPU.Build.0 = Debug|Any CPU | |||
{D3CA0D73-1CCF-41BA-88D8-5BE50515CA64}.Release|Any CPU.ActiveCfg = Release|Any CPU | |||
{D3CA0D73-1CCF-41BA-88D8-5BE50515CA64}.Release|Any CPU.Build.0 = Release|Any CPU | |||
EndGlobalSection | |||
GlobalSection(SolutionProperties) = preSolution | |||
HideSolutionNode = FALSE | |||
@@ -127,6 +139,8 @@ Global | |||
{A0F2F006-5AEB-454E-83C5-ABFB58DE17A9} = {3BD7FF02-8516-4A77-A385-9FDCDD792E22} | |||
{E6F61CE8-BFB4-4946-A0D3-AECCE77824E5} = {2459FB59-8A33-49A4-ADBC-A0B12C5886A6} | |||
{CCE6AEFB-1AB0-4BD9-8EA2-8B4CDD097E88} = {2459FB59-8A33-49A4-ADBC-A0B12C5886A6} | |||
{50A94BD5-5CDF-4777-AE4C-80BA769AEDAB} = {3BD7FF02-8516-4A77-A385-9FDCDD792E22} | |||
{D3CA0D73-1CCF-41BA-88D8-5BE50515CA64} = {3BD7FF02-8516-4A77-A385-9FDCDD792E22} | |||
EndGlobalSection | |||
GlobalSection(ExtensibilityGlobals) = postSolution | |||
SolutionGuid = {FC0FFCEA-E1EF-4C97-A1C5-F89418B6834B} | |||