Browse Source

调整生产消费模式及增加单元测试

tags/old
SmallChi 6 years ago
parent
commit
74a70a3e8a
25 changed files with 817 additions and 267 deletions
  1. +47
    -105
      src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809Consumer.cs
  2. +34
    -0
      src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809ConsumerBase.cs
  3. +2
    -2
      src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809KafkaServiceExtensions.cs
  4. +160
    -0
      src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809PartitionConsumer.cs
  5. +153
    -0
      src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809PartitionProducer.cs
  6. +21
    -95
      src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809Producer.cs
  7. +24
    -0
      src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809ProducerBase.cs
  8. +2
    -18
      src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809_GpsPositio_Producer.cs
  9. +4
    -16
      src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809_GpsPosition_Consumer.cs
  10. +2
    -13
      src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809_Same_Consumer.cs
  11. +2
    -17
      src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809_Same_Producer.cs
  12. +42
    -0
      src/JT809.DotNetty.Simples/Superior/JT809.KafkaServiceTest/ConsumerTest.cs
  13. +19
    -0
      src/JT809.DotNetty.Simples/Superior/JT809.KafkaServiceTest/ConsumerTestService.cs
  14. +37
    -0
      src/JT809.DotNetty.Simples/Superior/JT809.KafkaServiceTest/JT809.KafkaServiceTest.csproj
  15. +38
    -0
      src/JT809.DotNetty.Simples/Superior/JT809.KafkaServiceTest/ProducerPartitionTest.cs
  16. +31
    -0
      src/JT809.DotNetty.Simples/Superior/JT809.KafkaServiceTest/ProducerTest.cs
  17. +20
    -0
      src/JT809.DotNetty.Simples/Superior/JT809.KafkaServiceTest/ProducerTestService.cs
  18. +37
    -0
      src/JT809.DotNetty.Simples/Superior/JT809.KafkaServiceTest/TestConsumerBase.cs
  19. +37
    -0
      src/JT809.DotNetty.Simples/Superior/JT809.KafkaServiceTest/TestProducerBase.cs
  20. +37
    -0
      src/JT809.DotNetty.Simples/Superior/JT809.KafkaServiceTest/TestProducerPartitionBase.cs
  21. +23
    -0
      src/JT809.DotNetty.Simples/Superior/JT809.KafkaServiceTest/appsettings.Partition.json
  22. +23
    -0
      src/JT809.DotNetty.Simples/Superior/JT809.KafkaServiceTest/appsettings.json
  23. +1
    -1
      src/JT809.DotNetty.Simples/Superior/JT809.PubSub.Abstractions/IJT809Consumer.cs
  24. +14
    -0
      src/JT809.DotNetty.Simples/Superior/JT809.PubSub.Abstractions/JT809TopicOptions.cs
  25. +7
    -0
      src/JT809.DotNetty.sln

+ 47
- 105
src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809Consumer.cs View File

@@ -1,6 +1,7 @@
using Confluent.Kafka;
using JT809.PubSub.Abstractions;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using System;
using System.Collections.Generic;
using System.Threading;
@@ -8,137 +9,81 @@ using System.Threading.Tasks;

namespace JT809.KafkaService
{
public abstract class JT809Consumer<T> : IJT808ConsumerOfT<T>
public abstract class JT809Consumer<T> : JT809ConsumerBase<T>
{
private bool _disposed = false;
public CancellationTokenSource Cts => new CancellationTokenSource();
public override CancellationTokenSource Cts => new CancellationTokenSource();

public virtual string TopicName => JT809Constants.JT809TopicName;
protected ILogger logger { get; }

private readonly ILogger logger;

private List<TopicPartition> topicPartitionList;

private IList<IConsumer<string, T>> consumers;

protected abstract JT809PartitionOptions PartitionOptions { get; }

protected virtual Deserializer<T> Deserializer { get; }
protected override IList<IConsumer<string, T>> Consumers { get; }

protected JT809Consumer(
ConsumerConfig consumerConfig,
ILoggerFactory loggerFactory)
IOptions<JT809TopicOptions> topicOptionsAccessor,
IOptions<ConsumerConfig> consumerConfigAccessor,
ILoggerFactory loggerFactory)
: base(topicOptionsAccessor.Value.TopicName, consumerConfigAccessor.Value)
{
logger = loggerFactory.CreateLogger("JT809Consumer");
CreateTopicPartition();
consumers = new List<IConsumer<string, T>>();
foreach(var topicPartition in topicPartitionList)
{
ConsumerBuilder<string, T> consumerBuilder = new ConsumerBuilder<string, T>(consumerConfig);
consumerBuilder.SetErrorHandler((consumer, error) =>
{
logger.LogError(error.Reason);
});
if (Deserializer != null)
{
consumerBuilder.SetValueDeserializer(Deserializer);
}
if (PartitionOptions.Partition > 1)
{
consumerBuilder.SetPartitionsAssignedHandler((c, p) => {
p.Add(topicPartition);
});
}
consumers.Add(consumerBuilder.Build());
}
}

private void CreateTopicPartition()
{
topicPartitionList = new List<TopicPartition>();
if (PartitionOptions.Partition > 1)
Consumers = new List<IConsumer<string, T>>();
ConsumerBuilder<string, T> consumerBuilder = new ConsumerBuilder<string, T>(ConsumerConfig);
consumerBuilder.SetErrorHandler((consumer, error) =>
{
if(PartitionOptions.AssignPartitions!=null && PartitionOptions.AssignPartitions.Count>0)
{
foreach(var p in PartitionOptions.AssignPartitions)
{
topicPartitionList.Add(new TopicPartition(TopicName, new Partition(p)));
}
}
else
{
for (int i = 0; i < PartitionOptions.Partition; i++)
{
topicPartitionList.Add(new TopicPartition(TopicName, new Partition(i)));
}
}
}
else
logger.LogError(error.Reason);
});
if (Deserializer != null)
{
for (int i = 0; i < PartitionOptions.Partition; i++)
{
topicPartitionList.Add(new TopicPartition(TopicName, new Partition(i)));
}
consumerBuilder.SetValueDeserializer(Deserializer);
}
Consumers.Add(consumerBuilder.Build());
}

public void OnMessage(Action<(string MsgId, T data)> callback)
public override void OnMessage(Action<(string MsgId, T Data)> callback)
{
logger.LogDebug($"consumers:{consumers.Count},topicPartitionList:{topicPartitionList.Count}");
for (int i = 0; i < consumers.Count; i++)
Task.Run(() =>
{
Task.Factory.StartNew((num) =>
while (!Cts.IsCancellationRequested)
{
int n = (int)num;
while (!Cts.IsCancellationRequested)
try
{
try
{
//如果不指定分区,根据kafka的机制会从多个分区中拉取数据
//如果指定分区,根据kafka的机制会从相应的分区中拉取数据
//consumers[n].Assign(topicPartitionList[n]);
var data = consumers[n].Consume(Cts.Token);
if (logger.IsEnabled(LogLevel.Debug))
{
logger.LogDebug($"Topic: {data.Topic} Key: {data.Key} Partition: {data.Partition} Offset: {data.Offset} Data:{string.Join("", data.Value)} TopicPartitionOffset:{data.TopicPartitionOffset}");
}
callback((data.Key, data.Value));
}
catch (ConsumeException ex)
{
logger.LogError(ex, TopicName);
Thread.Sleep(1000);
}
catch (Exception ex)
//如果不指定分区,根据kafka的机制会从多个分区中拉取数据
//如果指定分区,根据kafka的机制会从相应的分区中拉取数据
//consumers[n].Assign(topicPartitionList[n]);
var data = Consumers[0].Consume(Cts.Token);
if (logger.IsEnabled(LogLevel.Debug))
{
logger.LogError(ex, TopicName);
Thread.Sleep(1000);
logger.LogDebug($"Topic: {data.Topic} Key: {data.Key} Partition: {data.Partition} Offset: {data.Offset} Data:{string.Join("", data.Value)} TopicPartitionOffset:{data.TopicPartitionOffset}");
}
callback((data.Key, data.Value));
}
}, i, Cts.Token);
}
catch (ConsumeException ex)
{
logger.LogError(ex, TopicName);
Thread.Sleep(1000);
}
catch (Exception ex)
{
logger.LogError(ex, TopicName);
Thread.Sleep(1000);
}
}
}, Cts.Token);
}

public void Subscribe()
public override void Subscribe()
{
if (_disposed) return;
//仅有一个分区才需要订阅
if (topicPartitionList.Count == 1)
{
consumers[0].Subscribe(TopicName);
}
Consumers[0].Subscribe(TopicName);
}

public void Unsubscribe()
public override void Unsubscribe()
{
if (_disposed) return;
foreach(var c in consumers)
{
c.Unsubscribe();
}
Consumers[0].Unsubscribe();
}

public void Dispose()
public override void Dispose()
{
Dispose(true);
GC.SuppressFinalize(true);
@@ -156,11 +101,8 @@ namespace JT809.KafkaService
if (disposing)
{
Cts.Cancel();
foreach (var c in consumers)
{
c.Close();
c.Dispose();
}
Consumers[0].Close();
Consumers[0].Dispose();
Cts.Dispose();
}
_disposed = true;


+ 34
- 0
src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809ConsumerBase.cs View File

@@ -0,0 +1,34 @@
using Confluent.Kafka;
using JT809.PubSub.Abstractions;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

namespace JT809.KafkaService
{
public abstract class JT809ConsumerBase<T> : IJT808ConsumerOfT<T>
{
public string TopicName { get; }

public ConsumerConfig ConsumerConfig { get; }

protected JT809ConsumerBase(string topicName, ConsumerConfig config)
{
ConsumerConfig = config;
TopicName = topicName;
}

public abstract CancellationTokenSource Cts { get; }
protected abstract IList<IConsumer<string, T>> Consumers { get; }

protected virtual Deserializer<T> Deserializer { get; set; }

public abstract void Dispose();
public abstract void OnMessage(Action<(string MsgId, T Data)> callback);
public abstract void Subscribe();
public abstract void Unsubscribe();
}
}

+ 2
- 2
src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809KafkaServiceExtensions.cs View File

@@ -18,7 +18,7 @@ namespace JT809.KafkaService
{
public static IServiceCollection AddJT809KafkaProducerService(this IServiceCollection serviceDescriptors, IConfiguration configuration)
{
serviceDescriptors.Configure<ProducerConfig>(configuration.GetSection("JT809ProducerConfig"));
serviceDescriptors.Configure<ProducerConfig>(configuration.GetSection("KafkaProducerConfig"));
serviceDescriptors.AddSingleton(typeof(JT809Producer<byte[]>), typeof(JT809_Same_Producer));
serviceDescriptors.AddSingleton(typeof(JT809Producer<JT809GpsPosition>), typeof(JT809_GpsPositio_Producer));
return serviceDescriptors;
@@ -42,7 +42,7 @@ namespace JT809.KafkaService

public static IServiceCollection AddJT809KafkaConsumerService(this IServiceCollection serviceDescriptors, IConfiguration configuration, Action<JT809PartitionOptions> action = null)
{
serviceDescriptors.Configure<ConsumerConfig>(configuration.GetSection("JT809ConsumerConfig"));
serviceDescriptors.Configure<ConsumerConfig>(configuration.GetSection("KafkaConsumerConfig"));
if (configuration.GetSection("JT809PartitionOptions").Exists())
{
serviceDescriptors.Configure<JT809PartitionOptions>(configuration.GetSection("JT809PartitionOptions"));


+ 160
- 0
src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809PartitionConsumer.cs View File

@@ -0,0 +1,160 @@
using Confluent.Kafka;
using JT809.PubSub.Abstractions;
using Microsoft.Extensions.Logging;
using Microsoft.Extensions.Options;
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

namespace JT809.KafkaService
{
public abstract class JT809PartitionConsumer<T> : JT809ConsumerBase<T>
{
private bool _disposed = false;
public override CancellationTokenSource Cts => new CancellationTokenSource();

protected ILogger logger { get; }

private List<TopicPartition> topicPartitionList;

private JT809PartitionOptions partitionOptions;

protected override IList<IConsumer<string, T>> Consumers { get; }

protected JT809PartitionConsumer(
IOptions<ConsumerConfig> consumerConfigAccessor,
IOptions<JT809PartitionOptions> partitionOptionsAccessor,
IOptions<JT809TopicOptions> topicOptionsAccessor,
ILoggerFactory loggerFactory) : base(topicOptionsAccessor.Value.TopicName, consumerConfigAccessor.Value)
{
logger = loggerFactory.CreateLogger("JT809PartitionConsumer");
partitionOptions = partitionOptionsAccessor.Value;
topicPartitionList = CreateTopicPartition();
Consumers = CreateConsumers();
}

protected virtual IList<IConsumer<string, T>> CreateConsumers()
{
List<IConsumer<string, T>> consumers = new List<IConsumer<string, T>>();
foreach (var topicPartition in topicPartitionList)
{
ConsumerBuilder<string, T> consumerBuilder = new ConsumerBuilder<string, T>(ConsumerConfig);
consumerBuilder.SetErrorHandler((consumer, error) =>
{
logger.LogError(error.Reason);
});
if (Deserializer != null)
{
consumerBuilder.SetValueDeserializer(Deserializer);
}
consumerBuilder.SetPartitionsAssignedHandler((c, p) =>
{
p.Add(topicPartition);
});
consumers.Add(consumerBuilder.Build());
}
return consumers;
}

protected virtual List<TopicPartition> CreateTopicPartition()
{
var topicPartitions = new List<TopicPartition>();
if (partitionOptions.AssignPartitions != null && partitionOptions.AssignPartitions.Count > 0)
{
foreach (var p in partitionOptions.AssignPartitions)
{
topicPartitions.Add(new TopicPartition(TopicName, new Partition(p)));
}
}
else
{
for (int i = 0; i < partitionOptions.Partition; i++)
{
topicPartitions.Add(new TopicPartition(TopicName, new Partition(i)));
}
}
return topicPartitions;
}

public override void OnMessage(Action<(string MsgId, T Data)> callback)
{
if(logger.IsEnabled( LogLevel.Debug))
logger.LogDebug($"consumers:{Consumers.Count},topicPartitionList:{topicPartitionList.Count}");
for (int i = 0; i < Consumers.Count; i++)
{
Task.Factory.StartNew((num) =>
{
int n = (int)num;
while (!Cts.IsCancellationRequested)
{
try
{
//如果不指定分区,根据kafka的机制会从多个分区中拉取数据
//如果指定分区,根据kafka的机制会从相应的分区中拉取数据
//consumers[n].Assign(topicPartitionList[n]);
var data = Consumers[n].Consume(Cts.Token);
if (logger.IsEnabled(LogLevel.Debug))
{
logger.LogDebug($"Topic: {data.Topic} Key: {data.Key} Partition: {data.Partition} Offset: {data.Offset} Data:{string.Join("", data.Value)} TopicPartitionOffset:{data.TopicPartitionOffset}");
}
callback((data.Key, data.Value));
}
catch (ConsumeException ex)
{
logger.LogError(ex, TopicName);
Thread.Sleep(1000);
}
catch (Exception ex)
{
logger.LogError(ex, TopicName);
Thread.Sleep(1000);
}
}
}, i, Cts.Token);
}
}

public override void Subscribe()
{
if (_disposed) return;
}

public override void Unsubscribe()
{
if (_disposed) return;
foreach (var c in Consumers)
{
c.Unsubscribe();
}
}

public override void Dispose()
{
Dispose(true);
GC.SuppressFinalize(true);

}

~JT809PartitionConsumer()
{
Dispose(false);
}

protected virtual void Dispose(bool disposing)
{
if (_disposed) return;
if (disposing)
{
Cts.Cancel();
foreach (var c in Consumers)
{
c.Close();
c.Dispose();
}
Cts.Dispose();
}
_disposed = true;
}
}
}

+ 153
- 0
src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809PartitionProducer.cs View File

@@ -0,0 +1,153 @@
using Confluent.Kafka;
using Confluent.Kafka.Admin;
using JT809.PubSub.Abstractions;
using Microsoft.Extensions.Options;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;

namespace JT809.KafkaService
{
/// <summary>
///
/// </summary>
/// <typeparam name="T"></typeparam>
public abstract class JT809PartitionProducer<T> : JT809ProducerBase<T>
{
private bool _disposed = false;

private ConcurrentDictionary<string, TopicPartition> TopicPartitionCache;

private readonly IJT809ProducerPartitionFactory ProducerPartitionFactory;

private readonly JT809PartitionOptions PartitionOptions;

protected override IProducer<string, T> Producer { get; }

protected virtual IProducer<string, T> CreateProducer()
{
ProducerBuilder<string, T> producerBuilder = new ProducerBuilder<string, T>(ProducerConfig);
if (Serializer != null)
{
producerBuilder.SetValueSerializer(Serializer);
}
if (PartitionOptions != null)
{
TopicPartitionCache = new ConcurrentDictionary<string, TopicPartition>();
if (PartitionOptions.Partition > 1)
{
using (var adminClient = new AdminClient(Producer.Handle))
{
try
{
adminClient.CreateTopicsAsync(new TopicSpecification[] { new TopicSpecification { Name = TopicName, NumPartitions = 1, ReplicationFactor = 1 } }).Wait();
}
catch (AggregateException ex)
{
//{Confluent.Kafka.Admin.CreateTopicsException: An error occurred creating topics: [jt809]: [Topic 'jt809' already exists.].}
if (ex.InnerException is Confluent.Kafka.Admin.CreateTopicsException exception)
{

}
else
{
//记录日志
//throw ex.InnerException;
}
}
try
{
//topic IncreaseTo 只增不减
adminClient.CreatePartitionsAsync(
new List<PartitionsSpecification>
{
new PartitionsSpecification
{
IncreaseTo = PartitionOptions.Partition,
Topic=TopicName
}
}
).Wait();
}
catch (AggregateException ex)
{
//记录日志
// throw ex.InnerException;
}
}
}
}
return producerBuilder.Build();
}

protected JT809PartitionProducer(
IOptions<JT809TopicOptions> topicOptionAccessor,
ProducerConfig producerConfig,
IJT809ProducerPartitionFactory producerPartitionFactory,
IOptions<JT809PartitionOptions> partitionOptionsAccessor)
: base(topicOptionAccessor.Value.TopicName, producerConfig)
{
PartitionOptions = partitionOptionsAccessor.Value;
ProducerPartitionFactory = producerPartitionFactory;
Producer = CreateProducer();
}

public override void Dispose()
{
Dispose(true);
GC.SuppressFinalize(true);
}

protected virtual void Dispose(bool disposing)
{
if (_disposed) return;
if (disposing)
{
Producer.Dispose();
}
_disposed = true;
}

public override async void ProduceAsync(string msgId, string vno_color, T data)
{
if (_disposed) return;
if (PartitionOptions != null)
{
if (PartitionOptions.Partition > 1)
{
if (!TopicPartitionCache.TryGetValue(vno_color, out TopicPartition topicPartition))
{
topicPartition = new TopicPartition(TopicName, new Partition(ProducerPartitionFactory.CreatePartition(TopicName, msgId, vno_color)));
TopicPartitionCache.TryAdd(vno_color, topicPartition);
}
await Producer.ProduceAsync(topicPartition, new Message<string, T>
{
Key = msgId,
Value = data
});
}
else
{
await Producer.ProduceAsync(TopicName, new Message<string, T>
{
Key = msgId,
Value = data
});
}
}
else
{
await Producer.ProduceAsync(TopicName, new Message<string, T>
{
Key = msgId,
Value = data
});
}
}

~JT809PartitionProducer()
{
Dispose(false);
}
}
}

+ 21
- 95
src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809Producer.cs View File

@@ -1,6 +1,7 @@
using Confluent.Kafka;
using Confluent.Kafka.Admin;
using JT809.PubSub.Abstractions;
using Microsoft.Extensions.Options;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
@@ -11,79 +12,31 @@ namespace JT809.KafkaService
///
/// </summary>
/// <typeparam name="T"></typeparam>
public abstract class JT809Producer<T> : IJT809ProducerOfT<T>
public abstract class JT809Producer<T> : JT809ProducerBase<T>
{
private bool _disposed = false;

public virtual string TopicName => JT809Constants.JT809TopicName;

private ConcurrentDictionary<string, TopicPartition> TopicPartitionCache;

private IProducer<string, T> producer;

protected virtual Serializer<T> Serializer { get; }

protected virtual IJT809ProducerPartitionFactory ProducerPartitionFactory { get; }

protected virtual JT809PartitionOptions PartitionOptions { get; }

protected JT809Producer(ProducerConfig producerConfig)
protected virtual IProducer<string, T> CreateProducer()
{
ProducerBuilder<string, T> producerBuilder= new ProducerBuilder<string, T>(producerConfig);
ProducerBuilder<string, T> producerBuilder = new ProducerBuilder<string, T>(ProducerConfig);
if (Serializer != null)
{
producerBuilder.SetValueSerializer(Serializer);
}
producer = producerBuilder.Build();
if (PartitionOptions != null)
{
TopicPartitionCache = new ConcurrentDictionary<string, TopicPartition>();
if (PartitionOptions.Partition > 1)
{
using (var adminClient = new AdminClient(producer.Handle))
{
try
{
adminClient.CreateTopicsAsync(new TopicSpecification[] { new TopicSpecification { Name = TopicName, NumPartitions = 1, ReplicationFactor = 1 } }).Wait();
}
catch (AggregateException ex)
{
//{Confluent.Kafka.Admin.CreateTopicsException: An error occurred creating topics: [jt809]: [Topic 'jt809' already exists.].}
if (ex.InnerException is Confluent.Kafka.Admin.CreateTopicsException exception)
{
return producerBuilder.Build();
}

}
else
{
//记录日志
//throw ex.InnerException;
}
}
try
{
//topic IncreaseTo 只增不减
adminClient.CreatePartitionsAsync(
new List<PartitionsSpecification>
{
new PartitionsSpecification
{
IncreaseTo = PartitionOptions.Partition,
Topic=TopicName
}
}
).Wait();
}
catch (AggregateException ex)
{
//记录日志
// throw ex.InnerException;
}
}
}
}
protected override IProducer<string, T> Producer { get; }

protected JT809Producer(
IOptions<JT809TopicOptions> topicOptionAccessor,
IOptions<ProducerConfig> producerConfigAccessor)
: base(topicOptionAccessor.Value.TopicName, producerConfigAccessor.Value)
{
Producer = CreateProducer();
}

public void Dispose()
public override void Dispose()
{
Dispose(true);
GC.SuppressFinalize(true);
@@ -95,46 +48,19 @@ namespace JT809.KafkaService
if (_disposed) return;
if (disposing)
{
producer.Dispose();
Producer.Dispose();
}
_disposed = true;
}

public void ProduceAsync(string msgId, string vno_color, T data)
public override async void ProduceAsync(string msgId, string vno_color, T data)
{
if (_disposed) return;
if (PartitionOptions != null)
await Producer.ProduceAsync(TopicName, new Message<string, T>
{
if (PartitionOptions.Partition > 1)
{
if (!TopicPartitionCache.TryGetValue(vno_color, out TopicPartition topicPartition))
{
topicPartition = new TopicPartition(TopicName, new Partition(ProducerPartitionFactory.CreatePartition(TopicName, msgId, vno_color)));
TopicPartitionCache.TryAdd(vno_color, topicPartition);
}
producer.ProduceAsync(topicPartition, new Message<string, T>
{
Key = msgId,
Value = data
});
}
else
{
producer.ProduceAsync(TopicName, new Message<string, T>
{
Key = msgId,
Value = data
});
}
}
else
{
producer.ProduceAsync(TopicName, new Message<string, T>
{
Key = msgId,
Value = data
});
}
Key = msgId,
Value = data
});
}

~JT809Producer()


+ 24
- 0
src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809ProducerBase.cs View File

@@ -0,0 +1,24 @@
using Confluent.Kafka;
using JT809.PubSub.Abstractions;
using System;
using System.Collections.Generic;
using System.Text;

namespace JT809.KafkaService
{
public abstract class JT809ProducerBase<T> : IJT809ProducerOfT<T>
{
protected JT809ProducerBase(string topicName,ProducerConfig config)
{
ProducerConfig = config;
TopicName = topicName;
}

public ProducerConfig ProducerConfig { get;}
public string TopicName { get; }
protected abstract IProducer<string, T> Producer { get;}
protected virtual Serializer<T> Serializer { get; set; }
public abstract void Dispose();
public abstract void ProduceAsync(string msgId, string vno_color, T data);
}
}

+ 2
- 18
src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809_GpsPositio_Producer.cs View File

@@ -8,26 +8,10 @@ namespace JT809.KafkaService
{
public sealed class JT809_GpsPositio_Producer : JT809Producer<JT809GpsPosition>
{
protected override IJT809ProducerPartitionFactory ProducerPartitionFactory { get; }

protected override Serializer<JT809GpsPosition> Serializer => (position) => position.ToByteArray();

protected override JT809PartitionOptions PartitionOptions { get; }

public JT809_GpsPositio_Producer(IOptions<ProducerConfig> producerConfigAccessor)
: this(producerConfigAccessor,null, null )
public JT809_GpsPositio_Producer(IOptions<JT809TopicOptions> topicOptionAccessor, IOptions<ProducerConfig> producerConfigAccessor) : base(topicOptionAccessor, producerConfigAccessor)
{

}

public JT809_GpsPositio_Producer(
IOptions<ProducerConfig> producerConfigAccessor,
IJT809ProducerPartitionFactory partitionFactory,
IOptions<JT809PartitionOptions> partitionAccessor
):base(producerConfigAccessor.Value)
{
ProducerPartitionFactory = partitionFactory;
PartitionOptions = partitionAccessor?.Value;
}
protected override Serializer<JT809GpsPosition> Serializer => (position) => position.ToByteArray();
}
}

+ 4
- 16
src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809_GpsPosition_Consumer.cs View File

@@ -13,26 +13,14 @@ namespace JT809.KafkaService
{
public sealed class JT809_GpsPosition_Consumer : JT809Consumer<JT809GpsPosition>
{
public JT809_GpsPosition_Consumer(IOptions<JT809TopicOptions> topicOptionsAccessor, IOptions<ConsumerConfig> consumerConfigAccessor, ILoggerFactory loggerFactory) : base(topicOptionsAccessor, consumerConfigAccessor, loggerFactory)
{
}

protected override Deserializer<JT809GpsPosition> Deserializer => (data, isNull) => {
if (isNull) return default;
return new MessageParser<JT809GpsPosition>(() => new JT809GpsPosition())
.ParseFrom(data.ToArray());
};

protected override JT809PartitionOptions PartitionOptions { get; }

public JT809_GpsPosition_Consumer(
IOptions<ConsumerConfig> consumerConfigAccessor,
ILoggerFactory loggerFactory,
IOptions<JT809PartitionOptions> partitionAccessor
) : base(consumerConfigAccessor.Value, loggerFactory)
{
PartitionOptions = partitionAccessor.Value;
}

public JT809_GpsPosition_Consumer(IOptions<ConsumerConfig> consumerConfigAccessor,ILoggerFactory loggerFactory)
: this(consumerConfigAccessor,loggerFactory, new JT809PartitionOptions())
{
}
}
}

+ 2
- 13
src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809_Same_Consumer.cs View File

@@ -13,19 +13,8 @@ namespace JT809.KafkaService
{
public sealed class JT809_Same_Consumer : JT809Consumer<byte[]>
{
protected override JT809PartitionOptions PartitionOptions { get; }

public JT809_Same_Consumer(
IOptions<ConsumerConfig> consumerConfigAccessor,
ILoggerFactory loggerFactory,
IOptions<JT809PartitionOptions> partitionAccessor
) : base(consumerConfigAccessor.Value, loggerFactory)
{
PartitionOptions = partitionAccessor.Value;
}

public JT809_Same_Consumer(IOptions<ConsumerConfig> consumerConfigAccessor,ILoggerFactory loggerFactory)
: this(consumerConfigAccessor,loggerFactory, new JT809PartitionOptions())
public JT809_Same_Consumer(IOptions<JT809TopicOptions> topicOptionsAccessor, IOptions<ConsumerConfig> consumerConfigAccessor, ILoggerFactory loggerFactory)
: base(topicOptionsAccessor, consumerConfigAccessor, loggerFactory)
{
}
}


+ 2
- 17
src/JT809.DotNetty.Simples/Superior/JT809.KafkaService/JT809_Same_Producer.cs View File

@@ -9,24 +9,9 @@ namespace JT809.KafkaService
{
public sealed class JT809_Same_Producer : JT809Producer<byte[]>
{
protected override IJT809ProducerPartitionFactory ProducerPartitionFactory { get; }

protected override JT809PartitionOptions PartitionOptions { get; }

public JT809_Same_Producer(IOptions<ProducerConfig> producerConfigAccessor)
: this(producerConfigAccessor, null, null)
{

}

public JT809_Same_Producer(
IOptions<ProducerConfig> producerConfigAccessor,
IJT809ProducerPartitionFactory partitionFactory,
IOptions<JT809PartitionOptions> partitionAccessor
):base(producerConfigAccessor.Value)
public JT809_Same_Producer(IOptions<JT809TopicOptions> topicOptionAccessor, IOptions<ProducerConfig> producerConfigAccessor)
: base(topicOptionAccessor, producerConfigAccessor)
{
ProducerPartitionFactory = partitionFactory;
PartitionOptions = partitionAccessor?.Value;
}
}
}

+ 42
- 0
src/JT809.DotNetty.Simples/Superior/JT809.KafkaServiceTest/ConsumerTest.cs View File

@@ -0,0 +1,42 @@
using System;
using Xunit;
using Microsoft.Extensions.DependencyInjection;
using JT809.Protocol.Enums;
using JT809.Protocol.Extensions;
using System.Threading;

namespace JT809.KafkaServiceTest
{
public class ConsumerTest: TestConsumerBase
{
[Fact]
public void Test1()
{
ConsumerTestService producerTestService = ServiceProvider.GetRequiredService<ConsumerTestService>();
producerTestService.GpsConsumer.OnMessage((Message)=>
{
Assert.Equal(JT809SubBusinessType.实时上传车辆定位信息.ToValueString(), Message.MsgId);
Assert.Equal("粤A23456", Message.Data.Vno);
Assert.Equal(2, Message.Data.VColor);
Assert.Equal("smallchi", Message.Data.FromChannel);
});
producerTestService.GpsConsumer.Subscribe();

Thread.Sleep(100000);
}

[Fact]
public void Test2()
{
ConsumerTestService producerTestService = ServiceProvider.GetRequiredService<ConsumerTestService>();
producerTestService.SameConsumer.OnMessage((Message) =>
{
Assert.Equal(JT809SubBusinessType.None.ToValueString(), Message.MsgId);
Assert.Equal(new byte[] { 0x01, 0x02, 0x03 }, Message.Data);
});
producerTestService.SameConsumer.Subscribe();

Thread.Sleep(100000);
}
}
}

+ 19
- 0
src/JT809.DotNetty.Simples/Superior/JT809.KafkaServiceTest/ConsumerTestService.cs View File

@@ -0,0 +1,19 @@
using JT809.GrpcProtos;
using JT809.KafkaService;
using System;
using System.Collections.Generic;
using System.Text;

namespace JT809.KafkaServiceTest
{
public class ConsumerTestService
{
public JT809Consumer<byte[]> SameConsumer { get; }
public JT809Consumer<JT809GpsPosition> GpsConsumer { get; }
public ConsumerTestService(JT809Consumer<byte[]> sameConsumer, JT809Consumer<JT809GpsPosition> gpsConsumer)
{
SameConsumer = sameConsumer;
GpsConsumer = gpsConsumer;
}
}
}

+ 37
- 0
src/JT809.DotNetty.Simples/Superior/JT809.KafkaServiceTest/JT809.KafkaServiceTest.csproj View File

@@ -0,0 +1,37 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>netcoreapp2.2</TargetFramework>

<IsPackable>false</IsPackable>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="JT809" Version="1.2.1" />
<PackageReference Include="Microsoft.Extensions.Configuration.FileExtensions" Version="2.2.0" />
<PackageReference Include="Microsoft.Extensions.Configuration.Json" Version="2.2.0" />
<PackageReference Include="Microsoft.Extensions.DependencyInjection" Version="2.2.0" />
<PackageReference Include="Microsoft.Extensions.Logging.Debug" Version="2.2.0" />
<PackageReference Include="Microsoft.Extensions.Options.ConfigurationExtensions" Version="2.2.0" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="16.0.1" />
<PackageReference Include="xunit" Version="2.4.1" />
<PackageReference Include="xunit.runner.visualstudio" Version="2.4.1">
<PrivateAssets>all</PrivateAssets>
<IncludeAssets>runtime; build; native; contentfiles; analyzers; buildtransitive</IncludeAssets>
</PackageReference>
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\JT809.KafkaService\JT809.KafkaService.csproj" />
</ItemGroup>

<ItemGroup>
<None Update="appsettings.Partition.json">
<CopyToOutputDirectory>Always</CopyToOutputDirectory>
</None>
<None Update="appsettings.json">
<CopyToOutputDirectory>Always</CopyToOutputDirectory>
</None>
</ItemGroup>

</Project>

+ 38
- 0
src/JT809.DotNetty.Simples/Superior/JT809.KafkaServiceTest/ProducerPartitionTest.cs View File

@@ -0,0 +1,38 @@
using System;
using Xunit;
using Microsoft.Extensions.DependencyInjection;
using JT809.Protocol.Enums;
using JT809.Protocol.Extensions;

namespace JT809.KafkaServiceTest
{
public class ProducerPartitionTest : TestProducerBase
{
[Fact]
public void Test1()
{
ProducerTestService producerTestService = ServiceProvider.GetRequiredService<ProducerTestService>();
producerTestService.GpsProducer.ProduceAsync(JT809SubBusinessType.实时上传车辆定位信息.ToValueString(), "粤A23456_2", new GrpcProtos.JT809GpsPosition
{
Vno= "粤A23456",
VColor=2,
GpsTime= (DateTime.Now.ToUniversalTime().Ticks - 621355968000000000) / 10000000,
FromChannel="smallchi"
});
}

[Fact]
public void Test2()
{
ProducerTestService producerTestService = ServiceProvider.GetRequiredService<ProducerTestService>();
producerTestService.SameProducer.ProduceAsync(JT809SubBusinessType.None.ToValueString(), "粤A23457_2", new byte[] { 0x01, 0x02, 0x03 });
}

[Fact]
public void Test3()
{
ProducerTestService producerTestService = ServiceProvider.GetRequiredService<ProducerTestService>();
producerTestService.SameProducer.ProduceAsync(JT809SubBusinessType.None.ToValueString(), "粤A23457_2", new byte[] { 0x01, 0x02, 0x03 });
}
}
}

+ 31
- 0
src/JT809.DotNetty.Simples/Superior/JT809.KafkaServiceTest/ProducerTest.cs View File

@@ -0,0 +1,31 @@
using System;
using Xunit;
using Microsoft.Extensions.DependencyInjection;
using JT809.Protocol.Enums;
using JT809.Protocol.Extensions;

namespace JT809.KafkaServiceTest
{
public class ProducerTest: TestProducerBase
{
[Fact]
public void Test1()
{
ProducerTestService producerTestService = ServiceProvider.GetRequiredService<ProducerTestService>();
producerTestService.GpsProducer.ProduceAsync(JT809SubBusinessType.实时上传车辆定位信息.ToValueString(), "粤A23456_2", new GrpcProtos.JT809GpsPosition
{
Vno= "粤A23456",
VColor=2,
GpsTime= (DateTime.Now.ToUniversalTime().Ticks - 621355968000000000) / 10000000,
FromChannel="smallchi"
});
}

[Fact]
public void Test2()
{
ProducerTestService producerTestService = ServiceProvider.GetRequiredService<ProducerTestService>();
producerTestService.SameProducer.ProduceAsync(JT809SubBusinessType.None.ToValueString(), "粤A23457_2", new byte[] { 0x01, 0x02, 0x03 });
}
}
}

+ 20
- 0
src/JT809.DotNetty.Simples/Superior/JT809.KafkaServiceTest/ProducerTestService.cs View File

@@ -0,0 +1,20 @@
using JT809.GrpcProtos;
using JT809.KafkaService;
using System;
using System.Collections.Generic;
using System.Text;

namespace JT809.KafkaServiceTest
{
public class ProducerTestService
{
public JT809Producer<byte[]> SameProducer { get; }
public JT809Producer<JT809GpsPosition> GpsProducer { get; }
public ProducerTestService(JT809Producer<byte[]> sameProducer, JT809Producer<JT809GpsPosition> gpsProducer)
{
SameProducer = sameProducer;
GpsProducer = gpsProducer;
}

}
}

+ 37
- 0
src/JT809.DotNetty.Simples/Superior/JT809.KafkaServiceTest/TestConsumerBase.cs View File

@@ -0,0 +1,37 @@
using System;
using System.Collections.Generic;
using System.IO;
using System.Text;
using JT809.KafkaService;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;

namespace JT809.KafkaServiceTest
{
public class TestConsumerBase
{
public IServiceProvider ServiceProvider { get; }

public IConfigurationRoot ConfigurationRoot { get; }
public TestConsumerBase()
{
var builder = new ConfigurationBuilder();
builder.SetBasePath(AppDomain.CurrentDomain.BaseDirectory);
builder.AddJsonFile("appsettings.json");
ConfigurationRoot = builder.Build();

ServiceCollection serviceDescriptors = new ServiceCollection();
serviceDescriptors.AddSingleton<ILoggerFactory, LoggerFactory>();
serviceDescriptors.AddSingleton(typeof(ILogger<>), typeof(Logger<>));
serviceDescriptors.AddLogging(configure =>
{
configure.AddDebug();
configure.SetMinimumLevel(LogLevel.Trace);
});
serviceDescriptors.AddJT809KafkaConsumerService(ConfigurationRoot);
serviceDescriptors.AddSingleton<ConsumerTestService>();
ServiceProvider = serviceDescriptors.BuildServiceProvider();
}
}
}

+ 37
- 0
src/JT809.DotNetty.Simples/Superior/JT809.KafkaServiceTest/TestProducerBase.cs View File

@@ -0,0 +1,37 @@
using System;
using System.Collections.Generic;
using System.IO;
using System.Text;
using JT809.KafkaService;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;

namespace JT809.KafkaServiceTest
{
public class TestProducerBase
{
public IServiceProvider ServiceProvider { get; }

public IConfigurationRoot ConfigurationRoot { get; }
public TestProducerBase()
{
var builder = new ConfigurationBuilder();
builder.SetBasePath(AppDomain.CurrentDomain.BaseDirectory);
builder.AddJsonFile("appsettings.json");
ConfigurationRoot = builder.Build();

ServiceCollection serviceDescriptors = new ServiceCollection();
serviceDescriptors.AddSingleton<ILoggerFactory, LoggerFactory>();
serviceDescriptors.AddSingleton(typeof(ILogger<>), typeof(Logger<>));
serviceDescriptors.AddLogging(configure =>
{
configure.AddDebug();
configure.SetMinimumLevel(LogLevel.Trace);
});
serviceDescriptors.AddJT809KafkaProducerService(ConfigurationRoot);
serviceDescriptors.AddSingleton<ProducerTestService>();
ServiceProvider = serviceDescriptors.BuildServiceProvider();
}
}
}

+ 37
- 0
src/JT809.DotNetty.Simples/Superior/JT809.KafkaServiceTest/TestProducerPartitionBase.cs View File

@@ -0,0 +1,37 @@
using System;
using System.Collections.Generic;
using System.IO;
using System.Text;
using JT809.KafkaService;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;

namespace JT809.KafkaServiceTest
{
public class TestProducerPartitionBase
{
public IServiceProvider ServiceProvider { get; }

public IConfigurationRoot ConfigurationRoot { get; }
public TestProducerPartitionBase()
{
var builder = new ConfigurationBuilder();
builder.SetBasePath(AppDomain.CurrentDomain.BaseDirectory);
builder.AddJsonFile("appsettings.Partition.json");
ConfigurationRoot = builder.Build();

ServiceCollection serviceDescriptors = new ServiceCollection();
serviceDescriptors.AddSingleton<ILoggerFactory, LoggerFactory>();
serviceDescriptors.AddSingleton(typeof(ILogger<>), typeof(Logger<>));
serviceDescriptors.AddLogging(configure =>
{
configure.AddDebug();
configure.SetMinimumLevel(LogLevel.Trace);
});
serviceDescriptors.AddJT809KafkaProducerService(ConfigurationRoot);
serviceDescriptors.AddSingleton<ProducerTestService>();
ServiceProvider = serviceDescriptors.BuildServiceProvider();
}
}
}

+ 23
- 0
src/JT809.DotNetty.Simples/Superior/JT809.KafkaServiceTest/appsettings.Partition.json View File

@@ -0,0 +1,23 @@
{
"Logging": {
"IncludeScopes": false,
"Debug": {
"LogLevel": {
"Default": "Trace"
}
},
"Console": {
"LogLevel": {
"Default": "Trace"
}
}
},
"KafkaProducerConfig": {
"BootstrapServers": "127.0.0.1:9092"
},
"KafkaConsumerConfig": {
"BootstrapServers": "127.0.0.1:9092",
"EnableAutoCommit": true,
"GroupId": "JT809.Gps.Test"
}
}

+ 23
- 0
src/JT809.DotNetty.Simples/Superior/JT809.KafkaServiceTest/appsettings.json View File

@@ -0,0 +1,23 @@
{
"Logging": {
"IncludeScopes": false,
"Debug": {
"LogLevel": {
"Default": "Trace"
}
},
"Console": {
"LogLevel": {
"Default": "Trace"
}
}
},
"KafkaProducerConfig": {
"BootstrapServers": "127.0.0.1:9092"
},
"KafkaConsumerConfig": {
"BootstrapServers": "127.0.0.1:9092",
"EnableAutoCommit": true,
"GroupId": "JT809.Gps.Test"
}
}

+ 1
- 1
src/JT809.DotNetty.Simples/Superior/JT809.PubSub.Abstractions/IJT809Consumer.cs View File

@@ -11,7 +11,7 @@ namespace JT809.PubSub.Abstractions
}
public interface IJT808ConsumerOfT<T> :IDisposable
{
void OnMessage(Action<(string MsgId, T data)> callback);
void OnMessage(Action<(string MsgId, T Data)> callback);
CancellationTokenSource Cts { get; }
void Subscribe();
void Unsubscribe();


+ 14
- 0
src/JT809.DotNetty.Simples/Superior/JT809.PubSub.Abstractions/JT809TopicOptions.cs View File

@@ -0,0 +1,14 @@
using Microsoft.Extensions.Options;
using System;
using System.Collections.Generic;
using System.Text;

namespace JT809.PubSub.Abstractions
{
public class JT809TopicOptions:IOptions<JT809TopicOptions>
{
public string TopicName { get; set; } = JT809Constants.JT809TopicName;

public JT809TopicOptions Value => this;
}
}

+ 7
- 0
src/JT809.DotNetty.sln View File

@@ -25,6 +25,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JT809.GrpcProtos", "JT809.D
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "JT809.KafkaService", "JT809.DotNetty.Simples\Superior\JT809.KafkaService\JT809.KafkaService.csproj", "{8119D905-241F-4EFF-B300-1FB474B8C665}"
EndProject
Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "JT809.KafkaServiceTest", "JT809.DotNetty.Simples\Superior\JT809.KafkaServiceTest\JT809.KafkaServiceTest.csproj", "{22F008D5-61F8-4889-80DB-91B37591322F}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
@@ -59,6 +61,10 @@ Global
{8119D905-241F-4EFF-B300-1FB474B8C665}.Debug|Any CPU.Build.0 = Debug|Any CPU
{8119D905-241F-4EFF-B300-1FB474B8C665}.Release|Any CPU.ActiveCfg = Release|Any CPU
{8119D905-241F-4EFF-B300-1FB474B8C665}.Release|Any CPU.Build.0 = Release|Any CPU
{22F008D5-61F8-4889-80DB-91B37591322F}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{22F008D5-61F8-4889-80DB-91B37591322F}.Debug|Any CPU.Build.0 = Debug|Any CPU
{22F008D5-61F8-4889-80DB-91B37591322F}.Release|Any CPU.ActiveCfg = Release|Any CPU
{22F008D5-61F8-4889-80DB-91B37591322F}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
@@ -70,6 +76,7 @@ Global
{5F8AFD67-FDA8-40A6-A655-FD855E2CCF26} = {E9DC871D-EFCE-4D53-A5B5-8A88D2D52EA4}
{D64F2F77-DC0C-4120-80DA-45012A794CDF} = {E9DC871D-EFCE-4D53-A5B5-8A88D2D52EA4}
{8119D905-241F-4EFF-B300-1FB474B8C665} = {E9DC871D-EFCE-4D53-A5B5-8A88D2D52EA4}
{22F008D5-61F8-4889-80DB-91B37591322F} = {E9DC871D-EFCE-4D53-A5B5-8A88D2D52EA4}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {0FC2A52E-3B7A-4485-9C3B-9080C825419D}


Loading…
Cancel
Save