From 2e86b46e31a9683ba2fea04441d67d8c62ce49d2 Mon Sep 17 00:00:00 2001
From: SmallChi <564952747@qq.com>
Date: Sun, 14 Apr 2019 12:09:13 +0800
Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0kafka=E7=9A=84=E5=88=86?=
=?UTF-8?q?=E5=8C=BA=E5=8F=8A=E6=8C=87=E6=B4=BE=E5=88=86=E5=8C=BA=E6=B5=8B?=
=?UTF-8?q?=E8=AF=95?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
---
.../JT809KafkaServiceExtensions.cs | 83 +++++++++++++------
.../JT809PartitionConsumer.cs | 6 +-
.../JT809PartitionProducer.cs | 31 ++++---
.../JT809_GpsPositio_Partition_Producer.cs | 17 ++++
.../JT809_GpsPosition_Partition_Consumer.cs | 26 ++++++
.../JT809_Same_Partition_Consumer.cs | 20 +++++
.../JT809_Same_Partition_Producer.cs | 16 ++++
.../ConsumerAssignPartitionTest.cs | 26 ++++++
.../ConsumerPartitionTest.cs | 50 +++++++++++
.../JT809.KafkaServiceTest/ConsumerTest.cs | 12 +--
.../ConsumerTestPartitionService.cs | 19 +++++
.../ProducerPartitionTest.cs | 41 ++++++---
.../ProducerTestPartitionService.cs | 20 +++++
.../TestConsumerAssignPartitionsBase.cs | 41 +++++++++
.../TestConsumerPartitionBase.cs | 37 +++++++++
.../TestProducerPartitionBase.cs | 4 +-
.../appsettings.Partition.json | 3 +-
17 files changed, 386 insertions(+), 66 deletions(-)
create mode 100644 src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809_GpsPositio_Partition_Producer.cs
create mode 100644 src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809_GpsPosition_Partition_Consumer.cs
create mode 100644 src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809_Same_Partition_Consumer.cs
create mode 100644 src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809_Same_Partition_Producer.cs
create mode 100644 src/JT809.DotNetty.Simples/Superior/JT809.KafkaServiceTest/ConsumerAssignPartitionTest.cs
create mode 100644 src/JT809.DotNetty.Simples/Superior/JT809.KafkaServiceTest/ConsumerPartitionTest.cs
create mode 100644 src/JT809.DotNetty.Simples/Superior/JT809.KafkaServiceTest/ConsumerTestPartitionService.cs
create mode 100644 src/JT809.DotNetty.Simples/Superior/JT809.KafkaServiceTest/ProducerTestPartitionService.cs
create mode 100644 src/JT809.DotNetty.Simples/Superior/JT809.KafkaServiceTest/TestConsumerAssignPartitionsBase.cs
create mode 100644 src/JT809.DotNetty.Simples/Superior/JT809.KafkaServiceTest/TestConsumerPartitionBase.cs
diff --git a/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809KafkaServiceExtensions.cs b/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809KafkaServiceExtensions.cs
index 8551578..2b45769 100644
--- a/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809KafkaServiceExtensions.cs
+++ b/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809KafkaServiceExtensions.cs
@@ -1,58 +1,89 @@
using Confluent.Kafka;
using JT809.GrpcProtos;
+using JT809.KafkaService.Partitions;
using JT809.PubSub.Abstractions;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
+using Microsoft.Extensions.Logging;
+using Microsoft.Extensions.Options;
using System;
using System.Collections.Generic;
using System.Text;
namespace JT809.KafkaService
{
- ///
- ///
- /// https://github.com/aspnet/Extensions/blob/master/src/DependencyInjection/DI.Specification.Tests/src/DependencyInjectionSpecificationTests.cs
- /// https://github.com/aspnet/Extensions/pull/536
- ///
public static class JT809KafkaServiceExtensions
{
public static IServiceCollection AddJT809KafkaProducerService(this IServiceCollection serviceDescriptors, IConfiguration configuration)
{
serviceDescriptors.Configure(configuration.GetSection("KafkaProducerConfig"));
- serviceDescriptors.AddSingleton(typeof(JT809Producer), typeof(JT809_Same_Producer));
- serviceDescriptors.AddSingleton(typeof(JT809Producer), typeof(JT809_GpsPositio_Producer));
+ serviceDescriptors.AddSingleton(typeof(JT809Producer), (service) => {
+ var producerConfig = service.GetRequiredService>();
+ return new JT809_Same_Producer(new JT809TopicOptions { TopicName = "jt809.same" }, producerConfig);
+ });
+ serviceDescriptors.AddSingleton(typeof(JT809Producer), (service) => {
+ var producerConfig = service.GetRequiredService>();
+ return new JT809_GpsPositio_Producer(new JT809TopicOptions { TopicName = "jt809.gps" }, producerConfig);
+ });
return serviceDescriptors;
}
- public static IServiceCollection AddJT809KafkaProducerPartitionsService(this IServiceCollection serviceDescriptors,Action action)
- where TPartitionFactory: IJT809ProducerPartitionFactory
+ public static IServiceCollection AddJT809KafkaProducerPartitionsService(this IServiceCollection serviceDescriptors, IConfiguration configuration, Action action)
{
- serviceDescriptors.Configure(action);
- serviceDescriptors.AddSingleton(typeof(IJT809ProducerPartitionFactory), typeof(TPartitionFactory));
+ serviceDescriptors.Configure(configuration.GetSection("KafkaProducerConfig"));
+ serviceDescriptors.Configure(action);
+ serviceDescriptors.AddSingleton();
+ serviceDescriptors.AddSingleton(typeof(JT809PartitionProducer), (service) => {
+ var producerConfig = service.GetRequiredService>();
+ var producerPartitionFactory = service.GetRequiredService();
+ var partitionOptions = service.GetRequiredService>();
+ return new JT809_Same_Partition_Producer(new JT809TopicOptions { TopicName = "jt809.partition.same" }, producerConfig, producerPartitionFactory, partitionOptions);
+ });
+ serviceDescriptors.AddSingleton(typeof(JT809PartitionProducer), (service) => {
+ var producerConfig = service.GetRequiredService>();
+ var producerPartitionFactory = service.GetRequiredService();
+ var partitionOptions = service.GetRequiredService>();
+ return new JT809_GpsPositio_Partition_Producer(new JT809TopicOptions { TopicName = "jt809.partition.gps" }, producerConfig, producerPartitionFactory, partitionOptions);
+ });
return serviceDescriptors;
}
- public static IServiceCollection AddJT809KafkaProducerPartitionsService(this IServiceCollection serviceDescriptors, IConfiguration configuration)
- where TPartitionFactory : IJT809ProducerPartitionFactory
+ public static IServiceCollection AddJT809KafkaConsumerService(this IServiceCollection serviceDescriptors, IConfiguration configuration)
{
- serviceDescriptors.Configure(configuration.GetSection("JT809PartitionOptions"));
- serviceDescriptors.AddSingleton(typeof(IJT809ProducerPartitionFactory), typeof(TPartitionFactory));
+ serviceDescriptors.Configure(configuration.GetSection("KafkaConsumerConfig"));
+ serviceDescriptors.AddSingleton(typeof(JT809Consumer), (service)=> {
+ var loggerFactory = service.GetRequiredService();
+ var consumerConfig = service.GetRequiredService>();
+ consumerConfig.Value.GroupId = "JT809.same.Test";
+ return new JT809_Same_Consumer(new JT809TopicOptions { TopicName = "jt809.same" }, consumerConfig, loggerFactory);
+ });
+ serviceDescriptors.AddSingleton(typeof(JT809Consumer), (service) => {
+ var loggerFactory = service.GetRequiredService();
+ var consumerConfig = service.GetRequiredService>();
+ consumerConfig.Value.GroupId = "JT809.gps.Test";
+ return new JT809_GpsPosition_Consumer(new JT809TopicOptions { TopicName = "jt809.gps" }, consumerConfig, loggerFactory);
+ });
return serviceDescriptors;
}
- public static IServiceCollection AddJT809KafkaConsumerService(this IServiceCollection serviceDescriptors, IConfiguration configuration, Action action = null)
+ public static IServiceCollection AddJT809KafkaConsumerPartitionsService(this IServiceCollection serviceDescriptors, IConfiguration configuration, Action action)
{
serviceDescriptors.Configure(configuration.GetSection("KafkaConsumerConfig"));
- if (configuration.GetSection("JT809PartitionOptions").Exists())
- {
- serviceDescriptors.Configure(configuration.GetSection("JT809PartitionOptions"));
- }
- if (action != null)
- {
- serviceDescriptors.Configure(action);
- }
- serviceDescriptors.AddSingleton(typeof(JT809Consumer), typeof(JT809_Same_Consumer));
- serviceDescriptors.AddSingleton(typeof(JT809Consumer), typeof(JT809_GpsPosition_Consumer));
+ serviceDescriptors.Configure(action);
+ serviceDescriptors.AddSingleton(typeof(JT809PartitionConsumer), (service) => {
+ var loggerFactory = service.GetRequiredService();
+ var consumerConfig = service.GetRequiredService>();
+ var partitionOptions = service.GetRequiredService>();
+ consumerConfig.Value.GroupId = "JT809.partition.same.Test";
+ return new JT809_Same_Partition_Consumer(consumerConfig, partitionOptions,new JT809TopicOptions { TopicName = "jt809.partition.same" } , loggerFactory);
+ });
+ serviceDescriptors.AddSingleton(typeof(JT809PartitionConsumer), (service) => {
+ var loggerFactory = service.GetRequiredService();
+ var consumerConfig = service.GetRequiredService>();
+ var partitionOptions = service.GetRequiredService>();
+ consumerConfig.Value.GroupId = "JT809.partition.gps.Test";
+ return new JT809_GpsPosition_Partition_Consumer(consumerConfig, partitionOptions,new JT809TopicOptions { TopicName = "jt809.partition.gps" }, loggerFactory);
+ });
return serviceDescriptors;
}
}
diff --git a/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809PartitionConsumer.cs b/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809PartitionConsumer.cs
index c18c75c..9902e03 100644
--- a/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809PartitionConsumer.cs
+++ b/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809PartitionConsumer.cs
@@ -48,10 +48,6 @@ namespace JT809.KafkaService
{
consumerBuilder.SetValueDeserializer(Deserializer);
}
- consumerBuilder.SetPartitionsAssignedHandler((c, p) =>
- {
- p.Add(topicPartition);
- });
consumers.Add(consumerBuilder.Build());
}
return consumers;
@@ -92,7 +88,7 @@ namespace JT809.KafkaService
{
//如果不指定分区,根据kafka的机制会从多个分区中拉取数据
//如果指定分区,根据kafka的机制会从相应的分区中拉取数据
- //consumers[n].Assign(topicPartitionList[n]);
+ Consumers[n].Assign(topicPartitionList[n]);
var data = Consumers[n].Consume(Cts.Token);
if (logger.IsEnabled(LogLevel.Debug))
{
diff --git a/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809PartitionProducer.cs b/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809PartitionProducer.cs
index 79ad8e9..aaff17c 100644
--- a/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809PartitionProducer.cs
+++ b/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809PartitionProducer.cs
@@ -31,6 +31,24 @@ namespace JT809.KafkaService
{
producerBuilder.SetValueSerializer(Serializer);
}
+ return producerBuilder.Build();
+ }
+
+ protected JT809PartitionProducer(
+ IOptions topicOptionAccessor,
+ IOptions producerConfig,
+ IJT809ProducerPartitionFactory producerPartitionFactory,
+ IOptions partitionOptionsAccessor)
+ : base(topicOptionAccessor.Value.TopicName, producerConfig.Value)
+ {
+ PartitionOptions = partitionOptionsAccessor.Value;
+ ProducerPartitionFactory = producerPartitionFactory;
+ Producer = CreateProducer();
+ CreatePartition();
+ }
+
+ private void CreatePartition()
+ {
if (PartitionOptions != null)
{
TopicPartitionCache = new ConcurrentDictionary();
@@ -77,19 +95,6 @@ namespace JT809.KafkaService
}
}
}
- return producerBuilder.Build();
- }
-
- protected JT809PartitionProducer(
- IOptions topicOptionAccessor,
- ProducerConfig producerConfig,
- IJT809ProducerPartitionFactory producerPartitionFactory,
- IOptions partitionOptionsAccessor)
- : base(topicOptionAccessor.Value.TopicName, producerConfig)
- {
- PartitionOptions = partitionOptionsAccessor.Value;
- ProducerPartitionFactory = producerPartitionFactory;
- Producer = CreateProducer();
}
public override void Dispose()
diff --git a/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809_GpsPositio_Partition_Producer.cs b/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809_GpsPositio_Partition_Producer.cs
new file mode 100644
index 0000000..35732a5
--- /dev/null
+++ b/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809_GpsPositio_Partition_Producer.cs
@@ -0,0 +1,17 @@
+using Confluent.Kafka;
+using Google.Protobuf;
+using JT809.GrpcProtos;
+using JT809.PubSub.Abstractions;
+using Microsoft.Extensions.Options;
+
+namespace JT809.KafkaService
+{
+ public sealed class JT809_GpsPositio_Partition_Producer : JT809PartitionProducer
+ {
+ public JT809_GpsPositio_Partition_Producer(IOptions topicOptionAccessor, IOptions producerConfig, IJT809ProducerPartitionFactory producerPartitionFactory, IOptions partitionOptionsAccessor) : base(topicOptionAccessor, producerConfig, producerPartitionFactory, partitionOptionsAccessor)
+ {
+ }
+
+ protected override Serializer Serializer => (position) => position.ToByteArray();
+ }
+}
diff --git a/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809_GpsPosition_Partition_Consumer.cs b/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809_GpsPosition_Partition_Consumer.cs
new file mode 100644
index 0000000..185d47e
--- /dev/null
+++ b/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809_GpsPosition_Partition_Consumer.cs
@@ -0,0 +1,26 @@
+using Confluent.Kafka;
+using Google.Protobuf;
+using JT809.GrpcProtos;
+using JT809.PubSub.Abstractions;
+using Microsoft.Extensions.Logging;
+using Microsoft.Extensions.Options;
+using System;
+using System.Collections.Generic;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace JT809.KafkaService
+{
+ public sealed class JT809_GpsPosition_Partition_Consumer : JT809PartitionConsumer
+ {
+ public JT809_GpsPosition_Partition_Consumer(IOptions consumerConfigAccessor, IOptions partitionOptionsAccessor, IOptions topicOptionsAccessor, ILoggerFactory loggerFactory) : base(consumerConfigAccessor, partitionOptionsAccessor, topicOptionsAccessor, loggerFactory)
+ {
+ }
+
+ protected override Deserializer Deserializer => (data, isNull) => {
+ if (isNull) return default;
+ return new MessageParser(() => new JT809GpsPosition())
+ .ParseFrom(data.ToArray());
+ };
+ }
+}
diff --git a/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809_Same_Partition_Consumer.cs b/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809_Same_Partition_Consumer.cs
new file mode 100644
index 0000000..440fa3f
--- /dev/null
+++ b/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809_Same_Partition_Consumer.cs
@@ -0,0 +1,20 @@
+using Confluent.Kafka;
+using Google.Protobuf;
+using JT809.GrpcProtos;
+using JT809.PubSub.Abstractions;
+using Microsoft.Extensions.Logging;
+using Microsoft.Extensions.Options;
+using System;
+using System.Collections.Generic;
+using System.Threading;
+using System.Threading.Tasks;
+
+namespace JT809.KafkaService
+{
+ public sealed class JT809_Same_Partition_Consumer : JT809PartitionConsumer
+ {
+ public JT809_Same_Partition_Consumer(IOptions consumerConfigAccessor, IOptions partitionOptionsAccessor, IOptions topicOptionsAccessor, ILoggerFactory loggerFactory) : base(consumerConfigAccessor, partitionOptionsAccessor, topicOptionsAccessor, loggerFactory)
+ {
+ }
+ }
+}
diff --git a/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809_Same_Partition_Producer.cs b/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809_Same_Partition_Producer.cs
new file mode 100644
index 0000000..dd5f7a8
--- /dev/null
+++ b/src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809_Same_Partition_Producer.cs
@@ -0,0 +1,16 @@
+using System;
+using System.Collections.Generic;
+using System.Text;
+using Confluent.Kafka;
+using JT809.PubSub.Abstractions;
+using Microsoft.Extensions.Options;
+
+namespace JT809.KafkaService
+{
+ public sealed class JT809_Same_Partition_Producer : JT809PartitionProducer
+ {
+ public JT809_Same_Partition_Producer(IOptions topicOptionAccessor, IOptions producerConfig, IJT809ProducerPartitionFactory producerPartitionFactory, IOptions partitionOptionsAccessor) : base(topicOptionAccessor, producerConfig, producerPartitionFactory, partitionOptionsAccessor)
+ {
+ }
+ }
+}
diff --git a/src/JT809.DotNetty.Simples/Superior/JT809.KafkaServiceTest/ConsumerAssignPartitionTest.cs b/src/JT809.DotNetty.Simples/Superior/JT809.KafkaServiceTest/ConsumerAssignPartitionTest.cs
new file mode 100644
index 0000000..87d92bb
--- /dev/null
+++ b/src/JT809.DotNetty.Simples/Superior/JT809.KafkaServiceTest/ConsumerAssignPartitionTest.cs
@@ -0,0 +1,26 @@
+using System;
+using Xunit;
+using Microsoft.Extensions.DependencyInjection;
+using JT809.Protocol.Enums;
+using JT809.Protocol.Extensions;
+using System.Threading;
+
+namespace JT809.KafkaServiceTest
+{
+ public class ConsumerAssignPartitionTest : TestConsumerAssignPartitionsBase
+ {
+ [Fact]
+ public void Test1()
+ {
+ ConsumerTestPartitionService consumerTestService = ServiceProvider.GetRequiredService();
+ consumerTestService.GpsConsumer.OnMessage((Message)=>
+ {
+ Assert.Equal(JT809SubBusinessType.实时上传车辆定位信息.ToValueString(), Message.MsgId);
+ //Assert.Equal("粤A23456", Message.Data.Vno);
+ Assert.Equal(2, Message.Data.VColor);
+ //Assert.Equal("smallchi", Message.Data.FromChannel);
+ });
+ Thread.Sleep(100000);
+ }
+ }
+}
diff --git a/src/JT809.DotNetty.Simples/Superior/JT809.KafkaServiceTest/ConsumerPartitionTest.cs b/src/JT809.DotNetty.Simples/Superior/JT809.KafkaServiceTest/ConsumerPartitionTest.cs
new file mode 100644
index 0000000..ca800bd
--- /dev/null
+++ b/src/JT809.DotNetty.Simples/Superior/JT809.KafkaServiceTest/ConsumerPartitionTest.cs
@@ -0,0 +1,50 @@
+using System;
+using Xunit;
+using Microsoft.Extensions.DependencyInjection;
+using JT809.Protocol.Enums;
+using JT809.Protocol.Extensions;
+using System.Threading;
+
+namespace JT809.KafkaServiceTest
+{
+ public class ConsumerPartitionTest : TestConsumerPartitionBase
+ {
+ [Fact]
+ public void Test1()
+ {
+ ConsumerTestPartitionService consumerTestService = ServiceProvider.GetRequiredService();
+ consumerTestService.GpsConsumer.OnMessage((Message)=>
+ {
+ Assert.Equal(JT809SubBusinessType.实时上传车辆定位信息.ToValueString(), Message.MsgId);
+ //Assert.Equal("粤A23456", Message.Data.Vno);
+ Assert.Equal(2, Message.Data.VColor);
+ //Assert.Equal("smallchi", Message.Data.FromChannel);
+ });
+ Thread.Sleep(100000);
+ }
+
+ [Fact]
+ public void Test2()
+ {
+ ConsumerTestPartitionService consumerTestService = ServiceProvider.GetRequiredService();
+ consumerTestService.SameConsumer.OnMessage((Message) =>
+ {
+ Assert.Equal(JT809SubBusinessType.None.ToValueString(), Message.MsgId);
+ //Assert.Equal(new byte[] { 0x01, 0x02, 0x03 }, Message.Data);
+ });
+ Thread.Sleep(100000);
+ }
+
+ [Fact]
+ public void Test3()
+ {
+ ConsumerTestPartitionService consumerTestService = ServiceProvider.GetRequiredService();
+ consumerTestService.SameConsumer.OnMessage((Message) =>
+ {
+ Assert.Equal(JT809SubBusinessType.None.ToValueString(), Message.MsgId);
+ //Assert.Equal(new byte[] { 0x01, 0x02, 0x03 }, Message.Data);
+ });
+ Thread.Sleep(100000);
+ }
+ }
+}
diff --git a/src/JT809.DotNetty.Simples/Superior/JT809.KafkaServiceTest/ConsumerTest.cs b/src/JT809.DotNetty.Simples/Superior/JT809.KafkaServiceTest/ConsumerTest.cs
index a4f9a4f..51f51dd 100644
--- a/src/JT809.DotNetty.Simples/Superior/JT809.KafkaServiceTest/ConsumerTest.cs
+++ b/src/JT809.DotNetty.Simples/Superior/JT809.KafkaServiceTest/ConsumerTest.cs
@@ -12,15 +12,15 @@ namespace JT809.KafkaServiceTest
[Fact]
public void Test1()
{
- ConsumerTestService producerTestService = ServiceProvider.GetRequiredService();
- producerTestService.GpsConsumer.OnMessage((Message)=>
+ ConsumerTestService consumerTestService = ServiceProvider.GetRequiredService();
+ consumerTestService.GpsConsumer.OnMessage((Message)=>
{
Assert.Equal(JT809SubBusinessType.实时上传车辆定位信息.ToValueString(), Message.MsgId);
Assert.Equal("粤A23456", Message.Data.Vno);
Assert.Equal(2, Message.Data.VColor);
Assert.Equal("smallchi", Message.Data.FromChannel);
});
- producerTestService.GpsConsumer.Subscribe();
+ consumerTestService.GpsConsumer.Subscribe();
Thread.Sleep(100000);
}
@@ -28,13 +28,13 @@ namespace JT809.KafkaServiceTest
[Fact]
public void Test2()
{
- ConsumerTestService producerTestService = ServiceProvider.GetRequiredService();
- producerTestService.SameConsumer.OnMessage((Message) =>
+ ConsumerTestService consumerTestService = ServiceProvider.GetRequiredService();
+ consumerTestService.SameConsumer.OnMessage((Message) =>
{
Assert.Equal(JT809SubBusinessType.None.ToValueString(), Message.MsgId);
Assert.Equal(new byte[] { 0x01, 0x02, 0x03 }, Message.Data);
});
- producerTestService.SameConsumer.Subscribe();
+ consumerTestService.SameConsumer.Subscribe();
Thread.Sleep(100000);
}
diff --git a/src/JT809.DotNetty.Simples/Superior/JT809.KafkaServiceTest/ConsumerTestPartitionService.cs b/src/JT809.DotNetty.Simples/Superior/JT809.KafkaServiceTest/ConsumerTestPartitionService.cs
new file mode 100644
index 0000000..04b5ef6
--- /dev/null
+++ b/src/JT809.DotNetty.Simples/Superior/JT809.KafkaServiceTest/ConsumerTestPartitionService.cs
@@ -0,0 +1,19 @@
+using JT809.GrpcProtos;
+using JT809.KafkaService;
+using System;
+using System.Collections.Generic;
+using System.Text;
+
+namespace JT809.KafkaServiceTest
+{
+ public class ConsumerTestPartitionService
+ {
+ public JT809PartitionConsumer SameConsumer { get; }
+ public JT809PartitionConsumer GpsConsumer { get; }
+ public ConsumerTestPartitionService(JT809PartitionConsumer sameConsumer, JT809PartitionConsumer gpsConsumer)
+ {
+ SameConsumer = sameConsumer;
+ GpsConsumer = gpsConsumer;
+ }
+ }
+}
diff --git a/src/JT809.DotNetty.Simples/Superior/JT809.KafkaServiceTest/ProducerPartitionTest.cs b/src/JT809.DotNetty.Simples/Superior/JT809.KafkaServiceTest/ProducerPartitionTest.cs
index 2726ec7..c5bd293 100644
--- a/src/JT809.DotNetty.Simples/Superior/JT809.KafkaServiceTest/ProducerPartitionTest.cs
+++ b/src/JT809.DotNetty.Simples/Superior/JT809.KafkaServiceTest/ProducerPartitionTest.cs
@@ -6,33 +6,50 @@ using JT809.Protocol.Extensions;
namespace JT809.KafkaServiceTest
{
- public class ProducerPartitionTest : TestProducerBase
+ public class ProducerPartitionTest : TestProducerPartitionBase
{
[Fact]
public void Test1()
{
- ProducerTestService producerTestService = ServiceProvider.GetRequiredService();
+ ProducerTestPartitionService producerTestService = ServiceProvider.GetRequiredService();
producerTestService.GpsProducer.ProduceAsync(JT809SubBusinessType.ʵʱϴλϢ.ToValueString(), "A23456_2", new GrpcProtos.JT809GpsPosition
{
Vno= "A23456",
VColor=2,
GpsTime= (DateTime.Now.ToUniversalTime().Ticks - 621355968000000000) / 10000000,
- FromChannel="smallchi"
+ FromChannel="smallchi1"
+ });
+ producerTestService.GpsProducer.ProduceAsync(JT809SubBusinessType.ʵʱϴλϢ.ToValueString(), "A23456_2", new GrpcProtos.JT809GpsPosition
+ {
+ Vno = "A23457",
+ VColor = 2,
+ GpsTime = (DateTime.Now.ToUniversalTime().Ticks - 621355968000000000) / 10000000,
+ FromChannel = "smallchi2"
+ });
+ producerTestService.GpsProducer.ProduceAsync(JT809SubBusinessType.ʵʱϴλϢ.ToValueString(), "A23456_2", new GrpcProtos.JT809GpsPosition
+ {
+ Vno = "A23458",
+ VColor = 2,
+ GpsTime = (DateTime.Now.ToUniversalTime().Ticks - 621355968000000000) / 10000000,
+ FromChannel = "smallchi3"
+ });
+ producerTestService.GpsProducer.ProduceAsync(JT809SubBusinessType.ʵʱϴλϢ.ToValueString(), "A23456_2", new GrpcProtos.JT809GpsPosition
+ {
+ Vno = "A23459",
+ VColor = 2,
+ GpsTime = (DateTime.Now.ToUniversalTime().Ticks - 621355968000000000) / 10000000,
+ FromChannel = "smallchi4"
});
}
[Fact]
public void Test2()
{
- ProducerTestService producerTestService = ServiceProvider.GetRequiredService();
- producerTestService.SameProducer.ProduceAsync(JT809SubBusinessType.None.ToValueString(), "A23457_2", new byte[] { 0x01, 0x02, 0x03 });
- }
-
- [Fact]
- public void Test3()
- {
- ProducerTestService producerTestService = ServiceProvider.GetRequiredService();
- producerTestService.SameProducer.ProduceAsync(JT809SubBusinessType.None.ToValueString(), "A23457_2", new byte[] { 0x01, 0x02, 0x03 });
+ ProducerTestPartitionService producerTestService = ServiceProvider.GetRequiredService();
+ producerTestService.SameProducer.ProduceAsync(JT809SubBusinessType.None.ToValueString(), "A23452_2", new byte[] { 0x01, 0x02, 0x03 });
+ producerTestService.SameProducer.ProduceAsync(JT809SubBusinessType.None.ToValueString(), "A23453_2", new byte[] { 0x02, 0x03, 0x04 });
+ producerTestService.SameProducer.ProduceAsync(JT809SubBusinessType.None.ToValueString(), "A23455_2", new byte[] { 0x03, 0x04, 0x05 });
+ producerTestService.SameProducer.ProduceAsync(JT809SubBusinessType.None.ToValueString(), "A23452_2", new byte[] { 0x04, 0x05, 0x06 });
}
}
}
diff --git a/src/JT809.DotNetty.Simples/Superior/JT809.KafkaServiceTest/ProducerTestPartitionService.cs b/src/JT809.DotNetty.Simples/Superior/JT809.KafkaServiceTest/ProducerTestPartitionService.cs
new file mode 100644
index 0000000..90e9759
--- /dev/null
+++ b/src/JT809.DotNetty.Simples/Superior/JT809.KafkaServiceTest/ProducerTestPartitionService.cs
@@ -0,0 +1,20 @@
+using JT809.GrpcProtos;
+using JT809.KafkaService;
+using System;
+using System.Collections.Generic;
+using System.Text;
+
+namespace JT809.KafkaServiceTest
+{
+ public class ProducerTestPartitionService
+ {
+ public JT809PartitionProducer SameProducer { get; }
+ public JT809PartitionProducer GpsProducer { get; }
+ public ProducerTestPartitionService(JT809PartitionProducer sameProducer, JT809PartitionProducer gpsProducer)
+ {
+ SameProducer = sameProducer;
+ GpsProducer = gpsProducer;
+ }
+
+ }
+}
diff --git a/src/JT809.DotNetty.Simples/Superior/JT809.KafkaServiceTest/TestConsumerAssignPartitionsBase.cs b/src/JT809.DotNetty.Simples/Superior/JT809.KafkaServiceTest/TestConsumerAssignPartitionsBase.cs
new file mode 100644
index 0000000..204db66
--- /dev/null
+++ b/src/JT809.DotNetty.Simples/Superior/JT809.KafkaServiceTest/TestConsumerAssignPartitionsBase.cs
@@ -0,0 +1,41 @@
+using System;
+using System.Collections.Generic;
+using System.IO;
+using System.Text;
+using JT809.KafkaService;
+using Microsoft.Extensions.Configuration;
+using Microsoft.Extensions.DependencyInjection;
+using Microsoft.Extensions.Logging;
+
+namespace JT809.KafkaServiceTest
+{
+ public class TestConsumerAssignPartitionsBase
+ {
+ public IServiceProvider ServiceProvider { get; }
+
+ public IConfigurationRoot ConfigurationRoot { get; }
+ public TestConsumerAssignPartitionsBase()
+ {
+ var builder = new ConfigurationBuilder();
+ builder.SetBasePath(AppDomain.CurrentDomain.BaseDirectory);
+ builder.AddJsonFile("appsettings.Partition.json");
+ ConfigurationRoot = builder.Build();
+
+ ServiceCollection serviceDescriptors = new ServiceCollection();
+ serviceDescriptors.AddSingleton();
+ serviceDescriptors.AddSingleton(typeof(ILogger<>), typeof(Logger<>));
+ serviceDescriptors.AddLogging(configure =>
+ {
+ configure.AddDebug();
+ configure.SetMinimumLevel(LogLevel.Trace);
+ });
+ serviceDescriptors.AddJT809KafkaConsumerPartitionsService(ConfigurationRoot,partition=>
+ {
+ partition.Partition = 4;
+ partition.AssignPartitions = new List { 1, 3 };
+ });
+ serviceDescriptors.AddSingleton();
+ ServiceProvider = serviceDescriptors.BuildServiceProvider();
+ }
+ }
+}
diff --git a/src/JT809.DotNetty.Simples/Superior/JT809.KafkaServiceTest/TestConsumerPartitionBase.cs b/src/JT809.DotNetty.Simples/Superior/JT809.KafkaServiceTest/TestConsumerPartitionBase.cs
new file mode 100644
index 0000000..b2dba0c
--- /dev/null
+++ b/src/JT809.DotNetty.Simples/Superior/JT809.KafkaServiceTest/TestConsumerPartitionBase.cs
@@ -0,0 +1,37 @@
+using System;
+using System.Collections.Generic;
+using System.IO;
+using System.Text;
+using JT809.KafkaService;
+using Microsoft.Extensions.Configuration;
+using Microsoft.Extensions.DependencyInjection;
+using Microsoft.Extensions.Logging;
+
+namespace JT809.KafkaServiceTest
+{
+ public class TestConsumerPartitionBase
+ {
+ public IServiceProvider ServiceProvider { get; }
+
+ public IConfigurationRoot ConfigurationRoot { get; }
+ public TestConsumerPartitionBase()
+ {
+ var builder = new ConfigurationBuilder();
+ builder.SetBasePath(AppDomain.CurrentDomain.BaseDirectory);
+ builder.AddJsonFile("appsettings.Partition.json");
+ ConfigurationRoot = builder.Build();
+
+ ServiceCollection serviceDescriptors = new ServiceCollection();
+ serviceDescriptors.AddSingleton();
+ serviceDescriptors.AddSingleton(typeof(ILogger<>), typeof(Logger<>));
+ serviceDescriptors.AddLogging(configure =>
+ {
+ configure.AddDebug();
+ configure.SetMinimumLevel(LogLevel.Trace);
+ });
+ serviceDescriptors.AddJT809KafkaConsumerPartitionsService(ConfigurationRoot,partition=> { partition.Partition = 4; });
+ serviceDescriptors.AddSingleton();
+ ServiceProvider = serviceDescriptors.BuildServiceProvider();
+ }
+ }
+}
diff --git a/src/JT809.DotNetty.Simples/Superior/JT809.KafkaServiceTest/TestProducerPartitionBase.cs b/src/JT809.DotNetty.Simples/Superior/JT809.KafkaServiceTest/TestProducerPartitionBase.cs
index 4b9fd03..5b80e8e 100644
--- a/src/JT809.DotNetty.Simples/Superior/JT809.KafkaServiceTest/TestProducerPartitionBase.cs
+++ b/src/JT809.DotNetty.Simples/Superior/JT809.KafkaServiceTest/TestProducerPartitionBase.cs
@@ -29,8 +29,8 @@ namespace JT809.KafkaServiceTest
configure.AddDebug();
configure.SetMinimumLevel(LogLevel.Trace);
});
- serviceDescriptors.AddJT809KafkaProducerService(ConfigurationRoot);
- serviceDescriptors.AddSingleton();
+ serviceDescriptors.AddJT809KafkaProducerPartitionsService(ConfigurationRoot,partition=> { partition.Partition = 4; });
+ serviceDescriptors.AddSingleton();
ServiceProvider = serviceDescriptors.BuildServiceProvider();
}
}
diff --git a/src/JT809.DotNetty.Simples/Superior/JT809.KafkaServiceTest/appsettings.Partition.json b/src/JT809.DotNetty.Simples/Superior/JT809.KafkaServiceTest/appsettings.Partition.json
index 3c94db0..749e37d 100644
--- a/src/JT809.DotNetty.Simples/Superior/JT809.KafkaServiceTest/appsettings.Partition.json
+++ b/src/JT809.DotNetty.Simples/Superior/JT809.KafkaServiceTest/appsettings.Partition.json
@@ -17,7 +17,6 @@
},
"KafkaConsumerConfig": {
"BootstrapServers": "127.0.0.1:9092",
- "EnableAutoCommit": true,
- "GroupId": "JT809.Gps.Test"
+ "EnableAutoCommit": true
}
}