@@ -1,58 +1,89 @@ | |||
using Confluent.Kafka; | |||
using JT809.GrpcProtos; | |||
using JT809.KafkaService.Partitions; | |||
using JT809.PubSub.Abstractions; | |||
using Microsoft.Extensions.Configuration; | |||
using Microsoft.Extensions.DependencyInjection; | |||
using Microsoft.Extensions.Logging; | |||
using Microsoft.Extensions.Options; | |||
using System; | |||
using System.Collections.Generic; | |||
using System.Text; | |||
namespace JT809.KafkaService | |||
{ | |||
/// <summary> | |||
/// | |||
/// https://github.com/aspnet/Extensions/blob/master/src/DependencyInjection/DI.Specification.Tests/src/DependencyInjectionSpecificationTests.cs | |||
/// https://github.com/aspnet/Extensions/pull/536 | |||
/// </summary> | |||
public static class JT809KafkaServiceExtensions | |||
{ | |||
public static IServiceCollection AddJT809KafkaProducerService(this IServiceCollection serviceDescriptors, IConfiguration configuration) | |||
{ | |||
serviceDescriptors.Configure<ProducerConfig>(configuration.GetSection("KafkaProducerConfig")); | |||
serviceDescriptors.AddSingleton(typeof(JT809Producer<byte[]>), typeof(JT809_Same_Producer)); | |||
serviceDescriptors.AddSingleton(typeof(JT809Producer<JT809GpsPosition>), typeof(JT809_GpsPositio_Producer)); | |||
serviceDescriptors.AddSingleton(typeof(JT809Producer<byte[]>), (service) => { | |||
var producerConfig = service.GetRequiredService<IOptions<ProducerConfig>>(); | |||
return new JT809_Same_Producer(new JT809TopicOptions { TopicName = "jt809.same" }, producerConfig); | |||
}); | |||
serviceDescriptors.AddSingleton(typeof(JT809Producer<JT809GpsPosition>), (service) => { | |||
var producerConfig = service.GetRequiredService<IOptions<ProducerConfig>>(); | |||
return new JT809_GpsPositio_Producer(new JT809TopicOptions { TopicName = "jt809.gps" }, producerConfig); | |||
}); | |||
return serviceDescriptors; | |||
} | |||
public static IServiceCollection AddJT809KafkaProducerPartitionsService<TPartitionFactory>(this IServiceCollection serviceDescriptors,Action<JT809PartitionOptions> action) | |||
where TPartitionFactory: IJT809ProducerPartitionFactory | |||
public static IServiceCollection AddJT809KafkaProducerPartitionsService(this IServiceCollection serviceDescriptors, IConfiguration configuration, Action<JT809PartitionOptions> action) | |||
{ | |||
serviceDescriptors.Configure(action); | |||
serviceDescriptors.AddSingleton(typeof(IJT809ProducerPartitionFactory), typeof(TPartitionFactory)); | |||
serviceDescriptors.Configure<ProducerConfig>(configuration.GetSection("KafkaProducerConfig")); | |||
serviceDescriptors.Configure(action); | |||
serviceDescriptors.AddSingleton<IJT809ProducerPartitionFactory, JT809GpsPositionProducerPartitionFactoryImpl>(); | |||
serviceDescriptors.AddSingleton(typeof(JT809PartitionProducer<byte[]>), (service) => { | |||
var producerConfig = service.GetRequiredService<IOptions<ProducerConfig>>(); | |||
var producerPartitionFactory = service.GetRequiredService<IJT809ProducerPartitionFactory>(); | |||
var partitionOptions = service.GetRequiredService<IOptions<JT809PartitionOptions>>(); | |||
return new JT809_Same_Partition_Producer(new JT809TopicOptions { TopicName = "jt809.partition.same" }, producerConfig, producerPartitionFactory, partitionOptions); | |||
}); | |||
serviceDescriptors.AddSingleton(typeof(JT809PartitionProducer<JT809GpsPosition>), (service) => { | |||
var producerConfig = service.GetRequiredService<IOptions<ProducerConfig>>(); | |||
var producerPartitionFactory = service.GetRequiredService<IJT809ProducerPartitionFactory>(); | |||
var partitionOptions = service.GetRequiredService<IOptions<JT809PartitionOptions>>(); | |||
return new JT809_GpsPositio_Partition_Producer(new JT809TopicOptions { TopicName = "jt809.partition.gps" }, producerConfig, producerPartitionFactory, partitionOptions); | |||
}); | |||
return serviceDescriptors; | |||
} | |||
public static IServiceCollection AddJT809KafkaProducerPartitionsService<TPartitionFactory>(this IServiceCollection serviceDescriptors, IConfiguration configuration) | |||
where TPartitionFactory : IJT809ProducerPartitionFactory | |||
public static IServiceCollection AddJT809KafkaConsumerService(this IServiceCollection serviceDescriptors, IConfiguration configuration) | |||
{ | |||
serviceDescriptors.Configure<JT809PartitionOptions>(configuration.GetSection("JT809PartitionOptions")); | |||
serviceDescriptors.AddSingleton(typeof(IJT809ProducerPartitionFactory), typeof(TPartitionFactory)); | |||
serviceDescriptors.Configure<ConsumerConfig>(configuration.GetSection("KafkaConsumerConfig")); | |||
serviceDescriptors.AddSingleton(typeof(JT809Consumer<byte[]>), (service)=> { | |||
var loggerFactory = service.GetRequiredService<ILoggerFactory>(); | |||
var consumerConfig = service.GetRequiredService<IOptions<ConsumerConfig>>(); | |||
consumerConfig.Value.GroupId = "JT809.same.Test"; | |||
return new JT809_Same_Consumer(new JT809TopicOptions { TopicName = "jt809.same" }, consumerConfig, loggerFactory); | |||
}); | |||
serviceDescriptors.AddSingleton(typeof(JT809Consumer<JT809GpsPosition>), (service) => { | |||
var loggerFactory = service.GetRequiredService<ILoggerFactory>(); | |||
var consumerConfig = service.GetRequiredService<IOptions<ConsumerConfig>>(); | |||
consumerConfig.Value.GroupId = "JT809.gps.Test"; | |||
return new JT809_GpsPosition_Consumer(new JT809TopicOptions { TopicName = "jt809.gps" }, consumerConfig, loggerFactory); | |||
}); | |||
return serviceDescriptors; | |||
} | |||
public static IServiceCollection AddJT809KafkaConsumerService(this IServiceCollection serviceDescriptors, IConfiguration configuration, Action<JT809PartitionOptions> action = null) | |||
public static IServiceCollection AddJT809KafkaConsumerPartitionsService(this IServiceCollection serviceDescriptors, IConfiguration configuration, Action<JT809PartitionOptions> action) | |||
{ | |||
serviceDescriptors.Configure<ConsumerConfig>(configuration.GetSection("KafkaConsumerConfig")); | |||
if (configuration.GetSection("JT809PartitionOptions").Exists()) | |||
{ | |||
serviceDescriptors.Configure<JT809PartitionOptions>(configuration.GetSection("JT809PartitionOptions")); | |||
} | |||
if (action != null) | |||
{ | |||
serviceDescriptors.Configure<JT809PartitionOptions>(action); | |||
} | |||
serviceDescriptors.AddSingleton(typeof(JT809Consumer<byte[]>), typeof(JT809_Same_Consumer)); | |||
serviceDescriptors.AddSingleton(typeof(JT809Consumer<JT809GpsPosition>), typeof(JT809_GpsPosition_Consumer)); | |||
serviceDescriptors.Configure(action); | |||
serviceDescriptors.AddSingleton(typeof(JT809PartitionConsumer<byte[]>), (service) => { | |||
var loggerFactory = service.GetRequiredService<ILoggerFactory>(); | |||
var consumerConfig = service.GetRequiredService<IOptions<ConsumerConfig>>(); | |||
var partitionOptions = service.GetRequiredService<IOptions<JT809PartitionOptions>>(); | |||
consumerConfig.Value.GroupId = "JT809.partition.same.Test"; | |||
return new JT809_Same_Partition_Consumer(consumerConfig, partitionOptions,new JT809TopicOptions { TopicName = "jt809.partition.same" } , loggerFactory); | |||
}); | |||
serviceDescriptors.AddSingleton(typeof(JT809PartitionConsumer<JT809GpsPosition>), (service) => { | |||
var loggerFactory = service.GetRequiredService<ILoggerFactory>(); | |||
var consumerConfig = service.GetRequiredService<IOptions<ConsumerConfig>>(); | |||
var partitionOptions = service.GetRequiredService<IOptions<JT809PartitionOptions>>(); | |||
consumerConfig.Value.GroupId = "JT809.partition.gps.Test"; | |||
return new JT809_GpsPosition_Partition_Consumer(consumerConfig, partitionOptions,new JT809TopicOptions { TopicName = "jt809.partition.gps" }, loggerFactory); | |||
}); | |||
return serviceDescriptors; | |||
} | |||
} | |||
@@ -48,10 +48,6 @@ namespace JT809.KafkaService | |||
{ | |||
consumerBuilder.SetValueDeserializer(Deserializer); | |||
} | |||
consumerBuilder.SetPartitionsAssignedHandler((c, p) => | |||
{ | |||
p.Add(topicPartition); | |||
}); | |||
consumers.Add(consumerBuilder.Build()); | |||
} | |||
return consumers; | |||
@@ -92,7 +88,7 @@ namespace JT809.KafkaService | |||
{ | |||
//如果不指定分区,根据kafka的机制会从多个分区中拉取数据 | |||
//如果指定分区,根据kafka的机制会从相应的分区中拉取数据 | |||
//consumers[n].Assign(topicPartitionList[n]); | |||
Consumers[n].Assign(topicPartitionList[n]); | |||
var data = Consumers[n].Consume(Cts.Token); | |||
if (logger.IsEnabled(LogLevel.Debug)) | |||
{ | |||
@@ -31,6 +31,24 @@ namespace JT809.KafkaService | |||
{ | |||
producerBuilder.SetValueSerializer(Serializer); | |||
} | |||
return producerBuilder.Build(); | |||
} | |||
protected JT809PartitionProducer( | |||
IOptions<JT809TopicOptions> topicOptionAccessor, | |||
IOptions<ProducerConfig> producerConfig, | |||
IJT809ProducerPartitionFactory producerPartitionFactory, | |||
IOptions<JT809PartitionOptions> partitionOptionsAccessor) | |||
: base(topicOptionAccessor.Value.TopicName, producerConfig.Value) | |||
{ | |||
PartitionOptions = partitionOptionsAccessor.Value; | |||
ProducerPartitionFactory = producerPartitionFactory; | |||
Producer = CreateProducer(); | |||
CreatePartition(); | |||
} | |||
private void CreatePartition() | |||
{ | |||
if (PartitionOptions != null) | |||
{ | |||
TopicPartitionCache = new ConcurrentDictionary<string, TopicPartition>(); | |||
@@ -77,19 +95,6 @@ namespace JT809.KafkaService | |||
} | |||
} | |||
} | |||
return producerBuilder.Build(); | |||
} | |||
protected JT809PartitionProducer( | |||
IOptions<JT809TopicOptions> topicOptionAccessor, | |||
ProducerConfig producerConfig, | |||
IJT809ProducerPartitionFactory producerPartitionFactory, | |||
IOptions<JT809PartitionOptions> partitionOptionsAccessor) | |||
: base(topicOptionAccessor.Value.TopicName, producerConfig) | |||
{ | |||
PartitionOptions = partitionOptionsAccessor.Value; | |||
ProducerPartitionFactory = producerPartitionFactory; | |||
Producer = CreateProducer(); | |||
} | |||
public override void Dispose() | |||
@@ -0,0 +1,17 @@ | |||
using Confluent.Kafka; | |||
using Google.Protobuf; | |||
using JT809.GrpcProtos; | |||
using JT809.PubSub.Abstractions; | |||
using Microsoft.Extensions.Options; | |||
namespace JT809.KafkaService | |||
{ | |||
public sealed class JT809_GpsPositio_Partition_Producer : JT809PartitionProducer<JT809GpsPosition> | |||
{ | |||
public JT809_GpsPositio_Partition_Producer(IOptions<JT809TopicOptions> topicOptionAccessor, IOptions<ProducerConfig> producerConfig, IJT809ProducerPartitionFactory producerPartitionFactory, IOptions<JT809PartitionOptions> partitionOptionsAccessor) : base(topicOptionAccessor, producerConfig, producerPartitionFactory, partitionOptionsAccessor) | |||
{ | |||
} | |||
protected override Serializer<JT809GpsPosition> Serializer => (position) => position.ToByteArray(); | |||
} | |||
} |
@@ -0,0 +1,26 @@ | |||
using Confluent.Kafka; | |||
using Google.Protobuf; | |||
using JT809.GrpcProtos; | |||
using JT809.PubSub.Abstractions; | |||
using Microsoft.Extensions.Logging; | |||
using Microsoft.Extensions.Options; | |||
using System; | |||
using System.Collections.Generic; | |||
using System.Threading; | |||
using System.Threading.Tasks; | |||
namespace JT809.KafkaService | |||
{ | |||
public sealed class JT809_GpsPosition_Partition_Consumer : JT809PartitionConsumer<JT809GpsPosition> | |||
{ | |||
public JT809_GpsPosition_Partition_Consumer(IOptions<ConsumerConfig> consumerConfigAccessor, IOptions<JT809PartitionOptions> partitionOptionsAccessor, IOptions<JT809TopicOptions> topicOptionsAccessor, ILoggerFactory loggerFactory) : base(consumerConfigAccessor, partitionOptionsAccessor, topicOptionsAccessor, loggerFactory) | |||
{ | |||
} | |||
protected override Deserializer<JT809GpsPosition> Deserializer => (data, isNull) => { | |||
if (isNull) return default; | |||
return new MessageParser<JT809GpsPosition>(() => new JT809GpsPosition()) | |||
.ParseFrom(data.ToArray()); | |||
}; | |||
} | |||
} |
@@ -0,0 +1,20 @@ | |||
using Confluent.Kafka; | |||
using Google.Protobuf; | |||
using JT809.GrpcProtos; | |||
using JT809.PubSub.Abstractions; | |||
using Microsoft.Extensions.Logging; | |||
using Microsoft.Extensions.Options; | |||
using System; | |||
using System.Collections.Generic; | |||
using System.Threading; | |||
using System.Threading.Tasks; | |||
namespace JT809.KafkaService | |||
{ | |||
public sealed class JT809_Same_Partition_Consumer : JT809PartitionConsumer<byte[]> | |||
{ | |||
public JT809_Same_Partition_Consumer(IOptions<ConsumerConfig> consumerConfigAccessor, IOptions<JT809PartitionOptions> partitionOptionsAccessor, IOptions<JT809TopicOptions> topicOptionsAccessor, ILoggerFactory loggerFactory) : base(consumerConfigAccessor, partitionOptionsAccessor, topicOptionsAccessor, loggerFactory) | |||
{ | |||
} | |||
} | |||
} |
@@ -0,0 +1,16 @@ | |||
using System; | |||
using System.Collections.Generic; | |||
using System.Text; | |||
using Confluent.Kafka; | |||
using JT809.PubSub.Abstractions; | |||
using Microsoft.Extensions.Options; | |||
namespace JT809.KafkaService | |||
{ | |||
public sealed class JT809_Same_Partition_Producer : JT809PartitionProducer<byte[]> | |||
{ | |||
public JT809_Same_Partition_Producer(IOptions<JT809TopicOptions> topicOptionAccessor, IOptions<ProducerConfig> producerConfig, IJT809ProducerPartitionFactory producerPartitionFactory, IOptions<JT809PartitionOptions> partitionOptionsAccessor) : base(topicOptionAccessor, producerConfig, producerPartitionFactory, partitionOptionsAccessor) | |||
{ | |||
} | |||
} | |||
} |
@@ -0,0 +1,26 @@ | |||
using System; | |||
using Xunit; | |||
using Microsoft.Extensions.DependencyInjection; | |||
using JT809.Protocol.Enums; | |||
using JT809.Protocol.Extensions; | |||
using System.Threading; | |||
namespace JT809.KafkaServiceTest | |||
{ | |||
public class ConsumerAssignPartitionTest : TestConsumerAssignPartitionsBase | |||
{ | |||
[Fact] | |||
public void Test1() | |||
{ | |||
ConsumerTestPartitionService consumerTestService = ServiceProvider.GetRequiredService<ConsumerTestPartitionService>(); | |||
consumerTestService.GpsConsumer.OnMessage((Message)=> | |||
{ | |||
Assert.Equal(JT809SubBusinessType.实时上传车辆定位信息.ToValueString(), Message.MsgId); | |||
//Assert.Equal("粤A23456", Message.Data.Vno); | |||
Assert.Equal(2, Message.Data.VColor); | |||
//Assert.Equal("smallchi", Message.Data.FromChannel); | |||
}); | |||
Thread.Sleep(100000); | |||
} | |||
} | |||
} |
@@ -0,0 +1,50 @@ | |||
using System; | |||
using Xunit; | |||
using Microsoft.Extensions.DependencyInjection; | |||
using JT809.Protocol.Enums; | |||
using JT809.Protocol.Extensions; | |||
using System.Threading; | |||
namespace JT809.KafkaServiceTest | |||
{ | |||
public class ConsumerPartitionTest : TestConsumerPartitionBase | |||
{ | |||
[Fact] | |||
public void Test1() | |||
{ | |||
ConsumerTestPartitionService consumerTestService = ServiceProvider.GetRequiredService<ConsumerTestPartitionService>(); | |||
consumerTestService.GpsConsumer.OnMessage((Message)=> | |||
{ | |||
Assert.Equal(JT809SubBusinessType.实时上传车辆定位信息.ToValueString(), Message.MsgId); | |||
//Assert.Equal("粤A23456", Message.Data.Vno); | |||
Assert.Equal(2, Message.Data.VColor); | |||
//Assert.Equal("smallchi", Message.Data.FromChannel); | |||
}); | |||
Thread.Sleep(100000); | |||
} | |||
[Fact] | |||
public void Test2() | |||
{ | |||
ConsumerTestPartitionService consumerTestService = ServiceProvider.GetRequiredService<ConsumerTestPartitionService>(); | |||
consumerTestService.SameConsumer.OnMessage((Message) => | |||
{ | |||
Assert.Equal(JT809SubBusinessType.None.ToValueString(), Message.MsgId); | |||
//Assert.Equal(new byte[] { 0x01, 0x02, 0x03 }, Message.Data); | |||
}); | |||
Thread.Sleep(100000); | |||
} | |||
[Fact] | |||
public void Test3() | |||
{ | |||
ConsumerTestPartitionService consumerTestService = ServiceProvider.GetRequiredService<ConsumerTestPartitionService>(); | |||
consumerTestService.SameConsumer.OnMessage((Message) => | |||
{ | |||
Assert.Equal(JT809SubBusinessType.None.ToValueString(), Message.MsgId); | |||
//Assert.Equal(new byte[] { 0x01, 0x02, 0x03 }, Message.Data); | |||
}); | |||
Thread.Sleep(100000); | |||
} | |||
} | |||
} |
@@ -12,15 +12,15 @@ namespace JT809.KafkaServiceTest | |||
[Fact] | |||
public void Test1() | |||
{ | |||
ConsumerTestService producerTestService = ServiceProvider.GetRequiredService<ConsumerTestService>(); | |||
producerTestService.GpsConsumer.OnMessage((Message)=> | |||
ConsumerTestService consumerTestService = ServiceProvider.GetRequiredService<ConsumerTestService>(); | |||
consumerTestService.GpsConsumer.OnMessage((Message)=> | |||
{ | |||
Assert.Equal(JT809SubBusinessType.实时上传车辆定位信息.ToValueString(), Message.MsgId); | |||
Assert.Equal("粤A23456", Message.Data.Vno); | |||
Assert.Equal(2, Message.Data.VColor); | |||
Assert.Equal("smallchi", Message.Data.FromChannel); | |||
}); | |||
producerTestService.GpsConsumer.Subscribe(); | |||
consumerTestService.GpsConsumer.Subscribe(); | |||
Thread.Sleep(100000); | |||
} | |||
@@ -28,13 +28,13 @@ namespace JT809.KafkaServiceTest | |||
[Fact] | |||
public void Test2() | |||
{ | |||
ConsumerTestService producerTestService = ServiceProvider.GetRequiredService<ConsumerTestService>(); | |||
producerTestService.SameConsumer.OnMessage((Message) => | |||
ConsumerTestService consumerTestService = ServiceProvider.GetRequiredService<ConsumerTestService>(); | |||
consumerTestService.SameConsumer.OnMessage((Message) => | |||
{ | |||
Assert.Equal(JT809SubBusinessType.None.ToValueString(), Message.MsgId); | |||
Assert.Equal(new byte[] { 0x01, 0x02, 0x03 }, Message.Data); | |||
}); | |||
producerTestService.SameConsumer.Subscribe(); | |||
consumerTestService.SameConsumer.Subscribe(); | |||
Thread.Sleep(100000); | |||
} | |||
@@ -0,0 +1,19 @@ | |||
using JT809.GrpcProtos; | |||
using JT809.KafkaService; | |||
using System; | |||
using System.Collections.Generic; | |||
using System.Text; | |||
namespace JT809.KafkaServiceTest | |||
{ | |||
public class ConsumerTestPartitionService | |||
{ | |||
public JT809PartitionConsumer<byte[]> SameConsumer { get; } | |||
public JT809PartitionConsumer<JT809GpsPosition> GpsConsumer { get; } | |||
public ConsumerTestPartitionService(JT809PartitionConsumer<byte[]> sameConsumer, JT809PartitionConsumer<JT809GpsPosition> gpsConsumer) | |||
{ | |||
SameConsumer = sameConsumer; | |||
GpsConsumer = gpsConsumer; | |||
} | |||
} | |||
} |
@@ -6,33 +6,50 @@ using JT809.Protocol.Extensions; | |||
namespace JT809.KafkaServiceTest | |||
{ | |||
public class ProducerPartitionTest : TestProducerBase | |||
public class ProducerPartitionTest : TestProducerPartitionBase | |||
{ | |||
[Fact] | |||
public void Test1() | |||
{ | |||
ProducerTestService producerTestService = ServiceProvider.GetRequiredService<ProducerTestService>(); | |||
ProducerTestPartitionService producerTestService = ServiceProvider.GetRequiredService<ProducerTestPartitionService>(); | |||
producerTestService.GpsProducer.ProduceAsync(JT809SubBusinessType.实时上传车辆定位信息.ToValueString(), "粤A23456_2", new GrpcProtos.JT809GpsPosition | |||
{ | |||
Vno= "粤A23456", | |||
VColor=2, | |||
GpsTime= (DateTime.Now.ToUniversalTime().Ticks - 621355968000000000) / 10000000, | |||
FromChannel="smallchi" | |||
FromChannel="smallchi1" | |||
}); | |||
producerTestService.GpsProducer.ProduceAsync(JT809SubBusinessType.实时上传车辆定位信息.ToValueString(), "粤A23456_2", new GrpcProtos.JT809GpsPosition | |||
{ | |||
Vno = "粤A23457", | |||
VColor = 2, | |||
GpsTime = (DateTime.Now.ToUniversalTime().Ticks - 621355968000000000) / 10000000, | |||
FromChannel = "smallchi2" | |||
}); | |||
producerTestService.GpsProducer.ProduceAsync(JT809SubBusinessType.实时上传车辆定位信息.ToValueString(), "粤A23456_2", new GrpcProtos.JT809GpsPosition | |||
{ | |||
Vno = "粤A23458", | |||
VColor = 2, | |||
GpsTime = (DateTime.Now.ToUniversalTime().Ticks - 621355968000000000) / 10000000, | |||
FromChannel = "smallchi3" | |||
}); | |||
producerTestService.GpsProducer.ProduceAsync(JT809SubBusinessType.实时上传车辆定位信息.ToValueString(), "粤A23456_2", new GrpcProtos.JT809GpsPosition | |||
{ | |||
Vno = "粤A23459", | |||
VColor = 2, | |||
GpsTime = (DateTime.Now.ToUniversalTime().Ticks - 621355968000000000) / 10000000, | |||
FromChannel = "smallchi4" | |||
}); | |||
} | |||
[Fact] | |||
public void Test2() | |||
{ | |||
ProducerTestService producerTestService = ServiceProvider.GetRequiredService<ProducerTestService>(); | |||
producerTestService.SameProducer.ProduceAsync(JT809SubBusinessType.None.ToValueString(), "粤A23457_2", new byte[] { 0x01, 0x02, 0x03 }); | |||
} | |||
[Fact] | |||
public void Test3() | |||
{ | |||
ProducerTestService producerTestService = ServiceProvider.GetRequiredService<ProducerTestService>(); | |||
producerTestService.SameProducer.ProduceAsync(JT809SubBusinessType.None.ToValueString(), "粤A23457_2", new byte[] { 0x01, 0x02, 0x03 }); | |||
ProducerTestPartitionService producerTestService = ServiceProvider.GetRequiredService<ProducerTestPartitionService>(); | |||
producerTestService.SameProducer.ProduceAsync(JT809SubBusinessType.None.ToValueString(), "粤A23452_2", new byte[] { 0x01, 0x02, 0x03 }); | |||
producerTestService.SameProducer.ProduceAsync(JT809SubBusinessType.None.ToValueString(), "粤A23453_2", new byte[] { 0x02, 0x03, 0x04 }); | |||
producerTestService.SameProducer.ProduceAsync(JT809SubBusinessType.None.ToValueString(), "粤A23455_2", new byte[] { 0x03, 0x04, 0x05 }); | |||
producerTestService.SameProducer.ProduceAsync(JT809SubBusinessType.None.ToValueString(), "粤A23452_2", new byte[] { 0x04, 0x05, 0x06 }); | |||
} | |||
} | |||
} |
@@ -0,0 +1,20 @@ | |||
using JT809.GrpcProtos; | |||
using JT809.KafkaService; | |||
using System; | |||
using System.Collections.Generic; | |||
using System.Text; | |||
namespace JT809.KafkaServiceTest | |||
{ | |||
public class ProducerTestPartitionService | |||
{ | |||
public JT809PartitionProducer<byte[]> SameProducer { get; } | |||
public JT809PartitionProducer<JT809GpsPosition> GpsProducer { get; } | |||
public ProducerTestPartitionService(JT809PartitionProducer<byte[]> sameProducer, JT809PartitionProducer<JT809GpsPosition> gpsProducer) | |||
{ | |||
SameProducer = sameProducer; | |||
GpsProducer = gpsProducer; | |||
} | |||
} | |||
} |
@@ -0,0 +1,41 @@ | |||
using System; | |||
using System.Collections.Generic; | |||
using System.IO; | |||
using System.Text; | |||
using JT809.KafkaService; | |||
using Microsoft.Extensions.Configuration; | |||
using Microsoft.Extensions.DependencyInjection; | |||
using Microsoft.Extensions.Logging; | |||
namespace JT809.KafkaServiceTest | |||
{ | |||
public class TestConsumerAssignPartitionsBase | |||
{ | |||
public IServiceProvider ServiceProvider { get; } | |||
public IConfigurationRoot ConfigurationRoot { get; } | |||
public TestConsumerAssignPartitionsBase() | |||
{ | |||
var builder = new ConfigurationBuilder(); | |||
builder.SetBasePath(AppDomain.CurrentDomain.BaseDirectory); | |||
builder.AddJsonFile("appsettings.Partition.json"); | |||
ConfigurationRoot = builder.Build(); | |||
ServiceCollection serviceDescriptors = new ServiceCollection(); | |||
serviceDescriptors.AddSingleton<ILoggerFactory, LoggerFactory>(); | |||
serviceDescriptors.AddSingleton(typeof(ILogger<>), typeof(Logger<>)); | |||
serviceDescriptors.AddLogging(configure => | |||
{ | |||
configure.AddDebug(); | |||
configure.SetMinimumLevel(LogLevel.Trace); | |||
}); | |||
serviceDescriptors.AddJT809KafkaConsumerPartitionsService(ConfigurationRoot,partition=> | |||
{ | |||
partition.Partition = 4; | |||
partition.AssignPartitions = new List<int> { 1, 3 }; | |||
}); | |||
serviceDescriptors.AddSingleton<ConsumerTestPartitionService>(); | |||
ServiceProvider = serviceDescriptors.BuildServiceProvider(); | |||
} | |||
} | |||
} |
@@ -0,0 +1,37 @@ | |||
using System; | |||
using System.Collections.Generic; | |||
using System.IO; | |||
using System.Text; | |||
using JT809.KafkaService; | |||
using Microsoft.Extensions.Configuration; | |||
using Microsoft.Extensions.DependencyInjection; | |||
using Microsoft.Extensions.Logging; | |||
namespace JT809.KafkaServiceTest | |||
{ | |||
public class TestConsumerPartitionBase | |||
{ | |||
public IServiceProvider ServiceProvider { get; } | |||
public IConfigurationRoot ConfigurationRoot { get; } | |||
public TestConsumerPartitionBase() | |||
{ | |||
var builder = new ConfigurationBuilder(); | |||
builder.SetBasePath(AppDomain.CurrentDomain.BaseDirectory); | |||
builder.AddJsonFile("appsettings.Partition.json"); | |||
ConfigurationRoot = builder.Build(); | |||
ServiceCollection serviceDescriptors = new ServiceCollection(); | |||
serviceDescriptors.AddSingleton<ILoggerFactory, LoggerFactory>(); | |||
serviceDescriptors.AddSingleton(typeof(ILogger<>), typeof(Logger<>)); | |||
serviceDescriptors.AddLogging(configure => | |||
{ | |||
configure.AddDebug(); | |||
configure.SetMinimumLevel(LogLevel.Trace); | |||
}); | |||
serviceDescriptors.AddJT809KafkaConsumerPartitionsService(ConfigurationRoot,partition=> { partition.Partition = 4; }); | |||
serviceDescriptors.AddSingleton<ConsumerTestPartitionService>(); | |||
ServiceProvider = serviceDescriptors.BuildServiceProvider(); | |||
} | |||
} | |||
} |
@@ -29,8 +29,8 @@ namespace JT809.KafkaServiceTest | |||
configure.AddDebug(); | |||
configure.SetMinimumLevel(LogLevel.Trace); | |||
}); | |||
serviceDescriptors.AddJT809KafkaProducerService(ConfigurationRoot); | |||
serviceDescriptors.AddSingleton<ProducerTestService>(); | |||
serviceDescriptors.AddJT809KafkaProducerPartitionsService(ConfigurationRoot,partition=> { partition.Partition = 4; }); | |||
serviceDescriptors.AddSingleton<ProducerTestPartitionService>(); | |||
ServiceProvider = serviceDescriptors.BuildServiceProvider(); | |||
} | |||
} | |||
@@ -17,7 +17,6 @@ | |||
}, | |||
"KafkaConsumerConfig": { | |||
"BootstrapServers": "127.0.0.1:9092", | |||
"EnableAutoCommit": true, | |||
"GroupId": "JT809.Gps.Test" | |||
"EnableAutoCommit": true | |||
} | |||
} |